lifecycle: add poststop hook (#8194)

This commit is contained in:
Jasmine Dahilig
2020-11-12 08:01:42 -08:00
committed by GitHub
parent 34121b3825
commit b85cce42fe
12 changed files with 280 additions and 15 deletions

View File

@@ -633,6 +633,7 @@ type DispatchPayloadConfig struct {
const (
TaskLifecycleHookPrestart = "prestart"
TaskLifecycleHookPoststart = "poststart"
TaskLifecycleHookPoststop = "poststop"
)
type TaskLifecycle struct {

View File

@@ -353,12 +353,26 @@ func (ar *allocRunner) shouldRun() bool {
// runTasks is used to run the task runners and block until they exit.
func (ar *allocRunner) runTasks() {
// Start all tasks
for _, task := range ar.tasks {
go task.Run()
}
// Block on all tasks except poststop tasks
for _, task := range ar.tasks {
<-task.WaitCh()
if !task.IsPoststopTask() {
<-task.WaitCh()
}
}
// Signal poststop tasks to proceed to main runtime
ar.taskHookCoordinator.StartPoststopTasks()
// Wait for poststop tasks to finish before proceeding
for _, task := range ar.tasks {
if task.IsPoststopTask() {
<-task.WaitCh()
}
}
}
@@ -485,6 +499,10 @@ func (ar *allocRunner) handleTaskStateUpdates() {
state := tr.TaskState()
states[name] = state
if tr.IsPoststopTask() {
continue
}
// Capture live task runners in case we need to kill them
if state.State != structs.TaskStateDead {
liveRunners = append(liveRunners, tr)
@@ -535,6 +553,7 @@ func (ar *allocRunner) handleTaskStateUpdates() {
// prevent looping before TaskRunners have transitioned
// to Dead.
for _, tr := range liveRunners {
ar.logger.Info("killing task: ", tr.Task().Name)
select {
case <-tr.WaitCh():
case <-ar.waitCh:
@@ -586,7 +605,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
// Kill the rest concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
if tr.IsLeader() {
// Filter out poststop tasks so they run after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() {
continue
}

View File

@@ -394,6 +394,89 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
})
}
// TestAllocRunner_Lifecycle_Poststop asserts that a service job with 1
// postop lifecycle hook starts all 3 tasks, only
// the ephemeral one finishes, and the other 2 exit when the alloc is stopped.
func TestAllocRunner_Lifecycle_Poststop(t *testing.T) {
alloc := mock.LifecycleAlloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.Type = structs.JobTypeService
mainTask := alloc.Job.TaskGroups[0].Tasks[0]
mainTask.Config["run_for"] = "100s"
ephemeralTask := alloc.Job.TaskGroups[0].Tasks[1]
ephemeralTask.Name = "quit"
ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststop
ephemeralTask.Config["run_for"] = "10s"
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
mainTask.Name: tr,
ephemeralTask.Name: tr,
}
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()
upd := conf.StateUpdater.(*MockStateUpdater)
// Wait for main task to be running
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected main task to be running not %s", s)
}
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStatePending {
return false, fmt.Errorf("expected ephemeral task to be pending not %s", s)
}
return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})
// Tell the alloc to stop
stopAlloc := alloc.Copy()
stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(stopAlloc)
// Wait for main task to die & poststop task to run.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected main task to be dead not %s", s)
}
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected poststop task to be running not %s", s)
}
return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})
}
func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel()

View File

