diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 509e9a31b..a5bc23cfc 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -188,24 +188,41 @@ func (ar *allocRunner) Run() {
 		return
 	}
 
-	// Do not run allocs that are terminal on the client
-	if ar.Alloc().ClientTerminalStatus() {
-		ar.logger.Trace("alloc terminal; not running", "status", ar.Alloc().ClientStatus)
+	// If an alloc should not be run, ensure any restored task handles are
+	// destroyed and exit to wait for the AR to be GC'd by the client.
+	if !ar.shouldRun() {
+		ar.logger.Debug("not running terminal alloc")
+		ar.killTasks()
 		return
 	}
 
+	// Run! (and mark as having been run to ensure Destroy cleans up properly)
+	ar.runLaunched = true
+	go ar.runImpl()
+}
+
+// shouldRun returns true if the alloc is in a state in which the alloc
+// runner should run it.
+func (ar *allocRunner) shouldRun() bool {
+	// Do not run allocs that are terminal
+	if ar.Alloc().TerminalStatus() {
+		ar.logger.Trace("alloc terminal; not running",
+			"desired_status", ar.Alloc().DesiredStatus,
+			"client_status", ar.Alloc().ClientStatus,
+		)
+		return false
+	}
+
 	// It's possible that the alloc local state was marked terminal before
 	// the server copy of the alloc (checked above) was marked as terminal,
 	// so check the local state as well.
 	switch clientStatus := ar.AllocState().ClientStatus; clientStatus {
 	case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed, structs.AllocClientStatusLost:
 		ar.logger.Trace("alloc terminal; updating server and not running", "status", clientStatus)
-		return
+		return false
 	}
 
-	// Run! (and mark as having been run to ensure Destroy cleans up properly)
-	ar.runLaunched = true
-	go ar.runImpl()
+	return true
 }
 
 func (ar *allocRunner) runImpl() {
diff --git a/client/allocrunner/interfaces/task_lifecycle.go b/client/allocrunner/interfaces/task_lifecycle.go
index b22ef285b..808afd37a 100644
--- a/client/allocrunner/interfaces/task_lifecycle.go
+++ b/client/allocrunner/interfaces/task_lifecycle.go
@@ -109,8 +109,8 @@ type TaskKillResponse struct{}
 type TaskKillHook interface {
 	TaskHook
 
-	// Kill is called when a task is going to be killed.
-	Kill(context.Context, *TaskKillRequest, *TaskKillResponse) error
+	// Killing is called when a task is going to be Killed or Restarted.
+	Killing(context.Context, *TaskKillRequest, *TaskKillResponse) error
 }
 
 type TaskExitedRequest struct{}
diff --git a/client/allocrunner/taskrunner/interfaces/lifecycle.go b/client/allocrunner/taskrunner/interfaces/lifecycle.go
index 84bbda228..1890471bf 100644
--- a/client/allocrunner/taskrunner/interfaces/lifecycle.go
+++ b/client/allocrunner/taskrunner/interfaces/lifecycle.go
@@ -7,7 +7,13 @@ import (
 )
 
 type TaskLifecycle interface {
+	// Restart a task in place. If failure=false then the restart does not
+	// count as an attempt in the restart policy.
 	Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
+
+	// Send a signal to a task.
 	Signal(event *structs.TaskEvent, signal string) error
+
+	// Kill a task permanently.
 	Kill(ctx context.Context, event *structs.TaskEvent) error
 }
diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go
index 17353044e..9083e75d7 100644
--- a/client/allocrunner/taskrunner/lifecycle.go
+++ b/client/allocrunner/taskrunner/lifecycle.go
@@ -12,6 +12,7 @@ import (
 func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
 	// Grab the handle
 	handle := tr.getDriverHandle()
+	// Check it is running
 	if handle == nil {
 		return ErrTaskNotRunning
@@ -20,12 +21,14 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
 	// Emit the event since it may take a long time to kill
 	tr.EmitEvent(event)
 
+	// Run the hooks prior to restarting the task
+	tr.killing()
+
 	// Tell the restart tracker that a restart triggered the exit
 	tr.restartTracker.SetRestartTriggered(failure)
 
 	// Kill the task using an exponential backoff in-case of failures.
-	destroySuccess, err := tr.handleDestroy(handle)
-	if !destroySuccess {
+	if err := tr.killTask(handle); err != nil {
 		// We couldn't successfully destroy the resource created.
 		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
 	}
@@ -75,16 +78,17 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
 	tr.EmitEvent(event)
 
 	// Run the hooks prior to killing the task
-	tr.kill()
+	tr.killing()
 
-	// Tell the restart tracker that the task has been killed
+	// Tell the restart tracker that the task has been killed so it doesn't
+	// attempt to restart it.
 	tr.restartTracker.SetKilled()
 
 	// Kill the task using an exponential backoff in-case of failures.
-	destroySuccess, destroyErr := tr.handleDestroy(handle)
-	if !destroySuccess {
+	killErr := tr.killTask(handle)
+	if killErr != nil {
 		// We couldn't successfully destroy the resource created.
-		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", destroyErr)
+		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
 	}
 
 	// Block until task has exited.
@@ -103,10 +107,10 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
 	<-waitCh
 
 	// Store that the task has been destroyed and any associated error.
-	tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
+	tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(killErr))
 
-	if destroyErr != nil {
-		return destroyErr
+	if killErr != nil {
+		return killErr
 	} else if err := ctx.Err(); err != nil {
 		return err
 	}
diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go
index 801cc2073..a754cbcef 100644
--- a/client/allocrunner/taskrunner/service_hook.go
+++ b/client/allocrunner/taskrunner/service_hook.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -34,6 +35,7 @@ type serviceHook struct {
 	logger log.Logger
 
 	// The following fields may be updated
+	delay      time.Duration
 	driverExec tinterfaces.ScriptExecutor
 	driverNet  *cstructs.DriverNetwork
 	canary     bool
@@ -53,6 +55,7 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
 		taskName:  c.task.Name,
 		services:  c.task.Services,
 		restarter: c.restarter,
+		delay:     c.task.ShutdownDelay,
 	}
 
 	if res := c.alloc.TaskResources[c.task.Name]; res != nil {
@@ -111,6 +114,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
 	}
 
 	// Update service hook fields
+	h.delay = task.ShutdownDelay
 	h.taskEnv = req.TaskEnv
 	h.services = task.Services
 	h.networks = networks
@@ -122,10 +126,35 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
 	return h.consul.UpdateTask(oldTaskServices, newTaskServices)
 }
 
-func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
+func (h *serviceHook) Killing(ctx context.Context, req *interfaces.TaskKillRequest, resp *interfaces.TaskKillResponse) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
+	// Deregister before killing task
+	h.deregister()
+
+	// If there's no shutdown delay, exit early
+	if h.delay == 0 {
+		return nil
+	}
+
+	h.logger.Debug("waiting before killing task", "shutdown_delay", h.delay)
+	select {
+	case <-ctx.Done():
+	case <-time.After(h.delay):
+	}
+	return nil
+}
+
+func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.deregister()
+	return nil
+}
+
+// deregister services from Consul.
+func (h *serviceHook) deregister() {
 	taskServices := h.getTaskServices()
 	h.consul.RemoveTask(taskServices)
 
@@ -134,7 +163,6 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in
 	taskServices.Canary = !taskServices.Canary
 	h.consul.RemoveTask(taskServices)
 
-	return nil
 }
 
 func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
diff --git a/client/allocrunner/taskrunner/shutdown_delay_hook.go b/client/allocrunner/taskrunner/shutdown_delay_hook.go
deleted file mode 100644
index 15f76ebd1..000000000
--- a/client/allocrunner/taskrunner/shutdown_delay_hook.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package taskrunner
-
-import (
-	"context"
-	"time"
-
-	log "github.com/hashicorp/go-hclog"
-	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
-)
-
-// shutdownDelayHook delays shutting down a task between deregistering it from
-// Consul and actually killing it.
-type shutdownDelayHook struct {
-	delay  time.Duration
-	logger log.Logger
-}
-
-func newShutdownDelayHook(delay time.Duration, logger log.Logger) *shutdownDelayHook {
-	h := &shutdownDelayHook{
-		delay: delay,
-	}
-	h.logger = logger.Named(h.Name())
-	return h
-}
-
-func (*shutdownDelayHook) Name() string {
-	return "shutdown-delay"
-}
-
-func (h *shutdownDelayHook) Kill(ctx context.Context, req *interfaces.TaskKillRequest, resp *interfaces.TaskKillResponse) error {
-	select {
-	case <-ctx.Done():
-	case <-time.After(h.delay):
-	}
-	return nil
-}
diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index 5a45bdb71..b90f45457 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -479,58 +479,21 @@ func (tr *TaskRunner) runDriver() error {
 
 	//TODO mounts and devices
 	//XXX Evaluate and encode driver config
 
-	var handle *drivers.TaskHandle
-	var net *cstructs.DriverNetwork
-	var err error
-
-	// Check to see if a task handle was restored
-	tr.localStateLock.RLock()
-	handle = tr.localState.TaskHandle
-	net = tr.localState.DriverNetwork
-	tr.localStateLock.RUnlock()
-
-	if handle != nil {
-		tr.logger.Trace("restored handle; recovering task", "task_id", handle.Config.ID)
-		if err := tr.driver.RecoverTask(handle); err != nil {
-			tr.logger.Error("error recovering task; destroying and restarting",
-				"error", err, "task_id", handle.Config.ID)
-
-			// Clear invalid task state
-			tr.localStateLock.Lock()
-			tr.localState.TaskHandle = nil
-			tr.localState.DriverNetwork = nil
-			tr.localStateLock.Unlock()
-
-			// Try to cleanup any existing task state in the plugin before restarting
-			if err := tr.driver.DestroyTask(handle.Config.ID, true); err != nil {
-				// Ignore ErrTaskNotFound errors as ideally
-				// this task has already been stopped and
-				// therefore doesn't exist.
-				if err != drivers.ErrTaskNotFound {
-					tr.logger.Warn("error destroying unrecoverable task",
-						"error", err, "task_id", handle.Config.ID)
-				}
-
-			}
-
-			goto START
-		}
-
-		// Update driver handle on task runner
-		tr.setDriverHandle(NewDriverHandle(tr.driver, handle.Config.ID, tr.Task(), net))
-
+	// If there's already a task handle (e.g. from a Restore), there's nothing
+	// to do except update state.
+	if tr.getDriverHandle() != nil {
 		// Ensure running state is persisted but do *not* append a new
 		// task event as restoring is a client event and not relevant
 		// to a task's lifecycle.
 		if err := tr.updateStateImpl(structs.TaskStateRunning); err != nil {
+			//TODO return error and destroy task to avoid an orphaned task?
 			tr.logger.Warn("error persisting task state", "error", err)
 		}
 		return nil
 	}
 
-START: // Start the job if there's no existing handle (or if RecoverTask failed)
-	handle, net, err = tr.driver.StartTask(taskConfig)
+	handle, net, err := tr.driver.StartTask(taskConfig)
 	if err != nil {
 		return fmt.Errorf("driver start failed: %v", err)
 	}
@@ -619,17 +582,17 @@ func (tr *TaskRunner) initDriver() error {
 	return nil
 }
 
-// handleDestroy kills the task handle. In the case that killing fails,
-// handleDestroy will retry with an exponential backoff and will give up at a
-// given limit. It returns whether the task was destroyed and the error
-// associated with the last kill attempt.
-func (tr *TaskRunner) handleDestroy(handle *DriverHandle) (destroyed bool, err error) {
+// killTask kills the task handle. In the case that killing fails,
+// killTask will retry with an exponential backoff and will give up at a
+// given limit. Returns an error if the task could not be killed.
+func (tr *TaskRunner) killTask(handle *DriverHandle) error {
 	// Cap the number of times we attempt to kill the task.
+	var err error
 	for i := 0; i < killFailureLimit; i++ {
 		if err = handle.Kill(); err != nil {
 			if err == drivers.ErrTaskNotFound {
 				tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
-				return true, nil
+				return nil
 			}
 
 			// Calculate the new backoff
 			backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
@@ -641,10 +604,10 @@ func (tr *TaskRunner) handleDestroy(handle *DriverHandle) (destroyed bool, err e
 			time.Sleep(backoff)
 		} else {
 			// Kill was successful
-			return true, nil
+			return nil
 		}
 	}
-	return
+	return err
 }
 
 // persistLocalState persists local state to disk synchronously.
@@ -685,13 +648,54 @@ func (tr *TaskRunner) Restore() error {
 		ls.Canonicalize()
 		tr.localState = ls
 	}
+
 	if ts != nil {
 		ts.Canonicalize()
 		tr.state = ts
 	}
+
+	// If a TaskHandle was persisted, ensure it is valid or destroy it.
+	if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
+		//TODO if RecoverTask returned the DriverNetwork we wouldn't
+		// have to persist it at all!
+		tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
+	}
 	return nil
 }
 
+// restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask
+// and sets the driver handle. If the TaskHandle is not valid, DestroyTask is
+// called.
+func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) {
+	// Ensure handle is well-formed
+	if taskHandle.Config == nil {
+		return
+	}
+
+	if err := tr.driver.RecoverTask(taskHandle); err != nil {
+		tr.logger.Error("error recovering task; destroying and restarting",
+			"error", err, "task_id", taskHandle.Config.ID)
+
+		// Try to cleanup any existing task state in the plugin before restarting
+		if err := tr.driver.DestroyTask(taskHandle.Config.ID, true); err != nil {
+			// Ignore ErrTaskNotFound errors as ideally
+			// this task has already been stopped and
+			// therefore doesn't exist.
+			if err != drivers.ErrTaskNotFound {
+				tr.logger.Warn("error destroying unrecoverable task",
+					"error", err, "task_id", taskHandle.Config.ID)
+			}
+		}
+
+		return
+	}
+
+	// Update driver handle on task runner
+	tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net))
+}
+
 // UpdateState sets the task runners allocation state and triggers a server
 // update.
 func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go
index e64683107..40e157a5f 100644
--- a/client/allocrunner/taskrunner/task_runner_hooks.go
+++ b/client/allocrunner/taskrunner/task_runner_hooks.go
@@ -26,7 +26,6 @@ func (tr *TaskRunner) initHooks() {
 		newLogMonHook(tr.logmonHookConfig, hookLogger),
 		newDispatchHook(tr.Alloc(), hookLogger),
 		newArtifactHook(tr, hookLogger),
-		newShutdownDelayHook(task.ShutdownDelay, hookLogger),
 		newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
 	}
 
@@ -349,8 +348,8 @@ func (tr *TaskRunner) updateHooks() {
 	}
 }
 
-// kill is used to run the runners kill hooks.
-func (tr *TaskRunner) kill() {
+// killing is used to run the runner's kill hooks.
+func (tr *TaskRunner) killing() {
 	if tr.logger.IsTrace() {
 		start := time.Now()
 		tr.logger.Trace("running kill hooks", "start", start)
@@ -378,7 +377,7 @@
 		// Run the update hook
 		req := interfaces.TaskKillRequest{}
 		var resp interfaces.TaskKillResponse
-		if err := upd.Kill(context.Background(), &req, &resp); err != nil {
+		if err := upd.Killing(context.Background(), &req, &resp); err != nil {
 			tr.logger.Error("kill hook failed", "name", name, "error", err)
 		}
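
The shutdown-delay behavior that used to live in shutdownDelayHook now runs inside serviceHook.Killing: deregister from Consul first, then wait out the task's ShutdownDelay unless the kill context is canceled. Below is a minimal, self-contained sketch of that ordering; killHook and its deregister field are illustrative stand-ins rather than the Nomad hook types, and the plain context-based signature is simplified so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// killHook mirrors the order of operations the patch gives serviceHook.Killing:
// deregister services first so no new traffic is routed to the task, then
// honor an optional shutdown delay before the task is actually killed.
type killHook struct {
	delay      time.Duration // analogous to task.ShutdownDelay
	deregister func()        // stand-in for serviceHook.deregister
}

func (h *killHook) Killing(ctx context.Context) error {
	// Deregister before killing the task.
	h.deregister()

	// If there's no shutdown delay, exit early.
	if h.delay == 0 {
		return nil
	}

	// Wait for the delay, but return immediately if the kill context is
	// canceled (e.g. the client is forcing the task down).
	select {
	case <-ctx.Done():
	case <-time.After(h.delay):
	}
	return nil
}

func main() {
	h := &killHook{
		delay:      100 * time.Millisecond,
		deregister: func() { fmt.Println("services deregistered") },
	}
	start := time.Now()
	_ = h.Killing(context.Background())
	fmt.Println("killed after", time.Since(start).Round(10*time.Millisecond))
}
```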
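For reference, the retry loop in killTask sleeps (1 << (2 * i)) * killBackoffBaseline between failed kill attempts, so the backoff quadruples on each iteration up to killFailureLimit attempts. The snippet below only illustrates that growth; the constant values (5 attempts, 5-second baseline) are assumptions for the example, and any cap the surrounding code may apply to the backoff is not shown in this hunk and is omitted here.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values for illustration only; the hunk references killFailureLimit
// and killBackoffBaseline but does not define them.
const (
	killFailureLimit    = 5
	killBackoffBaseline = 5 * time.Second
)

func main() {
	for i := 0; i < killFailureLimit; i++ {
		// Same formula as the loop in killTask: 1x, 4x, 16x, ... the baseline.
		backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
		fmt.Printf("attempt %d failed; sleeping %s before retrying\n", i+1, backoff)
	}
}
```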