Tardistribute (#2)

* tar started

* tarcopy for distribute
This commit is contained in:
Pavel Vorobyov
2019-09-24 12:34:54 +03:00
committed by GitHub
parent 3ff1f17faf
commit 9f5347e616
5 changed files with 111 additions and 1 deletions

5
.gitignore vendored
View File

@@ -10,3 +10,8 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# sandbox sources
/cmd/sandbox
# logfile
/xc.log

View File

@@ -11,6 +11,72 @@ import (
"github.com/viert/xc/log"
)
func (w *Worker) tarcopy(task *Task) int {
var err error
var n int
cmd := createTarCopyCmd(task.Hostname, task.LocalFilename, task.RemoteFilename)
cmd.Env = append(os.Environ(), environment...)
ptmx, err := pty.Start(cmd)
if err != nil {
return ErrTerminalError
}
defer ptmx.Close()
fd, err := poller.NewFD(int(ptmx.Fd()))
if err != nil {
return ErrTerminalError
}
defer fd.Close()
buf := make([]byte, bufferSize)
taskForceStopped := false
for {
if w.forceStopped() {
taskForceStopped = true
break
}
fd.SetReadDeadline(time.Now().Add(pollDeadline))
n, err = fd.Read(buf)
if err != nil {
if err != poller.ErrTimeout {
// EOF, done
break
} else {
continue
}
}
if n == 0 {
continue
}
w.data <- &Message{buf[:n], MTData, task.Hostname, 0}
buf = make([]byte, bufferSize)
}
exitCode := 0
if taskForceStopped {
cmd.Process.Kill()
exitCode = ErrForceStop
log.Debugf("WRK[%d]: Task on %s was force stopped", w.id, task.Hostname)
}
err = cmd.Wait()
if !taskForceStopped {
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
ws := exitErr.Sys().(syscall.WaitStatus)
exitCode = ws.ExitStatus()
} else {
// MacOS hack
exitCode = ErrMacOsExit
}
}
log.Debugf("WRK[%d]: Task on %s exit code is %d", w.id, task.Hostname, exitCode)
}
return exitCode
}
func (w *Worker) copy(task *Task) int {
var err error
var n int

View File

@@ -1,12 +1,15 @@
package remote
import (
"bytes"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"github.com/viert/xc/log"
"github.com/viert/xc/term"
pb "gopkg.in/cheggaaa/pb.v1"
)
@@ -38,6 +41,7 @@ func Distribute(hosts []string, localFilename string, remoteFilename string, rec
LocalFilename: localFilename,
RemoteFilename: remoteFilename,
RecursiveCopy: recursive,
Copy: CTTar,
Cmd: "",
WG: &wg,
}
@@ -50,6 +54,15 @@ func Distribute(hosts []string, localFilename string, remoteFilename string, rec
select {
case d := <-pool.Data:
switch d.Type {
case MTData:
if !bytes.HasSuffix(d.Data, []byte{'\n'}) {
d.Data = append(d.Data, '\n')
}
if currentPrependHostnames {
fmt.Printf("%s: ", term.Blue(d.Hostname))
}
fmt.Print(string(d.Data))
writeHostOutput(d.Hostname, d.Data)
case MTDebug:
if currentDebug {
log.Debugf("DATASTREAM @ %s\n%v\n[%v]", d.Hostname, d.Data, string(d.Data))

View File

@@ -28,6 +28,18 @@ func sshOpts() (params []string) {
return
}
func createTarCopyCmd(host string, local string, remote string) *exec.Cmd {
if remote == "" || remote == local {
remote = "."
}
options := strings.Join(sshOpts(), " ")
sshCmd := fmt.Sprintf("ssh -l %s %s %s", currentUser, options, host)
tarCmd := fmt.Sprintf("tar c %s | %s tar x -C %s", local, sshCmd, remote)
params := []string{"-c", tarCmd}
log.Debugf("Created command bash %v", params)
return exec.Command("bash", params...)
}
func createSCPCmd(host string, local string, remote string, recursive bool) *exec.Cmd {
params := []string{}
if recursive {

View File

@@ -18,6 +18,15 @@ const (
RTSudo
)
// CopyType enum
type CopyType int
// Copy types
const (
CTScp CopyType = iota
CTTar
)
// Task type represents a worker task descriptor
type Task struct {
Hostname string
@@ -25,6 +34,7 @@ type Task struct {
RemoteFilename string
RecursiveCopy bool
Cmd string
Copy CopyType
WG *sync.WaitGroup
}
@@ -119,7 +129,11 @@ func (w *Worker) run() {
// does the task have anything to copy?
if task.RemoteFilename != "" && task.LocalFilename != "" {
result = w.copy(task)
if task.Copy == CTScp {
result = w.copy(task)
} else {
result = w.tarcopy(task)
}
log.Debugf("WRK[%d] Copy on %s, status=%d", w.id, task.Hostname, result)
w.data <- &Message{nil, MTCopyFinished, task.Hostname, result}
if result != 0 {