@@ -8,22 +8,29 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// TaskHookCoordinator helps coordinate when main start tasks can launch
// TaskHookCoordinator helps coordinate when mainTasks start tasks can launch
// namely after all Prestart Tasks have run, and after all BlockUntilCompleted have completed
type taskHookCoordinator struct {
logger hclog.Logger
// constant for quickly starting all prestart tasks
closedCh chan struct{}
// Each context is used to gate task runners launching the tasks. A task
// runner waits until the context associated its lifecycle context is
// done/cancelled.
mainTaskCtx context.Context
mainTaskCtxCancel func()
poststartTaskCtx context.Context
poststartTaskCtxCancel func()
poststopTaskCtx context.Context
poststopTaskCtxCancel context.CancelFunc
prestartSidecar map[string]struct{}
prestartEphemeral map[string]struct{}
mainTasksPending map[string]struct{}
mainTasksRunning map[string]struct{} // poststop: main tasks running -> finished
mainTasksPending map[string]struct{} // poststart: main tasks pending -> running
}
func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator {
@@ -32,6 +39,7 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo
mainTaskCtx, mainCancelFn := context.WithCancel(context.Background())
poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background())
poststopTaskCtx, poststopTaskCancelFn := context.WithCancel(context.Background())
c := &taskHookCoordinator{
logger: logger,
@@ -40,9 +48,12 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo
mainTaskCtxCancel: mainCancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
mainTasksRunning: map[string]struct{}{},
mainTasksPending: map[string]struct{}{},
poststartTaskCtx: poststartTaskCtx,
poststartTaskCtxCancel: poststartCancelFn,
poststopTaskCtx: poststopTaskCtx,
poststopTaskCtxCancel: poststopTaskCancelFn,
}
c.setTasks(tasks)
return c
@@ -53,6 +64,7 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
if task.Lifecycle == nil {
c.mainTasksPending[task.Name] = struct{}{}
c.mainTasksRunning[task.Name] = struct{}{}
continue
}
@@ -65,6 +77,8 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
}
case structs.TaskLifecycleHookPoststart:
// Poststart hooks don't need to be tracked.
case structs.TaskLifecycleHookPoststop:
// Poststop hooks don't need to be tracked.
default:
c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook)
}
@@ -79,6 +93,10 @@ func (c *taskHookCoordinator) hasPrestartTasks() bool {
return len(c.prestartSidecar)+len(c.prestartEphemeral) > 0
}
func (c *taskHookCoordinator) hasRunningMainTasks() bool {
return len(c.mainTasksRunning) > 0
}
func (c *taskHookCoordinator) hasPendingMainTasks() bool {
return len(c.mainTasksPending) > 0
}
@@ -94,7 +112,11 @@ func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan s
return c.closedCh
case structs.TaskLifecycleHookPoststart:
return c.poststartTaskCtx.Done()
case structs.TaskLifecycleHookPoststop:
return c.poststopTaskCtx.Done()
default:
// it should never have a lifecycle stanza w/o a hook, so report an error but allow the task to start normally
c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook)
return c.mainTaskCtx.Done()
}
}
@@ -119,6 +141,16 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.prestartEphemeral, task)
}
for task := range c.mainTasksRunning {
st := states[task]
if st == nil || st.State != structs.TaskStateDead {
continue
}
delete(c.mainTasksRunning, task)
}
for task := range c.mainTasksPending {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
@@ -128,7 +160,6 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.mainTasksPending, task)
}
// everything well
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}
@@ -136,6 +167,13 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
if !c.hasPendingMainTasks() {
c.poststartTaskCtxCancel()
}
if !c.hasRunningMainTasks() {
c.poststopTaskCtxCancel()
}
}
func (c *taskHookCoordinator) StartPoststopTasks() {
c.poststopTaskCtxCancel()
}
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks

View File

