nomad/client/allochealth/tracker_test.go
Tim Gross e168548341 provide allocrunner hooks with prebuilt taskenv and fix mutation bugs (#25373)
Some of our allocrunner hooks require a task environment for interpolating values based on the node or allocation. But several of the hooks accept an already-built environment or builder and then keep it in memory. Both approaches retain a copy of all the node attributes and allocation metadata, which balloons memory usage until the allocation is GC'd.

While we'd like to look into ways to avoid keeping the allocrunner around entirely (see #25372), for now we can significantly reduce memory usage by creating the task environment on-demand when calling allocrunner methods, rather than persisting it in the allocrunner hooks.

In doing so, we uncovered two other bugs:
* The WID manager, the group service hook, and the checks hook need to interpolate services for specific tasks. They mutated a shared taskenv builder to do so, but each mutation writes to the same environment map. When a group has multiple tasks, one task could set an environment variable that would then be interpolated into the service definition of another task that had not set that variable. Only service definition interpolation is affected; this does not leak env vars across running tasks, because each taskrunner has its own builder.

  To fix this, we move the `UpdateTask` method off the builder and onto the taskenv as the `WithTask` method. This makes a shallow copy of the taskenv with a deep clone of the environment map used for interpolation, and then overwrites the environment from the task (see the sketch after this list).

* The checks hook interpolates Nomad native service checks only on `Prerun` and not on `Update`. This could cause unexpected deregistration and re-registration of checks during in-place updates. To fix this, we make sure we also interpolate in the `Update` method.
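
A minimal, self-contained sketch of that `WithTask` pattern (hypothetical, pared-down names and signature; the real method lives on `taskenv.TaskEnv` and takes the allocation and task):

```go
// Hypothetical, pared-down model of the fix: only the interpolation map is
// shown, and WithTask takes a plain map rather than the allocation and task.
package main

import (
	"fmt"
	"maps"
)

// TaskEnv stands in for taskenv.TaskEnv; EnvMap holds the variables used
// when interpolating service definitions.
type TaskEnv struct {
	EnvMap map[string]string
}

// WithTask returns a shallow copy of the TaskEnv whose interpolation map is
// deep-cloned before the task's own variables are written in, so two tasks
// in the same group can never see each other's variables.
func (t *TaskEnv) WithTask(taskVars map[string]string) *TaskEnv {
	out := *t                         // shallow copy of the taskenv
	out.EnvMap = maps.Clone(t.EnvMap) // never write to the shared map
	for k, v := range taskVars {
		out.EnvMap[k] = v // per-task overrides stay local to this copy
	}
	return &out
}

func main() {
	base := &TaskEnv{EnvMap: map[string]string{"NOMAD_REGION": "global"}}
	server := base.WithTask(map[string]string{"NOMAD_TASK_NAME": "server"})
	proxy := base.WithTask(map[string]string{"NOMAD_TASK_NAME": "proxy"})
	// server's variable is absent from proxy's environment, and vice versa
	fmt.Println(server.EnvMap["NOMAD_TASK_NAME"]) // "server"
	fmt.Println(proxy.EnvMap["NOMAD_TASK_NAME"])  // "proxy"
}
```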

I also bumped into an incorrectly implemented interface in the CSI hook. I've pulled that fix, along with some better guardrails, out to https://github.com/hashicorp/nomad/pull/25472.

Fixes: https://github.com/hashicorp/nomad/issues/25269
Fixes: https://hashicorp.atlassian.net/browse/NET-12310
Ref: https://github.com/hashicorp/nomad/issues/25372
2025-03-24 12:05:04 -04:00

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package allochealth
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
regmock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"
)
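// TestTracker_ConsulChecks_Interpolation asserts that group- and task-level
// services and checks whose names reference runtime variables are matched
// against their interpolated registrations in Consul, with each task getting
// its own environment.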
func TestTracker_ConsulChecks_Interpolation(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
// Generate services at multiple levels that reference runtime variables.
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
tg.Services = []*structs.Service{
{
Name: "group-${TASKGROUP}-service-${NOMAD_DC}",
PortLabel: "http",
Checks: []*structs.ServiceCheck{
{
Type: structs.ServiceCheckTCP,
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
},
{
Name: "group-${NOMAD_GROUP_NAME}-check",
Type: structs.ServiceCheckTCP,
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
},
},
},
}
tg.Tasks[0].Name = "server"
tg.Tasks[0].Services = []*structs.Service{
{
Name: "task-${TASK}-service-${NOMAD_REGION}",
TaskName: "server",
PortLabel: "http",
Checks: []*structs.ServiceCheck{
{
Type: structs.ServiceCheckTCP,
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
},
{
Name: "task-${NOMAD_TASK_NAME}-check-${NOMAD_REGION}",
Type: structs.ServiceCheckTCP,
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
},
},
},
}
// Add another task to make sure each task gets its own environment.
tg.Tasks = append(tg.Tasks, tg.Tasks[0].Copy())
tg.Tasks[1].Name = "proxy"
tg.Tasks[1].Services[0].TaskName = "proxy"
// Canonicalize allocation to re-interpolate some of the variables.
alloc.Canonicalize()
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
tg.Tasks[0].Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
tg.Tasks[1].Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
"group-web": {
Services: map[string]*serviceregistration.ServiceRegistration{
"group-web-service-dc1": {
Service: &consulapi.AgentService{
ID: uuid.Generate(),
Service: "group-web-service-dc1",
},
Checks: []*consulapi.AgentCheck{
{
Name: `service: "group-web-service-dc1" check`,
Status: consulapi.HealthPassing,
},
{
Name: "group-web-check",
Status: consulapi.HealthPassing,
},
},
},
},
},
"server": {
Services: map[string]*serviceregistration.ServiceRegistration{
"task-server-service-global": {
Service: &consulapi.AgentService{
ID: uuid.Generate(),
Service: "task-server-service-global",
},
Checks: []*consulapi.AgentCheck{
{
Name: `service: "task-server-service-global" check`,
Status: consulapi.HealthPassing,
},
{
Name: "task-server-check-global",
Status: consulapi.HealthPassing,
},
},
},
},
},
"proxy": {
Services: map[string]*serviceregistration.ServiceRegistration{
"task-proxy-service-global": {
Service: &consulapi.AgentService{
ID: uuid.Generate(),
Service: "task-proxy-service-global",
},
Checks: []*consulapi.AgentCheck{
{
Name: `service: "task-proxy-service-global" check`,
Status: consulapi.HealthPassing,
},
{
Name: "task-proxy-check-global",
Status: consulapi.HealthPassing,
},
},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Inject Consul response.
consul := regmock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
return &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
select {
case <-time.After(4 * checkInterval):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}
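// TestTracker_ConsulChecks_Healthy asserts that the tracker reports an alloc
// healthy once Consul returns passing checks for its service.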
func TestTracker_ConsulChecks_Healthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
var called uint64
consul := regmock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
select {
case <-time.After(4 * checkInterval):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}
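// TestTracker_NomadChecks_Healthy asserts that a Nomad service check that
// transitions from pending to success causes the tracker to report healthy.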
func TestTracker_NomadChecks_Healthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
alloc.Job.TaskGroups[0].Tasks[0].Services[0].Provider = "nomad"
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
alloc.Job.TaskGroups[0].Tasks[0].Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
err := checks.Set(alloc.ID, &structs.CheckQueryResult{
ID: "abc123",
Mode: "healthiness",
Status: "pending",
Output: "nomad: waiting to run",
Timestamp: time.Now().Unix(),
Group: alloc.TaskGroup,
Task: alloc.Job.TaskGroups[0].Tasks[0].Name,
Service: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
Check: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
})
must.NoError(t, err)
consul := regmock.NewServiceRegistrationHandler(logger)
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
go func() {
// wait a bit then update the check to passing
time.Sleep(15 * time.Millisecond)
must.NoError(t, checks.Set(alloc.ID, &structs.CheckQueryResult{
ID: "abc123",
Mode: "healthiness",
Status: "success",
Output: "nomad: http ok",
Timestamp: time.Now().Unix(),
Group: alloc.TaskGroup,
Task: alloc.Job.TaskGroups[0].Tasks[0].Name,
Service: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
Check: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
}))
}()
select {
case <-time.After(4 * checkInterval):
t.Fatalf("timed out while waiting for success")
case healthy := <-tracker.HealthyCh():
must.True(t, healthy)
}
}
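// TestTracker_NomadChecks_Unhealthy asserts that a Nomad service check that
// transitions from pending to failing never unblocks the healthy channel.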
func TestTracker_NomadChecks_Unhealthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
alloc.Job.TaskGroups[0].Tasks[0].Services[0].Provider = "nomad"
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
alloc.Job.TaskGroups[0].Tasks[0].Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
err := checks.Set(alloc.ID, &structs.CheckQueryResult{
ID: "abc123",
Mode: "healthiness",
Status: "pending", // start out pending
Output: "nomad: waiting to run",
Timestamp: time.Now().Unix(),
Group: alloc.TaskGroup,
Task: alloc.Job.TaskGroups[0].Tasks[0].Name,
Service: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
Check: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
})
must.NoError(t, err)
consul := regmock.NewServiceRegistrationHandler(logger)
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
go func() {
// wait a bit then update the check to failing
time.Sleep(15 * time.Millisecond)
must.NoError(t, checks.Set(alloc.ID, &structs.CheckQueryResult{
ID: "abc123",
Mode: "healthiness",
Status: "failing",
Output: "connection refused",
Timestamp: time.Now().Unix(),
Group: alloc.TaskGroup,
Task: alloc.Job.TaskGroups[0].Tasks[0].Name,
Service: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
Check: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
}))
}()
// make sure we are always unhealthy across 4 check intervals
for i := 0; i < 4; i++ {
<-time.After(checkInterval)
select {
case <-tracker.HealthyCh():
t.Fatalf("should not receive on healthy chan with failing check")
default:
}
}
}
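// TestTracker_Checks_PendingPostStop_Healthy asserts that a pending poststop
// task does not block the alloc from being reported healthy.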
func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.LifecycleAllocWithPoststopDeploy()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
"web": {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
"post": {
State: structs.TaskStatePending,
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := regmock.NewServiceRegistrationHandler(logger)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
select {
case <-time.After(4 * checkInterval):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}
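// TestTracker_Succeeded_PostStart_Healthy asserts that a poststart task that
// has already exited successfully does not block the alloc from being
// reported healthy.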
func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.LifecycleAllocWithPoststartDeploy()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = time.Millisecond * 1
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
"web": {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
"post": {
State: structs.TaskStateDead,
StartedAt: time.Now(),
FinishedAt: time.Now().Add(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime / 2),
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
consul := regmock.NewServiceRegistrationHandler(logger)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
select {
case <-time.After(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime * 2):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}
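// TestTracker_ConsulChecks_Unhealthy asserts that the tracker never reports
// healthy while one of the service's Consul checks remains critical.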
func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "failing-check"
task.Services[0].Checks = append(task.Services[0].Checks, newCheck)
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
checksUnhealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[1].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
var called uint64
consul := regmock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
testutil.WaitForResult(func() (bool, error) {
lookup := atomic.LoadUint64(&called)
return lookup < 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup)
}, func(err error) {
require.NoError(t, err)
})
tracker.lock.Lock()
require.False(t, tracker.checksHealthy)
tracker.lock.Unlock()
select {
case v := <-tracker.HealthyCh():
require.Failf(t, "expected no health value", " got %v", v)
default:
// good
}
}
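// TestTracker_ConsulChecks_HealthyToUnhealthy asserts that a Consul check
// that degrades during the minimum healthy time window is never counted as
// healthy.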
func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1
task := alloc.Job.TaskGroups[0].Tasks[0]
newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "my-check"
task.Services[0].Checks = []*structs.ServiceCheck{newCheck}
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response - starts with a healthy check and transitions to unhealthy
// during the minimum healthy time window
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
checkUnhealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "s1",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{checkHealthy}, // initially healthy
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consul := regmock.NewServiceRegistrationHandler(logger)
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
minHealthyTime := 2 * time.Second
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
tracker.checkLookupInterval = checkInterval
assertChecksHealth := func(exp bool) {
tracker.lock.Lock()
must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
tracker.lock.Unlock()
}
// start the clock so we can degrade check status during minimum healthy time
startTime := time.Now()
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
// after 1 second, start failing the check
if time.Since(startTime) > 1*time.Second {
taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkUnhealthy}
}
// assert tracker is observing unhealthy - we never cross minimum health
// time with healthy checks in this test case
assertChecksHealth(false)
reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
return reg, nil
}
// start the tracker and wait for evaluations to happen
tracker.Start()
time.Sleep(2 * time.Second)
// tracker should be observing unhealthy check
assertChecksHealth(false)
select {
case <-tracker.HealthyCh():
must.Unreachable(t, must.Sprint("did not expect unblock of healthy chan"))
default:
// ok
}
}
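// TestTracker_ConsulChecks_SlowCheckRegistration asserts that a check that is
// slow to appear in Consul is treated as unhealthy until it registers and
// passes.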
func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "my-check"
task.Services[0].Checks = []*structs.ServiceCheck{newCheck}
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response - start with check not yet registered
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "s1",
Service: task.Services[0].Name,
},
Checks: nil, // initially missing
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consul := regmock.NewServiceRegistrationHandler(logger)
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
assertChecksHealth := func(exp bool) {
tracker.lock.Lock()
must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
tracker.lock.Unlock()
}
var hits atomic.Int32
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
// after 10 queries, insert the check
hits.Add(1)
if count := hits.Load(); count > 10 {
taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkHealthy}
} else {
// assert tracker is observing unhealthy (missing) checks
assertChecksHealth(false)
}
reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
return reg, nil
}
// start the tracker and wait for evaluations to happen
tracker.Start()
must.Wait(t, wait.InitialSuccess(
wait.BoolFunc(func() bool { return hits.Load() > 10 }),
wait.Gap(10*time.Millisecond),
wait.Timeout(1*time.Second),
))
// tracker should be observing healthy check now
assertChecksHealth(true)
select {
case v := <-tracker.HealthyCh():
must.True(t, v, must.Sprint("expected value from tracker chan to be healthy"))
default:
must.Unreachable(t, must.Sprint("expected value from tracker chan"))
}
}
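// TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy asserts that
// health is propagated only once task health and check health are both set,
// with checks re-confirmed after any task health change.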
func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
logger := testlog.HCLogger(t)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, nil, env, nil, nil, time.Millisecond, true)
assertNoHealth := func() {
require.NoError(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.Failf(t, "unexpected healthy event", "got %v", v)
default:
}
}
// first set task health without checks
tracker.setTaskHealth(true, false)
assertNoHealth()
// now fail task health again before checks are successful
tracker.setTaskHealth(false, false)
assertNoHealth()
// now pass health checks - do not propagate health yet
tracker.setCheckHealth(true)
assertNoHealth()
// set tasks to healthy - don't propagate health yet, wait for the next check
tracker.setTaskHealth(true, false)
assertNoHealth()
// set checks to true, now propagate health status
tracker.setCheckHealth(true)
require.Error(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.True(t, v)
default:
require.Fail(t, "expected a health status")
}
}
// TestTracker_Checks_Healthy_Before_TaskHealth asserts that we mark an alloc
// healthy if the checks pass before task health does
func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
// new task starting unhealthy, without services
task2 := task.Copy()
task2.Name = task2.Name + "2"
task2.Services = nil
alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
task2.Name: {
State: structs.TaskStatePending,
},
}
// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
var called uint64
consul := regmock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
// assert that we don't get marked healthy
select {
case <-time.After(4 * checkInterval):
// still unhealthy, good
case h := <-tracker.HealthyCh():
require.Fail(t, "unexpected health event", h)
}
helper.WithLock(&tracker.lock, func() {
require.False(t, tracker.tasksHealthy)
require.False(t, tracker.checksHealthy)
})
// now set task to healthy
runningAlloc := alloc.Copy()
runningAlloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
task2.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
err := b.Send(runningAlloc)
require.NoError(t, err)
// eventually, it is marked as healthy
select {
case <-time.After(4 * checkInterval):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}
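// TestTracker_ConsulChecks_OnUpdate exercises the on_update modes against
// passing, warning, and critical Consul check statuses.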
func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {
ci.Parallel(t)
cases := []struct {
desc string
checkOnUpdate string
consulResp string
expectedPass bool
}{
{
desc: "check require_healthy consul healthy",
checkOnUpdate: structs.OnUpdateRequireHealthy,
consulResp: consulapi.HealthPassing,
expectedPass: true,
},
{
desc: "check on_update ignore_warning, consul warn",
checkOnUpdate: structs.OnUpdateIgnoreWarn,
consulResp: consulapi.HealthWarning,
expectedPass: true,
},
{
desc: "check on_update ignore_warning, consul critical",
checkOnUpdate: structs.OnUpdateIgnoreWarn,
consulResp: consulapi.HealthCritical,
expectedPass: false,
},
{
desc: "check on_update ignore_warning, consul healthy",
checkOnUpdate: structs.OnUpdateIgnoreWarn,
consulResp: consulapi.HealthPassing,
expectedPass: true,
},
{
desc: "check on_update ignore, consul critical",
checkOnUpdate: structs.OnUpdateIgnore,
consulResp: consulapi.HealthCritical,
expectedPass: true,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: tc.consulResp,
}
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
task.Name: {
Services: map[string]*serviceregistration.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
CheckOnUpdate: map[string]string{
check.CheckID: tc.checkOnUpdate,
},
},
},
},
}
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Don't reply on the first call
var called uint64
consul := regmock.NewServiceRegistrationHandler(logger)
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}
reg := &serviceregistration.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
select {
case <-time.After(4 * checkInterval):
if !tc.expectedPass {
// tracker should still be running
require.Nil(t, tracker.ctx.Err())
return
}
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
// For healthy checks, the tracker should stop watching
select {
case <-tracker.ctx.Done():
// Ok, tracker should exit after reporting healthy
default:
require.Fail(t, "expected tracker to exit after reporting healthy")
}
})
}
}
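// TestTracker_NomadChecks_OnUpdate exercises Nomad check healthiness and
// readiness modes against passing and failing check results.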
func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
ci.Parallel(t)
cases := []struct {
name string
checkMode structs.CheckMode
checkResult structs.CheckStatus
expectedPass bool
}{
{
name: "mode is healthiness and check is healthy",
checkMode: structs.Healthiness,
checkResult: structs.CheckSuccess,
expectedPass: true,
},
{
name: "mode is healthiness and check is unhealthy",
checkMode: structs.Healthiness,
checkResult: structs.CheckFailure,
expectedPass: false,
},
{
name: "mode is readiness and check is healthy",
checkMode: structs.Readiness,
checkResult: structs.CheckSuccess,
expectedPass: true,
},
{
name: "mode is readiness and check is unhealthy",
checkMode: structs.Readiness,
checkResult: structs.CheckFailure,
expectedPass: true,
},
}
for i := range cases {
tc := cases[i]
t.Run(tc.name, func(t *testing.T) {
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
alloc.Job.TaskGroups[0].Tasks[0].Services[0].Provider = "nomad"
logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
alloc.Job.TaskGroups[0].Tasks[0].Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Set a check that is pending
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
err := checks.Set(alloc.ID, &structs.CheckQueryResult{
ID: "abc123",
Mode: tc.checkMode,
Status: structs.CheckPending,
Output: "nomad: waiting to run",
Timestamp: time.Now().Unix(),
Group: alloc.TaskGroup,
Task: alloc.Job.TaskGroups[0].Tasks[0].Name,
Service: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
Check: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
})
must.NoError(t, err)
go func() {
// wait a bit then update the check to passing
time.Sleep(15 * time.Millisecond)
must.NoError(t, checks.Set(alloc.ID, &structs.CheckQueryResult{
ID: "abc123",
Mode: tc.checkMode,
Status: tc.checkResult,
Output: "some output",
Timestamp: time.Now().Unix(),
Group: alloc.TaskGroup,
Task: alloc.Job.TaskGroups[0].Tasks[0].Name,
Service: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
Check: alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
}))
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consul := regmock.NewServiceRegistrationHandler(logger)
minHealthyTime := 1 * time.Millisecond
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
tracker.checkLookupInterval = 10 * time.Millisecond
tracker.Start()
select {
case <-time.After(8 * tracker.checkLookupInterval):
if !tc.expectedPass {
// tracker should still be running
must.NoError(t, tracker.ctx.Err())
return
}
t.Fatal("timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
// For healthy checks, the tracker should stop watching
select {
case <-tracker.ctx.Done():
// Ok, tracker should exit after reporting healthy
default:
t.Fatal("expected tracker to exit after reporting healthy")
}
})
}
}
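// TestTracker_evaluateConsulChecks exercises evaluateConsulChecks directly
// against registrations with missing, failing, passing, ignored, and sidecar
// checks.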
func TestTracker_evaluateConsulChecks(t *testing.T) {
ci.Parallel(t)
cases := []struct {
name string
tg *structs.TaskGroup
registrations *serviceregistration.AllocRegistration
exp bool
}{
{
name: "no checks",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{Name: "group-s1"}},
Tasks: []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {ServiceID: "abc123"},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {ServiceID: "def234"},
},
},
},
},
},
{
name: "missing group check",
exp: false,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
Tasks: []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {ServiceID: "abc123"},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {ServiceID: "def234"},
},
},
},
},
},
{
name: "missing task check",
exp: false,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
}},
Tasks: []*structs.Task{{Services: []*structs.Service{
{
Name: "task-s2",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
},
}}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {ServiceID: "abc123"},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {ServiceID: "def234"},
},
},
},
},
},
{
name: "failing group check",
exp: false,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
ServiceID: "abc123",
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
Status: consulapi.HealthCritical,
ServiceID: "abc123",
},
},
},
},
},
},
},
},
{
name: "failing task check",
exp: false,
tg: &structs.TaskGroup{
Tasks: []*structs.Task{
{
Services: []*structs.Service{
{
Name: "task-s2",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
},
},
},
},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {
ServiceID: "def234",
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
Status: consulapi.HealthCritical,
ServiceID: "abc123",
},
},
},
},
},
},
},
},
{
name: "passing checks",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
Tasks: []*structs.Task{
{
Services: []*structs.Service{
{
Name: "task-s2",
Checks: []*structs.ServiceCheck{
{Name: "c2"},
},
},
},
},
},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
ServiceID: "abc123",
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
Status: consulapi.HealthPassing,
},
},
},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {
ServiceID: "def234",
Checks: []*consulapi.AgentCheck{
{
Name: "c2",
Status: consulapi.HealthPassing,
},
},
},
},
},
},
},
},
{
name: "on update ignore warn",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
OnUpdate: structs.OnUpdateIgnoreWarn,
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
CheckOnUpdate: map[string]string{
"c1": structs.OnUpdateIgnoreWarn,
},
Checks: []*consulapi.AgentCheck{
{
CheckID: "c1",
Name: "c1",
Status: consulapi.HealthWarning,
},
},
},
},
},
},
},
},
{
name: "on update ignore critical",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
OnUpdate: structs.OnUpdateIgnore,
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
CheckOnUpdate: map[string]string{
"c1": structs.OnUpdateIgnore,
},
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
CheckID: "c1",
Status: consulapi.HealthCritical,
},
},
},
},
},
},
},
},
{
name: "failing sidecar checks only",
exp: false,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "group-s1",
Checks: []*structs.ServiceCheck{
{Name: "c1"},
},
}},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
ServiceID: "abc123",
Checks: []*consulapi.AgentCheck{
{
Name: "c1",
Status: consulapi.HealthPassing,
},
},
SidecarService: &consulapi.AgentService{},
SidecarChecks: []*consulapi.AgentCheck{
{
Name: "sidecar-check",
Status: consulapi.HealthCritical,
},
},
},
},
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result := evaluateConsulChecks(tc.tg.ConsulServices(), tc.registrations)
must.Eq(t, tc.exp, result)
})
}
}