From 9f5347e616ab6afc50b8be5ea988bb85bda1d3dd Mon Sep 17 00:00:00 2001 From: Pavel Vorobyov Date: Tue, 24 Sep 2019 12:34:54 +0300 Subject: [PATCH] Tardistribute (#2) * tar started * tarcopy for distribute --- .gitignore | 5 ++++ remote/copy.go | 66 ++++++++++++++++++++++++++++++++++++++++++++ remote/distribute.go | 13 +++++++++ remote/ssh.go | 12 ++++++++ remote/worker.go | 16 ++++++++++- 5 files changed, 111 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f1c181e..db7b712 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,8 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out + +# sandbox sources +/cmd/sandbox +# logfile +/xc.log diff --git a/remote/copy.go b/remote/copy.go index 002f376..06e712f 100644 --- a/remote/copy.go +++ b/remote/copy.go @@ -11,6 +11,72 @@ import ( "github.com/viert/xc/log" ) +func (w *Worker) tarcopy(task *Task) int { + var err error + var n int + + cmd := createTarCopyCmd(task.Hostname, task.LocalFilename, task.RemoteFilename) + cmd.Env = append(os.Environ(), environment...) + ptmx, err := pty.Start(cmd) + if err != nil { + return ErrTerminalError + } + defer ptmx.Close() + + fd, err := poller.NewFD(int(ptmx.Fd())) + if err != nil { + return ErrTerminalError + } + defer fd.Close() + + buf := make([]byte, bufferSize) + taskForceStopped := false + for { + if w.forceStopped() { + taskForceStopped = true + break + } + fd.SetReadDeadline(time.Now().Add(pollDeadline)) + n, err = fd.Read(buf) + if err != nil { + if err != poller.ErrTimeout { + // EOF, done + break + } else { + continue + } + } + if n == 0 { + continue + } + w.data <- &Message{buf[:n], MTData, task.Hostname, 0} + buf = make([]byte, bufferSize) + } + + exitCode := 0 + if taskForceStopped { + cmd.Process.Kill() + exitCode = ErrForceStop + log.Debugf("WRK[%d]: Task on %s was force stopped", w.id, task.Hostname) + } + + err = cmd.Wait() + + if !taskForceStopped { + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + ws := exitErr.Sys().(syscall.WaitStatus) + exitCode = ws.ExitStatus() + } else { + // MacOS hack + exitCode = ErrMacOsExit + } + } + log.Debugf("WRK[%d]: Task on %s exit code is %d", w.id, task.Hostname, exitCode) + } + return exitCode +} + func (w *Worker) copy(task *Task) int { var err error var n int diff --git a/remote/distribute.go b/remote/distribute.go index e341cf9..6690c53 100644 --- a/remote/distribute.go +++ b/remote/distribute.go @@ -1,12 +1,15 @@ package remote import ( + "bytes" + "fmt" "os" "os/signal" "sync" "syscall" "github.com/viert/xc/log" + "github.com/viert/xc/term" pb "gopkg.in/cheggaaa/pb.v1" ) @@ -38,6 +41,7 @@ func Distribute(hosts []string, localFilename string, remoteFilename string, rec LocalFilename: localFilename, RemoteFilename: remoteFilename, RecursiveCopy: recursive, + Copy: CTTar, Cmd: "", WG: &wg, } @@ -50,6 +54,15 @@ func Distribute(hosts []string, localFilename string, remoteFilename string, rec select { case d := <-pool.Data: switch d.Type { + case MTData: + if !bytes.HasSuffix(d.Data, []byte{'\n'}) { + d.Data = append(d.Data, '\n') + } + if currentPrependHostnames { + fmt.Printf("%s: ", term.Blue(d.Hostname)) + } + fmt.Print(string(d.Data)) + writeHostOutput(d.Hostname, d.Data) case MTDebug: if currentDebug { log.Debugf("DATASTREAM @ %s\n%v\n[%v]", d.Hostname, d.Data, string(d.Data)) diff --git a/remote/ssh.go b/remote/ssh.go index 27e0e6c..306f764 100644 --- a/remote/ssh.go +++ b/remote/ssh.go @@ -28,6 +28,18 @@ func sshOpts() (params []string) { return } +func createTarCopyCmd(host string, local string, remote string) *exec.Cmd { + if remote == "" || remote == local { + remote = "." + } + options := strings.Join(sshOpts(), " ") + sshCmd := fmt.Sprintf("ssh -l %s %s %s", currentUser, options, host) + tarCmd := fmt.Sprintf("tar c %s | %s tar x -C %s", local, sshCmd, remote) + params := []string{"-c", tarCmd} + log.Debugf("Created command bash %v", params) + return exec.Command("bash", params...) +} + func createSCPCmd(host string, local string, remote string, recursive bool) *exec.Cmd { params := []string{} if recursive { diff --git a/remote/worker.go b/remote/worker.go index 14622a0..1a2b84a 100644 --- a/remote/worker.go +++ b/remote/worker.go @@ -18,6 +18,15 @@ const ( RTSudo ) +// CopyType enum +type CopyType int + +// Copy types +const ( + CTScp CopyType = iota + CTTar +) + // Task type represents a worker task descriptor type Task struct { Hostname string @@ -25,6 +34,7 @@ type Task struct { RemoteFilename string RecursiveCopy bool Cmd string + Copy CopyType WG *sync.WaitGroup } @@ -119,7 +129,11 @@ func (w *Worker) run() { // does the task have anything to copy? if task.RemoteFilename != "" && task.LocalFilename != "" { - result = w.copy(task) + if task.Copy == CTScp { + result = w.copy(task) + } else { + result = w.tarcopy(task) + } log.Debugf("WRK[%d] Copy on %s, status=%d", w.id, task.Hostname, result) w.data <- &Message{nil, MTCopyFinished, task.Hostname, result} if result != 0 {