From 0087a51a7ade86caa22b67e1007878864e68a81c Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Thu, 6 Dec 2018 20:54:14 -0500 Subject: [PATCH] executor: remove structs package --- drivers/exec/driver.go | 8 +- drivers/exec/handle.go | 4 +- drivers/java/driver.go | 6 +- drivers/java/handle.go | 4 +- drivers/qemu/driver.go | 4 +- drivers/qemu/handle.go | 4 +- drivers/rawexec/driver.go | 4 +- drivers/rawexec/handle.go | 4 +- drivers/rkt/driver.go | 6 +- drivers/rkt/handle.go | 4 +- drivers/shared/executor/executor.go | 190 ++++++++++++++++-- drivers/shared/executor/executor_linux.go | 33 ++- .../shared/executor/executor_linux_test.go | 9 +- drivers/shared/executor/executor_test.go | 13 +- drivers/shared/executor/structs/structs.go | 177 ---------------- plugins/drivers/utils/utils.go | 10 +- plugins/executor/client.go | 14 +- plugins/executor/server.go | 6 +- plugins/executor/utils.go | 16 +- 19 files changed, 251 insertions(+), 265 deletions(-) delete mode 100644 drivers/shared/executor/structs/structs.go diff --git a/drivers/exec/driver.go b/drivers/exec/driver.go index 7398ee022..9353bb695 100644 --- a/drivers/exec/driver.go +++ b/drivers/exec/driver.go @@ -14,7 +14,7 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers/utils" @@ -298,7 +298,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru return nil, nil, fmt.Errorf("failed to create executor: %v", err) } - execCmd := &structs.ExecCommand{ + execCmd := &executor.ExecCommand{ Cmd: driverConfig.Command, Args: driverConfig.Args, Env: cfg.EnvList(), @@ -345,12 +345,12 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru return handle, nil, nil } -func toExecResources(resources *drivers.Resources) *structs.Resources { +func toExecResources(resources *drivers.Resources) *executor.Resources { if resources == nil || resources.NomadResources == nil { return nil } - return &structs.Resources{ + return &executor.Resources{ CPU: resources.NomadResources.CPU, MemoryMB: resources.NomadResources.MemoryMB, DiskMB: resources.NomadResources.DiskMB, diff --git a/drivers/exec/handle.go b/drivers/exec/handle.go index 1c73573ac..606406b64 100644 --- a/drivers/exec/handle.go +++ b/drivers/exec/handle.go @@ -8,12 +8,12 @@ import ( hclog "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" ) type taskHandle struct { - exec structs.Executor + exec executor.Executor pid int pluginClient *plugin.Client logger hclog.Logger diff --git a/drivers/java/driver.go b/drivers/java/driver.go index 840f99245..9fbf442c8 100644 --- a/drivers/java/driver.go +++ b/drivers/java/driver.go @@ -16,7 +16,7 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers/utils" @@ -330,13 +330,13 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru return nil, nil, fmt.Errorf("failed to create executor: %v", err) } - execCmd := &structs.ExecCommand{ + execCmd := &executor.ExecCommand{ Cmd: absPath, Args: args, Env: cfg.EnvList(), User: cfg.User, ResourceLimits: true, - Resources: &structs.Resources{ + Resources: &executor.Resources{ CPU: cfg.Resources.NomadResources.CPU, MemoryMB: cfg.Resources.NomadResources.MemoryMB, DiskMB: cfg.Resources.NomadResources.DiskMB, diff --git a/drivers/java/handle.go b/drivers/java/handle.go index 339036612..8ef2dc6a0 100644 --- a/drivers/java/handle.go +++ b/drivers/java/handle.go @@ -8,12 +8,12 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" ) type taskHandle struct { - exec structs.Executor + exec executor.Executor pid int pluginClient *plugin.Client logger hclog.Logger diff --git a/drivers/qemu/driver.go b/drivers/qemu/driver.go index 1f632e22c..9c498b9ce 100644 --- a/drivers/qemu/driver.go +++ b/drivers/qemu/driver.go @@ -18,7 +18,7 @@ import ( "github.com/hashicorp/go-plugin" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers/utils" @@ -424,7 +424,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru return nil, nil, err } - execCmd := &structs.ExecCommand{ + execCmd := &executor.ExecCommand{ Cmd: args[0], Args: args[1:], Env: cfg.EnvList(), diff --git a/drivers/qemu/handle.go b/drivers/qemu/handle.go index 4f54c22ad..63223f1c2 100644 --- a/drivers/qemu/handle.go +++ b/drivers/qemu/handle.go @@ -8,12 +8,12 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" ) type taskHandle struct { - exec structs.Executor + exec executor.Executor pid int pluginClient *plugin.Client logger hclog.Logger diff --git a/drivers/rawexec/driver.go b/drivers/rawexec/driver.go index 78cdad364..c5adb62dd 100644 --- a/drivers/rawexec/driver.go +++ b/drivers/rawexec/driver.go @@ -15,7 +15,7 @@ import ( plugin "github.com/hashicorp/go-plugin" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers/utils" @@ -327,7 +327,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru // will cause an error. useCgroups := !d.config.NoCgroups && runtime.GOOS == "linux" && syscall.Geteuid() == 0 - execCmd := &structs.ExecCommand{ + execCmd := &executor.ExecCommand{ Cmd: driverConfig.Command, Args: driverConfig.Args, Env: cfg.EnvList(), diff --git a/drivers/rawexec/handle.go b/drivers/rawexec/handle.go index 72401556a..0e46277e3 100644 --- a/drivers/rawexec/handle.go +++ b/drivers/rawexec/handle.go @@ -8,12 +8,12 @@ import ( hclog "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" ) type taskHandle struct { - exec structs.Executor + exec executor.Executor pid int pluginClient *plugin.Client logger hclog.Logger diff --git a/drivers/rkt/driver.go b/drivers/rkt/driver.go index 7cab0c8b0..99ffea850 100644 --- a/drivers/rkt/driver.go +++ b/drivers/rkt/driver.go @@ -28,7 +28,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers/utils" @@ -643,11 +643,11 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru // Enable ResourceLimits to place the executor in a parent cgroup of // the rkt container. This allows stats collection via the executor to // work just like it does for exec. - execCmd := &structs.ExecCommand{ + execCmd := &executor.ExecCommand{ Cmd: absPath, Args: runArgs, ResourceLimits: true, - Resources: &structs.Resources{ + Resources: &executor.Resources{ CPU: int(cfg.Resources.LinuxResources.CPUShares), MemoryMB: int(drivers.BytesToMB(cfg.Resources.LinuxResources.MemoryLimitBytes)), DiskMB: cfg.Resources.NomadResources.DiskMB, diff --git a/drivers/rkt/handle.go b/drivers/rkt/handle.go index d77ae2173..e9965a22f 100644 --- a/drivers/rkt/handle.go +++ b/drivers/rkt/handle.go @@ -11,12 +11,12 @@ import ( hclog "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/taskenv" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" ) type taskHandle struct { - exec structs.Executor + exec executor.Executor env *taskenv.TaskEnv uuid string pid int diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index cb34ce3bd..b1d898103 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -3,6 +3,8 @@ package executor import ( "context" "fmt" + "io" + "io/ioutil" "os" "os/exec" "path/filepath" @@ -16,8 +18,8 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/lib/fifo" "github.com/hashicorp/nomad/client/stats" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/consul-template/signals" @@ -39,14 +41,178 @@ var ( ExecutorBasicMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"} ) +// Executor is the interface which allows a driver to launch and supervise +// a process +type Executor interface { + // Launch a user process configured by the given ExecCommand + Launch(launchCmd *ExecCommand) (*ProcessState, error) + + // Wait blocks until the process exits or an error occures + Wait(ctx context.Context) (*ProcessState, error) + + // Shutdown will shutdown the executor by stopping the user process, + // cleaning up and resources created by the executor. The shutdown sequence + // will first send the given signal to the process. This defaults to "SIGINT" + // if not specified. The executor will then wait for the process to exit + // before cleaning up other resources. If the executor waits longer than the + // given grace period, the process is forcefully killed. + // + // To force kill the user process, gracePeriod can be set to 0. + Shutdown(signal string, gracePeriod time.Duration) error + + // UpdateResources updates any resource isolation enforcement with new + // constraints if supported. + UpdateResources(*Resources) error + + // Version returns the executor API version + Version() (*ExecutorVersion, error) + + // Stats fetchs process usage stats for the executor and each pid if available + Stats() (*cstructs.TaskResourceUsage, error) + + // Signal sends the given signal to the user process + Signal(os.Signal) error + + // Exec executes the given command and args inside the executor context + // and returns the output and exit code. + Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) +} + +// Resources describes the resource isolation required +type Resources struct { + CPU int + MemoryMB int + DiskMB int + IOPS int +} + +// ExecCommand holds the user command, args, and other isolation related +// settings. +type ExecCommand struct { + // Cmd is the command that the user wants to run. + Cmd string + + // Args is the args of the command that the user wants to run. + Args []string + + // Resources defined by the task + Resources *Resources + + // StdoutPath is the path the process stdout should be written to + StdoutPath string + stdout io.WriteCloser + + // StderrPath is the path the process stderr should be written to + StderrPath string + stderr io.WriteCloser + + // Env is the list of KEY=val pairs of environment variables to be set + Env []string + + // User is the user which the executor uses to run the command. + User string + + // TaskDir is the directory path on the host where for the task + TaskDir string + + // ResourceLimits determines whether resource limits are enforced by the + // executor. + ResourceLimits bool + + // Cgroup marks whether we put the process in a cgroup. Setting this field + // doesn't enforce resource limits. To enforce limits, set ResourceLimits. + // Using the cgroup does allow more precise cleanup of processes. + BasicProcessCgroup bool +} + +// SetWriters sets the writer for the process stdout and stderr. This should +// not be used if writing to a file path such as a fifo file. SetStdoutWriter +// is mainly used for unit testing purposes. +func (c *ExecCommand) SetWriters(out io.WriteCloser, err io.WriteCloser) { + c.stdout = out + c.stderr = err +} + +// GetWriters returns the unexported io.WriteCloser for the stdout and stderr +// handles. This is mainly used for unit testing purposes. +func (c *ExecCommand) GetWriters() (stdout io.WriteCloser, stderr io.WriteCloser) { + return c.stdout, c.stderr +} + +type nopCloser struct { + io.Writer +} + +func (nopCloser) Close() error { return nil } + +// Stdout returns a writer for the configured file descriptor +func (c *ExecCommand) Stdout() (io.WriteCloser, error) { + if c.stdout == nil { + if c.StdoutPath != "" { + f, err := fifo.Open(c.StdoutPath) + if err != nil { + return nil, fmt.Errorf("failed to create stdout: %v", err) + } + c.stdout = f + } else { + c.stdout = nopCloser{ioutil.Discard} + } + } + return c.stdout, nil +} + +// Stderr returns a writer for the configured file descriptor +func (c *ExecCommand) Stderr() (io.WriteCloser, error) { + if c.stderr == nil { + if c.StderrPath != "" { + f, err := fifo.Open(c.StderrPath) + if err != nil { + return nil, fmt.Errorf("failed to create stderr: %v", err) + } + c.stderr = f + } else { + c.stderr = nopCloser{ioutil.Discard} + } + } + return c.stderr, nil +} + +func (c *ExecCommand) Close() { + stdout, err := c.Stdout() + if err == nil { + stdout.Close() + } + stderr, err := c.Stderr() + if err == nil { + stderr.Close() + } +} + +// ProcessState holds information about the state of a user process. +type ProcessState struct { + Pid int + ExitCode int + Signal int + Time time.Time +} + +// ExecutorVersion is the version of the executor +type ExecutorVersion struct { + Version string +} + +func (v *ExecutorVersion) GoString() string { + return v.Version +} + // UniversalExecutor is an implementation of the Executor which launches and // supervises processes. In addition to process supervision it provides resource // and file system isolation type UniversalExecutor struct { childCmd exec.Cmd - commandCfg *structs.ExecCommand + commandCfg *ExecCommand - exitState *structs.ProcessState + exitState *ProcessState processExited chan interface{} // resConCtx is used to track and cleanup additional resources created by @@ -62,7 +228,7 @@ type UniversalExecutor struct { } // NewExecutor returns an Executor -func NewExecutor(logger hclog.Logger) structs.Executor { +func NewExecutor(logger hclog.Logger) Executor { logger = logger.Named("executor") if err := shelpers.Init(); err != nil { logger.Error("unable to initialize stats", "error", err) @@ -78,13 +244,13 @@ func NewExecutor(logger hclog.Logger) structs.Executor { } // Version returns the api version of the executor -func (e *UniversalExecutor) Version() (*structs.ExecutorVersion, error) { - return &structs.ExecutorVersion{Version: ExecutorVersionLatest}, nil +func (e *UniversalExecutor) Version() (*ExecutorVersion, error) { + return &ExecutorVersion{Version: ExecutorVersionLatest}, nil } // Launch launches the main process and returns its state. It also // configures an applies isolation on certain platforms. -func (e *UniversalExecutor) Launch(command *structs.ExecCommand) (*structs.ProcessState, error) { +func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) { e.logger.Info("launching command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) e.commandCfg = command @@ -146,7 +312,7 @@ func (e *UniversalExecutor) Launch(command *structs.ExecCommand) (*structs.Proce go e.pidCollector.collectPids(e.processExited, getAllPids) go e.wait() - return &structs.ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil + return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil } // Exec a command inside a container for exec and java drivers. @@ -194,7 +360,7 @@ func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.Sy } // Wait waits until a process has exited and returns it's exitcode and errors -func (e *UniversalExecutor) Wait(ctx context.Context) (*structs.ProcessState, error) { +func (e *UniversalExecutor) Wait(ctx context.Context) (*ProcessState, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -203,7 +369,7 @@ func (e *UniversalExecutor) Wait(ctx context.Context) (*structs.ProcessState, er } } -func (e *UniversalExecutor) UpdateResources(resources *structs.Resources) error { +func (e *UniversalExecutor) UpdateResources(resources *Resources) error { return nil } @@ -212,7 +378,7 @@ func (e *UniversalExecutor) wait() { pid := e.childCmd.Process.Pid err := e.childCmd.Wait() if err == nil { - e.exitState = &structs.ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()} + e.exitState = &ProcessState{Pid: pid, ExitCode: 0, Time: time.Now()} return } @@ -240,7 +406,7 @@ func (e *UniversalExecutor) wait() { e.logger.Warn("unexpected Cmd.Wait() error type", "error", err) } - e.exitState = &structs.ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()} + e.exitState = &ProcessState{Pid: pid, ExitCode: exitCode, Signal: signal, Time: time.Now()} } var ( diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index f5182d0c9..dc361f3a3 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -19,7 +19,6 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" "github.com/hashicorp/nomad/helper/discover" shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/nomad/helper/uuid" @@ -62,7 +61,7 @@ func init() { // LibcontainerExecutor implements an Executor with the runc/libcontainer api type LibcontainerExecutor struct { id string - command *structs.ExecCommand + command *ExecCommand logger hclog.Logger @@ -74,10 +73,10 @@ type LibcontainerExecutor struct { container libcontainer.Container userProc *libcontainer.Process userProcExited chan interface{} - exitState *structs.ProcessState + exitState *ProcessState } -func NewExecutorWithIsolation(logger hclog.Logger) structs.Executor { +func NewExecutorWithIsolation(logger hclog.Logger) Executor { logger = logger.Named("isolated_executor") if err := shelpers.Init(); err != nil { logger.Error("unable to initialize stats", "error", err) @@ -93,7 +92,7 @@ func NewExecutorWithIsolation(logger hclog.Logger) structs.Executor { } // Launch creates a new container in libcontainer and starts a new process with it -func (l *LibcontainerExecutor) Launch(command *structs.ExecCommand) (*structs.ProcessState, error) { +func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) { l.logger.Info("launching command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) // Find the nomad executable to launch the executor process with bin, err := discover.NomadExecutable() @@ -102,7 +101,7 @@ func (l *LibcontainerExecutor) Launch(command *structs.ExecCommand) (*structs.Pr } if command.Resources == nil { - command.Resources = &structs.Resources{} + command.Resources = &Resources{} } l.command = command @@ -207,7 +206,7 @@ func (l *LibcontainerExecutor) Launch(command *structs.ExecCommand) (*structs.Pr go l.pidCollector.collectPids(l.userProcExited, l.getAllPids) go l.wait() - return &structs.ProcessState{ + return &ProcessState{ Pid: pid, ExitCode: -1, Time: time.Now(), @@ -232,7 +231,7 @@ func (l *LibcontainerExecutor) getAllPids() (map[int]*nomadPid, error) { } // Wait waits until a process has exited and returns it's exitcode and errors -func (l *LibcontainerExecutor) Wait(ctx context.Context) (*structs.ProcessState, error) { +func (l *LibcontainerExecutor) Wait(ctx context.Context) (*ProcessState, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -252,7 +251,7 @@ func (l *LibcontainerExecutor) wait() { ps = exitErr.ProcessState } else { l.logger.Error("failed to call wait on user process", "error", err) - l.exitState = &structs.ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()} + l.exitState = &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()} return } } @@ -270,7 +269,7 @@ func (l *LibcontainerExecutor) wait() { } } - l.exitState = &structs.ProcessState{ + l.exitState = &ProcessState{ Pid: ps.Pid(), ExitCode: exitCode, Signal: signal, @@ -332,13 +331,13 @@ func (l *LibcontainerExecutor) Shutdown(signal string, grace time.Duration) erro } // UpdateResources updates the resource isolation with new values to be enforced -func (l *LibcontainerExecutor) UpdateResources(resources *structs.Resources) error { +func (l *LibcontainerExecutor) UpdateResources(resources *Resources) error { return nil } // Version returns the api version of the executor -func (l *LibcontainerExecutor) Version() (*structs.ExecutorVersion, error) { - return &structs.ExecutorVersion{Version: ExecutorVersionLatest}, nil +func (l *LibcontainerExecutor) Version() (*ExecutorVersion, error) { + return &ExecutorVersion{Version: ExecutorVersionLatest}, nil } // Stats returns the resource statistics for processes managed by the executor @@ -458,7 +457,7 @@ func (l *LibcontainerExecutor) handleExecWait(ch chan *waitResult, process *libc ch <- &waitResult{ps, err} } -func configureCapabilities(cfg *lconfigs.Config, command *structs.ExecCommand) { +func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { // TODO: allow better control of these cfg.Capabilities = &lconfigs.Capabilities{ Bounding: allCaps, @@ -470,7 +469,7 @@ func configureCapabilities(cfg *lconfigs.Config, command *structs.ExecCommand) { } -func configureIsolation(cfg *lconfigs.Config, command *structs.ExecCommand) { +func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) { defaultMountFlags := syscall.MS_NOEXEC | syscall.MS_NOSUID | syscall.MS_NODEV // set the new root directory for the container @@ -536,7 +535,7 @@ func configureIsolation(cfg *lconfigs.Config, command *structs.ExecCommand) { } } -func configureCgroups(cfg *lconfigs.Config, command *structs.ExecCommand) error { +func configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error { // If resources are not limited then manually create cgroups needed if !command.ResourceLimits { @@ -602,7 +601,7 @@ func configureBasicCgroups(cfg *lconfigs.Config) error { return nil } -func newLibcontainerConfig(command *structs.ExecCommand) *lconfigs.Config { +func newLibcontainerConfig(command *ExecCommand) *lconfigs.Config { cfg := &lconfigs.Config{ Cgroups: &lconfigs.Cgroup{ Resources: &lconfigs.Resources{ diff --git a/drivers/shared/executor/executor_linux_test.go b/drivers/shared/executor/executor_linux_test.go index 3298fdca3..e11e9d5df 100644 --- a/drivers/shared/executor/executor_linux_test.go +++ b/drivers/shared/executor/executor_linux_test.go @@ -16,7 +16,6 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/testutil" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" tu "github.com/hashicorp/nomad/testutil" @@ -27,7 +26,7 @@ func init() { executorFactories["LibcontainerExecutor"] = libcontainerFactory } -func libcontainerFactory(l hclog.Logger) structs.Executor { +func libcontainerFactory(l hclog.Logger) Executor { return NewExecutorWithIsolation(l) } @@ -35,7 +34,7 @@ func libcontainerFactory(l hclog.Logger) structs.Executor { // chroot. Use testExecutorContext if you don't need a chroot. // // The caller is responsible for calling AllocDir.Destroy() to cleanup. -func testExecutorCommandWithChroot(t *testing.T) (*structs.ExecCommand, *allocdir.AllocDir) { +func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { chrootEnv := map[string]string{ "/etc/ld.so.cache": "/etc/ld.so.cache", "/etc/ld.so.conf": "/etc/ld.so.conf", @@ -63,10 +62,10 @@ func testExecutorCommandWithChroot(t *testing.T) (*structs.ExecCommand, *allocdi t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err) } td := allocDir.TaskDirs[task.Name] - cmd := &structs.ExecCommand{ + cmd := &ExecCommand{ Env: taskEnv.List(), TaskDir: td.Dir, - Resources: &structs.Resources{ + Resources: &Resources{ CPU: task.Resources.CPU, MemoryMB: task.Resources.MemoryMB, IOPS: task.Resources.IOPS, diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index aa4711ac9..26067ebc8 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -12,7 +12,6 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" tu "github.com/hashicorp/nomad/testutil" hclog "github.com/hashicorp/go-hclog" @@ -24,8 +23,8 @@ import ( "github.com/stretchr/testify/require" ) -var executorFactories = map[string]func(hclog.Logger) structs.Executor{} -var universalFactory = func(l hclog.Logger) structs.Executor { +var executorFactories = map[string]func(hclog.Logger) Executor{} +var universalFactory = func(l hclog.Logger) Executor { return NewExecutor(l) } @@ -36,7 +35,7 @@ func init() { // testExecutorContext returns an ExecutorContext and AllocDir. // // The caller is responsible for calling AllocDir.Destroy() to cleanup. -func testExecutorCommand(t *testing.T) (*structs.ExecCommand, *allocdir.AllocDir) { +func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] taskEnv := taskenv.NewBuilder(mock.Node(), alloc, task, "global").Build() @@ -50,10 +49,10 @@ func testExecutorCommand(t *testing.T) (*structs.ExecCommand, *allocdir.AllocDir t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err) } td := allocDir.TaskDirs[task.Name] - cmd := &structs.ExecCommand{ + cmd := &ExecCommand{ Env: taskEnv.List(), TaskDir: td.Dir, - Resources: &structs.Resources{ + Resources: &Resources{ CPU: task.Resources.CPU, MemoryMB: task.Resources.MemoryMB, IOPS: task.Resources.IOPS, @@ -70,7 +69,7 @@ type bufferCloser struct { func (_ *bufferCloser) Close() error { return nil } -func configureTLogging(cmd *structs.ExecCommand) (stdout bufferCloser, stderr bufferCloser) { +func configureTLogging(cmd *ExecCommand) (stdout bufferCloser, stderr bufferCloser) { cmd.SetWriters(&stdout, &stderr) return } diff --git a/drivers/shared/executor/structs/structs.go b/drivers/shared/executor/structs/structs.go deleted file mode 100644 index 56495f7ab..000000000 --- a/drivers/shared/executor/structs/structs.go +++ /dev/null @@ -1,177 +0,0 @@ -package structs - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "os" - "time" - - "github.com/hashicorp/nomad/client/lib/fifo" - cstructs "github.com/hashicorp/nomad/client/structs" -) - -// Executor is the interface which allows a driver to launch and supervise -// a process -type Executor interface { - // Launch a user process configured by the given ExecCommand - Launch(launchCmd *ExecCommand) (*ProcessState, error) - - // Wait blocks until the process exits or an error occures - Wait(ctx context.Context) (*ProcessState, error) - - // Shutdown will shutdown the executor by stopping the user process, - // cleaning up and resources created by the executor. The shutdown sequence - // will first send the given signal to the process. This defaults to "SIGINT" - // if not specified. The executor will then wait for the process to exit - // before cleaning up other resources. If the executor waits longer than the - // given grace period, the process is forcefully killed. - // - // To force kill the user process, gracePeriod can be set to 0. - Shutdown(signal string, gracePeriod time.Duration) error - - // UpdateResources updates any resource isolation enforcement with new - // constraints if supported. - UpdateResources(*Resources) error - - // Version returns the executor API version - Version() (*ExecutorVersion, error) - - // Stats fetchs process usage stats for the executor and each pid if available - Stats() (*cstructs.TaskResourceUsage, error) - - // Signal sends the given signal to the user process - Signal(os.Signal) error - - // Exec executes the given command and args inside the executor context - // and returns the output and exit code. - Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) -} - -// Resources describes the resource isolation required -type Resources struct { - CPU int - MemoryMB int - DiskMB int - IOPS int -} - -// ExecCommand holds the user command, args, and other isolation related -// settings. -type ExecCommand struct { - // Cmd is the command that the user wants to run. - Cmd string - - // Args is the args of the command that the user wants to run. - Args []string - - // Resources defined by the task - Resources *Resources - - // StdoutPath is the path the process stdout should be written to - StdoutPath string - stdout io.WriteCloser - - // StderrPath is the path the process stderr should be written to - StderrPath string - stderr io.WriteCloser - - // Env is the list of KEY=val pairs of environment variables to be set - Env []string - - // User is the user which the executor uses to run the command. - User string - - // TaskDir is the directory path on the host where for the task - TaskDir string - - // ResourceLimits determines whether resource limits are enforced by the - // executor. - ResourceLimits bool - - // Cgroup marks whether we put the process in a cgroup. Setting this field - // doesn't enforce resource limits. To enforce limits, set ResourceLimits. - // Using the cgroup does allow more precise cleanup of processes. - BasicProcessCgroup bool -} - -// SetWriters sets the writer for the process stdout and stderr. This should -// not be used if writing to a file path such as a fifo file. SetStdoutWriter -// is mainly used for unit testing purposes. -func (c *ExecCommand) SetWriters(out io.WriteCloser, err io.WriteCloser) { - c.stdout = out - c.stderr = err -} - -// GetWriters returns the unexported io.WriteCloser for the stdout and stderr -// handles. This is mainly used for unit testing purposes. -func (c *ExecCommand) GetWriters() (stdout io.WriteCloser, stderr io.WriteCloser) { - return c.stdout, c.stderr -} - -type nopCloser struct { - io.Writer -} - -func (nopCloser) Close() error { return nil } - -// Stdout returns a writer for the configured file descriptor -func (c *ExecCommand) Stdout() (io.WriteCloser, error) { - if c.stdout == nil { - if c.StdoutPath != "" { - f, err := fifo.Open(c.StdoutPath) - if err != nil { - return nil, fmt.Errorf("failed to create stdout: %v", err) - } - c.stdout = f - } else { - c.stdout = nopCloser{ioutil.Discard} - } - } - return c.stdout, nil -} - -// Stderr returns a writer for the configured file descriptor -func (c *ExecCommand) Stderr() (io.WriteCloser, error) { - if c.stderr == nil { - if c.StderrPath != "" { - f, err := fifo.Open(c.StderrPath) - if err != nil { - return nil, fmt.Errorf("failed to create stderr: %v", err) - } - c.stderr = f - } else { - c.stderr = nopCloser{ioutil.Discard} - } - } - return c.stderr, nil -} - -func (c *ExecCommand) Close() { - stdout, err := c.Stdout() - if err == nil { - stdout.Close() - } - stderr, err := c.Stderr() - if err == nil { - stderr.Close() - } -} - -// ProcessState holds information about the state of a user process. -type ProcessState struct { - Pid int - ExitCode int - Signal int - Time time.Time -} - -// ExecutorVersion is the version of the executor -type ExecutorVersion struct { - Version string -} - -func (v *ExecutorVersion) GoString() string { - return v.Version -} diff --git a/plugins/drivers/utils/utils.go b/plugins/drivers/utils/utils.go index 1060dfce2..f36534354 100644 --- a/plugins/drivers/utils/utils.go +++ b/plugins/drivers/utils/utils.go @@ -14,7 +14,7 @@ import ( "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" - estructs "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" @@ -64,7 +64,7 @@ func CgroupsMounted(node *structs.Node) bool { // CreateExecutor launches an executor plugin and returns an instance of the // Executor interface func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDriverConfig, - executorConfig *pexecutor.ExecutorConfig) (estructs.Executor, *plugin.Client, error) { + executorConfig *pexecutor.ExecutorConfig) (executor.Executor, *plugin.Client, error) { c, err := json.Marshal(executorConfig) if err != nil { @@ -106,12 +106,12 @@ func CreateExecutor(w io.Writer, level hclog.Level, driverConfig *base.ClientDri if err != nil { return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err) } - executorPlugin := raw.(estructs.Executor) + executorPlugin := raw.(executor.Executor) return executorPlugin, executorClient, nil } // CreateExecutorWithConfig launches a plugin with a given plugin config -func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (estructs.Executor, *plugin.Client, error) { +func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) { config.HandshakeConfig = base.Handshake // Setting this to DEBUG since the log level at the executor server process @@ -131,7 +131,7 @@ func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (estruct if err != nil { return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err) } - executorPlugin, ok := raw.(estructs.Executor) + executorPlugin, ok := raw.(executor.Executor) if !ok { return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw) } diff --git a/plugins/executor/client.go b/plugins/executor/client.go index 08b429d95..089966c43 100644 --- a/plugins/executor/client.go +++ b/plugins/executor/client.go @@ -8,12 +8,12 @@ import ( "github.com/LK4D4/joincontext" "github.com/golang/protobuf/ptypes" cstructs "github.com/hashicorp/nomad/client/structs" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/executor/proto" ) -var _ structs.Executor = (*grpcExecutorClient)(nil) +var _ executor.Executor = (*grpcExecutorClient)(nil) type grpcExecutorClient struct { client proto.ExecutorClient @@ -22,7 +22,7 @@ type grpcExecutorClient struct { doneCtx context.Context } -func (c *grpcExecutorClient) Launch(cmd *structs.ExecCommand) (*structs.ProcessState, error) { +func (c *grpcExecutorClient) Launch(cmd *executor.ExecCommand) (*executor.ProcessState, error) { ctx := context.Background() req := &proto.LaunchRequest{ Cmd: cmd.Cmd, @@ -48,7 +48,7 @@ func (c *grpcExecutorClient) Launch(cmd *structs.ExecCommand) (*structs.ProcessS return ps, nil } -func (c *grpcExecutorClient) Wait(ctx context.Context) (*structs.ProcessState, error) { +func (c *grpcExecutorClient) Wait(ctx context.Context) (*executor.ProcessState, error) { // Join the passed context and the shutdown context ctx, _ = joincontext.Join(ctx, c.doneCtx) @@ -78,7 +78,7 @@ func (c *grpcExecutorClient) Shutdown(signal string, gracePeriod time.Duration) return nil } -func (c *grpcExecutorClient) UpdateResources(r *structs.Resources) error { +func (c *grpcExecutorClient) UpdateResources(r *executor.Resources) error { ctx := context.Background() req := &proto.UpdateResourcesRequest{Resources: resourcesToProto(r)} if _, err := c.client.UpdateResources(ctx, req); err != nil { @@ -88,13 +88,13 @@ func (c *grpcExecutorClient) UpdateResources(r *structs.Resources) error { return nil } -func (c *grpcExecutorClient) Version() (*structs.ExecutorVersion, error) { +func (c *grpcExecutorClient) Version() (*executor.ExecutorVersion, error) { ctx := context.Background() resp, err := c.client.Version(ctx, &proto.VersionRequest{}) if err != nil { return nil, err } - return &structs.ExecutorVersion{Version: resp.Version}, nil + return &executor.ExecutorVersion{Version: resp.Version}, nil } func (c *grpcExecutorClient) Stats() (*cstructs.TaskResourceUsage, error) { diff --git a/plugins/executor/server.go b/plugins/executor/server.go index d3146dd70..f6b3a0b24 100644 --- a/plugins/executor/server.go +++ b/plugins/executor/server.go @@ -6,19 +6,19 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/hashicorp/consul-template/signals" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/executor/proto" "golang.org/x/net/context" ) type grpcExecutorServer struct { - impl structs.Executor + impl executor.Executor doneCtx context.Context } func (s *grpcExecutorServer) Launch(ctx context.Context, req *proto.LaunchRequest) (*proto.LaunchResponse, error) { - ps, err := s.impl.Launch(&structs.ExecCommand{ + ps, err := s.impl.Launch(&executor.ExecCommand{ Cmd: req.Cmd, Args: req.Args, Resources: resourcesFromProto(req.Resources), diff --git a/plugins/executor/utils.go b/plugins/executor/utils.go index e511c42a1..222e95727 100644 --- a/plugins/executor/utils.go +++ b/plugins/executor/utils.go @@ -2,11 +2,11 @@ package executor import ( "github.com/golang/protobuf/ptypes" - "github.com/hashicorp/nomad/drivers/shared/executor/structs" + "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/executor/proto" ) -func processStateToProto(ps *structs.ProcessState) (*proto.ProcessState, error) { +func processStateToProto(ps *executor.ProcessState) (*proto.ProcessState, error) { timestamp, err := ptypes.TimestampProto(ps.Time) if err != nil { return nil, err @@ -21,13 +21,13 @@ func processStateToProto(ps *structs.ProcessState) (*proto.ProcessState, error) return pb, nil } -func processStateFromProto(pb *proto.ProcessState) (*structs.ProcessState, error) { +func processStateFromProto(pb *proto.ProcessState) (*executor.ProcessState, error) { timestamp, err := ptypes.Timestamp(pb.Time) if err != nil { return nil, err } - return &structs.ProcessState{ + return &executor.ProcessState{ Pid: int(pb.Pid), ExitCode: int(pb.ExitCode), Signal: int(pb.Signal), @@ -35,7 +35,7 @@ func processStateFromProto(pb *proto.ProcessState) (*structs.ProcessState, error }, nil } -func resourcesToProto(r *structs.Resources) *proto.Resources { +func resourcesToProto(r *executor.Resources) *proto.Resources { if r == nil { return &proto.Resources{} } @@ -48,12 +48,12 @@ func resourcesToProto(r *structs.Resources) *proto.Resources { } } -func resourcesFromProto(pb *proto.Resources) *structs.Resources { +func resourcesFromProto(pb *proto.Resources) *executor.Resources { if pb == nil { - return &structs.Resources{} + return &executor.Resources{} } - return &structs.Resources{ + return &executor.Resources{ CPU: int(pb.Cpu), MemoryMB: int(pb.MemoryMB), DiskMB: int(pb.DiskMB),