mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
Add optional shutdown delay to tasks
Fixes #2441 Defaults to 0 (no delay) for backward compat and because this feature should be opt-in.
This commit is contained in:
@@ -292,6 +292,7 @@ type Task struct {
|
||||
Templates []*Template
|
||||
DispatchPayload *DispatchPayloadConfig
|
||||
Leader bool
|
||||
ShutdownDelay time.Duration `mapstructure:"shutdown_delay"`
|
||||
}
|
||||
|
||||
func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
|
||||
|
||||
@@ -1171,6 +1171,12 @@ func (r *TaskRunner) run() {
|
||||
interpTask := interpolateServices(r.envBuilder.Build(), r.task)
|
||||
r.consul.RemoveTask(r.alloc.ID, interpTask)
|
||||
|
||||
// Delay actually killing the task if configured. See #244
|
||||
if r.task.ShutdownDelay > 0 {
|
||||
r.logger.Printf("[INFO] client: delaying shutdown of alloc %q task %q for %q", r.alloc.ID, r.task.Name, r.task.ShutdownDelay)
|
||||
<-time.After(r.task.ShutdownDelay)
|
||||
}
|
||||
|
||||
// Store the task event that provides context on the task
|
||||
// destroy. The Killed event is set from the alloc_runner and
|
||||
// doesn't add detail
|
||||
|
||||
@@ -1614,3 +1614,67 @@ func TestTaskRunner_Pre06ScriptCheck(t *testing.T) {
|
||||
t.Run(run("0.5.6", "java", "tcp", false))
|
||||
t.Run(run("0.5.6", "mock_driver", "tcp", false))
|
||||
}
|
||||
|
||||
func TestTaskRunner_ShutdownDelay(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"run_for": "1000s",
|
||||
}
|
||||
|
||||
// No shutdown escape hatch for this delay, so don't set it too high
|
||||
task.ShutdownDelay = 5 * time.Second
|
||||
|
||||
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
||||
ctx.tr.MarkReceived()
|
||||
go ctx.tr.Run()
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// Wait for the task to start
|
||||
testWaitForTaskToStart(t, ctx)
|
||||
|
||||
// Begin the tear down
|
||||
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
destroyed := time.Now()
|
||||
|
||||
// Service should get removed quickly; loop until RemoveTask is called
|
||||
found := false
|
||||
mockConsul := ctx.tr.consul.(*mockConsulServiceClient)
|
||||
deadline := destroyed.Add(task.ShutdownDelay)
|
||||
for time.Now().Before(deadline) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
mockConsul.mu.Lock()
|
||||
n := len(mockConsul.ops)
|
||||
if n < 2 {
|
||||
mockConsul.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
lastOp := mockConsul.ops[n-1].op
|
||||
mockConsul.mu.Unlock()
|
||||
|
||||
if lastOp == "remove" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("task was not removed from Consul first")
|
||||
}
|
||||
|
||||
// Wait for actual exit
|
||||
select {
|
||||
case <-ctx.tr.WaitCh():
|
||||
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
// It should be impossible to reach here in less time than the shutdown delay
|
||||
if time.Now().Before(destroyed.Add(task.ShutdownDelay)) {
|
||||
t.Fatalf("task exited before shutdown delay")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -663,6 +663,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
|
||||
structsTask.Env = apiTask.Env
|
||||
structsTask.Meta = apiTask.Meta
|
||||
structsTask.KillTimeout = *apiTask.KillTimeout
|
||||
structsTask.ShutdownDelay = apiTask.ShutdownDelay
|
||||
|
||||
if l := len(apiTask.Constraints); l != 0 {
|
||||
structsTask.Constraints = make([]*structs.Constraint, l)
|
||||
|
||||
@@ -586,6 +586,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list
|
||||
"meta",
|
||||
"resources",
|
||||
"service",
|
||||
"shutdown_delay",
|
||||
"template",
|
||||
"user",
|
||||
"vault",
|
||||
|
||||
@@ -148,7 +148,8 @@ func TestParse(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
KillTimeout: helper.TimeToPtr(22 * time.Second),
|
||||
KillTimeout: helper.TimeToPtr(22 * time.Second),
|
||||
ShutdownDelay: 11 * time.Second,
|
||||
LogConfig: &api.LogConfig{
|
||||
MaxFiles: helper.IntToPtr(14),
|
||||
MaxFileSizeMB: helper.IntToPtr(101),
|
||||
|
||||
@@ -129,6 +129,8 @@ job "binstore-storagelocker" {
|
||||
|
||||
kill_timeout = "22s"
|
||||
|
||||
shutdown_delay = "11s"
|
||||
|
||||
artifact {
|
||||
source = "http://foo.com/artifact"
|
||||
|
||||
|
||||
@@ -1922,8 +1922,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
||||
Driver: "docker",
|
||||
},
|
||||
{
|
||||
Name: "baz",
|
||||
Driver: "docker",
|
||||
Name: "baz",
|
||||
ShutdownDelay: 1 * time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1934,8 +1934,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
||||
Driver: "docker",
|
||||
},
|
||||
{
|
||||
Name: "baz",
|
||||
Driver: "exec",
|
||||
Name: "baz",
|
||||
ShutdownDelay: 2 * time.Second,
|
||||
},
|
||||
{
|
||||
Name: "bam",
|
||||
@@ -1981,8 +1981,8 @@ func TestTaskGroupDiff(t *testing.T) {
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Driver",
|
||||
Old: "docker",
|
||||
New: "exec",
|
||||
Old: "1s",
|
||||
New: "2s",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -2997,6 +2997,10 @@ type Task struct {
|
||||
// Leader marks the task as the leader within the group. When the leader
|
||||
// task exits, other tasks will be gracefully terminated.
|
||||
Leader bool
|
||||
|
||||
// ShutdownDelay is the duration of the delay between deregistering a
|
||||
// task from Consul and sending it a signal to shutdown. See #2441
|
||||
ShutdownDelay time.Duration
|
||||
}
|
||||
|
||||
func (t *Task) Copy() *Task {
|
||||
@@ -3104,9 +3108,12 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk) error {
|
||||
if t.Driver == "" {
|
||||
mErr.Errors = append(mErr.Errors, errors.New("Missing task driver"))
|
||||
}
|
||||
if t.KillTimeout.Nanoseconds() < 0 {
|
||||
if t.KillTimeout < 0 {
|
||||
mErr.Errors = append(mErr.Errors, errors.New("KillTimeout must be a positive value"))
|
||||
}
|
||||
if t.ShutdownDelay < 0 {
|
||||
mErr.Errors = append(mErr.Errors, errors.New("ShutdownDelay must be a positive value"))
|
||||
}
|
||||
|
||||
// Validate the resources.
|
||||
if t.Resources == nil {
|
||||
|
||||
@@ -71,6 +71,12 @@ job "docs" {
|
||||
[Consul][] for service discovery. Nomad automatically registers when a task
|
||||
is started and de-registers it when the task dies.
|
||||
|
||||
- `shutdown_delay` `(string: "0s")` - Specifies the duration to wait when
|
||||
killing a task between removing it from Consul and sending it a shutdown
|
||||
signal. Ideally services would fail healthchecks once they receive a shutdown
|
||||
signal. Alternatively `shutdown_delay` may be set to give in flight requests
|
||||
time to complete before shutting down.
|
||||
|
||||
- `user` `(string: <varies>)` - Specifies the user that will run the task.
|
||||
Defaults to `nobody` for the [`exec`][exec] and [`java`][java] drivers.
|
||||
[Docker][] and [rkt][] images specify their own default users. This can only
|
||||
|
||||
Reference in New Issue
Block a user