Merge pull request #6746 from hashicorp/f-shutdown-delay-tg

Group shutdown_delay
This commit is contained in:
Drew Bailey
2019-12-18 16:01:30 -05:00
committed by GitHub
16 changed files with 260 additions and 13 deletions

View File

@@ -1,5 +1,10 @@
## 0.10.3 (Unreleased)
FEATURES:
* jobspec: Add `shutdown_delay` to task groups so task groups can delay shutdown
after deregistering from Consul [[GH-6746](https://github.com/hashicorp/nomad/issues/6746)]
IMPROVEMENTS:
* scheduler: Removed penalty for allocation's previous node if the allocation did not fail. [[GH-6781](https://github.com/hashicorp/nomad/issues/6781)]

View File

@@ -411,6 +411,7 @@ type TaskGroup struct {
Networks []*NetworkResource
Meta map[string]string
Services []*Service
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
}
// NewTaskGroup creates a new TaskGroup.

View File

@@ -500,6 +500,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
var mu sync.Mutex
states := make(map[string]*structs.TaskState, len(ar.tasks))
// run alloc prekill hooks
ar.preKillHooks()
// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {

View File

@@ -295,6 +295,29 @@ func (ar *allocRunner) destroy() error {
return merr.ErrorOrNil()
}
func (ar *allocRunner) preKillHooks() {
for _, hook := range ar.runnerHooks {
pre, ok := hook.(interfaces.RunnerPreKillHook)
if !ok {
continue
}
name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running alloc pre shutdown hook", "name", name, "start", start)
}
pre.PreKill()
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished alloc pre shutdown hook", "name", name, "end", end, "duration", end.Sub(start))
}
}
}
// shutdownHooks calls graceful shutdown hooks for when the agent is exiting.
func (ar *allocRunner) shutdownHooks() {
for _, hook := range ar.runnerHooks {

View File

@@ -141,6 +141,133 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}
func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
// Create a group service
tg := alloc.Job.TaskGroups[0]
tg.Services = []*structs.Service{
{
Name: "shutdown_service",
},
}
// Create two tasks in the group
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "follower1"
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "10s",
}
task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "leader"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": "10s",
}
alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
alloc.AllocatedResources.Tasks[task.Name] = tr
alloc.AllocatedResources.Tasks[task2.Name] = tr
// Set a shutdown delay
shutdownDelay := 1 * time.Second
alloc.Job.TaskGroups[0].ShutdownDelay = &shutdownDelay
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()
// Wait for tasks to start
upd := conf.StateUpdater.(*MockStateUpdater)
last := upd.Last()
testutil.WaitForResult(func() (bool, error) {
last = upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if n := len(last.TaskStates); n != 2 {
return false, fmt.Errorf("Not enough task states (want: 2; found %d)", n)
}
for name, state := range last.TaskStates {
if state.State != structs.TaskStateRunning {
return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Reset updates
upd.Reset()
// Stop alloc
shutdownInit := time.Now()
update := alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)
// Wait for tasks to stop
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
fin := last.TaskStates["leader"].FinishedAt
if fin.IsZero() {
return false, nil
}
return true, nil
}, func(err error) {
last := upd.Last()
for name, state := range last.TaskStates {
t.Logf("%s: %s", name, state.State)
}
t.Fatalf("err: %v", err)
})
// Get consul client operations
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulOpts := consulClient.GetOps()
var groupRemoveOp cconsul.MockConsulOp
for _, op := range consulOpts {
// Grab the first deregistration request
if op.Op == "remove" && op.Name == "group-web" {
groupRemoveOp = op
break
}
}
// Ensure remove operation is close to shutdown initiation
require.True(t, groupRemoveOp.OccurredAt.Sub(shutdownInit) < 100*time.Millisecond)
last = upd.Last()
minShutdown := shutdownInit.Add(task.ShutdownDelay)
leaderFinished := last.TaskStates["leader"].FinishedAt
followerFinished := last.TaskStates["follower1"].FinishedAt
// Check that both tasks shut down after min possible shutdown time
require.Greater(t, leaderFinished.UnixNano(), minShutdown.UnixNano())
require.Greater(t, followerFinished.UnixNano(), minShutdown.UnixNano())
// Check that there is at least shutdown_delay between consul
// remove operation and task finished at time
require.True(t, leaderFinished.Sub(groupRemoveOp.OccurredAt) > shutdownDelay)
}
// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a
// leader the leader is stopped before other tasks.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {

View File

