mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
Merge pull request #5890 from hashicorp/b-dont-start-completed-allocs-2
task runner to avoid running task if terminal
This commit is contained in:
@@ -131,3 +131,87 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
|
||||
require.Equal(t, events[2].Type, structs.TaskStarted)
|
||||
require.Equal(t, events[3].Type, structs.TaskTerminated)
|
||||
}
|
||||
|
||||
// TestAllocRunner_Restore_CompletedBatch asserts that restoring a completed
|
||||
// batch alloc doesn't run it again
|
||||
func TestAllocRunner_Restore_CompletedBatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// 1. Run task and wait for it to complete
|
||||
// 2. Start new alloc runner
|
||||
// 3. Assert task didn't run again
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Job.Type = structs.JobTypeBatch
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"run_for": "2ms",
|
||||
}
|
||||
|
||||
conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
|
||||
defer cleanup()
|
||||
|
||||
// Maintain state for subsequent run
|
||||
conf.StateDB = state.NewMemDB(conf.Logger)
|
||||
|
||||
// Start and wait for task to be running
|
||||
ar, err := NewAllocRunner(conf)
|
||||
require.NoError(t, err)
|
||||
go ar.Run()
|
||||
defer destroy(ar)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
s := ar.AllocState()
|
||||
if s.ClientStatus != structs.AllocClientStatusComplete {
|
||||
return false, fmt.Errorf("expected complete, got %s", s.ClientStatus)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
// once job finishes, it shouldn't run again
|
||||
require.False(t, ar.shouldRun())
|
||||
initialRunEvents := ar.AllocState().TaskStates[task.Name].Events
|
||||
require.Len(t, initialRunEvents, 4)
|
||||
|
||||
ls, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ls)
|
||||
require.Equal(t, structs.TaskStateDead, ts.State)
|
||||
|
||||
// Start a new alloc runner and assert it gets stopped
|
||||
conf2, cleanup2 := testAllocRunnerConfig(t, alloc)
|
||||
defer cleanup2()
|
||||
|
||||
// Use original statedb to maintain hook state
|
||||
conf2.StateDB = conf.StateDB
|
||||
|
||||
// Restore, start, and wait for task to be killed
|
||||
ar2, err := NewAllocRunner(conf2)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, ar2.Restore())
|
||||
|
||||
go ar2.Run()
|
||||
defer destroy(ar2)
|
||||
|
||||
// AR waitCh must be closed even when task doesn't run again
|
||||
select {
|
||||
case <-ar2.WaitCh():
|
||||
case <-time.After(10 * time.Second):
|
||||
require.Fail(t, "alloc.waitCh wasn't closed")
|
||||
}
|
||||
|
||||
// TR waitCh must be closed too!
|
||||
select {
|
||||
case <-ar2.tasks[task.Name].WaitCh():
|
||||
case <-time.After(10 * time.Second):
|
||||
require.Fail(t, "tr.waitCh wasn't closed")
|
||||
}
|
||||
|
||||
// Assert that events are unmodified, which they would if task re-run
|
||||
events := ar2.AllocState().TaskStates[task.Name].Events
|
||||
require.Equal(t, initialRunEvents, events)
|
||||
}
|
||||
|
||||
@@ -394,6 +394,24 @@ func (tr *TaskRunner) Run() {
|
||||
defer close(tr.waitCh)
|
||||
var result *drivers.ExitResult
|
||||
|
||||
tr.stateLock.RLock()
|
||||
dead := tr.state.State == structs.TaskStateDead
|
||||
tr.stateLock.RUnlock()
|
||||
|
||||
// if restoring a dead task, ensure that task is cleared and all post hooks
|
||||
// are called without additional state updates
|
||||
if dead {
|
||||
// do cleanup functions without emitting any additional events/work
|
||||
// to handle cases where we restored a dead task where client terminated
|
||||
// after task finished before completing post-run actions.
|
||||
tr.clearDriverHandle()
|
||||
tr.stateUpdater.TaskStateUpdated()
|
||||
if err := tr.stop(); err != nil {
|
||||
tr.logger.Error("stop failed on terminal task", "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Updates are handled asynchronously with the other hooks but each
|
||||
// triggered update - whether due to alloc updates or a new vault token
|
||||
// - should be handled serially.
|
||||
@@ -899,7 +917,7 @@ func (tr *TaskRunner) Restore() error {
|
||||
}
|
||||
|
||||
alloc := tr.Alloc()
|
||||
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
|
||||
if tr.state.State == structs.TaskStateDead || alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user