client: fix tr lifecycle logic and shutdown delay
ShutdownDelay must be honored whenever the task is killed or restarted. Services were not being deregistered prior to restarting.
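For context, here is a minimal standalone sketch (not part of this commit's code) of the ordering the message describes: deregister the task's services first, wait out any configured shutdown delay, then stop the task. The helper names (deregisterServices, stopTask) and the fixed delay value are illustrative assumptions, not Nomad APIs.

package main

import (
	"context"
	"fmt"
	"time"
)

// deregisterServices stands in for removing the task's services from Consul.
func deregisterServices() {
	fmt.Println("services deregistered")
}

// stopTask stands in for asking the driver to kill (or restart) the task.
func stopTask() {
	fmt.Println("task stopped")
}

// killWithDelay shows the intended order: deregister first, honor the
// shutdown delay (unless the context is cancelled), then stop the task.
func killWithDelay(ctx context.Context, shutdownDelay time.Duration) {
	deregisterServices()

	if shutdownDelay > 0 {
		select {
		case <-ctx.Done():
		case <-time.After(shutdownDelay):
		}
	}

	stopTask()
}

func main() {
	killWithDelay(context.Background(), 100*time.Millisecond)
}

In the diff below, the serviceHook's new Killing hook implements this ordering for both kills and restarts.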
@@ -188,24 +188,41 @@ func (ar *allocRunner) Run() {
 		return
 	}
 
-	// Do not run allocs that are terminal on the client
-	if ar.Alloc().ClientTerminalStatus() {
-		ar.logger.Trace("alloc terminal; not running", "status", ar.Alloc().ClientStatus)
+	// If an alloc should not be run, ensure any restored task handles are
+	// destroyed and exit to wait for the AR to be GC'd by the client.
+	if !ar.shouldRun() {
+		ar.logger.Debug("not running terminal alloc")
+		ar.killTasks()
 		return
 	}
 
+	// Run! (and mark as having been run to ensure Destroy cleans up properly)
+	ar.runLaunched = true
+	go ar.runImpl()
+}
+
+// shouldRun returns true if the alloc is in a state that the alloc runner
+// should run it.
+func (ar *allocRunner) shouldRun() bool {
+	// Do not run allocs that are terminal
+	if ar.Alloc().TerminalStatus() {
+		ar.logger.Trace("alloc terminal; not running",
+			"desired_status", ar.Alloc().DesiredStatus,
+			"client_status", ar.Alloc().ClientStatus,
+		)
+		return false
+	}
+
 	// It's possible that the alloc local state was marked terminal before
 	// the server copy of the alloc (checked above) was marked as terminal,
 	// so check the local state as well.
 	switch clientStatus := ar.AllocState().ClientStatus; clientStatus {
 	case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed, structs.AllocClientStatusLost:
 		ar.logger.Trace("alloc terminal; updating server and not running", "status", clientStatus)
-		return
+		return false
 	}
 
-	// Run! (and mark as having been run to ensure Destroy cleans up properly)
-	ar.runLaunched = true
-	go ar.runImpl()
+	return true
 }
 
 func (ar *allocRunner) runImpl() {
@@ -109,8 +109,8 @@ type TaskKillResponse struct{}
 type TaskKillHook interface {
 	TaskHook
 
-	// Kill is called when a task is going to be killed.
-	Kill(context.Context, *TaskKillRequest, *TaskKillResponse) error
+	// Killing is called when a task is going to be Killed or Restarted.
+	Killing(context.Context, *TaskKillRequest, *TaskKillResponse) error
 }
 
 type TaskExitedRequest struct{}
@@ -7,7 +7,13 @@ import (
 )
 
 type TaskLifecycle interface {
+	// Restart a task in place. If failure=false then the restart does not
+	// count as an attempt in the restart policy.
 	Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
+
+	// Sends a signal to a task.
 	Signal(event *structs.TaskEvent, signal string) error
+
+	// Kill a task permanently.
 	Kill(ctx context.Context, event *structs.TaskEvent) error
 }
@@ -12,6 +12,7 @@ import (
 func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
 	// Grab the handle
 	handle := tr.getDriverHandle()
+
 	// Check it is running
 	if handle == nil {
 		return ErrTaskNotRunning
@@ -20,12 +21,14 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
 	// Emit the event since it may take a long time to kill
 	tr.EmitEvent(event)
 
+	// Run the hooks prior to restarting the task
+	tr.killing()
+
 	// Tell the restart tracker that a restart triggered the exit
 	tr.restartTracker.SetRestartTriggered(failure)
 
 	// Kill the task using an exponential backoff in-case of failures.
-	destroySuccess, err := tr.handleDestroy(handle)
-	if !destroySuccess {
+	if err := tr.killTask(handle); err != nil {
 		// We couldn't successfully destroy the resource created.
 		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
 	}
@@ -75,16 +78,17 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
 	tr.EmitEvent(event)
 
 	// Run the hooks prior to killing the task
-	tr.kill()
+	tr.killing()
 
-	// Tell the restart tracker that the task has been killed
+	// Tell the restart tracker that the task has been killed so it doesn't
+	// attempt to restart it.
 	tr.restartTracker.SetKilled()
 
 	// Kill the task using an exponential backoff in-case of failures.
-	destroySuccess, destroyErr := tr.handleDestroy(handle)
-	if !destroySuccess {
+	killErr := tr.killTask(handle)
+	if killErr != nil {
 		// We couldn't successfully destroy the resource created.
-		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", destroyErr)
+		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
 	}
 
 	// Block until task has exited.
@@ -103,10 +107,10 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
 	<-waitCh
 
 	// Store that the task has been destroyed and any associated error.
-	tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
+	tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(killErr))
 
-	if destroyErr != nil {
-		return destroyErr
+	if killErr != nil {
+		return killErr
 	} else if err := ctx.Err(); err != nil {
 		return err
 	}
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -34,6 +35,7 @@ type serviceHook struct {
 	logger log.Logger
 
 	// The following fields may be updated
+	delay time.Duration
 	driverExec tinterfaces.ScriptExecutor
 	driverNet *cstructs.DriverNetwork
 	canary bool
@@ -53,6 +55,7 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
 		taskName: c.task.Name,
 		services: c.task.Services,
 		restarter: c.restarter,
+		delay: c.task.ShutdownDelay,
 	}
 
 	if res := c.alloc.TaskResources[c.task.Name]; res != nil {
@@ -111,6 +114,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
 	}
 
 	// Update service hook fields
+	h.delay = task.ShutdownDelay
 	h.taskEnv = req.TaskEnv
 	h.services = task.Services
 	h.networks = networks
@@ -122,10 +126,35 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
 	return h.consul.UpdateTask(oldTaskServices, newTaskServices)
 }
 
-func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
+func (h *serviceHook) Killing(ctx context.Context, req *interfaces.TaskKillRequest, resp *interfaces.TaskKillResponse) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
+	// Deregister before killing task
+	h.deregister()
+
+	// If there's no shutdown delay, exit early
+	if h.delay == 0 {
+		return nil
+	}
+
+	h.logger.Debug("waiting before killing task", "shutdown_delay", h.delay)
+	select {
+	case <-ctx.Done():
+	case <-time.After(h.delay):
+	}
+	return nil
+}
+
+func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.deregister()
+	return nil
+}
+
+// deregister services from Consul.
+func (h *serviceHook) deregister() {
 	taskServices := h.getTaskServices()
 	h.consul.RemoveTask(taskServices)
 
@@ -134,7 +163,6 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in
 	taskServices.Canary = !taskServices.Canary
 	h.consul.RemoveTask(taskServices)
 
-	return nil
 }
 
 func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
@@ -1,36 +0,0 @@
-package taskrunner
-
-import (
-	"context"
-	"time"
-
-	log "github.com/hashicorp/go-hclog"
-	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
-)
-
-// shutdownDelayHook delays shutting down a task between deregistering it from
-// Consul and actually killing it.
-type shutdownDelayHook struct {
-	delay time.Duration
-	logger log.Logger
-}
-
-func newShutdownDelayHook(delay time.Duration, logger log.Logger) *shutdownDelayHook {
-	h := &shutdownDelayHook{
-		delay: delay,
-	}
-	h.logger = logger.Named(h.Name())
-	return h
-}
-
-func (*shutdownDelayHook) Name() string {
-	return "shutdown-delay"
-}
-
-func (h *shutdownDelayHook) Kill(ctx context.Context, req *interfaces.TaskKillRequest, resp *interfaces.TaskKillResponse) error {
-	select {
-	case <-ctx.Done():
-	case <-time.After(h.delay):
-	}
-	return nil
-}
@@ -479,58 +479,21 @@ func (tr *TaskRunner) runDriver() error {
 	//TODO mounts and devices
 	//XXX Evaluate and encode driver config
 
-	var handle *drivers.TaskHandle
-	var net *cstructs.DriverNetwork
-	var err error
-
-	// Check to see if a task handle was restored
-	tr.localStateLock.RLock()
-	handle = tr.localState.TaskHandle
-	net = tr.localState.DriverNetwork
-	tr.localStateLock.RUnlock()
-
-	if handle != nil {
-		tr.logger.Trace("restored handle; recovering task", "task_id", handle.Config.ID)
-		if err := tr.driver.RecoverTask(handle); err != nil {
-			tr.logger.Error("error recovering task; destroying and restarting",
-				"error", err, "task_id", handle.Config.ID)
-
-			// Clear invalid task state
-			tr.localStateLock.Lock()
-			tr.localState.TaskHandle = nil
-			tr.localState.DriverNetwork = nil
-			tr.localStateLock.Unlock()
-
-			// Try to cleanup any existing task state in the plugin before restarting
-			if err := tr.driver.DestroyTask(handle.Config.ID, true); err != nil {
-				// Ignore ErrTaskNotFound errors as ideally
-				// this task has already been stopped and
-				// therefore doesn't exist.
-				if err != drivers.ErrTaskNotFound {
-					tr.logger.Warn("error destroying unrecoverable task",
-						"error", err, "task_id", handle.Config.ID)
-				}
-
-			}
-
-			goto START
-		}
-
-		// Update driver handle on task runner
-		tr.setDriverHandle(NewDriverHandle(tr.driver, handle.Config.ID, tr.Task(), net))
-
+	// If there's already a task handle (eg from a Restore) there's nothing
+	// to do except update state.
+	if tr.getDriverHandle() != nil {
 		// Ensure running state is persisted but do *not* append a new
 		// task event as restoring is a client event and not relevant
 		// to a task's lifecycle.
 		if err := tr.updateStateImpl(structs.TaskStateRunning); err != nil {
 			//TODO return error and destroy task to avoid an orphaned task?
 			tr.logger.Warn("error persisting task state", "error", err)
 		}
 		return nil
 	}
 
-START:
-	handle, net, err = tr.driver.StartTask(taskConfig)
+	// Start the job if there's no existing handle (or if RecoverTask failed)
+	handle, net, err := tr.driver.StartTask(taskConfig)
 	if err != nil {
 		return fmt.Errorf("driver start failed: %v", err)
 	}
@@ -619,17 +582,17 @@ func (tr *TaskRunner) initDriver() error {
 	return nil
 }
 
-// handleDestroy kills the task handle. In the case that killing fails,
-// handleDestroy will retry with an exponential backoff and will give up at a
-// given limit. It returns whether the task was destroyed and the error
-// associated with the last kill attempt.
-func (tr *TaskRunner) handleDestroy(handle *DriverHandle) (destroyed bool, err error) {
+// killTask kills the task handle. In the case that killing fails,
+// killTask will retry with an exponential backoff and will give up at a
+// given limit. Returns an error if the task could not be killed.
+func (tr *TaskRunner) killTask(handle *DriverHandle) error {
 	// Cap the number of times we attempt to kill the task.
+	var err error
 	for i := 0; i < killFailureLimit; i++ {
 		if err = handle.Kill(); err != nil {
 			if err == drivers.ErrTaskNotFound {
 				tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
-				return true, nil
+				return nil
 			}
 			// Calculate the new backoff
 			backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
@@ -641,10 +604,10 @@ func (tr *TaskRunner) handleDestroy(handle *DriverHandle) (destroyed bool, err e
 			time.Sleep(backoff)
 		} else {
 			// Kill was successful
-			return true, nil
+			return nil
 		}
 	}
-	return
+	return err
 }
 
 // persistLocalState persists local state to disk synchronously.
@@ -685,13 +648,54 @@ func (tr *TaskRunner) Restore() error {
 		ls.Canonicalize()
 		tr.localState = ls
 	}
 
 	if ts != nil {
 		ts.Canonicalize()
 		tr.state = ts
 	}
+
+	// If a TaskHandle was persisted, ensure it is valid or destroy it.
+	if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
+		//TODO if RecoverTask returned the DriverNetwork we wouldn't
+		// have to persist it at all!
+		tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
+	}
 	return nil
 }
 
+// restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask
+// and sets the driver handle. If the TaskHandle is not valid, DestroyTask is
+// called.
+func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) {
+	// Ensure handle is well-formed
+	if taskHandle.Config == nil {
+		return
+	}
+
+	if err := tr.driver.RecoverTask(taskHandle); err != nil {
+		tr.logger.Error("error recovering task; destroying and restarting",
+			"error", err, "task_id", taskHandle.Config.ID)
+
+		// Try to cleanup any existing task state in the plugin before restarting
+		if err := tr.driver.DestroyTask(taskHandle.Config.ID, true); err != nil {
+			// Ignore ErrTaskNotFound errors as ideally
+			// this task has already been stopped and
+			// therefore doesn't exist.
+			if err != drivers.ErrTaskNotFound {
+				tr.logger.Warn("error destroying unrecoverable task",
+					"error", err, "task_id", taskHandle.Config.ID)
+			}
+
+		}
+
+		return
+	}
+
+	// Update driver handle on task runner
+	tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net))
+	return
+}
+
 // UpdateState sets the task runners allocation state and triggers a server
 // update.
 func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
@@ -26,7 +26,6 @@ func (tr *TaskRunner) initHooks() {
 		newLogMonHook(tr.logmonHookConfig, hookLogger),
 		newDispatchHook(tr.Alloc(), hookLogger),
 		newArtifactHook(tr, hookLogger),
-		newShutdownDelayHook(task.ShutdownDelay, hookLogger),
 		newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
 	}
 
@@ -349,8 +348,8 @@ func (tr *TaskRunner) updateHooks() {
 	}
 }
 
-// kill is used to run the runners kill hooks.
-func (tr *TaskRunner) kill() {
+// killing is used to run the runners kill hooks.
+func (tr *TaskRunner) killing() {
 	if tr.logger.IsTrace() {
 		start := time.Now()
 		tr.logger.Trace("running kill hooks", "start", start)
@@ -378,7 +377,7 @@ func (tr *TaskRunner) kill() {
 		// Run the update hook
 		req := interfaces.TaskKillRequest{}
 		var resp interfaces.TaskKillResponse
-		if err := upd.Kill(context.Background(), &req, &resp); err != nil {
+		if err := upd.Killing(context.Background(), &req, &resp); err != nil {
 			tr.logger.Error("kill hook failed", "name", name, "error", err)
 		}
 