mirror of
https://github.com/kemko/xc.git
synced 2026-01-01 15:55:43 +03:00
410 lines
9.4 KiB
Go
410 lines
9.4 KiB
Go
package remote
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"regexp"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/viert/xc/log"
|
|
"github.com/viert/xc/passmgr"
|
|
)
|
|
|
|
// RaiseType enumerates the privilege-raising modes a command can run with.
type RaiseType int

// Known raise types. The zero value is RTNone.
const (
	// RTNone means no privilege raise is requested; the stdout processor
	// does not look for a password prompt in this mode.
	RTNone RaiseType = iota
	// RTSu requests a raise of type "su" (presumably via su(1) — confirm
	// against the command builder).
	RTSu
	// RTSudo requests a raise of type "sudo" (presumably via sudo(8) —
	// confirm against the command builder).
	RTSudo
)
|
|
|
|
// CopyType enumerates the file transfer methods a task can use.
type CopyType int

// Known copy types. The zero value is CTScp.
const (
	// CTScp makes the worker transfer files with its copy method.
	CTScp CopyType = iota
	// CTTar makes the worker transfer files with its tarcopy method
	// (the worker uses tarcopy for every value other than CTScp).
	CTTar
)
|
|
|
|
// Task type represents a worker task descriptor
|
|
type Task struct {
|
|
Hostname string
|
|
LocalFilename string
|
|
RemoteFilename string
|
|
RecursiveCopy bool
|
|
Cmd string
|
|
Copy CopyType
|
|
WG *sync.WaitGroup
|
|
}
|
|
|
|
// MessageType tells the receiver how to interpret a worker Message.
type MessageType int

// Message is what a worker emits on its data channel.
//
// The field order is part of the contract: this file builds messages with
// positional composite literals, so fields must not be reordered.
type Message struct {
	// Data is the payload; nil for "finished" notifications.
	Data []byte
	// Type is the kind of the message.
	Type MessageType
	// Hostname is the host the message relates to.
	Hostname string
	// StatusCode carries the copy/exec result for "finished" messages;
	// output and debug messages are sent with -1.
	StatusCode int
}
|
|
|
|
// Enum of OutputTypes
|
|
const (
|
|
MTData MessageType = iota
|
|
MTDebug
|
|
MTCopyFinished
|
|
MTExecFinished
|
|
)
|
|
|
|
// Custom status codes reported by the worker in place of a real process
// exit status. They start at 32500.
const (
	// ErrMacOsExit is reported when waiting for the process fails with
	// something other than an exit error.
	ErrMacOsExit = 32500 + iota
	// ErrForceStop is reported when the task was killed by a force stop.
	ErrForceStop
	// ErrCopyFailed is reported for the execution step when the preceding
	// copy step failed.
	ErrCopyFailed
	// ErrTerminalError is reported when a process pipe cannot be created.
	ErrTerminalError
	// ErrAuthenticationError is reserved for authentication failures.
	ErrAuthenticationError
	// ErrCommandStartFailed is reported when the process cannot be started.
	ErrCommandStartFailed
)
|
|
|
|
const (
|
|
pollDeadline = 50 * time.Millisecond
|
|
bufferSize = 4096
|
|
)
|
|
|
|
// Worker type represents a worker object
|
|
type Worker struct {
|
|
id int
|
|
queue chan *Task
|
|
data chan *Message
|
|
stop chan bool
|
|
busy bool
|
|
}
|
|
|
|
var (
	// wrkseq is the id that will be given to the next created worker.
	wrkseq = 1
	// environment is appended to the inherited environment of every
	// spawned process to force an English UTF-8 locale.
	environment = []string{"LC_ALL=en_US.UTF-8", "LANG=en_US.UTF-8"}
)

