executors: implement streaming exec

Implements streamign exec handling in both executors (i.e. universal and
libcontainer).

For creation of TTY, some incidental complexity leaked in.  The universal
executor uses github.com/kr/pty for creation of TTYs.

On the other hand, libcontainer expects a console socket and for libcontainer to
create the underlying console object on process start.  The caller can then use
`libcontainer.utils.RecvFd()` to get tty master end.

I chose github.com/kr/pty for managing TTYs here.  I tried
`github.com/containerd/console` package (which is already imported), but the
package did not work as expected on macOS.
This commit is contained in:
Mahmood Ali
2019-04-28 17:30:10 -04:00
parent efc4249f85
commit 976bfbc41a
5 changed files with 450 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ import (
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/kr/pty"
shelpers "github.com/hashicorp/nomad/helper/stats"
)
@@ -359,6 +360,53 @@ func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.Sy
return buf.Bytes(), 0, nil
}
func (e *UniversalExecutor) ExecStreaming(ctx context.Context, command []string, tty bool,
stream drivers.ExecTaskStream) error {
if len(command) == 0 {
return fmt.Errorf("command is required")
}
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
cmd.Dir = "/"
cmd.Env = e.childCmd.Env
execHelper := &execHelper{
logger: e.logger,
newTerminal: func() (func() (*os.File, error), *os.File, error) {
pty, tty, err := pty.Open()
if err != nil {
return nil, nil, err
}
return func() (*os.File, error) { return pty, nil }, tty, err
},
setTTY: func(tty *os.File) error {
cmd.SysProcAttr = sessionCmdAttr(tty)
cmd.Stdin = tty
cmd.Stdout = tty
cmd.Stderr = tty
return nil
},
setIO: func(stdin io.Reader, stdout, stderr io.Writer) error {
cmd.Stdin = stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
return nil
},
processStart: cmd.Start,
processWait: func() (*os.ProcessState, error) {
err := cmd.Wait()
return cmd.ProcessState, err
},
}
return execHelper.run(ctx, tty, stream)
}
// Wait waits until a process has exited and returns it's exitcode and errors
func (e *UniversalExecutor) Wait(ctx context.Context) (*ProcessState, error) {
select {