Mirror of https://github.com/kemko/nomad.git, synced 2026-01-01 16:05:42 +03:00
Some of our allocrunner hooks require a task environment for interpolating values based on the node or allocation. But several of the hooks accept an already-built environment or builder and then keep that in memory. Both of these retain a copy of all the node attributes and allocation metadata, which balloons memory usage until the allocation is GC'd. While we'd like to look into ways to avoid keeping the allocrunner around entirely (see #25372), for now we can significantly reduce memory usage by creating the task environment on-demand when calling allocrunner methods, rather than persisting it in the allocrunner hooks.

In doing so, we uncover two other bugs:

* The WID manager, the group service hook, and the checks hook have to interpolate services for specific tasks. They mutated a taskenv builder to do so, but each time they mutate the builder, they write to the same environment map. When a group has multiple tasks, it's possible for one task to set an environment variable that would then be interpolated in the service definition for another task if that task did not have that environment variable. Only the service definition interpolation is impacted; this does not leak env vars across running tasks, as each taskrunner has its own builder. To fix this, we move the `UpdateTask` method off the builder and onto the taskenv as the `WithTask` method. This makes a shallow copy of the taskenv with a deep clone of the environment map used for interpolation, and then overwrites the environment from the task (see the sketch below).

* The checks hook interpolates Nomad native service checks only on `Prerun` and not on `Update`. This could cause unexpected deregistration and registration of checks during in-place updates. To fix this, we make sure we interpolate in the `Update` method.

I also bumped into an incorrectly implemented interface in the CSI hook. I've pulled that and some better guardrails out to https://github.com/hashicorp/nomad/pull/25472.

Fixes: https://github.com/hashicorp/nomad/issues/25269
Fixes: https://hashicorp.atlassian.net/browse/NET-12310
Ref: https://github.com/hashicorp/nomad/issues/25372
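The first bullet above hinges on `WithTask` copying the interpolation environment instead of mutating the builder's shared map. The snippet below is a minimal, self-contained sketch of that copy-then-overlay behavior only; it is not the actual `taskenv` implementation, and the struct, field names, and variables are simplified assumptions for illustration.

```go
// Sketch only: a simplified model of the WithTask approach described above,
// showing why cloning the interpolation map keeps one task's variables out of
// another task's interpolated service definition. Requires Go 1.21+ for maps.Clone.
package main

import (
	"fmt"
	"maps"
)

// TaskEnv is a stand-in for the real task environment; only the map used for
// interpolation matters for this sketch.
type TaskEnv struct {
	EnvMap map[string]string
}

// WithTask returns a shallow copy of the TaskEnv whose environment map has been
// deep-cloned and then overlaid with task-specific values, so callers can
// interpolate per-task services without mutating the shared map.
func (t *TaskEnv) WithTask(taskVars map[string]string) *TaskEnv {
	out := *t                         // shallow copy of the struct
	out.EnvMap = maps.Clone(t.EnvMap) // deep clone of the interpolation map
	for k, v := range taskVars {
		out.EnvMap[k] = v // overlay task-specific values on the clone only
	}
	return &out
}

func main() {
	base := &TaskEnv{EnvMap: map[string]string{"NOMAD_REGION": "global"}}
	server := base.WithTask(map[string]string{"NOMAD_TASK_NAME": "server"})
	proxy := base.WithTask(map[string]string{"NOMAD_TASK_NAME": "proxy"})

	fmt.Println(server.EnvMap["NOMAD_TASK_NAME"]) // server
	fmt.Println(proxy.EnvMap["NOMAD_TASK_NAME"])  // proxy
	fmt.Println(base.EnvMap["NOMAD_TASK_NAME"])   // "" — the shared map is untouched
}
```

Because each call clones the map before overlaying task-specific values, interpolating one task's service can no longer pick up variables that only another task set, which is exactly the cross-task leak described in the bullet above.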
1514 lines
42 KiB
Go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package allochealth

import (
	"context"
	"fmt"
	"sync/atomic"
	"testing"
	"time"

	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/client/serviceregistration"
	"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
	regmock "github.com/hashicorp/nomad/client/serviceregistration/mock"
	"github.com/hashicorp/nomad/client/state"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/shoenig/test/must"
	"github.com/shoenig/test/wait"
	"github.com/stretchr/testify/require"
)

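// TestTracker_ConsulChecks_Interpolation asserts that group and task services
// and checks whose names contain runtime variables are interpolated per task
// and matched against the Consul registrations.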
func TestTracker_ConsulChecks_Interpolation(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up

	// Generate services at multiple levels that reference runtime variables.
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	tg.Services = []*structs.Service{
		{
			Name:      "group-${TASKGROUP}-service-${NOMAD_DC}",
			PortLabel: "http",
			Checks: []*structs.ServiceCheck{
				{
					Type:     structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout:  5 * time.Second,
				},
				{
					Name:     "group-${NOMAD_GROUP_NAME}-check",
					Type:     structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout:  5 * time.Second,
				},
			},
		},
	}
	tg.Tasks[0].Name = "server"
	tg.Tasks[0].Services = []*structs.Service{
		{
			Name:      "task-${TASK}-service-${NOMAD_REGION}",
			TaskName:  "server",
			PortLabel: "http",
			Checks: []*structs.ServiceCheck{
				{
					Type:     structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout:  5 * time.Second,
				},
				{
					Name:     "task-${NOMAD_TASK_NAME}-check-${NOMAD_REGION}",
					Type:     structs.ServiceCheckTCP,
					Interval: 30 * time.Second,
					Timeout:  5 * time.Second,
				},
			},
		},
	}

	// Add another task to make sure each task gets its own environment.
	tg.Tasks = append(tg.Tasks, tg.Tasks[0].Copy())
	tg.Tasks[1].Name = "proxy"
	tg.Tasks[1].Services[0].TaskName = "proxy"

	// Canonicalize allocation to re-interpolate some of the variables.
	alloc.Canonicalize()

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		tg.Tasks[0].Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
		tg.Tasks[1].Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response
	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
		"group-web": {
			Services: map[string]*serviceregistration.ServiceRegistration{
				"group-web-service-dc1": {
					Service: &consulapi.AgentService{
						ID:      uuid.Generate(),
						Service: "group-web-service-dc1",
					},
					Checks: []*consulapi.AgentCheck{
						{
							Name:   `service: "group-web-service-dc1" check`,
							Status: consulapi.HealthPassing,
						},
						{
							Name:   "group-web-check",
							Status: consulapi.HealthPassing,
						},
					},
				},
			},
		},
		"server": {
			Services: map[string]*serviceregistration.ServiceRegistration{
				"task-server-service-global": {
					Service: &consulapi.AgentService{
						ID:      uuid.Generate(),
						Service: "task-server-service-global",
					},
					Checks: []*consulapi.AgentCheck{
						{
							Name:   `service: "task-server-service-global" check`,
							Status: consulapi.HealthPassing,
						},
						{
							Name:   "task-server-check-global",
							Status: consulapi.HealthPassing,
						},
					},
				},
			},
		},
		"proxy": {
			Services: map[string]*serviceregistration.ServiceRegistration{
				"task-proxy-service-global": {
					Service: &consulapi.AgentService{
						ID:      uuid.Generate(),
						Service: "task-proxy-service-global",
					},
					Checks: []*consulapi.AgentCheck{
						{
							Name:   `service: "task-proxy-service-global" check`,
							Status: consulapi.HealthPassing,
						},
						{
							Name:   "task-proxy-check-global",
							Status: consulapi.HealthPassing,
						},
					},
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	// Inject Consul response.
	consul := regmock.NewServiceRegistrationHandler(logger)
	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
		return &serviceregistration.AllocRegistration{
			Tasks: taskRegs,
		}, nil
	}

	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	select {
	case <-time.After(4 * checkInterval):
		require.Fail(t, "timed out while waiting for health")
	case h := <-tracker.HealthyCh():
		require.True(t, h)
	}
}

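// TestTracker_ConsulChecks_Healthy asserts that the tracker reports health once
// the task is running and its Consul check is passing.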
func TestTracker_ConsulChecks_Healthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	task := alloc.Job.TaskGroups[0].Tasks[0]

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response
	check := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthPassing,
	}
	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
		task.Name: {
			Services: map[string]*serviceregistration.ServiceRegistration{
				task.Services[0].Name: {
					Service: &consulapi.AgentService{
						ID:      "foo",
						Service: task.Services[0].Name,
					},
					Checks: []*consulapi.AgentCheck{check},
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	// Don't reply on the first call
	var called uint64
	consul := regmock.NewServiceRegistrationHandler(logger)
	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
		if atomic.AddUint64(&called, 1) == 1 {
			return nil, nil
		}

		reg := &serviceregistration.AllocRegistration{
			Tasks: taskRegs,
		}

		return reg, nil
	}

	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	select {
	case <-time.After(4 * checkInterval):
		require.Fail(t, "timed out while waiting for health")
	case h := <-tracker.HealthyCh():
		require.True(t, h)
	}
}

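// TestTracker_NomadChecks_Healthy asserts that the tracker reports health once a
// Nomad native check transitions from pending to success.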
func TestTracker_NomadChecks_Healthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	alloc.Job.TaskGroups[0].Tasks[0].Services[0].Provider = "nomad"

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		alloc.Job.TaskGroups[0].Tasks[0].Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	err := checks.Set(alloc.ID, &structs.CheckQueryResult{
		ID:        "abc123",
		Mode:      "healthiness",
		Status:    "pending",
		Output:    "nomad: waiting to run",
		Timestamp: time.Now().Unix(),
		Group:     alloc.TaskGroup,
		Task:      alloc.Job.TaskGroups[0].Tasks[0].Name,
		Service:   alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
		Check:     alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
	})
	must.NoError(t, err)

	consul := regmock.NewServiceRegistrationHandler(logger)
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	go func() {
		// wait a bit then update the check to passing
		time.Sleep(15 * time.Millisecond)
		must.NoError(t, checks.Set(alloc.ID, &structs.CheckQueryResult{
			ID:        "abc123",
			Mode:      "healthiness",
			Status:    "success",
			Output:    "nomad: http ok",
			Timestamp: time.Now().Unix(),
			Group:     alloc.TaskGroup,
			Task:      alloc.Job.TaskGroups[0].Tasks[0].Name,
			Service:   alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
			Check:     alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
		}))
	}()

	select {
	case <-time.After(4 * checkInterval):
		t.Fatalf("timed out while waiting for success")
	case healthy := <-tracker.HealthyCh():
		must.True(t, healthy)
	}
}

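// TestTracker_NomadChecks_Unhealthy asserts that the tracker never reports health
// while a Nomad native check is failing.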
func TestTracker_NomadChecks_Unhealthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	alloc.Job.TaskGroups[0].Tasks[0].Services[0].Provider = "nomad"

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		alloc.Job.TaskGroups[0].Tasks[0].Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	err := checks.Set(alloc.ID, &structs.CheckQueryResult{
		ID:        "abc123",
		Mode:      "healthiness",
		Status:    "pending", // start out pending
		Output:    "nomad: waiting to run",
		Timestamp: time.Now().Unix(),
		Group:     alloc.TaskGroup,
		Task:      alloc.Job.TaskGroups[0].Tasks[0].Name,
		Service:   alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
		Check:     alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
	})
	must.NoError(t, err)

	consul := regmock.NewServiceRegistrationHandler(logger)
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	go func() {
		// wait a bit then update the check to failing
		time.Sleep(15 * time.Millisecond)
		must.NoError(t, checks.Set(alloc.ID, &structs.CheckQueryResult{
			ID:        "abc123",
			Mode:      "healthiness",
			Status:    "failing",
			Output:    "connection refused",
			Timestamp: time.Now().Unix(),
			Group:     alloc.TaskGroup,
			Task:      alloc.Job.TaskGroups[0].Tasks[0].Name,
			Service:   alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
			Check:     alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
		}))
	}()

	// make sure we are always unhealthy across 4 check intervals
	for i := 0; i < 4; i++ {
		<-time.After(checkInterval)
		select {
		case <-tracker.HealthyCh():
			t.Fatalf("should not receive on healthy chan with failing check")
		default:
		}
	}
}

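// TestTracker_Checks_PendingPostStop_Healthy asserts that a pending poststop task
// does not block the allocation from being reported healthy.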
func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.LifecycleAllocWithPoststopDeploy()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		"web": {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
		"post": {
			State: structs.TaskStatePending,
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	consul := regmock.NewServiceRegistrationHandler(logger)
	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	select {
	case <-time.After(4 * checkInterval):
		require.Fail(t, "timed out while waiting for health")
	case h := <-tracker.HealthyCh():
		require.True(t, h)
	}
}

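// TestTracker_Succeeded_PostStart_Healthy asserts that a poststart task that has
// already finished does not block the allocation from being reported healthy.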
func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.LifecycleAllocWithPoststartDeploy()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = time.Millisecond * 1
	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		"web": {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
		"post": {
			State:      structs.TaskStateDead,
			StartedAt:  time.Now(),
			FinishedAt: time.Now().Add(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime / 2),
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	consul := regmock.NewServiceRegistrationHandler(logger)
	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	select {
	case <-time.After(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime * 2):
		require.Fail(t, "timed out while waiting for health")
	case h := <-tracker.HealthyCh():
		require.True(t, h)
	}
}

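// TestTracker_ConsulChecks_Unhealthy asserts that the tracker never reports health
// while one of the Consul checks is critical.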
func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	task := alloc.Job.TaskGroups[0].Tasks[0]

	newCheck := task.Services[0].Checks[0].Copy()
	newCheck.Name = "failing-check"
	task.Services[0].Checks = append(task.Services[0].Checks, newCheck)

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response
	checkHealthy := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthPassing,
	}
	checksUnhealthy := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[1].Name,
		Status: consulapi.HealthCritical,
	}
	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
		task.Name: {
			Services: map[string]*serviceregistration.ServiceRegistration{
				task.Services[0].Name: {
					Service: &consulapi.AgentService{
						ID:      "foo",
						Service: task.Services[0].Name,
					},
					Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	// Don't reply on the first call
	var called uint64
	consul := regmock.NewServiceRegistrationHandler(logger)
	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
		if atomic.AddUint64(&called, 1) == 1 {
			return nil, nil
		}

		reg := &serviceregistration.AllocRegistration{
			Tasks: taskRegs,
		}

		return reg, nil
	}

	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	testutil.WaitForResult(func() (bool, error) {
		lookup := atomic.LoadUint64(&called)
		return lookup < 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup)
	}, func(err error) {
		require.NoError(t, err)
	})

	tracker.lock.Lock()
	require.False(t, tracker.checksHealthy)
	tracker.lock.Unlock()

	select {
	case v := <-tracker.HealthyCh():
		require.Failf(t, "expected no health value", " got %v", v)
	default:
		// good
	}
}

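// TestTracker_ConsulChecks_HealthyToUnhealthy asserts that a Consul check that
// degrades during the minimum healthy time keeps the tracker from reporting health.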
func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1
	task := alloc.Job.TaskGroups[0].Tasks[0]

	newCheck := task.Services[0].Checks[0].Copy()
	newCheck.Name = "my-check"
	task.Services[0].Checks = []*structs.ServiceCheck{newCheck}

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response - starts with a healthy check and transitions to unhealthy
	// during the minimum healthy time window
	checkHealthy := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthPassing,
	}
	checkUnhealthy := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthCritical,
	}

	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
		task.Name: {
			Services: map[string]*serviceregistration.ServiceRegistration{
				task.Services[0].Name: {
					Service: &consulapi.AgentService{
						ID:      "s1",
						Service: task.Services[0].Name,
					},
					Checks: []*consulapi.AgentCheck{checkHealthy}, // initially healthy
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	consul := regmock.NewServiceRegistrationHandler(logger)
	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	minHealthyTime := 2 * time.Second
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
	tracker.checkLookupInterval = checkInterval

	assertChecksHealth := func(exp bool) {
		tracker.lock.Lock()
		must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
		tracker.lock.Unlock()
	}

	// start the clock so we can degrade check status during minimum healthy time
	startTime := time.Now()

	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
		// after 1 second, start failing the check
		if time.Since(startTime) > 1*time.Second {
			taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkUnhealthy}
		}

		// assert tracker is observing unhealthy - we never cross minimum health
		// time with healthy checks in this test case
		assertChecksHealth(false)
		reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
		return reg, nil
	}

	// start the tracker and wait for evaluations to happen
	tracker.Start()
	time.Sleep(2 * time.Second)

	// tracker should be observing unhealthy check
	assertChecksHealth(false)

	select {
	case <-tracker.HealthyCh():
		must.Unreachable(t, must.Sprint("did not expect unblock of healthy chan"))
	default:
		// ok
	}
}

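// TestTracker_ConsulChecks_SlowCheckRegistration asserts that the tracker keeps
// polling while a Consul check has not been registered yet, and reports health
// once the check appears and passes.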
func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	task := alloc.Job.TaskGroups[0].Tasks[0]

	newCheck := task.Services[0].Checks[0].Copy()
	newCheck.Name = "my-check"
	task.Services[0].Checks = []*structs.ServiceCheck{newCheck}

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}

	// Make Consul response - start with check not yet registered
	checkHealthy := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthPassing,
	}
	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
		task.Name: {
			Services: map[string]*serviceregistration.ServiceRegistration{
				task.Services[0].Name: {
					Service: &consulapi.AgentService{
						ID:      "s1",
						Service: task.Services[0].Name,
					},
					Checks: nil, // initially missing
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	consul := regmock.NewServiceRegistrationHandler(logger)
	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval

	assertChecksHealth := func(exp bool) {
		tracker.lock.Lock()
		must.Eq(t, exp, tracker.checksHealthy, must.Sprint("tracker checks health in unexpected state"))
		tracker.lock.Unlock()
	}

	var hits atomic.Int32
	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
		// after 10 queries, insert the check
		hits.Add(1)
		if count := hits.Load(); count > 10 {
			taskRegs[task.Name].Services[task.Services[0].Name].Checks = []*consulapi.AgentCheck{checkHealthy}
		} else {
			// assert tracker is observing unhealthy (missing) checks
			assertChecksHealth(false)
		}
		reg := &serviceregistration.AllocRegistration{Tasks: taskRegs}
		return reg, nil
	}

	// start the tracker and wait for evaluations to happen
	tracker.Start()
	must.Wait(t, wait.InitialSuccess(
		wait.BoolFunc(func() bool { return hits.Load() > 10 }),
		wait.Gap(10*time.Millisecond),
		wait.Timeout(1*time.Second),
	))

	// tracker should be observing healthy check now
	assertChecksHealth(true)

	select {
	case v := <-tracker.HealthyCh():
		must.True(t, v, must.Sprint("expected value from tracker chan to be healthy"))
	default:
		must.Unreachable(t, must.Sprint("expected value from tracker chan"))
	}
}

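// TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy asserts that health is
// only propagated once both task health and check health have been set.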
func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	logger := testlog.HCLogger(t)

	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
	tracker := NewTracker(ctx, logger, alloc, nil, env, nil, nil, time.Millisecond, true)

	assertNoHealth := func() {
		require.NoError(t, tracker.ctx.Err())
		select {
		case v := <-tracker.HealthyCh():
			require.Failf(t, "unexpected healthy event", "got %v", v)
		default:
		}
	}

	// first set task health without checks
	tracker.setTaskHealth(true, false)
	assertNoHealth()

	// now fail task health again before checks are successful
	tracker.setTaskHealth(false, false)
	assertNoHealth()

	// now pass health checks - do not propagate health yet
	tracker.setCheckHealth(true)
	assertNoHealth()

	// set tasks to healthy - don't propagate health yet, wait for the next check
	tracker.setTaskHealth(true, false)
	assertNoHealth()

	// set checks to true, now propagate health status
	tracker.setCheckHealth(true)

	require.Error(t, tracker.ctx.Err())
	select {
	case v := <-tracker.HealthyCh():
		require.True(t, v)
	default:
		require.Fail(t, "expected a health status")
	}
}

// TestTracker_Checks_Healthy_Before_TaskHealth asserts that we mark an alloc
// healthy if the checks pass before the tasks are marked healthy.
func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
	task := alloc.Job.TaskGroups[0].Tasks[0]

	// new task starting unhealthy, without services
	task2 := task.Copy()
	task2.Name = task2.Name + "2"
	task2.Services = nil
	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)

	// Synthesize running alloc and tasks
	alloc.ClientStatus = structs.AllocClientStatusRunning
	alloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
		task2.Name: {
			State: structs.TaskStatePending,
		},
	}

	// Make Consul response
	check := &consulapi.AgentCheck{
		Name:   task.Services[0].Checks[0].Name,
		Status: consulapi.HealthPassing,
	}
	taskRegs := map[string]*serviceregistration.ServiceRegistrations{
		task.Name: {
			Services: map[string]*serviceregistration.ServiceRegistration{
				task.Services[0].Name: {
					Service: &consulapi.AgentService{
						ID:      "foo",
						Service: task.Services[0].Name,
					},
					Checks: []*consulapi.AgentCheck{check},
				},
			},
		},
	}

	logger := testlog.HCLogger(t)
	b := cstructs.NewAllocBroadcaster(logger)
	defer b.Close()

	// Don't reply on the first call
	var called uint64
	consul := regmock.NewServiceRegistrationHandler(logger)
	consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
		if atomic.AddUint64(&called, 1) == 1 {
			return nil, nil
		}

		reg := &serviceregistration.AllocRegistration{
			Tasks: taskRegs,
		}

		return reg, nil
	}

	ctx, cancelFn := context.WithCancel(context.Background())
	defer cancelFn()

	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
	checkInterval := 10 * time.Millisecond
	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
	tracker.checkLookupInterval = checkInterval
	tracker.Start()

	// assert that we don't get marked healthy
	select {
	case <-time.After(4 * checkInterval):
		// still unhealthy, good
	case h := <-tracker.HealthyCh():
		require.Fail(t, "unexpected health event", h)
	}

	helper.WithLock(&tracker.lock, func() {
		require.False(t, tracker.tasksHealthy)
		require.False(t, tracker.checksHealthy)
	})

	// now set task to healthy
	runningAlloc := alloc.Copy()
	runningAlloc.TaskStates = map[string]*structs.TaskState{
		task.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
		task2.Name: {
			State:     structs.TaskStateRunning,
			StartedAt: time.Now(),
		},
	}
	err := b.Send(runningAlloc)
	require.NoError(t, err)

	// eventually, it is marked as healthy
	select {
	case <-time.After(4 * checkInterval):
		require.Fail(t, "timed out while waiting for health")
	case h := <-tracker.HealthyCh():
		require.True(t, h)
	}

}

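// TestTracker_ConsulChecks_OnUpdate asserts that a check's on_update setting
// controls whether warning or critical Consul statuses block health.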
func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {
	ci.Parallel(t)

	cases := []struct {
		desc          string
		checkOnUpdate string
		consulResp    string
		expectedPass  bool
	}{
		{
			desc:          "check require_healthy consul healthy",
			checkOnUpdate: structs.OnUpdateRequireHealthy,
			consulResp:    consulapi.HealthPassing,
			expectedPass:  true,
		},
		{
			desc:          "check on_update ignore_warning, consul warn",
			checkOnUpdate: structs.OnUpdateIgnoreWarn,
			consulResp:    consulapi.HealthWarning,
			expectedPass:  true,
		},
		{
			desc:          "check on_update ignore_warning, consul critical",
			checkOnUpdate: structs.OnUpdateIgnoreWarn,
			consulResp:    consulapi.HealthCritical,
			expectedPass:  false,
		},
		{
			desc:          "check on_update ignore_warning, consul healthy",
			checkOnUpdate: structs.OnUpdateIgnoreWarn,
			consulResp:    consulapi.HealthPassing,
			expectedPass:  true,
		},
		{
			desc:          "check on_update ignore, consul critical",
			checkOnUpdate: structs.OnUpdateIgnore,
			consulResp:    consulapi.HealthCritical,
			expectedPass:  true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {

			alloc := mock.Alloc()
			alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
			task := alloc.Job.TaskGroups[0].Tasks[0]

			// Synthesize running alloc and tasks
			alloc.ClientStatus = structs.AllocClientStatusRunning
			alloc.TaskStates = map[string]*structs.TaskState{
				task.Name: {
					State:     structs.TaskStateRunning,
					StartedAt: time.Now(),
				},
			}

			// Make Consul response
			check := &consulapi.AgentCheck{
				Name:   task.Services[0].Checks[0].Name,
				Status: tc.consulResp,
			}
			taskRegs := map[string]*serviceregistration.ServiceRegistrations{
				task.Name: {
					Services: map[string]*serviceregistration.ServiceRegistration{
						task.Services[0].Name: {
							Service: &consulapi.AgentService{
								ID:      "foo",
								Service: task.Services[0].Name,
							},
							Checks: []*consulapi.AgentCheck{check},
							CheckOnUpdate: map[string]string{
								check.CheckID: tc.checkOnUpdate,
							},
						},
					},
				},
			}

			logger := testlog.HCLogger(t)
			b := cstructs.NewAllocBroadcaster(logger)
			defer b.Close()

			// Don't reply on the first call
			var called uint64
			consul := regmock.NewServiceRegistrationHandler(logger)
			consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
				if atomic.AddUint64(&called, 1) == 1 {
					return nil, nil
				}

				reg := &serviceregistration.AllocRegistration{
					Tasks: taskRegs,
				}

				return reg, nil
			}

			ctx, cancelFn := context.WithCancel(context.Background())
			defer cancelFn()

			checks := checkstore.NewStore(logger, state.NewMemDB(logger))
			checkInterval := 10 * time.Millisecond
			env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

			tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
			tracker.checkLookupInterval = checkInterval
			tracker.Start()

			select {
			case <-time.After(4 * checkInterval):
				if !tc.expectedPass {
					// tracker should still be running
					require.Nil(t, tracker.ctx.Err())
					return
				}
				require.Fail(t, "timed out while waiting for health")
			case h := <-tracker.HealthyCh():
				require.True(t, h)
			}

			// For healthy checks, the tracker should stop watching
			select {
			case <-tracker.ctx.Done():
				// Ok, tracker should exit after reporting healthy
			default:
				require.Fail(t, "expected tracker to exit after reporting healthy")
			}
		})
	}
}

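// TestTracker_NomadChecks_OnUpdate asserts that failing Nomad native checks block
// health in healthiness mode but not in readiness mode.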
func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
	ci.Parallel(t)

	cases := []struct {
		name         string
		checkMode    structs.CheckMode
		checkResult  structs.CheckStatus
		expectedPass bool
	}{
		{
			name:         "mode is healthiness and check is healthy",
			checkMode:    structs.Healthiness,
			checkResult:  structs.CheckSuccess,
			expectedPass: true,
		},
		{
			name:         "mode is healthiness and check is unhealthy",
			checkMode:    structs.Healthiness,
			checkResult:  structs.CheckFailure,
			expectedPass: false,
		},
		{
			name:         "mode is readiness and check is healthy",
			checkMode:    structs.Readiness,
			checkResult:  structs.CheckSuccess,
			expectedPass: true,
		},
		{
			name:         "mode is readiness and check is unhealthy",
			checkMode:    structs.Readiness,
			checkResult:  structs.CheckFailure,
			expectedPass: true,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			alloc := mock.Alloc()
			alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
			alloc.Job.TaskGroups[0].Tasks[0].Services[0].Provider = "nomad"

			logger := testlog.HCLogger(t)
			b := cstructs.NewAllocBroadcaster(logger)
			defer b.Close()

			// Synthesize running alloc and tasks
			alloc.ClientStatus = structs.AllocClientStatusRunning
			alloc.TaskStates = map[string]*structs.TaskState{
				alloc.Job.TaskGroups[0].Tasks[0].Name: {
					State:     structs.TaskStateRunning,
					StartedAt: time.Now(),
				},
			}

			// Set a check that is pending
			checks := checkstore.NewStore(logger, state.NewMemDB(logger))
			err := checks.Set(alloc.ID, &structs.CheckQueryResult{
				ID:        "abc123",
				Mode:      tc.checkMode,
				Status:    structs.CheckPending,
				Output:    "nomad: waiting to run",
				Timestamp: time.Now().Unix(),
				Group:     alloc.TaskGroup,
				Task:      alloc.Job.TaskGroups[0].Tasks[0].Name,
				Service:   alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
				Check:     alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
			})
			must.NoError(t, err)

			go func() {
				// wait a bit then update the check to the case's result
				time.Sleep(15 * time.Millisecond)
				must.NoError(t, checks.Set(alloc.ID, &structs.CheckQueryResult{
					ID:        "abc123",
					Mode:      tc.checkMode,
					Status:    tc.checkResult,
					Output:    "some output",
					Timestamp: time.Now().Unix(),
					Group:     alloc.TaskGroup,
					Task:      alloc.Job.TaskGroups[0].Tasks[0].Name,
					Service:   alloc.Job.TaskGroups[0].Tasks[0].Services[0].Name,
					Check:     alloc.Job.TaskGroups[0].Tasks[0].Services[0].Checks[0].Name,
				}))
			}()

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			consul := regmock.NewServiceRegistrationHandler(logger)
			minHealthyTime := 1 * time.Millisecond
			env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

			tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
			tracker.checkLookupInterval = 10 * time.Millisecond
			tracker.Start()

			select {
			case <-time.After(8 * tracker.checkLookupInterval):
				if !tc.expectedPass {
					// tracker should still be running
					must.NoError(t, tracker.ctx.Err())
					return
				}
				t.Fatal("timed out while waiting for health")
			case h := <-tracker.HealthyCh():
				require.True(t, h)
			}

			// For healthy checks, the tracker should stop watching
			select {
			case <-tracker.ctx.Done():
				// Ok, tracker should exit after reporting healthy
			default:
				t.Fatal("expected tracker to exit after reporting healthy")
			}
		})
	}
}

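// TestTracker_evaluateConsulChecks exercises evaluateConsulChecks directly against
// combinations of missing, failing, passing, ignored, and sidecar checks.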
func TestTracker_evaluateConsulChecks(t *testing.T) {
	ci.Parallel(t)

	cases := []struct {
		name          string
		tg            *structs.TaskGroup
		registrations *serviceregistration.AllocRegistration
		exp           bool
	}{
		{
			name: "no checks",
			exp:  true,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{Name: "group-s1"}},
				Tasks:    []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {ServiceID: "abc123"},
						},
					},
					"task": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"def234": {ServiceID: "def234"},
						},
					},
				},
			},
		},
		{
			name: "missing group check",
			exp:  false,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{
					Name: "group-s1",
					Checks: []*structs.ServiceCheck{
						{Name: "c1"},
					},
				}},
				Tasks: []*structs.Task{{Services: []*structs.Service{{Name: "task-s2"}}}},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {ServiceID: "abc123"},
						},
					},
					"task": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"def234": {ServiceID: "def234"},
						},
					},
				},
			},
		},
		{
			name: "missing task check",
			exp:  false,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{
					Name: "group-s1",
				}},
				Tasks: []*structs.Task{{Services: []*structs.Service{
					{
						Name: "task-s2",
						Checks: []*structs.ServiceCheck{
							{Name: "c1"},
						},
					},
				}}},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {ServiceID: "abc123"},
						},
					},
					"task": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"def234": {ServiceID: "def234"},
						},
					},
				},
			},
		},
		{
			name: "failing group check",
			exp:  false,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{
					Name: "group-s1",
					Checks: []*structs.ServiceCheck{
						{Name: "c1"},
					},
				}},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {
								ServiceID: "abc123",
								Checks: []*consulapi.AgentCheck{
									{
										Name:      "c1",
										Status:    consulapi.HealthCritical,
										ServiceID: "abc123",
									},
								},
							},
						},
					},
				},
			},
		},
		{
			name: "failing task check",
			exp:  false,
			tg: &structs.TaskGroup{
				Tasks: []*structs.Task{
					{
						Services: []*structs.Service{
							{
								Name: "task-s2",
								Checks: []*structs.ServiceCheck{
									{Name: "c1"},
								},
							},
						},
					},
				},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"task": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"def234": {
								ServiceID: "def234",
								Checks: []*consulapi.AgentCheck{
									{
										Name:      "c1",
										Status:    consulapi.HealthCritical,
										ServiceID: "abc123",
									},
								},
							},
						},
					},
				},
			},
		},
		{
			name: "passing checks",
			exp:  true,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{
					Name: "group-s1",
					Checks: []*structs.ServiceCheck{
						{Name: "c1"},
					},
				}},
				Tasks: []*structs.Task{
					{
						Services: []*structs.Service{
							{
								Name: "task-s2",
								Checks: []*structs.ServiceCheck{
									{Name: "c2"},
								},
							},
						},
					},
				},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {
								ServiceID: "abc123",
								Checks: []*consulapi.AgentCheck{
									{
										Name:   "c1",
										Status: consulapi.HealthPassing,
									},
								},
							},
						},
					},
					"task": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"def234": {
								ServiceID: "def234",
								Checks: []*consulapi.AgentCheck{
									{
										Name:   "c2",
										Status: consulapi.HealthPassing,
									},
								},
							},
						},
					},
				},
			},
		},
		{
			name: "on update ignore warn",
			exp:  true,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{
					Name:     "group-s1",
					OnUpdate: structs.OnUpdateIgnoreWarn,
					Checks: []*structs.ServiceCheck{
						{Name: "c1"},
					},
				}},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {
								CheckOnUpdate: map[string]string{
									"c1": structs.OnUpdateIgnoreWarn,
								},
								Checks: []*consulapi.AgentCheck{
									{
										CheckID: "c1",
										Name:    "c1",
										Status:  consulapi.HealthWarning,
									},
								},
							},
						},
					},
				},
			},
		},
		{
			name: "on update ignore critical",
			exp:  true,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{
					Name:     "group-s1",
					OnUpdate: structs.OnUpdateIgnore,
					Checks: []*structs.ServiceCheck{
						{Name: "c1"},
					},
				}},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {
								CheckOnUpdate: map[string]string{
									"c1": structs.OnUpdateIgnore,
								},
								Checks: []*consulapi.AgentCheck{
									{
										Name:    "c1",
										CheckID: "c1",
										Status:  consulapi.HealthCritical,
									},
								},
							},
						},
					},
				},
			},
		},
		{
			name: "failing sidecar checks only",
			exp:  false,
			tg: &structs.TaskGroup{
				Services: []*structs.Service{{
					Name: "group-s1",
					Checks: []*structs.ServiceCheck{
						{Name: "c1"},
					},
				}},
			},
			registrations: &serviceregistration.AllocRegistration{
				Tasks: map[string]*serviceregistration.ServiceRegistrations{
					"group": {
						Services: map[string]*serviceregistration.ServiceRegistration{
							"abc123": {
								ServiceID: "abc123",
								Checks: []*consulapi.AgentCheck{
									{
										Name:   "c1",
										Status: consulapi.HealthPassing,
									},
								},
								SidecarService: &consulapi.AgentService{},
								SidecarChecks: []*consulapi.AgentCheck{
									{
										Name:   "sidecar-check",
										Status: consulapi.HealthCritical,
									},
								},
							},
						},
					},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			result := evaluateConsulChecks(tc.tg.ConsulServices(), tc.registrations)
			must.Eq(t, tc.exp, result)
		})
	}
}