@@ -34,6 +34,11 @@ func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *struc
onSuccess = tlc.Sidecar
}
// Poststop should never be restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPoststop {
onSuccess = false
}
return &RestartTracker{
startTime: time.Now(),
onSuccess: onSuccess,

View File

@@ -495,6 +495,7 @@ func (tr *TaskRunner) Run() {
select {
case <-tr.startConditionMetCtx:
tr.logger.Debug("lifecycle start condition has been met, proceeding")
// yay proceed
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
@@ -502,7 +503,7 @@ func (tr *TaskRunner) Run() {
}
MAIN:
for !tr.Alloc().TerminalStatus() {
for !tr.shouldShutdown() {
select {
case <-tr.killCtx.Done():
break MAIN
@@ -625,6 +626,18 @@ MAIN:
tr.logger.Debug("task run loop exiting")
}
func (tr *TaskRunner) shouldShutdown() bool {
if tr.alloc.ClientTerminalStatus() {
return true
}
if !tr.IsPoststopTask() && tr.alloc.ServerTerminalStatus() {
return true
}
return false
}
// handleTaskExitResult handles the results returned by the task exiting. If
// retryWait is true, the caller should attempt to wait on the task again since
// it has not actually finished running. This can happen if the driver plugin

View File

@@ -28,6 +28,11 @@ func (tr *TaskRunner) IsLeader() bool {
return tr.taskLeader
}
// IsPoststopTask returns true if this task is a poststop task in its task group.
func (tr *TaskRunner) IsPoststopTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
}
func (tr *TaskRunner) Task() *structs.Task {
tr.taskLock.RLock()
defer tr.taskLock.RUnlock()

View File

@@ -165,8 +165,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) {
func (tr *TaskRunner) prestart() error {
// Determine if the allocation is terminal and we should avoid running
// prestart hooks.
alloc := tr.Alloc()
if alloc.TerminalStatus() {
if tr.shouldShutdown() {
tr.logger.Trace("skipping prestart hooks since allocation is terminal")
return nil
}

View File

@@ -111,8 +111,7 @@ touch ${NOMAD_ALLOC_DIR}/poststart-ran
touch ${NOMAD_ALLOC_DIR}/poststart-running
touch ${NOMAD_ALLOC_DIR}/poststart-started
sleep 10
# THIS IS WHERE THE ACTUAL TESTING HAPPENS
# IF init-ran doesn't exist, then the init task hasn't run yet, so fail
if [ ! -f ${NOMAD_ALLOC_DIR}/init-ran ]; then exit 12; fi
if [ ! -f ${NOMAD_ALLOC_DIR}/main-started ]; then exit 15; fi
if [ -f ${NOMAD_ALLOC_DIR}/init-running ]; then exit 14; fi
@@ -128,5 +127,43 @@ EOT
}
}
task "poststop" {
lifecycle {
hook = "poststop"
}
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["local/poststop.sh"]
}
template {
data = <<EOT
#!/bin/sh
sleep 1
touch ${NOMAD_ALLOC_DIR}/poststop-ran
touch ${NOMAD_ALLOC_DIR}/poststop-running
touch ${NOMAD_ALLOC_DIR}/poststop-started
sleep 5
if [ ! -f ${NOMAD_ALLOC_DIR}/init-ran ]; then exit 12; fi
if [ ! -f ${NOMAD_ALLOC_DIR}/main-started ]; then exit 15; fi
if [ -f ${NOMAD_ALLOC_DIR}/init-running ]; then exit 14; fi
if [ -f ${NOMAD_ALLOC_DIR}/main-running ]; then exit 17; fi
rm ${NOMAD_ALLOC_DIR}/poststop-running
EOT
destination = "local/poststop.sh"
}
resources {
cpu = 64
memory = 64
}
}
}
}

View File

@@ -101,6 +101,13 @@ EOT
template {
data = <<EOT
#!/bin/sh
function cleanup() {
echo stopping
rm ${NOMAD_ALLOC_DIR}/main-running
exit
}
touch ${NOMAD_ALLOC_DIR}/main-ran
touch ${NOMAD_ALLOC_DIR}/main-running
touch ${NOMAD_ALLOC_DIR}/main-started
@@ -111,7 +118,17 @@ if [ ! -f ${NOMAD_ALLOC_DIR}/sidecar-running ]; then exit 14; fi
sleep 2
if [ ! -f ${NOMAD_ALLOC_DIR}/poststart-started ]; then exit 15; fi
touch ${NOMAD_ALLOC_DIR}/main-checked
sleep 300
echo trap
trap cleanup SIGINT
echo sleep
while true
do
sleep 1
done
EOT
destination = "local/main.sh"
@@ -161,5 +178,44 @@ EOT
}
}
task "poststop" {
lifecycle {
hook = "poststop"
}
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["local/poststop.sh"]
}
template {
data = <<EOT
#!/bin/sh
sleep 1
touch ${NOMAD_ALLOC_DIR}/poststop-ran
touch ${NOMAD_ALLOC_DIR}/poststop-running
touch ${NOMAD_ALLOC_DIR}/poststop-started
sleep 5
if [ ! -f ${NOMAD_ALLOC_DIR}/init-ran ]; then exit 12; fi
if [ ! -f ${NOMAD_ALLOC_DIR}/main-started ]; then exit 15; fi
if [ -f ${NOMAD_ALLOC_DIR}/init-running ]; then exit 14; fi
if [ -f ${NOMAD_ALLOC_DIR}/main-running ]; then exit 17; fi
rm ${NOMAD_ALLOC_DIR}/poststop-running
EOT
destination = "local/poststop.sh"
}
resources {
cpu = 64
memory = 64
}
}
}
}

