diff --git a/client/driver/docker.go b/client/driver/docker.go
index 171c1ed6d..bea4a3501 100644
--- a/client/driver/docker.go
+++ b/client/driver/docker.go
@@ -429,6 +429,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
 func (d *DockerDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: true,
+		Exec:        true,
 	}
 }
 
diff --git a/client/driver/driver.go b/client/driver/driver.go
index 74c898031..ee28888cf 100644
--- a/client/driver/driver.go
+++ b/client/driver/driver.go
@@ -192,6 +192,10 @@ type Driver interface {
 type DriverAbilities struct {
 	// SendSignals marks the driver as being able to send signals
 	SendSignals bool
+
+	// Exec marks the driver as being able to execute arbitrary commands
+	// such as health checks. Used by the ScriptExecutor interface.
+	Exec bool
 }
 
 // LogEventFn is a callback which allows Drivers to emit task events.
@@ -255,10 +259,14 @@ type DriverHandle interface {
 
 	// Signal is used to send a signal to the task
 	Signal(s os.Signal) error
+
+	// ScriptExecutor is an interface used to execute commands such as
+	// health check scripts in a DriverHandle's context.
+	ScriptExecutor
 }
 
-// ScriptExecutor is a DriverHandle that supports Exec()ing commands in the
-// driver's context.
+// ScriptExecutor is an interface that supports Exec()ing commands in the
+// driver's context. Split out of DriverHandle to ease testing.
 type ScriptExecutor interface {
 	Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error)
 }
diff --git a/client/driver/exec.go b/client/driver/exec.go
index 7da657e1e..bc6ee3aee 100644
--- a/client/driver/exec.go
+++ b/client/driver/exec.go
@@ -86,6 +86,7 @@ func (d *ExecDriver) Validate(config map[string]interface{}) error {
 func (d *ExecDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: true,
+		Exec:        true,
 	}
 }
 
diff --git a/client/driver/java.go b/client/driver/java.go
index c684e85bf..c215e6882 100644
--- a/client/driver/java.go
+++ b/client/driver/java.go
@@ -108,6 +108,7 @@ func (d *JavaDriver) Validate(config map[string]interface{}) error {
 func (d *JavaDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: true,
+		Exec:        true,
 	}
 }
 
diff --git a/client/driver/lxc.go b/client/driver/lxc.go
index 0d369d34c..ac7d3c298 100644
--- a/client/driver/lxc.go
+++ b/client/driver/lxc.go
@@ -3,6 +3,7 @@
 package driver
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"log"
@@ -149,6 +150,7 @@ func (d *LxcDriver) Validate(config map[string]interface{}) error {
 func (d *LxcDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: false,
+		Exec:        false,
 	}
 }
 
@@ -375,6 +377,10 @@ func (h *lxcDriverHandle) Update(task *structs.Task) error {
 	return nil
 }
 
+func (h *lxcDriverHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
+	return nil, 0, fmt.Errorf("lxc driver cannot execute commands")
+}
+
 func (h *lxcDriverHandle) Kill() error {
 	h.logger.Printf("[INFO] driver.lxc: shutting down container %q", h.container.Name())
 	if err := h.container.Shutdown(h.killTimeout); err != nil {
diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go
index 518e79cea..bdaff3f70 100644
--- a/client/driver/mock_driver.go
+++ b/client/driver/mock_driver.go
@@ -76,6 +76,7 @@ func NewMockDriver(ctx *DriverContext) Driver {
 func (d *MockDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: false,
+		Exec:        true,
 	}
 }
 
diff --git a/client/driver/qemu.go b/client/driver/qemu.go
index 4e04f9522..856b2c023 100644
--- a/client/driver/qemu.go
+++ b/client/driver/qemu.go
@@ -1,6 +1,7 @@
 package driver
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"log"
@@ -97,6 +98,7 @@ func (d *QemuDriver) Validate(config map[string]interface{}) error {
 func (d *QemuDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: false,
+		Exec:        false,
 	}
 }
 
@@ -353,6 +355,10 @@ func (h *qemuHandle) Update(task *structs.Task) error {
 	return nil
 }
 
+func (h *qemuHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
+	return nil, 0, fmt.Errorf("Qemu driver can't execute commands")
+}
+
 func (h *qemuHandle) Signal(s os.Signal) error {
 	return fmt.Errorf("Qemu driver can't send signals")
 }
diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go
index e0e86c20b..5e602f47b 100644
--- a/client/driver/raw_exec.go
+++ b/client/driver/raw_exec.go
@@ -80,6 +80,7 @@ func (d *RawExecDriver) Validate(config map[string]interface{}) error {
 func (d *RawExecDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: true,
+		Exec:        true,
 	}
 }
 
diff --git a/client/driver/rkt.go b/client/driver/rkt.go
index d1e039dd6..a8cff3c02 100644
--- a/client/driver/rkt.go
+++ b/client/driver/rkt.go
@@ -165,6 +165,7 @@ func (d *RktDriver) Validate(config map[string]interface{}) error {
 func (d *RktDriver) Abilities() DriverAbilities {
 	return DriverAbilities{
 		SendSignals: false,
+		Exec:        true,
 	}
 }
 
diff --git a/client/task_runner.go b/client/task_runner.go
index c4baef9f9..88ce02017 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -292,15 +292,11 @@ func (r *TaskRunner) RestoreState() error {
 			return nil
 		}
 
-		//FIXME is there a better place to do this? used to be in executor
-		// Prepare services
-		interpolateServices(r.getTaskEnv(), r.task)
-
-		// Ensure the service is registered
-		scriptExec, _ := handle.(driver.ScriptExecutor)
-		if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
-			//FIXME What to do if this fails?
-			r.logger.Printf("[WARN] client: failed to register services and checks for task %q alloc %q: %v",
+		if err := r.registerServices(d, handle); err != nil {
+			// Don't hard fail here as there's a chance this task
+			// registered with Consul properly when it initially
+			// started.
+			r.logger.Printf("[WARN] client: failed to register services and checks with consul for task %q in alloc %q: %v",
 				r.task.Name, r.alloc.ID, err)
 		}
 
@@ -1186,8 +1182,12 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) {
 	// Mark that we received the kill event
 	r.setState(structs.TaskStateRunning, event)
 
+	r.handleLock.Lock()
+	handle := r.handle
+	r.handleLock.Unlock()
+
 	// Kill the task using an exponential backoff in-case of failures.
-	destroySuccess, err := r.handleDestroy()
+	destroySuccess, err := r.handleDestroy(handle)
 	if !destroySuccess {
 		// We couldn't successfully destroy the resource created.
 		r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
@@ -1236,26 +1236,37 @@ func (r *TaskRunner) startTask() error {
 	}
 
+	if err := r.registerServices(drv, handle); err != nil {
+		// All IO is done asynchronously, so errors from registering
+		// services are hard failures.
+		r.logger.Printf("[ERR] client: failed to register services and checks for task %q alloc %q: %v", r.task.Name, r.alloc.ID, err)
+
+		// Kill the started task
+		if destroyed, err := r.handleDestroy(handle); !destroyed {
+			r.logger.Printf("[ERR] client: failed to kill task %q alloc %q. Resources may be leaked: %v",
+				r.task.Name, r.alloc.ID, err)
+		}
+		return structs.NewRecoverableError(err, false)
+	}
+
 	r.handleLock.Lock()
 	r.handle = handle
 	r.handleLock.Unlock()
 
-	//FIXME is there a better place to do this? used to be in executor
-	// Prepare services
-	interpolateServices(r.getTaskEnv(), r.task)
-
-	// RegisterTask properly handles scriptExec being nil, so it just
-	// ignore the ok value.
-	scriptExec, _ := handle.(driver.ScriptExecutor)
-	if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
-		//FIXME handle errors?!
-		//FIXME could break into prepare & submit steps as only preperation can error...
-		r.logger.Printf("[ERR] client: failed to register services and checks for task %q alloc %q: %v", r.task.Name, r.alloc.ID, err)
-	}
-
 	return nil
 }
 
+// registerServices and checks with Consul.
+func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor) error {
+	var exec driver.ScriptExecutor
+	if d.Abilities().Exec {
+		// Only set the script executor if the driver supports it
+		exec = h
+	}
+	interpolateServices(r.getTaskEnv(), r.task)
+	return r.consul.RegisterTask(r.alloc.ID, r.task, exec)
+}
+
 // interpolateServices interpolates tags in a service and checks with values from the
 // task's environment.
 func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) {
@@ -1391,23 +1402,20 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
 	var mErr multierror.Error
 	r.handleLock.Lock()
 	if r.handle != nil {
+		// Need to check driver abilities for updating services
+		drv, err := r.createDriver()
+		if err != nil {
+			// Something has really gone wrong; don't continue
+			r.handleLock.Unlock()
+			return fmt.Errorf("error accessing driver when updating task %q: %v", r.task.Name, err)
+		}
+
 		// Update will update resources and store the new kill timeout.
 		if err := r.handle.Update(updatedTask); err != nil {
 			mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
 		}
 
-		//FIXME is there a better place to do this? used to be in executor
-		// Prepare services
-		interpolateServices(r.getTaskEnv(), updatedTask)
-
-		// Not all drivers support Exec (eg QEMU), but RegisterTask
-		// handles nil ScriptExecutors
-		scriptExec, _ := r.handle.(driver.ScriptExecutor)
-
-		// Since the handle exists, the task is running, so we need to
-		// update it in Consul (if the handle doesn't exist
-		// registration in Consul will happen when it's created)
-		if err := r.consul.UpdateTask(r.alloc.ID, r.task, updatedTask, scriptExec); err != nil {
+		if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil {
 			mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err))
 		}
 	}
@@ -1424,14 +1432,25 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
 	return mErr.ErrorOrNil()
 }
 
+// updateServices and checks with Consul.
+func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error {
+	var exec driver.ScriptExecutor
+	if d.Abilities().Exec {
+		// Only set the script executor if the driver supports it
+		exec = h
+	}
+	interpolateServices(r.getTaskEnv(), new)
+	return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
+}
+
 // handleDestroy kills the task handle. In the case that killing fails,
 // handleDestroy will retry with an exponential backoff and will give up at a
 // given limit. It returns whether the task was destroyed and the error
 // associated with the last kill attempt.
-func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
+func (r *TaskRunner) handleDestroy(handle driver.DriverHandle) (destroyed bool, err error) {
 	// Cap the number of times we attempt to kill the task.
 	for i := 0; i < killFailureLimit; i++ {
-		if err = r.handle.Kill(); err != nil {
+		if err = handle.Kill(); err != nil {
 			// Calculate the new backoff
 			backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
 			if backoff > killBackoffLimit {
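
A note on the interface change above: because ScriptExecutor is now embedded in DriverHandle, every handle satisfies the interface, so the old `handle.(driver.ScriptExecutor)` type assertion would succeed even for Qemu and LXC, whose Exec only returns an error. The `Abilities().Exec` flag is therefore what gates script checks. Below is a minimal, self-contained sketch of that gating pattern; the `chooseExecutor` helper and `stubHandle` type are illustrative names, not part of Nomad:

```go
package main

import (
	"context"
	"fmt"
)

// DriverAbilities mirrors the struct in client/driver/driver.go.
type DriverAbilities struct {
	SendSignals bool
	Exec        bool
}

// ScriptExecutor mirrors the interface split out of DriverHandle.
type ScriptExecutor interface {
	Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}

// stubHandle stands in for a handle whose Exec is a stub, like qemuHandle.
type stubHandle struct{}

func (stubHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
	return nil, 0, fmt.Errorf("driver can't execute commands")
}

// chooseExecutor shows the gating used by registerServices and
// updateServices: hand Consul the handle only when the driver
// advertises Exec support, and nil otherwise so that no script
// checks run against an Exec that always errors.
func chooseExecutor(abilities DriverAbilities, h ScriptExecutor) ScriptExecutor {
	if abilities.Exec {
		return h
	}
	return nil
}

func main() {
	qemu := DriverAbilities{SendSignals: false, Exec: false}
	raw := DriverAbilities{SendSignals: true, Exec: true}

	fmt.Println(chooseExecutor(qemu, stubHandle{}) == nil) // true: script checks skipped
	fmt.Println(chooseExecutor(raw, stubHandle{}) == nil)  // false: script checks allowed
}
```

Passing nil is safe because, as the removed comment in startTask noted, RegisterTask already handles a nil ScriptExecutor.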
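For drivers that do advertise `Exec: true`, the handle must actually run a command and report output plus an exit code for Consul's script checks. The real implementations go through each driver's own context (Docker through its exec mechanism, exec/java through their executors), so the following standalone sketch is not Nomad's code; it only shows the shape of the contract on a plain host process, using a hypothetical `hostHandle` along the lines of raw_exec:

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"os/exec"
	"time"
)

// hostHandle is a hypothetical handle that runs commands directly on
// the host, roughly what raw_exec amounts to.
type hostHandle struct{}

// Exec satisfies the ScriptExecutor contract from the diff above: run
// cmd with args under ctx and return combined output plus exit code.
func (h *hostHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
	var buf bytes.Buffer
	c := exec.CommandContext(ctx, cmd, args...)
	c.Stdout = &buf
	c.Stderr = &buf
	if err := c.Run(); err != nil {
		if exitErr, ok := err.(*exec.ExitError); ok {
			// A non-zero exit (or a kill via ctx, which reports -1) is
			// a legitimate health check result, not an execution error.
			return buf.Bytes(), exitErr.ExitCode(), nil
		}
		// The command could not be started at all (bad path, etc.).
		return buf.Bytes(), 0, err
	}
	return buf.Bytes(), 0, nil
}

func main() {
	// A script check is essentially Exec with a deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	out, code, err := (&hostHandle{}).Exec(ctx, "/bin/sh", []string{"-c", "echo ok"})
	fmt.Printf("out=%q code=%d err=%v\n", out, code, err)
}
```

How a failing check should be surfaced (output plus non-zero code versus an error) is a convention for the consuming Consul layer to define; the split here is only one plausible choice.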