provide -no-shutdown-delay flag for job/alloc stop (#11596)

Some operators use very long group/task `shutdown_delay` settings to
safely drain network connections to their workloads after service
deregistration. But during incident response, they may want to cause
that drain to be skipped so they can quickly shed load.

Provide a `-no-shutdown-delay` flag on the `nomad alloc stop` and
`nomad job stop` commands that bypasses the delay. This sets a new
desired transition state on the affected allocations that the
allocation/task runner will identify during pre-kill on the client.

Note (as documented here) that using this flag will almost always
result in failed inbound network connections for workloads as the
tasks will exit before clients receive updated service discovery
information and won't be gracefully drained.
This commit is contained in:
Tim Gross
2021-12-13 14:54:53 -05:00
committed by GitHub
parent 43b3e1628f
commit 35c22bcb6c
18 changed files with 372 additions and 47 deletions

View File

@@ -112,6 +112,11 @@ type TaskRunner struct {
killErr error
killErrLock sync.Mutex
// shutdownDelayCtx is a context from the alloc runner which will
// tell us to exit early from shutdown_delay
shutdownDelayCtx context.Context
shutdownDelayCancelFn context.CancelFunc
// Logger is the logger for the task runner.
logger log.Logger
@@ -287,6 +292,13 @@ type Config struct {
// startConditionMetCtx is done when TR should start the task
StartConditionMetCtx <-chan struct{}
// ShutdownDelayCtx is a context from the alloc runner which will
// tell us to exit early from shutdown_delay
ShutdownDelayCtx context.Context
// ShutdownDelayCancelFn should only be used in testing.
ShutdownDelayCancelFn context.CancelFunc
}
func NewTaskRunner(config *Config) (*TaskRunner, error) {
@@ -342,6 +354,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCtx: config.StartConditionMetCtx,
shutdownDelayCtx: config.ShutdownDelayCtx,
shutdownDelayCancelFn: config.ShutdownDelayCancelFn,
}
// Create the logger based on the allocation ID
@@ -895,6 +909,8 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E
select {
case result := <-resultCh:
return result
case <-tr.shutdownDelayCtx.Done():
break
case <-time.After(delay):
}
}
@@ -1478,3 +1494,9 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
tr.allocHookResources = res
}
// shutdownDelayCancel is used for testing only and cancels the
// shutdownDelayCtx
func (tr *TaskRunner) shutdownDelayCancel() {
tr.shutdownDelayCancelFn()
}

View File

@@ -14,6 +14,10 @@ import (
"time"
"github.com/golang/snappy"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/config"
@@ -26,6 +30,7 @@ import (
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
mockdriver "github.com/hashicorp/nomad/drivers/mock"
"github.com/hashicorp/nomad/drivers/rawexec"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@@ -33,9 +38,6 @@ import (
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type MockTaskStateUpdater struct {
@@ -94,26 +96,30 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
cleanup()
}
shutdownDelayCtx, shutdownDelayCancelFn := context.WithCancel(context.Background())
// Create a closed channel to mock TaskHookCoordinator.startConditionForTask.
// Closed channel indicates this task is not blocked on prestart hooks.
closedCh := make(chan struct{})
close(closedCh)
conf := &Config{
Alloc: alloc,
ClientConfig: clientConf,
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
ConsulSI: consulapi.NewMockServiceIdentitiesClient(),
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
StartConditionMetCtx: closedCh,
Alloc: alloc,
ClientConfig: clientConf,
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
ConsulSI: consulapi.NewMockServiceIdentitiesClient(),
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
StartConditionMetCtx: closedCh,
ShutdownDelayCtx: shutdownDelayCtx,
ShutdownDelayCancelFn: shutdownDelayCancelFn,
}
return conf, trCleanup
}
@@ -996,6 +1002,82 @@ WAIT:
}
}
// TestTaskRunner_NoShutdownDelay asserts services are removed from
// Consul and tasks are killed without waiting for ${shutdown_delay}
// when the alloc has the NoShutdownDelay transition flag set.
func TestTaskRunner_NoShutdownDelay(t *testing.T) {
t.Parallel()
// don't set this too high so that we don't block the test runner
// on shutting down the agent if the test fails
maxTestDuration := time.Duration(testutil.TestMultiplier()*10) * time.Second
maxTimeToFailDuration := time.Duration(testutil.TestMultiplier()) * time.Second
alloc := mock.Alloc()
alloc.DesiredTransition = structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)}
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Services[0].Tags = []string{"tag1"}
task.Services = task.Services[:1] // only need 1 for this test
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1000s",
}
task.ShutdownDelay = maxTestDuration
tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()
mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient)
testWaitForTaskToStart(t, tr)
testutil.WaitForResult(func() (bool, error) {
ops := mockConsul.GetOps()
if n := len(ops); n != 1 {
return false, fmt.Errorf("expected 1 consul operation. Found %d", n)
}
return ops[0].Op == "add", fmt.Errorf("consul operation was not a registration: %#v", ops[0])
}, func(err error) {
t.Fatalf("err: %v", err)
})
testCtx, cancel := context.WithTimeout(context.Background(), maxTimeToFailDuration)
defer cancel()
killed := make(chan error)
go func() {
tr.shutdownDelayCancel()
err := tr.Kill(testCtx, structs.NewTaskEvent("test"))
killed <- err
}()
// Wait for first de-registration call. Note that unlike
// TestTaskRunner_ShutdownDelay, we're racing with task exit
// and can't assert that we only get the first deregistration op
// (from serviceHook.PreKill).
testutil.WaitForResult(func() (bool, error) {
ops := mockConsul.GetOps()
if n := len(ops); n < 2 {
return false, fmt.Errorf("expected at least 2 consul operations.")
}
return ops[1].Op == "remove", fmt.Errorf(
"consul operation was not a deregistration: %#v", ops[1])
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Wait for the task to exit
select {
case <-tr.WaitCh():
case <-time.After(maxTimeToFailDuration):
t.Fatalf("task kill did not ignore shutdown delay")
return
}
err := <-killed
require.NoError(t, err, "killing task returned unexpected error")
}
// TestTaskRunner_Dispatch_Payload asserts that a dispatch job runs and the
// payload was written to disk.
func TestTaskRunner_Dispatch_Payload(t *testing.T) {