client: fix batch job drain behavior (#26025)

Batch job allocations that are drained from a node will be moved
to an eligible node. However, when no eligible nodes are available
to place the draining allocations, the tasks will end up being
complete and will not be placed when an eligible node becomes
available. This occurs because the drained allocations are
simultaneously stopped on the draining node while attempting to
be placed on an eligible node. The stopping of the allocations on
the draining node results in tasks being killed, but importantly this
kill does not fail the task. The result is tasks reporting as complete
due to their state being dead and not being failed. As such, when an
eligible node becomes available, all tasks will show as complete and
no allocations will need to be placed.

To prevent the behavior described above a check is performed when
the alloc runner kills its tasks. If the allocation's job type is
batch, and the allocation has a desired transition of migrate, the
task will be failed when it is killed. This ensures the task does
not report as complete, and when an eligible node becomes available
the allocations are placed as expected.
This commit is contained in:
Chris Roberts
2025-06-13 08:28:31 -07:00
committed by GitHub
parent 42b024db4d
commit dfa07e10ed
3 changed files with 184 additions and 6 deletions

3
.changelog/26025.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed bug where drained batch jobs would not be rescheduled if no eligible nodes were immediately available
```

View File

@@ -729,14 +729,35 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
// run alloc prekill hooks
ar.preKillHooks()
// generate task event for given task runner
taskEventFn := func(tr *taskrunner.TaskRunner) (te *structs.TaskEvent) {
te = structs.NewTaskEvent(structs.TaskKilling).
SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
// if the task is not set failed, the task has not finished,
// the job type is batch, and the allocation is being migrated
// then mark the task as failed. this ensures the task is recreated
// if no eligible nodes are immediately available.
if !tr.TaskState().Failed &&
tr.TaskState().FinishedAt.IsZero() &&
ar.alloc.Job.Type == structs.JobTypeBatch &&
ar.alloc.DesiredTransition.Migrate != nil &&
*ar.alloc.DesiredTransition.Migrate {
ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name)
te.SetFailsTask()
}
return
}
// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
continue
}
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
taskEvent.SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
taskEvent := taskEventFn(tr)
err := tr.Kill(context.TODO(), taskEvent)
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
@@ -758,8 +779,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
wg.Add(1)
go func(name string, tr *taskrunner.TaskRunner) {
defer wg.Done()
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
taskEvent.SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
taskEvent := taskEventFn(tr)
err := tr.Kill(context.TODO(), taskEvent)
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
@@ -782,8 +803,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
wg.Add(1)
go func(name string, tr *taskrunner.TaskRunner) {
defer wg.Done()
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
taskEvent.SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
taskEvent := taskEventFn(tr)
err := tr.Kill(context.TODO(), taskEvent)
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping sidecar task", "error", err, "task_name", name)

View File

@@ -1804,6 +1804,160 @@ func TestAllocRunner_HandlesArtifactFailure(t *testing.T) {
require.True(t, state.TaskStates["bad"].Failed)
}
// Test that alloc runner kills tasks in task group when stopping and
// fails tasks when job is batch job type and migrating
// Test that alloc runner kills tasks in task group when stopping and
// fails tasks when job is batch job type and migrating.
func TestAllocRunner_Migrate_Batch_KillTG(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.BatchAlloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// task runs long enough that it is still running when the
	// allocation is stopped and migrated below.
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config["run_for"] = "10s"
	alloc.AllocatedResources.Tasks[task.Name] = tr

	// task2 completes almost immediately, so it is already finished
	// (and must NOT be marked failed) by the time the migrate happens.
	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "task 2"
	task2.Driver = "mock_driver"
	task2.Config["run_for"] = "1ms"
	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
	alloc.AllocatedResources.Tasks[task2.Name] = tr

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	ar, err := NewAllocRunner(conf)
	must.NoError(t, err)
	defer destroy(ar)
	go ar.Run()
	upd := conf.StateUpdater.(*MockStateUpdater)

	// Wait for running
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
		}
		return true, nil
	}, func(err error) {
		must.NoError(t, err)
	})

	// Wait for completed task
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
		}

		// task should not have finished yet, task2 should be finished.
		// Name the task in each message so a failure identifies which
		// check tripped.
		if !last.TaskStates[task.Name].FinishedAt.IsZero() {
			return false, fmt.Errorf("task %q should not be finished", task.Name)
		}
		if last.TaskStates[task2.Name].FinishedAt.IsZero() {
			return false, fmt.Errorf("task %q should be finished", task2.Name)
		}
		return true, nil
	}, func(err error) {
		must.NoError(t, err)
	})

	// Stop the allocation with a migrate desired transition, mimicking
	// what the node drainer does for a batch job allocation.
	update := ar.Alloc().Copy()
	migrate := true
	update.DesiredTransition.Migrate = &migrate
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)

	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != structs.AllocClientStatusFailed {
			return false, fmt.Errorf("got client status %q; want %q", last.ClientStatus, structs.AllocClientStatusFailed)
		}

		// task should be failed since it was killed while migrating,
		// task2 should not be failed since it was already completed.
		if !last.TaskStates[task.Name].Failed {
			return false, fmt.Errorf("task %q should be failed", task.Name)
		}
		if last.TaskStates[task2.Name].Failed {
			return false, fmt.Errorf("task %q should not be failed", task2.Name)
		}
		return true, nil
	}, func(err error) {
		must.NoError(t, err)
	})
}
// Test that alloc runner kills tasks in task group when stopping and
// does not fail tasks when job is batch job type and not migrating
// Test that alloc runner kills tasks in task group when stopping and
// does not fail tasks when job is batch job type and not migrating.
func TestAllocRunner_Batch_KillTG(t *testing.T) {
	ci.Parallel(t)

	batchAlloc := mock.BatchAlloc()
	taskRes := batchAlloc.AllocatedResources.Tasks[batchAlloc.Job.TaskGroups[0].Tasks[0].Name]
	batchAlloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	batchAlloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// Use a long-running mock task so it is still alive when stopped.
	mainTask := batchAlloc.Job.TaskGroups[0].Tasks[0]
	mainTask.Driver = "mock_driver"
	mainTask.Config["run_for"] = "10s"
	batchAlloc.AllocatedResources.Tasks[mainTask.Name] = taskRes

	conf, cleanup := testAllocRunnerConfig(t, batchAlloc)
	defer cleanup()

	runner, err := NewAllocRunner(conf)
	must.NoError(t, err)
	defer destroy(runner)
	go runner.Run()

	updater := conf.StateUpdater.(*MockStateUpdater)

	// Block until the allocation reports as running.
	testutil.WaitForResult(func() (bool, error) {
		latest := updater.Last()
		if latest == nil {
			return false, fmt.Errorf("No updates")
		}
		if latest.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("got status %v; want %v", latest.ClientStatus, structs.AllocClientStatusRunning)
		}
		return true, nil
	}, func(err error) {
		must.NoError(t, err)
	})

	// Stop the allocation WITHOUT a migrate transition.
	stopped := runner.Alloc().Copy()
	stopped.DesiredStatus = structs.AllocDesiredStatusStop
	runner.Update(stopped)

	// With no migration in play the allocation must end up complete,
	// not failed.
	testutil.WaitForResult(func() (bool, error) {
		latest := updater.Last()
		if latest == nil {
			return false, fmt.Errorf("No updates")
		}
		if latest.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got client status %q; want %q", latest.ClientStatus, structs.AllocClientStatusComplete)
		}
		return true, nil
	}, func(err error) {
		must.NoError(t, err)
	})
}
// Test that alloc runner kills tasks in task group when another task fails
func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
ci.Parallel(t)