diff --git a/CHANGELOG.md b/CHANGELOG.md index f53be37bf..365d6caf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ BACKWARDS INCOMPATIBILITIES: * core: Improved restart policy with more user configuration [GH-594] - * core/cli: Print short identifiers [GH-675] + * core/cli: Print short identifiers [GH-760] * core/consul: Validate service name doesn't include period [GH-770] * core/jobspec: Variables/constraints interpreted using ${} notation [GH-675] * client: Environment variable containing address for each allocated port diff --git a/api/allocations.go b/api/allocations.go index 9a87bbaf8..6badd3f69 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -60,6 +60,7 @@ type Allocation struct { TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 + CreateTime int64 } // AllocationMetric is used to deserialize allocation metrics. @@ -93,6 +94,7 @@ type AllocationListStub struct { TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 + CreateTime int64 } // AllocIndexSort reverse sorts allocs by CreateIndex. diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index f082ba08e..6b526b846 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -53,7 +53,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) { return false, fmt.Errorf("No updates") } last := upd.Allocs[upd.Count-1] - if last.ClientStatus == structs.AllocClientStatusDead { + if last.ClientStatus != structs.AllocClientStatusDead { return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) } return true, nil @@ -77,7 +77,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { return false, fmt.Errorf("No updates") } last := upd.Allocs[upd.Count-1] - if last.ClientStatus == structs.AllocClientStatusRunning { + if last.ClientStatus != structs.AllocClientStatusRunning { return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) } return true, nil diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index ab56f6982..6904cf118 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -8,6 +8,7 @@ import ( "path/filepath" "time" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" ) @@ -37,9 +38,6 @@ type AllocDir struct { // TaskDirs is a mapping of task names to their non-shared directory. TaskDirs map[string]string - - // A list of locations the shared alloc has been mounted to. - Mounted []string } // AllocFileInfo holds information about a file inside the AllocDir @@ -67,13 +65,39 @@ func NewAllocDir(allocDir string) *AllocDir { // Tears down previously build directory structure. func (d *AllocDir) Destroy() error { // Unmount all mounted shared alloc dirs. - for _, m := range d.Mounted { - if err := d.unmountSharedDir(m); err != nil { - return fmt.Errorf("Failed to unmount shared directory: %v", err) - } + var mErr multierror.Error + if err := d.UnmountAll(); err != nil { + mErr.Errors = append(mErr.Errors, err) } - return os.RemoveAll(d.AllocDir) + if err := os.RemoveAll(d.AllocDir); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + + return mErr.ErrorOrNil() +} + +func (d *AllocDir) UnmountAll() error { + var mErr multierror.Error + for _, dir := range d.TaskDirs { + // Check if the directory has the shared alloc mounted. + taskAlloc := filepath.Join(dir, SharedAllocName) + if d.pathExists(taskAlloc) { + if err := d.unmountSharedDir(taskAlloc); err != nil { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("failed to unmount shared alloc dir %q: %v", taskAlloc, err)) + } + if err := os.RemoveAll(taskAlloc); err != nil { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("failed to delete shared alloc dir %q: %v", taskAlloc, err)) + } + } + + // Unmount dev/ and proc/ have been mounted. + d.unmountSpecialDirs(dir) + } + + return mErr.ErrorOrNil() } // Given a list of a task build the correct alloc structure. @@ -248,7 +272,6 @@ func (d *AllocDir) MountSharedDir(task string) error { return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err) } - d.Mounted = append(d.Mounted, taskLoc) return nil } @@ -325,3 +348,13 @@ func fileCopy(src, dst string, perm os.FileMode) error { return nil } + +// pathExists is a helper function to check if the path exists. +func (d *AllocDir) pathExists(path string) bool { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} diff --git a/client/allocdir/alloc_dir_darwin.go b/client/allocdir/alloc_dir_darwin.go index 9c247d15d..2cfdd38c3 100644 --- a/client/allocdir/alloc_dir_darwin.go +++ b/client/allocdir/alloc_dir_darwin.go @@ -13,3 +13,14 @@ func (d *AllocDir) mountSharedDir(dir string) error { func (d *AllocDir) unmountSharedDir(dir string) error { return syscall.Unlink(dir) } + +// MountSpecialDirs mounts the dev and proc file system on the chroot of the +// task. It's a no-op on darwin. +func (d *AllocDir) MountSpecialDirs(taskDir string) error { + return nil +} + +// unmountSpecialDirs unmounts the dev and proc file system from the chroot +func (d *AllocDir) unmountSpecialDirs(taskDir string) error { + return nil +} diff --git a/client/allocdir/alloc_dir_linux.go b/client/allocdir/alloc_dir_linux.go index 89d5df6a4..d8d44af1f 100644 --- a/client/allocdir/alloc_dir_linux.go +++ b/client/allocdir/alloc_dir_linux.go @@ -1,8 +1,12 @@ package allocdir import ( + "fmt" "os" + "path/filepath" "syscall" + + "github.com/hashicorp/go-multierror" ) // Bind mounts the shared directory into the task directory. Must be root to @@ -18,3 +22,62 @@ func (d *AllocDir) mountSharedDir(taskDir string) error { func (d *AllocDir) unmountSharedDir(dir string) error { return syscall.Unmount(dir, 0) } + +// MountSpecialDirs mounts the dev and proc file system from the host to the +// chroot +func (d *AllocDir) MountSpecialDirs(taskDir string) error { + // Mount dev + dev := filepath.Join(taskDir, "dev") + if !d.pathExists(dev) { + if err := os.Mkdir(dev, 0777); err != nil { + return fmt.Errorf("Mkdir(%v) failed: %v", dev, err) + } + + if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil { + return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err) + } + } + + // Mount proc + proc := filepath.Join(taskDir, "proc") + if !d.pathExists(proc) { + if err := os.Mkdir(proc, 0777); err != nil { + return fmt.Errorf("Mkdir(%v) failed: %v", proc, err) + } + + if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil { + return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err) + } + } + + return nil +} + +// unmountSpecialDirs unmounts the dev and proc file system from the chroot +func (d *AllocDir) unmountSpecialDirs(taskDir string) error { + errs := new(multierror.Error) + dev := filepath.Join(taskDir, "dev") + if d.pathExists(dev) { + if err := syscall.Unmount(dev, 0); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err)) + } + + if err := os.RemoveAll(dev); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err)) + } + } + + // Unmount proc. + proc := filepath.Join(taskDir, "proc") + if d.pathExists(proc) { + if err := syscall.Unmount(proc, 0); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err)) + } + + if err := os.RemoveAll(proc); err != nil { + errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", dev, err)) + } + } + + return errs.ErrorOrNil() +} diff --git a/client/allocdir/alloc_dir_windows.go b/client/allocdir/alloc_dir_windows.go index ab0692b5b..7211125ae 100644 --- a/client/allocdir/alloc_dir_windows.go +++ b/client/allocdir/alloc_dir_windows.go @@ -23,3 +23,14 @@ func (d *AllocDir) dropDirPermissions(path string) error { func (d *AllocDir) unmountSharedDir(dir string) error { return nil } + +// MountSpecialDirs mounts the dev and proc file system on the chroot of the +// task. It's a no-op on windows. +func (d *AllocDir) MountSpecialDirs(taskDir string) error { + return nil +} + +// unmountSpecialDirs unmounts the dev and proc file system from the chroot +func (d *AllocDir) unmountSpecialDirs(taskDir string) error { + return nil +} diff --git a/client/config/config.go b/client/config/config.go index 0d59a1b9a..dd2c2b967 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -57,11 +57,13 @@ type Config struct { // Node provides the base node Node *structs.Node - // ExecutorMaxPort defines the highest port a plugin process can use - ExecutorMaxPort int + // ClientMaxPort is the upper range of the ports that the client uses for + // communicating with plugin subsystems + ClientMaxPort uint - // ExecutorMinPort defines the lowest port a plugin process can use - ExecutorMinPort int + // ClientMinPort is the lower range of the ports that the client uses for + // communicating with plugin subsystems + ClientMinPort uint // Options provides arbitrary key-value configuration for nomad internals, // like fingerprinters and drivers. The format is: diff --git a/client/driver/driver.go b/client/driver/driver.go index f92b84fad..db808b18d 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -89,8 +89,14 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration { max := d.config.MaxKillTimeout.Nanoseconds() desired := task.KillTimeout.Nanoseconds() + + // Make the minimum time between signal and kill, 1 second. + if desired == 0 { + desired = (1 * time.Second).Nanoseconds() + } + if desired < max { - return task.KillTimeout + return time.Duration(desired) } return d.config.MaxKillTimeout diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 0ab20d813..f76f8c68e 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -46,6 +46,7 @@ func testConfig() *config.Config { conf := &config.Config{} conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() + conf.MaxKillTimeout = 10 * time.Second return conf } diff --git a/client/driver/exec.go b/client/driver/exec.go index 6b1f4f7a7..9647b4a94 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -9,7 +9,9 @@ import ( "syscall" "time" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" @@ -17,8 +19,6 @@ import ( "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" - - cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) // ExecDriver fork/execs tasks using as many of the underlying OS's isolation @@ -36,14 +36,15 @@ type ExecDriverConfig struct { // execHandle is returned from Start/Open as a handle to the PID type execHandle struct { - pluginClient *plugin.Client - executor executor.Executor - groups *cgroupConfig.Cgroup - userPid int - killTimeout time.Duration - logger *log.Logger - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + pluginClient *plugin.Client + executor executor.Executor + isolationConfig *executor.IsolationConfig + userPid int + allocDir *allocdir.AllocDir + killTimeout time.Duration + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewExecDriver is used to create a new exec driver @@ -110,7 +111,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, Cmd: exec.Command(bin, "executor", pluginLogFile), } - exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } @@ -133,24 +134,27 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Return a driver handle h := &execHandle{ - pluginClient: pluginClient, - userPid: ps.Pid, - executor: exec, - groups: &ps.IsolationConfig, - killTimeout: d.DriverContext.KillTimeout(task), - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + userPid: ps.Pid, + executor: exec, + allocDir: ctx.AllocDir, + isolationConfig: ps.IsolationConfig, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } type execId struct { - KillTimeout time.Duration - UserPid int - Groups *cgroupConfig.Cgroup - PluginConfig *ExecutorReattachConfig + KillTimeout time.Duration + UserPid int + TaskDir string + AllocDir *allocdir.AllocDir + IsolationConfig *executor.IsolationConfig + PluginConfig *ExecutorReattachConfig } func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -162,28 +166,36 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro pluginConfig := &plugin.ClientConfig{ Reattach: id.PluginConfig.PluginConfig(), } - executor, client, err := createExecutor(pluginConfig, d.config.LogOutput) + exec, client, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { + merrs := new(multierror.Error) + merrs.Errors = append(merrs.Errors, err) d.logger.Println("[ERROR] driver.exec: error connecting to plugin so destroying plugin pid and user pid") if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil { - d.logger.Printf("[ERROR] driver.exec: error destroying plugin and userpid: %v", e) + merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e)) } - if e := destroyCgroup(id.Groups); e != nil { - d.logger.Printf("[ERROR] driver.exec: %v", e) + if id.IsolationConfig != nil { + if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil { + merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e)) + } } - return nil, fmt.Errorf("error connecting to plugin: %v", err) + if e := ctx.AllocDir.UnmountAll(); e != nil { + merrs.Errors = append(merrs.Errors, e) + } + return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil()) } // Return a driver handle h := &execHandle{ - pluginClient: client, - executor: executor, - userPid: id.UserPid, - groups: id.Groups, - logger: d.logger, - killTimeout: id.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: client, + executor: exec, + userPid: id.UserPid, + allocDir: id.AllocDir, + isolationConfig: id.IsolationConfig, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -191,10 +203,11 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *execHandle) ID() string { id := execId{ - KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), - UserPid: h.userPid, - Groups: h.groups, + KillTimeout: h.killTimeout, + PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + UserPid: h.userPid, + AllocDir: h.allocDir, + IsolationConfig: h.isolationConfig, } data, err := json.Marshal(id) @@ -217,7 +230,10 @@ func (h *execHandle) Update(task *structs.Task) error { } func (h *execHandle) Kill() error { - h.executor.ShutDown() + if err := h.executor.ShutDown(); err != nil { + return fmt.Errorf("executor Shutdown failed: %v", err) + } + select { case <-h.doneCh: return nil @@ -225,15 +241,35 @@ func (h *execHandle) Kill() error { if h.pluginClient.Exited() { return nil } - err := h.executor.Exit() - return err + if err := h.executor.Exit(); err != nil { + return fmt.Errorf("executor Exit failed: %v", err) + } + + return nil } } func (h *execHandle) run() { ps, err := h.executor.Wait() close(h.doneCh) - h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} + + // If the exitcode is 0 and we had an error that means the plugin didn't + // connect and doesn't know the state of the user process so we are killing + // the user process so that when we create a new executor on restarting the + // new user process doesn't have collisions with resources that the older + // user pid might be holding onto. + if ps.ExitCode == 0 && err != nil { + if h.isolationConfig != nil { + if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil { + h.logger.Printf("[ERROR] driver.exec: destroying cgroup failed while killing cgroup: %v", e) + } + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e) + } + } + h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, + Err: err} close(h.waitCh) h.pluginClient.Kill() } diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 047584db0..85da6bfa5 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -129,7 +129,8 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { userProc, err := os.FindProcess(id.UserPid) err = userProc.Signal(syscall.Signal(0)) - if err != nil { + + if err == nil { t.Fatalf("expected user process to die") } } @@ -321,9 +322,10 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": "/bin/sleep", - "args": []string{"10"}, + "args": []string{"100"}, }, - Resources: basicResources, + Resources: basicResources, + KillTimeout: 10 * time.Second, } driverCtx, execCtx := testDriverContexts(task) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 1d92a013b..08d5dc834 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -61,12 +61,18 @@ type ExecCommand struct { Args []string } +// IsolationConfig has information about the isolation mechanism the executor +// uses to put resource constraints and isolation on the user process +type IsolationConfig struct { + Cgroup *cgroupConfig.Cgroup +} + // ProcessState holds information about the state of a user process. type ProcessState struct { Pid int ExitCode int Signal int - IsolationConfig cgroupConfig.Cgroup + IsolationConfig *IsolationConfig Time time.Time } @@ -168,9 +174,9 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext if err := e.cmd.Start(); err != nil { return nil, fmt.Errorf("error starting command: %v", err) } - go e.wait() - return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: *e.groups, Time: time.Now()}, nil + ic := &IsolationConfig{Cgroup: e.groups} + return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil } // Wait waits until a process has exited and returns it's exitcode and errors @@ -212,11 +218,19 @@ func (e *UniversalExecutor) wait() { e.removeChrootMounts() } if e.ctx.ResourceLimits { - e.destroyCgroup() + e.lock.Lock() + DestroyCgroup(e.groups) + e.lock.Unlock() } e.exitState = &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()} } +var ( + // finishedErr is the error message received when trying to kill and already + // exited process. + finishedErr = "os: process already finished" +) + // Exit cleans up the alloc directory, destroys cgroups and kills the user // process func (e *UniversalExecutor) Exit() error { @@ -224,10 +238,11 @@ func (e *UniversalExecutor) Exit() error { if e.cmd.Process != nil { proc, err := os.FindProcess(e.cmd.Process.Pid) if err != nil { - e.logger.Printf("[ERROR] can't find process with pid: %v, err: %v", e.cmd.Process.Pid, err) - } - if err := proc.Kill(); err != nil { - e.logger.Printf("[ERROR] can't kill process with pid: %v, err: %v", e.cmd.Process.Pid, err) + e.logger.Printf("[ERROR] executor: can't find process with pid: %v, err: %v", + e.cmd.Process.Pid, err) + } else if err := proc.Kill(); err != nil && err.Error() != finishedErr { + merr.Errors = append(merr.Errors, + fmt.Errorf("can't kill process with pid: %v, err: %v", e.cmd.Process.Pid, err)) } } @@ -237,9 +252,11 @@ func (e *UniversalExecutor) Exit() error { } } if e.ctx.ResourceLimits { - if err := e.destroyCgroup(); err != nil { + e.lock.Lock() + if err := DestroyCgroup(e.groups); err != nil { merr.Errors = append(merr.Errors, err) } + e.lock.Unlock() } return merr.ErrorOrNil() } @@ -262,11 +279,12 @@ func (e *UniversalExecutor) ShutDown() error { return nil } +// configureTaskDir sets the task dir in the executor func (e *UniversalExecutor) configureTaskDir() error { taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName] e.taskDir = taskDir if !ok { - return fmt.Errorf("Couldn't find task directory for task %v", e.ctx.TaskName) + return fmt.Errorf("couldn't find task directory for task %v", e.ctx.TaskName) } e.cmd.Dir = taskDir return nil diff --git a/client/driver/executor/executor_basic.go b/client/driver/executor/executor_basic.go index 84d080233..9e531a547 100644 --- a/client/driver/executor/executor_basic.go +++ b/client/driver/executor/executor_basic.go @@ -2,11 +2,15 @@ package executor +import ( + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" +) + func (e *UniversalExecutor) configureChroot() error { return nil } -func (e *UniversalExecutor) destroyCgroup() error { +func DestroyCgroup(groups *cgroupConfig.Cgroup) error { return nil } diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index c95db04d0..d156736f4 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -46,11 +46,11 @@ func (e *UniversalExecutor) configureIsolation() error { return fmt.Errorf("error creating cgroups: %v", err) } if err := e.applyLimits(os.Getpid()); err != nil { - if er := e.destroyCgroup(); er != nil { - e.logger.Printf("[ERROR] error destroying cgroup: %v", er) + if er := DestroyCgroup(e.groups); er != nil { + e.logger.Printf("[ERROR] executor: error destroying cgroup: %v", er) } if er := e.removeChrootMounts(); er != nil { - e.logger.Printf("[ERROR] error removing chroot: %v", er) + e.logger.Printf("[ERROR] executor: error removing chroot: %v", er) } return fmt.Errorf("error entering the plugin process in the cgroup: %v:", err) } @@ -65,11 +65,11 @@ func (e *UniversalExecutor) applyLimits(pid int) error { } // Entering the process in the cgroup - manager := e.getCgroupManager(e.groups) + manager := getCgroupManager(e.groups) if err := manager.Apply(pid); err != nil { - e.logger.Printf("[ERROR] unable to join cgroup: %v", err) + e.logger.Printf("[ERROR] executor: unable to join cgroup: %v", err) if err := e.Exit(); err != nil { - e.logger.Printf("[ERROR] unable to kill process: %v", err) + e.logger.Printf("[ERROR] executor: unable to kill process: %v", err) } return err } @@ -144,16 +144,6 @@ func (e *UniversalExecutor) runAs(userid string) error { return nil } -// pathExists is a helper function to check if the path exists. -func (e *UniversalExecutor) pathExists(path string) bool { - if _, err := os.Stat(path); err != nil { - if os.IsNotExist(err) { - return false - } - } - return true -} - // configureChroot configures a chroot func (e *UniversalExecutor) configureChroot() error { allocDir := e.ctx.AllocDir @@ -165,40 +155,19 @@ func (e *UniversalExecutor) configureChroot() error { return err } - // Mount dev - dev := filepath.Join(e.taskDir, "dev") - if !e.pathExists(dev) { - if err := os.Mkdir(dev, 0777); err != nil { - return fmt.Errorf("Mkdir(%v) failed: %v", dev, err) - } - - if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil { - return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err) - } - } - - // Mount proc - proc := filepath.Join(e.taskDir, "proc") - if !e.pathExists(proc) { - if err := os.Mkdir(proc, 0777); err != nil { - return fmt.Errorf("Mkdir(%v) failed: %v", proc, err) - } - - if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil { - return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err) - } - } - // Set the tasks AllocDir environment variable. e.ctx.TaskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)).SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).Build() if e.cmd.SysProcAttr == nil { e.cmd.SysProcAttr = &syscall.SysProcAttr{} } - e.cmd.SysProcAttr.Chroot = e.taskDir e.cmd.Dir = "/" + if err := allocDir.MountSpecialDirs(e.taskDir); err != nil { + return err + } + return nil } @@ -208,80 +177,44 @@ func (e *UniversalExecutor) removeChrootMounts() error { // Prevent a race between Wait/ForceStop e.lock.Lock() defer e.lock.Unlock() - - // Unmount dev. - errs := new(multierror.Error) - dev := filepath.Join(e.taskDir, "dev") - if e.pathExists(dev) { - if err := syscall.Unmount(dev, 0); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err)) - } - - if err := os.RemoveAll(dev); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err)) - } - } - - // Unmount proc. - proc := filepath.Join(e.taskDir, "proc") - if e.pathExists(proc) { - if err := syscall.Unmount(proc, 0); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err)) - } - - if err := os.RemoveAll(proc); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", dev, err)) - } - } - - return errs.ErrorOrNil() + return e.ctx.AllocDir.UnmountAll() } // destroyCgroup kills all processes in the cgroup and removes the cgroup // configuration from the host. -func (e *UniversalExecutor) destroyCgroup() error { - if e.groups == nil { +func DestroyCgroup(groups *cgroupConfig.Cgroup) error { + merrs := new(multierror.Error) + if groups == nil { return fmt.Errorf("Can't destroy: cgroup configuration empty") } - // Prevent a race between Wait/ForceStop - e.lock.Lock() - defer e.lock.Unlock() - - manager := e.getCgroupManager(e.groups) - pids, err := manager.GetPids() - if err != nil { - return fmt.Errorf("Failed to get pids in the cgroup %v: %v", e.groups.Name, err) - } - - errs := new(multierror.Error) - for _, pid := range pids { - process, err := os.FindProcess(pid) - if err != nil { - multierror.Append(errs, fmt.Errorf("Failed to find Pid %v: %v", pid, err)) - continue - } - - if err := process.Kill(); err != nil && err.Error() != "os: process already finished" { - multierror.Append(errs, fmt.Errorf("Failed to kill Pid %v: %v", pid, err)) - continue + manager := getCgroupManager(groups) + if pids, perr := manager.GetPids(); perr == nil { + for _, pid := range pids { + proc, err := os.FindProcess(pid) + if err != nil { + merrs.Errors = append(merrs.Errors, fmt.Errorf("error finding process %v: %v", pid, err)) + } else { + if e := proc.Kill(); e != nil { + merrs.Errors = append(merrs.Errors, fmt.Errorf("error killing process %v: %v", pid, e)) + } + } } } // Remove the cgroup. if err := manager.Destroy(); err != nil { - multierror.Append(errs, fmt.Errorf("Failed to delete the cgroup directories: %v", err)) + multierror.Append(merrs, fmt.Errorf("Failed to delete the cgroup directories: %v", err)) } - if len(errs.Errors) != 0 { - return fmt.Errorf("Failed to destroy cgroup: %v", errs) + if len(merrs.Errors) != 0 { + return fmt.Errorf("errors while destroying cgroup: %v", merrs) } - return nil } // getCgroupManager returns the correct libcontainer cgroup manager. -func (e *UniversalExecutor) getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager { +func getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager { var manager cgroups.Manager manager = &cgroupFs.Manager{Cgroups: groups} if systemd.UseSystemd() { diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 1d9af9bb4..ece9c0428 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -19,7 +19,7 @@ var HandshakeConfig = plugin.HandshakeConfig{ func GetPluginMap(w io.Writer) map[string]plugin.Plugin { p := new(ExecutorPlugin) - p.logger = log.New(w, "executor-plugin-server:", log.LstdFlags) + p.logger = log.New(w, "", log.LstdFlags) return map[string]plugin.Plugin{"executor": p} } @@ -115,10 +115,14 @@ func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *inter type ExecutorPlugin struct { logger *log.Logger + Impl *ExecutorRPCServer } func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { - return &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger)}, nil + if p.Impl == nil { + p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger)} + } + return p.Impl, nil } func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { diff --git a/client/driver/java.go b/client/driver/java.go index b30f87c26..6d8a15e7d 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -12,10 +12,11 @@ import ( "syscall" "time" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" "github.com/mitchellh/mapstructure" - cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" @@ -41,11 +42,13 @@ type JavaDriverConfig struct { // javaHandle is returned from Start/Open as a handle to the PID type javaHandle struct { - pluginClient *plugin.Client - userPid int - executor executor.Executor - groups *cgroupConfig.Cgroup + pluginClient *plugin.Client + userPid int + executor executor.Executor + isolationConfig *executor.IsolationConfig + taskDir string + allocDir *allocdir.AllocDir killTimeout time.Duration logger *log.Logger waitCh chan *cstructs.WaitResult @@ -155,7 +158,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, Cmd: exec.Command(bin, "executor", pluginLogFile), } - exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } @@ -176,14 +179,16 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Return a driver handle h := &javaHandle{ - pluginClient: pluginClient, - executor: exec, - userPid: ps.Pid, - groups: &ps.IsolationConfig, - killTimeout: d.DriverContext.KillTimeout(task), - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + userPid: ps.Pid, + isolationConfig: ps.IsolationConfig, + taskDir: taskDir, + allocDir: ctx.AllocDir, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -191,10 +196,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } type javaId struct { - KillTimeout time.Duration - PluginConfig *ExecutorReattachConfig - Groups *cgroupConfig.Cgroup - UserPid int + KillTimeout time.Duration + PluginConfig *ExecutorReattachConfig + IsolationConfig *executor.IsolationConfig + TaskDir string + AllocDir *allocdir.AllocDir + UserPid int } func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -206,29 +213,38 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro pluginConfig := &plugin.ClientConfig{ Reattach: id.PluginConfig.PluginConfig(), } - executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { + merrs := new(multierror.Error) + merrs.Errors = append(merrs.Errors, err) d.logger.Println("[ERROR] driver.java: error connecting to plugin so destroying plugin pid and user pid") if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil { - d.logger.Printf("[ERROR] driver.java: error destroying plugin and userpid: %v", e) + merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e)) } - if e := destroyCgroup(id.Groups); e != nil { - d.logger.Printf("[ERROR] driver.exec: %v", e) + if id.IsolationConfig != nil { + if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil { + merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e)) + } + } + if e := ctx.AllocDir.UnmountAll(); e != nil { + merrs.Errors = append(merrs.Errors, e) } - return nil, fmt.Errorf("error connecting to plugin: %v", err) + return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil()) } // Return a driver handle h := &javaHandle{ - pluginClient: pluginClient, - executor: executor, - userPid: id.UserPid, - groups: id.Groups, - logger: d.logger, - killTimeout: id.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: pluginClient, + executor: exec, + userPid: id.UserPid, + isolationConfig: id.IsolationConfig, + taskDir: id.TaskDir, + allocDir: id.AllocDir, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -237,10 +253,12 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *javaHandle) ID() string { id := javaId{ - KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), - UserPid: h.userPid, - Groups: h.groups, + KillTimeout: h.killTimeout, + PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + UserPid: h.userPid, + TaskDir: h.taskDir, + AllocDir: h.allocDir, + IsolationConfig: h.isolationConfig, } data, err := json.Marshal(id) @@ -275,6 +293,20 @@ func (h *javaHandle) Kill() error { func (h *javaHandle) run() { ps, err := h.executor.Wait() close(h.doneCh) + if ps.ExitCode == 0 && err != nil { + if h.isolationConfig != nil { + if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil { + h.logger.Printf("[ERROR] driver.java: destroying cgroup failed while killing cgroup: %v", e) + } + } else { + if e := killProcess(h.userPid); e != nil { + h.logger.Printf("[ERROR] driver.java: error killing user process: %v", e) + } + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.java: unmounting dev,proc and alloc dirs failed: %v", e) + } + } h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) h.pluginClient.Kill() diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 44ce6957a..68b7182ab 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -12,6 +12,7 @@ import ( "time" "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" @@ -46,6 +47,7 @@ type qemuHandle struct { pluginClient *plugin.Client userPid int executor executor.Executor + allocDir *allocdir.AllocDir killTimeout time.Duration logger *log.Logger waitCh chan *cstructs.WaitResult @@ -197,7 +199,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, Cmd: exec.Command(bin, "executor", pluginLogFile), } - exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } @@ -220,6 +222,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, pluginClient: pluginClient, executor: exec, userPid: ps.Pid, + allocDir: ctx.AllocDir, killTimeout: d.DriverContext.KillTimeout(task), logger: d.logger, doneCh: make(chan struct{}), @@ -234,6 +237,7 @@ type qemuId struct { KillTimeout time.Duration UserPid int PluginConfig *ExecutorReattachConfig + AllocDir *allocdir.AllocDir } func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -246,7 +250,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro Reattach: id.PluginConfig.PluginConfig(), } - executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput) + executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { d.logger.Println("[ERROR] driver.qemu: error connecting to plugin so destroying plugin pid and user pid") if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil { @@ -260,6 +264,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro pluginClient: pluginClient, executor: executor, userPid: id.UserPid, + allocDir: id.AllocDir, logger: d.logger, killTimeout: id.KillTimeout, doneCh: make(chan struct{}), @@ -274,6 +279,7 @@ func (h *qemuHandle) ID() string { KillTimeout: h.killTimeout, PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, + AllocDir: h.allocDir, } data, err := json.Marshal(id) @@ -309,6 +315,14 @@ func (h *qemuHandle) Kill() error { func (h *qemuHandle) run() { ps, err := h.executor.Wait() + if ps.ExitCode == 0 && err != nil { + if e := killProcess(h.userPid); e != nil { + h.logger.Printf("[ERROR] driver.qemu: error killing user process: %v", e) + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.qemu: unmounting dev,proc and alloc dirs failed: %v", e) + } + } close(h.doneCh) h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 154a7ed1b..9b89cad48 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -9,6 +9,7 @@ import ( "time" "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" @@ -38,6 +39,7 @@ type rawExecHandle struct { userPid int executor executor.Executor killTimeout time.Duration + allocDir *allocdir.AllocDir logger *log.Logger waitCh chan *cstructs.WaitResult doneCh chan struct{} @@ -103,7 +105,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl Cmd: exec.Command(bin, "executor", pluginLogFile), } - exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } @@ -127,6 +129,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl executor: exec, userPid: ps.Pid, killTimeout: d.DriverContext.KillTimeout(task), + allocDir: ctx.AllocDir, logger: d.logger, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), @@ -139,6 +142,7 @@ type rawExecId struct { KillTimeout time.Duration UserPid int PluginConfig *ExecutorReattachConfig + AllocDir *allocdir.AllocDir } func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -150,7 +154,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e pluginConfig := &plugin.ClientConfig{ Reattach: id.PluginConfig.PluginConfig(), } - executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput) + executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { d.logger.Println("[ERROR] driver.raw_exec: error connecting to plugin so destroying plugin pid and user pid") if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil { @@ -166,6 +170,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e userPid: id.UserPid, logger: d.logger, killTimeout: id.KillTimeout, + allocDir: id.AllocDir, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } @@ -178,6 +183,7 @@ func (h *rawExecHandle) ID() string { KillTimeout: h.killTimeout, PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, + AllocDir: h.allocDir, } data, err := json.Marshal(id) @@ -212,6 +218,14 @@ func (h *rawExecHandle) Kill() error { func (h *rawExecHandle) run() { ps, err := h.executor.Wait() close(h.doneCh) + if ps.ExitCode == 0 && err != nil { + if e := killProcess(h.userPid); e != nil { + h.logger.Printf("[ERROR] driver.raw_exec: error killing user process: %v", e) + } + if e := h.allocDir.UnmountAll(); e != nil { + h.logger.Printf("[ERROR] driver.raw_exec: unmounting dev,proc and alloc dirs failed: %v", e) + } + } h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) h.pluginClient.Kill() diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 08f578a06..ab6a948d7 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -1,10 +1,12 @@ package driver import ( + "encoding/json" "fmt" "io/ioutil" "net/http" "net/http/httptest" + "os" "path/filepath" "reflect" "testing" @@ -287,7 +289,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { Name: "sleep", Config: map[string]interface{}{ "command": testtask.Path(), - "args": []string{"sleep", "15s"}, + "args": []string{"sleep", "45s"}, }, Resources: basicResources, } diff --git a/client/driver/utils.go b/client/driver/utils.go index f2f8fc02a..188e467d2 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -7,17 +7,24 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" ) // createExecutor launches an executor plugin and returns an instance of the // Executor interface -func createExecutor(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) { +func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) { config.HandshakeConfig = HandshakeConfig config.Plugins = GetPluginMap(w) + config.MaxPort = clientConfig.ClientMaxPort + config.MinPort = clientConfig.ClientMinPort + + // setting the setsid of the plugin process so that it doesn't get signals sent to + // the nomad client. if config.Cmd != nil { isolateCommand(config.Cmd) } + executorClient := plugin.NewClient(config) rpcClient, err := executorClient.Client() if err != nil { diff --git a/client/driver/utils_linux.go b/client/driver/utils_linux.go index eb7022cd0..693bba6f6 100644 --- a/client/driver/utils_linux.go +++ b/client/driver/utils_linux.go @@ -3,12 +3,6 @@ package driver import ( "os/exec" "syscall" - - "fmt" - "github.com/opencontainers/runc/libcontainer/cgroups" - cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" - "github.com/opencontainers/runc/libcontainer/cgroups/systemd" - cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) // isolateCommand sets the setsid flag in exec.Cmd to true so that the process @@ -20,21 +14,3 @@ func isolateCommand(cmd *exec.Cmd) { } cmd.SysProcAttr.Setsid = true } - -// destroyCgroup destroys a cgroup and thereby killing all the processes in that -// group -func destroyCgroup(group *cgroupConfig.Cgroup) error { - if group == nil { - return nil - } - var manager cgroups.Manager - manager = &cgroupFs.Manager{Cgroups: group} - if systemd.UseSystemd() { - manager = &systemd.Manager{Cgroups: group} - } - - if err := manager.Destroy(); err != nil { - return fmt.Errorf("failed to destroy cgroup: %v", err) - } - return nil -} diff --git a/client/driver/utils_posix.go b/client/driver/utils_posix.go index cf90d109d..fef4a002f 100644 --- a/client/driver/utils_posix.go +++ b/client/driver/utils_posix.go @@ -5,8 +5,6 @@ package driver import ( "os/exec" "syscall" - - cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) // isolateCommand sets the setsid flag in exec.Cmd to true so that the process @@ -18,7 +16,3 @@ func isolateCommand(cmd *exec.Cmd) { } cmd.SysProcAttr.Setsid = true } - -func destroyCgroup(group *cgroupConfig.Cgroup) error { - return nil -} diff --git a/client/driver/utils_windows.go b/client/driver/utils_windows.go index 84aff1e5f..5b2b7d842 100644 --- a/client/driver/utils_windows.go +++ b/client/driver/utils_windows.go @@ -2,14 +2,8 @@ package driver import ( "os/exec" - - cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) // TODO Figure out if this is needed in Wondows func isolateCommand(cmd *exec.Cmd) { } - -func destroyCgroup(group *cgroupConfig.Cgroup) error { - return nil -} diff --git a/command/agent/agent.go b/command/agent/agent.go index 28e5e6b60..6f0f1e8a3 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -216,6 +216,8 @@ func (a *Agent) setupClient() error { } conf.MaxKillTimeout = dur } + conf.ClientMaxPort = a.config.Client.ClientMaxPort + conf.ClientMinPort = a.config.Client.ClientMinPort // Setup the node conf.Node = new(structs.Node) @@ -242,14 +244,8 @@ func (a *Agent) setupClient() error { // Reserve some ports for the plugins if runtime.GOOS == "windows" { deviceName, err := a.findLoopbackDevice() - if conf.ExecutorMaxPort == 0 { - conf.ExecutorMaxPort = 15000 - } - if conf.ExecutorMinPort == 0 { - conf.ExecutorMinPort = 14000 - } if err != nil { - return fmt.Errorf("error finding the device name for the ip 127.0.0.1: %v", err) + return fmt.Errorf("error finding the device name for loopback: %v", err) } var nr *structs.NetworkResource for _, n := range conf.Node.Reserved.Networks { @@ -263,10 +259,9 @@ func (a *Agent) setupClient() error { ReservedPorts: make([]structs.Port, 0), } } - for i := conf.ExecutorMinPort; i <= conf.ExecutorMaxPort; i++ { - nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: i}) + for i := conf.ClientMinPort; i <= conf.ClientMaxPort; i++ { + nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: int(i)}) } - } // Create the client diff --git a/command/agent/config.go b/command/agent/config.go index 6d9219f96..89abd6300 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -159,6 +159,14 @@ type ClientConfig struct { // MaxKillTimeout allows capping the user-specifiable KillTimeout. MaxKillTimeout string `hcl:"max_kill_timeout"` + + // ClientMaxPort is the upper range of the ports that the client uses for + // communicating with plugin subsystems + ClientMaxPort uint `hcl:"client_max_port"` + + // ClientMinPort is the lower range of the ports that the client uses for + // communicating with plugin subsystems + ClientMinPort uint `hcl:"client_min_port"` } // ServerConfig is configuration specific to the server mode @@ -288,6 +296,8 @@ func DefaultConfig() *Config { Enabled: false, NetworkSpeed: 100, MaxKillTimeout: "30s", + ClientMinPort: 14000, + ClientMaxPort: 19000, }, Server: &ServerConfig{ Enabled: false, @@ -505,6 +515,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { if b.MaxKillTimeout != "" { result.MaxKillTimeout = b.MaxKillTimeout } + if b.ClientMaxPort != 0 { + result.ClientMaxPort = b.ClientMaxPort + } + if b.ClientMinPort != 0 { + result.ClientMinPort = b.ClientMinPort + } // Add the servers result.Servers = append(result.Servers, b.Servers...) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 2662e1155..aa5349a8d 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -104,6 +104,8 @@ func TestConfig_Merge(t *testing.T) { "foo": "bar", "baz": "zip", }, + ClientMaxPort: 20000, + ClientMinPort: 22000, NetworkSpeed: 105, MaxKillTimeout: "50s", }, diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index acf32fbd3..23ac96c1b 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -127,6 +127,15 @@ func (s *Server) applyPlan(result *structs.PlanResult, snap *state.StateSnapshot } req.Alloc = append(req.Alloc, result.FailedAllocs...) + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + now := time.Now().UTC().UnixNano() + for _, alloc := range req.Alloc { + if alloc.CreateTime == 0 { + alloc.CreateTime = now + } + } + // Dispatch the Raft transaction future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req) if err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9e3b85076..88991884c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1794,6 +1794,10 @@ type Allocation struct { // AllocModifyIndex is not updated when the client updates allocations. This // lets the client pull only the allocs updated by the server. AllocModifyIndex uint64 + + // CreateTime is the time the allocation has finished scheduling and been + // verified by the plan applier. + CreateTime int64 } func (a *Allocation) Copy() *Allocation { @@ -1840,6 +1844,7 @@ func (a *Allocation) Stub() *AllocListStub { TaskStates: a.TaskStates, CreateIndex: a.CreateIndex, ModifyIndex: a.ModifyIndex, + CreateTime: a.CreateTime, } } @@ -1887,6 +1892,7 @@ type AllocListStub struct { TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 + CreateTime int64 } // AllocMetric is used to track various metrics while attempting diff --git a/website/source/docs/http/alloc.html.md b/website/source/docs/http/alloc.html.md index c507e81bd..202820d28 100644 --- a/website/source/docs/http/alloc.html.md +++ b/website/source/docs/http/alloc.html.md @@ -246,7 +246,7 @@ be specified using the `?region=` query parameter. * `TaskStateRunning` - The task is currently running. * `TaskStateDead` - The task is dead and will not run again. -

The latest 10 events are stored per task. Each event is timestamped (unix seconds) +

The latest 10 events are stored per task. Each event is timestamped (unix nano-seconds) and has one of the following types:

* `Driver Failure` - The task could not be started due to a failure in the diff --git a/website/source/docs/http/eval.html.md b/website/source/docs/http/eval.html.md index 87e048209..e11ff967b 100644 --- a/website/source/docs/http/eval.html.md +++ b/website/source/docs/http/eval.html.md @@ -24,7 +24,7 @@ be specified using the `?region=` query parameter.
GET
URL
-
`/v1/evaluations`
+
`/v1/evaluation/`
Parameters