mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
allocrunner: fix health check monitoring for Consul services (#16402)
Services must be interpolated to replace runtime variables before they can be compared against the values returned by Consul.
This commit is contained in:
3
.changelog/16402.txt
Normal file
3
.changelog/16402.txt
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
```release-note:bug
|
||||||
|
client: Fixed a bug that prevented allocations with interpolated values in Consul services from being marked as healthy
|
||||||
|
```
|
||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/hashicorp/nomad/client/serviceregistration"
|
"github.com/hashicorp/nomad/client/serviceregistration"
|
||||||
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
|
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
|
||||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||||
|
"github.com/hashicorp/nomad/client/taskenv"
|
||||||
"github.com/hashicorp/nomad/helper"
|
"github.com/hashicorp/nomad/helper"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
@@ -96,6 +97,10 @@ type Tracker struct {
|
|||||||
// name -> state
|
// name -> state
|
||||||
taskHealth map[string]*taskHealthState
|
taskHealth map[string]*taskHealthState
|
||||||
|
|
||||||
|
// taskEnvs maps each task in the allocation to a *taskenv.TaskEnv that is
|
||||||
|
// used to interpolate runtime variables used in service definitions.
|
||||||
|
taskEnvs map[string]*taskenv.TaskEnv
|
||||||
|
|
||||||
// logger is for logging things
|
// logger is for logging things
|
||||||
logger hclog.Logger
|
logger hclog.Logger
|
||||||
}
|
}
|
||||||
@@ -111,6 +116,7 @@ func NewTracker(
|
|||||||
logger hclog.Logger,
|
logger hclog.Logger,
|
||||||
alloc *structs.Allocation,
|
alloc *structs.Allocation,
|
||||||
allocUpdates *cstructs.AllocListener,
|
allocUpdates *cstructs.AllocListener,
|
||||||
|
taskEnvBuilder *taskenv.Builder,
|
||||||
consulClient serviceregistration.Handler,
|
consulClient serviceregistration.Handler,
|
||||||
checkStore checkstore.Shim,
|
checkStore checkstore.Shim,
|
||||||
minHealthyTime time.Duration,
|
minHealthyTime time.Duration,
|
||||||
@@ -132,6 +138,12 @@ func NewTracker(
|
|||||||
lifecycleTasks: map[string]string{},
|
lifecycleTasks: map[string]string{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build the map of TaskEnv for each task. Create the group-level TaskEnv
|
||||||
|
// first because taskEnvBuilder is mutated in every loop and we can't undo
|
||||||
|
// a call to UpdateTask().
|
||||||
|
t.taskEnvs = make(map[string]*taskenv.TaskEnv, len(t.tg.Tasks)+1)
|
||||||
|
t.taskEnvs[""] = taskEnvBuilder.Build()
|
||||||
|
|
||||||
t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
|
t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
|
||||||
for _, task := range t.tg.Tasks {
|
for _, task := range t.tg.Tasks {
|
||||||
t.taskHealth[task.Name] = &taskHealthState{task: task}
|
t.taskHealth[task.Name] = &taskHealthState{task: task}
|
||||||
@@ -140,6 +152,8 @@ func NewTracker(
|
|||||||
t.lifecycleTasks[task.Name] = task.Lifecycle.Hook
|
t.lifecycleTasks[task.Name] = task.Lifecycle.Hook
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.taskEnvs[task.Name] = taskEnvBuilder.UpdateTask(alloc, task).Build()
|
||||||
|
|
||||||
c, n := countChecks(task.Services)
|
c, n := countChecks(task.Services)
|
||||||
t.consulCheckCount += c
|
t.consulCheckCount += c
|
||||||
t.nomadCheckCount += n
|
t.nomadCheckCount += n
|
||||||
@@ -503,8 +517,24 @@ OUTER:
|
|||||||
// Detect if all the checks are passing
|
// Detect if all the checks are passing
|
||||||
passed := true
|
passed := true
|
||||||
|
|
||||||
|
// interpolate services to replace runtime variables
|
||||||
|
consulServices := t.tg.ConsulServices()
|
||||||
|
interpolatedServices := make([]*structs.Service, 0, len(consulServices))
|
||||||
|
for _, service := range consulServices {
|
||||||
|
env := t.taskEnvs[service.TaskName]
|
||||||
|
if env == nil {
|
||||||
|
// This is not expected to happen, but guard against a nil
|
||||||
|
// task environment that could case a panic.
|
||||||
|
t.logger.Error("failed to interpolate service runtime variables: task environment not found",
|
||||||
|
"alloc_id", t.alloc.ID, "task", service.TaskName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
interpolatedService := taskenv.InterpolateService(env, service)
|
||||||
|
interpolatedServices = append(interpolatedServices, interpolatedService)
|
||||||
|
}
|
||||||
|
|
||||||
// scan for missing or unhealthy consul checks
|
// scan for missing or unhealthy consul checks
|
||||||
if !evaluateConsulChecks(t.tg, allocReg) {
|
if !evaluateConsulChecks(interpolatedServices, allocReg) {
|
||||||
t.setCheckHealth(false)
|
t.setCheckHealth(false)
|
||||||
passed = false
|
passed = false
|
||||||
}
|
}
|
||||||
@@ -523,11 +553,10 @@ OUTER:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func evaluateConsulChecks(tg *structs.TaskGroup, registrations *serviceregistration.AllocRegistration) bool {
|
func evaluateConsulChecks(services []*structs.Service, registrations *serviceregistration.AllocRegistration) bool {
|
||||||
// First, identify any case where a check definition is missing or outdated
|
// First, identify any case where a check definition is missing or outdated
|
||||||
// on the Consul side. Note that because check names are not unique, we must
|
// on the Consul side. Note that because check names are not unique, we must
|
||||||
// also keep track of the counts on each side and make sure those also match.
|
// also keep track of the counts on each side and make sure those also match.
|
||||||
services := tg.ConsulServices()
|
|
||||||
expChecks := make(map[string]int)
|
expChecks := make(map[string]int)
|
||||||
regChecks := make(map[string]int)
|
regChecks := make(map[string]int)
|
||||||
for _, service := range services {
|
for _, service := range services {
|
||||||
|
|||||||
@@ -14,8 +14,10 @@ import (
|
|||||||
regmock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
regmock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
||||||
"github.com/hashicorp/nomad/client/state"
|
"github.com/hashicorp/nomad/client/state"
|
||||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||||
|
"github.com/hashicorp/nomad/client/taskenv"
|
||||||
"github.com/hashicorp/nomad/helper"
|
"github.com/hashicorp/nomad/helper"
|
||||||
"github.com/hashicorp/nomad/helper/testlog"
|
"github.com/hashicorp/nomad/helper/testlog"
|
||||||
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/hashicorp/nomad/nomad/mock"
|
"github.com/hashicorp/nomad/nomad/mock"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"github.com/hashicorp/nomad/testutil"
|
"github.com/hashicorp/nomad/testutil"
|
||||||
@@ -24,6 +26,171 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestTracker_ConsulChecks_Interpolation(t *testing.T) {
|
||||||
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
alloc := mock.Alloc()
|
||||||
|
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
|
||||||
|
|
||||||
|
// Generate services at multiple levels that reference runtime variables.
|
||||||
|
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||||
|
tg.Services = []*structs.Service{
|
||||||
|
{
|
||||||
|
Name: "group-${TASKGROUP}-service-${NOMAD_DC}",
|
||||||
|
PortLabel: "http",
|
||||||
|
Checks: []*structs.ServiceCheck{
|
||||||
|
{
|
||||||
|
Type: structs.ServiceCheckTCP,
|
||||||
|
Interval: 30 * time.Second,
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "group-${NOMAD_GROUP_NAME}-check",
|
||||||
|
Type: structs.ServiceCheckTCP,
|
||||||
|
Interval: 30 * time.Second,
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tg.Tasks[0].Name = "server"
|
||||||
|
tg.Tasks[0].Services = []*structs.Service{
|
||||||
|
{
|
||||||
|
Name: "task-${TASK}-service-${NOMAD_REGION}",
|
||||||
|
TaskName: "server",
|
||||||
|
PortLabel: "http",
|
||||||
|
Checks: []*structs.ServiceCheck{
|
||||||
|
{
|
||||||
|
Type: structs.ServiceCheckTCP,
|
||||||
|
Interval: 30 * time.Second,
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "task-${NOMAD_TASK_NAME}-check-${NOMAD_REGION}",
|
||||||
|
Type: structs.ServiceCheckTCP,
|
||||||
|
Interval: 30 * time.Second,
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add another task to make sure each task gets its own environment.
|
||||||
|
tg.Tasks = append(tg.Tasks, tg.Tasks[0].Copy())
|
||||||
|
tg.Tasks[1].Name = "proxy"
|
||||||
|
tg.Tasks[1].Services[0].TaskName = "proxy"
|
||||||
|
|
||||||
|
// Canonicalize allocation to re-interpolate some of the variables.
|
||||||
|
alloc.Canonicalize()
|
||||||
|
|
||||||
|
// Synthesize running alloc and tasks
|
||||||
|
alloc.ClientStatus = structs.AllocClientStatusRunning
|
||||||
|
alloc.TaskStates = map[string]*structs.TaskState{
|
||||||
|
tg.Tasks[0].Name: {
|
||||||
|
State: structs.TaskStateRunning,
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
},
|
||||||
|
tg.Tasks[1].Name: {
|
||||||
|
State: structs.TaskStateRunning,
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make Consul response
|
||||||
|
taskRegs := map[string]*serviceregistration.ServiceRegistrations{
|
||||||
|
"group-web": {
|
||||||
|
Services: map[string]*serviceregistration.ServiceRegistration{
|
||||||
|
"group-web-service-dc1": {
|
||||||
|
Service: &consulapi.AgentService{
|
||||||
|
ID: uuid.Generate(),
|
||||||
|
Service: "group-web-service-dc1",
|
||||||
|
},
|
||||||
|
Checks: []*consulapi.AgentCheck{
|
||||||
|
{
|
||||||
|
Name: `service: "group-web-service-dc1" check`,
|
||||||
|
Status: consulapi.HealthPassing,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "group-web-check",
|
||||||
|
Status: consulapi.HealthPassing,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"server": {
|
||||||
|
Services: map[string]*serviceregistration.ServiceRegistration{
|
||||||
|
"task-server-service-global": {
|
||||||
|
Service: &consulapi.AgentService{
|
||||||
|
ID: uuid.Generate(),
|
||||||
|
Service: "task-server-service-global",
|
||||||
|
},
|
||||||
|
Checks: []*consulapi.AgentCheck{
|
||||||
|
{
|
||||||
|
Name: `service: "task-server-service-global" check`,
|
||||||
|
Status: consulapi.HealthPassing,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "task-server-check-global",
|
||||||
|
Status: consulapi.HealthPassing,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"proxy": {
|
||||||
|
Services: map[string]*serviceregistration.ServiceRegistration{
|
||||||
|
"task-proxy-service-global": {
|
||||||
|
Service: &consulapi.AgentService{
|
||||||
|
ID: uuid.Generate(),
|
||||||
|
Service: "task-proxy-service-global",
|
||||||
|
},
|
||||||
|
Checks: []*consulapi.AgentCheck{
|
||||||
|
{
|
||||||
|
Name: `service: "task-proxy-service-global" check`,
|
||||||
|
Status: consulapi.HealthPassing,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "task-proxy-check-global",
|
||||||
|
Status: consulapi.HealthPassing,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := testlog.HCLogger(t)
|
||||||
|
b := cstructs.NewAllocBroadcaster(logger)
|
||||||
|
defer b.Close()
|
||||||
|
|
||||||
|
// Inject Consul response.
|
||||||
|
consul := regmock.NewServiceRegistrationHandler(logger)
|
||||||
|
consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) {
|
||||||
|
return &serviceregistration.AllocRegistration{
|
||||||
|
Tasks: taskRegs,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancelFn := context.WithCancel(context.Background())
|
||||||
|
defer cancelFn()
|
||||||
|
|
||||||
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
|
checkInterval := 10 * time.Millisecond
|
||||||
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
|
tracker.checkLookupInterval = checkInterval
|
||||||
|
tracker.Start()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(4 * checkInterval):
|
||||||
|
require.Fail(t, "timed out while waiting for health")
|
||||||
|
case h := <-tracker.HealthyCh():
|
||||||
|
require.True(t, h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestTracker_ConsulChecks_Healthy(t *testing.T) {
|
func TestTracker_ConsulChecks_Healthy(t *testing.T) {
|
||||||
ci.Parallel(t)
|
ci.Parallel(t)
|
||||||
|
|
||||||
@@ -83,7 +250,9 @@ func TestTracker_ConsulChecks_Healthy(t *testing.T) {
|
|||||||
|
|
||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -134,7 +303,9 @@ func TestTracker_NomadChecks_Healthy(t *testing.T) {
|
|||||||
|
|
||||||
consul := regmock.NewServiceRegistrationHandler(logger)
|
consul := regmock.NewServiceRegistrationHandler(logger)
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -201,7 +372,9 @@ func TestTracker_NomadChecks_Unhealthy(t *testing.T) {
|
|||||||
|
|
||||||
consul := regmock.NewServiceRegistrationHandler(logger)
|
consul := regmock.NewServiceRegistrationHandler(logger)
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -260,7 +433,9 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
|
|||||||
|
|
||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -301,7 +476,9 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {
|
|||||||
|
|
||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -380,7 +557,9 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
|
|||||||
|
|
||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -459,7 +638,9 @@ func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
|
|||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
minHealthyTime := 2 * time.Second
|
minHealthyTime := 2 * time.Second
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
|
|
||||||
assertChecksHealth := func(exp bool) {
|
assertChecksHealth := func(exp bool) {
|
||||||
@@ -548,7 +729,9 @@ func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
|
|||||||
consul := regmock.NewServiceRegistrationHandler(logger)
|
consul := regmock.NewServiceRegistrationHandler(logger)
|
||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
|
|
||||||
assertChecksHealth := func(exp bool) {
|
assertChecksHealth := func(exp bool) {
|
||||||
@@ -599,7 +782,8 @@ func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
|
|||||||
ctx, cancelFn := context.WithCancel(context.Background())
|
ctx, cancelFn := context.WithCancel(context.Background())
|
||||||
defer cancelFn()
|
defer cancelFn()
|
||||||
|
|
||||||
tracker := NewTracker(ctx, logger, alloc, nil, nil, nil, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, nil, taskEnvBuilder, nil, nil, time.Millisecond, true)
|
||||||
|
|
||||||
assertNoHealth := func() {
|
assertNoHealth := func() {
|
||||||
require.NoError(t, tracker.ctx.Err())
|
require.NoError(t, tracker.ctx.Err())
|
||||||
@@ -708,7 +892,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
|
|||||||
|
|
||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -853,7 +1039,9 @@ func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {
|
|||||||
|
|
||||||
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
|
||||||
checkInterval := 10 * time.Millisecond
|
checkInterval := 10 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
|
||||||
tracker.checkLookupInterval = checkInterval
|
tracker.checkLookupInterval = checkInterval
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -971,7 +1159,9 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
|
|||||||
|
|
||||||
consul := regmock.NewServiceRegistrationHandler(logger)
|
consul := regmock.NewServiceRegistrationHandler(logger)
|
||||||
minHealthyTime := 1 * time.Millisecond
|
minHealthyTime := 1 * time.Millisecond
|
||||||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
|
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
|
||||||
|
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
|
||||||
tracker.checkLookupInterval = 10 * time.Millisecond
|
tracker.checkLookupInterval = 10 * time.Millisecond
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -1277,7 +1467,7 @@ func TestTracker_evaluateConsulChecks(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
result := evaluateConsulChecks(tc.tg, tc.registrations)
|
result := evaluateConsulChecks(tc.tg.ConsulServices(), tc.registrations)
|
||||||
must.Eq(t, tc.exp, result)
|
must.Eq(t, tc.exp, result)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -133,13 +133,16 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
|||||||
return fmt.Errorf("failed to initialize network configurator: %v", err)
|
return fmt.Errorf("failed to initialize network configurator: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new taskenv.Builder which is used and mutated by networkHook.
|
// Create a new taskenv.Builder which is used by hooks that mutate them to
|
||||||
envBuilder := taskenv.NewBuilder(
|
// build new taskenv.TaskEnv.
|
||||||
config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir)
|
newEnvBuilder := func() *taskenv.Builder {
|
||||||
|
return taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).
|
||||||
|
SetAllocDir(ar.allocDir.AllocDir)
|
||||||
|
}
|
||||||
|
|
||||||
// Create a taskenv.TaskEnv which is used for read only purposes by the
|
// Create a taskenv.TaskEnv which is used for read only purposes by the
|
||||||
// newNetworkHook.
|
// newNetworkHook.
|
||||||
builtTaskEnv := envBuilder.Build()
|
builtTaskEnv := newEnvBuilder().Build()
|
||||||
|
|
||||||
// Create the alloc directory hook. This is run first to ensure the
|
// Create the alloc directory hook. This is run first to ensure the
|
||||||
// directory path exists for other hooks.
|
// directory path exists for other hooks.
|
||||||
@@ -149,14 +152,14 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
|||||||
newCgroupHook(ar.Alloc(), ar.cpusetManager),
|
newCgroupHook(ar.Alloc(), ar.cpusetManager),
|
||||||
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
|
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
|
||||||
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
|
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
|
||||||
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient, ar.checkStore),
|
newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulClient, ar.checkStore),
|
||||||
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
|
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
|
||||||
newGroupServiceHook(groupServiceHookConfig{
|
newGroupServiceHook(groupServiceHookConfig{
|
||||||
alloc: alloc,
|
alloc: alloc,
|
||||||
providerNamespace: alloc.ServiceProviderNamespace(),
|
providerNamespace: alloc.ServiceProviderNamespace(),
|
||||||
serviceRegWrapper: ar.serviceRegWrapper,
|
serviceRegWrapper: ar.serviceRegWrapper,
|
||||||
restarter: ar,
|
restarter: ar,
|
||||||
taskEnvBuilder: envBuilder,
|
taskEnvBuilder: newEnvBuilder(),
|
||||||
networkStatus: ar,
|
networkStatus: ar,
|
||||||
logger: hookLogger,
|
logger: hookLogger,
|
||||||
shutdownDelayCtx: ar.shutdownDelayCtx,
|
shutdownDelayCtx: ar.shutdownDelayCtx,
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/hashicorp/nomad/client/serviceregistration"
|
"github.com/hashicorp/nomad/client/serviceregistration"
|
||||||
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
|
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
|
||||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||||
|
"github.com/hashicorp/nomad/client/taskenv"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -64,6 +65,13 @@ type allocHealthWatcherHook struct {
|
|||||||
// alloc set by new func or Update. Must hold hookLock to access.
|
// alloc set by new func or Update. Must hold hookLock to access.
|
||||||
alloc *structs.Allocation
|
alloc *structs.Allocation
|
||||||
|
|
||||||
|
// taskEnvBuilder is the current builder used to build task environments
|
||||||
|
// for the group and each of its tasks. Must hold hookLock to modify.
|
||||||
|
taskEnvBuilder *taskenv.Builder
|
||||||
|
|
||||||
|
// taskEnvBuilderFactory creates a new *taskenv.Builder instance.
|
||||||
|
taskEnvBuilderFactory func() *taskenv.Builder
|
||||||
|
|
||||||
// isDeploy is true if monitoring a deployment. Set in init(). Must
|
// isDeploy is true if monitoring a deployment. Set in init(). Must
|
||||||
// hold hookLock to access.
|
// hold hookLock to access.
|
||||||
isDeploy bool
|
isDeploy bool
|
||||||
@@ -71,8 +79,15 @@ type allocHealthWatcherHook struct {
|
|||||||
logger hclog.Logger
|
logger hclog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAllocHealthWatcherHook(logger hclog.Logger, alloc *structs.Allocation, hs healthSetter,
|
func newAllocHealthWatcherHook(
|
||||||
listener *cstructs.AllocListener, consul serviceregistration.Handler, checkStore checkstore.Shim) interfaces.RunnerHook {
|
logger hclog.Logger,
|
||||||
|
alloc *structs.Allocation,
|
||||||
|
taskEnvBuilderFactory func() *taskenv.Builder,
|
||||||
|
hs healthSetter,
|
||||||
|
listener *cstructs.AllocListener,
|
||||||
|
consul serviceregistration.Handler,
|
||||||
|
checkStore checkstore.Shim,
|
||||||
|
) interfaces.RunnerHook {
|
||||||
|
|
||||||
// Neither deployments nor migrations care about the health of
|
// Neither deployments nor migrations care about the health of
|
||||||
// non-service jobs so never watch their health
|
// non-service jobs so never watch their health
|
||||||
@@ -85,13 +100,15 @@ func newAllocHealthWatcherHook(logger hclog.Logger, alloc *structs.Allocation, h
|
|||||||
close(closedDone)
|
close(closedDone)
|
||||||
|
|
||||||
h := &allocHealthWatcherHook{
|
h := &allocHealthWatcherHook{
|
||||||
alloc: alloc,
|
alloc: alloc,
|
||||||
cancelFn: func() {}, // initialize to prevent nil func panics
|
taskEnvBuilderFactory: taskEnvBuilderFactory,
|
||||||
watchDone: closedDone,
|
taskEnvBuilder: taskEnvBuilderFactory(),
|
||||||
consul: consul,
|
cancelFn: func() {}, // initialize to prevent nil func panics
|
||||||
checkStore: checkStore,
|
watchDone: closedDone,
|
||||||
healthSetter: hs,
|
consul: consul,
|
||||||
listener: listener,
|
checkStore: checkStore,
|
||||||
|
healthSetter: hs,
|
||||||
|
listener: listener,
|
||||||
}
|
}
|
||||||
|
|
||||||
h.logger = logger.Named(h.Name())
|
h.logger = logger.Named(h.Name())
|
||||||
@@ -138,7 +155,7 @@ func (h *allocHealthWatcherHook) init() error {
|
|||||||
h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime)
|
h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime)
|
||||||
// Create a new tracker, start it, and watch for health results.
|
// Create a new tracker, start it, and watch for health results.
|
||||||
tracker := allochealth.NewTracker(
|
tracker := allochealth.NewTracker(
|
||||||
ctx, h.logger, h.alloc, h.listener, h.consul, h.checkStore, minHealthyTime, useChecks,
|
ctx, h.logger, h.alloc, h.listener, h.taskEnvBuilder, h.consul, h.checkStore, minHealthyTime, useChecks,
|
||||||
)
|
)
|
||||||
tracker.Start()
|
tracker.Start()
|
||||||
|
|
||||||
@@ -182,6 +199,9 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err
|
|||||||
// Update alloc
|
// Update alloc
|
||||||
h.alloc = req.Alloc
|
h.alloc = req.Alloc
|
||||||
|
|
||||||
|
// Create a new taskEnvBuilder with the updated alloc and a nil task
|
||||||
|
h.taskEnvBuilder = h.taskEnvBuilderFactory().UpdateTask(req.Alloc, nil)
|
||||||
|
|
||||||
return h.init()
|
return h.init()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/hashicorp/nomad/client/serviceregistration"
|
"github.com/hashicorp/nomad/client/serviceregistration"
|
||||||
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
||||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||||
|
"github.com/hashicorp/nomad/client/taskenv"
|
||||||
"github.com/hashicorp/nomad/helper/testlog"
|
"github.com/hashicorp/nomad/helper/testlog"
|
||||||
"github.com/hashicorp/nomad/helper/uuid"
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/hashicorp/nomad/nomad/mock"
|
"github.com/hashicorp/nomad/nomad/mock"
|
||||||
@@ -97,7 +98,8 @@ func TestHealthHook_PrerunPostrun(t *testing.T) {
|
|||||||
hs := &mockHealthSetter{}
|
hs := &mockHealthSetter{}
|
||||||
|
|
||||||
checks := new(mock.CheckShim)
|
checks := new(mock.CheckShim)
|
||||||
h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul, checks)
|
alloc := mock.Alloc()
|
||||||
|
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks)
|
||||||
|
|
||||||
// Assert we implemented the right interfaces
|
// Assert we implemented the right interfaces
|
||||||
prerunh, ok := h.(interfaces.RunnerPrerunHook)
|
prerunh, ok := h.(interfaces.RunnerPrerunHook)
|
||||||
@@ -136,7 +138,7 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) {
|
|||||||
hs := &mockHealthSetter{}
|
hs := &mockHealthSetter{}
|
||||||
|
|
||||||
checks := new(mock.CheckShim)
|
checks := new(mock.CheckShim)
|
||||||
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
||||||
|
|
||||||
// Prerun
|
// Prerun
|
||||||
require.NoError(h.Prerun())
|
require.NoError(h.Prerun())
|
||||||
@@ -176,7 +178,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
|
|||||||
hs := &mockHealthSetter{}
|
hs := &mockHealthSetter{}
|
||||||
|
|
||||||
checks := new(mock.CheckShim)
|
checks := new(mock.CheckShim)
|
||||||
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
||||||
|
|
||||||
// Set a DeploymentID to cause ClearHealth to be called
|
// Set a DeploymentID to cause ClearHealth to be called
|
||||||
alloc.DeploymentID = uuid.Generate()
|
alloc.DeploymentID = uuid.Generate()
|
||||||
@@ -217,8 +219,9 @@ func TestHealthHook_Postrun(t *testing.T) {
|
|||||||
consul := regMock.NewServiceRegistrationHandler(logger)
|
consul := regMock.NewServiceRegistrationHandler(logger)
|
||||||
hs := &mockHealthSetter{}
|
hs := &mockHealthSetter{}
|
||||||
|
|
||||||
|
alloc := mock.Alloc()
|
||||||
checks := new(mock.CheckShim)
|
checks := new(mock.CheckShim)
|
||||||
h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
||||||
|
|
||||||
// Postrun
|
// Postrun
|
||||||
require.NoError(h.Postrun())
|
require.NoError(h.Postrun())
|
||||||
@@ -285,7 +288,7 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
|
|||||||
hs := newMockHealthSetter()
|
hs := newMockHealthSetter()
|
||||||
|
|
||||||
checks := new(mock.CheckShim)
|
checks := new(mock.CheckShim)
|
||||||
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
||||||
|
|
||||||
// Prerun
|
// Prerun
|
||||||
require.NoError(h.Prerun())
|
require.NoError(h.Prerun())
|
||||||
@@ -374,7 +377,7 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
|
|||||||
hs := newMockHealthSetter()
|
hs := newMockHealthSetter()
|
||||||
|
|
||||||
checks := new(mock.CheckShim)
|
checks := new(mock.CheckShim)
|
||||||
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
|
||||||
|
|
||||||
// Prerun
|
// Prerun
|
||||||
require.NoError(h.Prerun())
|
require.NoError(h.Prerun())
|
||||||
@@ -395,7 +398,8 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
|
|||||||
func TestHealthHook_SystemNoop(t *testing.T) {
|
func TestHealthHook_SystemNoop(t *testing.T) {
|
||||||
ci.Parallel(t)
|
ci.Parallel(t)
|
||||||
|
|
||||||
h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.SystemAlloc(), nil, nil, nil, nil)
|
alloc := mock.SystemAlloc()
|
||||||
|
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)
|
||||||
|
|
||||||
// Assert that it's the noop impl
|
// Assert that it's the noop impl
|
||||||
_, ok := h.(noopAllocHealthWatcherHook)
|
_, ok := h.(noopAllocHealthWatcherHook)
|
||||||
@@ -416,9 +420,16 @@ func TestHealthHook_SystemNoop(t *testing.T) {
|
|||||||
func TestHealthHook_BatchNoop(t *testing.T) {
|
func TestHealthHook_BatchNoop(t *testing.T) {
|
||||||
ci.Parallel(t)
|
ci.Parallel(t)
|
||||||
|
|
||||||
h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.BatchAlloc(), nil, nil, nil, nil)
|
alloc := mock.BatchAlloc()
|
||||||
|
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)
|
||||||
|
|
||||||
// Assert that it's the noop impl
|
// Assert that it's the noop impl
|
||||||
_, ok := h.(noopAllocHealthWatcherHook)
|
_, ok := h.(noopAllocHealthWatcherHook)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func taskEnvBuilderFactory(alloc *structs.Allocation) func() *taskenv.Builder {
|
||||||
|
return func() *taskenv.Builder {
|
||||||
|
return taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -15,41 +15,51 @@ func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*struc
|
|||||||
|
|
||||||
interpolated := make([]*structs.Service, len(services))
|
interpolated := make([]*structs.Service, len(services))
|
||||||
|
|
||||||
for i, origService := range services {
|
for i, service := range services {
|
||||||
// Create a copy as we need to re-interpolate every time the
|
interpolated[i] = InterpolateService(taskEnv, service)
|
||||||
// environment changes.
|
|
||||||
service := origService.Copy()
|
|
||||||
|
|
||||||
for _, check := range service.Checks {
|
|
||||||
check.Name = taskEnv.ReplaceEnv(check.Name)
|
|
||||||
check.Type = taskEnv.ReplaceEnv(check.Type)
|
|
||||||
check.Command = taskEnv.ReplaceEnv(check.Command)
|
|
||||||
check.Args = taskEnv.ParseAndReplace(check.Args)
|
|
||||||
check.Path = taskEnv.ReplaceEnv(check.Path)
|
|
||||||
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
|
|
||||||
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
|
|
||||||
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
|
|
||||||
check.Method = taskEnv.ReplaceEnv(check.Method)
|
|
||||||
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
|
|
||||||
check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
|
|
||||||
}
|
|
||||||
|
|
||||||
service.Name = taskEnv.ReplaceEnv(service.Name)
|
|
||||||
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
|
|
||||||
service.Address = taskEnv.ReplaceEnv(service.Address)
|
|
||||||
service.Tags = taskEnv.ParseAndReplace(service.Tags)
|
|
||||||
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
|
|
||||||
service.Meta = interpolateMapStringString(taskEnv, service.Meta)
|
|
||||||
service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
|
|
||||||
service.TaggedAddresses = interpolateMapStringString(taskEnv, service.TaggedAddresses)
|
|
||||||
interpolateConnect(taskEnv, service.Connect)
|
|
||||||
|
|
||||||
interpolated[i] = service
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return interpolated
|
return interpolated
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func InterpolateService(taskEnv *TaskEnv, origService *structs.Service) *structs.Service {
|
||||||
|
// Guard against not having a valid taskEnv. This can be the case if the
|
||||||
|
// PreKilling or Exited hook is run before Poststart.
|
||||||
|
if taskEnv == nil || origService == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a copy as we need to re-interpolate every time the
|
||||||
|
// environment changes.
|
||||||
|
service := origService.Copy()
|
||||||
|
|
||||||
|
for _, check := range service.Checks {
|
||||||
|
check.Name = taskEnv.ReplaceEnv(check.Name)
|
||||||
|
check.Type = taskEnv.ReplaceEnv(check.Type)
|
||||||
|
check.Command = taskEnv.ReplaceEnv(check.Command)
|
||||||
|
check.Args = taskEnv.ParseAndReplace(check.Args)
|
||||||
|
check.Path = taskEnv.ReplaceEnv(check.Path)
|
||||||
|
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
|
||||||
|
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
|
||||||
|
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
|
||||||
|
check.Method = taskEnv.ReplaceEnv(check.Method)
|
||||||
|
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
|
||||||
|
check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
|
||||||
|
}
|
||||||
|
|
||||||
|
service.Name = taskEnv.ReplaceEnv(service.Name)
|
||||||
|
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
|
||||||
|
service.Address = taskEnv.ReplaceEnv(service.Address)
|
||||||
|
service.Tags = taskEnv.ParseAndReplace(service.Tags)
|
||||||
|
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
|
||||||
|
service.Meta = interpolateMapStringString(taskEnv, service.Meta)
|
||||||
|
service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
|
||||||
|
service.TaggedAddresses = interpolateMapStringString(taskEnv, service.TaggedAddresses)
|
||||||
|
interpolateConnect(taskEnv, service.Connect)
|
||||||
|
|
||||||
|
return service
|
||||||
|
}
|
||||||
|
|
||||||
func interpolateMapStringSliceString(taskEnv *TaskEnv, orig map[string][]string) map[string][]string {
|
func interpolateMapStringSliceString(taskEnv *TaskEnv, orig map[string][]string) map[string][]string {
|
||||||
if len(orig) == 0 {
|
if len(orig) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -79,6 +79,7 @@ func Alloc() *structs.Allocation {
|
|||||||
ClientStatus: structs.AllocClientStatusPending,
|
ClientStatus: structs.AllocClientStatusPending,
|
||||||
}
|
}
|
||||||
alloc.JobID = alloc.Job.ID
|
alloc.JobID = alloc.Job.ID
|
||||||
|
alloc.Canonicalize()
|
||||||
return alloc
|
return alloc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user