From 7ea2ec53302cb2e3d3c9024ba9951f490e5c3e61 Mon Sep 17 00:00:00 2001 From: Pavel Vorobyov Date: Tue, 1 Oct 2019 10:42:10 +0300 Subject: [PATCH] No parallel pty (#5) * poller and pty removed for parallel execution --- go.mod | 2 +- go.sum | 17 +--- remote/commands.go | 75 +++++++++++++++ remote/copy.go | 145 ----------------------------- remote/remote.go | 4 +- remote/runcmd.go | 160 -------------------------------- remote/serial.go | 35 ++++--- remote/ssh.go | 55 ----------- remote/worker.go | 221 +++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 320 insertions(+), 394 deletions(-) create mode 100644 remote/commands.go delete mode 100644 remote/copy.go delete mode 100644 remote/runcmd.go diff --git a/go.mod b/go.mod index 4586340..c95e485 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( golang.org/x/crypto v0.0.0-20190926180335-cea2066c6411 golang.org/x/net v0.0.0-20190926025831-c00fd9afed17 // indirect golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect - golang.org/x/sys v0.0.0-20190927073244-c990c680b611 // indirect + golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a golang.org/x/text v0.3.2 // indirect golang.org/x/tools v0.0.0-20190927052746-69890759d905 // indirect gopkg.in/cheggaaa/pb.v1 v1.0.28 diff --git a/go.sum b/go.sum index de605d6..dac5c09 100644 --- a/go.sum +++ b/go.sum @@ -1,24 +1,17 @@ -github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= -github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9 h1:uDmaGzcdjhF4i/plgjmEsriH11Y0o7RKapEf/LDaM3w= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4= github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU= github.com/kr/pty v1.1.8 h1:AkaSdXYQOWeaO3neb8EM634ahkXXe3jYbVh/F9lq+GI= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= -github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= -github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= @@ -31,8 +24,6 @@ github.com/viert/properties v0.0.0-20190120163359-e72631698e82/go.mod h1:f8oD3Ns github.com/viert/sekwence v0.0.0-20190110111110-24bab1ce82a0 h1:jH3SJmLOQEMH8EIjbl2uV7rn1mxWlW/RRih09/+fdUU= github.com/viert/sekwence v0.0.0-20190110111110-24bab1ce82a0/go.mod h1:zPZmp3wodzVxymq5GjioSuamAO/vVE2zh2iTsjTA3Z0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M= -golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/crypto v0.0.0-20190926180335-cea2066c6411 h1:kuW9k4QvBJpRjC3rxEytsfIYPs8oGY3Jw7iR36h0FIY= golang.org/x/crypto v0.0.0-20190926180335-cea2066c6411/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -43,14 +34,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190924062700-2aa67d56cdd7 h1:9Vs0Vm0p/0tnWLBWn79aav6fpcxKjBZbd21Lhxzit4k= -golang.org/x/sys v0.0.0-20190924062700-2aa67d56cdd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M= -golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190927073244-c990c680b611 h1:q9u40nxWT5zRClI/uU9dHCiYGottAg6Nzz4YUQyHxdA= -golang.org/x/sys v0.0.0-20190927073244-c990c680b611/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/remote/commands.go b/remote/commands.go new file mode 100644 index 0000000..df4cf24 --- /dev/null +++ b/remote/commands.go @@ -0,0 +1,75 @@ +package remote + +import ( + "fmt" + "os/exec" + "strings" + + "github.com/viert/xc/log" +) + +func (w *Worker) copy(task *Task) int { + cmd := createSCPCmd(task.Hostname, task.LocalFilename, task.RemoteFilename, task.RecursiveCopy) + return w._run(task, cmd) +} + +func (w *Worker) runcmd(task *Task) int { + cmd := createSSHCmd(task.Hostname, task.Cmd) + return w._run(task, cmd) +} + +func (w *Worker) tarcopy(task *Task) int { + cmd := createTarCopyCmd(task.Hostname, task.LocalFilename, task.RemoteFilename) + return w._run(task, cmd) +} + +func createTarCopyCmd(host string, local string, remote string) *exec.Cmd { + if remote == "" || remote == local { + remote = "." + } + options := strings.Join(sshOpts(), " ") + sshCmd := fmt.Sprintf("ssh -l %s %s %s", currentUser, options, host) + tarCmd := fmt.Sprintf("tar c %s | %s tar x -C %s", local, sshCmd, remote) + params := []string{"-c", tarCmd} + log.Debugf("Created command bash %v", params) + return exec.Command("bash", params...) +} + +func createSCPCmd(host string, local string, remote string, recursive bool) *exec.Cmd { + params := []string{} + if recursive { + params = []string{"-r"} + } + params = append(params, sshOpts()...) + remoteExpr := fmt.Sprintf("%s@%s:%s", currentUser, host, remote) + params = append(params, local, remoteExpr) + log.Debugf("Created command scp %v", params) + return exec.Command("scp", params...) +} + +func createSSHCmd(host string, argv string) *exec.Cmd { + params := []string{ + "-tt", + "-l", + currentUser, + } + params = append(params, sshOpts()...) + params = append(params, host) + params = append(params, getInterpreter()...) + if argv != "" { + params = append(params, "-c", argv) + } + log.Debugf("Created command ssh %v", params) + return exec.Command("ssh", params...) +} + +func getInterpreter() []string { + switch currentRaise { + case RTSudo: + return strings.Split(sudoInterpreter, " ") + case RTSu: + return strings.Split(suInterpreter, " ") + default: + return strings.Split(noneInterpreter, " ") + } +} diff --git a/remote/copy.go b/remote/copy.go deleted file mode 100644 index 06e712f..0000000 --- a/remote/copy.go +++ /dev/null @@ -1,145 +0,0 @@ -package remote - -import ( - "os" - "os/exec" - "syscall" - "time" - - "github.com/kr/pty" - "github.com/npat-efault/poller" - "github.com/viert/xc/log" -) - -func (w *Worker) tarcopy(task *Task) int { - var err error - var n int - - cmd := createTarCopyCmd(task.Hostname, task.LocalFilename, task.RemoteFilename) - cmd.Env = append(os.Environ(), environment...) - ptmx, err := pty.Start(cmd) - if err != nil { - return ErrTerminalError - } - defer ptmx.Close() - - fd, err := poller.NewFD(int(ptmx.Fd())) - if err != nil { - return ErrTerminalError - } - defer fd.Close() - - buf := make([]byte, bufferSize) - taskForceStopped := false - for { - if w.forceStopped() { - taskForceStopped = true - break - } - fd.SetReadDeadline(time.Now().Add(pollDeadline)) - n, err = fd.Read(buf) - if err != nil { - if err != poller.ErrTimeout { - // EOF, done - break - } else { - continue - } - } - if n == 0 { - continue - } - w.data <- &Message{buf[:n], MTData, task.Hostname, 0} - buf = make([]byte, bufferSize) - } - - exitCode := 0 - if taskForceStopped { - cmd.Process.Kill() - exitCode = ErrForceStop - log.Debugf("WRK[%d]: Task on %s was force stopped", w.id, task.Hostname) - } - - err = cmd.Wait() - - if !taskForceStopped { - if err != nil { - if exitErr, ok := err.(*exec.ExitError); ok { - ws := exitErr.Sys().(syscall.WaitStatus) - exitCode = ws.ExitStatus() - } else { - // MacOS hack - exitCode = ErrMacOsExit - } - } - log.Debugf("WRK[%d]: Task on %s exit code is %d", w.id, task.Hostname, exitCode) - } - return exitCode -} - -func (w *Worker) copy(task *Task) int { - var err error - var n int - - cmd := createSCPCmd(task.Hostname, task.LocalFilename, task.RemoteFilename, task.RecursiveCopy) - cmd.Env = append(os.Environ(), environment...) - - ptmx, err := pty.Start(cmd) - if err != nil { - return ErrTerminalError - } - defer ptmx.Close() - - fd, err := poller.NewFD(int(ptmx.Fd())) - if err != nil { - return ErrTerminalError - } - defer fd.Close() - - buf := make([]byte, bufferSize) - taskForceStopped := false - for { - if w.forceStopped() { - taskForceStopped = true - break - } - fd.SetReadDeadline(time.Now().Add(pollDeadline)) - n, err = fd.Read(buf) - if err != nil { - if err != poller.ErrTimeout { - // EOF, done - break - } else { - continue - } - } - if n == 0 { - continue - } - w.data <- &Message{buf[:n], MTDebug, task.Hostname, 0} - buf = make([]byte, bufferSize) - } - - exitCode := 0 - if taskForceStopped { - cmd.Process.Kill() - exitCode = ErrForceStop - log.Debugf("WRK[%d]: Task on %s was force stopped", w.id, task.Hostname) - } - - err = cmd.Wait() - - if !taskForceStopped { - if err != nil { - if exitErr, ok := err.(*exec.ExitError); ok { - ws := exitErr.Sys().(syscall.WaitStatus) - exitCode = ws.ExitStatus() - } else { - // MacOS hack - exitCode = ErrMacOsExit - } - } - log.Debugf("WRK[%d]: Task on %s exit code is %d", w.id, task.Hostname, exitCode) - } - return exitCode -} diff --git a/remote/remote.go b/remote/remote.go index 588e6d4..9cb9b78 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -22,7 +22,7 @@ var ( currentRemoteTmpdir string currentDebug bool outputFile *os.File - ptyLock *sync.Mutex + poolLock *sync.Mutex noneInterpreter string suInterpreter string @@ -32,7 +32,7 @@ var ( // Initialize initializes new execution pool func Initialize(numThreads int, username string) { pool = NewPool(numThreads) - ptyLock = new(sync.Mutex) + poolLock = new(sync.Mutex) SetUser(username) SetPassword("") SetRaise(RTNone) diff --git a/remote/runcmd.go b/remote/runcmd.go deleted file mode 100644 index 67ea525..0000000 --- a/remote/runcmd.go +++ /dev/null @@ -1,160 +0,0 @@ -package remote - -import ( - "bytes" - "os" - "os/exec" - "syscall" - "time" - - "github.com/kr/pty" - "github.com/npat-efault/poller" - "github.com/viert/xc/log" - "github.com/viert/xc/passmgr" -) - -func (w *Worker) runcmd(task *Task) int { - var ( - err error - n int - password string - passwordSent bool - ptmx *os.File - fd *poller.FD - ) - - cmd := createSSHCmd(task.Hostname, task.Cmd) - cmd.Env = append(os.Environ(), environment...) - - // threadsafe acquiring necessary file descriptors - ptyLock.Lock() - ptmx, err = pty.Start(cmd) - if err != nil { - log.Debugf("WRK[%d]: Error creating ptmx: %v", w.id, err) - ptyLock.Unlock() - return ErrTerminalError - } - defer ptmx.Close() - - fd, err = poller.NewFD(int(ptmx.Fd())) - if err != nil { - log.Debugf("WRK[%d]: Error creating poller FD: %v", w.id, err) - ptyLock.Unlock() - return ErrTerminalError - } - defer fd.Close() - ptyLock.Unlock() - // threadsafe acquiring necessary file descriptors ends - - buf := make([]byte, bufferSize) - taskForceStopped := false - shouldSkipEcho := false - msgCount := 0 - - if currentRaise != RTNone { - passwordSent = false - if currentUsePasswordManager { - password = passmgr.GetPass(task.Hostname) - } else { - password = currentPassword - } - } else { - passwordSent = true - } - -execLoop: - for { - if w.forceStopped() { - taskForceStopped = true - break - } - - fd.SetReadDeadline(time.Now().Add(pollDeadline)) - n, err = fd.Read(buf) - if err != nil { - if err != poller.ErrTimeout { - // EOF, done - log.Debugf("WRK[%d]: error reading process output: %v", w.id, err) - break - } else { - continue - } - } - - if n == 0 { - continue - } - - w.data <- &Message{buf[:n], MTDebug, task.Hostname, -1} - msgCount++ - - chunks := bytes.SplitAfter(buf[:n], []byte{'\n'}) - for _, chunk := range chunks { - // Trying to find Password prompt in first 5 chunks of data from server - if msgCount < 5 { - if !passwordSent && exPasswdPrompt.Match(chunk) { - _, err := ptmx.Write([]byte(password + "\n")) - if err != nil { - log.Debugf("WRK[%d]: Error sending password: %v", w.id, err) - } - passwordSent = true - shouldSkipEcho = true - continue - } - if shouldSkipEcho && exEcho.Match(chunk) { - shouldSkipEcho = false - continue - } - if passwordSent && exWrongPassword.Match(chunk) { - w.data <- &Message{[]byte("sudo: Authentication failure\n"), MTData, task.Hostname, -1} - taskForceStopped = true - break execLoop - } - - } - - if len(chunk) == 0 { - continue - } - - if exConnectionClosed.Match(chunk) { - continue - } - - if exLostConnection.Match(chunk) { - continue - } - - // avoiding passing loop variable further as it's going to change its contents - data := make([]byte, len(chunk)) - copy(data, chunk) - w.data <- &Message{data, MTData, task.Hostname, -1} - } - } - - exitCode := 0 - if taskForceStopped { - err = cmd.Process.Kill() - if err != nil { - log.Debugf("WRK[%d]: Error killing the process: %v", w.id, err) - } - exitCode = ErrForceStop - log.Debugf("WRK[%d]: Task on %s was force stopped", w.id, task.Hostname) - } - - err = cmd.Wait() - - if !taskForceStopped { - if err != nil { - if exitErr, ok := err.(*exec.ExitError); ok { - ws := exitErr.Sys().(syscall.WaitStatus) - exitCode = ws.ExitStatus() - } else { - // MacOS hack - exitCode = ErrMacOsExit - } - } - log.Debugf("WRK[%d]: Task on %s exit code is %d", w.id, task.Hostname, exitCode) - } - return exitCode -} diff --git a/remote/serial.go b/remote/serial.go index 0d4ced6..b761d9d 100644 --- a/remote/serial.go +++ b/remote/serial.go @@ -17,11 +17,6 @@ import ( "golang.org/x/crypto/ssh/terminal" ) -var ( - passwordSent = false - shouldSkipEcho = false -) - func forwardUserInput(in *poller.FD, out *os.File, stopped *bool) { inBuf := make([]byte, bufferSize) // processing stdin @@ -45,35 +40,42 @@ func forwardUserInput(in *poller.FD, out *os.File, stopped *bool) { } } -func interceptProcessOutput(in []byte, ptmx *os.File, password string) (out []byte, err error) { +func interceptProcessOutput(in []byte, ptmx *os.File, password string, passwordSent *bool, shouldSkipEcho *bool) (out []byte, err error) { out = []byte{} err = nil + if currentDebug { + log.Debugf("DATASTREAM: %s", string(in)) + } + if exConnectionClosed.Match(in) { + out = exConnectionClosed.ReplaceAll(in, []byte{}) log.Debug("Connection closed message catched") return } if exLostConnection.Match(in) { + out = exLostConnection.ReplaceAll(in, []byte{}) log.Debug("Lost connection message catched") return } - if !passwordSent && exPasswdPrompt.Match(in) { + if !*passwordSent && exPasswdPrompt.Match(in) { ptmx.Write([]byte(password + "\n")) - passwordSent = true - shouldSkipEcho = true + *passwordSent = true + *shouldSkipEcho = true log.Debug("Password sent") return } - if shouldSkipEcho && exEcho.Match(in) { + if *shouldSkipEcho && exEcho.Match(in) { log.Debug("Echo skipped") - shouldSkipEcho = false + *shouldSkipEcho = false + out = exEcho.ReplaceAll(in, []byte{}) return } - if passwordSent && exWrongPassword.Match(in) { + if *passwordSent && exWrongPassword.Match(in) { log.Debug("Authentication error while raising privileges") err = fmt.Errorf("auth_error") return @@ -91,7 +93,9 @@ func runAtHost(host string, cmd *exec.Cmd, r *ExecResult) { err error password string - stopped = false + passwordSent = false + shouldSkipEcho = false + stopped = false ) password = currentPassword @@ -138,6 +142,7 @@ func runAtHost(host string, cmd *exec.Cmd, r *ExecResult) { r.Codes[host] = ErrTerminalError return } + defer func() { log.Debug("Setting stdin back to blocking mode") si.Close() @@ -156,7 +161,7 @@ func runAtHost(host string, cmd *exec.Cmd, r *ExecResult) { n, err := ptmx.Read(buf) if n > 0 { // TODO random stuff with intercepting and omitting data - data, err := interceptProcessOutput(buf[:n], ptmx, password) + data, err := interceptProcessOutput(buf[:n], ptmx, password, &passwordSent, &shouldSkipEcho) if err != nil { // auth error, can't proceed raise := "su" @@ -188,6 +193,7 @@ func runAtHost(host string, cmd *exec.Cmd, r *ExecResult) { } if err != nil && err != poller.ErrTimeout { + log.Debugf("pty read error: %v", err) stopped = true break } @@ -231,7 +237,6 @@ execLoop: if argv != "" { remoteCmd = fmt.Sprintf("%s.%s.sh", remotePrefix, host) cmd = createSCPCmd(host, local, remoteCmd, false) - log.Debugf("Created SCP command: %v", cmd) signal.Notify(sigs, syscall.SIGINT) err = cmd.Run() signal.Reset() diff --git a/remote/ssh.go b/remote/ssh.go index 306f764..c985cdf 100644 --- a/remote/ssh.go +++ b/remote/ssh.go @@ -2,10 +2,6 @@ package remote import ( "fmt" - "os/exec" - "strings" - - "github.com/viert/xc/log" ) var ( @@ -27,54 +23,3 @@ func sshOpts() (params []string) { } return } - -func createTarCopyCmd(host string, local string, remote string) *exec.Cmd { - if remote == "" || remote == local { - remote = "." - } - options := strings.Join(sshOpts(), " ") - sshCmd := fmt.Sprintf("ssh -l %s %s %s", currentUser, options, host) - tarCmd := fmt.Sprintf("tar c %s | %s tar x -C %s", local, sshCmd, remote) - params := []string{"-c", tarCmd} - log.Debugf("Created command bash %v", params) - return exec.Command("bash", params...) -} - -func createSCPCmd(host string, local string, remote string, recursive bool) *exec.Cmd { - params := []string{} - if recursive { - params = []string{"-r"} - } - params = append(params, sshOpts()...) - remoteExpr := fmt.Sprintf("%s@%s:%s", currentUser, host, remote) - params = append(params, local, remoteExpr) - log.Debugf("Created command scp %v", params) - return exec.Command("scp", params...) -} - -func createSSHCmd(host string, argv string) *exec.Cmd { - params := []string{ - "-tt", - "-l", - currentUser, - } - params = append(params, sshOpts()...) - params = append(params, host) - params = append(params, getInterpreter()...) - if argv != "" { - params = append(params, "-c", argv) - } - log.Debugf("Created command ssh %v", params) - return exec.Command("ssh", params...) -} - -func getInterpreter() []string { - switch currentRaise { - case RTSudo: - return strings.Split(sudoInterpreter, " ") - case RTSu: - return strings.Split(suInterpreter, " ") - default: - return strings.Split(noneInterpreter, " ") - } -} diff --git a/remote/worker.go b/remote/worker.go index 58b9b28..0c5d00a 100644 --- a/remote/worker.go +++ b/remote/worker.go @@ -1,11 +1,18 @@ package remote import ( + "bytes" + "fmt" + "io" + "os" + "os/exec" "regexp" "sync" + "syscall" "time" "github.com/viert/xc/log" + "github.com/viert/xc/passmgr" ) // RaiseType enum @@ -64,6 +71,7 @@ const ( ErrCopyFailed ErrTerminalError ErrAuthenticationError + ErrCommandStartFailed ) const ( @@ -186,3 +194,216 @@ func (w *Worker) forceStopped() bool { return false } } + +func (w *Worker) log(format string, args ...interface{}) { + format = fmt.Sprintf("WRK[%d]: %s", w.id, format) + log.Debugf(format, args...) +} + +func (w *Worker) processStderr(rd io.ReadCloser, wr io.WriteCloser, finished *bool, task *Task) { + var ( + n int + err error + buf []byte + ) + + buf = make([]byte, bufferSize) + w.log("starting stderr processor for host %s", task.Hostname) + for { + n, err = rd.Read(buf) + if err != nil { + *finished = true + break + } + + if n > 0 { + w.data <- &Message{buf[:n], MTDebug, task.Hostname, -1} + chunks := bytes.SplitAfter(buf[:n], []byte{'\n'}) + for _, chunk := range chunks { + if currentDebug { + w.log("STDERR CHUNK IN @ %s: %v %s", task.Hostname, chunk, string(chunk)) + } + + if exConnectionClosed.Match(chunk) { + chunk = exConnectionClosed.ReplaceAll(chunk, []byte{}) + w.log("expr connection closed on stderr") + } + + if exLostConnection.Match(chunk) { + chunk = exLostConnection.ReplaceAll(chunk, []byte{}) + w.log("expr lost connection on stderr") + } + + if len(chunk) == 0 { + continue + } + + // avoiding passing loop variable further as it's going to change its contents + data := make([]byte, len(chunk)) + copy(data, chunk) + if currentDebug { + w.log("STDERR CHUNK OUT @ %s: %v %s", task.Hostname, data, string(data)) + } + w.data <- &Message{data, MTData, task.Hostname, -1} + + } + } + } + w.log("exiting stderr processor for host %s", task.Hostname) +} + +func (w *Worker) processStdout(rd io.ReadCloser, wr io.WriteCloser, finished *bool, task *Task) { + var ( + n int + msgCount int + err error + buf []byte + password string + passwordSent bool + shouldSkipEcho bool + ) + + w.log("starting stdout processor for host %s", task.Hostname) + buf = make([]byte, bufferSize) + msgCount = 0 + + if currentRaise != RTNone { + passwordSent = false + if currentUsePasswordManager { + password = passmgr.GetPass(task.Hostname) + } else { + password = currentPassword + } + } else { + passwordSent = true + } + +execLoop: + for { + n, err = rd.Read(buf) + if err != nil { + *finished = true + break + } + + if n > 0 { + w.data <- &Message{buf[:n], MTDebug, task.Hostname, -1} + msgCount++ + + chunks := bytes.SplitAfter(buf[:n], []byte{'\n'}) + for _, chunk := range chunks { + if currentDebug { + w.log("STDOUT CHUNK IN @ %s: %v %s", task.Hostname, chunk, string(chunk)) + } + // Trying to find Password prompt in first 5 chunks of data from server + if msgCount < 10 { + if !passwordSent && exPasswdPrompt.Match(chunk) { + w.log("sending password for %s, msgCount=%d", task.Hostname, msgCount) + _, err := wr.Write([]byte(password + "\n")) + if err != nil { + w.log("error sending password: %v", err) + } + passwordSent = true + shouldSkipEcho = true + continue + } + } + + if shouldSkipEcho && exEcho.Match(chunk) { + shouldSkipEcho = false + continue + } + if passwordSent && exWrongPassword.Match(chunk) { + w.data <- &Message{[]byte("sudo: Authentication failure\n"), MTData, task.Hostname, -1} + *finished = true + break execLoop + } + + if len(chunk) == 0 { + continue + } + + // avoiding passing loop variable further as it's going to change its contents + data := make([]byte, len(chunk)) + copy(data, chunk) + if currentDebug { + w.log("STDOUT CHUNK OUT @ %s: %v %s", task.Hostname, data, string(data)) + } + w.data <- &Message{data, MTData, task.Hostname, -1} + } + } + } + w.log("exiting stdout processor for host %s", task.Hostname) +} + +func (w *Worker) _run(task *Task, cmd *exec.Cmd) int { + cmd.Env = append(os.Environ(), environment...) + + sout, err := cmd.StdoutPipe() + if err != nil { + w.log("error creating stdout pipe: %v", err) + return ErrTerminalError + } + + serr, err := cmd.StderrPipe() + if err != nil { + w.log("error creating stderr pipe: %v", err) + w.log("closing stdout pipe, err=%v", sout.Close()) + return ErrTerminalError + } + + sin, err := cmd.StdinPipe() + if err != nil { + w.log("error creating stdin pipe: %v", err) + w.log("closing stderr pipe, err=%v", serr.Close()) + w.log("closing stdout pipe, err=%v", sout.Close()) + return ErrTerminalError + } + + err = cmd.Start() + if err != nil { + w.log("error starting cmd: %v", err) + w.log("closing stderr pipe, err=%v", serr.Close()) + w.log("closing stdout pipe, err=%v", sout.Close()) + w.log("closing stdin pipe, err=%v", sin.Close()) + return ErrCommandStartFailed + } + + stdoutFinished := false + stderrFinished := false + taskForceStopped := false + go w.processStdout(sout, sin, &stdoutFinished, task) + go w.processStderr(serr, sin, &stderrFinished, task) + + for !(stdoutFinished && stderrFinished) { + if w.forceStopped() { + taskForceStopped = true + err = cmd.Process.Kill() + if err != nil { + w.log("error killing process: %v", err) + } + break + } + time.Sleep(pollDeadline) + } + + exitCode := 0 + w.log("out of waitloop running cmd.Wait to cleanup") + err = cmd.Wait() + + if taskForceStopped { + return ErrForceStop + } + + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + ws := exitErr.Sys().(syscall.WaitStatus) + exitCode = ws.ExitStatus() + } else { + // MacOS hack + exitCode = ErrMacOsExit + } + } + w.log("Task on %s exit coded is %d", task.Hostname, exitCode) + return exitCode +}