@@ -2,6 +2,7 @@ package allocrunner
import (
"sync"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -20,6 +21,8 @@ type groupServiceHook struct {
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool
logger log.Logger
@@ -43,12 +46,20 @@ type groupServiceHookConfig struct {
}
func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
var shutdownDelay time.Duration
tg := cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup)
if tg.ShutdownDelay != nil {
shutdownDelay = *tg.ShutdownDelay
}
h := &groupServiceHook{
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services
@@ -117,10 +128,34 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}
func (h *groupServiceHook) PreKill() {
h.mu.Lock()
defer h.mu.Unlock()
// If we have a shutdown delay deregister
// group services and then wait
// before continuing to kill tasks
h.deregister()
h.deregistered = true
if h.delay == 0 {
return
}
h.logger.Debug("waiting before removing group service", "shutdown_delay", h.delay)
// Wait for specified shutdown_delay
// this will block an agent from shutting down
<-time.After(h.delay)
}
func (h *groupServiceHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()
h.deregister()
if !h.deregistered {
h.deregister()
}
return nil
}

View File

@@ -20,6 +20,7 @@ import (
var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPreKillHook = (*groupServiceHook)(nil)
// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
// without group services does not error.

View File

@@ -16,6 +16,15 @@ type RunnerPrerunHook interface {
Prerun() error
}
// RunnerPreKillHooks are executed inside of KillTasks before
// iterating and killing each task. It will run before the Leader
// task is killed.
type RunnerPreKillHook interface {
RunnerHook
PreKill()
}
// RunnerPostrunHooks are executed after calling TaskRunner.Run, even for
// terminal allocations. Therefore Postrun hooks must be safe to call without
// first calling Prerun hooks.

View File

@@ -3,6 +3,7 @@ package consul
import (
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
@@ -12,9 +13,10 @@ import (
// MockConsulOp represents the register/deregister operations.
type MockConsulOp struct {
Op string // add, remove, or update
AllocID string
Name string // task or group name
Op string // add, remove, or update
AllocID string
Name string // task or group name
OccurredAt time.Time
}
func NewMockConsulOp(op, allocID, name string) MockConsulOp {
@@ -25,9 +27,10 @@ func NewMockConsulOp(op, allocID, name string) MockConsulOp {
panic(fmt.Errorf("invalid consul op: %s", op))
}
return MockConsulOp{
Op: op,
AllocID: allocID,
Name: name,
Op: op,
AllocID: allocID,
Name: name,
OccurredAt: time.Now(),
}
}

View File

@@ -708,6 +708,10 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Mode: *taskGroup.RestartPolicy.Mode,
}
if taskGroup.ShutdownDelay != nil {
tg.ShutdownDelay = taskGroup.ShutdownDelay
}
if taskGroup.ReschedulePolicy != nil {
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,

View File

@@ -51,6 +51,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"vault",
"migrate",
"spread",
"shutdown_delay",
"network",
"service",
"volume",
@@ -63,6 +64,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return err
}
delete(m, "constraint")
delete(m, "affinity")
delete(m, "meta")
@@ -80,7 +82,16 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
// Build the group with the basic decode
var g api.TaskGroup
g.Name = helper.StringToPtr(n)
if err := mapstructure.WeakDecode(m, &g); err != nil {
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &g,
})
if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err
}
@@ -201,7 +212,6 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
return multierror.Prefix(err, fmt.Sprintf("'%s',", n))
}
}
collection = append(collection, &g)
}

View File

@@ -926,8 +926,9 @@ func TestParse(t *testing.T) {
Datacenters: []string{"dc1"},
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("bar"),
Count: helper.IntToPtr(3),
Name: helper.StringToPtr("bar"),
ShutdownDelay: helper.TimeToPtr(14 * time.Second),
Count: helper.IntToPtr(3),
Networks: []*api.NetworkResource{
{
Mode: "bridge",

View File

@@ -2,7 +2,8 @@ job "foo" {
datacenters = ["dc1"]
group "bar" {
count = 3
count = 3
shutdown_delay = "14s"
network {
mode = "bridge"

View File

@@ -3547,6 +3547,10 @@ func (j *Job) Validate() error {
taskGroups[tg.Name] = idx
}
if tg.ShutdownDelay != nil && *tg.ShutdownDelay < 0 {
mErr.Errors = append(mErr.Errors, errors.New("ShutdownDelay must be a positive value"))
}
if j.Type == "system" && tg.Count > 1 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Job task group %s has count %d. Count cannot exceed 1 with system scheduler",
@@ -4736,6 +4740,10 @@ type TaskGroup struct {
// Volumes is a map of volumes that have been requested by the task group.
Volumes map[string]*VolumeRequest
// ShutdownDelay is the amount of time to wait between deregistering
// group services in consul and stopping tasks.
ShutdownDelay *time.Duration
}
func (tg *TaskGroup) Copy() *TaskGroup {
@@ -4782,6 +4790,10 @@ func (tg *TaskGroup) Copy() *TaskGroup {
}
}
if tg.ShutdownDelay != nil {
ntg.ShutdownDelay = tg.ShutdownDelay
}
return ntg
}

View File

@@ -65,6 +65,16 @@ job "docs" {
all tasks in this group. If omitted, a default policy exists for each job
type, which can be found in the [restart stanza documentation][restart].
- `shutdown_delay` `(string: "0s")` - Specifies the duration to wait when
stopping a group's tasks. The delay occurs between Consul deregistration
and sending each task a shutdown signal. Ideally, services would fail
healthchecks once they receive a shutdown signal. Alternatively
`shutdown_delay` may be set to give in flight requests time to complete
before shutting down. A group level `shutdown_delay` will run regardless
if there are any defined group services. In addition, tasks may have their
own [`shutdown_delay`](/docs/job-specification/task.html#shutdown_delay)
which waits between deregistering task services and stopping the task.
- `task` <code>([Task][]: <required>)</code> - Specifies one or more tasks to run
within this group. This can be specified multiple times, to add a task as part
of the group.

View File

@@ -90,7 +90,9 @@ job "docs" {
killing a task between removing it from Consul and sending it a shutdown
signal. Ideally services would fail healthchecks once they receive a shutdown
signal. Alternatively `shutdown_delay` may be set to give in flight requests
time to complete before shutting down.
time to complete before shutting down. In addition, task groups may have their
own [`shutdown_delay`](/docs/job-specification/group.html#shutdown_delay)
which waits between deregistering group services and stopping tasks.
- `user` `(string: <varies>)` - Specifies the user that will run the task.
Defaults to `nobody` for the [`exec`][exec] and [`java`][java] drivers.