diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go
index f082ba08e..6b526b846 100644
--- a/client/alloc_runner_test.go
+++ b/client/alloc_runner_test.go
@@ -53,7 +53,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) {
 			return false, fmt.Errorf("No updates")
 		}
 		last := upd.Allocs[upd.Count-1]
-		if last.ClientStatus == structs.AllocClientStatusDead {
+		if last.ClientStatus != structs.AllocClientStatusDead {
 			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
 		}
 		return true, nil
@@ -77,7 +77,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
 			return false, fmt.Errorf("No updates")
 		}
 		last := upd.Allocs[upd.Count-1]
-		if last.ClientStatus == structs.AllocClientStatusRunning {
+		if last.ClientStatus != structs.AllocClientStatusRunning {
 			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
 		}
 		return true, nil
diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go
index ab56f6982..6904cf118 100644
--- a/client/allocdir/alloc_dir.go
+++ b/client/allocdir/alloc_dir.go
@@ -8,6 +8,7 @@ import (
 	"path/filepath"
 	"time"
 
+	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -37,9 +38,6 @@ type AllocDir struct {
 
 	// TaskDirs is a mapping of task names to their non-shared directory.
 	TaskDirs map[string]string
-
-	// A list of locations the shared alloc has been mounted to.
-	Mounted []string
 }
 
 // AllocFileInfo holds information about a file inside the AllocDir
@@ -67,13 +65,39 @@ func NewAllocDir(allocDir string) *AllocDir {
 // Tears down previously build directory structure.
 func (d *AllocDir) Destroy() error {
 	// Unmount all mounted shared alloc dirs.
-	for _, m := range d.Mounted {
-		if err := d.unmountSharedDir(m); err != nil {
-			return fmt.Errorf("Failed to unmount shared directory: %v", err)
-		}
+	var mErr multierror.Error
+	if err := d.UnmountAll(); err != nil {
+		mErr.Errors = append(mErr.Errors, err)
 	}
 
-	return os.RemoveAll(d.AllocDir)
+	if err := os.RemoveAll(d.AllocDir); err != nil {
+		mErr.Errors = append(mErr.Errors, err)
+	}
+
+	return mErr.ErrorOrNil()
+}
+
+func (d *AllocDir) UnmountAll() error {
+	var mErr multierror.Error
+	for _, dir := range d.TaskDirs {
+		// Check if the directory has the shared alloc mounted.
+		taskAlloc := filepath.Join(dir, SharedAllocName)
+		if d.pathExists(taskAlloc) {
+			if err := d.unmountSharedDir(taskAlloc); err != nil {
+				mErr.Errors = append(mErr.Errors,
+					fmt.Errorf("failed to unmount shared alloc dir %q: %v", taskAlloc, err))
+			}
+			if err := os.RemoveAll(taskAlloc); err != nil {
+				mErr.Errors = append(mErr.Errors,
+					fmt.Errorf("failed to delete shared alloc dir %q: %v", taskAlloc, err))
+			}
+		}
+
+		// Unmount dev/ and proc/ if they have been mounted.
+		d.unmountSpecialDirs(dir)
+	}
+
+	return mErr.ErrorOrNil()
 }
 
 // Given a list of a task build the correct alloc structure.
@@ -248,7 +272,6 @@ func (d *AllocDir) MountSharedDir(task string) error {
 		return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err)
 	}
 
-	d.Mounted = append(d.Mounted, taskLoc)
 	return nil
 }
 
@@ -325,3 +348,13 @@ func fileCopy(src, dst string, perm os.FileMode) error {
 
 	return nil
 }
+
+// pathExists is a helper function to check if the path exists.
+func (d *AllocDir) pathExists(path string) bool {
+	if _, err := os.Stat(path); err != nil {
+		if os.IsNotExist(err) {
+			return false
+		}
+	}
+	return true
+}
diff --git a/client/allocdir/alloc_dir_darwin.go b/client/allocdir/alloc_dir_darwin.go
index 9c247d15d..2cfdd38c3 100644
--- a/client/allocdir/alloc_dir_darwin.go
+++ b/client/allocdir/alloc_dir_darwin.go
@@ -13,3 +13,14 @@ func (d *AllocDir) mountSharedDir(dir string) error {
 func (d *AllocDir) unmountSharedDir(dir string) error {
 	return syscall.Unlink(dir)
 }
+
+// MountSpecialDirs mounts the dev and proc file system on the chroot of the
+// task. It's a no-op on darwin.
+func (d *AllocDir) MountSpecialDirs(taskDir string) error {
+	return nil
+}
+
+// unmountSpecialDirs unmounts the dev and proc file system from the chroot
+func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
+	return nil
+}
diff --git a/client/allocdir/alloc_dir_linux.go b/client/allocdir/alloc_dir_linux.go
index 89d5df6a4..d8d44af1f 100644
--- a/client/allocdir/alloc_dir_linux.go
+++ b/client/allocdir/alloc_dir_linux.go
@@ -1,8 +1,12 @@
 package allocdir
 
 import (
+	"fmt"
 	"os"
+	"path/filepath"
 	"syscall"
+
+	"github.com/hashicorp/go-multierror"
 )
 
 // Bind mounts the shared directory into the task directory. Must be root to
@@ -18,3 +22,62 @@ func (d *AllocDir) mountSharedDir(taskDir string) error {
 func (d *AllocDir) unmountSharedDir(dir string) error {
 	return syscall.Unmount(dir, 0)
 }
+
+// MountSpecialDirs mounts the dev and proc file system from the host to the
+// chroot
+func (d *AllocDir) MountSpecialDirs(taskDir string) error {
+	// Mount dev
+	dev := filepath.Join(taskDir, "dev")
+	if !d.pathExists(dev) {
+		if err := os.Mkdir(dev, 0777); err != nil {
+			return fmt.Errorf("Mkdir(%v) failed: %v", dev, err)
+		}
+
+		if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil {
+			return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err)
+		}
+	}
+
+	// Mount proc
+	proc := filepath.Join(taskDir, "proc")
+	if !d.pathExists(proc) {
+		if err := os.Mkdir(proc, 0777); err != nil {
+			return fmt.Errorf("Mkdir(%v) failed: %v", proc, err)
+		}
+
+		if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil {
+			return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err)
+		}
+	}
+
+	return nil
+}
+
+// unmountSpecialDirs unmounts the dev and proc file system from the chroot
+func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
+	errs := new(multierror.Error)
+	dev := filepath.Join(taskDir, "dev")
+	if d.pathExists(dev) {
+		if err := syscall.Unmount(dev, 0); err != nil {
+			errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err))
+		}
+
+		if err := os.RemoveAll(dev); err != nil {
+			errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err))
+		}
+	}
+
+	// Unmount proc.
+	proc := filepath.Join(taskDir, "proc")
+	if d.pathExists(proc) {
+		if err := syscall.Unmount(proc, 0); err != nil {
+			errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err))
+		}
+
+		if err := os.RemoveAll(proc); err != nil {
+			errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", proc, err))
+		}
+	}
+
+	return errs.ErrorOrNil()
+}
diff --git a/client/allocdir/alloc_dir_windows.go b/client/allocdir/alloc_dir_windows.go
index ab0692b5b..7211125ae 100644
--- a/client/allocdir/alloc_dir_windows.go
+++ b/client/allocdir/alloc_dir_windows.go
@@ -23,3 +23,14 @@ func (d *AllocDir) dropDirPermissions(path string) error {
 func (d *AllocDir) unmountSharedDir(dir string) error {
 	return nil
 }
+
+// MountSpecialDirs mounts the dev and proc file system on the chroot of the
+// task. It's a no-op on windows.
+func (d *AllocDir) MountSpecialDirs(taskDir string) error {
+	return nil
+}
+
+// unmountSpecialDirs unmounts the dev and proc file system from the chroot
+func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
+	return nil
+}
diff --git a/client/config/config.go b/client/config/config.go
index 9e07bc12e..dd2c2b967 100644
--- a/client/config/config.go
+++ b/client/config/config.go
@@ -57,6 +57,14 @@ type Config struct {
 	// Node provides the base node
 	Node *structs.Node
 
+	// ClientMaxPort is the upper range of the ports that the client uses for
+	// communicating with plugin subsystems
+	ClientMaxPort uint
+
+	// ClientMinPort is the lower range of the ports that the client uses for
+	// communicating with plugin subsystems
+	ClientMinPort uint
+
 	// Options provides arbitrary key-value configuration for nomad internals,
 	// like fingerprinters and drivers. The format is:
 	//
diff --git a/client/driver/driver.go b/client/driver/driver.go
index f92b84fad..db808b18d 100644
--- a/client/driver/driver.go
+++ b/client/driver/driver.go
@@ -89,8 +89,14 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
 func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
 	max := d.config.MaxKillTimeout.Nanoseconds()
 	desired := task.KillTimeout.Nanoseconds()
+
+	// Make the minimum time between signal and kill one second.
+	if desired == 0 {
+		desired = (1 * time.Second).Nanoseconds()
+	}
+
 	if desired < max {
-		return task.KillTimeout
+		return time.Duration(desired)
 	}
 
 	return d.config.MaxKillTimeout
diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go
index 0ab20d813..f76f8c68e 100644
--- a/client/driver/driver_test.go
+++ b/client/driver/driver_test.go
@@ -46,6 +46,7 @@ func testConfig() *config.Config {
 	conf := &config.Config{}
 	conf.StateDir = os.TempDir()
 	conf.AllocDir = os.TempDir()
+	conf.MaxKillTimeout = 10 * time.Second
 	return conf
 }
diff --git a/client/driver/env/env.go b/client/driver/env/env.go
index 1846403ac..6df5fe64f 100644
--- a/client/driver/env/env.go
+++ b/client/driver/env/env.go
@@ -55,26 +55,26 @@ const (
 
 // TaskEnvironment is used to expose information to a task via environment
 // variables and provide interpolation of Nomad variables.
 type TaskEnvironment struct {
-	env      map[string]string
-	meta     map[string]string
-	allocDir string
-	taskDir  string
-	cpuLimit int
-	memLimit int
-	node     *structs.Node
-	networks []*structs.NetworkResource
-	portMap  map[string]int
+	Env      map[string]string
+	Meta     map[string]string
+	AllocDir string
+	TaskDir  string
+	CpuLimit int
+	MemLimit int
+	Node     *structs.Node
+	Networks []*structs.NetworkResource
+	PortMap  map[string]int
 
 	// taskEnv is the variables that will be set in the tasks environment
-	taskEnv map[string]string
+	TaskEnv map[string]string
 
 	// nodeValues is the values that are allowed for interprolation from the
 	// node.
-	nodeValues map[string]string
+	NodeValues map[string]string
 }
 
 func NewTaskEnvironment(node *structs.Node) *TaskEnvironment {
-	return &TaskEnvironment{node: node}
+	return &TaskEnvironment{Node: node}
 }
 
 // ParseAndReplace takes the user supplied args replaces any instance of an
@@ -82,7 +82,7 @@ func NewTaskEnvironment(node *structs.Node) *TaskEnvironment {
 func (t *TaskEnvironment) ParseAndReplace(args []string) []string {
 	replaced := make([]string, len(args))
 	for i, arg := range args {
-		replaced[i] = hargs.ReplaceEnv(arg, t.taskEnv, t.nodeValues)
+		replaced[i] = hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
 	}
 
 	return replaced
@@ -92,75 +92,75 @@ func (t *TaskEnvironment) ParseAndReplace(args []string) []string {
 // and nomad variables. If the variable is found in the passed map it is
 // replaced, otherwise the original string is returned.
 func (t *TaskEnvironment) ReplaceEnv(arg string) string {
-	return hargs.ReplaceEnv(arg, t.taskEnv, t.nodeValues)
+	return hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
 }
 
 // Build must be called after all the tasks environment values have been set.
 func (t *TaskEnvironment) Build() *TaskEnvironment {
-	t.nodeValues = make(map[string]string)
-	t.taskEnv = make(map[string]string)
+	t.NodeValues = make(map[string]string)
+	t.TaskEnv = make(map[string]string)
 
 	// Build the task metadata
-	for k, v := range t.meta {
-		t.taskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
+	for k, v := range t.Meta {
+		t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
 	}
 
 	// Build the ports
-	for _, network := range t.networks {
-		for label, value := range network.MapLabelToValues(t.portMap) {
+	for _, network := range t.Networks {
+		for label, value := range network.MapLabelToValues(t.PortMap) {
 			IPPort := fmt.Sprintf("%s:%d", network.IP, value)
-			t.taskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort
+			t.TaskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort
 
 			// Pass an explicit port mapping to the environment
-			if port, ok := t.portMap[label]; ok {
-				t.taskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(port)
+			if port, ok := t.PortMap[label]; ok {
+				t.TaskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(port)
 			}
 		}
 	}
 
 	// Build the directories
-	if t.allocDir != "" {
-		t.taskEnv[AllocDir] = t.allocDir
+	if t.AllocDir != "" {
+		t.TaskEnv[AllocDir] = t.AllocDir
 	}
 
-	if t.taskDir != "" {
-		t.taskEnv[TaskLocalDir] = t.taskDir
+	if t.TaskDir != "" {
+		t.TaskEnv[TaskLocalDir] = t.TaskDir
 	}
 
 	// Build the resource limits
-	if t.memLimit != 0 {
-		t.taskEnv[MemLimit] = strconv.Itoa(t.memLimit)
+	if t.MemLimit != 0 {
+		t.TaskEnv[MemLimit] = strconv.Itoa(t.MemLimit)
 	}
 
-	if t.cpuLimit != 0 {
-		t.taskEnv[CpuLimit] = strconv.Itoa(t.cpuLimit)
+	if t.CpuLimit != 0 {
+		t.TaskEnv[CpuLimit] = strconv.Itoa(t.CpuLimit)
 	}
 
 	// Build the node
-	if t.node != nil {
+	if t.Node != nil {
 		// Set up the node values.
-		t.nodeValues[nodeIdKey] = t.node.ID
-		t.nodeValues[nodeDcKey] = t.node.Datacenter
-		t.nodeValues[nodeNameKey] = t.node.Name
-		t.nodeValues[nodeClassKey] = t.node.NodeClass
+		t.NodeValues[nodeIdKey] = t.Node.ID
+		t.NodeValues[nodeDcKey] = t.Node.Datacenter
+		t.NodeValues[nodeNameKey] = t.Node.Name
+		t.NodeValues[nodeClassKey] = t.Node.NodeClass
 
 		// Set up the attributes.
-		for k, v := range t.node.Attributes {
-			t.nodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
+		for k, v := range t.Node.Attributes {
+			t.NodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
 		}
 
 		// Set up the meta.
-		for k, v := range t.node.Meta {
-			t.nodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
+		for k, v := range t.Node.Meta {
+			t.NodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
 		}
 	}
 
 	// Interpret the environment variables
-	interpreted := make(map[string]string, len(t.env))
-	for k, v := range t.env {
-		interpreted[k] = hargs.ReplaceEnv(v, t.nodeValues, t.taskEnv)
+	interpreted := make(map[string]string, len(t.Env))
+	for k, v := range t.Env {
+		interpreted[k] = hargs.ReplaceEnv(v, t.NodeValues, t.TaskEnv)
 	}
 
 	for k, v := range interpreted {
-		t.taskEnv[k] = v
+		t.TaskEnv[k] = v
 	}
 
 	return t
@@ -169,7 +169,7 @@ func (t *TaskEnvironment) Build() *TaskEnvironment {
 // EnvList returns a list of strings with NAME=value pairs.
 func (t *TaskEnvironment) EnvList() []string {
 	env := []string{}
-	for k, v := range t.taskEnv {
+	for k, v := range t.TaskEnv {
 		env = append(env, fmt.Sprintf("%s=%s", k, v))
 	}
 
@@ -178,8 +178,8 @@ func (t *TaskEnvironment) EnvList() []string {
 
 // EnvMap returns a copy of the tasks environment variables.
 func (t *TaskEnvironment) EnvMap() map[string]string {
-	m := make(map[string]string, len(t.taskEnv))
-	for k, v := range t.taskEnv {
+	m := make(map[string]string, len(t.TaskEnv))
+	for k, v := range t.TaskEnv {
 		m[k] = v
 	}
 
@@ -188,95 +188,95 @@ func (t *TaskEnvironment) EnvMap() map[string]string {
 
 // Builder methods to build the TaskEnvironment
 func (t *TaskEnvironment) SetAllocDir(dir string) *TaskEnvironment {
-	t.allocDir = dir
+	t.AllocDir = dir
 	return t
 }
 
 func (t *TaskEnvironment) ClearAllocDir() *TaskEnvironment {
-	t.allocDir = ""
+	t.AllocDir = ""
 	return t
 }
 
 func (t *TaskEnvironment) SetTaskLocalDir(dir string) *TaskEnvironment {
-	t.taskDir = dir
+	t.TaskDir = dir
 	return t
 }
 
 func (t *TaskEnvironment) ClearTaskLocalDir() *TaskEnvironment {
-	t.taskDir = ""
+	t.TaskDir = ""
 	return t
 }
 
 func (t *TaskEnvironment) SetMemLimit(limit int) *TaskEnvironment {
-	t.memLimit = limit
+	t.MemLimit = limit
 	return t
 }
 
 func (t *TaskEnvironment) ClearMemLimit() *TaskEnvironment {
-	t.memLimit = 0
+	t.MemLimit = 0
 	return t
 }
 
 func (t *TaskEnvironment) SetCpuLimit(limit int) *TaskEnvironment {
-	t.cpuLimit = limit
+	t.CpuLimit = limit
 	return t
 }
 
 func (t *TaskEnvironment) ClearCpuLimit() *TaskEnvironment {
-	t.cpuLimit = 0
+	t.CpuLimit = 0
 	return t
 }
 
 func (t *TaskEnvironment) SetNetworks(networks []*structs.NetworkResource) *TaskEnvironment {
-	t.networks = networks
+	t.Networks = networks
 	return t
 }
 
 func (t *TaskEnvironment) clearNetworks() *TaskEnvironment {
-	t.networks = nil
+	t.Networks = nil
 	return t
 }
 
 func (t *TaskEnvironment) SetPortMap(portMap map[string]int) *TaskEnvironment {
-	t.portMap = portMap
+	t.PortMap = portMap
 	return t
 }
 
 func (t *TaskEnvironment) clearPortMap() *TaskEnvironment {
-	t.portMap = nil
+	t.PortMap = nil
 	return t
 }
 
 // Takes a map of meta values to be passed to the task. The keys are capatilized
 // when the environent variable is set.
 func (t *TaskEnvironment) SetMeta(m map[string]string) *TaskEnvironment {
-	t.meta = m
+	t.Meta = m
 	return t
 }
 
 func (t *TaskEnvironment) ClearMeta() *TaskEnvironment {
-	t.meta = nil
+	t.Meta = nil
 	return t
 }
 
 func (t *TaskEnvironment) SetEnvvars(m map[string]string) *TaskEnvironment {
-	t.env = m
+	t.Env = m
 	return t
 }
 
 // Appends the given environment variables.
 func (t *TaskEnvironment) AppendEnvvars(m map[string]string) *TaskEnvironment {
-	if t.env == nil {
-		t.env = make(map[string]string, len(m))
+	if t.Env == nil {
+		t.Env = make(map[string]string, len(m))
 	}
 
 	for k, v := range m {
-		t.env[k] = v
+		t.Env[k] = v
 	}
 	return t
 }
 
 func (t *TaskEnvironment) ClearEnvvars() *TaskEnvironment {
-	t.env = nil
+	t.Env = nil
 	return t
 }
diff --git a/client/driver/exec.go b/client/driver/exec.go
index 191175d78..35e73dd1b 100644
--- a/client/driver/exec.go
+++ b/client/driver/exec.go
@@ -4,13 +4,19 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"os/exec"
+	"path/filepath"
 	"syscall"
 	"time"
 
+	"github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/go-plugin"
+	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/driver/executor"
 	cstructs "github.com/hashicorp/nomad/client/driver/structs"
 	"github.com/hashicorp/nomad/client/getter"
+	"github.com/hashicorp/nomad/helper/discover"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/mitchellh/mapstructure"
 )
@@ -30,11 +36,15 @@ type ExecDriverConfig struct {
 
 // execHandle is returned from Start/Open as a handle to the PID
 type execHandle struct {
-	cmd         executor.Executor
-	killTimeout time.Duration
-	logger      *log.Logger
-	waitCh      chan *cstructs.WaitResult
-	doneCh      chan struct{}
+	pluginClient    *plugin.Client
+	executor        executor.Executor
+	isolationConfig *executor.IsolationConfig
+	userPid         int
+	allocDir        *allocdir.AllocDir
+	killTimeout     time.Duration
+	logger          *log.Logger
+	waitCh          chan *cstructs.WaitResult
+	doneCh          chan struct{}
 }
 
 // NewExecDriver is used to create a new exec driver
@@ -92,39 +102,58 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 		}
 	}
 
-	// Setup the command
-	execCtx := executor.NewExecutorContext(d.taskEnv)
-	cmd := executor.Command(execCtx, command, driverConfig.Args...)
-	if err := cmd.Limit(task.Resources); err != nil {
-		return nil, fmt.Errorf("failed to constrain resources: %s", err)
+	bin, err := discover.NomadExecutable()
+	if err != nil {
+		return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
+	}
+	pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
+	pluginConfig := &plugin.ClientConfig{
+		Cmd: exec.Command(bin, "executor", pluginLogFile),
 	}
 
-	// Populate environment variables
-	cmd.Command().Env = d.taskEnv.EnvList()
-
-	if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil {
-		return nil, fmt.Errorf("failed to configure task directory: %v", err)
+	exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
+	if err != nil {
+		return nil, err
 	}
-
-	if err := cmd.Start(); err != nil {
-		return nil, fmt.Errorf("failed to start command: %v", err)
+	executorCtx := &executor.ExecutorContext{
+		TaskEnv:          d.taskEnv,
+		AllocDir:         ctx.AllocDir,
+		TaskName:         task.Name,
+		TaskResources:    task.Resources,
+		ResourceLimits:   true,
+		FSIsolation:      true,
+		UnprivilegedUser: true,
 	}
+	ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
+	if err != nil {
+		pluginClient.Kill()
+		return nil, fmt.Errorf("error starting process via the plugin: %v", err)
+	}
+	d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)
 
 	// Return a driver handle
 	h := &execHandle{
-		cmd:         cmd,
-		killTimeout: d.DriverContext.KillTimeout(task),
-		logger:      d.logger,
-		doneCh:      make(chan struct{}),
-		waitCh:      make(chan *cstructs.WaitResult, 1),
+		pluginClient:    pluginClient,
+		userPid:         ps.Pid,
+		executor:        exec,
+		allocDir:        ctx.AllocDir,
+		isolationConfig: ps.IsolationConfig,
+		killTimeout:     d.DriverContext.KillTimeout(task),
+		logger:          d.logger,
+		doneCh:          make(chan struct{}),
+		waitCh:          make(chan *cstructs.WaitResult, 1),
 	}
 	go h.run()
 	return h, nil
 }
 
 type execId struct {
-	ExecutorId  string
-	KillTimeout time.Duration
+	KillTimeout     time.Duration
+	UserPid         int
+	TaskDir         string
+	AllocDir        *allocdir.AllocDir
+	IsolationConfig *executor.IsolationConfig
+	PluginConfig    *ExecutorReattachConfig
 }
 
 func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -133,30 +162,51 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
 		return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
 	}
 
-	// Find the process
-	execCtx := executor.NewExecutorContext(d.taskEnv)
-	cmd, err := executor.OpenId(execCtx, id.ExecutorId)
+	pluginConfig := &plugin.ClientConfig{
+		Reattach: id.PluginConfig.PluginConfig(),
+	}
+	exec, client, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
 	if err != nil {
-		return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
+		merrs := new(multierror.Error)
+		merrs.Errors = append(merrs.Errors, err)
+		d.logger.Println("[ERROR] driver.exec: error connecting to plugin so destroying plugin pid and user pid")
+		if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
+			merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e))
+		}
+		if id.IsolationConfig != nil {
+			if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil {
+				merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e))
+			}
+		}
+		if e := ctx.AllocDir.UnmountAll(); e != nil {
+			merrs.Errors = append(merrs.Errors, e)
+		}
+		return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil())
 	}
 
 	// Return a driver handle
 	h := &execHandle{
-		cmd:         cmd,
-		logger:      d.logger,
-		killTimeout: id.KillTimeout,
-		doneCh:      make(chan struct{}),
-		waitCh:      make(chan *cstructs.WaitResult, 1),
+		pluginClient:    client,
+		executor:        exec,
+		userPid:         id.UserPid,
+		allocDir:        id.AllocDir,
+		isolationConfig: id.IsolationConfig,
+		logger:          d.logger,
+		killTimeout:     id.KillTimeout,
+		doneCh:          make(chan struct{}),
+		waitCh:          make(chan *cstructs.WaitResult, 1),
 	}
 	go h.run()
 	return h, nil
 }
 
 func (h *execHandle) ID() string {
-	executorId, _ := h.cmd.ID()
 	id := execId{
-		ExecutorId:  executorId,
-		KillTimeout: h.killTimeout,
+		KillTimeout:     h.killTimeout,
+		PluginConfig:    NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
+		UserPid:         h.userPid,
+		AllocDir:        h.allocDir,
+		IsolationConfig: h.isolationConfig,
 	}
 
 	data, err := json.Marshal(id)
@@ -179,18 +229,46 @@ func (h *execHandle) Update(task *structs.Task) error {
 }
 
 func (h *execHandle) Kill() error {
-	h.cmd.Shutdown()
+	if err := h.executor.ShutDown(); err != nil {
+		return fmt.Errorf("executor Shutdown failed: %v", err)
+	}
+
 	select {
 	case <-h.doneCh:
 		return nil
 	case <-time.After(h.killTimeout):
-		return h.cmd.ForceStop()
+		if h.pluginClient.Exited() {
+			return nil
+		}
+		if err := h.executor.Exit(); err != nil {
+			return fmt.Errorf("executor Exit failed: %v", err)
+		}
+
+		return nil
 	}
 }
 
 func (h *execHandle) run() {
-	res := h.cmd.Wait()
+	ps, err := h.executor.Wait()
 	close(h.doneCh)
-	h.waitCh <- res
+
+	// If the exit code is 0 and we got an error, the plugin didn't reconnect
+	// and doesn't know the state of the user process, so we kill the user
+	// process to ensure that an executor created after a restart doesn't
+	// collide with resources the older user pid might still be holding onto.
+	if ps.ExitCode == 0 && err != nil {
+		if h.isolationConfig != nil {
+			if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil {
+				h.logger.Printf("[ERROR] driver.exec: destroying cgroup failed while killing cgroup: %v", e)
+			}
+		}
+		if e := h.allocDir.UnmountAll(); e != nil {
+			h.logger.Printf("[ERROR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e)
+		}
+	}
+	h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0,
+		Err: err}
 	close(h.waitCh)
+	h.pluginClient.Kill()
 }
diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go
index 853c90cb0..85da6bfa5 100644
--- a/client/driver/exec_test.go
+++ b/client/driver/exec_test.go
@@ -1,10 +1,13 @@
 package driver
 
 import (
+	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"os"
 	"path/filepath"
 	"reflect"
+	"syscall"
 	"testing"
 	"time"
 
@@ -70,6 +73,66 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
 	if handle2 == nil {
 		t.Fatalf("missing handle")
 	}
+
+	handle.Kill()
+	handle2.Kill()
+}
+
+func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
+	t.Parallel()
+	ctestutils.ExecCompatible(t)
+	task := &structs.Task{
+		Name: "sleep",
+		Config: map[string]interface{}{
+			"command": "/bin/sleep",
+			"args":    []string{"1000000"},
+		},
+		Resources: basicResources,
+	}
+
+	driverCtx, execCtx := testDriverContexts(task)
+	defer execCtx.AllocDir.Destroy()
+	d := NewExecDriver(driverCtx)
+
+	handle, err := d.Start(execCtx, task)
+	defer handle.Kill()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if handle == nil {
+		t.Fatalf("missing handle")
+	}
+
+	id := &execId{}
+	if err := json.Unmarshal([]byte(handle.ID()), id); err != nil {
+		t.Fatalf("Failed to parse handle '%s': %v", handle.ID(), err)
+	}
+	pluginPid := id.PluginConfig.Pid
+	proc, err := os.FindProcess(pluginPid)
+	if err != nil {
+		t.Fatalf("can't find plugin pid: %v", pluginPid)
+	}
+	if err := proc.Kill(); err != nil {
+		t.Fatalf("can't kill plugin pid: %v", err)
+	}
+
+	// Attempt to open
+	handle2, err := d.Open(execCtx, handle.ID())
+	if err == nil {
+		t.Fatalf("expected error")
+	}
+	if handle2 != nil {
+		handle2.Kill()
+		t.Fatalf("expected handle2 to be nil")
+	}
+
+	// Test if the user process is still present
+	userProc, err := os.FindProcess(id.UserPid)
+
+	err = userProc.Signal(syscall.Signal(0))
+
+	if err == nil {
+		t.Fatalf("expected user process to die")
+	}
 }
 
 func TestExecDriver_Start_Wait(t *testing.T) {
@@ -259,9 +322,10 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
 		Name: "sleep",
 		Config: map[string]interface{}{
 			"command": "/bin/sleep",
-			"args":    []string{"1"},
+			"args":    []string{"100"},
 		},
-		Resources: basicResources,
+		Resources:   basicResources,
+		KillTimeout: 10 * time.Second,
 	}
 
 	driverCtx, execCtx := testDriverContexts(task)
diff --git a/client/driver/executor/exec.go b/client/driver/executor/exec.go
deleted file mode 100644
index 5acf8eccc..000000000
--- a/client/driver/executor/exec.go
+++ /dev/null
@@ -1,123 +0,0 @@
-// Package executor is used to invoke child processes across various operating
-// systems in a way that provides the following features:
-//
-// - Least privilege
-// - Resource constraints
-// - Process isolation
-//
-// An operating system may be something like "windows" or "linux with systemd".
-// Executors allow drivers like `exec` and `java` to share an implementation
-// for isolation capabilities on a particular operating system.
-//
-// For example:
-//
-// - `exec` and `java` on Linux use a cgroups executor
-// - `exec` and `java` on FreeBSD use a jails executor
-//
-// However, drivers that provide their own isolation should not use executors.
-// For example, using an executor to start QEMU means that the QEMU call is
-// run inside a chroot+cgroup, even though the VM already provides isolation for
-// the task running inside it. This is an extraneous level of indirection.
-package executor
-
-import (
-	"fmt"
-	"os/exec"
-	"path/filepath"
-
-	"github.com/hashicorp/nomad/client/allocdir"
-	"github.com/hashicorp/nomad/nomad/structs"
-
-	"github.com/hashicorp/nomad/client/driver/env"
-	cstructs "github.com/hashicorp/nomad/client/driver/structs"
-)
-
-var errNoResources = fmt.Errorf("No resources are associated with this task")
-
-// Executor is an interface that any platform- or capability-specific exec
-// wrapper must implement. You should not need to implement a Java executor.
-// Rather, you would implement a cgroups executor that the Java driver will use.
-type Executor interface {
-	// Limit must be called before Start and restricts the amount of resources
-	// the process can use. Note that an error may be returned ONLY IF the
-	// executor implements resource limiting. Otherwise Limit is ignored.
-	Limit(*structs.Resources) error
-
-	// ConfigureTaskDir must be called before Start and ensures that the tasks
-	// directory is properly configured.
-	ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error
-
-	// Start the process. This may wrap the actual process in another command,
-	// depending on the capabilities in this environment. Errors that arise from
-	// Limits or Runas may bubble through Start()
-	Start() error
-
-	// Open should be called to restore a previous execution. This might be needed if
-	// nomad is restarted.
-	Open(string) error
-
-	// Wait waits till the user's command is completed.
-	Wait() *cstructs.WaitResult
-
-	// Returns a handle that is executor specific for use in reopening.
-	ID() (string, error)
-
-	// Shutdown should use a graceful stop mechanism so the application can
-	// perform checkpointing or cleanup, if such a mechanism is available.
-	// If such a mechanism is not available, Shutdown() should call ForceStop().
-	Shutdown() error
-
-	// ForceStop will terminate the process without waiting for cleanup. Every
-	// implementations must provide this.
-	ForceStop() error
-
-	// Command provides access the underlying Cmd struct in case the Executor
-	// interface doesn't expose the functionality you need.
-	Command() *exec.Cmd
-}
-
-// ExecutorContext is a means to inject dependencies such as loggers, configs, and
-// node attributes into a Driver without having to change the Driver interface
-// each time we do it. Used in conjection with Factory, above.
-type ExecutorContext struct {
-	taskEnv *env.TaskEnvironment
-}
-
-// NewExecutorContext initializes a new DriverContext with the specified fields.
-func NewExecutorContext(taskEnv *env.TaskEnvironment) *ExecutorContext {
-	return &ExecutorContext{
-		taskEnv: taskEnv,
-	}
-}
-
-// Command returns a platform-specific Executor
-func Command(ctx *ExecutorContext, name string, args ...string) Executor {
-	executor := NewExecutor(ctx)
-	SetCommand(executor, name, args)
-	return executor
-}
-
-func SetCommand(e Executor, name string, args []string) {
-	cmd := e.Command()
-	cmd.Path = name
-	cmd.Args = append([]string{name}, args...)
-
-	if filepath.Base(name) == name {
-		if lp, err := exec.LookPath(name); err != nil {
-			// cmd.lookPathErr = err
-		} else {
-			cmd.Path = lp
-		}
-	}
-}
-
-// OpenId is similar to executor.Command but will attempt to reopen with the
-// passed ID.
-func OpenId(ctx *ExecutorContext, id string) (Executor, error) {
-	executor := NewExecutor(ctx)
-	err := executor.Open(id)
-	if err != nil {
-		return nil, err
-	}
-	return executor, nil
-}
diff --git a/client/driver/executor/exec_basic.go b/client/driver/executor/exec_basic.go
deleted file mode 100644
index b0b687245..000000000
--- a/client/driver/executor/exec_basic.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package executor
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"os"
-	"os/exec"
-	"path/filepath"
-	"runtime"
-	"strings"
-
-	"github.com/hashicorp/nomad/client/allocdir"
-	"github.com/hashicorp/nomad/client/driver/spawn"
-	"github.com/hashicorp/nomad/nomad/structs"
-
-	cstructs "github.com/hashicorp/nomad/client/driver/structs"
-)
-
-// BasicExecutor should work everywhere, and as a result does not include
-// any resource restrictions or runas capabilities.
-type BasicExecutor struct {
-	*ExecutorContext
-	cmd      exec.Cmd
-	spawn    *spawn.Spawner
-	taskName string
-	taskDir  string
-	allocDir string
-}
-
-func NewBasicExecutor(ctx *ExecutorContext) Executor {
-	return &BasicExecutor{ExecutorContext: ctx}
-}
-
-func (e *BasicExecutor) Limit(resources *structs.Resources) error {
-	if resources == nil {
-		return errNoResources
-	}
-	return nil
-}
-
-func (e *BasicExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error {
-	taskDir, ok := alloc.TaskDirs[taskName]
-	if !ok {
-		return fmt.Errorf("Couldn't find task directory for task %v", taskName)
-	}
-	e.cmd.Dir = taskDir
-
-	e.taskDir = taskDir
-	e.taskName = taskName
-	e.allocDir = alloc.AllocDir
-	return nil
-}
-
-func (e *BasicExecutor) Start() error {
-	// Parse the commands arguments and replace instances of Nomad environment
-	// variables.
-	e.cmd.Path = e.taskEnv.ReplaceEnv(e.cmd.Path)
-	e.cmd.Args = e.taskEnv.ParseAndReplace(e.cmd.Args)
-	e.cmd.Env = e.taskEnv.Build().EnvList()
-
-	spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
-	e.spawn = spawn.NewSpawner(spawnState)
-	e.spawn.SetCommand(&e.cmd)
-	e.spawn.SetLogs(&spawn.Logs{
-		Stdout: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)),
-		Stderr: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)),
-		Stdin:  os.DevNull,
-	})
-
-	return e.spawn.Spawn(nil)
-}
-
-func (e *BasicExecutor) Open(id string) error {
-	var spawn spawn.Spawner
-	dec := json.NewDecoder(strings.NewReader(id))
-	if err := dec.Decode(&spawn); err != nil {
-		return fmt.Errorf("Failed to parse id: %v", err)
-	}
-
-	// Setup the executor.
-	e.spawn = &spawn
-	return e.spawn.Valid()
-}
-
-func (e *BasicExecutor) Wait() *cstructs.WaitResult {
-	return e.spawn.Wait()
-}
-
-func (e *BasicExecutor) ID() (string, error) {
-	if e.spawn == nil {
-		return "", fmt.Errorf("Process was never started")
-	}
-
-	var buffer bytes.Buffer
-	enc := json.NewEncoder(&buffer)
-	if err := enc.Encode(e.spawn); err != nil {
-		return "", fmt.Errorf("Failed to serialize id: %v", err)
-	}
-
-	return buffer.String(), nil
-}
-
-func (e *BasicExecutor) Shutdown() error {
-	proc, err := os.FindProcess(e.spawn.UserPid)
-	if err != nil {
-		return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
-	}
-
-	if runtime.GOOS == "windows" {
-		return proc.Kill()
-	}
-
-	return proc.Signal(os.Interrupt)
-}
-
-func (e *BasicExecutor) ForceStop() error {
-	proc, err := os.FindProcess(e.spawn.UserPid)
-	if err != nil {
-		return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
-	}
-
-	if err := proc.Kill(); err != nil && err.Error() != "os: process already finished" {
-		return err
-	}
-	return nil
-}
-
-func (e *BasicExecutor) Command() *exec.Cmd {
-	return &e.cmd
-}
diff --git a/client/driver/executor/exec_basic_test.go b/client/driver/executor/exec_basic_test.go
deleted file mode 100644
index 1a829b95d..000000000
--- a/client/driver/executor/exec_basic_test.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package executor
-
-import "testing"
-
-func TestExecutorBasic(t *testing.T) {
-	t.Parallel()
-	testExecutor(t, NewBasicExecutor, nil)
-}
diff --git a/client/driver/executor/exec_linux.go b/client/driver/executor/exec_linux.go
deleted file mode 100644
index 221751644..000000000
--- a/client/driver/executor/exec_linux.go
+++ /dev/null
@@ -1,426 +0,0 @@
-package executor
-
-import (
-	"bytes"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"os"
-	"os/exec"
-	"os/user"
-	"path/filepath"
-	"strconv"
-	"strings"
-	"sync"
-	"syscall"
-
-	"github.com/hashicorp/go-multierror"
-	"github.com/opencontainers/runc/libcontainer/cgroups"
-	cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
-	"github.com/opencontainers/runc/libcontainer/cgroups/systemd"
-	cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
-
-	"github.com/hashicorp/nomad/client/allocdir"
-	"github.com/hashicorp/nomad/client/driver/spawn"
-	cstructs "github.com/hashicorp/nomad/client/driver/structs"
-	"github.com/hashicorp/nomad/nomad/structs"
-)
-
-var (
-	// A mapping of directories on the host OS to attempt to embed inside each
-	// task's chroot.
-	chrootEnv = map[string]string{
-		"/bin":       "/bin",
-		"/etc":       "/etc",
-		"/lib":       "/lib",
-		"/lib32":     "/lib32",
-		"/lib64":     "/lib64",
-		"/usr/bin":   "/usr/bin",
-		"/usr/lib":   "/usr/lib",
-		"/usr/share": "/usr/share",
-	}
-)
-
-func NewExecutor(ctx *ExecutorContext) Executor {
-	return NewLinuxExecutor(ctx)
-}
-
-func NewLinuxExecutor(ctx *ExecutorContext) Executor {
-	return &LinuxExecutor{ExecutorContext: ctx}
-}
-
-// Linux executor is designed to run on linux kernel 2.8+.
-type LinuxExecutor struct {
-	*ExecutorContext
-	cmd  exec.Cmd
-	user *user.User
-	l    sync.Mutex
-
-	// Isolation configurations.
-	groups   *cgroupConfig.Cgroup
-	taskName string
-	taskDir  string
-	allocDir string
-
-	// Spawn process.
-	spawn *spawn.Spawner
-}
-
-func (e *LinuxExecutor) Command() *exec.Cmd {
-	return &e.cmd
-}
-
-func (e *LinuxExecutor) Limit(resources *structs.Resources) error {
-	if resources == nil {
-		return errNoResources
-	}
-
-	return e.configureCgroups(resources)
-}
-
-// execLinuxID contains the necessary information to reattach to an executed
-// process and cleanup the created cgroups.
-type ExecLinuxID struct {
-	Groups  *cgroupConfig.Cgroup
-	Spawn   *spawn.Spawner
-	TaskDir string
-}
-
-func (e *LinuxExecutor) Open(id string) error {
-	// De-serialize the ID.
-	dec := json.NewDecoder(strings.NewReader(id))
-	var execID ExecLinuxID
-	if err := dec.Decode(&execID); err != nil {
-		return fmt.Errorf("Failed to parse id: %v", err)
-	}
-
-	// Setup the executor.
-	e.groups = execID.Groups
-	e.spawn = execID.Spawn
-	e.taskDir = execID.TaskDir
-	return e.spawn.Valid()
-}
-
-func (e *LinuxExecutor) ID() (string, error) {
-	if e.groups == nil || e.spawn == nil || e.taskDir == "" {
-		return "", fmt.Errorf("LinuxExecutor not properly initialized.")
-	}
-
-	// Build the ID.
-	id := ExecLinuxID{
-		Groups:  e.groups,
-		Spawn:   e.spawn,
-		TaskDir: e.taskDir,
-	}
-
-	var buffer bytes.Buffer
-	enc := json.NewEncoder(&buffer)
-	if err := enc.Encode(id); err != nil {
-		return "", fmt.Errorf("Failed to serialize id: %v", err)
-	}
-
-	return buffer.String(), nil
-}
-
-// runAs takes a user id as a string and looks up the user, and sets the command
-// to execute as that user.
-func (e *LinuxExecutor) runAs(userid string) error {
-	u, err := user.Lookup(userid)
-	if err != nil {
-		return fmt.Errorf("Failed to identify user %v: %v", userid, err)
-	}
-
-	// Convert the uid and gid
-	uid, err := strconv.ParseUint(u.Uid, 10, 32)
-	if err != nil {
-		return fmt.Errorf("Unable to convert userid to uint32: %s", err)
-	}
-	gid, err := strconv.ParseUint(u.Gid, 10, 32)
-	if err != nil {
-		return fmt.Errorf("Unable to convert groupid to uint32: %s", err)
-	}
-
-	// Set the command to run as that user and group.
-	if e.cmd.SysProcAttr == nil {
-		e.cmd.SysProcAttr = &syscall.SysProcAttr{}
-	}
-	if e.cmd.SysProcAttr.Credential == nil {
-		e.cmd.SysProcAttr.Credential = &syscall.Credential{}
-	}
-	e.cmd.SysProcAttr.Credential.Uid = uint32(uid)
-	e.cmd.SysProcAttr.Credential.Gid = uint32(gid)
-
-	return nil
-}
-
-func (e *LinuxExecutor) Start() error {
-	// Run as "nobody" user so we don't leak root privilege to the spawned
-	// process.
-	if err := e.runAs("nobody"); err != nil {
-		return err
-	}
-
-	// Parse the commands arguments and replace instances of Nomad environment
-	// variables.
-	e.cmd.Path = e.taskEnv.ReplaceEnv(e.cmd.Path)
-	e.cmd.Args = e.taskEnv.ParseAndReplace(e.cmd.Args)
-	e.cmd.Env = e.taskEnv.EnvList()
-
-	spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
-	e.spawn = spawn.NewSpawner(spawnState)
-	e.spawn.SetCommand(&e.cmd)
-	e.spawn.SetChroot(e.taskDir)
-	e.spawn.SetLogs(&spawn.Logs{
-		Stdout: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)),
-		Stderr: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)),
-		Stdin:  os.DevNull,
-	})
-
-	enterCgroup := func(pid int) error {
-		// Join the spawn-daemon to the cgroup.
-		manager := e.getCgroupManager(e.groups)
-
-		// Apply will place the spawn dameon into the created cgroups.
-		if err := manager.Apply(pid); err != nil {
-			return fmt.Errorf("Failed to join spawn-daemon to the cgroup (%+v): %v", e.groups, err)
-		}
-
-		return nil
-	}
-
-	return e.spawn.Spawn(enterCgroup)
-}
-
-// Wait waits til the user process exits and returns an error on non-zero exit
-// codes. Wait also cleans up the task directory and created cgroups.
-func (e *LinuxExecutor) Wait() *cstructs.WaitResult {
-	errs := new(multierror.Error)
-	res := e.spawn.Wait()
-	if res.Err != nil {
-		errs = multierror.Append(errs, res.Err)
-	}
-
-	if err := e.destroyCgroup(); err != nil {
-		errs = multierror.Append(errs, err)
-	}
-
-	if err := e.cleanTaskDir(); err != nil {
-		errs = multierror.Append(errs, err)
-	}
-
-	res.Err = errs.ErrorOrNil()
-	return res
-}
-
-// Shutdown sends the user process an interrupt signal indicating that it is
-// about to be forcefully shutdown in sometime
-func (e *LinuxExecutor) Shutdown() error {
-	proc, err := os.FindProcess(e.spawn.UserPid)
-	if err != nil {
-		return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
-	}
-
-	return proc.Signal(os.Interrupt)
-}
-
-// ForceStop immediately exits the user process and cleans up both the task
-// directory and the cgroups.
-func (e *LinuxExecutor) ForceStop() error {
-	errs := new(multierror.Error)
-	if err := e.destroyCgroup(); err != nil {
-		errs = multierror.Append(errs, err)
-	}
-
-	if err := e.cleanTaskDir(); err != nil {
-		errs = multierror.Append(errs, err)
-	}
-
-	return errs.ErrorOrNil()
-}
-
-// Task Directory related functions.
-
-// ConfigureTaskDir creates the necessary directory structure for a proper
-// chroot. cleanTaskDir should be called after.
-func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error {
-	e.taskName = taskName
-	e.allocDir = alloc.AllocDir
-
-	taskDir, ok := alloc.TaskDirs[taskName]
-	if !ok {
-		fmt.Errorf("Couldn't find task directory for task %v", taskName)
-	}
-	e.taskDir = taskDir
-
-	if err := alloc.MountSharedDir(taskName); err != nil {
-		return err
-	}
-
-	if err := alloc.Embed(taskName, chrootEnv); err != nil {
-		return err
-	}
-
-	// Mount dev
-	dev := filepath.Join(taskDir, "dev")
-	if !e.pathExists(dev) {
-		if err := os.Mkdir(dev, 0777); err != nil {
-			return fmt.Errorf("Mkdir(%v) failed: %v", dev, err)
-		}
-
-		if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil {
-			return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err)
-		}
-	}
-
-	// Mount proc
-	proc := filepath.Join(taskDir, "proc")
-	if !e.pathExists(proc) {
-		if err := os.Mkdir(proc, 0777); err != nil {
-			return fmt.Errorf("Mkdir(%v) failed: %v", proc, err)
-		}
-
-		if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil {
-			return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err)
-		}
-	}
-
-	// Set the tasks AllocDir environment variable.
-	e.taskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)).SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).Build()
-	return nil
-}
-
-// pathExists is a helper function to check if the path exists.
-func (e *LinuxExecutor) pathExists(path string) bool {
-	if _, err := os.Stat(path); err != nil {
-		if os.IsNotExist(err) {
-			return false
-		}
-	}
-	return true
-}
-
-// cleanTaskDir is an idempotent operation to clean the task directory and
-// should be called when tearing down the task.
-func (e *LinuxExecutor) cleanTaskDir() error {
-	// Prevent a race between Wait/ForceStop
-	e.l.Lock()
-	defer e.l.Unlock()
-
-	// Unmount dev.
-	errs := new(multierror.Error)
-	dev := filepath.Join(e.taskDir, "dev")
-	if e.pathExists(dev) {
-		if err := syscall.Unmount(dev, 0); err != nil {
-			errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err))
-		}
-
-		if err := os.RemoveAll(dev); err != nil {
-			errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err))
-		}
-	}
-
-	// Unmount proc.
-	proc := filepath.Join(e.taskDir, "proc")
-	if e.pathExists(proc) {
-		if err := syscall.Unmount(proc, 0); err != nil {
-			errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err))
-		}
-
-		if err := os.RemoveAll(proc); err != nil {
-			errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", dev, err))
-		}
-	}
-
-	return errs.ErrorOrNil()
-}
-
-// Cgroup related functions.
-
-// configureCgroups converts a Nomad Resources specification into the equivalent
-// cgroup configuration. It returns an error if the resources are invalid.
-func (e *LinuxExecutor) configureCgroups(resources *structs.Resources) error {
-	e.groups = &cgroupConfig.Cgroup{}
-	e.groups.Resources = &cgroupConfig.Resources{}
-	e.groups.Name = structs.GenerateUUID()
-
-	// TODO: verify this is needed for things like network access
-	e.groups.Resources.AllowAllDevices = true
-
-	if resources.MemoryMB > 0 {
-		// Total amount of memory allowed to consume
-		e.groups.Resources.Memory = int64(resources.MemoryMB * 1024 * 1024)
-		// Disable swap to avoid issues on the machine
-		e.groups.Resources.MemorySwap = int64(-1)
-	}
-
-	if resources.CPU < 2 {
-		return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", resources.CPU)
-	}
-
-	// Set the relative CPU shares for this cgroup.
-	e.groups.Resources.CpuShares = int64(resources.CPU)
-
-	if resources.IOPS != 0 {
-		// Validate it is in an acceptable range.
-		if resources.IOPS < 10 || resources.IOPS > 1000 {
-			return fmt.Errorf("resources.IOPS must be between 10 and 1000: %d", resources.IOPS)
-		}
-
-		e.groups.Resources.BlkioWeight = uint16(resources.IOPS)
-	}
-
-	return nil
-}
-
-// destroyCgroup kills all processes in the cgroup and removes the cgroup
-// configuration from the host.
-func (e *LinuxExecutor) destroyCgroup() error {
-	if e.groups == nil {
-		return errors.New("Can't destroy: cgroup configuration empty")
-	}
-
-	// Prevent a race between Wait/ForceStop
-	e.l.Lock()
-	defer e.l.Unlock()
-
-	manager := e.getCgroupManager(e.groups)
-	pids, err := manager.GetPids()
-	if err != nil {
-		return fmt.Errorf("Failed to get pids in the cgroup %v: %v", e.groups.Name, err)
-	}
-
-	errs := new(multierror.Error)
-	for _, pid := range pids {
-		process, err := os.FindProcess(pid)
-		if err != nil {
-			multierror.Append(errs, fmt.Errorf("Failed to find Pid %v: %v", pid, err))
-			continue
-		}
-
-		if err := process.Kill(); err != nil && err.Error() != "os: process already finished" {
-			multierror.Append(errs, fmt.Errorf("Failed to kill Pid %v: %v", pid, err))
-			continue
-		}
-	}
-
-	// Remove the cgroup.
-	if err := manager.Destroy(); err != nil {
-		multierror.Append(errs, fmt.Errorf("Failed to delete the cgroup directories: %v", err))
-	}
-
-	if len(errs.Errors) != 0 {
-		return fmt.Errorf("Failed to destroy cgroup: %v", errs)
-	}
-
-	return nil
-}
-
-// getCgroupManager returns the correct libcontainer cgroup manager.
-func (e *LinuxExecutor) getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager {
-	var manager cgroups.Manager
-	manager = &cgroupFs.Manager{Cgroups: groups}
-	if systemd.UseSystemd() {
-		manager = &systemd.Manager{Cgroups: groups}
-	}
-	return manager
-}
diff --git a/client/driver/executor/exec_linux_test.go b/client/driver/executor/exec_linux_test.go
deleted file mode 100644
index 6cc2d104c..000000000
--- a/client/driver/executor/exec_linux_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package executor
-
-import (
-	"testing"
-
-	ctestutil "github.com/hashicorp/nomad/client/testutil"
-	"github.com/hashicorp/nomad/helper/testtask"
-)
-
-func init() {
-	// Add test binary to chroot during test run.
-	chrootEnv[testtask.Path()] = testtask.Path()
-}
-
-func TestExecutorLinux(t *testing.T) {
-	t.Parallel()
-	testExecutor(t, NewLinuxExecutor, ctestutil.ExecCompatible)
-}
diff --git a/client/driver/executor/exec_universal.go b/client/driver/executor/exec_universal.go
deleted file mode 100644
index 5ce25ec8e..000000000
--- a/client/driver/executor/exec_universal.go
+++ /dev/null
@@ -1,14 +0,0 @@
-// +build !linux
-
-package executor
-
-func NewExecutor(ctx *ExecutorContext) Executor {
-	return &UniversalExecutor{
-		BasicExecutor: NewBasicExecutor(ctx).(*BasicExecutor),
-	}
-}
-
-// UniversalExecutor wraps the BasicExecutor
-type UniversalExecutor struct {
-	*BasicExecutor
-}
diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go
new file mode 100644
index 000000000..2de8e7bca
--- /dev/null
+++ b/client/driver/executor/executor.go
@@ -0,0 +1,259 @@
+package executor
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/hashicorp/go-multierror"
+	cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
+
+	"github.com/hashicorp/nomad/client/allocdir"
+	"github.com/hashicorp/nomad/client/driver/env"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// ExecutorContext holds the context needed to configure and isolate the
+// command a user wants to run
+type ExecutorContext struct {
+
+	// TaskEnv holds information about the environment of a Task
+	TaskEnv *env.TaskEnvironment
+
+	// AllocDir is the handle to do operations on the alloc dir of
+	// the task
+	AllocDir *allocdir.AllocDir
+
+	// TaskName is the name of the Task
+	TaskName string
+
+	// TaskResources are the resource constraints for the Task
+	TaskResources *structs.Resources
+
+	// FSIsolation is a flag for drivers to impose file system
+	// isolation on certain platforms
+	FSIsolation bool
+
+	// ResourceLimits is a flag for drivers to impose resource
+	// constraints on a Task on certain platforms
+	ResourceLimits bool
+
+	// UnprivilegedUser is a flag for drivers to make the process
+	// run as nobody
+	UnprivilegedUser bool
+}
+
+// ExecCommand holds the user command and args. It's a lightweight replacement
+// of exec.Cmd for serialization purposes.
+type ExecCommand struct {
+	Cmd  string
+	Args []string
+}
+
+// IsolationConfig has information about the isolation mechanism the executor
+// uses to put resource constraints and isolation on the user process
+type IsolationConfig struct {
+	Cgroup *cgroupConfig.Cgroup
+}
+
+// ProcessState holds information about the state of a user process.
+type ProcessState struct {
+	Pid             int
+	ExitCode        int
+	Signal          int
+	IsolationConfig *IsolationConfig
+	Time            time.Time
+}
+
+// Executor is the interface which allows a driver to launch and supervise
+// a process
+type Executor interface {
+	LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error)
+	Wait() (*ProcessState, error)
+	ShutDown() error
+	Exit() error
+}
+
+// UniversalExecutor is an implementation of the Executor which launches and
+// supervises processes. In addition to process supervision it provides resource
+// and file system isolation
+type UniversalExecutor struct {
+	cmd exec.Cmd
+	ctx *ExecutorContext
+
+	taskDir       string
+	groups        *cgroupConfig.Cgroup
+	exitState     *ProcessState
+	processExited chan interface{}
+
+	logger *log.Logger
+	lock   sync.Mutex
+}
+
+// NewExecutor returns an Executor
+func NewExecutor(logger *log.Logger) Executor {
+	return &UniversalExecutor{logger: logger, processExited: make(chan interface{})}
+}
+
+// LaunchCmd launches a process and returns its state. It also configures and
+// applies isolation on certain platforms.
+func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) {
+	e.logger.Printf("[DEBUG] executor: launching command %v %v", command.Cmd, strings.Join(command.Args, " "))
+
+	e.ctx = ctx
+
+	// Configure the task dir
+	if err := e.configureTaskDir(); err != nil {
+		return nil, err
+	}
+
+	// Configure the chroot and cgroup, and enter the plugin process into the
+	// cgroup
+	if err := e.configureIsolation(); err != nil {
+		return nil, err
+	}
+
+	// Set the user of the process
+	if e.ctx.UnprivilegedUser {
+		if err := e.runAs("nobody"); err != nil {
+			return nil, err
+		}
+	}
+
+	// Open the stdout and stderr files for the task
+	stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.TaskName))
+	stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
+	if err != nil {
+		return nil, err
+	}
+	e.cmd.Stdout = stdo
+
+	stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.TaskName))
+	stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
+	if err != nil {
+		return nil, err
+	}
+	e.cmd.Stderr = stde
+
+	// Set the env, path and args for the command
+	e.ctx.TaskEnv.Build()
+	e.cmd.Env = ctx.TaskEnv.EnvList()
+	e.cmd.Path = ctx.TaskEnv.ReplaceEnv(command.Cmd)
+	e.cmd.Args = append([]string{e.cmd.Path}, ctx.TaskEnv.ParseAndReplace(command.Args)...)
+	if filepath.Base(command.Cmd) == command.Cmd {
+		if lp, err := exec.LookPath(command.Cmd); err == nil {
+			e.cmd.Path = lp
+		}
+	}
+
+	// Start the process
+	if err := e.cmd.Start(); err != nil {
+		return nil, fmt.Errorf("error starting command: %v", err)
+	}
+	go e.wait()
+	ic := &IsolationConfig{Cgroup: e.groups}
+	return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
+}
+
+// Wait waits until a process has exited and returns its exit code and errors
+func (e *UniversalExecutor) Wait() (*ProcessState, error) {
+	<-e.processExited
+	return e.exitState, nil
+}
+
+func (e *UniversalExecutor) wait() {
+	defer close(e.processExited)
+	err := e.cmd.Wait()
+	if err == nil {
+		e.exitState = &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}
+		return
+	}
+	exitCode := 1
+	if exitErr, ok := err.(*exec.ExitError); ok {
+		if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
+			exitCode = status.ExitStatus()
+		}
+	}
+	if e.ctx.FSIsolation {
+		e.removeChrootMounts()
+	}
+	if e.ctx.ResourceLimits {
+		e.lock.Lock()
+		DestroyCgroup(e.groups)
+		e.lock.Unlock()
+	}
+	e.exitState = &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()}
+}
+
+var (
+	// finishedErr is the error message received when trying to kill an
+	// already exited process.
+	finishedErr = "os: process already finished"
+)
+
+// Exit cleans up the alloc directory, destroys cgroups and kills the user
+// process
+func (e *UniversalExecutor) Exit() error {
+	var merr multierror.Error
+	if e.cmd.Process != nil {
+		proc, err := os.FindProcess(e.cmd.Process.Pid)
+		if err != nil {
+			e.logger.Printf("[ERROR] executor: can't find process with pid: %v, err: %v",
+				e.cmd.Process.Pid, err)
+		} else if err := proc.Kill(); err != nil && err.Error() != finishedErr {
+			merr.Errors = append(merr.Errors,
+				fmt.Errorf("can't kill process with pid: %v, err: %v", e.cmd.Process.Pid, err))
+		}
+	}
+
+	if e.ctx.FSIsolation {
+		if err := e.removeChrootMounts(); err != nil {
+			merr.Errors = append(merr.Errors, err)
+		}
+	}
+	if e.ctx.ResourceLimits {
+		e.lock.Lock()
+		if err := DestroyCgroup(e.groups); err != nil {
+			merr.Errors = append(merr.Errors, err)
+		}
+		e.lock.Unlock()
+	}
+	return merr.ErrorOrNil()
+}
+
+// ShutDown sends an interrupt signal to the user process
+func (e *UniversalExecutor) ShutDown() error {
+	if e.cmd.Process == nil {
+		return fmt.Errorf("executor.shutdown error: no process found")
+	}
+	proc, err := os.FindProcess(e.cmd.Process.Pid)
+	if err != nil {
+		return fmt.Errorf("executor.shutdown error: %v", err)
+	}
+	if runtime.GOOS == "windows" {
+		return proc.Kill()
+	}
+	if err = proc.Signal(os.Interrupt); err != nil {
+		return fmt.Errorf("executor.shutdown error: %v", err)
+	}
+	return nil
+}
+
+// configureTaskDir sets the task dir in the executor
+func (e *UniversalExecutor) configureTaskDir() error {
+	taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName]
+	e.taskDir = taskDir
+	if !ok {
+		return fmt.Errorf("couldn't find task directory for task %v", e.ctx.TaskName)
+	}
+	e.cmd.Dir = taskDir
+	return nil
+}
diff --git a/client/driver/executor/executor_basic.go b/client/driver/executor/executor_basic.go
new file mode 100644
index 000000000..9e531a547
--- /dev/null
+++ b/client/driver/executor/executor_basic.go
@@ -0,0 +1,31 @@
+// +build !linux
+
+package executor
+
+import (
+	cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
+)
+
+func (e *UniversalExecutor) configureChroot() error {
+	return nil
+}
+
+func DestroyCgroup(groups *cgroupConfig.Cgroup) error {
+	return nil
+}
+
+func (e *UniversalExecutor) removeChrootMounts() error {
+	return nil
+}
+
+func (e *UniversalExecutor) runAs(userid string) error {
+	return nil
+}
+
+func (e *UniversalExecutor) applyLimits(pid int) error {
+	return nil
+}
+
+func (e *UniversalExecutor) configureIsolation() error {
+	return nil
+}
diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go
new file mode 100644
index 000000000..d156736f4
--- /dev/null
+++ b/client/driver/executor/executor_linux.go
@@ -0,0 +1,224 @@
+package executor
+
+import (
+	"fmt"
+	"os"
+	"os/user"
+	"path/filepath"
+	"strconv"
+	"syscall"
+
+	"github.com/hashicorp/go-multierror"
+	"github.com/opencontainers/runc/libcontainer/cgroups"
+	cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
+	"github.com/opencontainers/runc/libcontainer/cgroups/systemd"
+	cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
+
+	"github.com/hashicorp/nomad/client/allocdir"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+var (
+	// A mapping of directories on the host OS to attempt to embed inside each
+	// task's chroot.
+	chrootEnv = map[string]string{
+		"/bin":       "/bin",
+		"/etc":       "/etc",
+		"/lib":       "/lib",
+		"/lib32":     "/lib32",
+		"/lib64":     "/lib64",
+		"/usr/bin":   "/usr/bin",
+		"/usr/lib":   "/usr/lib",
+		"/usr/share": "/usr/share",
+	}
+)
+
+// configureIsolation configures chroot and creates cgroups
+func (e *UniversalExecutor) configureIsolation() error {
+	if e.ctx.FSIsolation {
+		if err := e.configureChroot(); err != nil {
+			return err
+		}
+	}
+
+	if e.ctx.ResourceLimits {
+		if err := e.configureCgroups(e.ctx.TaskResources); err != nil {
+			return fmt.Errorf("error creating cgroups: %v", err)
+		}
+		if err := e.applyLimits(os.Getpid()); err != nil {
+			if er := DestroyCgroup(e.groups); er != nil {
+				e.logger.Printf("[ERROR] executor: error destroying cgroup: %v", er)
+			}
+			if er := e.removeChrootMounts(); er != nil {
+				e.logger.Printf("[ERROR] executor: error removing chroot: %v", er)
+			}
+			return fmt.Errorf("error entering the plugin process in the cgroup: %v", err)
+		}
+	}
+	return nil
+}
+
+// applyLimits puts a process in a pre-configured cgroup
+func (e *UniversalExecutor) applyLimits(pid int) error {
+	if !e.ctx.ResourceLimits {
+		return nil
+	}
+
+	// Entering the process in the cgroup
+	manager := getCgroupManager(e.groups)
+	if err := manager.Apply(pid); err != nil {
+		e.logger.Printf("[ERROR] executor: unable to join cgroup: %v", err)
+		if err := e.Exit(); err != nil {
+			e.logger.Printf("[ERROR] executor: unable to kill process: %v", err)
+		}
+		return err
+	}
+
+	return nil
+}
+
+// configureCgroups converts a Nomad Resources specification into the equivalent
+// cgroup configuration. It returns an error if the resources are invalid.
+// configureCgroups converts a Nomad Resources specification into the equivalent
+// cgroup configuration. It returns an error if the resources are invalid.
+func (e *UniversalExecutor) configureCgroups(resources *structs.Resources) error {
+	e.groups = &cgroupConfig.Cgroup{}
+	e.groups.Resources = &cgroupConfig.Resources{}
+	e.groups.Name = structs.GenerateUUID()
+
+	// TODO: verify this is needed for things like network access
+	e.groups.Resources.AllowAllDevices = true
+
+	if resources.MemoryMB > 0 {
+		// Total amount of memory allowed to consume
+		e.groups.Resources.Memory = int64(resources.MemoryMB * 1024 * 1024)
+		// Disable swap to avoid issues on the machine
+		e.groups.Resources.MemorySwap = int64(-1)
+	}
+
+	if resources.CPU < 2 {
+		return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", resources.CPU)
+	}
+
+	// Set the relative CPU shares for this cgroup.
+	e.groups.Resources.CpuShares = int64(resources.CPU)
+
+	if resources.IOPS != 0 {
+		// Validate it is in an acceptable range.
+		if resources.IOPS < 10 || resources.IOPS > 1000 {
+			return fmt.Errorf("resources.IOPS must be between 10 and 1000: %d", resources.IOPS)
+		}
+
+		e.groups.Resources.BlkioWeight = uint16(resources.IOPS)
+	}
+
+	return nil
+}
+
+// runAs takes a user id as a string and looks up the user, and sets the command
+// to execute as that user.
+func (e *UniversalExecutor) runAs(userid string) error {
+	u, err := user.Lookup(userid)
+	if err != nil {
+		return fmt.Errorf("Failed to identify user %v: %v", userid, err)
+	}
+
+	// Convert the uid and gid
+	uid, err := strconv.ParseUint(u.Uid, 10, 32)
+	if err != nil {
+		return fmt.Errorf("Unable to convert userid to uint32: %s", err)
+	}
+	gid, err := strconv.ParseUint(u.Gid, 10, 32)
+	if err != nil {
+		return fmt.Errorf("Unable to convert groupid to uint32: %s", err)
+	}
+
+	// Set the command to run as that user and group.
+	if e.cmd.SysProcAttr == nil {
+		e.cmd.SysProcAttr = &syscall.SysProcAttr{}
+	}
+	if e.cmd.SysProcAttr.Credential == nil {
+		e.cmd.SysProcAttr.Credential = &syscall.Credential{}
+	}
+	e.cmd.SysProcAttr.Credential.Uid = uint32(uid)
+	e.cmd.SysProcAttr.Credential.Gid = uint32(gid)
+
+	return nil
+}
+
+// configureChroot configures a chroot
+func (e *UniversalExecutor) configureChroot() error {
+	allocDir := e.ctx.AllocDir
+	if err := allocDir.MountSharedDir(e.ctx.TaskName); err != nil {
+		return err
+	}
+
+	if err := allocDir.Embed(e.ctx.TaskName, chrootEnv); err != nil {
+		return err
+	}
+
+	// Set the tasks AllocDir environment variable.
+	e.ctx.TaskEnv.
+		SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)).
+		SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).
+		Build()
+
+	if e.cmd.SysProcAttr == nil {
+		e.cmd.SysProcAttr = &syscall.SysProcAttr{}
+	}
+	e.cmd.SysProcAttr.Chroot = e.taskDir
+	e.cmd.Dir = "/"
+
+	if err := allocDir.MountSpecialDirs(e.taskDir); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// removeChrootMounts is an idempotent operation to clean up the chroot mounts
+// and should be called when tearing down the task.
+func (e *UniversalExecutor) removeChrootMounts() error {
+	// Prevent a race between Wait/ForceStop
+	e.lock.Lock()
+	defer e.lock.Unlock()
+	return e.ctx.AllocDir.UnmountAll()
+}
+
+// DestroyCgroup kills all processes in the cgroup and removes the cgroup
+// configuration from the host.
+func DestroyCgroup(groups *cgroupConfig.Cgroup) error {
+	merrs := new(multierror.Error)
+	if groups == nil {
+		return fmt.Errorf("Can't destroy: cgroup configuration empty")
+	}
+
+	manager := getCgroupManager(groups)
+	if pids, perr := manager.GetPids(); perr == nil {
+		for _, pid := range pids {
+			proc, err := os.FindProcess(pid)
+			if err != nil {
+				merrs.Errors = append(merrs.Errors, fmt.Errorf("error finding process %v: %v", pid, err))
+			} else {
+				if e := proc.Kill(); e != nil {
+					merrs.Errors = append(merrs.Errors, fmt.Errorf("error killing process %v: %v", pid, e))
+				}
+			}
+		}
+	}
+
+	// Remove the cgroup.
+	if err := manager.Destroy(); err != nil {
+		merrs.Errors = append(merrs.Errors, fmt.Errorf("Failed to delete the cgroup directories: %v", err))
+	}
+
+	if len(merrs.Errors) != 0 {
+		return fmt.Errorf("errors while destroying cgroup: %v", merrs)
+	}
+	return nil
+}
+
+// getCgroupManager returns the correct libcontainer cgroup manager.
+func getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager {
+	var manager cgroups.Manager
+	manager = &cgroupFs.Manager{Cgroups: groups}
+	if systemd.UseSystemd() {
+		manager = &systemd.Manager{Cgroups: groups}
+	}
+	return manager
+}
diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go
new file mode 100644
index 000000000..26876e7e9
--- /dev/null
+++ b/client/driver/executor/executor_test.go
@@ -0,0 +1,203 @@
+package executor
+
+import (
+	"io/ioutil"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/client/allocdir"
+	"github.com/hashicorp/nomad/client/driver/env"
+	"github.com/hashicorp/nomad/client/testutil"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	tu "github.com/hashicorp/nomad/testutil"
+)
+
+var (
+	constraint = &structs.Resources{
+		CPU:      250,
+		MemoryMB: 256,
+		Networks: []*structs.NetworkResource{
+			&structs.NetworkResource{
+				MBits:        50,
+				DynamicPorts: []structs.Port{{Label: "http"}},
+			},
+		},
+	}
+)
+
+func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) {
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+
+	allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID))
+	if err := allocDir.Build([]*structs.Task{task}); err != nil {
+		log.Panicf("allocDir.Build() failed: %v", err)
+	}
+
+	return task.Name, allocDir
+}
+
+func testExecutorContext(t *testing.T) *ExecutorContext {
+	taskEnv := env.NewTaskEnvironment(mock.Node())
+	taskName, allocDir := mockAllocDir(t)
+	ctx := &ExecutorContext{
+		TaskEnv:       taskEnv,
+		TaskName:      taskName,
+		AllocDir:      allocDir,
+		TaskResources: constraint,
+	}
+	return ctx
+}
+
+func TestExecutor_Start_Invalid(t *testing.T) {
+	invalid := "/bin/foobar"
+	execCmd := ExecCommand{Cmd: invalid, Args: []string{"1"}}
+	ctx := testExecutorContext(t)
+	defer ctx.AllocDir.Destroy()
+	executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
+	_, err := executor.LaunchCmd(&execCmd, ctx)
+	if err == nil {
+		t.Fatalf("Expected error")
+	}
+}
+
+func TestExecutor_Start_Wait_Failure_Code(t *testing.T) {
+	execCmd := ExecCommand{Cmd: "/bin/sleep", Args: []string{"fail"}}
+	ctx := testExecutorContext(t)
+	defer ctx.AllocDir.Destroy()
+	executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
+	ps, _ := executor.LaunchCmd(&execCmd, ctx)
+	if ps.Pid == 0 {
+		t.Fatalf("expected process to start and have non-zero pid")
+	}
+	ps, _ = executor.Wait()
+	if ps.ExitCode < 1 {
+		t.Fatalf("expected exit code to be non-zero, actual: %v", ps.ExitCode)
+	}
+}
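The remaining tests all recover task output from the same place; the convention implied by their assertions is `<task>.stdout` under the task's local directory. A hypothetical helper showing the lookup:

```go
package sketch

import (
	"path/filepath"

	"github.com/hashicorp/nomad/client/allocdir"
)

// stdoutPath is hypothetical; taskDir would come from AllocDir.TaskDirs.
func stdoutPath(taskDir, task string) string {
	// e.g. /tmp/<alloc-id>/web/local/web.stdout
	return filepath.Join(taskDir, allocdir.TaskLocal, task+".stdout")
}
```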
+func TestExecutor_Start_Wait(t *testing.T) {
+	execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}}
+	ctx := testExecutorContext(t)
+	defer ctx.AllocDir.Destroy()
+	executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
+	ps, err := executor.LaunchCmd(&execCmd, ctx)
+	if err != nil {
+		t.Fatalf("error in launching command: %v", err)
+	}
+	if ps.Pid == 0 {
+		t.Fatalf("expected process to start and have non-zero pid")
+	}
+	ps, err = executor.Wait()
+	if err != nil {
+		t.Fatalf("error in waiting for command: %v", err)
+	}
+
+	task := "web"
+	taskDir, ok := ctx.AllocDir.TaskDirs[task]
+	if !ok {
+		log.Panicf("No task directory found for task %v", task)
+	}
+
+	expected := "hello world"
+	file := filepath.Join(allocdir.TaskLocal, "web.stdout")
+	absFilePath := filepath.Join(taskDir, file)
+	output, err := ioutil.ReadFile(absFilePath)
+	if err != nil {
+		t.Fatalf("Couldn't read file %v", absFilePath)
+	}
+
+	act := strings.TrimSpace(string(output))
+	if act != expected {
+		t.Fatalf("Command output incorrect: want %v; got %v", expected, act)
+	}
+}
+
+func TestExecutor_IsolationAndConstraints(t *testing.T) {
+	testutil.ExecCompatible(t)
+
+	execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}}
+	ctx := testExecutorContext(t)
+	defer ctx.AllocDir.Destroy()
+
+	ctx.FSIsolation = true
+	ctx.ResourceLimits = true
+	ctx.UnprivilegedUser = true
+
+	executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
+	ps, err := executor.LaunchCmd(&execCmd, ctx)
+	if err != nil {
+		t.Fatalf("error in launching command: %v", err)
+	}
+	if ps.Pid == 0 {
+		t.Fatalf("expected process to start and have non-zero pid")
+	}
+	ps, err = executor.Wait()
+	if err != nil {
+		t.Fatalf("error in waiting for command: %v", err)
+	}
+
+	task := "web"
+	taskDir, ok := ctx.AllocDir.TaskDirs[task]
+	if !ok {
+		log.Panicf("No task directory found for task %v", task)
+	}
+
+	expected := "hello world"
+	file := filepath.Join(allocdir.TaskLocal, "web.stdout")
+	absFilePath := filepath.Join(taskDir, file)
+	output, err := ioutil.ReadFile(absFilePath)
+	if err != nil {
+		t.Fatalf("Couldn't read file %v", absFilePath)
+	}
+
+	act := strings.TrimSpace(string(output))
+	if act != expected {
+		t.Fatalf("Command output incorrect: want %v; got %v", expected, act)
+	}
+}
+
+func TestExecutor_Start_Kill(t *testing.T) {
+	execCmd := ExecCommand{Cmd: "/bin/sleep", Args: []string{"10 && hello world"}}
+	ctx := testExecutorContext(t)
+	defer ctx.AllocDir.Destroy()
+	executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
+	ps, err := executor.LaunchCmd(&execCmd, ctx)
+	if err != nil {
+		t.Fatalf("error in launching command: %v", err)
+	}
+	if ps.Pid == 0 {
+		t.Fatalf("expected process to start and have non-zero pid")
+	}
+	ps, err = executor.Wait()
+	if err != nil {
+		t.Fatalf("error in waiting for command: %v", err)
+	}
+
+	task := "web"
+	taskDir, ok := ctx.AllocDir.TaskDirs[task]
+	if !ok {
+		t.Fatalf("No task directory found for task %v", task)
+	}
+
+	file := filepath.Join(allocdir.TaskLocal, "web.stdout")
+	absFilePath := filepath.Join(taskDir, file)
+
+	time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second)
+
+	output, err := ioutil.ReadFile(absFilePath)
+	if err != nil {
+		t.Fatalf("Couldn't read file %v", absFilePath)
+	}
+
+	expected := ""
+	act := strings.TrimSpace(string(output))
+	if act != expected {
+		t.Fatalf("Command output incorrect: want %v; got %v", expected, act)
+	}
+}
diff --git a/client/driver/executor/test_harness_test.go
b/client/driver/executor/test_harness_test.go deleted file mode 100644 index 5574b2c7b..000000000 --- a/client/driver/executor/test_harness_test.go +++ /dev/null @@ -1,286 +0,0 @@ -package executor - -import ( - "io/ioutil" - "log" - "os" - "path/filepath" - "testing" - "time" - - "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/driver/env" - "github.com/hashicorp/nomad/helper/testtask" - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" -) - -func TestMain(m *testing.M) { - if !testtask.Run() { - os.Exit(m.Run()) - } -} - -var ( - constraint = &structs.Resources{ - CPU: 250, - MemoryMB: 256, - Networks: []*structs.NetworkResource{ - &structs.NetworkResource{ - MBits: 50, - DynamicPorts: []structs.Port{{Label: "http"}}, - }, - }, - } -) - -func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) { - alloc := mock.Alloc() - task := alloc.Job.TaskGroups[0].Tasks[0] - - allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID)) - if err := allocDir.Build([]*structs.Task{task}); err != nil { - log.Panicf("allocDir.Build() failed: %v", err) - } - - return task.Name, allocDir -} - -func testExecutorContext() *ExecutorContext { - taskEnv := env.NewTaskEnvironment(mock.Node()) - return &ExecutorContext{taskEnv: taskEnv} -} - -func testExecutor(t *testing.T, buildExecutor func(*ExecutorContext) Executor, compatible func(*testing.T)) { - if compatible != nil { - compatible(t) - } - - command := func(name string, args ...string) Executor { - ctx := testExecutorContext() - e := buildExecutor(ctx) - SetCommand(e, name, args) - testtask.SetEnv(ctx.taskEnv) - return e - } - - Executor_Start_Invalid(t, command) - Executor_Start_Wait_Failure_Code(t, command) - Executor_Start_Wait(t, command) - Executor_Start_Kill(t, command) - Executor_Open(t, command, buildExecutor) - Executor_Open_Invalid(t, command, buildExecutor) -} - -type buildExecCommand func(name string, args ...string) Executor - -func Executor_Start_Invalid(t *testing.T, command buildExecCommand) { - invalid := "/bin/foobar" - e := command(invalid, "1") - - if err := e.Limit(constraint); err != nil { - log.Panicf("Limit() failed: %v", err) - } - - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - if err := e.ConfigureTaskDir(task, alloc); err != nil { - log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err == nil { - log.Panicf("Start(%v) should have failed", invalid) - } -} - -func Executor_Start_Wait_Failure_Code(t *testing.T, command buildExecCommand) { - e := command(testtask.Path(), "fail") - - if err := e.Limit(constraint); err != nil { - log.Panicf("Limit() failed: %v", err) - } - - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - if err := e.ConfigureTaskDir(task, alloc); err != nil { - log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - log.Panicf("Start() failed: %v", err) - } - - if err := e.Wait(); err == nil { - log.Panicf("Wait() should have failed") - } -} - -func Executor_Start_Wait(t *testing.T, command buildExecCommand) { - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - - taskDir, ok := alloc.TaskDirs[task] - if !ok { - log.Panicf("No task directory found for task %v", task) - } - - expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "output.txt") - absFilePath := filepath.Join(taskDir, file) - e := command(testtask.Path(), "sleep", "1s", "write", expected, 
file) - - if err := e.Limit(constraint); err != nil { - log.Panicf("Limit() failed: %v", err) - } - - if err := e.ConfigureTaskDir(task, alloc); err != nil { - log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - log.Panicf("Start() failed: %v", err) - } - - if res := e.Wait(); !res.Successful() { - log.Panicf("Wait() failed: %v", res) - } - - output, err := ioutil.ReadFile(absFilePath) - if err != nil { - log.Panicf("Couldn't read file %v", absFilePath) - } - - act := string(output) - if act != expected { - log.Panicf("Command output incorrectly: want %v; got %v", expected, act) - } -} - -func Executor_Start_Kill(t *testing.T, command buildExecCommand) { - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - - taskDir, ok := alloc.TaskDirs[task] - if !ok { - log.Panicf("No task directory found for task %v", task) - } - - filePath := filepath.Join(taskDir, "output") - e := command(testtask.Path(), "sleep", "1s", "write", "failure", filePath) - - if err := e.Limit(constraint); err != nil { - log.Panicf("Limit() failed: %v", err) - } - - if err := e.ConfigureTaskDir(task, alloc); err != nil { - log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - log.Panicf("Start() failed: %v", err) - } - - if err := e.Shutdown(); err != nil { - log.Panicf("Shutdown() failed: %v", err) - } - - time.Sleep(time.Duration(testutil.TestMultiplier()*2) * time.Second) - - // Check that the file doesn't exist. - if _, err := os.Stat(filePath); err == nil { - log.Panicf("Stat(%v) should have failed: task not killed", filePath) - } -} - -func Executor_Open(t *testing.T, command buildExecCommand, newExecutor func(*ExecutorContext) Executor) { - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - - taskDir, ok := alloc.TaskDirs[task] - if !ok { - log.Panicf("No task directory found for task %v", task) - } - - expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "output.txt") - absFilePath := filepath.Join(taskDir, file) - e := command(testtask.Path(), "sleep", "1s", "write", expected, file) - - if err := e.Limit(constraint); err != nil { - log.Panicf("Limit() failed: %v", err) - } - - if err := e.ConfigureTaskDir(task, alloc); err != nil { - log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - log.Panicf("Start() failed: %v", err) - } - - id, err := e.ID() - if err != nil { - log.Panicf("ID() failed: %v", err) - } - - e2 := newExecutor(testExecutorContext()) - if err := e2.Open(id); err != nil { - log.Panicf("Open(%v) failed: %v", id, err) - } - - if res := e2.Wait(); !res.Successful() { - log.Panicf("Wait() failed: %v", res) - } - - output, err := ioutil.ReadFile(absFilePath) - if err != nil { - log.Panicf("Couldn't read file %v", absFilePath) - } - - act := string(output) - if act != expected { - log.Panicf("Command output incorrectly: want %v; got %v", expected, act) - } -} - -func Executor_Open_Invalid(t *testing.T, command buildExecCommand, newExecutor func(*ExecutorContext) Executor) { - task, alloc := mockAllocDir(t) - e := command(testtask.Path(), "echo", "foo") - - if err := e.Limit(constraint); err != nil { - log.Panicf("Limit() failed: %v", err) - } - - if err := e.ConfigureTaskDir(task, alloc); err != nil { - log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - log.Panicf("Start() failed: %v", err) - } - - id, err := e.ID() - if err != nil { - 
log.Panicf("ID() failed: %v", err) - } - - // Kill the task because some OSes (windows) will not let us destroy the - // alloc (below) if the task is still running. - if err := e.ForceStop(); err != nil { - log.Panicf("e.ForceStop() failed: %v", err) - } - - // Wait until process is actually gone, we don't care what the result was. - e.Wait() - - // Destroy the allocdir which removes the exit code. - if err := alloc.Destroy(); err != nil { - log.Panicf("alloc.Destroy() failed: %v", err) - } - - e2 := newExecutor(testExecutorContext()) - if err := e2.Open(id); err == nil { - log.Panicf("Open(%v) should have failed", id) - } -} diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go new file mode 100644 index 000000000..5d1af4f5e --- /dev/null +++ b/client/driver/executor_plugin.go @@ -0,0 +1,121 @@ +package driver + +import ( + "io" + "log" + "net" + "net/rpc" + + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/driver/executor" +) + +var HandshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", + MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", +} + +func GetPluginMap(w io.Writer) map[string]plugin.Plugin { + p := new(ExecutorPlugin) + p.logger = log.New(w, "", log.LstdFlags) + return map[string]plugin.Plugin{"executor": p} +} + +// ExecutorReattachConfig is the config that we seralize and de-serialize and +// store in disk +type ExecutorReattachConfig struct { + Pid int + AddrNet string + AddrName string +} + +// PluginConfig returns a config from an ExecutorReattachConfig +func (c *ExecutorReattachConfig) PluginConfig() *plugin.ReattachConfig { + var addr net.Addr + switch c.AddrNet { + case "unix", "unixgram", "unixpacket": + addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName) + case "tcp", "tcp4", "tcp6": + addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName) + } + return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr} +} + +func NewExecutorReattachConfig(c *plugin.ReattachConfig) *ExecutorReattachConfig { + return &ExecutorReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()} +} + +type ExecutorRPC struct { + client *rpc.Client +} + +// LaunchCmdArgs wraps a user command and the args for the purposes of RPC +type LaunchCmdArgs struct { + Cmd *executor.ExecCommand + Ctx *executor.ExecutorContext +} + +func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand, ctx *executor.ExecutorContext) (*executor.ProcessState, error) { + var ps *executor.ProcessState + err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps) + return ps, err +} + +func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) { + var ps executor.ProcessState + err := e.client.Call("Plugin.Wait", new(interface{}), &ps) + return &ps, err +} + +func (e *ExecutorRPC) ShutDown() error { + return e.client.Call("Plugin.ShutDown", new(interface{}), new(interface{})) +} + +func (e *ExecutorRPC) Exit() error { + return e.client.Call("Plugin.Exit", new(interface{}), new(interface{})) +} + +type ExecutorRPCServer struct { + Impl executor.Executor +} + +func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *executor.ProcessState) error { + state, err := e.Impl.LaunchCmd(args.Cmd, args.Ctx) + if state != nil { + *ps = *state + } + return err +} + +func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error { + state, err := e.Impl.Wait() + if state != nil { + *ps = *state + } + return err +} + +func (e 
+
+func (e *ExecutorRPCServer) ShutDown(args interface{}, resp *interface{}) error {
+	return e.Impl.ShutDown()
+}
+
+func (e *ExecutorRPCServer) Exit(args interface{}, resp *interface{}) error {
+	return e.Impl.Exit()
+}
+
+type ExecutorPlugin struct {
+	logger *log.Logger
+	Impl   *ExecutorRPCServer
+}
+
+func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
+	if p.Impl == nil {
+		p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger)}
+	}
+	return p.Impl, nil
+}
+
+func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
+	return &ExecutorRPC{client: c}, nil
+}
diff --git a/client/driver/java.go b/client/driver/java.go
index d8bd394ac..0c206c174 100644
--- a/client/driver/java.go
+++ b/client/driver/java.go
@@ -12,13 +12,18 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/go-plugin"
+	"github.com/mitchellh/mapstructure"
+
+	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/driver/executor"
 	cstructs "github.com/hashicorp/nomad/client/driver/structs"
 	"github.com/hashicorp/nomad/client/fingerprint"
 	"github.com/hashicorp/nomad/client/getter"
+	"github.com/hashicorp/nomad/helper/discover"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/mitchellh/mapstructure"
 )
 
 // JavaDriver is a simple driver to execute applications packaged in Jars.
@@ -37,7 +42,13 @@ type JavaDriverConfig struct {
 // javaHandle is returned from Start/Open as a handle to the PID
 type javaHandle struct {
-	cmd         executor.Executor
+	pluginClient    *plugin.Client
+	userPid         int
+	executor        executor.Executor
+	isolationConfig *executor.IsolationConfig
+
+	taskDir  string
+	allocDir *allocdir.AllocDir
 	killTimeout time.Duration
 	logger      *log.Logger
 	waitCh      chan *cstructs.WaitResult
 	doneCh      chan struct{}
@@ -137,33 +148,48 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 		args = append(args, driverConfig.Args...)
 	}
 
-	// Setup the command
-	// Assumes Java is in the $PATH, but could probably be detected
-	execCtx := executor.NewExecutorContext(d.taskEnv)
-	cmd := executor.Command(execCtx, "java", args...)
- - // Populate environment variables - cmd.Command().Env = d.taskEnv.EnvList() - - if err := cmd.Limit(task.Resources); err != nil { - return nil, fmt.Errorf("failed to constrain resources: %s", err) + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } - if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil { - return nil, fmt.Errorf("failed to configure task directory: %v", err) + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "executor", pluginLogFile), } - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start source: %v", err) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + return nil, err } + executorCtx := &executor.ExecutorContext{ + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + TaskName: task.Name, + TaskResources: task.Resources, + FSIsolation: true, + ResourceLimits: true, + UnprivilegedUser: true, + } + ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx) + if err != nil { + pluginClient.Kill() + return nil, fmt.Errorf("error starting process via the plugin: %v", err) + } + d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid) // Return a driver handle h := &javaHandle{ - cmd: cmd, - killTimeout: d.DriverContext.KillTimeout(task), - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + userPid: ps.Pid, + isolationConfig: ps.IsolationConfig, + taskDir: taskDir, + allocDir: ctx.AllocDir, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -171,8 +197,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } type javaId struct { - ExecutorId string - KillTimeout time.Duration + KillTimeout time.Duration + PluginConfig *ExecutorReattachConfig + IsolationConfig *executor.IsolationConfig + TaskDir string + AllocDir *allocdir.AllocDir + UserPid int } func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -181,20 +211,41 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } - // Find the process - execCtx := executor.NewExecutorContext(d.taskEnv) - cmd, err := executor.OpenId(execCtx, id.ExecutorId) + pluginConfig := &plugin.ClientConfig{ + Reattach: id.PluginConfig.PluginConfig(), + } + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) + merrs := new(multierror.Error) + merrs.Errors = append(merrs.Errors, err) + d.logger.Println("[ERROR] driver.java: error connecting to plugin so destroying plugin pid and user pid") + if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil { + merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e)) + } + if id.IsolationConfig != nil { + if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil { + merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e)) + } + } + if e := ctx.AllocDir.UnmountAll(); e != nil { + merrs.Errors = append(merrs.Errors, e) + } + + 
return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil()) } // Return a driver handle h := &javaHandle{ - cmd: cmd, - logger: d.logger, - killTimeout: id.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + userPid: id.UserPid, + isolationConfig: id.IsolationConfig, + taskDir: id.TaskDir, + allocDir: id.AllocDir, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -202,10 +253,13 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro } func (h *javaHandle) ID() string { - executorId, _ := h.cmd.ID() id := javaId{ - ExecutorId: executorId, - KillTimeout: h.killTimeout, + KillTimeout: h.killTimeout, + PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + UserPid: h.userPid, + TaskDir: h.taskDir, + AllocDir: h.allocDir, + IsolationConfig: h.isolationConfig, } data, err := json.Marshal(id) @@ -228,18 +282,33 @@ func (h *javaHandle) Update(task *structs.Task) error { } func (h *javaHandle) Kill() error { - h.cmd.Shutdown() + h.executor.ShutDown() select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.cmd.ForceStop() + return h.executor.Exit() } } func (h *javaHandle) run() { - res := h.cmd.Wait() + ps, err := h.executor.Wait() close(h.doneCh) - h.waitCh <- res + if ps.ExitCode == 0 && err != nil { + if h.isolationConfig != nil { + if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil { + h.logger.Printf("[ERROR] driver.java: destroying cgroup failed while killing cgroup: %v", e) + } + } else { + if e := killProcess(h.userPid); e != nil { + h.logger.Printf("[ERROR] driver.java: error killing user process: %v", e) + } + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.java: unmounting dev,proc and alloc dirs failed: %v", e) + } + } + h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + h.pluginClient.Kill() } diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 79d289a9e..a546c611f 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -87,6 +87,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { // There is a race condition between the handle waiting and killing. One // will return an error. 
handle.Kill() + handle2.Kill() } func TestJavaDriver_Start_Wait(t *testing.T) { diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 4235517e0..deae7d273 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -11,11 +11,14 @@ import ( "strings" "time" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" ) @@ -41,11 +44,14 @@ type QemuDriverConfig struct { // qemuHandle is returned from Start/Open as a handle to the PID type qemuHandle struct { - cmd executor.Executor - killTimeout time.Duration - logger *log.Logger - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + pluginClient *plugin.Client + userPid int + executor executor.Executor + allocDir *allocdir.AllocDir + killTimeout time.Duration + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewQemuDriver is used to create a new exec driver @@ -182,30 +188,44 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, ) } - // Setup the command - execCtx := executor.NewExecutorContext(d.taskEnv) - cmd := executor.Command(execCtx, args[0], args[1:]...) - if err := cmd.Limit(task.Resources); err != nil { - return nil, fmt.Errorf("failed to constrain resources: %s", err) - } - - if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil { - return nil, fmt.Errorf("failed to configure task directory: %v", err) - } - d.logger.Printf("[DEBUG] Starting QemuVM command: %q", strings.Join(args, " ")) - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start command: %v", err) + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "executor", pluginLogFile), + } + + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + return nil, err + } + executorCtx := &executor.ExecutorContext{ + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + TaskName: task.Name, + TaskResources: task.Resources, + } + ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx) + if err != nil { + pluginClient.Kill() + return nil, fmt.Errorf("error starting process via the plugin: %v", err) } d.logger.Printf("[INFO] Started new QemuVM: %s", vmID) // Create and Return Handle h := &qemuHandle{ - cmd: cmd, - killTimeout: d.DriverContext.KillTimeout(task), - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + userPid: ps.Pid, + allocDir: ctx.AllocDir, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -213,8 +233,10 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } type qemuId struct { - ExecutorId string - KillTimeout time.Duration + KillTimeout time.Duration + UserPid int + PluginConfig *ExecutorReattachConfig + AllocDir 
*allocdir.AllocDir } func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -223,30 +245,40 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } - // Find the process - execCtx := executor.NewExecutorContext(d.taskEnv) - cmd, err := executor.OpenId(execCtx, id.ExecutorId) + pluginConfig := &plugin.ClientConfig{ + Reattach: id.PluginConfig.PluginConfig(), + } + + executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) + d.logger.Println("[ERROR] driver.qemu: error connecting to plugin so destroying plugin pid and user pid") + if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil { + d.logger.Printf("[ERROR] driver.qemu: error destroying plugin and userpid: %v", e) + } + return nil, fmt.Errorf("error connecting to plugin: %v", err) } // Return a driver handle - h := &execHandle{ - cmd: cmd, - logger: d.logger, - killTimeout: id.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + h := &qemuHandle{ + pluginClient: pluginClient, + executor: executor, + userPid: id.UserPid, + allocDir: id.AllocDir, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } func (h *qemuHandle) ID() string { - executorId, _ := h.cmd.ID() id := qemuId{ - ExecutorId: executorId, - KillTimeout: h.killTimeout, + KillTimeout: h.killTimeout, + PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + UserPid: h.userPid, + AllocDir: h.allocDir, } data, err := json.Marshal(id) @@ -271,18 +303,27 @@ func (h *qemuHandle) Update(task *structs.Task) error { // TODO: allow a 'shutdown_command' that can be executed over a ssh connection // to the VM func (h *qemuHandle) Kill() error { - h.cmd.Shutdown() + h.executor.ShutDown() select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.cmd.ForceStop() + return h.executor.Exit() } } func (h *qemuHandle) run() { - res := h.cmd.Wait() + ps, err := h.executor.Wait() + if ps.ExitCode == 0 && err != nil { + if e := killProcess(h.userPid); e != nil { + h.logger.Printf("[ERROR] driver.qemu: error killing user process: %v", e) + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.qemu: unmounting dev,proc and alloc dirs failed: %v", e) + } + } close(h.doneCh) - h.waitCh <- res + h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + h.pluginClient.Kill() } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 08deca270..fe25ecb04 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -4,13 +4,18 @@ import ( "encoding/json" "fmt" "log" + "os/exec" + "path/filepath" "time" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" ) @@ -30,11 +35,14 @@ type RawExecDriver struct { // rawExecHandle is returned from Start/Open as a handle to the PID type 
rawExecHandle struct { - cmd executor.Executor - killTimeout time.Duration - logger *log.Logger - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + pluginClient *plugin.Client + userPid int + executor executor.Executor + killTimeout time.Duration + allocDir *allocdir.AllocDir + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewRawExecDriver is used to create a new raw exec driver @@ -88,40 +96,52 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl } } - // Setup the command - execCtx := executor.NewExecutorContext(d.taskEnv) - cmd := executor.NewBasicExecutor(execCtx) - executor.SetCommand(cmd, command, driverConfig.Args) - if err := cmd.Limit(task.Resources); err != nil { - return nil, fmt.Errorf("failed to constrain resources: %s", err) + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "executor", pluginLogFile), } - // Populate environment variables - cmd.Command().Env = d.taskEnv.EnvList() - - if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil { - return nil, fmt.Errorf("failed to configure task directory: %v", err) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + return nil, err } - - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start command: %v", err) + executorCtx := &executor.ExecutorContext{ + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + TaskName: task.Name, + TaskResources: task.Resources, } + ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) + if err != nil { + pluginClient.Kill() + return nil, fmt.Errorf("error starting process via the plugin: %v", err) + } + d.logger.Printf("[DEBUG] driver.raw_exec: started process with pid: %v", ps.Pid) // Return a driver handle - h := &execHandle{ - cmd: cmd, - killTimeout: d.DriverContext.KillTimeout(task), - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + h := &rawExecHandle{ + pluginClient: pluginClient, + executor: exec, + userPid: ps.Pid, + killTimeout: d.DriverContext.KillTimeout(task), + allocDir: ctx.AllocDir, + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } type rawExecId struct { - ExecutorId string - KillTimeout time.Duration + KillTimeout time.Duration + UserPid int + PluginConfig *ExecutorReattachConfig + AllocDir *allocdir.AllocDir } func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -130,30 +150,39 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } - // Find the process - execCtx := executor.NewExecutorContext(d.taskEnv) - cmd := executor.NewBasicExecutor(execCtx) - if err := cmd.Open(id.ExecutorId); err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) + pluginConfig := &plugin.ClientConfig{ + Reattach: id.PluginConfig.PluginConfig(), + } + executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + d.logger.Println("[ERROR] driver.raw_exec: error connecting to plugin so destroying plugin pid and user pid") + if e := 
destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil { + d.logger.Printf("[ERROR] driver.raw_exec: error destroying plugin and userpid: %v", e) + } + return nil, fmt.Errorf("error connecting to plugin: %v", err) } // Return a driver handle - h := &execHandle{ - cmd: cmd, - logger: d.logger, - killTimeout: id.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + h := &rawExecHandle{ + pluginClient: pluginClient, + executor: executor, + userPid: id.UserPid, + logger: d.logger, + killTimeout: id.KillTimeout, + allocDir: id.AllocDir, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } func (h *rawExecHandle) ID() string { - executorId, _ := h.cmd.ID() id := rawExecId{ - ExecutorId: executorId, - KillTimeout: h.killTimeout, + KillTimeout: h.killTimeout, + PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + UserPid: h.userPid, + AllocDir: h.allocDir, } data, err := json.Marshal(id) @@ -176,18 +205,27 @@ func (h *rawExecHandle) Update(task *structs.Task) error { } func (h *rawExecHandle) Kill() error { - h.cmd.Shutdown() + h.executor.ShutDown() select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.cmd.ForceStop() + return h.executor.Exit() } } func (h *rawExecHandle) run() { - res := h.cmd.Wait() + ps, err := h.executor.Wait() close(h.doneCh) - h.waitCh <- res + if ps.ExitCode == 0 && err != nil { + if e := killProcess(h.userPid); e != nil { + h.logger.Printf("[ERROR] driver.raw_exec: error killing user process: %v", e) + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.raw_exec: unmounting dev,proc and alloc dirs failed: %v", e) + } + } + h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + h.pluginClient.Kill() } diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 2597af880..ab6a948d7 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -1,10 +1,12 @@ package driver import ( + "encoding/json" "fmt" "io/ioutil" "net/http" "net/http/httptest" + "os" "path/filepath" "reflect" "testing" @@ -91,6 +93,8 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second): t.Fatalf("timeout") } + handle.Kill() + handle2.Kill() } func TestRawExecDriver_Start_Artifact_basic(t *testing.T) { @@ -285,7 +289,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": testtask.Path(), - "args": []string{"sleep", "15s"}, + "args": []string{"sleep", "45s"}, }, Resources: basicResources, } diff --git a/client/driver/spawn/spawn.go b/client/driver/spawn/spawn.go deleted file mode 100644 index 5b3200984..000000000 --- a/client/driver/spawn/spawn.go +++ /dev/null @@ -1,308 +0,0 @@ -package spawn - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "os" - "os/exec" - "strconv" - "time" - - "github.com/hashicorp/go-multierror" - "github.com/hashicorp/nomad/client/driver/structs" - "github.com/hashicorp/nomad/command" - "github.com/hashicorp/nomad/helper/discover" -) - -// Spawner is used to start a user command in an isolated fashion that is -// resistent to Nomad agent failure. 
-type Spawner struct { - spawn *os.Process - SpawnPid int - SpawnPpid int - StateFile string - UserPid int - - // User configuration - UserCmd *exec.Cmd - Logs *Logs - Chroot string -} - -// Logs is used to define the filepaths the user command's logs should be -// redirected to. The files do not need to exist. -type Logs struct { - Stdin, Stdout, Stderr string -} - -// NewSpawner takes a path to a state file. This state file can be used to -// create a new Spawner that can be used to wait on the exit status of a -// process even through Nomad restarts. -func NewSpawner(stateFile string) *Spawner { - return &Spawner{StateFile: stateFile} -} - -// SetCommand sets the user command to spawn. -func (s *Spawner) SetCommand(cmd *exec.Cmd) { - s.UserCmd = cmd -} - -// SetLogs sets the redirection of user command log files. -func (s *Spawner) SetLogs(l *Logs) { - s.Logs = l -} - -// SetChroot puts the user command into a chroot. -func (s *Spawner) SetChroot(root string) { - s.Chroot = root -} - -// Spawn does a double-fork to start and isolate the user command. It takes a -// call-back that is invoked with the pid of the intermediary process. If the -// call back returns an error, the user command is not started and the spawn is -// cancelled. This can be used to put the process into a cgroup or jail and -// cancel starting the user process if that was not successful. An error is -// returned if the call-back returns an error or the user-command couldn't be -// started. -func (s *Spawner) Spawn(cb func(pid int) error) error { - bin, err := discover.NomadExecutable() - if err != nil { - return fmt.Errorf("Failed to determine the nomad executable: %v", err) - } - - exitFile, err := os.OpenFile(s.StateFile, os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return fmt.Errorf("Error opening file to store exit status: %v", err) - } - defer exitFile.Close() - - config, err := s.spawnConfig() - if err != nil { - return err - } - - spawn := exec.Command(bin, "spawn-daemon", config) - - // Capture stdout - spawnStdout, err := spawn.StdoutPipe() - if err != nil { - return fmt.Errorf("Failed to capture spawn-daemon stdout: %v", err) - } - defer spawnStdout.Close() - - // Capture stdin. - spawnStdin, err := spawn.StdinPipe() - if err != nil { - return fmt.Errorf("Failed to capture spawn-daemon stdin: %v", err) - } - defer spawnStdin.Close() - - if err := spawn.Start(); err != nil { - return fmt.Errorf("Failed to call spawn-daemon on nomad executable: %v", err) - } - - if cb != nil { - cbErr := cb(spawn.Process.Pid) - if cbErr != nil { - errs := new(multierror.Error) - errs = multierror.Append(errs, cbErr) - if err := s.sendAbortCommand(spawnStdin); err != nil { - errs = multierror.Append(errs, err) - } - - return errs - } - } - - if err := s.sendStartCommand(spawnStdin); err != nil { - return err - } - - respCh := make(chan command.SpawnStartStatus, 1) - errCh := make(chan error, 1) - - go func() { - var resp command.SpawnStartStatus - dec := json.NewDecoder(spawnStdout) - if err := dec.Decode(&resp); err != nil { - errCh <- fmt.Errorf("Failed to parse spawn-daemon start response: %v", err) - } - respCh <- resp - }() - - select { - case err := <-errCh: - return err - case resp := <-respCh: - if resp.ErrorMsg != "" { - return fmt.Errorf("Failed to execute user command: %s", resp.ErrorMsg) - } - s.UserPid = resp.UserPID - case <-time.After(5 * time.Second): - return fmt.Errorf("timed out waiting for response") - } - - // Store the spawn process. 
- s.spawn = spawn.Process - s.SpawnPid = s.spawn.Pid - s.SpawnPpid = os.Getpid() - return nil -} - -// spawnConfig returns a serialized config to pass to the Nomad spawn-daemon -// command. -func (s *Spawner) spawnConfig() (string, error) { - if s.UserCmd == nil { - return "", fmt.Errorf("Must specify user command") - } - - config := command.DaemonConfig{ - Cmd: *s.UserCmd, - Chroot: s.Chroot, - ExitStatusFile: s.StateFile, - } - - if s.Logs != nil { - config.StdoutFile = s.Logs.Stdout - config.StdinFile = s.Logs.Stdin - config.StderrFile = s.Logs.Stderr - } - - var buffer bytes.Buffer - enc := json.NewEncoder(&buffer) - if err := enc.Encode(config); err != nil { - return "", fmt.Errorf("Failed to serialize configuration: %v", err) - } - - return strconv.Quote(buffer.String()), nil -} - -// sendStartCommand sends the necessary command to the spawn-daemon to have it -// start the user process. -func (s *Spawner) sendStartCommand(w io.Writer) error { - enc := json.NewEncoder(w) - if err := enc.Encode(true); err != nil { - return fmt.Errorf("Failed to serialize start command: %v", err) - } - - return nil -} - -// sendAbortCommand sends the necessary command to the spawn-daemon to have it -// abort starting the user process. This should be invoked if the spawn-daemon -// could not be isolated into a cgroup. -func (s *Spawner) sendAbortCommand(w io.Writer) error { - enc := json.NewEncoder(w) - if err := enc.Encode(false); err != nil { - return fmt.Errorf("Failed to serialize abort command: %v", err) - } - - return nil -} - -// Wait returns the exit code of the user process or an error if the wait -// failed. -func (s *Spawner) Wait() *structs.WaitResult { - if os.Getpid() == s.SpawnPpid { - return s.waitAsParent() - } - - return s.pollWait() -} - -// waitAsParent waits on the process if the current process was the spawner. -func (s *Spawner) waitAsParent() *structs.WaitResult { - if s.SpawnPpid != os.Getpid() { - return structs.NewWaitResult(-1, 0, fmt.Errorf("not the parent. Spawner parent is %v; current pid is %v", s.SpawnPpid, os.Getpid())) - } - - // Try to reattach to the spawn. - if s.spawn == nil { - // If it can't be reattached, it means the spawn process has exited so - // we should just read its exit file. - var err error - if s.spawn, err = os.FindProcess(s.SpawnPid); err != nil { - return s.pollWait() - } - } - - if _, err := s.spawn.Wait(); err != nil { - return structs.NewWaitResult(-1, 0, err) - } - - return s.pollWait() -} - -// pollWait polls on the spawn daemon to determine when it exits. After it -// exits, it reads the state file and returns the exit code and possibly an -// error. -func (s *Spawner) pollWait() *structs.WaitResult { - // Stat to check if it is there to avoid a race condition. - stat, err := os.Stat(s.StateFile) - if err != nil { - return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err)) - } - - // If there is data it means that the file has already been written. - if stat.Size() > 0 { - return s.readExitCode() - } - - // Read after the process exits. - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for range ticker.C { - if !s.Alive() { - break - } - } - - return s.readExitCode() -} - -// readExitCode parses the state file and returns the exit code of the task. It -// returns an error if the file can't be read. 
-func (s *Spawner) readExitCode() *structs.WaitResult { - f, err := os.Open(s.StateFile) - if err != nil { - return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to open %v to read exit code: %v", s.StateFile, err)) - } - defer f.Close() - - stat, err := f.Stat() - if err != nil { - return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to stat file %v: %v", s.StateFile, err)) - } - - if stat.Size() == 0 { - return structs.NewWaitResult(-1, 0, fmt.Errorf("Empty state file: %v", s.StateFile)) - } - - var exitStatus command.SpawnExitStatus - dec := json.NewDecoder(f) - if err := dec.Decode(&exitStatus); err != nil { - return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to parse exit status from %v: %v", s.StateFile, err)) - } - - return structs.NewWaitResult(exitStatus.ExitCode, 0, nil) -} - -// Valid checks that the state of the Spawner is valid and that a subsequent -// Wait could be called. This is useful to call when reopening a Spawner -// through client restarts. If Valid a nil error is returned. -func (s *Spawner) Valid() error { - // If the spawner is still alive, then the task is running and we can wait - // on it. - if s.Alive() { - return nil - } - - // The task isn't alive so check that there is a valid exit code file. - if res := s.readExitCode(); res.Err == nil { - return nil - } - - return fmt.Errorf("Spawner not alive and exit code not written") -} diff --git a/client/driver/spawn/spawn_posix.go b/client/driver/spawn/spawn_posix.go deleted file mode 100644 index 9b10caddc..000000000 --- a/client/driver/spawn/spawn_posix.go +++ /dev/null @@ -1,20 +0,0 @@ -// +build !windows - -package spawn - -import ( - "os" - "syscall" -) - -func (s *Spawner) Alive() bool { - if s.spawn == nil { - var err error - if s.spawn, err = os.FindProcess(s.SpawnPid); err != nil { - return false - } - } - - err := s.spawn.Signal(syscall.Signal(0)) - return err == nil -} diff --git a/client/driver/spawn/spawn_test.go b/client/driver/spawn/spawn_test.go deleted file mode 100644 index f231a335c..000000000 --- a/client/driver/spawn/spawn_test.go +++ /dev/null @@ -1,335 +0,0 @@ -package spawn - -import ( - "fmt" - "io/ioutil" - "os" - "os/exec" - "strings" - "testing" - "time" - - "github.com/hashicorp/nomad/helper/testtask" -) - -func TestMain(m *testing.M) { - if !testtask.Run() { - os.Exit(m.Run()) - } -} - -func TestSpawn_NoCmd(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - if err := spawn.Spawn(nil); err == nil { - t.Fatalf("Spawn() with no user command should fail") - } -} - -func TestSpawn_InvalidCmd(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(exec.Command("foo")) // non-existent command - if err := spawn.Spawn(nil); err == nil { - t.Fatalf("Spawn() with an invalid command should fail") - } -} - -func TestSpawn_SetsLogs(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - exp := "foo" - spawn.SetCommand(testCommand("echo", exp)) - - // Create file for stdout. 
- stdout := tempFileName(t) - defer os.Remove(stdout) - spawn.SetLogs(&Logs{Stdout: stdout}) - - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed: %v", err) - } - - if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { - t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) - } - - stdout2, err := os.Open(stdout) - if err != nil { - t.Fatalf("Open() failed: %v", err) - } - - data, err := ioutil.ReadAll(stdout2) - if err != nil { - t.Fatalf("ReadAll() failed: %v", err) - } - - act := strings.TrimSpace(string(data)) - if act != exp { - t.Fatalf("Unexpected data written to stdout; got %v; want %v", act, exp) - } -} - -func TestSpawn_Callback(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("sleep", "1s")) - - called := false - cbErr := fmt.Errorf("ERROR CB") - cb := func(_ int) error { - called = true - return cbErr - } - - if err := spawn.Spawn(cb); err == nil { - t.Fatalf("Spawn(%#v) should have errored; want %v", cb, cbErr) - } - - if !called { - t.Fatalf("Spawn(%#v) didn't call callback", cb) - } -} - -func TestSpawn_ParentWaitExited(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("echo", "foo")) - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed %v", err) - } - - time.Sleep(1 * time.Second) - - if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { - t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) - } -} - -func TestSpawn_ParentWait(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("sleep", "2s")) - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed %v", err) - } - - if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { - t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) - } -} - -func TestSpawn_NonParentWaitExited(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("echo", "foo")) - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed %v", err) - } - - time.Sleep(1 * time.Second) - - // Force the wait to assume non-parent. - spawn.SpawnPpid = 0 - if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { - t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) - } -} - -func TestSpawn_NonParentWait(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("sleep", "2s")) - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed %v", err) - } - - // Need to wait on the spawner, otherwise it becomes a zombie and the test - // only finishes after the init process cleans it. This speeds that up. - go func() { - time.Sleep(3 * time.Second) - if _, err := spawn.spawn.Wait(); err != nil { - t.FailNow() - } - }() - - // Force the wait to assume non-parent. 
- spawn.SpawnPpid = 0 - if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { - t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) - } -} - -func TestSpawn_DeadSpawnDaemon_Parent(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - var spawnPid int - cb := func(pid int) error { - spawnPid = pid - return nil - } - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("sleep", "5s")) - if err := spawn.Spawn(cb); err != nil { - t.Fatalf("Spawn() errored: %v", err) - } - - proc, err := os.FindProcess(spawnPid) - if err != nil { - t.FailNow() - } - - if err := proc.Kill(); err != nil { - t.FailNow() - } - - if _, err := proc.Wait(); err != nil { - t.FailNow() - } - - if res := spawn.Wait(); res.Err == nil { - t.Fatalf("Wait() should have failed: %v", res.Err) - } -} - -func TestSpawn_DeadSpawnDaemon_NonParent(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - var spawnPid int - cb := func(pid int) error { - spawnPid = pid - return nil - } - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("sleep", "2s")) - if err := spawn.Spawn(cb); err != nil { - t.Fatalf("Spawn() errored: %v", err) - } - - proc, err := os.FindProcess(spawnPid) - if err != nil { - t.FailNow() - } - - if err := proc.Kill(); err != nil { - t.FailNow() - } - - if _, err := proc.Wait(); err != nil { - t.FailNow() - } - - // Force the wait to assume non-parent. - spawn.SpawnPpid = 0 - if res := spawn.Wait(); res.Err == nil { - t.Fatalf("Wait() should have failed: %v", res.Err) - } -} - -func TestSpawn_Valid_TaskRunning(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("sleep", "2s")) - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed %v", err) - } - - if err := spawn.Valid(); err != nil { - t.Fatalf("Valid() failed: %v", err) - } - - if res := spawn.Wait(); res.Err != nil { - t.Fatalf("Wait() failed: %v", res.Err) - } -} - -func TestSpawn_Valid_TaskExit_ExitCode(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("echo", "foo")) - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed %v", err) - } - - if res := spawn.Wait(); res.Err != nil { - t.Fatalf("Wait() failed: %v", res.Err) - } - - if err := spawn.Valid(); err != nil { - t.Fatalf("Valid() failed: %v", err) - } -} - -func TestSpawn_Valid_TaskExit_NoExitCode(t *testing.T) { - t.Parallel() - tempFile := tempFileName(t) - defer os.Remove(tempFile) - - spawn := NewSpawner(tempFile) - spawn.SetCommand(testCommand("echo", "foo")) - if err := spawn.Spawn(nil); err != nil { - t.Fatalf("Spawn() failed %v", err) - } - - if res := spawn.Wait(); res.Err != nil { - t.Fatalf("Wait() failed: %v", res.Err) - } - - // Delete the file so that it can't find the exit code. - os.Remove(tempFile) - - if err := spawn.Valid(); err == nil { - t.Fatalf("Valid() should have failed") - } -} - -func tempFileName(t *testing.T) string { - f, err := ioutil.TempFile("", "") - if err != nil { - t.Fatalf("TempFile() failed") - } - defer f.Close() - return f.Name() -} - -func testCommand(args ...string) *exec.Cmd { - cmd := exec.Command(testtask.Path(), args...) 
diff --git a/client/driver/utils_linux.go b/client/driver/utils_linux.go
new file mode 100644
index 000000000..693bba6f6
--- /dev/null
+++ b/client/driver/utils_linux.go
@@ -0,0 +1,16 @@
+package driver
+
+import (
+	"os/exec"
+	"syscall"
+)
+
+// isolateCommand sets the setsid flag in exec.Cmd to true so that the process
+// becomes the leader of a new session and doesn't receive signals that are
+// sent to the parent process.
+func isolateCommand(cmd *exec.Cmd) {
+	if cmd.SysProcAttr == nil {
+		cmd.SysProcAttr = &syscall.SysProcAttr{}
+	}
+	cmd.SysProcAttr.Setsid = true
+}
diff --git a/client/driver/utils_posix.go b/client/driver/utils_posix.go
new file mode 100644
index 000000000..fef4a002f
--- /dev/null
+++ b/client/driver/utils_posix.go
@@ -0,0 +1,18 @@
+// +build !linux,!windows
+
+package driver
+
+import (
+	"os/exec"
+	"syscall"
+)
+
+// isolateCommand sets the setsid flag in exec.Cmd to true so that the process
+// becomes the leader of a new session and doesn't receive signals that are
+// sent to the parent process.
+func isolateCommand(cmd *exec.Cmd) {
+	if cmd.SysProcAttr == nil {
+		cmd.SysProcAttr = &syscall.SysProcAttr{}
+	}
+	cmd.SysProcAttr.Setsid = true
+}
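The effect of the Setsid flag is easy to verify in isolation: the child becomes a session leader, so its session id equals its own pid rather than the parent's. A hypothetical Linux-only test, not part of this change:

	// Hypothetical check of isolateCommand (sketch, Linux only).
	package driver

	import (
		"os/exec"
		"syscall"
		"testing"
	)

	func TestIsolateCommand_Setsid(t *testing.T) {
		cmd := exec.Command("sleep", "1")
		isolateCommand(cmd)
		if err := cmd.Start(); err != nil {
			t.Fatal(err)
		}
		defer cmd.Wait()

		// The child now leads its own session, so signals delivered to the
		// Nomad client's process group no longer reach it.
		sid, err := syscall.Getsid(cmd.Process.Pid)
		if err != nil {
			t.Fatal(err)
		}
		if sid != cmd.Process.Pid {
			t.Fatalf("sid = %d; want %d (session leader)", sid, cmd.Process.Pid)
		}
	}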
diff --git a/client/driver/utils_windows.go b/client/driver/utils_windows.go
new file mode 100644
index 000000000..5b2b7d842
--- /dev/null
+++ b/client/driver/utils_windows.go
@@ -0,0 +1,9 @@
+package driver
+
+import (
+	"os/exec"
+)
+
+// TODO: Figure out if this is needed on Windows
+func isolateCommand(cmd *exec.Cmd) {
+}
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 0c7f1dd48..6f0f1e8a3 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -7,6 +7,7 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+	"runtime"
 	"sync"
 	"time"
 
@@ -215,6 +216,8 @@ func (a *Agent) setupClient() error {
 		}
 		conf.MaxKillTimeout = dur
 	}
+	conf.ClientMaxPort = a.config.Client.ClientMaxPort
+	conf.ClientMinPort = a.config.Client.ClientMinPort
 
 	// Setup the node
 	conf.Node = new(structs.Node)
@@ -238,6 +241,29 @@ func (a *Agent) setupClient() error {
 	}
 	conf.Node.HTTPAddr = httpAddr
 
+	// Reserve some ports for the plugins
+	if runtime.GOOS == "windows" {
+		deviceName, err := a.findLoopbackDevice()
+		if err != nil {
+			return fmt.Errorf("error finding the device name for loopback: %v", err)
+		}
+		var nr *structs.NetworkResource
+		for _, n := range conf.Node.Reserved.Networks {
+			if n.Device == deviceName {
+				nr = n
+			}
+		}
+		if nr == nil {
+			nr = &structs.NetworkResource{
+				Device:        deviceName,
+				ReservedPorts: make([]structs.Port, 0),
+			}
+		}
+		for i := conf.ClientMinPort; i <= conf.ClientMaxPort; i++ {
+			nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: int(i)})
+		}
+	}
+
 	// Create the client
 	client, err := client.NewClient(conf)
 	if err != nil {
@@ -247,6 +273,31 @@ func (a *Agent) setupClient() error {
 	return nil
 }
 
+func (a *Agent) findLoopbackDevice() (string, error) {
+	var ifcs []net.Interface
+	var err error
+	var deviceName string
+	ifcs, err = net.Interfaces()
+	if err != nil {
+		return "", err
+	}
+	for _, ifc := range ifcs {
+		addrs, err := ifc.Addrs()
+		if err != nil {
+			return deviceName, err
+		}
+		for _, addr := range addrs {
+			// Interface addresses are typically CIDR strings (e.g.
+			// "127.0.0.1/8"), which net.ParseIP cannot parse; use
+			// net.ParseCIDR to extract the IP for the loopback check.
+			if ip, _, err := net.ParseCIDR(addr.String()); err == nil && ip.IsLoopback() {
+				return ifc.Name, nil
+			}
+		}
+	}
+	return deviceName, err
+}
+
 // Leave is used gracefully exit. Clients will inform servers
 // of their departure so that allocations can be rescheduled.
 func (a *Agent) Leave() error {
diff --git a/command/agent/config.go b/command/agent/config.go
index 6d9219f96..89abd6300 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -159,6 +159,14 @@ type ClientConfig struct {
 	// MaxKillTimeout allows capping the user-specifiable KillTimeout.
 	MaxKillTimeout string `hcl:"max_kill_timeout"`
+
+	// ClientMaxPort is the upper bound of the port range that the client uses
+	// for communicating with plugin subsystems
+	ClientMaxPort uint `hcl:"client_max_port"`
+
+	// ClientMinPort is the lower bound of the port range that the client uses
+	// for communicating with plugin subsystems
+	ClientMinPort uint `hcl:"client_min_port"`
 }
 
 // ServerConfig is configuration specific to the server mode
@@ -288,6 +296,8 @@ func DefaultConfig() *Config {
 			Enabled:        false,
 			NetworkSpeed:   100,
 			MaxKillTimeout: "30s",
+			ClientMinPort:  14000,
+			ClientMaxPort:  19000,
 		},
 		Server: &ServerConfig{
 			Enabled: false,
@@ -505,6 +515,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
 	if b.MaxKillTimeout != "" {
 		result.MaxKillTimeout = b.MaxKillTimeout
 	}
+	if b.ClientMaxPort != 0 {
+		result.ClientMaxPort = b.ClientMaxPort
+	}
+	if b.ClientMinPort != 0 {
+		result.ClientMinPort = b.ClientMinPort
+	}
 
 	// Add the servers
 	result.Servers = append(result.Servers, b.Servers...)
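In an agent configuration file these settings appear as client_min_port and client_max_port inside the client block, per the hcl tags above. Because Merge treats the zero value as "unset", a partial override keeps the other default. A hypothetical illustration of that behavior (not the test that follows):

	// Sketch: zero means "unset", so overriding only the max keeps the
	// default min after Merge.
	base := DefaultConfig() // ClientMinPort 14000, ClientMaxPort 19000
	partial := &Config{Client: &ClientConfig{ClientMaxPort: 25000}}
	merged := base.Merge(partial)
	fmt.Println(merged.Client.ClientMinPort, merged.Client.ClientMaxPort) // 14000 25000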
diff --git a/command/agent/config_test.go b/command/agent/config_test.go
index 2662e1155..aa5349a8d 100644
--- a/command/agent/config_test.go
+++ b/command/agent/config_test.go
@@ -104,6 +104,8 @@ func TestConfig_Merge(t *testing.T) {
 				"foo": "bar",
 				"baz": "zip",
 			},
+			ClientMaxPort:  20000,
+			ClientMinPort:  22000,
 			NetworkSpeed:   105,
 			MaxKillTimeout: "50s",
 		},
diff --git a/command/executor_plugin.go b/command/executor_plugin.go
new file mode 100644
index 000000000..666cab961
--- /dev/null
+++ b/command/executor_plugin.go
@@ -0,0 +1,43 @@
+package command
+
+import (
+	"os"
+	"strings"
+
+	"github.com/hashicorp/go-plugin"
+
+	"github.com/hashicorp/nomad/client/driver"
+)
+
+type ExecutorPluginCommand struct {
+	Meta
+}
+
+func (e *ExecutorPluginCommand) Help() string {
+	helpText := `
+	This is a command used by Nomad internally to launch an executor plugin
+	`
+	return strings.TrimSpace(helpText)
+}
+
+func (e *ExecutorPluginCommand) Synopsis() string {
+	return "internal - launch an executor plugin"
+}
+
+func (e *ExecutorPluginCommand) Run(args []string) int {
+	if len(args) == 0 {
+		e.Ui.Error("log output file isn't provided")
+		return 1
+	}
+	logFileName := args[0]
+	stdo, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
+	if err != nil {
+		e.Ui.Error(err.Error())
+		return 1
+	}
+	plugin.Serve(&plugin.ServeConfig{
+		HandshakeConfig: driver.HandshakeConfig,
+		Plugins:         driver.GetPluginMap(stdo),
+	})
+	return 0
+}
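A quick behavioral check of the new subcommand: without a log-file argument it must fail fast rather than start serving a plugin. A hypothetical smoke test, not part of this diff, using the cli package's MockUi:

	// Hypothetical test for ExecutorPluginCommand (sketch only).
	package command

	import (
		"testing"

		"github.com/mitchellh/cli"
	)

	func TestExecutorPlugin_NoArgs(t *testing.T) {
		ui := new(cli.MockUi)
		cmd := &ExecutorPluginCommand{Meta: Meta{Ui: ui}}

		// Without a log file the command should error out instead of
		// calling plugin.Serve.
		if code := cmd.Run(nil); code != 1 {
			t.Fatalf("Run() returned %d; want 1", code)
		}
	}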
diff --git a/command/spawn_daemon.go b/command/spawn_daemon.go
deleted file mode 100644
index 52ffd8e6c..000000000
--- a/command/spawn_daemon.go
+++ /dev/null
@@ -1,234 +0,0 @@
-package command
-
-import (
-	"encoding/json"
-	"fmt"
-	"io"
-	"os"
-	"os/exec"
-	"strconv"
-	"strings"
-	"syscall"
-)
-
-type SpawnDaemonCommand struct {
-	Meta
-	config   *DaemonConfig
-	exitFile io.WriteCloser
-}
-
-func (c *SpawnDaemonCommand) Help() string {
-	helpText := `
-Usage: nomad spawn-daemon [options]
-
-  INTERNAL ONLY
-
-  Spawns a daemon process by double forking. The required daemon_config is a
-  json encoding of the DaemonConfig struct containing the isolation
-  configuration and command to run. SpawnStartStatus is json serialized to
-  stdout upon running the user command or if any error prevents its execution.
-  If there is no error, the process waits on the users command. Once the user
-  command exits, the exit code is written to a file specified in the
-  daemon_config and this process exits with the same exit status as the user
-  command.
-	`
-
-	return strings.TrimSpace(helpText)
-}
-
-func (c *SpawnDaemonCommand) Synopsis() string {
-	return "Spawn a daemon command with configurable isolation."
-}
-
-// Status of executing the user's command.
-type SpawnStartStatus struct {
-	// The PID of the user's command.
-	UserPID int
-
-	// ErrorMsg will be empty if the user command was started successfully.
-	// Otherwise it will have an error message.
-	ErrorMsg string
-}
-
-// Exit status of the user's command.
-type SpawnExitStatus struct {
-	// The exit code of the user's command.
-	ExitCode int
-}
-
-// Configuration for the command to start as a daemon.
-type DaemonConfig struct {
-	exec.Cmd
-
-	// The filepath to write the exit status to.
-	ExitStatusFile string
-
-	// The paths, if not /dev/null, must be either in the tasks root directory
-	// or in the shared alloc directory.
-	StdoutFile string
-	StdinFile  string
-	StderrFile string
-
-	// An optional path specifying the directory to chroot the process in.
-	Chroot string
-}
-
-// Whether to start the user command or abort.
-type TaskStart bool
-
-// parseConfig reads the DaemonConfig from the passed arguments. If not
-// successful, an error is returned.
-func (c *SpawnDaemonCommand) parseConfig(args []string) (*DaemonConfig, error) {
-	flags := c.Meta.FlagSet("spawn-daemon", FlagSetClient)
-	flags.Usage = func() { c.Ui.Output(c.Help()) }
-	if err := flags.Parse(args); err != nil {
-		return nil, fmt.Errorf("failed to parse args: %v", err)
-	}
-
-	// Check that we got json input.
-	args = flags.Args()
-	if len(args) != 1 {
-		return nil, fmt.Errorf("incorrect number of args; got %v; want 1", len(args))
-	}
-	jsonInput, err := strconv.Unquote(args[0])
-	if err != nil {
-		return nil, fmt.Errorf("Failed to unquote json input: %v", err)
-	}
-
-	// De-serialize the passed command.
-	var config DaemonConfig
-	dec := json.NewDecoder(strings.NewReader(jsonInput))
-	if err := dec.Decode(&config); err != nil {
-		return nil, err
-	}
-
-	return &config, nil
-}
-
-// configureLogs creates the log files and redirects the process
-// stdin/stderr/stdout to them. If unsuccessful, an error is returned.
-func (c *SpawnDaemonCommand) configureLogs() error {
-	if len(c.config.StdoutFile) != 0 {
-		stdo, err := os.OpenFile(c.config.StdoutFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
-		if err != nil {
-			return fmt.Errorf("Error opening file to redirect stdout: %v", err)
-		}
-
-		c.config.Cmd.Stdout = stdo
-	}
-
-	if len(c.config.StderrFile) != 0 {
-		stde, err := os.OpenFile(c.config.StderrFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
-		if err != nil {
-			return fmt.Errorf("Error opening file to redirect stderr: %v", err)
-		}
-		c.config.Cmd.Stderr = stde
-	}
-
-	if len(c.config.StdinFile) != 0 {
-		stdi, err := os.OpenFile(c.config.StdinFile, os.O_CREATE|os.O_RDONLY, 0666)
-		if err != nil {
-			return fmt.Errorf("Error opening file to redirect stdin: %v", err)
-		}
-		c.config.Cmd.Stdin = stdi
-	}
-
-	return nil
-}
-
-func (c *SpawnDaemonCommand) Run(args []string) int {
-	var err error
-	c.config, err = c.parseConfig(args)
-	if err != nil {
-		return c.outputStartStatus(err, 1)
-	}
-
-	// Open the file we will be using to write exit codes to. We do this early
-	// to ensure that we don't start the user process when we can't capture its
-	// exit status.
-	c.exitFile, err = os.OpenFile(c.config.ExitStatusFile, os.O_WRONLY, 0666)
-	if err != nil {
-		return c.outputStartStatus(fmt.Errorf("Error opening file to store exit status: %v", err), 1)
-	}
-
-	// Isolate the user process.
-	if err := c.isolateCmd(); err != nil {
-		return c.outputStartStatus(err, 1)
-	}
-
-	// Redirect logs.
-	if err := c.configureLogs(); err != nil {
-		return c.outputStartStatus(err, 1)
-	}
-
-	// Chroot jail the process and set its working directory.
-	c.configureChroot()
-
-	// Wait to get the start command.
-	var start TaskStart
-	dec := json.NewDecoder(os.Stdin)
-	if err := dec.Decode(&start); err != nil {
-		return c.outputStartStatus(err, 1)
-	}
-
-	// Aborted by Nomad process.
-	if !start {
-		return 0
-	}
-
-	// Spawn the user process.
-	if err := c.config.Cmd.Start(); err != nil {
-		return c.outputStartStatus(fmt.Errorf("Error starting user command: %v", err), 1)
-	}
-
-	// Indicate that the command was started successfully.
-	c.outputStartStatus(nil, 0)
-
-	// Wait and then output the exit status.
-	return c.writeExitStatus(c.config.Cmd.Wait())
-}
-
-// outputStartStatus is a helper function that outputs a SpawnStartStatus to
-// Stdout with the passed error, which may be nil to indicate no error. It
-// returns the passed status.
-func (c *SpawnDaemonCommand) outputStartStatus(err error, status int) int {
-	startStatus := &SpawnStartStatus{}
-	enc := json.NewEncoder(os.Stdout)
-
-	if err != nil {
-		startStatus.ErrorMsg = err.Error()
-	}
-
-	if c.config != nil && c.config.Cmd.Process != nil {
-		startStatus.UserPID = c.config.Process.Pid
-	}
-
-	enc.Encode(startStatus)
-	return status
-}
-
-// writeExitStatus takes in the error result from calling wait and writes out
-// the exit status to a file. It returns the same exit status as the user
-// command.
-func (c *SpawnDaemonCommand) writeExitStatus(exit error) int {
-	// Parse the exit code.
-	exitStatus := &SpawnExitStatus{}
-	if exit != nil {
-		// Default to exit code 1 if we can not get the actual exit code.
-		exitStatus.ExitCode = 1
-
-		if exiterr, ok := exit.(*exec.ExitError); ok {
-			if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
-				exitStatus.ExitCode = status.ExitStatus()
-			}
-		}
-	}
-
-	if c.exitFile != nil {
-		enc := json.NewEncoder(c.exitFile)
-		enc.Encode(exitStatus)
-		c.exitFile.Close()
-	}
-
-	return exitStatus.ExitCode
-}
diff --git a/command/spawn_daemon_darwin.go b/command/spawn_daemon_darwin.go
deleted file mode 100644
index f3fe8484a..000000000
--- a/command/spawn_daemon_darwin.go
+++ /dev/null
@@ -1,4 +0,0 @@
-package command
-
-// No chroot on darwin.
-func (c *SpawnDaemonCommand) configureChroot() {}
diff --git a/command/spawn_daemon_linux.go b/command/spawn_daemon_linux.go
deleted file mode 100644
index 512ec645f..000000000
--- a/command/spawn_daemon_linux.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package command
-
-import "syscall"
-
-// configureChroot enters the user command into a chroot if specified in the
-// config and on an OS that supports Chroots.
-func (c *SpawnDaemonCommand) configureChroot() {
-	if len(c.config.Chroot) != 0 {
-		if c.config.Cmd.SysProcAttr == nil {
-			c.config.Cmd.SysProcAttr = &syscall.SysProcAttr{}
-		}
-
-		c.config.Cmd.SysProcAttr.Chroot = c.config.Chroot
-		c.config.Cmd.Dir = "/"
-	}
-}
diff --git a/command/spawn_daemon_test.go b/command/spawn_daemon_test.go
deleted file mode 100644
index 5bfd6ad5a..000000000
--- a/command/spawn_daemon_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package command
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"io"
-	"os/exec"
-	"testing"
-)
-
-type nopCloser struct {
-	io.ReadWriter
-}
-
-func (n *nopCloser) Close() error {
-	return nil
-}
-
-func TestSpawnDaemon_WriteExitStatus(t *testing.T) {
-	// Check if there is python.
-	path, err := exec.LookPath("python")
-	if err != nil {
-		t.Skip("python not detected")
-	}
-
-	var b bytes.Buffer
-	daemon := &SpawnDaemonCommand{exitFile: &nopCloser{&b}}
-
-	code := 3
-	cmd := exec.Command(path, "./test-resources/exiter.py", fmt.Sprintf("%d", code))
-	err = cmd.Run()
-	actual := daemon.writeExitStatus(err)
-	if actual != code {
-		t.Fatalf("writeExitStatus(%v) returned %v; want %v", err, actual, code)
-	}
-
-	// De-serialize the passed command.
-	var exitStatus SpawnExitStatus
-	dec := json.NewDecoder(&b)
-	if err := dec.Decode(&exitStatus); err != nil {
-		t.Fatalf("failed to decode exit status: %v", err)
-	}
-
-	if exitStatus.ExitCode != code {
-		t.Fatalf("writeExitStatus(%v) wrote exit status %v; want %v", err, exitStatus.ExitCode, code)
-	}
-}
diff --git a/command/spawn_daemon_unix.go b/command/spawn_daemon_unix.go
deleted file mode 100644
index 981e52596..000000000
--- a/command/spawn_daemon_unix.go
+++ /dev/null
@@ -1,16 +0,0 @@
-// +build !windows
-
-package command
-
-import "syscall"
-
-// isolateCmd sets the session id for the process and the umask.
-func (c *SpawnDaemonCommand) isolateCmd() error {
-	if c.config.Cmd.SysProcAttr == nil {
-		c.config.Cmd.SysProcAttr = &syscall.SysProcAttr{}
-	}
-
-	c.config.Cmd.SysProcAttr.Setsid = true
-	syscall.Umask(0)
-	return nil
-}
diff --git a/command/spawn_daemon_windows.go b/command/spawn_daemon_windows.go
deleted file mode 100644
index bb2d63ed8..000000000
--- a/command/spawn_daemon_windows.go
+++ /dev/null
@@ -1,7 +0,0 @@
-// build !linux !darwin
-
-package command
-
-// No isolation on Windows.
-func (c *SpawnDaemonCommand) isolateCmd() error { return nil }
-func (c *SpawnDaemonCommand) configureChroot() {}
diff --git a/commands.go b/commands.go
index 3fc6718a3..6ad1ba3c3 100644
--- a/commands.go
+++ b/commands.go
@@ -57,6 +57,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
 				Meta: meta,
 			}, nil
 		},
+		"executor": func() (cli.Command, error) {
+			return &command.ExecutorPluginCommand{
+				Meta: meta,
+			}, nil
+		},
 		"fs": func() (cli.Command, error) {
 			return &command.FSCommand{
 				Meta: meta,
@@ -118,13 +123,6 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
 				Meta: meta,
 			}, nil
 		},
-
-		"spawn-daemon": func() (cli.Command, error) {
-			return &command.SpawnDaemonCommand{
-				Meta: meta,
-			}, nil
-		},
-
 		"status": func() (cli.Command, error) {
 			return &command.StatusCommand{
 				Meta: meta,
diff --git a/main.go b/main.go
index 36115789d..1e3558658 100644
--- a/main.go
+++ b/main.go
@@ -32,7 +32,7 @@ func RunCustom(args []string, commands map[string]cli.CommandFactory) int {
 	commandsInclude := make([]string, 0, len(commands))
 	for k, _ := range commands {
 		switch k {
-		case "spawn-daemon":
+		case "executor":
 		default:
 			commandsInclude = append(commandsInclude, k)
 		}
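The net effect on the CLI surface: `spawn-daemon` disappears and `executor` takes its place as the hidden internal command. RunCustom filters it out of the help listing, but it remains invocable. A hypothetical session:

	$ nomad --help | grep executor    # no output; the command is hidden from help
	$ nomad executor                  # still runs; exits 1 without a log file argument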