diff --git a/.changelog/26025.txt b/.changelog/26025.txt
new file mode 100644
index 000000000..3496f30d7
--- /dev/null
+++ b/.changelog/26025.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+client: Fixed bug where drained batch jobs would not be rescheduled if no eligible nodes were immediately available
+```
diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 478cff439..5bebf0018 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -729,14 +729,35 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
     // run alloc prekill hooks
     ar.preKillHooks()
 
+    // generate the task event for a given task runner
+    taskEventFn := func(tr *taskrunner.TaskRunner) (te *structs.TaskEvent) {
+        te = structs.NewTaskEvent(structs.TaskKilling).
+            SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
+
+        // if the task is not already failed, the task has not finished,
+        // the job type is batch, and the allocation is being migrated,
+        // then mark the task as failed. this ensures the task is recreated
+        // if no eligible nodes are immediately available.
+        if !tr.TaskState().Failed &&
+            tr.TaskState().FinishedAt.IsZero() &&
+            ar.alloc.Job.Type == structs.JobTypeBatch &&
+            ar.alloc.DesiredTransition.Migrate != nil &&
+            *ar.alloc.DesiredTransition.Migrate {
+
+            ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name)
+            te.SetFailsTask()
+        }
+        return
+    }
+
     // Kill leader first, synchronously
     for name, tr := range ar.tasks {
         if !tr.IsLeader() {
             continue
         }
 
-        taskEvent := structs.NewTaskEvent(structs.TaskKilling)
-        taskEvent.SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
+        taskEvent := taskEventFn(tr)
+
         err := tr.Kill(context.TODO(), taskEvent)
         if err != nil && err != taskrunner.ErrTaskNotRunning {
             ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
@@ -758,8 +779,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
         wg.Add(1)
         go func(name string, tr *taskrunner.TaskRunner) {
             defer wg.Done()
-            taskEvent := structs.NewTaskEvent(structs.TaskKilling)
-            taskEvent.SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
+            taskEvent := taskEventFn(tr)
+
             err := tr.Kill(context.TODO(), taskEvent)
             if err != nil && err != taskrunner.ErrTaskNotRunning {
                 ar.logger.Warn("error stopping task", "error", err, "task_name", name)
@@ -782,8 +803,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
         wg.Add(1)
         go func(name string, tr *taskrunner.TaskRunner) {
             defer wg.Done()
-            taskEvent := structs.NewTaskEvent(structs.TaskKilling)
-            taskEvent.SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
+            taskEvent := taskEventFn(tr)
+
             err := tr.Kill(context.TODO(), taskEvent)
             if err != nil && err != taskrunner.ErrTaskNotRunning {
                 ar.logger.Warn("error stopping sidecar task", "error", err, "task_name", name)
diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go
index af9679854..a2101d364 100644
--- a/client/allocrunner/alloc_runner_test.go
+++ b/client/allocrunner/alloc_runner_test.go
@@ -1804,6 +1804,160 @@ func TestAllocRunner_HandlesArtifactFailure(t *testing.T) {
     require.True(t, state.TaskStates["bad"].Failed)
 }
 
+// Test that the alloc runner kills tasks in the task group when stopping,
+// and fails the tasks when the job is a batch job that is migrating
+func TestAllocRunner_Migrate_Batch_KillTG(t *testing.T) {
+    ci.Parallel(t)
+
+    alloc := mock.BatchAlloc()
+    tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
+    alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
+    alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
+
+    task := alloc.Job.TaskGroups[0].Tasks[0]
+    task.Driver = "mock_driver"
+    task.Config["run_for"] = "10s"
+    alloc.AllocatedResources.Tasks[task.Name] = tr
+
+    task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
+    task2.Name = "task 2"
+    task2.Driver = "mock_driver"
+    task2.Config["run_for"] = "1ms"
+    alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
+    alloc.AllocatedResources.Tasks[task2.Name] = tr
+
+    conf, cleanup := testAllocRunnerConfig(t, alloc)
+    defer cleanup()
+    ar, err := NewAllocRunner(conf)
+    must.NoError(t, err)
+
+    defer destroy(ar)
+    go ar.Run()
+    upd := conf.StateUpdater.(*MockStateUpdater)
+
+    // Wait for running
+    testutil.WaitForResult(func() (bool, error) {
+        last := upd.Last()
+        if last == nil {
+            return false, fmt.Errorf("No updates")
+        }
+        if last.ClientStatus != structs.AllocClientStatusRunning {
+            return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
+        }
+        return true, nil
+    }, func(err error) {
+        must.NoError(t, err)
+    })
+
+    // Wait for completed task
+    testutil.WaitForResult(func() (bool, error) {
+        last := upd.Last()
+        if last == nil {
+            return false, fmt.Errorf("No updates")
+        }
+        if last.ClientStatus != structs.AllocClientStatusRunning {
+            return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
+        }
+
+        // task should not have finished yet, task2 should be finished
+        if !last.TaskStates[task.Name].FinishedAt.IsZero() {
+            return false, fmt.Errorf("task should not be finished")
+        }
+        if last.TaskStates[task2.Name].FinishedAt.IsZero() {
+            return false, fmt.Errorf("task2 should be finished")
+        }
+        return true, nil
+    }, func(err error) {
+        must.NoError(t, err)
+    })
+
+    update := ar.Alloc().Copy()
+    migrate := true
+    update.DesiredTransition.Migrate = &migrate
+    update.DesiredStatus = structs.AllocDesiredStatusStop
+    ar.Update(update)
+
+    testutil.WaitForResult(func() (bool, error) {
+        last := upd.Last()
+        if last == nil {
+            return false, fmt.Errorf("No updates")
+        }
+
+        if last.ClientStatus != structs.AllocClientStatusFailed {
+            return false, fmt.Errorf("got client status %q; want %q", last.ClientStatus, structs.AllocClientStatusFailed)
+        }
+
+        // task should be failed since it was killed, task2 should not
+        // be failed since it had already completed
+        if !last.TaskStates[task.Name].Failed {
+            return false, fmt.Errorf("task should be failed")
+        }
+        if last.TaskStates[task2.Name].Failed {
+            return false, fmt.Errorf("task2 should not be failed")
+        }
+        return true, nil
+    }, func(err error) {
+        must.NoError(t, err)
+    })
+}
+
+// Test that the alloc runner kills tasks in the task group when stopping,
+// and does not fail the tasks when the job is a batch job that is not migrating
+func TestAllocRunner_Batch_KillTG(t *testing.T) {
+    ci.Parallel(t)
+
+    alloc := mock.BatchAlloc()
+    tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
+    alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
+    alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
+
+    task := alloc.Job.TaskGroups[0].Tasks[0]
+    task.Driver = "mock_driver"
+    task.Config["run_for"] = "10s"
+    alloc.AllocatedResources.Tasks[task.Name] = tr
+
+    conf, cleanup := testAllocRunnerConfig(t, alloc)
+    defer cleanup()
+    ar, err := NewAllocRunner(conf)
+    must.NoError(t, err)
+
+    defer destroy(ar)
+    go ar.Run()
+    upd := conf.StateUpdater.(*MockStateUpdater)
+
+    testutil.WaitForResult(func() (bool, error) {
+        last := upd.Last()
+        if last == nil {
+            return false, fmt.Errorf("No updates")
+        }
+        if last.ClientStatus != structs.AllocClientStatusRunning {
+            return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
+        }
+        return true, nil
+    }, func(err error) {
+        must.NoError(t, err)
+    })
+
+    update := ar.Alloc().Copy()
+    update.DesiredStatus = structs.AllocDesiredStatusStop
+    ar.Update(update)
+
+    testutil.WaitForResult(func() (bool, error) {
+        last := upd.Last()
+        if last == nil {
+            return false, fmt.Errorf("No updates")
+        }
+
+        if last.ClientStatus != structs.AllocClientStatusComplete {
+            return false, fmt.Errorf("got client status %q; want %q", last.ClientStatus, structs.AllocClientStatusComplete)
+        }
+
+        return true, nil
+    }, func(err error) {
+        must.NoError(t, err)
+    })
+}
+
 // Test that alloc runner kills tasks in task group when another task fails
 func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
     ci.Parallel(t)
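
The heart of the patch is the predicate inside the new `taskEventFn` closure. For reviewers, here is a minimal, runnable sketch of that decision in isolation; the `taskSnapshot` struct and `shouldFailOnKill` helper are hypothetical names introduced only for illustration, standing in for the fields the real closure reads from `tr.TaskState()` and `ar.alloc`:

```go
package main

import (
	"fmt"
	"time"
)

// taskSnapshot is a hypothetical stand-in for the state the real
// taskEventFn closure inspects on the task runner and allocation.
type taskSnapshot struct {
	Failed     bool      // tr.TaskState().Failed
	FinishedAt time.Time // tr.TaskState().FinishedAt
	JobType    string    // ar.alloc.Job.Type
	Migrate    *bool     // ar.alloc.DesiredTransition.Migrate
}

// shouldFailOnKill mirrors the condition added in killTasks: a task being
// killed is marked failed only when it has not already failed, has not
// finished on its own, belongs to a batch job, and the allocation is being
// migrated (e.g. off a draining node). Marking it failed is what prompts
// the scheduler to queue a replacement even when no eligible node exists yet.
func shouldFailOnKill(s taskSnapshot) bool {
	return !s.Failed &&
		s.FinishedAt.IsZero() &&
		s.JobType == "batch" && // structs.JobTypeBatch
		s.Migrate != nil &&
		*s.Migrate
}

func main() {
	migrate := true

	// A still-running batch task on a draining node: mark it failed so it
	// is rescheduled once an eligible node appears.
	running := taskSnapshot{JobType: "batch", Migrate: &migrate}
	fmt.Println(shouldFailOnKill(running)) // true

	// A batch task that already completed (FinishedAt set) must be left
	// alone; this is what the task2 assertions in the new test verify.
	done := running
	done.FinishedAt = time.Now()
	fmt.Println(shouldFailOnKill(done)) // false

	// A migrating service job is also untouched: the scheduler already
	// replaces service allocs without the task being marked failed.
	svc := taskSnapshot{JobType: "service", Migrate: &migrate}
	fmt.Println(shouldFailOnKill(svc)) // false
}
```

The design point is that only tasks cut short by the migration are marked failed: a batch task that already ran to completion (task2 in the first test) keeps its state, and a plain stop without `DesiredTransition.Migrate` set (the second test) leaves the allocation `complete` rather than `failed`.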