Files
xc/remote/worker.go
Pavel Vorobyov 7ea2ec5330 No parallel pty (#5)
* poller and pty removed for parallel execution
2019-10-01 10:42:10 +03:00

410 lines
9.4 KiB
Go

package remote
import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"sync"
"syscall"
"time"
"github.com/viert/xc/log"
"github.com/viert/xc/passmgr"
)
// RaiseType enumerates the privilege-raising modes used when running remote
// commands.
type RaiseType int
// Raise types. RTNone is the zero value; processStdout only looks up a
// password and answers a password prompt when the current raise type is not
// RTNone.
// NOTE(review): RTSu and RTSudo presumably stand for `su` and `sudo` — the
// code that builds the actual command line is not in this part of the file.
const (
RTNone RaiseType = iota
RTSu
RTSudo
)
// CopyType enumerates the file transfer methods a Task can request.
type CopyType int
// Copy types. CTScp is the zero value. Worker.run calls w.copy for CTScp and
// w.tarcopy for any other value.
const (
CTScp CopyType = iota
CTTar
)
// Task type represents a worker task descriptor: an optional copy part
// followed by an optional command part, both bound to a single host.
type Task struct {
// Hostname identifies the target host; it is echoed in every Message the
// worker produces for this task.
Hostname string
// LocalFilename and RemoteFilename describe the copy part. The worker
// performs a copy only when both are non-empty.
LocalFilename string
RemoteFilename string
// RecursiveCopy is not read in this part of the file.
// NOTE(review): presumably enables recursive copying — confirm in the copy helpers.
RecursiveCopy bool
// Cmd is the command to run; an empty string means there is nothing to run.
Cmd string
// Copy selects the transfer method used for the copy part.
Copy CopyType
// WG, when non-nil, has Done() called once the worker is finished with the task.
WG *sync.WaitGroup
}
// MessageType describes a type of worker message
type MessageType int
// Message represents a worker message sent through the worker's data channel.
//
// Messages are created with positional composite literals in this file, so
// the order of the fields below must not be changed.
type Message struct {
// Data is the payload; it is nil for MTCopyFinished and MTExecFinished messages.
Data []byte
// Type tells the receiver how to interpret the message.
Type MessageType
// Hostname is the Hostname of the task the message belongs to.
Hostname string
// StatusCode is the result of the finished copy/exec part; output messages
// (MTData, MTDebug) carry -1.
StatusCode int
}
// Enum of MessageType values
const (
// MTData marks a chunk of processed command output (stdout or stderr).
MTData MessageType = iota
// MTDebug marks the raw bytes of a single read, sent before any filtering.
MTDebug
// MTCopyFinished is sent when the copy part of a task has finished.
MTCopyFinished
// MTExecFinished is sent when the command part of a task has finished, or
// when it is skipped because the copy part failed.
MTExecFinished
)
// Custom error codes used as task status codes in place of a process exit
// code. The values start at 32500.
const (
// ErrMacOsExit is used by _run when cmd.Wait fails with an error that is
// not an *exec.ExitError.
ErrMacOsExit = 32500 + iota
// ErrForceStop is returned by _run when the task was force-stopped.
ErrForceStop
// ErrCopyFailed is reported for the command part of a task whose copy part failed.
ErrCopyFailed
// ErrTerminalError is returned by _run when a stdio pipe can't be created.
ErrTerminalError
// ErrAuthenticationError is not referenced in this part of the file.
ErrAuthenticationError
// ErrCommandStartFailed is returned by _run when cmd.Start fails.
ErrCommandStartFailed
)
// Tuning constants for command execution.
const (
// pollDeadline is the polling interval _run uses while waiting for a
// command to finish or for a force-stop request.
pollDeadline = 50 * time.Millisecond
// bufferSize is the size in bytes of the read buffers used by the
// stdout/stderr processors.
bufferSize = 4096
)
// Worker type represents a worker object. A worker takes tasks from its queue
// channel one at a time and reports progress through its data channel.
type Worker struct {
// id is the sequential identifier assigned by NewWorker.
id int
// queue is the source of tasks; the run loop exits when it is closed.
queue chan *Task
// data receives every Message the worker produces.
data chan *Message
// stop carries force-stop requests; NewWorker creates it with capacity 1.
stop chan bool
// busy is true while a task is being processed. It is written by run and
// read by ForceStop without any synchronization.
busy bool
}
// Package-level state shared by all workers.
var (
// wrkseq is the id handed to the next worker created by NewWorker.
wrkseq = 1
// environment is appended to the current process environment for every
// command started by _run.
environment = []string{"LC_ALL=en_US.UTF-8", "LANG=en_US.UTF-8"}
// remote expressions to catch
// exConnectionClosed and exLostConnection are stripped from stderr chunks.
exConnectionClosed = regexp.MustCompile(`([Ss]hared\s+)?[Cc]onnection\s+to\s+.+\s+closed\.?[\n\r]+`)
// exPasswdPrompt triggers sending the password on stdout.
exPasswdPrompt = regexp.MustCompile(`[Pp]assword`)
// exWrongPassword makes the stdout processor report an authentication
// failure and stop reading.
exWrongPassword = regexp.MustCompile(`[Ss]orry.+try.+again\.?`)
// exPermissionDenied is not referenced in this part of the file.
exPermissionDenied = regexp.MustCompile(`[Pp]ermission\s+denied`)
exLostConnection = regexp.MustCompile(`[Ll]ost\sconnection`)
// exEcho matches a chunk consisting only of line breaks; one such chunk is
// dropped after the password has been sent.
exEcho = regexp.MustCompile(`^[\n\r]+$`)
)
// NewWorker builds a worker that consumes tasks from queue and publishes its
// messages to data. The worker receives the next sequential id, and its
// processing goroutine is started before NewWorker returns.
func NewWorker(queue chan *Task, data chan *Message) *Worker {
	worker := new(Worker)
	worker.id = wrkseq
	worker.queue = queue
	worker.data = data
	// capacity 1 lets a single stop request be queued without a receiver
	worker.stop = make(chan bool, 1)
	worker.busy = false
	wrkseq++

	go worker.run()
	return worker
}
// ID is a worker id getter. It returns the sequential identifier assigned to
// the worker by NewWorker.
func (w *Worker) ID() int {
return w.id
}
// run is the worker main loop. It processes tasks from the queue until the
// queue is closed.
//
// Every task consists of a copying part and an executing part and may contain
// both or just one of them. If both are present, the worker copies the data
// and then runs the given command immediately. This is handy for the
// runscript command, where a script is copied to a remote server and called
// right after that. If the copy fails, the command is not run and
// MTExecFinished is reported with ErrCopyFailed instead.
//
// When a task is finished the worker first clears its busy flag and only then
// signals task.WG. Doing it in this order on every path guarantees that a
// caller returning from WaitGroup.Wait() no longer observes the worker as
// busy, so a late ForceStop() can't queue a stop request that would kill the
// next task.
func (w *Worker) run() {
	var result int

	log.Debugf("WRK[%d] Started", w.id)
	for task := range w.queue {
		w.busy = true
		log.Debugf("WRK[%d] Got a task for host %s by worker", w.id, task.Hostname)

		copyFailed := false

		// does the task have anything to copy?
		if task.RemoteFilename != "" && task.LocalFilename != "" {
			if task.Copy == CTScp {
				result = w.copy(task)
			} else {
				result = w.tarcopy(task)
			}
			log.Debugf("WRK[%d] Copy on %s, status=%d", w.id, task.Hostname, result)
			w.data <- &Message{nil, MTCopyFinished, task.Hostname, result}

			if result != 0 {
				copyFailed = true
				log.Debugf("WRK[%d] Copy on %s, result != 0, catching", w.id, task.Hostname)
				// if copying failed we can't proceed further with the task,
				// but the receiver still expects an ExecFinished message
				// when there was something to run
				if task.Cmd != "" {
					log.Debugf("WRK[%d] Copy on %s, result != 0, task.Cmd == \"%s\", sending ExecFinished", w.id, task.Hostname, task.Cmd)
					w.data <- &Message{nil, MTExecFinished, task.Hostname, ErrCopyFailed}
				}
			}
		}

		// does the task have anything to run?
		if !copyFailed && task.Cmd != "" {
			log.Debugf("WRK[%d] runcmd(%s) at %s", w.id, task.Cmd, task.Hostname)
			result = w.runcmd(task)
			w.data <- &Message{nil, MTExecFinished, task.Hostname, result}
		}

		// busy must be cleared before the wait group is released, see the
		// function comment
		w.busy = false
		if task.WG != nil {
			task.WG.Done()
		}
	}
	log.Debugf("WRK[%d] Task queue has closed, worker is exiting", w.id)
}
// ForceStop requests cancellation of the task the worker is currently
// executing. It returns true if the worker was busy at the moment of the
// call, and false otherwise (in which case nothing is requested).
//
// The request is queued without blocking: the stop channel holds a single
// pending request, so when one is already waiting to be picked up the new one
// is dropped instead of blocking the caller.
func (w *Worker) ForceStop() bool {
	if !w.busy {
		return false
	}

	select {
	case w.stop <- true:
	default:
		// a stop request is already pending, one is enough
	}
	return true
}
// forceStopped reports whether a stop request is pending. A pending request
// is consumed by the call; the function never blocks.
func (w *Worker) forceStopped() bool {
	requested := false
	select {
	case <-w.stop:
		requested = true
	default:
		// nothing queued
	}
	return requested
}
// log writes a debug message prefixed with the worker id. format and args
// follow the fmt.Printf conventions.
func (w *Worker) log(format string, args ...interface{}) {
	prefix := fmt.Sprintf("WRK[%d]: ", w.id)
	log.Debugf(prefix+format, args...)
}
// processStderr reads the command's stderr from rd until a read fails
// (including EOF) and forwards the output to the worker's data channel.
//
// Every successful read is first sent as a whole as an MTDebug message. The
// data is then split after each '\n'; "connection closed" and "lost
// connection" notices are stripped from the chunks and every non-empty chunk
// is sent as an MTData message.
//
// wr is accepted for symmetry with processStdout and is not used.
// *finished is set to true right before the function stops reading.
func (w *Worker) processStderr(rd io.ReadCloser, wr io.WriteCloser, finished *bool, task *Task) {
	buf := make([]byte, bufferSize)

	w.log("starting stderr processor for host %s", task.Hostname)
	for {
		n, err := rd.Read(buf)

		// A reader may return data together with an error, so the bytes are
		// processed before the error is looked at.
		if n > 0 {
			// buf is reused by the next Read, the receiver must get its own copy
			raw := append([]byte(nil), buf[:n]...)
			w.data <- &Message{raw, MTDebug, task.Hostname, -1}

			chunks := bytes.SplitAfter(buf[:n], []byte{'\n'})
			for _, chunk := range chunks {
				if currentDebug {
					w.log("STDERR CHUNK IN @ %s: %v %s", task.Hostname, chunk, string(chunk))
				}

				if exConnectionClosed.Match(chunk) {
					chunk = exConnectionClosed.ReplaceAll(chunk, []byte{})
					w.log("expr connection closed on stderr")
				}

				if exLostConnection.Match(chunk) {
					chunk = exLostConnection.ReplaceAll(chunk, []byte{})
					w.log("expr lost connection on stderr")
				}

				if len(chunk) == 0 {
					continue
				}

				// chunk may still point into buf which is going to be
				// overwritten, so it's copied before being passed further
				data := make([]byte, len(chunk))
				copy(data, chunk)
				if currentDebug {
					w.log("STDERR CHUNK OUT @ %s: %v %s", task.Hostname, data, string(data))
				}
				w.data <- &Message{data, MTData, task.Hostname, -1}
			}
		}

		if err != nil {
			*finished = true
			break
		}
	}
	w.log("exiting stderr processor for host %s", task.Hostname)
}
// processStdout reads the command's stdout from rd until a read fails
// (including EOF) or a wrong-password notice is seen, and forwards the output
// to the worker's data channel.
//
// Every successful read is first sent as a whole as an MTDebug message. The
// data is then split after each '\n' and every chunk goes through the
// following steps:
//
//   - if a password is still to be sent and the chunk looks like a password
//     prompt, the password followed by a newline is written to wr and the
//     chunk is dropped (only while fewer than 10 reads have been made);
//   - the first chunk consisting of line breaks only that follows the
//     password is dropped;
//   - if the chunk looks like a wrong-password notice, an authentication
//     failure message is sent and the processor stops;
//   - any other non-empty chunk is sent as an MTData message.
//
// A password is only used when the current raise type is not RTNone; it is
// taken from the password manager or from the current password setting.
// *finished is set to true right before the function stops reading.
func (w *Worker) processStdout(rd io.ReadCloser, wr io.WriteCloser, finished *bool, task *Task) {
	var (
		msgCount       int
		password       string
		shouldSkipEcho bool
	)

	w.log("starting stdout processor for host %s", task.Hostname)
	buf := make([]byte, bufferSize)

	// without privilege raising there's no password to send, so the prompt
	// detection is disabled from the start
	passwordSent := currentRaise == RTNone
	if !passwordSent {
		if currentUsePasswordManager {
			password = passmgr.GetPass(task.Hostname)
		} else {
			password = currentPassword
		}
	}

execLoop:
	for {
		n, err := rd.Read(buf)

		// A reader may return data together with an error, so the bytes are
		// processed before the error is looked at.
		if n > 0 {
			// buf is reused by the next Read, the receiver must get its own copy
			raw := append([]byte(nil), buf[:n]...)
			w.data <- &Message{raw, MTDebug, task.Hostname, -1}
			msgCount++

			chunks := bytes.SplitAfter(buf[:n], []byte{'\n'})
			for _, chunk := range chunks {
				if currentDebug {
					w.log("STDOUT CHUNK IN @ %s: %v %s", task.Hostname, chunk, string(chunk))
				}

				// The password prompt is only looked for in the data of the
				// first reads (msgCount is incremented once per read)
				if msgCount < 10 && !passwordSent && exPasswdPrompt.Match(chunk) {
					w.log("sending password for %s, msgCount=%d", task.Hostname, msgCount)
					if _, werr := wr.Write([]byte(password + "\n")); werr != nil {
						w.log("error sending password: %v", werr)
					}
					passwordSent = true
					shouldSkipEcho = true
					continue
				}

				// the line break echoed after the password is not a part of
				// the command output
				if shouldSkipEcho && exEcho.Match(chunk) {
					shouldSkipEcho = false
					continue
				}

				if passwordSent && exWrongPassword.Match(chunk) {
					w.data <- &Message{[]byte("sudo: Authentication failure\n"), MTData, task.Hostname, -1}
					*finished = true
					break execLoop
				}

				if len(chunk) == 0 {
					continue
				}

				// chunk points into buf which is going to be overwritten,
				// so it's copied before being passed further
				data := make([]byte, len(chunk))
				copy(data, chunk)
				if currentDebug {
					w.log("STDOUT CHUNK OUT @ %s: %v %s", task.Hostname, data, string(data))
				}
				w.data <- &Message{data, MTData, task.Hostname, -1}
			}
		}

		if err != nil {
			*finished = true
			break
		}
	}
	w.log("exiting stdout processor for host %s", task.Hostname)
}
// _run starts cmd, feeds its stdout and stderr to the output processors and
// waits until both of them are done or a force-stop is requested.
//
// The return value is the status code of the task:
//
//   - ErrTerminalError if one of the stdio pipes can't be created;
//   - ErrCommandStartFailed if the command can't be started;
//   - ErrForceStop if the task was force-stopped (the process is killed);
//   - the exit status of the process if cmd.Wait reports an *exec.ExitError;
//   - ErrMacOsExit if cmd.Wait fails with any other error;
//   - 0 otherwise.
//
// The environment of the command is the environment of the current process
// extended with the package-level environment settings.
func (w *Worker) _run(task *Task, cmd *exec.Cmd) int {
	cmd.Env = append(os.Environ(), environment...)

	sout, err := cmd.StdoutPipe()
	if err != nil {
		w.log("error creating stdout pipe: %v", err)
		return ErrTerminalError
	}

	serr, err := cmd.StderrPipe()
	if err != nil {
		w.log("error creating stderr pipe: %v", err)
		w.log("closing stdout pipe, err=%v", sout.Close())
		return ErrTerminalError
	}

	sin, err := cmd.StdinPipe()
	if err != nil {
		w.log("error creating stdin pipe: %v", err)
		w.log("closing stderr pipe, err=%v", serr.Close())
		w.log("closing stdout pipe, err=%v", sout.Close())
		return ErrTerminalError
	}

	err = cmd.Start()
	if err != nil {
		w.log("error starting cmd: %v", err)
		w.log("closing stderr pipe, err=%v", serr.Close())
		w.log("closing stdout pipe, err=%v", sout.Close())
		w.log("closing stdin pipe, err=%v", sin.Close())
		return ErrCommandStartFailed
	}

	// The processors still receive their "finished" flags, but the flags are
	// written from other goroutines and therefore never read here: reading
	// them without synchronization would be a data race. Completion is
	// signalled by closing a channel when the processor returns instead.
	stdoutFinished := false
	stderrFinished := false
	taskForceStopped := false

	stdoutDone := make(chan struct{})
	stderrDone := make(chan struct{})
	go func() {
		defer close(stdoutDone)
		w.processStdout(sout, sin, &stdoutFinished, task)
	}()
	go func() {
		defer close(stderrDone)
		w.processStderr(serr, sin, &stderrFinished, task)
	}()

	// receive-side views of the channels; a view is set to nil once its
	// processor is done, which disables the corresponding select case
	var pendingStdout <-chan struct{} = stdoutDone
	var pendingStderr <-chan struct{} = stderrDone

	ticker := time.NewTicker(pollDeadline)
	defer ticker.Stop()

waitLoop:
	for pendingStdout != nil || pendingStderr != nil {
		if w.forceStopped() {
			taskForceStopped = true
			err = cmd.Process.Kill()
			if err != nil {
				w.log("error killing process: %v", err)
			}
			break waitLoop
		}

		select {
		case <-pendingStdout:
			pendingStdout = nil
		case <-pendingStderr:
			pendingStderr = nil
		case <-ticker.C:
			// time to check for a force-stop request again
		}
	}

	exitCode := 0
	w.log("out of waitloop running cmd.Wait to cleanup")
	err = cmd.Wait()

	if taskForceStopped {
		return ErrForceStop
	}

	if err != nil {
		if exitErr, ok := err.(*exec.ExitError); ok {
			ws := exitErr.Sys().(syscall.WaitStatus)
			exitCode = ws.ExitStatus()
		} else {
			// MacOS hack
			exitCode = ErrMacOsExit
		}
	}

	w.log("Task on %s exit coded is %d", task.Hostname, exitCode)
	return exitCode
}