Use a DriverAbility to expose Exec functionality

Michael Schurter
2017-04-13 09:52:16 -07:00
parent 4002d87a6b
commit 0e0845e94b
10 changed files with 84 additions and 39 deletions

View File

@@ -429,6 +429,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
func (d *DockerDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: true,
+Exec: true,
}
}
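
The Docker handle's Exec implementation is not part of this diff. As a rough sketch only, assuming the fsouza/go-dockerclient API the Docker driver is built on (the helper name, option fields, and error messages here are illustrative, not the commit's code), an exec against a running container could look like:

package driver

import (
	"bytes"
	"context"
	"fmt"

	docker "github.com/fsouza/go-dockerclient"
)

// execInContainer runs cmd with args inside a running container and
// returns combined output plus the command's exit code.
func execInContainer(ctx context.Context, client *docker.Client, containerID, cmd string, args []string) ([]byte, int, error) {
	exec, err := client.CreateExec(docker.CreateExecOptions{
		Context:      ctx,
		Container:    containerID,
		Cmd:          append([]string{cmd}, args...),
		AttachStdout: true,
		AttachStderr: true,
	})
	if err != nil {
		return nil, 0, fmt.Errorf("failed to create exec object: %v", err)
	}

	var output bytes.Buffer
	err = client.StartExec(exec.ID, docker.StartExecOptions{
		Context:      ctx,
		OutputStream: &output,
		ErrorStream:  &output,
	})
	if err != nil {
		return nil, 0, fmt.Errorf("failed to start exec: %v", err)
	}

	res, err := client.InspectExec(exec.ID)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to inspect exec: %v", err)
	}
	return output.Bytes(), res.ExitCode, nil
}

The exit code comes from inspecting the exec after it finishes, which is what lets a handle satisfy the ([]byte, int, error) contract of Exec.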

View File

@@ -192,6 +192,10 @@ type Driver interface {
type DriverAbilities struct {
// SendSignals marks the driver as being able to send signals
SendSignals bool
+// Exec marks the driver as being able to execute arbitrary commands
+// such as health checks. Used by the ScriptExecutor interface.
+Exec bool
}
// LogEventFn is a callback which allows Drivers to emit task events.
@@ -255,10 +259,14 @@ type DriverHandle interface {
// Signal is used to send a signal to the task
Signal(s os.Signal) error
+// ScriptExecutor is an interface used to execute commands such as
+// health check scripts in a DriverHandle's context.
+ScriptExecutor
}
-// ScriptExecutor is a DriverHandle that supports Exec()ing commands in the
-// driver's context.
+// ScriptExecutor is an interface that supports Exec()ing commands in the
+// driver's context. Split out of DriverHandle to ease testing.
type ScriptExecutor interface {
Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
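
To make the intended use concrete, here is a minimal consumer-side sketch, not part of this commit, showing how a caller can gate script checks on Abilities().Exec and bound a command with a context deadline. The function name, script path, and timeout are illustrative assumptions:

package driver

import (
	"context"
	"fmt"
	"time"
)

// runScriptCheck only uses Exec when the driver advertises the ability.
func runScriptCheck(d Driver, h DriverHandle) error {
	if !d.Abilities().Exec {
		// Drivers such as lxc and qemu cannot exec; skip script checks.
		return nil
	}

	// DriverHandle embeds ScriptExecutor, so the handle satisfies the
	// narrower interface directly.
	var exec ScriptExecutor = h

	// Bound the check so a hung script cannot block forever.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	out, code, err := exec.Exec(ctx, "/usr/local/bin/check.sh", []string{"--verbose"})
	if err != nil {
		return err
	}
	if code != 0 {
		return fmt.Errorf("check failed with exit code %d: %s", code, out)
	}
	return nil
}

Passing the handle through the narrower ScriptExecutor interface keeps callers like the Consul client decoupled from the full DriverHandle surface.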

View File

@@ -86,6 +86,7 @@ func (d *ExecDriver) Validate(config map[string]interface{}) error {
func (d *ExecDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: true,
+Exec: true,
}
}

View File

@@ -108,6 +108,7 @@ func (d *JavaDriver) Validate(config map[string]interface{}) error {
func (d *JavaDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: true,
+Exec: true,
}
}

View File

@@ -3,6 +3,7 @@
package driver
import (
"context"
"encoding/json"
"fmt"
"log"
@@ -149,6 +150,7 @@ func (d *LxcDriver) Validate(config map[string]interface{}) error {
func (d *LxcDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: false,
+Exec: false,
}
}
@@ -375,6 +377,10 @@ func (h *lxcDriverHandle) Update(task *structs.Task) error {
return nil
}
+func (h *lxcDriverHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
+return nil, 0, fmt.Errorf("lxc driver cannot execute commands")
+}
func (h *lxcDriverHandle) Kill() error {
h.logger.Printf("[INFO] driver.lxc: shutting down container %q", h.container.Name())
if err := h.container.Shutdown(h.killTimeout); err != nil {

View File

@@ -76,6 +76,7 @@ func NewMockDriver(ctx *DriverContext) Driver {
func (d *MockDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: false,
+Exec: true,
}
}
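
Because ScriptExecutor is split out of DriverHandle, a test can hand the Consul integration a tiny fake instead of standing up a real driver. A sketch of such a test double (the type name is invented for illustration):

package driver

import "context"

// fakeScriptExecutor satisfies ScriptExecutor without a real driver,
// which is all a service-registration test needs.
type fakeScriptExecutor struct {
	out  []byte
	code int
	err  error
}

func (f *fakeScriptExecutor) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
	// Respect cancellation the way a real driver implementation should.
	select {
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	default:
		return f.out, f.code, f.err
	}
}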

View File

@@ -1,6 +1,7 @@
package driver
import (
"context"
"encoding/json"
"fmt"
"log"
@@ -97,6 +98,7 @@ func (d *QemuDriver) Validate(config map[string]interface{}) error {
func (d *QemuDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: false,
+Exec: false,
}
}
@@ -353,6 +355,10 @@ func (h *qemuHandle) Update(task *structs.Task) error {
return nil
}
+func (h *qemuHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
+return nil, 0, fmt.Errorf("Qemu driver can't execute commands")
+}
func (h *qemuHandle) Signal(s os.Signal) error {
return fmt.Errorf("Qemu driver can't send signals")
}

View File

@@ -80,6 +80,7 @@ func (d *RawExecDriver) Validate(config map[string]interface{}) error {
func (d *RawExecDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: true,
+Exec: true,
}
}

View File

@@ -165,6 +165,7 @@ func (d *RktDriver) Validate(config map[string]interface{}) error {
func (d *RktDriver) Abilities() DriverAbilities {
return DriverAbilities{
SendSignals: false,
+Exec: true,
}
}
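
The rkt handle's Exec body is also outside this diff. One plausible shape, assuming exec is done by shelling out to rkt enter with the pod UUID (the helper and its arguments are assumptions, not the commit's code):

package driver

import (
	"context"
	"os/exec"
	"syscall"
)

// execViaRktEnter runs a command inside a pod by shelling out to
// `rkt enter`, returning combined output and the command's exit code.
func execViaRktEnter(ctx context.Context, uuid, cmd string, args []string) ([]byte, int, error) {
	enterArgs := append([]string{"enter", uuid, cmd}, args...)
	out, err := exec.CommandContext(ctx, "rkt", enterArgs...).CombinedOutput()
	if err == nil {
		return out, 0, nil
	}
	// If the command ran but exited non-zero, surface the exit code
	// rather than treating it as a driver error.
	if exitErr, ok := err.(*exec.ExitError); ok {
		if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
			return out, status.ExitStatus(), nil
		}
	}
	return out, 0, err
}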

View File

@@ -292,15 +292,11 @@ func (r *TaskRunner) RestoreState() error {
return nil
}
-//FIXME is there a better place to do this? used to be in executor
-// Prepare services
-interpolateServices(r.getTaskEnv(), r.task)
-// Ensure the service is registered
-scriptExec, _ := handle.(driver.ScriptExecutor)
-if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
-//FIXME What to do if this fails?
-r.logger.Printf("[WARN] client: failed to register services and checks for task %q alloc %q: %v",
+if err := r.registerServices(d, handle); err != nil {
+// Don't hard fail here as there's a chance this task
+// registered with Consul properly when it initially
+// started.
+r.logger.Printf("[WARN] client: failed to register services and checks with consul for task %q in alloc %q: %v",
r.task.Name, r.alloc.ID, err)
}
@@ -1186,8 +1182,12 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) {
// Mark that we received the kill event
r.setState(structs.TaskStateRunning, event)
+r.handleLock.Lock()
+handle := r.handle
+r.handleLock.Unlock()
// Kill the task using an exponential backoff in case of failures.
-destroySuccess, err := r.handleDestroy()
+destroySuccess, err := r.handleDestroy(handle)
if !destroySuccess {
// We couldn't successfully destroy the resource created.
r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
@@ -1236,26 +1236,37 @@ func (r *TaskRunner) startTask() error {
}
+if err := r.registerServices(drv, handle); err != nil {
+// All IO is done asynchronously, so errors from registering
+// services are hard failures.
+r.logger.Printf("[ERR] client: failed to register services and checks for task %q alloc %q: %v", r.task.Name, r.alloc.ID, err)
+// Kill the started task
+if destroyed, err := r.handleDestroy(handle); !destroyed {
+r.logger.Printf("[ERR] client: failed to kill task %q alloc %q. Resources may be leaked: %v",
+r.task.Name, r.alloc.ID, err)
+}
+return structs.NewRecoverableError(err, false)
+}
r.handleLock.Lock()
r.handle = handle
r.handleLock.Unlock()
-//FIXME is there a better place to do this? used to be in executor
-// Prepare services
-interpolateServices(r.getTaskEnv(), r.task)
-// RegisterTask properly handles scriptExec being nil, so it just
-// ignore the ok value.
-scriptExec, _ := handle.(driver.ScriptExecutor)
-if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil {
-//FIXME handle errors?!
-//FIXME could break into prepare & submit steps as only preperation can error...
-r.logger.Printf("[ERR] client: failed to register services and checks for task %q alloc %q: %v", r.task.Name, r.alloc.ID, err)
-}
return nil
}
+// registerServices registers the task's services and checks with Consul.
+func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor) error {
+var exec driver.ScriptExecutor
+if d.Abilities().Exec {
+// Only set the script executor if the driver supports it
+exec = h
+}
+interpolateServices(r.getTaskEnv(), r.task)
+return r.consul.RegisterTask(r.alloc.ID, r.task, exec)
+}
// interpolateServices interpolates tags in a service and checks with values from the
// task's environment.
func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) {
@@ -1391,23 +1402,20 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
var mErr multierror.Error
r.handleLock.Lock()
if r.handle != nil {
+// Need to check driver abilities for updating services
+drv, err := r.createDriver()
+if err != nil {
+// Something has really gone wrong; don't continue
+r.handleLock.Unlock()
+return fmt.Errorf("error accessing driver when updating task %q: %v", r.task.Name, err)
+}
// Update will update resources and store the new kill timeout.
if err := r.handle.Update(updatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
}
-//FIXME is there a better place to do this? used to be in executor
-// Prepare services
-interpolateServices(r.getTaskEnv(), updatedTask)
-// Not all drivers support Exec (eg QEMU), but RegisterTask
-// handles nil ScriptExecutors
-scriptExec, _ := r.handle.(driver.ScriptExecutor)
-// Since the handle exists, the task is running, so we need to
-// update it in Consul (if the handle doesn't exist
-// registration in Consul will happen when it's created)
-if err := r.consul.UpdateTask(r.alloc.ID, r.task, updatedTask, scriptExec); err != nil {
+if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err))
}
}
@@ -1424,14 +1432,25 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
return mErr.ErrorOrNil()
}
+// updateServices updates the task's services and checks with Consul.
+func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error {
+var exec driver.ScriptExecutor
+if d.Abilities().Exec {
+// Only set the script executor if the driver supports it
+exec = h
+}
+interpolateServices(r.getTaskEnv(), r.task)
+return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
+}
// handleDestroy kills the task handle. In the case that killing fails,
// handleDestroy will retry with an exponential backoff and will give up at a
// given limit. It returns whether the task was destroyed and the error
// associated with the last kill attempt.
-func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
+func (r *TaskRunner) handleDestroy(handle driver.DriverHandle) (destroyed bool, err error) {
// Cap the number of times we attempt to kill the task.
for i := 0; i < killFailureLimit; i++ {
-if err = r.handle.Kill(); err != nil {
+if err = handle.Kill(); err != nil {
// Calculate the new backoff
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
if backoff > killBackoffLimit {
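
For reference, the backoff above grows by a factor of four per attempt (4^i times the baseline) until it hits the cap. A standalone sketch that prints the resulting schedule, with assumed values for the constants (the real killBackoffBaseline, killBackoffLimit, and killFailureLimit are defined elsewhere in task_runner.go):

package main

import (
	"fmt"
	"time"
)

// Assumed values for illustration only.
const (
	killBackoffBaseline = 5 * time.Second
	killBackoffLimit    = 2 * time.Minute
	killFailureLimit    = 5
)

func main() {
	for i := 0; i < killFailureLimit; i++ {
		// Same formula as handleDestroy: 4^i * baseline, capped.
		backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
		if backoff > killBackoffLimit {
			backoff = killBackoffLimit
		}
		fmt.Printf("attempt %d: wait %s\n", i+1, backoff)
		// attempt 1: 5s, 2: 20s, 3: 1m20s, 4: 2m0s (capped), 5: 2m0s
	}
}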