From 976bfbc41a0d6fd38f6f80e9fbb02164c1382048 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 28 Apr 2019 17:30:10 -0400 Subject: [PATCH] executors: implement streaming exec Implements streamign exec handling in both executors (i.e. universal and libcontainer). For creation of TTY, some incidental complexity leaked in. The universal executor uses github.com/kr/pty for creation of TTYs. On the other hand, libcontainer expects a console socket and for libcontainer to create the underlying console object on process start. The caller can then use `libcontainer.utils.RecvFd()` to get tty master end. I chose github.com/kr/pty for managing TTYs here. I tried `github.com/containerd/console` package (which is already imported), but the package did not work as expected on macOS. --- drivers/shared/executor/exec_utils.go | 287 ++++++++++++++++++++++ drivers/shared/executor/executor.go | 48 ++++ drivers/shared/executor/executor_linux.go | 49 ++++ drivers/shared/executor/pty_unix.go | 43 ++++ drivers/shared/executor/pty_windows.go | 23 ++ 5 files changed, 450 insertions(+) create mode 100644 drivers/shared/executor/exec_utils.go create mode 100644 drivers/shared/executor/pty_unix.go create mode 100644 drivers/shared/executor/pty_windows.go diff --git a/drivers/shared/executor/exec_utils.go b/drivers/shared/executor/exec_utils.go new file mode 100644 index 000000000..17194d0db --- /dev/null +++ b/drivers/shared/executor/exec_utils.go @@ -0,0 +1,287 @@ +package executor + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "sync" + "syscall" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/plugins/drivers" + dproto "github.com/hashicorp/nomad/plugins/drivers/proto" +) + +// execHelper is a convenient wrapper for starting and executing commands, and handling their output +type execHelper struct { + logger hclog.Logger + + // newTerminal function creates a tty appropriate for the command + // The returned master end of tty function is to be called after process start. + newTerminal func() (master func() (*os.File, error), slave *os.File, err error) + + // setTTY is a callback to configure the command with slave end of the tty of the terminal, when tty is enabled + setTTY func(tty *os.File) error + + // setTTY is a callback to configure the command with std{in|out|err}, when tty is disabled + setIO func(stdin io.Reader, stdout, stderr io.Writer) error + + // processStart starts the process, like `exec.Cmd.Start()` + processStart func() error + + // processWait blocks until command terminates and returns its final state + processWait func() (*os.ProcessState, error) +} + +func (e *execHelper) run(ctx context.Context, tty bool, stream drivers.ExecTaskStream) error { + if tty { + return e.runTTY(ctx, stream) + } + return e.runNoTTY(ctx, stream) +} + +func (e *execHelper) runTTY(ctx context.Context, stream drivers.ExecTaskStream) error { + ptyF, tty, err := e.newTerminal() + if err != nil { + return fmt.Errorf("failed to open a tty: %v", err) + } + defer tty.Close() + + if err := e.setTTY(tty); err != nil { + return fmt.Errorf("failed to set command tty: %v", err) + } + if err := e.processStart(); err != nil { + return fmt.Errorf("failed to start command: %v", err) + } + + var wg sync.WaitGroup + errCh := make(chan error, 3) + + pty, err := ptyF() + if err != nil { + return fmt.Errorf("failed to get tty master: %v", err) + } + + defer pty.Close() + wg.Add(1) + go handleStdin(e.logger, pty, stream, errCh) + // when tty is on, stdout and stderr point to the same pty so only read once + go handleStdout(e.logger, pty, &wg, stream.Send, errCh) + + ps, err := e.processWait() + + // force close streams to close out the stream copying goroutines + tty.Close() + + // wait until we get all process output + wg.Wait() + + // wait to flush out output + stream.Send(cmdExitResult(ps, err)) + + select { + case cerr := <-errCh: + return cerr + default: + return nil + } +} + +func (e *execHelper) runNoTTY(ctx context.Context, stream drivers.ExecTaskStream) error { + var sendLock sync.Mutex + send := func(v *drivers.ExecTaskStreamingResponseMsg) error { + sendLock.Lock() + defer sendLock.Unlock() + + return stream.Send(v) + } + + stdinPr, stdinPw := io.Pipe() + stdoutPr, stdoutPw := io.Pipe() + stderrPr, stderrPw := io.Pipe() + + defer stdoutPw.Close() + defer stderrPw.Close() + + if err := e.setIO(stdinPr, stdoutPw, stderrPw); err != nil { + return fmt.Errorf("failed to set command io: %v", err) + } + + if err := e.processStart(); err != nil { + return fmt.Errorf("failed to start command: %v", err) + } + + var wg sync.WaitGroup + errCh := make(chan error, 3) + + wg.Add(2) + go handleStdin(e.logger, stdinPw, stream, errCh) + go handleStdout(e.logger, stdoutPr, &wg, send, errCh) + go handleStderr(e.logger, stderrPr, &wg, send, errCh) + + ps, err := e.processWait() + + // force close streams to close out the stream copying goroutines + stdinPr.Close() + stdoutPw.Close() + stderrPw.Close() + + // wait until we get all process output + wg.Wait() + + // wait to flush out output + stream.Send(cmdExitResult(ps, err)) + + select { + case cerr := <-errCh: + return cerr + default: + return nil + } +} +func cmdExitResult(ps *os.ProcessState, err error) *drivers.ExecTaskStreamingResponseMsg { + exitCode := -1 + + if ps == nil { + if ee, ok := err.(*exec.ExitError); ok { + ps = ee.ProcessState + } + } + + if ps == nil { + exitCode = -2 + } else if status, ok := ps.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + if status.Signaled() { + const exitSignalBase = 128 + signal := int(status.Signal()) + exitCode = exitSignalBase + signal + } + } + + return &drivers.ExecTaskStreamingResponseMsg{ + Exited: true, + Result: &dproto.ExitResult{ + ExitCode: int32(exitCode), + }, + } +} + +func handleStdin(logger hclog.Logger, stdin io.WriteCloser, stream drivers.ExecTaskStream, errCh chan<- error) { + for { + m, err := stream.Recv() + if isClosedError(err) { + return + } else if err != nil { + errCh <- err + return + } + + if m.Stdin != nil { + if len(m.Stdin.Data) != 0 { + _, err := stdin.Write(m.Stdin.Data) + if err != nil { + errCh <- err + return + } + } + if m.Stdin.Close { + stdin.Close() + } + } else if m.TtySize != nil { + err := setTTYSize(stdin, m.TtySize.Height, m.TtySize.Width) + if err != nil { + errCh <- fmt.Errorf("failed to resize tty: %v", err) + return + } + } else { + // ignore heartbeats + } + } +} + +func handleStdout(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, send func(*drivers.ExecTaskStreamingResponseMsg) error, errCh chan<- error) { + defer wg.Done() + + buf := make([]byte, 4096) + for { + n, err := reader.Read(buf) + // always send output first if we read something + if n > 0 { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stdout: &dproto.ExecTaskStreamingIOOperation{ + Data: buf[:n], + }, + }); err != nil { + errCh <- err + return + } + } + + // then process error + if isClosedError(err) { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stdout: &dproto.ExecTaskStreamingIOOperation{ + Close: true, + }, + }); err != nil { + errCh <- err + return + } + return + } else if err != nil { + errCh <- err + return + } + + } +} + +func handleStderr(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, send func(*drivers.ExecTaskStreamingResponseMsg) error, errCh chan<- error) { + defer wg.Done() + + buf := make([]byte, 4096) + for { + n, err := reader.Read(buf) + // always send output first if we read something + if n > 0 { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stderr: &dproto.ExecTaskStreamingIOOperation{ + Data: buf[:n], + }, + }); err != nil { + errCh <- err + return + } + } + + // then process error + if isClosedError(err) { + if err := send(&drivers.ExecTaskStreamingResponseMsg{ + Stderr: &dproto.ExecTaskStreamingIOOperation{ + Close: true, + }, + }); err != nil { + errCh <- err + return + } + return + } else if err != nil { + errCh <- err + return + } + + } +} + +func isClosedError(err error) bool { + if err == nil { + return false + } + + return err == io.EOF || + err == io.ErrClosedPipe || + isUnixEIOErr(err) +} diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index f077bd488..255a49db8 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/plugins/drivers" + "github.com/kr/pty" shelpers "github.com/hashicorp/nomad/helper/stats" ) @@ -359,6 +360,53 @@ func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.Sy return buf.Bytes(), 0, nil } +func (e *UniversalExecutor) ExecStreaming(ctx context.Context, command []string, tty bool, + stream drivers.ExecTaskStream) error { + + if len(command) == 0 { + return fmt.Errorf("command is required") + } + + cmd := exec.CommandContext(ctx, command[0], command[1:]...) + + cmd.Dir = "/" + cmd.Env = e.childCmd.Env + + execHelper := &execHelper{ + logger: e.logger, + + newTerminal: func() (func() (*os.File, error), *os.File, error) { + pty, tty, err := pty.Open() + if err != nil { + return nil, nil, err + } + + return func() (*os.File, error) { return pty, nil }, tty, err + }, + setTTY: func(tty *os.File) error { + cmd.SysProcAttr = sessionCmdAttr(tty) + + cmd.Stdin = tty + cmd.Stdout = tty + cmd.Stderr = tty + return nil + }, + setIO: func(stdin io.Reader, stdout, stderr io.Writer) error { + cmd.Stdin = stdin + cmd.Stdout = stdout + cmd.Stderr = stderr + return nil + }, + processStart: cmd.Start, + processWait: func() (*os.ProcessState, error) { + err := cmd.Wait() + return cmd.ProcessState, err + }, + } + + return execHelper.run(ctx, tty, stream) +} + // Wait waits until a process has exited and returns it's exitcode and errors func (e *UniversalExecutor) Wait(ctx context.Context) (*ProcessState, error) { select { diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index b007da139..33900f876 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -5,6 +5,7 @@ package executor import ( "context" "fmt" + "io" "os" "os/exec" "path" @@ -28,6 +29,7 @@ import ( cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" lconfigs "github.com/opencontainers/runc/libcontainer/configs" ldevices "github.com/opencontainers/runc/libcontainer/devices" + lutils "github.com/opencontainers/runc/libcontainer/utils" "github.com/syndtr/gocapability/capability" "golang.org/x/sys/unix" ) @@ -504,6 +506,53 @@ func (l *LibcontainerExecutor) Exec(deadline time.Time, cmd string, args []strin } +func (l *LibcontainerExecutor) newTerminalSocket() (master func() (*os.File, error), socket *os.File, err error) { + parent, child, err := lutils.NewSockPair("socket") + if err != nil { + return nil, nil, fmt.Errorf("failed to create terminal: %v", err) + } + + return func() (*os.File, error) { return lutils.RecvFd(parent) }, child, err + +} + +func (l *LibcontainerExecutor) ExecStreaming(ctx context.Context, cmd []string, tty bool, + stream drivers.ExecTaskStream) error { + + // the task process will be started by the container + process := &libcontainer.Process{ + Args: cmd, + Env: l.userProc.Env, + User: l.userProc.User, + Init: false, + Cwd: "/", + } + + execHelper := &execHelper{ + logger: l.logger, + + newTerminal: l.newTerminalSocket, + setTTY: func(tty *os.File) error { + process.ConsoleSocket = tty + return nil + }, + setIO: func(stdin io.Reader, stdout, stderr io.Writer) error { + process.Stdin = stdin + process.Stdout = stdout + process.Stderr = stderr + return nil + }, + + processStart: func() error { return l.container.Run(process) }, + processWait: func() (*os.ProcessState, error) { + return process.Wait() + }, + } + + return execHelper.run(ctx, tty, stream) + +} + type waitResult struct { ps *os.ProcessState err error diff --git a/drivers/shared/executor/pty_unix.go b/drivers/shared/executor/pty_unix.go new file mode 100644 index 000000000..0c1be1914 --- /dev/null +++ b/drivers/shared/executor/pty_unix.go @@ -0,0 +1,43 @@ +// +build darwin dragonfly freebsd linux netbsd openbsd solaris + +package executor + +import ( + "fmt" + "io" + "os" + "strings" + "syscall" + + "github.com/kr/pty" + "golang.org/x/sys/unix" +) + +func sessionCmdAttr(tty *os.File) *syscall.SysProcAttr { + return &syscall.SysProcAttr{ + Setsid: true, + Setctty: true, + Ctty: int(tty.Fd()), + } +} + +func setTTYSize(w io.Writer, height, width int32) error { + f, ok := w.(*os.File) + if !ok { + return fmt.Errorf("attempted to resize a non-tty session") + } + + return pty.Setsize(f, &pty.Winsize{ + Rows: uint16(height), + Cols: uint16(width), + }) + +} + +func isUnixEIOErr(err error) bool { + if err == nil { + return false + } + + return strings.Contains(err.Error(), unix.EIO.Error()) +} diff --git a/drivers/shared/executor/pty_windows.go b/drivers/shared/executor/pty_windows.go new file mode 100644 index 000000000..12411a5a5 --- /dev/null +++ b/drivers/shared/executor/pty_windows.go @@ -0,0 +1,23 @@ +// +build windows + +package executor + +import ( + "fmt" + "io" + "os" + "syscall" +) + +func sessionCmdAttr(tty *os.File) *syscall.SysProcAttr { + return &syscall.SysProcAttr{} +} + +func setTTYSize(w io.Writer, height, width int32) error { + return fmt.Errorf("unsupported") + +} + +func isUnixEIOErr(err error) bool { + return false +}