// Expressions matched against remote output.
var (
	// exConnectionClosed matches "[Shared ]connection to <host> closed" lines.
	exConnectionClosed = regexp.MustCompile(`([Ss]hared\s+)?[Cc]onnection\s+to\s+.+\s+closed\.?[\n\r]+`)
	// exPasswdPrompt matches a password prompt.
	exPasswdPrompt = regexp.MustCompile(`[Pp]assword`)
	// exWrongPassword matches the "Sorry, try again" rejection message.
	exWrongPassword = regexp.MustCompile(`[Ss]orry.+try.+again\.?`)
	// exPermissionDenied matches "permission denied" messages.
	exPermissionDenied = regexp.MustCompile(`[Pp]ermission\s+denied`)
	// exLostConnection matches "lost connection" messages.
	exLostConnection = regexp.MustCompile(`[Ll]ost\sconnection`)
	// exEcho matches a chunk made only of line breaks.
	exEcho = regexp.MustCompile(`^[\n\r]+$`)
)
|
|
|
|
// NewWorker creates a new worker
|
|
func NewWorker(queue chan *Task, data chan *Message) *Worker {
|
|
w := &Worker{
|
|
id: wrkseq,
|
|
queue: queue,
|
|
data: data,
|
|
stop: make(chan bool, 1),
|
|
busy: false,
|
|
}
|
|
wrkseq++
|
|
go w.run()
|
|
return w
|
|
}
|
|
|
|
// ID is a worker id getter
|
|
func (w *Worker) ID() int {
|
|
return w.id
|
|
}
|
|
|
|
func (w *Worker) run() {
|
|
var result int
|
|
|
|
log.Debugf("WRK[%d] Started", w.id)
|
|
for task := range w.queue {
|
|
// Every task consists of copying part and executing part
|
|
// It may contain both or just one of them
|
|
// If there are both parts, worker copies data and then runs
|
|
// the given command immediately. This behaviour is handy for runscript
|
|
// command when the script is being copied to a remote server
|
|
// and called right after it.
|
|
|
|
w.busy = true
|
|
log.Debugf("WRK[%d] Got a task for host %s by worker", w.id, task.Hostname)
|
|
|
|
// does the task have anything to copy?
|
|
if task.RemoteFilename != "" && task.LocalFilename != "" {
|
|
if task.Copy == CTScp {
|
|
result = w.copy(task)
|
|
} else {
|
|
result = w.tarcopy(task)
|
|
}
|
|
log.Debugf("WRK[%d] Copy on %s, status=%d", w.id, task.Hostname, result)
|
|
w.data <- &Message{nil, MTCopyFinished, task.Hostname, result}
|
|
if result != 0 {
|
|
log.Debugf("WRK[%d] Copy on %s, result != 0, catching", w.id, task.Hostname)
|
|
// if copying failed we can't proceed further with the task if there's anything to run
|
|
if task.Cmd != "" {
|
|
log.Debugf("WRK[%d] Copy on %s, result != 0, task.Cmd == \"%s\", sending ExecFinished", w.id, task.Hostname, task.Cmd)
|
|
w.data <- &Message{nil, MTExecFinished, task.Hostname, ErrCopyFailed}
|
|
}
|
|
w.busy = false
|
|
if task.WG != nil {
|
|
task.WG.Done()
|
|
}
|
|
// next task
|
|
continue
|
|
}
|
|
}
|
|
|
|
// does the task have anything to run?
|
|
if task.Cmd != "" {
|
|
log.Debugf("WRK[%d] runcmd(%s) at %s", w.id, task.Cmd, task.Hostname)
|
|
result = w.runcmd(task)
|
|
w.data <- &Message{nil, MTExecFinished, task.Hostname, result}
|
|
}
|
|
|
|
if task.WG != nil {
|
|
task.WG.Done()
|
|
}
|
|
|
|
w.busy = false
|
|
}
|
|
log.Debugf("WRK[%d] Task queue has closed, worker is exiting", w.id)
|
|
}
|
|
|
|
// ForceStop stops the current task execution and returns true
|
|
// if any task were actually executed at the moment of calling ForceStop
|
|
func (w *Worker) ForceStop() bool {
|
|
if w.busy {
|
|
w.stop <- true
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (w *Worker) forceStopped() bool {
|
|
select {
|
|
case <-w.stop:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (w *Worker) log(format string, args ...interface{}) {
|
|
format = fmt.Sprintf("WRK[%d]: %s", w.id, format)
|
|
log.Debugf(format, args...)
|
|
}
|
|
|
|
func (w *Worker) processStderr(rd io.ReadCloser, wr io.WriteCloser, finished *bool, task *Task) {
|
|
var (
|
|
n int
|
|
err error
|
|
buf []byte
|
|
)
|
|
|
|
buf = make([]byte, bufferSize)
|
|
w.log("starting stderr processor for host %s", task.Hostname)
|
|
for {
|
|
n, err = rd.Read(buf)
|
|
if err != nil {
|
|
*finished = true
|
|
break
|
|
}
|
|
|
|
if n > 0 {
|
|
w.data <- &Message{buf[:n], MTDebug, task.Hostname, -1}
|
|
chunks := bytes.SplitAfter(buf[:n], []byte{'\n'})
|
|
for _, chunk := range chunks {
|
|
if currentDebug {
|
|
w.log("STDERR CHUNK IN @ %s: %v %s", task.Hostname, chunk, string(chunk))
|
|
}
|
|
|
|
if exConnectionClosed.Match(chunk) {
|
|
chunk = exConnectionClosed.ReplaceAll(chunk, []byte{})
|
|
w.log("expr connection closed on stderr")
|
|
}
|
|
|
|
if exLostConnection.Match(chunk) {
|
|
chunk = exLostConnection.ReplaceAll(chunk, []byte{})
|
|
w.log("expr lost connection on stderr")
|
|
}
|
|
|
|
if len(chunk) == 0 {
|
|
continue
|
|
}
|
|
|
|
// avoiding passing loop variable further as it's going to change its contents
|
|
data := make([]byte, len(chunk))
|
|
copy(data, chunk)
|
|
if currentDebug {
|
|
w.log("STDERR CHUNK OUT @ %s: %v %s", task.Hostname, data, string(data))
|
|
}
|
|
w.data <- &Message{data, MTData, task.Hostname, -1}
|
|
|
|
}
|
|
}
|
|
}
|
|
w.log("exiting stderr processor for host %s", task.Hostname)
|
|
}
|
|
|
|
func (w *Worker) processStdout(rd io.ReadCloser, wr io.WriteCloser, finished *bool, task *Task) {
|
|
var (
|
|
n int
|
|
msgCount int
|
|
err error
|
|
buf []byte
|
|
password string
|
|
passwordSent bool
|
|
shouldSkipEcho bool
|
|
)
|
|
|
|
w.log("starting stdout processor for host %s", task.Hostname)
|
|
buf = make([]byte, bufferSize)
|
|
msgCount = 0
|
|
|
|
if currentRaise != RTNone {
|
|
passwordSent = false
|
|
if currentUsePasswordManager {
|
|
password = passmgr.GetPass(task.Hostname)
|
|
} else {
|
|
password = currentPassword
|
|
}
|
|
} else {
|
|
passwordSent = true
|
|
}
|
|
|
|
execLoop:
|
|
for {
|
|
n, err = rd.Read(buf)
|
|
if err != nil {
|
|
*finished = true
|
|
break
|
|
}
|
|
|
|
if n > 0 {
|
|
w.data <- &Message{buf[:n], MTDebug, task.Hostname, -1}
|
|
msgCount++
|
|
|
|
chunks := bytes.SplitAfter(buf[:n], []byte{'\n'})
|
|
for _, chunk := range chunks {
|
|
if currentDebug {
|
|
w.log("STDOUT CHUNK IN @ %s: %v %s", task.Hostname, chunk, string(chunk))
|
|
}
|
|
// Trying to find Password prompt in first 5 chunks of data from server
|
|
if msgCount < 10 {
|
|
if !passwordSent && exPasswdPrompt.Match(chunk) {
|
|
w.log("sending password for %s, msgCount=%d", task.Hostname, msgCount)
|
|
_, err := wr.Write([]byte(password + "\n"))
|
|
if err != nil {
|
|
w.log("error sending password: %v", err)
|
|
}
|
|
passwordSent = true
|
|
shouldSkipEcho = true
|
|
continue
|
|
}
|
|
}
|
|
|
|
if shouldSkipEcho && exEcho.Match(chunk) {
|
|
shouldSkipEcho = false
|
|
continue
|
|
}
|
|
if passwordSent && exWrongPassword.Match(chunk) {
|
|
w.data <- &Message{[]byte("sudo: Authentication failure\n"), MTData, task.Hostname, -1}
|
|
*finished = true
|
|
break execLoop
|
|
}
|
|
|
|
if len(chunk) == 0 {
|
|
continue
|
|
}
|
|
|
|
// avoiding passing loop variable further as it's going to change its contents
|
|
data := make([]byte, len(chunk))
|
|
copy(data, chunk)
|
|
if currentDebug {
|
|
w.log("STDOUT CHUNK OUT @ %s: %v %s", task.Hostname, data, string(data))
|
|
}
|
|
w.data <- &Message{data, MTData, task.Hostname, -1}
|
|
}
|
|
}
|
|
}
|
|
w.log("exiting stdout processor for host %s", task.Hostname)
|
|
}
|
|
|
|
func (w *Worker) _run(task *Task, cmd *exec.Cmd) int {
|
|
cmd.Env = append(os.Environ(), environment...)
|
|
|
|
sout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
w.log("error creating stdout pipe: %v", err)
|
|
return ErrTerminalError
|
|
}
|
|
|
|
serr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
w.log("error creating stderr pipe: %v", err)
|
|
w.log("closing stdout pipe, err=%v", sout.Close())
|
|
return ErrTerminalError
|
|
}
|
|
|
|
sin, err := cmd.StdinPipe()
|
|
if err != nil {
|
|
w.log("error creating stdin pipe: %v", err)
|
|
w.log("closing stderr pipe, err=%v", serr.Close())
|
|
w.log("closing stdout pipe, err=%v", sout.Close())
|
|
return ErrTerminalError
|
|
}
|
|
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
w.log("error starting cmd: %v", err)
|
|
w.log("closing stderr pipe, err=%v", serr.Close())
|
|
w.log("closing stdout pipe, err=%v", sout.Close())
|
|
w.log("closing stdin pipe, err=%v", sin.Close())
|
|
return ErrCommandStartFailed
|
|
}
|
|
|
|
stdoutFinished := false
|
|
stderrFinished := false
|
|
taskForceStopped := false
|
|
go w.processStdout(sout, sin, &stdoutFinished, task)
|
|
go w.processStderr(serr, sin, &stderrFinished, task)
|
|
|
|
for !(stdoutFinished && stderrFinished) {
|
|
if w.forceStopped() {
|
|
taskForceStopped = true
|
|
err = cmd.Process.Kill()
|
|
if err != nil {
|
|
w.log("error killing process: %v", err)
|
|
}
|
|
break
|
|
}
|
|
time.Sleep(pollDeadline)
|
|
}
|
|
|
|
exitCode := 0
|
|
w.log("out of waitloop running cmd.Wait to cleanup")
|
|
err = cmd.Wait()
|
|
|
|
if taskForceStopped {
|
|
return ErrForceStop
|
|
}
|
|
|
|
if err != nil {
|
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
ws := exitErr.Sys().(syscall.WaitStatus)
|
|
exitCode = ws.ExitStatus()
|
|
} else {
|
|
// MacOS hack
|
|
exitCode = ErrMacOsExit
|
|
}
|
|
}
|
|
w.log("Task on %s exit coded is %d", task.Hostname, exitCode)
|
|
return exitCode
|
|
}
|