support script checks for task group services (#6197)

In Nomad prior to Consul Connect, all Consul checks worked the same
way except for script checks. Because the task being checked runs in
its own container namespaces, Nomad executes the check in the task's
context. If the script check passes, Nomad uses Consul's TTL check
feature to update the check status. This means that in order to run a
script check, we need to know which task to execute it in.
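
As a sketch of that flow (the exit-code mapping mirrors newScriptCheckCallback in this change; printHB is a throwaway stand-in for the real Consul client):

package main

import (
	"fmt"

	"github.com/hashicorp/consul/api"
)

// heartbeater is the minimal Consul surface a script check needs.
type heartbeater interface {
	UpdateTTL(id, output, status string) error
}

// report maps a script's exit code to a Consul health state and
// heartbeats it via Consul's TTL check API: 0 is passing, 1 is
// warning, anything else (or an exec error) is critical.
func report(hb heartbeater, checkID string, output []byte, code int, err error) error {
	state := api.HealthCritical
	switch code {
	case 0:
		state = api.HealthPassing
	case 1:
		state = api.HealthWarning
	}
	msg := string(output)
	if err != nil { // any exec error forces critical
		state = api.HealthCritical
		msg = err.Error()
	}
	return hb.UpdateTTL(checkID, msg, state)
}

// printHB stands in for the Consul client, for illustration only.
type printHB struct{}

func (printHB) UpdateTTL(id, output, status string) error {
	fmt.Printf("check %s -> %s (%s)\n", id, status, output)
	return nil
}

func main() {
	_ = report(printHB{}, "checkid", []byte("ok"), 0, nil) // -> passing
}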

To support Consul Connect, we need group services, and these need to
be registered in Consul along with their checks. We could push the
service down into a task, but that doesn't work if someone wants to
associate a service with one task's ports while running its script
checks in another task in the same allocation.
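
For example, a group-level service can advertise the group's port while its script check names the task that should execute it. A hypothetical service sketched with the structs this change consumes (field values are illustrative; assumes the usual time and structs imports):

// The service is group-level, but its script check carries the name of
// the task whose environment should run it; the new hook matches on
// check.TaskName to claim such checks.
service := &structs.Service{
	Name:      "web",
	PortLabel: "http",
	Checks: []*structs.ServiceCheck{{
		Name:     "healthcheck",
		Type:     structs.ServiceCheckScript,
		Command:  "/usr/local/bin/healthcheck.sh",
		Interval: 10 * time.Second,
		Timeout:  5 * time.Second,
		TaskName: "sidecar", // run the script inside this task
	}},
}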

Because Nomad handles the script check rather than Consul anyway,
this change moves script check handling into the task runner so that
the task runner owns the script check's configuration and
lifecycle. This allows us to pass a group service's check
configuration down into a task without associating the service itself
with the task.

When a task starts, we walk back through its task group to see if
there are script checks associated with the task. If so, we spin off
script check tasklets for them. The group-level service and any
restart behaviors it needs are entirely encapsulated within the group
service hook.
Tim Gross committed on 2019-09-03 15:09:04 -04:00 (committed by GitHub)
parent f02c163532
commit 40368d2c63
11 changed files with 1131 additions and 889 deletions

View File

@@ -0,0 +1,370 @@
package taskrunner
import (
"context"
"fmt"
"sync"
"time"
"github.com/hashicorp/consul/api"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
var _ interfaces.TaskPoststartHook = &scriptCheckHook{}
var _ interfaces.TaskUpdateHook = &scriptCheckHook{}
var _ interfaces.TaskStopHook = &scriptCheckHook{}
// default max amount of time to wait for all scripts on shutdown.
const defaultShutdownWait = time.Minute
type scriptCheckHookConfig struct {
alloc *structs.Allocation
task *structs.Task
consul consul.ConsulServiceAPI
logger log.Logger
shutdownWait time.Duration
}
// scriptCheckHook implements a task runner hook for running script
// checks in the context of a task
type scriptCheckHook struct {
consul consul.ConsulServiceAPI
allocID string
taskName string
logger log.Logger
shutdownWait time.Duration // max time to wait for scripts to shutdown
shutdownCh chan struct{} // closed when all scripts should shutdown
// The following fields can be changed by Update()
driverExec tinterfaces.ScriptExecutor
taskEnv *taskenv.TaskEnv
// These maintain state
scripts map[string]*scriptCheck
runningScripts map[string]*taskletHandle
// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
mu sync.Mutex
}
func newScriptCheckHook(c scriptCheckHookConfig) *scriptCheckHook {
scriptChecks := make(map[string]*scriptCheck)
for _, service := range c.task.Services {
for _, check := range service.Checks {
if check.Type != structs.ServiceCheckScript {
continue
}
sc := newScriptCheck(&scriptCheckConfig{
allocID: c.alloc.ID,
taskName: c.task.Name,
check: check,
service: service,
agent: c.consul,
})
scriptChecks[sc.id] = sc
}
}
// Walk back through the task group to see if there are script checks
// associated with the task. If so, we'll create scriptCheck tasklets
// for them. The group-level service and any check restart behaviors it
// needs are entirely encapsulated within the group service hook which
// watches Consul for status changes.
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for _, service := range tg.Services {
for _, check := range service.Checks {
if check.Type != structs.ServiceCheckScript {
continue
}
if check.TaskName != c.task.Name {
continue
}
groupTaskName := "group-" + tg.Name
sc := newScriptCheck(&scriptCheckConfig{
allocID: c.alloc.ID,
taskName: groupTaskName,
service: service,
check: check,
agent: c.consul,
})
scriptChecks[sc.id] = sc
}
}
h := &scriptCheckHook{
consul: c.consul,
allocID: c.alloc.ID,
taskName: c.task.Name,
scripts: scriptChecks,
runningScripts: make(map[string]*taskletHandle),
shutdownWait: defaultShutdownWait,
shutdownCh: make(chan struct{}),
}
if c.shutdownWait != 0 {
h.shutdownWait = c.shutdownWait // override for testing
}
h.logger = c.logger.Named(h.Name())
return h
}
func (h *scriptCheckHook) Name() string {
return "script_checks"
}
// PostStart implements interfaces.TaskPoststartHook. It adds the current
// task context (driver and env) to the script checks and starts up the
// scripts.
func (h *scriptCheckHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
if req.DriverExec == nil {
return fmt.Errorf("driver doesn't support script checks")
}
// Store the TaskEnv for interpolating now and when Updating
h.driverExec = req.DriverExec
h.taskEnv = req.TaskEnv
h.scripts = h.getTaskScriptChecks()
// Handle starting scripts
for checkID, script := range h.scripts {
// If it's already running, cancel and replace
if oldScript, running := h.runningScripts[checkID]; running {
oldScript.cancel()
}
// Start and store the handle
h.runningScripts[checkID] = script.run()
}
return nil
}
// Updated implements interfaces.TaskUpdateHook. It adds the current
// task context (driver and env) to the script checks and replaces any
// that have been changed.
func (h *scriptCheckHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
// Get current script checks with request's driver metadata as it
// can't change due to Updates
oldScriptChecks := h.getTaskScriptChecks()
task := req.Alloc.LookupTask(h.taskName)
if task == nil {
return fmt.Errorf("task %q not found in updated alloc", h.taskName)
}
// Update service hook fields
h.taskEnv = req.TaskEnv
// Create new script checks struct with those new values
newScriptChecks := h.getTaskScriptChecks()
// Handle starting scripts
for checkID, script := range newScriptChecks {
if _, ok := oldScriptChecks[checkID]; ok {
// If it's already running, cancel and replace
if oldScript, running := h.runningScripts[checkID]; running {
oldScript.cancel()
}
// Start and store the handle
h.runningScripts[checkID] = script.run()
}
}
// Cancel scripts we no longer want
for checkID := range oldScriptChecks {
if _, ok := newScriptChecks[checkID]; !ok {
if oldScript, running := h.runningScripts[checkID]; running {
oldScript.cancel()
}
}
}
return nil
}
// Stop implements interfaces.TaskStopHook and blocks waiting for running
// scripts to finish (or for the shutdownWait timeout to expire).
func (h *scriptCheckHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
close(h.shutdownCh)
deadline := time.After(h.shutdownWait)
err := fmt.Errorf("timed out waiting for script checks to exit")
for _, script := range h.runningScripts {
select {
case <-script.wait():
case <-ctx.Done():
// the caller is passing the background context, so
// we should never really see this outside of testing
case <-deadline:
// at this point the Consul client has been cleaned
// up so we don't want to hang onto this.
return err
}
}
return nil
}
// getTaskScriptChecks returns an interpolated copy of services and checks with
// values from the task's environment.
func (h *scriptCheckHook) getTaskScriptChecks() map[string]*scriptCheck {
// Guard against not having a valid taskEnv. This can be the case if the
// PreKilling or Exited hook is run before Poststart.
if h.taskEnv == nil || h.driverExec == nil {
return nil
}
newChecks := make(map[string]*scriptCheck)
for _, orig := range h.scripts {
sc := orig.Copy()
sc.exec = h.driverExec
sc.logger = h.logger
sc.shutdownCh = h.shutdownCh
sc.callback = newScriptCheckCallback(sc)
sc.Command = h.taskEnv.ReplaceEnv(orig.Command)
sc.Args = h.taskEnv.ParseAndReplace(orig.Args)
newChecks[sc.id] = sc
}
return newChecks
}
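// For illustration, suppose the task environment contains (values
// invented for the example):
//
//	NOMAD_TASK_DIR  = /alloc/task/local
//	NOMAD_PORT_http = 8080
//
// Then a check configured as
//
//	orig.Command = "${NOMAD_TASK_DIR}/check.sh"
//	orig.Args    = []string{"--port", "${NOMAD_PORT_http}"}
//
// interpolates to
//
//	sc.Command = "/alloc/task/local/check.sh"
//	sc.Args    = []string{"--port", "8080"}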
// heartbeater is the subset of consul agent functionality needed by script
// checks to heartbeat
type heartbeater interface {
UpdateTTL(id, output, status string) error
}
// scriptCheck runs script checks via an interfaces.ScriptExecutor and updates the
// appropriate check's TTL when the script succeeds.
type scriptCheck struct {
id string
agent heartbeater
lastCheckOk bool // true if the last check was ok; otherwise false
tasklet
}
// scriptCheckConfig is a parameter struct for newScriptCheck
type scriptCheckConfig struct {
allocID string
taskName string
service *structs.Service
check *structs.ServiceCheck
agent heartbeater
}
// newScriptCheck constructs a scriptCheck. We only configure the
// immutable fields of scriptCheck here; the rest is configured during
// the Poststart hook so that we have the rest of the task execution
// environment.
func newScriptCheck(config *scriptCheckConfig) *scriptCheck {
serviceID := agentconsul.MakeTaskServiceID(
config.allocID, config.taskName, config.service)
checkID := agentconsul.MakeCheckID(serviceID, config.check)
sc := &scriptCheck{
id: checkID,
agent: config.agent,
lastCheckOk: true, // start logging on first failure
}
// we can't use the promoted fields of tasklet in the struct literal
sc.Command = config.check.Command
sc.Args = config.check.Args
sc.Interval = config.check.Interval
sc.Timeout = config.check.Timeout
return sc
}
// Copy does a *shallow* copy of script checks. The struct itself is
// copied (not just the pointer) so callers can safely overwrite the
// copy's fields without mutating the original.
func (sc *scriptCheck) Copy() *scriptCheck {
	newSc := *sc
	return &newSc
}
// newScriptCheckCallback closes over the script check and returns the
// taskletCallback invoked each time the script check executes.
func newScriptCheckCallback(s *scriptCheck) taskletCallback {
return func(ctx context.Context, params execResult) {
output := params.output
code := params.code
err := params.err
state := api.HealthCritical
switch code {
case 0:
state = api.HealthPassing
case 1:
state = api.HealthWarning
}
var outputMsg string
if err != nil {
state = api.HealthCritical
outputMsg = err.Error()
} else {
outputMsg = string(output)
}
// heartbeat the check to Consul
err = s.updateTTL(ctx, s.id, outputMsg, state)
select {
case <-ctx.Done():
// check has been removed; don't report errors
return
default:
}
if err != nil {
if s.lastCheckOk {
s.lastCheckOk = false
s.logger.Warn("updating check failed", "error", err)
} else {
s.logger.Debug("updating check still failing", "error", err)
}
} else if !s.lastCheckOk {
// Succeeded for the first time or after failing; log
s.lastCheckOk = true
s.logger.Info("updating check succeeded")
}
}
}
const (
updateTTLBackoffBaseline = 1 * time.Second
updateTTLBackoffLimit = 3 * time.Second
)
// updateTTL updates the check's state in Consul, performing an exponential
// backoff in the case where the check isn't yet registered in Consul, to
// avoid a race between service registration and the first check.
func (s *scriptCheck) updateTTL(ctx context.Context, id, msg, state string) error {
for attempts := 0; ; attempts++ {
err := s.agent.UpdateTTL(id, msg, state)
if err == nil {
return nil
}
// Handle the retry case
backoff := (1 << (2 * uint64(attempts))) * updateTTLBackoffBaseline
if backoff > updateTTLBackoffLimit {
return err
}
// Wait till retrying
select {
case <-ctx.Done():
return err
case <-time.After(backoff):
}
}
}
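
One consequence of the constants above worth spelling out: the retry schedule is very short. A standalone sketch of the arithmetic, with the same values inlined:

package main

import (
	"fmt"
	"time"
)

// Reproduces the backoff arithmetic from updateTTL: with a 1s baseline
// and a 3s limit, attempt 0 waits 1s and attempt 1 would wait 4s, which
// exceeds the limit -- so UpdateTTL is tried at most twice.
func main() {
	baseline := 1 * time.Second
	limit := 3 * time.Second
	for attempts := 0; ; attempts++ {
		backoff := (1 << (2 * uint64(attempts))) * baseline
		if backoff > limit {
			fmt.Printf("attempt %d: backoff %s exceeds limit, giving up\n", attempts, backoff)
			return
		}
		fmt.Printf("attempt %d: wait %s, then retry\n", attempts, backoff)
	}
}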

View File

@@ -0,0 +1,215 @@
package taskrunner
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/consul/api"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func newScriptMock(hb heartbeater, exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *scriptCheck {
script := newScriptCheck(&scriptCheckConfig{
allocID: "allocid",
taskName: "testtask",
agent: hb,
service: &structs.Service{Name: "xx"},
check: &structs.ServiceCheck{},
})
script.exec = exec
script.logger = logger
script.Interval = interval
script.Timeout = timeout
script.callback = newScriptCheckCallback(script)
script.lastCheckOk = true
return script
}
// fakeHeartbeater implements the heartbeater interface to allow mocking out
// Consul in script executor tests.
type fakeHeartbeater struct {
heartbeats chan heartbeat
}
func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error {
f.heartbeats <- heartbeat{checkID: checkID, output: output, status: status}
return nil
}
func newFakeHeartbeater() *fakeHeartbeater {
return &fakeHeartbeater{heartbeats: make(chan heartbeat)}
}
type heartbeat struct {
checkID string
output string
status string
}
// TestScript_Exec_Cancel asserts that cancelling a script check short-circuits
// any running scripts.
func TestScript_Exec_Cancel(t *testing.T) {
exec, cancel := newBlockingScriptExec()
defer cancel()
logger := testlog.HCLogger(t)
script := newScriptMock(nil, // heartbeater should never be called
exec, logger, time.Hour, time.Hour)
handle := script.run()
<-exec.running // wait until Exec is called
handle.cancel() // cancel now that we're blocked in exec
select {
case <-handle.wait():
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
// The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
require.NotEqual(t, atomic.LoadInt32(&exec.exited), 1,
"expected script executor to still be running after timeout")
}
// TestScript_Exec_TimeoutBasic asserts a script will be killed when the
// timeout is reached.
func TestScript_Exec_TimeoutBasic(t *testing.T) {
t.Parallel()
exec, cancel := newBlockingScriptExec()
defer cancel()
logger := testlog.HCLogger(t)
hb := newFakeHeartbeater()
script := newScriptMock(hb, exec, logger, time.Hour, time.Second)
handle := script.run()
defer handle.cancel() // cleanup
<-exec.running // wait until Exec is called
// Check for UpdateTTL call
select {
case update := <-hb.heartbeats:
require.Equal(t, update.output, context.DeadlineExceeded.Error())
require.Equal(t, update.status, api.HealthCritical)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
// The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
require.NotEqual(t, atomic.LoadInt32(&exec.exited), 1,
"expected script executor to still be running after timeout")
// Cancel and watch for exit
handle.cancel()
select {
case <-handle.wait(): // ok!
case update := <-hb.heartbeats:
t.Errorf("unexpected UpdateTTL call on exit with status=%q", update)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
// TestScript_Exec_TimeoutCritical asserts a script will be killed when
// the timeout is reached and always set a critical status regardless of what
// Exec returns.
func TestScript_Exec_TimeoutCritical(t *testing.T) {
t.Parallel()
logger := testlog.HCLogger(t)
hb := newFakeHeartbeater()
script := newScriptMock(hb, sleeperExec{}, logger, time.Hour, time.Nanosecond)
handle := script.run()
defer handle.cancel() // cleanup
// Check for UpdateTTL call
select {
case update := <-hb.heartbeats:
require.Equal(t, update.output, context.DeadlineExceeded.Error())
require.Equal(t, update.status, api.HealthCritical)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to timeout")
}
}
// TestScript_Exec_Shutdown asserts a script will be executed once more
// when told to shutdown.
func TestScript_Exec_Shutdown(t *testing.T) {
shutdown := make(chan struct{})
exec := newSimpleExec(0, nil)
logger := testlog.HCLogger(t)
hb := newFakeHeartbeater()
script := newScriptMock(hb, exec, logger, time.Hour, 3*time.Second)
script.shutdownCh = shutdown
handle := script.run()
defer handle.cancel() // cleanup
close(shutdown) // tell scriptCheck to exit
select {
case update := <-hb.heartbeats:
require.Equal(t, update.output, "code=0 err=<nil>")
require.Equal(t, update.status, api.HealthPassing)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
select {
case <-handle.wait(): // ok!
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
// TestScript_Exec_Codes asserts script exit codes are translated to their
// corresponding Consul health check status.
func TestScript_Exec_Codes(t *testing.T) {
exec := newScriptedExec([]execResult{
{[]byte("output"), 1, nil},
{[]byte("output"), 0, nil},
{[]byte("output"), 0, context.DeadlineExceeded},
{[]byte("output"), 0, nil},
{[]byte("<ignored output>"), 2, fmt.Errorf("some error")},
{[]byte("output"), 0, nil},
{[]byte("error9000"), 9000, nil},
})
logger := testlog.HCLogger(t)
hb := newFakeHeartbeater()
script := newScriptMock(
hb, exec, logger, time.Nanosecond, 3*time.Second)
handle := script.run()
defer handle.cancel() // cleanup
deadline := time.After(3 * time.Second)
expected := []heartbeat{
{script.id, "output", api.HealthWarning},
{script.id, "output", api.HealthPassing},
{script.id, context.DeadlineExceeded.Error(), api.HealthCritical},
{script.id, "output", api.HealthPassing},
{script.id, "some error", api.HealthCritical},
{script.id, "output", api.HealthPassing},
{script.id, "error9000", api.HealthCritical},
}
for i := 0; i <= 6; i++ {
select {
case update := <-hb.heartbeats:
require.Equal(t, update, expected[i],
"expected update %d to be '%s' but received '%s'",
i, expected[i], update)
case <-deadline:
t.Fatalf("timed out waiting for all script checks to finish")
}
}
}

View File

@@ -105,6 +105,15 @@ func (tr *TaskRunner) initHooks() {
logger: hookLogger,
}))
}
// Add the script check hook; it starts no tasklets at runtime if the
// task has no script checks.
scriptCheckHook := newScriptCheckHook(scriptCheckHookConfig{
alloc: tr.Alloc(),
task: tr.Task(),
consul: tr.consulClient,
logger: hookLogger,
})
tr.runnerHooks = append(tr.runnerHooks, scriptCheckHook)
}
func (tr *TaskRunner) emitHookError(err error, hookName string) {

View File

@@ -0,0 +1,158 @@
package taskrunner
import (
"context"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
)
// contextExec allows canceling a interfaces.ScriptExecutor with a context.
type contextExec struct {
// pctx is the parent context. A subcontext will be created with Exec's
// timeout.
pctx context.Context
// exec to be wrapped in a context
exec interfaces.ScriptExecutor
}
func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec {
return &contextExec{
pctx: ctx,
exec: exec,
}
}
// execResult are the outputs of an Exec
type execResult struct {
output []byte
code int
err error
}
// Exec a command until the timeout expires, the context is canceled, or the
// underlying Exec returns.
func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
resCh := make(chan execResult, 1)
// Don't trust the underlying implementation to obey timeout
ctx, cancel := context.WithTimeout(c.pctx, timeout)
defer cancel()
go func() {
output, code, err := c.exec.Exec(timeout, cmd, args)
select {
case resCh <- execResult{output, code, err}:
case <-ctx.Done():
}
}()
select {
case res := <-resCh:
return res.output, res.code, res.err
case <-ctx.Done():
return nil, 0, ctx.Err()
}
}
// tasklet is an abstraction around periodically running a script within
// the context of a Task. The interfaces.ScriptExecutor is fired at least
// once and on each interval, and fires a callback whenever the script
// is complete.
type tasklet struct {
Command string // Command is the command to run for tasklet
Args []string // Args is a list of arguments for tasklet
Interval time.Duration // Interval of the tasklet
Timeout time.Duration // Timeout of the tasklet
exec interfaces.ScriptExecutor
callback taskletCallback
logger log.Logger
shutdownCh <-chan struct{}
}
// taskletHandle is returned by tasklet.run and is used to cancel a
// tasklet and wait for it to shut down.
type taskletHandle struct {
// cancel the script
cancel func()
exitCh chan struct{}
}
// wait returns a chan that's closed when the tasklet exits
func (t taskletHandle) wait() <-chan struct{} {
return t.exitCh
}
// taskletCallback is called with a cancellation context and the output of a
// tasklet's Exec whenever it runs.
type taskletCallback func(context.Context, execResult)
// run this tasklet and return a handle that can cancel it and wait for
// it to exit. The tasklet's callback is called each time the script
// completes. If the shutdownCh is closed the script is run once more
// before exiting.
func (t *tasklet) run() *taskletHandle {
ctx, cancel := context.WithCancel(context.Background())
exitCh := make(chan struct{})
// Wrap the original interfaces.ScriptExecutor in one that obeys context
// cancelation.
ctxExec := newContextExec(ctx, t.exec)
go func() {
defer close(exitCh)
timer := time.NewTimer(0)
defer timer.Stop()
for {
// Block until tasklet is removed, Nomad is shutting
// down, or the tasklet interval is up
select {
case <-ctx.Done():
// tasklet has been removed
return
case <-t.shutdownCh:
// unblock but don't exit until after we run once more
case <-timer.C:
timer.Reset(t.Interval)
}
metrics.IncrCounter([]string{
"client", "allocrunner", "taskrunner", "tasklet_runs"}, 1)
// Execute check script with timeout
t.logger.Trace("tasklet executing")
output, code, err := ctxExec.Exec(t.Timeout, t.Command, t.Args)
switch err {
case context.Canceled:
// check removed during execution; exit
return
case context.DeadlineExceeded:
metrics.IncrCounter([]string{
"client", "allocrunner", "taskrunner",
"tasklet_timeouts"}, 1)
// Log deadline exceeded every time as it's a
// distinct issue from the tasklet returning failure
t.logger.Warn("tasklet timed out", "timeout", t.Timeout)
}
t.callback(ctx, execResult{output, code, err})
select {
case <-t.shutdownCh:
// We've been told to exit and just ran so exit
return
default:
}
}
}()
return &taskletHandle{cancel: cancel, exitCh: exitCh}
}

View File

@@ -0,0 +1,268 @@
package taskrunner
import (
"context"
"fmt"
"os"
"os/exec"
"sync/atomic"
"testing"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/stretchr/testify/assert"
)
func TestMain(m *testing.M) {
if !testtask.Run() {
os.Exit(m.Run())
}
}
func TestTasklet_Exec_HappyPath(t *testing.T) {
results := []execResult{
{[]byte("output"), 0, nil},
{[]byte("output"), 1, nil},
{[]byte("output"), 0, context.DeadlineExceeded},
{[]byte("<ignored output>"), 2, fmt.Errorf("some error")},
{[]byte("error9000"), 9000, nil},
}
exec := newScriptedExec(results)
tm := newTaskletMock(exec, testlog.HCLogger(t), time.Nanosecond, 3*time.Second)
handle := tm.run()
defer handle.cancel() // just-in-case cleanup
deadline := time.After(3 * time.Second)
for i := 0; i <= 4; i++ {
select {
case result := <-tm.calls:
// for the happy path without cancelations or shutdowns, we expect
// to get the results passed to the callback in order and without
// modification
assert.Equal(t, result, results[i])
case <-deadline:
t.Fatalf("timed out waiting for all script checks to finish")
}
}
}
// TestTasklet_Exec_Cancel asserts that cancelling a tasklet short-circuits
// any running executions of the tasklet.
func TestTasklet_Exec_Cancel(t *testing.T) {
exec, cancel := newBlockingScriptExec()
defer cancel()
tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, time.Hour)
handle := tm.run()
<-exec.running // wait until Exec is called
handle.cancel() // cancel now that we're blocked in exec
select {
case <-handle.wait():
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for tasklet check to exit")
}
// The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected script executor to still be running after timeout")
}
// No tasklets finished, so no callbacks should have gotten a
// chance to fire
select {
case call := <-tm.calls:
t.Errorf("expected 0 calls of tasklet, got %v", call)
default:
break
}
}
// TestTasklet_Exec_Timeout asserts a tasklet script will be killed
// when the timeout is reached.
func TestTasklet_Exec_Timeout(t *testing.T) {
t.Parallel()
exec, cancel := newBlockingScriptExec()
defer cancel()
tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, time.Second)
handle := tm.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running // wait until Exec is called
// We should get a timeout
select {
case update := <-tm.calls:
if update.err != context.DeadlineExceeded {
t.Errorf("expected context.DeadlineExceeed but received %+v", update)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
// The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected executor to still be running after timeout")
}
// Cancel and watch for exit
handle.cancel()
select {
case <-handle.wait(): // ok!
case update := <-tm.calls:
t.Errorf("unexpected extra callback on exit with status=%v", update)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for tasklet to exit")
}
}
// TestTasklet_Exec_Shutdown asserts a script will be executed once more
// when told to shutdown.
func TestTasklet_Exec_Shutdown(t *testing.T) {
exec := newSimpleExec(0, nil)
shutdown := make(chan struct{})
tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, 3*time.Second)
tm.shutdownCh = shutdown
handle := tm.run()
defer handle.cancel() // just-in-case cleanup
close(shutdown) // tell script to exit
select {
case update := <-tm.calls:
if update.err != nil {
t.Errorf("expected clean shutdown but received %q", update.err)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
select {
case <-handle.wait(): // ok
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
// test helpers
type taskletMock struct {
tasklet
calls chan execResult
}
func newTaskletMock(exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *taskletMock {
tm := &taskletMock{calls: make(chan execResult)}
tm.exec = exec
tm.logger = logger
tm.Interval = interval
tm.Timeout = timeout
tm.callback = func(ctx context.Context, params execResult) {
tm.calls <- params
}
return tm
}
// blockingScriptExec implements ScriptExec by running a subcommand that never
// exits.
type blockingScriptExec struct {
// pctx is canceled *only* for test cleanup. Just like real
// ScriptExecutors its Exec method cannot be canceled directly -- only
// with a timeout.
pctx context.Context
// running is ticked before blocking to allow synchronizing operations
running chan struct{}
// set to 1 with atomics if Exec is called and has exited
exited int32
}
// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the
// caller recvs on the b.running chan. It also returns a CancelFunc for test
// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout
// expires.
func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
exec := &blockingScriptExec{
pctx: ctx,
running: make(chan struct{}),
}
return exec, cancel
}
func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) {
b.running <- struct{}{}
ctx, cancel := context.WithTimeout(b.pctx, dur)
defer cancel()
cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h")
testtask.SetCmdEnv(cmd)
err := cmd.Run()
code := 0
if exitErr, ok := err.(*exec.ExitError); ok {
if !exitErr.Success() {
code = 1
}
}
atomic.StoreInt32(&b.exited, 1)
return []byte{}, code, err
}
// sleeperExec sleeps for 100ms but returns successfully to allow testing timeout conditions
type sleeperExec struct{}
func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
time.Sleep(100 * time.Millisecond)
return []byte{}, 0, nil
}
// simpleExec is a fake ScriptExecutor that returns whatever is specified.
type simpleExec struct {
code int
err error
}
func (s simpleExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
return []byte(fmt.Sprintf("code=%d err=%v", s.code, s.err)), s.code, s.err
}
// newSimpleExec creates a new ScriptExecutor that returns the given code and err.
func newSimpleExec(code int, err error) simpleExec {
return simpleExec{code: code, err: err}
}
// scriptedExec is a fake ScriptExecutor with a predetermined sequence
// of results.
type scriptedExec struct {
fn func() ([]byte, int, error)
}
// For each call to Exec, scriptedExec returns the next result in its
// sequence of results
func (s scriptedExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
return s.fn()
}
func newScriptedExec(results []execResult) scriptedExec {
index := 0
s := scriptedExec{}
// we have to close over the index because the interface we're
// mocking expects a value and not a pointer, which prevents
// us from updating the index
fn := func() ([]byte, int, error) {
result := results[index]
// prevents us from iterating off the end of the results
if index+1 < len(results) {
index = index + 1
}
return result.output, result.code, result.err
}
s.fn = fn
return s
}

View File

@@ -15,4 +15,5 @@ type ConsulServiceAPI interface {
RemoveTask(*consul.TaskServices)
UpdateTask(old, newTask *consul.TaskServices) error
AllocRegistrations(allocID string) (*consul.AllocRegistration, error)
UpdateTTL(id, output, status string) error
}

View File

@@ -21,7 +21,7 @@ type MockConsulOp struct {
func NewMockConsulOp(op, allocID, name string) MockConsulOp {
switch op {
case "add", "remove", "update", "alloc_registrations",
"add_group", "remove_group", "update_group":
"add_group", "remove_group", "update_group", "update_ttl":
default:
panic(fmt.Errorf("invalid consul op: %s", op))
}
@@ -123,6 +123,15 @@ func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.Al
return nil, nil
}
func (m *MockConsulServiceClient) UpdateTTL(checkID, output, status string) error {
// TODO(tgross): this method is here so we can implement the
// interface but the locking we need for testing creates a lot
// of opportunities for deadlocks in testing that will never
// appear in live code.
m.logger.Trace("UpdateTTL", "check_id", checkID, "status", status)
return nil
}
func (m *MockConsulServiceClient) GetOps() []MockConsulOp {
m.mu.Lock()
defer m.mu.Unlock()

View File

@@ -105,10 +105,8 @@ func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.Agen
// operations are submitted to the main loop via commit() for synchronizing
// with Consul.
type operations struct {
regServices []*api.AgentServiceRegistration
regChecks []*api.AgentCheckRegistration
scripts []*scriptCheck
regServices []*api.AgentServiceRegistration
regChecks []*api.AgentCheckRegistration
deregServices []string
deregChecks []string
}
@@ -230,10 +228,8 @@ type ServiceClient struct {
opCh chan *operations
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
explicitlyDeregisteredServices map[string]bool
explicitlyDeregisteredChecks map[string]bool
@@ -284,8 +280,6 @@ func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bo
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
explicitlyDeregisteredServices: make(map[string]bool),
explicitlyDeregisteredChecks: make(map[string]bool),
allocRegistrations: make(map[string]*AllocRegistration),
@@ -439,25 +433,16 @@ func (c *ServiceClient) merge(ops *operations) {
for _, check := range ops.regChecks {
c.checks[check.ID] = check
}
for _, s := range ops.scripts {
c.scripts[s.id] = s
}
for _, sid := range ops.deregServices {
delete(c.services, sid)
c.explicitlyDeregisteredServices[sid] = true
}
for _, cid := range ops.deregChecks {
if script, ok := c.runningScripts[cid]; ok {
script.cancel()
delete(c.scripts, cid)
delete(c.runningScripts, cid)
}
delete(c.checks, cid)
c.explicitlyDeregisteredChecks[cid] = true
}
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
metrics.SetGauge([]string{"client", "consul", "script_checks"}, float32(len(c.runningScripts)))
}
// sync enqueued operations.
@@ -593,16 +578,6 @@ func (c *ServiceClient) sync() error {
}
creg++
metrics.IncrCounter([]string{"client", "consul", "check_registrations"}, 1)
// Handle starting scripts
if script, ok := c.scripts[id]; ok {
// If it's already running, cancel and replace
if oldScript, running := c.runningScripts[id]; running {
oldScript.cancel()
}
// Start and store the handle
c.runningScripts[id] = script.run()
}
}
// Only log if something was actually synced
@@ -649,7 +624,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
ops.regServices = append(ops.regServices, serviceReg)
for _, check := range service.Checks {
checkID := makeCheckID(id, check)
checkID := MakeCheckID(id, check)
if check.Type == structs.ServiceCheckScript {
return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name)
}
@@ -782,17 +757,9 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
checkIDs := make([]string, 0, numChecks)
for _, check := range service.Checks {
checkID := makeCheckID(serviceID, check)
checkID := MakeCheckID(serviceID, check)
checkIDs = append(checkIDs, checkID)
if check.Type == structs.ServiceCheckScript {
if task.DriverExec == nil {
return nil, fmt.Errorf("driver doesn't support script checks")
}
sc := newScriptCheck(task.AllocID, task.Name, checkID, check, task.DriverExec,
c.client, c.logger, c.shutdownCh)
ops.scripts = append(ops.scripts, sc)
// Skip getAddress for script checks
checkReg, err := createCheckReg(serviceID, checkID, check, "", 0)
if err != nil {
@@ -977,7 +944,7 @@ func (c *ServiceClient) RegisterTask(task *TaskServices) error {
serviceID := MakeTaskServiceID(task.AllocID, task.Name, service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter)
}
}
@@ -1012,7 +979,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// Existing service entry removed
ops.deregServices = append(ops.deregServices, existingID)
for _, check := range existingSvc.Checks {
cid := makeCheckID(existingID, check)
cid := MakeCheckID(existingID, check)
ops.deregChecks = append(ops.deregChecks, cid)
// Unwatch watched checks
@@ -1040,12 +1007,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// See if any checks were updated
existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks))
for _, check := range existingSvc.Checks {
existingChecks[makeCheckID(existingID, check)] = check
existingChecks[MakeCheckID(existingID, check)] = check
}
// Register new checks
for _, check := range newSvc.Checks {
checkID := makeCheckID(existingID, check)
checkID := MakeCheckID(existingID, check)
if _, exists := existingChecks[checkID]; exists {
// Check is still required. Remove it from the map so it doesn't get
// deleted later.
@@ -1101,7 +1068,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
}
}
@@ -1120,7 +1087,7 @@ func (c *ServiceClient) RemoveTask(task *TaskServices) {
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
cid := makeCheckID(id, check)
cid := MakeCheckID(id, check)
ops.deregChecks = append(ops.deregChecks, cid)
if check.TriggersRestarts() {
@@ -1177,6 +1144,11 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
return reg, nil
}
// TODO(tgross): make sure this is properly nil-checked, etc.
func (c *ServiceClient) UpdateTTL(id, output, status string) error {
return c.client.UpdateTTL(id, output, status)
}
// Shutdown the Consul client. Update running task registrations and deregister
// agent from Consul. On first call blocks up to shutdownWait before giving up
// on syncing operations.
@@ -1220,14 +1192,6 @@ func (c *ServiceClient) Shutdown() error {
}
}
// Give script checks time to exit (no need to lock as Run() has exited)
for _, h := range c.runningScripts {
select {
case <-h.wait():
case <-deadline:
return fmt.Errorf("timed out waiting for script checks to run")
}
}
return nil
}
@@ -1285,10 +1249,10 @@ func MakeTaskServiceID(allocID, taskName string, service *structs.Service) strin
return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}
// makeCheckID creates a unique ID for a check.
// MakeCheckID creates a unique ID for a check.
//
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
func makeCheckID(serviceID string, check *structs.ServiceCheck) string {
func MakeCheckID(serviceID string, check *structs.ServiceCheck) string {
return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))
}
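
Connecting these IDs back to the hook: for group services the script check hook passes the synthetic task name "group-<taskgroup>" into MakeTaskServiceID. Assuming nomadTaskPrefix is "_nomad-task-" (the alloc ID below is illustrative):

// svc.Name = "web", svc.PortLabel = "http", task group "web"
serviceID := MakeTaskServiceID("b78d6f9a", "group-web", svc)
// => "_nomad-task-b78d6f9a-group-web-web-http"
checkID := MakeCheckID(serviceID, check)
// => "_nomad-check-" + check.Hash(serviceID)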

View File

@@ -1,215 +0,0 @@
package consul
import (
"context"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
)
// heartbeater is the subset of consul agent functionality needed by script
// checks to heartbeat
type heartbeater interface {
UpdateTTL(id, output, status string) error
}
// contextExec allows canceling a ScriptExecutor with a context.
type contextExec struct {
// pctx is the parent context. A subcontext will be created with Exec's
// timeout.
pctx context.Context
// exec to be wrapped in a context
exec interfaces.ScriptExecutor
}
func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec {
return &contextExec{
pctx: ctx,
exec: exec,
}
}
type execResult struct {
buf []byte
code int
err error
}
// Exec a command until the timeout expires, the context is canceled, or the
// underlying Exec returns.
func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
resCh := make(chan execResult, 1)
// Don't trust the underlying implementation to obey timeout
ctx, cancel := context.WithTimeout(c.pctx, timeout)
defer cancel()
go func() {
output, code, err := c.exec.Exec(timeout, cmd, args)
select {
case resCh <- execResult{output, code, err}:
case <-ctx.Done():
}
}()
select {
case res := <-resCh:
return res.buf, res.code, res.err
case <-ctx.Done():
return nil, 0, ctx.Err()
}
}
// scriptHandle is returned by scriptCheck.run and is used to cancel a
// scriptCheck and wait for it to shut down.
type scriptHandle struct {
// cancel the script
cancel func()
exitCh chan struct{}
}
// wait returns a chan that's closed when the script exits
func (s *scriptHandle) wait() <-chan struct{} {
return s.exitCh
}
// scriptCheck runs script checks via a ScriptExecutor and updates the
// appropriate check's TTL when the script succeeds.
type scriptCheck struct {
allocID string
taskName string
id string
check *structs.ServiceCheck
exec interfaces.ScriptExecutor
agent heartbeater
// lastCheckOk is true if the last check was ok; otherwise false
lastCheckOk bool
logger log.Logger
shutdownCh <-chan struct{}
}
// newScriptCheck creates a new scriptCheck. run() should be called once the
// initial check is registered with Consul.
func newScriptCheck(allocID, taskName, checkID string, check *structs.ServiceCheck,
exec interfaces.ScriptExecutor, agent heartbeater, logger log.Logger,
shutdownCh <-chan struct{}) *scriptCheck {
logger = logger.ResetNamed("consul.checks").With("task", taskName, "alloc_id", allocID, "check", check.Name)
return &scriptCheck{
allocID: allocID,
taskName: taskName,
id: checkID,
check: check,
exec: exec,
agent: agent,
lastCheckOk: true, // start logging on first failure
logger: logger,
shutdownCh: shutdownCh,
}
}
// run this script check and return its cancel func. If the shutdownCh is
// closed the check will be run once more before exiting.
func (s *scriptCheck) run() *scriptHandle {
ctx, cancel := context.WithCancel(context.Background())
exitCh := make(chan struct{})
// Wrap the original ScriptExecutor in one that obeys context
// cancelation.
ctxExec := newContextExec(ctx, s.exec)
go func() {
defer close(exitCh)
timer := time.NewTimer(0)
defer timer.Stop()
for {
// Block until check is removed, Nomad is shutting
// down, or the check interval is up
select {
case <-ctx.Done():
// check has been removed
return
case <-s.shutdownCh:
// unblock but don't exit until after we heartbeat once more
case <-timer.C:
timer.Reset(s.check.Interval)
}
metrics.IncrCounter([]string{"client", "consul", "script_runs"}, 1)
// Execute check script with timeout
output, code, err := ctxExec.Exec(s.check.Timeout, s.check.Command, s.check.Args)
switch err {
case context.Canceled:
// check removed during execution; exit
return
case context.DeadlineExceeded:
metrics.IncrCounter([]string{"client", "consul", "script_timeouts"}, 1)
// If no error was returned, set one to make sure the task goes critical
if err == nil {
err = context.DeadlineExceeded
}
// Log deadline exceeded every time as it's a
// distinct issue from checks returning
// failures
s.logger.Warn("check timed out", "timeout", s.check.Timeout)
}
state := api.HealthCritical
switch code {
case 0:
state = api.HealthPassing
case 1:
state = api.HealthWarning
}
var outputMsg string
if err != nil {
state = api.HealthCritical
outputMsg = err.Error()
} else {
outputMsg = string(output)
}
// Actually heartbeat the check
err = s.agent.UpdateTTL(s.id, outputMsg, state)
select {
case <-ctx.Done():
// check has been removed; don't report errors
return
default:
}
if err != nil {
if s.lastCheckOk {
s.lastCheckOk = false
s.logger.Warn("updating check failed", "error", err)
} else {
s.logger.Debug("updating check still failing", "error", err)
}
} else if !s.lastCheckOk {
// Succeeded for the first time or after failing; log
s.lastCheckOk = true
s.logger.Info("updating check succeeded")
}
select {
case <-s.shutdownCh:
// We've been told to exit and just heartbeated so exit
return
default:
}
}
}()
return &scriptHandle{cancel: cancel, exitCh: exitCh}
}

View File

@@ -1,309 +0,0 @@
package consul
import (
"context"
"fmt"
"os"
"os/exec"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestMain(m *testing.M) {
if !testtask.Run() {
os.Exit(m.Run())
}
}
// blockingScriptExec implements ScriptExec by running a subcommand that never
// exits.
type blockingScriptExec struct {
// pctx is canceled *only* for test cleanup. Just like real
// ScriptExecutors its Exec method cannot be canceled directly -- only
// with a timeout.
pctx context.Context
// running is ticked before blocking to allow synchronizing operations
running chan struct{}
// set to 1 with atomics if Exec is called and has exited
exited int32
}
// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the
// caller recvs on the b.running chan. It also returns a CancelFunc for test
// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout
// expires.
func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
exec := &blockingScriptExec{
pctx: ctx,
running: make(chan struct{}),
}
return exec, cancel
}
func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) {
b.running <- struct{}{}
ctx, cancel := context.WithTimeout(b.pctx, dur)
defer cancel()
cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h")
testtask.SetCmdEnv(cmd)
err := cmd.Run()
code := 0
if exitErr, ok := err.(*exec.ExitError); ok {
if !exitErr.Success() {
code = 1
}
}
atomic.StoreInt32(&b.exited, 1)
return []byte{}, code, err
}
// TestConsulScript_Exec_Cancel asserts that cancelling a script check short-circuits
// any running scripts.
func TestConsulScript_Exec_Cancel(t *testing.T) {
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Hour,
}
exec, cancel := newBlockingScriptExec()
defer cancel()
// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.HCLogger(t), nil)
handle := check.run()
// wait until Exec is called
<-exec.running
// cancel now that we're blocked in exec
handle.cancel()
select {
case <-handle.wait():
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
// The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected script executor to still be running after timeout")
}
}
type execStatus struct {
checkID string
output string
status string
}
// fakeHeartbeater implements the heartbeater interface to allow mocking out
// Consul in script executor tests.
type fakeHeartbeater struct {
updates chan execStatus
}
func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error {
f.updates <- execStatus{checkID: checkID, output: output, status: status}
return nil
}
func newFakeHeartbeater() *fakeHeartbeater {
return &fakeHeartbeater{updates: make(chan execStatus)}
}
// TestConsulScript_Exec_TimeoutBasic asserts a script will be killed when the
// timeout is reached.
func TestConsulScript_Exec_TimeoutBasic(t *testing.T) {
t.Parallel()
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Second,
}
exec, cancel := newBlockingScriptExec()
defer cancel()
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running
// Check for UpdateTTL call
select {
case update := <-hb.updates:
if update.status != api.HealthCritical {
t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
// The underlying ScriptExecutor (newBlockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected script executor to still be running after timeout")
}
// Cancel and watch for exit
handle.cancel()
select {
case <-handle.wait():
// ok!
case update := <-hb.updates:
t.Errorf("unexpected UpdateTTL call on exit with status=%q", update)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
// sleeperExec sleeps for 100ms but returns successfully to allow testing timeout conditions
type sleeperExec struct{}
func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
time.Sleep(100 * time.Millisecond)
return []byte{}, 0, nil
}
// TestConsulScript_Exec_TimeoutCritical asserts a script will be killed when
// the timeout is reached and always set a critical status regardless of what
// Exec returns.
func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
t.Parallel()
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Nanosecond,
}
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.HCLogger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
// Check for UpdateTTL call
select {
case update := <-hb.updates:
if update.status != api.HealthCritical {
t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update)
}
if update.output != context.DeadlineExceeded.Error() {
t.Errorf("expected output=%q but found: %q", context.DeadlineExceeded.Error(), update.output)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to timeout")
}
}
// simpleExec is a fake ScriptExecutor that returns whatever is specified.
type simpleExec struct {
code int
err error
}
func (s simpleExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
return []byte(fmt.Sprintf("code=%d err=%v", s.code, s.err)), s.code, s.err
}
// newSimpleExec creates a new ScriptExecutor that returns the given code and err.
func newSimpleExec(code int, err error) simpleExec {
return simpleExec{code: code, err: err}
}
// TestConsulScript_Exec_Shutdown asserts a script will be executed once more
// when told to shutdown.
func TestConsulScript_Exec_Shutdown(t *testing.T) {
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: 3 * time.Second,
}
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(0, nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
// Tell scriptCheck to exit
close(shutdown)
select {
case update := <-hb.updates:
if update.status != api.HealthPassing {
t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
select {
case <-handle.wait():
// ok!
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
func TestConsulScript_Exec_Codes(t *testing.T) {
run := func(code int, err error, expected string) func(t *testing.T) {
return func(t *testing.T) {
t.Parallel()
serviceCheck := structs.ServiceCheck{
Name: "test",
Interval: time.Hour,
Timeout: 3 * time.Second,
}
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(code, err)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown)
handle := check.run()
defer handle.cancel()
select {
case update := <-hb.updates:
if update.status != expected {
t.Errorf("expected %q but received %q", expected, update)
}
// assert output is being reported
expectedOutput := fmt.Sprintf("code=%d err=%v", code, err)
if err != nil {
expectedOutput = err.Error()
}
if update.output != expectedOutput {
t.Errorf("expected output=%q but found: %q", expectedOutput, update.output)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exec")
}
}
}
// Test exit codes with errors
t.Run("Passing", run(0, nil, api.HealthPassing))
t.Run("Warning", run(1, nil, api.HealthWarning))
t.Run("Critical-2", run(2, nil, api.HealthCritical))
t.Run("Critical-9000", run(9000, nil, api.HealthCritical))
// Errors should always cause Critical status
err := fmt.Errorf("test error")
t.Run("Error-0", run(0, err, api.HealthCritical))
t.Run("Error-1", run(1, err, api.HealthCritical))
t.Run("Error-2", run(2, err, api.HealthCritical))
t.Run("Error-9000", run(9000, err, api.HealthCritical))
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -46,40 +45,9 @@ func testTask() *TaskServices {
},
},
},
DriverExec: newMockExec(),
}
}
// mockExec implements the ScriptExecutor interface and will use an alternate
// implementation t.ExecFunc if non-nil.
type mockExec struct {
// Ticked whenever a script is called
execs chan int
// If non-nil will be called by script checks
ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
func newMockExec() *mockExec {
return &mockExec{
execs: make(chan int, 100),
}
}
func (m *mockExec) Exec(dur time.Duration, cmd string, args []string) ([]byte, int, error) {
select {
case m.execs <- 1:
default:
}
if m.ExecFunc == nil {
// Default impl is just "ok"
return []byte("ok"), 0, nil
}
ctx, cancel := context.WithTimeout(context.Background(), dur)
defer cancel()
return m.ExecFunc(ctx, cmd, args)
}
// restartRecorder is a minimal TaskRestarter implementation that simply
// counts how many restarts were triggered.
type restartRecorder struct {
@@ -96,7 +64,6 @@ type testFakeCtx struct {
ServiceClient *ServiceClient
FakeConsul *MockAgent
Task *TaskServices
MockExec *mockExec
}
var errNoOps = fmt.Errorf("testing error: no pending operations")
@@ -131,7 +98,6 @@ func setupFake(t *testing.T) *testFakeCtx {
ServiceClient: sc,
FakeConsul: fc,
Task: tt,
MockExec: tt.DriverExec.(*mockExec),
}
}
@@ -226,13 +192,6 @@ func TestConsul_ChangePorts(t *testing.T) {
require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
case "c2":
origScriptKey = k
select {
case <-ctx.MockExec.execs:
// Here we validate there is nothing left on the channel
require.Equal(0, len(ctx.MockExec.execs))
case <-time.After(3 * time.Second):
t.Fatalf("script not called in time")
}
case "c3":
origHTTPKey = k
require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
@@ -678,291 +637,104 @@ func TestConsul_RegServices(t *testing.T) {
func TestConsul_ShutdownOK(t *testing.T) {
require := require.New(t)
ctx := setupFake(t)
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheck",
Type: "script",
Command: "true",
// Make check block until shutdown
Interval: 9000 * time.Hour,
Timeout: 10 * time.Second,
InitialStatus: "warning",
},
}
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// register the Nomad agent service and check
agentServices := []*structs.Service{
{
Name: "http",
Tags: []string{"nomad"},
PortLabel: "localhost:2345",
Checks: []*structs.ServiceCheck{
{
Name: "nomad-tcp",
Type: "tcp",
Interval: 9000 * time.Hour, // make check block
Timeout: 10 * time.Second,
InitialStatus: "warning",
},
},
},
}
if err := ctx.ServiceClient.RegisterAgent("client", agentServices); err != nil {
t.Fatalf("unexpected error registering agent: %v", err)
}
require.NoError(ctx.ServiceClient.RegisterAgent("client", agentServices))
require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond)
testutil.WaitForResult(func() (bool, error) {
return ctx.ServiceClient.hasSeen(), fmt.Errorf("error contacting Consul")
}, func(err error) {
t.Fatalf("err: %v", err)
})
// assert successful registration
require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered")
require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered")
require.Contains(ctx.FakeConsul.services,
makeAgentServiceID("client", agentServices[0]))
// Shutdown should block until scripts finish
if err := ctx.ServiceClient.Shutdown(); err != nil {
t.Errorf("unexpected error shutting down client: %v", err)
}
// UpdateTTL should have been called once for the script check and once
// for shutdown
if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
t.Fatalf("expected 1 checkTTL entry but found: %d", n)
}
for _, v := range ctx.FakeConsul.checkTTLs {
require.Equalf(2, v, "expected 2 updates but found %d", v)
}
for _, v := range ctx.FakeConsul.checks {
if v.Status != "passing" {
t.Fatalf("expected check to be passing but found %q", v.Status)
}
}
}
// TestConsul_ShutdownSlow tests the slow-but-ok path for the shutdown logic
// in ServiceClient: a script check's exec outlives Shutdown but completes
// within shutdownWait.
func TestConsul_ShutdownSlow(t *testing.T) {
t.Parallel()
ctx := setupFake(t)
// Add a script check to make sure its TTL gets updated
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheck",
Type: "script",
Command: "true",
// Make check block until shutdown
Interval: 9000 * time.Hour,
Timeout: 5 * time.Second,
InitialStatus: "warning",
},
}
// Make Exec slow, but not too slow
waiter := make(chan struct{})
ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
select {
case <-waiter:
default:
close(waiter)
}
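		// First call unblocks the test; every call then takes ~1s, which
		// Shutdown must wait out.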
time.Sleep(time.Second)
return []byte{}, 0, nil
}
// Make shutdown wait time just a bit longer than ctx.Exec takes
ctx.ServiceClient.shutdownWait = 3 * time.Second
go ctx.ServiceClient.Run()
// Register a task and agent
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
// wait for Exec to get called before shutting down
<-waiter
// Shutdown should block until all enqueued operations finish.
preShutdown := time.Now()
if err := ctx.ServiceClient.Shutdown(); err != nil {
t.Errorf("unexpected error shutting down client: %v", err)
}
	// Shutdown should have taken roughly 1s <= elapsed <= 3s; the lower
	// bound is relaxed to 900ms to absorb shutdown invocation overhead
	shutdownTime := time.Now().Sub(preShutdown)
	if shutdownTime < 900*time.Millisecond || shutdownTime > ctx.ServiceClient.shutdownWait {
		t.Errorf("expected shutdown to take >900ms and <%s but took: %s", ctx.ServiceClient.shutdownWait, shutdownTime)
	}
// UpdateTTL should have been called once for the script check
if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
t.Fatalf("expected 1 checkTTL entry but found: %d", n)
}
for _, v := range ctx.FakeConsul.checkTTLs {
if v != 1 {
t.Fatalf("expected script check to be updated once but found %d", v)
}
}
for _, v := range ctx.FakeConsul.checks {
if v.Status != "passing" {
t.Fatalf("expected check to be passing but found %q", v.Status)
}
}
}
// TestConsul_ShutdownBlocked tests the path where shutdown is blocked by a
// hung Consul agent: Shutdown must wait for agent deregistration to finish
// before returning.
func TestConsul_ShutdownBlocked(t *testing.T) {
require := require.New(t)
t.Parallel()
ctx := setupFake(t)
	go ctx.ServiceClient.Run()

	// register the Nomad agent service and check
	agentServices := []*structs.Service{
		{
			Name:      "http",
			Tags:      []string{"nomad"},
			PortLabel: "localhost:2345",
			Checks: []*structs.ServiceCheck{
				{
					Name:          "nomad-tcp",
					Type:          "tcp",
					Interval:      9000 * time.Hour, // make check block
					Timeout:       10 * time.Second,
					InitialStatus: "warning",
				},
			},
		},
	}
	require.NoError(ctx.ServiceClient.RegisterAgent("client", agentServices))
	require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond)
	require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered")
	require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered")

	// prevent normal shutdown by blocking Consul. the shutdown should wait
	// until agent deregistration has finished
	waiter := make(chan struct{})
	result := make(chan error)
	go func() {
		ctx.FakeConsul.mu.Lock()
		close(waiter)
		result <- ctx.ServiceClient.Shutdown()
	}()
	<-waiter // wait for lock to be hit

	// shutdownWait can be short because we're intentionally blocking, but it
	// needs to be longer than the time we'll block Consul so we can be sure
	// we're not delayed either.
	ctx.ServiceClient.shutdownWait = time.Second
	preShutdown := time.Now()
	select {
	case <-time.After(200 * time.Millisecond):
		ctx.FakeConsul.mu.Unlock()
		require.NoError(<-result)
	case <-result:
		t.Fatal("should not have received result until Consul unblocked")
	}
	shutdownTime := time.Now().Sub(preShutdown).Seconds()
	require.Less(shutdownTime, time.Second.Seconds(),
		"expected shutdown to take >200ms and <1s")
	require.Greater(shutdownTime, 200*time.Millisecond.Seconds(),
		"expected shutdown to take >200ms and <1s")
	require.Len(ctx.FakeConsul.services, 0,
		"expected agent service to be deregistered")
	require.Len(ctx.FakeConsul.checks, 0,
		"expected agent check to be deregistered")
}
// TestConsul_CancelScript asserts that removing a script check removes all
// objects related to that check.
func TestConsul_CancelScript(t *testing.T) {
ctx := setupFake(t)
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheckDel",
Type: "script",
Interval: 9000 * time.Hour,
Timeout: 9000 * time.Hour,
},
{
Name: "scriptcheckKeep",
Type: "script",
Interval: 9000 * time.Hour,
Timeout: 9000 * time.Hour,
},
}
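	// Registering the task should create and start both script checks.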
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if len(ctx.FakeConsul.checks) != 2 {
t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks))
}
	if len(ctx.ServiceClient.scripts) != 2 || len(ctx.ServiceClient.runningScripts) != 2 {
		t.Errorf("expected 2 running scripts but found scripts=%d runningScripts=%d",
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
}
for i := 0; i < 2; i++ {
select {
case <-ctx.MockExec.execs:
// Script ran as expected!
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to run")
}
}
// Remove a check and update the task
origTask := ctx.Task.Copy()
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "scriptcheckKeep",
Type: "script",
Interval: 9000 * time.Hour,
Timeout: 9000 * time.Hour,
},
}
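	// Updating the task should cancel the removed check's script tasklet.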
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
if len(ctx.FakeConsul.checks) != 1 {
t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks))
}
	if len(ctx.ServiceClient.scripts) != 1 || len(ctx.ServiceClient.runningScripts) != 1 {
		t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d",
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
}
// Make sure exec wasn't called again
select {
case <-ctx.MockExec.execs:
t.Errorf("unexpected execution of script; was goroutine not cancelled?")
case <-time.After(100 * time.Millisecond):
// No unexpected script execs
}
// Don't leak goroutines
for _, scriptHandle := range ctx.ServiceClient.runningScripts {
scriptHandle.cancel()
	}
}
// TestConsul_DriverNetwork_AutoUse asserts that if a driver network has
// auto-use set, services advertise the driver's address rather than the
// host's.
@@ -1771,7 +1543,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
require.Len(ctx.ServiceClient.checks, 3)
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
	delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.Len(ctx.ServiceClient.services, 2)
require.Len(ctx.ServiceClient.checks, 2)
@@ -1788,9 +1560,9 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
}
// TestConsul_ServiceDeregistration_InProbation asserts that during initialization
@@ -1880,7 +1652,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
require.Len(ctx.ServiceClient.checks, 3)
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
	delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.Len(ctx.ServiceClient.services, 2)
require.Len(ctx.ServiceClient.checks, 2)
@@ -1897,9 +1669,9 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
// after probation, outofband services and checks are removed
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
@@ -1912,8 +1684,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
}