View File

@@ -55,7 +55,7 @@ func (tc *LifecycleE2ETest) TestBatchJob(f *framework.F) {
afi, _, err := nomadClient.AllocFS().List(alloc, "alloc", nil)
require.NoError(err)
expected := map[string]bool{
"init-ran": true, "main-ran": true, "poststart-ran": true,
"init-ran": true, "main-ran": true, "poststart-ran": true, "poststop-ran": true,
"init-running": false, "main-running": false, "poststart-running": false}
got := checkFiles(expected, afi)
require.Equal(expected, got)
@@ -115,13 +115,15 @@ func (tc *LifecycleE2ETest) TestServiceJob(f *framework.F) {
require.NoError(err)
e2eutil.WaitForAllocStopped(t, nomadClient, allocID)
require.False(alloc.TaskStates["poststop"].Failed)
// assert the files were written as expected
afi, _, err := nomadClient.AllocFS().List(alloc, "alloc", nil)
require.NoError(err)
expected := map[string]bool{
"init-ran": true, "sidecar-ran": true, "main-ran": true, "poststart-ran": true,
"poststart-started": true, "main-started": true,
"init-running": false, "poststart-running": false,
"init-ran": true, "sidecar-ran": true, "main-ran": true, "poststart-ran": true, "poststop-ran": true,
"poststart-started": true, "main-started": true, "poststop-started": true,
"init-running": false, "poststart-running": false, "poststop-running": false,
"main-checked": true}
got := checkFiles(expected, afi)
require.Equal(expected, got)

View File

@@ -3348,6 +3348,7 @@ func (a *AllocatedResources) Comparable() *ComparableResources {
prestartSidecarTasks := &AllocatedTaskResources{}
prestartEphemeralTasks := &AllocatedTaskResources{}
main := &AllocatedTaskResources{}
poststopTasks := &AllocatedTaskResources{}
for taskName, r := range a.Tasks {
lc := a.TaskLifecycles[taskName]
@@ -3359,11 +3360,14 @@ func (a *AllocatedResources) Comparable() *ComparableResources {
} else {
prestartEphemeralTasks.Add(r)
}
} else if lc.Hook == TaskLifecycleHookPoststop {
poststopTasks.Add(r)
}
}
// update this loop to account for lifecycle hook
prestartEphemeralTasks.Max(main)
prestartEphemeralTasks.Max(poststopTasks)
prestartSidecarTasks.Add(prestartEphemeralTasks)
c.Flattened.Add(prestartSidecarTasks)
@@ -5105,6 +5109,7 @@ func (d *DispatchPayloadConfig) Validate() error {
const (
TaskLifecycleHookPrestart = "prestart"
TaskLifecycleHookPoststart = "poststart"
TaskLifecycleHookPoststop = "poststop"
)
type TaskLifecycleConfig struct {
@@ -5129,6 +5134,7 @@ func (d *TaskLifecycleConfig) Validate() error {
switch d.Hook {
case TaskLifecycleHookPrestart:
case TaskLifecycleHookPoststart:
case TaskLifecycleHookPoststop:
case "":
return fmt.Errorf("no lifecycle hook provided")
default: