client: improve group service stanza interpolation and check_re… (#6586)

* client: improve group service stanza interpolation and check_restart support

Interpolation can now be done on group service stanzas. Note that some task runtime specific information
that was previously available when the service was registered poststart of a task is no longer available.

The check_restart stanza for checks defined on group services will now properly restart the allocation upon
check failures if configured.
This commit is contained in:
Nick Ethier
2019-11-18 13:04:01 -05:00
committed by GitHub
parent 39f1d61938
commit 387b016ac4
29 changed files with 922 additions and 734 deletions

View File

@@ -419,7 +419,7 @@ OUTER:
type taskHealthState struct {
task *structs.Task
state *structs.TaskState
taskRegistrations *consul.TaskRegistration
taskRegistrations *consul.ServiceRegistrations
}
// event takes the deadline time for the allocation to be healthy and the update

View File

@@ -22,6 +22,7 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
@@ -1001,6 +1002,39 @@ func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent
return tr.Restart(context.TODO(), taskEvent, false)
}
// Restart satisfies the WorkloadRestarter interface restarts all task runners
// concurrently
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
waitCh := make(chan struct{})
var err *multierror.Error
var errMutex sync.Mutex
go func() {
var wg sync.WaitGroup
defer close(waitCh)
for tn, tr := range ar.tasks {
wg.Add(1)
go func(taskName string, r agentconsul.WorkloadRestarter) {
defer wg.Done()
e := r.Restart(ctx, event, failure)
if e != nil {
errMutex.Lock()
defer errMutex.Unlock()
err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e))
}
}(tn, tr)
}
wg.Wait()
}()
select {
case <-waitCh:
case <-ctx.Done():
}
return err.ErrorOrNil()
}
// RestartAll signalls all task runners in the allocation to restart and passes
// a copy of the task event to each restart event.
// Returns any errors in a concatenated form.

View File

@@ -7,6 +7,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
@@ -125,7 +126,13 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
newNetworkHook(hookLogger, ns, alloc, nm, nc),
newGroupServiceHook(hookLogger, alloc, ar.consulClient),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: ar.consulClient,
restarter: ar,
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
logger: hookLogger,
}),
newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
}

View File

@@ -528,7 +528,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
return &consul.AllocRegistration{
Tasks: map[string]*consul.TaskRegistration{
Tasks: map[string]*consul.ServiceRegistrations{
task.Name: {
Services: map[string]*consul.ServiceRegistration{
"123": {
@@ -847,7 +847,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
return &consul.AllocRegistration{
Tasks: map[string]*consul.TaskRegistration{
Tasks: map[string]*consul.ServiceRegistrations{
task.Name: {
Services: map[string]*consul.ServiceRegistration{
"123": {

View File

@@ -33,6 +33,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
// 5. Assert task and logmon are cleaned up
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Services = []*structs.Service{
{
Name: "foo",
PortLabel: "8888",
},
}
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
@@ -117,13 +123,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
// 2 removals (canary+noncanary) during prekill
// 2 removals (canary+noncanary) during exited
// 2 removals (canary+noncanary) during stop
// 1 remove group during stop
// 2 removals (canary+noncanary) group during stop
consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps()
require.Len(t, consulOps, 7)
for _, op := range consulOps[:6] {
require.Len(t, consulOps, 8)
for _, op := range consulOps {
require.Equal(t, "remove", op.Op)
}
require.Equal(t, "remove_group", consulOps[6].Op)
// Assert terminated task event was emitted
events := ar2.AllocState().TaskStates[task.Name].Events

View File

@@ -3,30 +3,63 @@ package allocrunner
import (
"sync"
hclog "github.com/hashicorp/go-hclog"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// groupServiceHook manages task group Consul service registration and
// deregistration.
type groupServiceHook struct {
alloc *structs.Allocation
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
mu sync.Mutex
logger log.Logger
// The following fields may be updated
canary bool
services []*structs.Service
networks structs.Networks
taskEnvBuilder *taskenv.Builder
// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
mu sync.Mutex
}
func newGroupServiceHook(logger hclog.Logger, alloc *structs.Allocation, consulClient consul.ConsulServiceAPI) *groupServiceHook {
type groupServiceHookConfig struct {
alloc *structs.Allocation
consul consul.ConsulServiceAPI
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
logger log.Logger
}
func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
h := &groupServiceHook{
alloc: alloc,
consulClient: consulClient,
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services
if cfg.alloc.AllocatedResources != nil {
h.networks = cfg.alloc.AllocatedResources.Shared.Networks
}
if cfg.alloc.DeploymentStatus != nil {
h.canary = cfg.alloc.DeploymentStatus.Canary
}
h.logger = logger.Named(h.Name())
return h
}
@@ -41,14 +74,39 @@ func (h *groupServiceHook) Prerun() error {
h.prerun = true
h.mu.Unlock()
}()
return h.consulClient.RegisterGroup(h.alloc)
if len(h.services) == 0 {
return nil
}
services := h.getWorkloadServices()
return h.consulClient.RegisterWorkload(services)
}
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
oldAlloc := h.alloc
h.alloc = req.Alloc
oldWorkloadServices := h.getWorkloadServices()
// Store new updated values out of request
canary := false
if req.Alloc.DeploymentStatus != nil {
canary = req.Alloc.DeploymentStatus.Canary
}
var networks structs.Networks
if req.Alloc.AllocatedResources != nil {
networks = req.Alloc.AllocatedResources.Shared.Networks
}
// Update group service hook fields
h.networks = networks
h.services = req.Alloc.Job.LookupTaskGroup(h.group).Services
h.canary = canary
h.taskEnvBuilder.UpdateTask(req.Alloc, nil)
// Create new task services struct with those new values
newWorkloadServices := h.getWorkloadServices()
if !h.prerun {
// Update called before Prerun. Update alloc and exit to allow
@@ -56,11 +114,57 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return nil
}
return h.consulClient.UpdateGroup(oldAlloc, h.alloc)
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}
func (h *groupServiceHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.consulClient.RemoveGroup(h.alloc)
h.deregister()
return nil
}
func (h *groupServiceHook) driverNet() *drivers.DriverNetwork {
if len(h.networks) == 0 {
return nil
}
//TODO(schmichael) only support one network for now
net := h.networks[0]
//TODO(schmichael) there's probably a better way than hacking driver network
return &drivers.DriverNetwork{
AutoAdvertise: true,
IP: net.IP,
// Copy PortLabels from group network
PortMap: net.PortLabels(),
}
}
// deregister services from Consul.
func (h *groupServiceHook) deregister() {
if len(h.services) > 0 {
workloadServices := h.getWorkloadServices()
h.consulClient.RemoveWorkload(workloadServices)
// Canary flag may be getting flipped when the alloc is being
// destroyed, so remove both variations of the service
workloadServices.Canary = !workloadServices.Canary
h.consulClient.RemoveWorkload(workloadServices)
}
}
func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Group: h.group,
Restarter: h.restarter,
Services: interpolatedServices,
DriverNetwork: h.driverNet(),
Networks: h.networks,
Canary: h.canary,
}
}

View File

@@ -1,10 +1,15 @@
package allocrunner
import (
"io/ioutil"
"testing"
"time"
consulapi "github.com/hashicorp/consul/api"
ctestutil "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
@@ -22,10 +27,20 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Services = []*structs.Service{{
Name: "foo",
PortLabel: "9999",
}}
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
h := newGroupServiceHook(logger, alloc, consulClient)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: consulClient,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
@@ -34,10 +49,10 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
require.NoError(t, h.Postrun())
ops := consulClient.GetOps()
require.Len(t, ops, 3)
require.Equal(t, "add_group", ops[0].Op)
require.Equal(t, "update_group", ops[1].Op)
require.Equal(t, "remove_group", ops[2].Op)
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
}
// TestGroupServiceHook_GroupServices asserts group service hooks with group
@@ -49,7 +64,13 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
h := newGroupServiceHook(logger, alloc, consulClient)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: consulClient,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
@@ -58,18 +79,19 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
require.NoError(t, h.Postrun())
ops := consulClient.GetOps()
require.Len(t, ops, 3)
require.Equal(t, "add_group", ops[0].Op)
require.Equal(t, "update_group", ops[1].Op)
require.Equal(t, "remove_group", ops[2].Op)
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
}
// TestGroupServiceHook_Error asserts group service hooks with group
// services but no group network returns an error.
func TestGroupServiceHook_Error(t *testing.T) {
func TestGroupServiceHook_NoNetwork(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Networks = []*structs.NetworkResource{}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
tg.Services = []*structs.Service{
{
@@ -82,15 +104,147 @@ func TestGroupServiceHook_Error(t *testing.T) {
}
logger := testlog.HCLogger(t)
// No need to set Consul client or call Run. This hould fail before
// attempting to register.
consulClient := agentconsul.NewServiceClient(nil, logger, false)
consulClient := consul.NewMockConsulServiceClient(t, logger)
h := newGroupServiceHook(logger, alloc, consulClient)
require.Error(t, h.Prerun())
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: consulClient,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.Error(t, h.Update(req))
require.NoError(t, h.Update(req))
require.NoError(t, h.Postrun())
ops := consulClient.GetOps()
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
}
func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Networks = []*structs.NetworkResource{}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
tg.Services = []*structs.Service{
{
Name: "testconnect",
PortLabel: "9999",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
},
},
}
logger := testlog.HCLogger(t)
consulClient := consul.NewMockConsulServiceClient(t, logger)
h := newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: consulClient,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
services := h.getWorkloadServices()
require.Len(t, services.Services, 1)
}
// TestGroupServiceHook_Update08Alloc asserts that adding group services to a previously
// 0.8 alloc works.
//
// COMPAT(0.11) Only valid for upgrades from 0.8.
func TestGroupServiceHook_Update08Alloc(t *testing.T) {
// Create an embedded Consul server
testconsul, err := ctestutil.NewTestServerConfig(func(c *ctestutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
if err != nil {
t.Fatalf("error starting test consul server: %v", err)
}
defer testconsul.Stop()
consulConfig := consulapi.DefaultConfig()
consulConfig.Address = testconsul.HTTPAddr
consulClient, err := consulapi.NewClient(consulConfig)
require.NoError(t, err)
serviceClient := agentconsul.NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
// Lower periodicInterval to ensure periodic syncing doesn't improperly
// remove Connect services.
//const interval = 50 * time.Millisecond
//serviceClient.periodicInterval = interval
// Disable deregistration probation to test syncing
//serviceClient.deregisterProbationExpiry = time.Time{}
go serviceClient.Run()
defer serviceClient.Shutdown()
// Create new 0.10-style alloc
alloc := mock.Alloc()
alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{
{
Mode: "bridge",
IP: "10.0.0.1",
DynamicPorts: []structs.Port{
{
Label: "connect-proxy-testconnect",
Value: 9999,
To: 9998,
},
},
},
}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
tg.Services = []*structs.Service{
{
Name: "testconnect",
PortLabel: "9999",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{
Proxy: &structs.ConsulProxy{
LocalServicePort: 9000,
},
},
},
},
}
// Create old 0.8-style alloc from new alloc
oldAlloc := alloc.Copy()
oldAlloc.AllocatedResources = nil
oldAlloc.Job.LookupTaskGroup(alloc.TaskGroup).Services = nil
// Create the group service hook
h := newGroupServiceHook(groupServiceHookConfig{
alloc: oldAlloc,
consul: serviceClient,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), oldAlloc, nil, oldAlloc.Job.Region),
logger: testlog.HCLogger(t),
})
require.NoError(t, h.Prerun())
require.NoError(t, h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc}))
// Assert the group and sidecar services are registered
require.Eventually(t, func() bool {
services, err := consulClient.Agent().Services()
require.NoError(t, err)
return len(services) == 2
}, 3*time.Second, 100*time.Millisecond)
require.Error(t, h.Postrun())
}

View File

@@ -243,7 +243,7 @@ func TestHealthHook_SetHealth(t *testing.T) {
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.TaskRegistration{
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {

View File

@@ -80,7 +80,7 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskP
// it to the secrets directory like Vault tokens.
fn := filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")
id := agentconsul.MakeTaskServiceID(h.alloc.ID, "group-"+tg.Name, service)
id := agentconsul.MakeAllocServiceID(h.alloc.ID, "group-"+tg.Name, service)
h.logger.Debug("bootstrapping envoy", "sidecar_for", service.Name, "boostrap_file", fn, "sidecar_for_id", id, "grpc_addr", grpcAddr)
// Since Consul services are registered asynchronously with this task

View File

@@ -86,7 +86,7 @@ func TestTaskRunner_EnvoyBootstrapHook_Ok(t *testing.T) {
consulClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), logger, true)
go consulClient.Run()
defer consulClient.Shutdown()
require.NoError(t, consulClient.RegisterGroup(alloc))
require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter())))
// Run Connect bootstrap Hook
h := newEnvoyBootstrapHook(alloc, testconsul.HTTPAddr, logger)

View File

@@ -179,7 +179,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
if check.Type != structs.ServiceCheckScript {
continue
}
serviceID := agentconsul.MakeTaskServiceID(
serviceID := agentconsul.MakeAllocServiceID(
h.alloc.ID, h.task.Name, service)
sc := newScriptCheck(&scriptCheckConfig{
allocID: h.alloc.ID,
@@ -213,7 +213,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
continue
}
groupTaskName := "group-" + tg.Name
serviceID := agentconsul.MakeTaskServiceID(
serviceID := agentconsul.MakeAllocServiceID(
h.alloc.ID, groupTaskName, service)
sc := newScriptCheck(&scriptCheckConfig{
allocID: h.alloc.ID,

View File

@@ -27,7 +27,7 @@ type serviceHookConfig struct {
consul consul.ConsulServiceAPI
// Restarter is a subset of the TaskLifecycle interface
restarter agentconsul.TaskRestarter
restarter agentconsul.WorkloadRestarter
logger log.Logger
}
@@ -36,7 +36,7 @@ type serviceHook struct {
consul consul.ConsulServiceAPI
allocID string
taskName string
restarter agentconsul.TaskRestarter
restarter agentconsul.WorkloadRestarter
logger log.Logger
// The following fields may be updated
@@ -97,9 +97,9 @@ func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststa
h.taskEnv = req.TaskEnv
// Create task services struct with request's driver metadata
taskServices := h.getTaskServices()
workloadServices := h.getWorkloadServices()
return h.consul.RegisterTask(taskServices)
return h.consul.RegisterWorkload(workloadServices)
}
func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
@@ -108,7 +108,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
// Create old task services struct with request's driver metadata as it
// can't change due to Updates
oldTaskServices := h.getTaskServices()
oldWorkloadServices := h.getWorkloadServices()
// Store new updated values out of request
canary := false
@@ -142,9 +142,9 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
h.canary = canary
// Create new task services struct with those new values
newTaskServices := h.getTaskServices()
newWorkloadServices := h.getWorkloadServices()
return h.consul.UpdateTask(oldTaskServices, newTaskServices)
return h.consul.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}
func (h *serviceHook) PreKilling(ctx context.Context, req *interfaces.TaskPreKillRequest, resp *interfaces.TaskPreKillResponse) error {
@@ -176,13 +176,13 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in
// deregister services from Consul.
func (h *serviceHook) deregister() {
taskServices := h.getTaskServices()
h.consul.RemoveTask(taskServices)
workloadServices := h.getWorkloadServices()
h.consul.RemoveWorkload(workloadServices)
// Canary flag may be getting flipped when the alloc is being
// destroyed, so remove both variations of the service
taskServices.Canary = !taskServices.Canary
h.consul.RemoveTask(taskServices)
workloadServices.Canary = !workloadServices.Canary
h.consul.RemoveWorkload(workloadServices)
}
@@ -193,14 +193,14 @@ func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest,
return nil
}
func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := interpolateServices(h.taskEnv, h.services)
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)
// Create task services struct with request's driver metadata
return &agentconsul.TaskServices{
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Name: h.taskName,
Task: h.taskName,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,
@@ -209,62 +209,3 @@ func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
Canary: h.canary,
}
}
// interpolateServices returns an interpolated copy of services and checks with
// values from the task's environment.
func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service {
// Guard against not having a valid taskEnv. This can be the case if the
// PreKilling or Exited hook is run before Poststart.
if taskEnv == nil || len(services) == 0 {
return nil
}
interpolated := make([]*structs.Service, len(services))
for i, origService := range services {
// Create a copy as we need to reinterpolate every time the
// environment changes
service := origService.Copy()
for _, check := range service.Checks {
check.Name = taskEnv.ReplaceEnv(check.Name)
check.Type = taskEnv.ReplaceEnv(check.Type)
check.Command = taskEnv.ReplaceEnv(check.Command)
check.Args = taskEnv.ParseAndReplace(check.Args)
check.Path = taskEnv.ReplaceEnv(check.Path)
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
check.Method = taskEnv.ReplaceEnv(check.Method)
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
if len(check.Header) > 0 {
header := make(map[string][]string, len(check.Header))
for k, vs := range check.Header {
newVals := make([]string, len(vs))
for i, v := range vs {
newVals[i] = taskEnv.ReplaceEnv(v)
}
header[taskEnv.ReplaceEnv(k)] = newVals
}
check.Header = header
}
}
service.Name = taskEnv.ReplaceEnv(service.Name)
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
service.Tags = taskEnv.ParseAndReplace(service.Tags)
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
if len(service.Meta) > 0 {
meta := make(map[string]string, len(service.Meta))
for k, v := range service.Meta {
meta[k] = taskEnv.ReplaceEnv(v)
}
service.Meta = meta
}
interpolated[i] = service
}
return interpolated
}

View File

@@ -1,12 +1,7 @@
package taskrunner
import (
"testing"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// Statically assert the stats hook implements the expected interfaces
@@ -14,80 +9,3 @@ var _ interfaces.TaskPoststartHook = (*serviceHook)(nil)
var _ interfaces.TaskExitedHook = (*serviceHook)(nil)
var _ interfaces.TaskPreKillHook = (*serviceHook)(nil)
var _ interfaces.TaskUpdateHook = (*serviceHook)(nil)
// TestTaskRunner_ServiceHook_InterpolateServices asserts that all service
// and check fields are properly interpolated.
func TestTaskRunner_ServiceHook_InterpolateServices(t *testing.T) {
t.Parallel()
services := []*structs.Service{
{
Name: "${name}",
PortLabel: "${portlabel}",
Tags: []string{"${tags}"},
Checks: []*structs.ServiceCheck{
{
Name: "${checkname}",
Type: "${checktype}",
Command: "${checkcmd}",
Args: []string{"${checkarg}"},
Path: "${checkstr}",
Protocol: "${checkproto}",
PortLabel: "${checklabel}",
InitialStatus: "${checkstatus}",
Method: "${checkmethod}",
Header: map[string][]string{
"${checkheaderk}": {"${checkheaderv}"},
},
},
},
},
}
env := &taskenv.TaskEnv{
EnvMap: map[string]string{
"name": "name",
"portlabel": "portlabel",
"tags": "tags",
"checkname": "checkname",
"checktype": "checktype",
"checkcmd": "checkcmd",
"checkarg": "checkarg",
"checkstr": "checkstr",
"checkpath": "checkpath",
"checkproto": "checkproto",
"checklabel": "checklabel",
"checkstatus": "checkstatus",
"checkmethod": "checkmethod",
"checkheaderk": "checkheaderk",
"checkheaderv": "checkheaderv",
},
}
interpolated := interpolateServices(env, services)
exp := []*structs.Service{
{
Name: "name",
PortLabel: "portlabel",
Tags: []string{"tags"},
Checks: []*structs.ServiceCheck{
{
Name: "checkname",
Type: "checktype",
Command: "checkcmd",
Args: []string{"checkarg"},
Path: "checkstr",
Protocol: "checkproto",
PortLabel: "checklabel",
InitialStatus: "checkstatus",
Method: "checkmethod",
Header: map[string][]string{
"checkheaderk": {"checkheaderv"},
},
},
},
},
}
require.Equal(t, exp, interpolated)
}

View File

@@ -2,18 +2,14 @@ package consul
import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterGroup(*structs.Allocation) error
RemoveGroup(*structs.Allocation) error
UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error
RegisterTask(*consul.TaskServices) error
RemoveTask(*consul.TaskServices)
UpdateTask(old, newTask *consul.TaskServices) error
RegisterWorkload(*consul.WorkloadServices) error
RemoveWorkload(*consul.WorkloadServices)
UpdateWorkload(old, newTask *consul.WorkloadServices) error
AllocRegistrations(allocID string) (*consul.AllocRegistration, error)
UpdateTTL(id, output, status string) error
}

View File

@@ -7,7 +7,6 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
testing "github.com/mitchellh/go-testing-interface"
)
@@ -54,60 +53,33 @@ func NewMockConsulServiceClient(t testing.T, logger log.Logger) *MockConsulServi
return &m
}
func (m *MockConsulServiceClient) RegisterGroup(alloc *structs.Allocation) error {
func (m *MockConsulServiceClient) UpdateWorkload(old, newSvcs *consul.WorkloadServices) error {
m.mu.Lock()
defer m.mu.Unlock()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
m.logger.Trace("RegisterGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services))
m.ops = append(m.ops, NewMockConsulOp("add_group", alloc.ID, alloc.TaskGroup))
return nil
}
func (m *MockConsulServiceClient) UpdateGroup(_, alloc *structs.Allocation) error {
m.mu.Lock()
defer m.mu.Unlock()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
m.logger.Trace("UpdateGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services))
m.ops = append(m.ops, NewMockConsulOp("update_group", alloc.ID, alloc.TaskGroup))
return nil
}
func (m *MockConsulServiceClient) RemoveGroup(alloc *structs.Allocation) error {
m.mu.Lock()
defer m.mu.Unlock()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
m.logger.Trace("RemoveGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services))
m.ops = append(m.ops, NewMockConsulOp("remove_group", alloc.ID, alloc.TaskGroup))
return nil
}
func (m *MockConsulServiceClient) UpdateTask(old, newSvcs *consul.TaskServices) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Trace("UpdateTask", "alloc_id", newSvcs.AllocID, "task", newSvcs.Name,
m.logger.Trace("UpdateWorkload", "alloc_id", newSvcs.AllocID, "name", newSvcs.Name(),
"old_services", len(old.Services), "new_services", len(newSvcs.Services),
)
m.ops = append(m.ops, NewMockConsulOp("update", newSvcs.AllocID, newSvcs.Name))
m.ops = append(m.ops, NewMockConsulOp("update", newSvcs.AllocID, newSvcs.Name()))
return nil
}
func (m *MockConsulServiceClient) RegisterTask(task *consul.TaskServices) error {
func (m *MockConsulServiceClient) RegisterWorkload(svcs *consul.WorkloadServices) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Trace("RegisterTask", "alloc_id", task.AllocID, "task", task.Name,
"services", len(task.Services),
m.logger.Trace("RegisterWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(),
"services", len(svcs.Services),
)
m.ops = append(m.ops, NewMockConsulOp("add", task.AllocID, task.Name))
m.ops = append(m.ops, NewMockConsulOp("add", svcs.AllocID, svcs.Name()))
return nil
}
func (m *MockConsulServiceClient) RemoveTask(task *consul.TaskServices) {
func (m *MockConsulServiceClient) RemoveWorkload(svcs *consul.WorkloadServices) {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Trace("RemoveTask", "alloc_id", task.AllocID, "task", task.Name,
"services", len(task.Services),
m.logger.Trace("RemoveWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(),
"services", len(svcs.Services),
)
m.ops = append(m.ops, NewMockConsulOp("remove", task.AllocID, task.Name))
m.ops = append(m.ops, NewMockConsulOp("remove", svcs.AllocID, svcs.Name()))
}
func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) {

View File

@@ -547,6 +547,9 @@ func (b *Builder) SetDeviceHookEnv(hookName string, envs map[string]string) *Bui
// setTask is called from NewBuilder to populate task related environment
// variables.
func (b *Builder) setTask(task *structs.Task) *Builder {
if task == nil {
return b
}
b.taskName = task.Name
b.envvars = make(map[string]string, len(task.Env))
for k, v := range task.Env {

View File

@@ -832,3 +832,18 @@ func TestEnvironment_SetPortMapEnvs(t *testing.T) {
}
require.Equal(t, expected, envs)
}
func TestEnvironment_TasklessBuilder(t *testing.T) {
node := mock.Node()
alloc := mock.Alloc()
alloc.Job.Meta["jobt"] = "foo"
alloc.Job.TaskGroups[0].Meta["groupt"] = "bar"
require := require.New(t)
var taskEnv *TaskEnv
require.NotPanics(func() {
taskEnv = NewBuilder(node, alloc, nil, "global").SetAllocDir("/tmp/alloc").Build()
})
require.Equal("foo", taskEnv.ReplaceEnv("${NOMAD_META_jobt}"))
require.Equal("bar", taskEnv.ReplaceEnv("${NOMAD_META_groupt}"))
}

View File

@@ -0,0 +1,64 @@
package taskenv
import (
"github.com/hashicorp/nomad/nomad/structs"
)
// InterpolateServices returns an interpolated copy of services and checks with
// values from the task's environment.
func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*structs.Service {
// Guard against not having a valid taskEnv. This can be the case if the
// PreKilling or Exited hook is run before Poststart.
if taskEnv == nil || len(services) == 0 {
return nil
}
interpolated := make([]*structs.Service, len(services))
for i, origService := range services {
// Create a copy as we need to reinterpolate every time the
// environment changes
service := origService.Copy()
for _, check := range service.Checks {
check.Name = taskEnv.ReplaceEnv(check.Name)
check.Type = taskEnv.ReplaceEnv(check.Type)
check.Command = taskEnv.ReplaceEnv(check.Command)
check.Args = taskEnv.ParseAndReplace(check.Args)
check.Path = taskEnv.ReplaceEnv(check.Path)
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
check.Method = taskEnv.ReplaceEnv(check.Method)
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
if len(check.Header) > 0 {
header := make(map[string][]string, len(check.Header))
for k, vs := range check.Header {
newVals := make([]string, len(vs))
for i, v := range vs {
newVals[i] = taskEnv.ReplaceEnv(v)
}
header[taskEnv.ReplaceEnv(k)] = newVals
}
check.Header = header
}
}
service.Name = taskEnv.ReplaceEnv(service.Name)
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
service.Tags = taskEnv.ParseAndReplace(service.Tags)
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
if len(service.Meta) > 0 {
meta := make(map[string]string, len(service.Meta))
for k, v := range service.Meta {
meta[k] = taskEnv.ReplaceEnv(v)
}
service.Meta = meta
}
interpolated[i] = service
}
return interpolated
}

View File

@@ -0,0 +1,85 @@
package taskenv
import (
"testing"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// TestInterpolateServices asserts that all service
// and check fields are properly interpolated.
func TestInterpolateServices(t *testing.T) {
t.Parallel()
services := []*structs.Service{
{
Name: "${name}",
PortLabel: "${portlabel}",
Tags: []string{"${tags}"},
Checks: []*structs.ServiceCheck{
{
Name: "${checkname}",
Type: "${checktype}",
Command: "${checkcmd}",
Args: []string{"${checkarg}"},
Path: "${checkstr}",
Protocol: "${checkproto}",
PortLabel: "${checklabel}",
InitialStatus: "${checkstatus}",
Method: "${checkmethod}",
Header: map[string][]string{
"${checkheaderk}": {"${checkheaderv}"},
},
},
},
},
}
env := &TaskEnv{
EnvMap: map[string]string{
"name": "name",
"portlabel": "portlabel",
"tags": "tags",
"checkname": "checkname",
"checktype": "checktype",
"checkcmd": "checkcmd",
"checkarg": "checkarg",
"checkstr": "checkstr",
"checkpath": "checkpath",
"checkproto": "checkproto",
"checklabel": "checklabel",
"checkstatus": "checkstatus",
"checkmethod": "checkmethod",
"checkheaderk": "checkheaderk",
"checkheaderv": "checkheaderv",
},
}
interpolated := InterpolateServices(env, services)
exp := []*structs.Service{
{
Name: "name",
PortLabel: "portlabel",
Tags: []string{"tags"},
Checks: []*structs.ServiceCheck{
{
Name: "checkname",
Type: "checktype",
Command: "checkcmd",
Args: []string{"checkarg"},
Path: "checkstr",
Protocol: "checkproto",
PortLabel: "checklabel",
InitialStatus: "checkstatus",
Method: "checkmethod",
Header: map[string][]string{
"checkheaderk": {"checkheaderv"},
},
},
},
},
}
require.Equal(t, exp, interpolated)
}

File diff suppressed because one or more lines are too long

View File

@@ -22,8 +22,8 @@ type ChecksAPI interface {
Checks() (map[string]*api.AgentCheck, error)
}
// TaskRestarter allows the checkWatcher to restart tasks.
type TaskRestarter interface {
// WorkloadRestarter allows the checkWatcher to restart tasks or entire task groups.
type WorkloadRestarter interface {
Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
}
@@ -35,7 +35,7 @@ type checkRestart struct {
checkName string
taskKey string // composite of allocID + taskName for uniqueness
task TaskRestarter
task WorkloadRestarter
grace time.Duration
interval time.Duration
timeLimit time.Duration
@@ -114,7 +114,7 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string)
// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger log.Logger, task TaskRestarter, event *structs.TaskEvent) {
func asyncRestart(ctx context.Context, logger log.Logger, task WorkloadRestarter, event *structs.TaskEvent) {
// Check watcher restarts are always failures
const failure = true
@@ -292,7 +292,7 @@ func (w *checkWatcher) Run(ctx context.Context) {
}
// Watch a check and restart its task if unhealthy.
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter TaskRestarter) {
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter WorkloadRestarter) {
if !check.TriggersRestarts() {
// Not watched, noop
return

View File

@@ -115,12 +115,12 @@ type operations struct {
// allocations by task.
type AllocRegistration struct {
// Tasks maps the name of a task to its registered services and checks
Tasks map[string]*TaskRegistration
Tasks map[string]*ServiceRegistrations
}
func (a *AllocRegistration) copy() *AllocRegistration {
c := &AllocRegistration{
Tasks: make(map[string]*TaskRegistration, len(a.Tasks)),
Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)),
}
for k, v := range a.Tasks {
@@ -164,14 +164,14 @@ func (a *AllocRegistration) NumChecks() int {
return total
}
// TaskRegistration holds the status of services registered for a particular
// task.
type TaskRegistration struct {
// ServiceRegistrations holds the status of services registered for a particular
// task or task group.
type ServiceRegistrations struct {
Services map[string]*ServiceRegistration
}
func (t *TaskRegistration) copy() *TaskRegistration {
c := &TaskRegistration{
func (t *ServiceRegistrations) copy() *ServiceRegistrations {
c := &ServiceRegistrations{
Services: make(map[string]*ServiceRegistration, len(t.Services)),
}
@@ -675,11 +675,11 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
// serviceRegs creates service registrations, check registrations, and script
// checks from a service. It returns a service registration object with the
// service and check IDs populated.
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, task *TaskServices) (
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, workload *WorkloadServices) (
*ServiceRegistration, error) {
// Get the services ID
id := MakeTaskServiceID(task.AllocID, task.Name, service)
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
sreg := &ServiceRegistration{
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
@@ -692,14 +692,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
}
// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, task.Networks, task.DriverNetwork)
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
// Determine whether to use tags or canary_tags
var tags []string
if task.Canary && len(service.CanaryTags) > 0 {
if workload.Canary && len(service.CanaryTags) > 0 {
tags = make([]string, len(service.CanaryTags))
copy(tags, service.CanaryTags)
} else {
@@ -708,7 +708,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
}
// newConnect returns (nil, nil) if there's no Connect-enabled service.
connect, err := newConnect(service.Name, service.Connect, task.Networks)
connect, err := newConnect(service.Name, service.Connect, workload.Networks)
if err != nil {
return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err)
}
@@ -734,7 +734,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
ops.regServices = append(ops.regServices, serviceReg)
// Build the check registrations
checkIDs, err := c.checkRegs(ops, id, service, task)
checkIDs, err := c.checkRegs(ops, id, service, workload)
if err != nil {
return nil, err
}
@@ -747,7 +747,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
// checkRegs registers the checks for the given service and returns the
// registered check ids.
func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *structs.Service,
task *TaskServices) ([]string, error) {
workload *WorkloadServices) ([]string, error) {
// Fast path
numChecks := len(service.Checks)
@@ -782,7 +782,7 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
addrMode = structs.AddressModeHost
}
ip, port, err := getAddress(addrMode, portLabel, task.Networks, task.DriverNetwork)
ip, port, err := getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork)
if err != nil {
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
}
@@ -796,186 +796,67 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
return checkIDs, nil
}
//TODO(schmichael) remove
type noopRestarter struct{}
func (noopRestarter) Restart(context.Context, *structs.TaskEvent, bool) error { return nil }
// makeAllocTaskServices creates a TaskServices struct for a group service.
//
//TODO(schmichael) rename TaskServices and refactor this into a New method
func makeAllocTaskServices(alloc *structs.Allocation, tg *structs.TaskGroup) (*TaskServices, error) {
//COMPAT(0.11) AllocatedResources is only nil when upgrading directly
// from 0.8.
if alloc.AllocatedResources == nil || len(alloc.AllocatedResources.Shared.Networks) == 0 {
return nil, fmt.Errorf("unable to register a group service without a group network")
}
//TODO(schmichael) only support one network for now
net := alloc.AllocatedResources.Shared.Networks[0]
ts := &TaskServices{
AllocID: alloc.ID,
Name: "group-" + alloc.TaskGroup,
Services: tg.Services,
Networks: alloc.AllocatedResources.Shared.Networks,
//TODO(schmichael) there's probably a better way than hacking driver network
DriverNetwork: &drivers.DriverNetwork{
AutoAdvertise: true,
IP: net.IP,
// Copy PortLabels from group network
PortMap: net.PortLabels(),
},
// unsupported for group services
Restarter: noopRestarter{},
DriverExec: nil,
}
if alloc.DeploymentStatus != nil {
ts.Canary = alloc.DeploymentStatus.Canary
}
return ts, nil
}
// RegisterGroup services with Consul. Adds all task group-level service
// entries and checks to Consul.
func (c *ServiceClient) RegisterGroup(alloc *structs.Allocation) error {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
}
if len(tg.Services) == 0 {
// noop
return nil
}
ts, err := makeAllocTaskServices(alloc, tg)
if err != nil {
return err
}
return c.RegisterTask(ts)
}
// UpdateGroup services with Consul. Updates all task group-level service
// entries and checks to Consul.
func (c *ServiceClient) UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error {
oldTG := oldAlloc.Job.LookupTaskGroup(oldAlloc.TaskGroup)
if oldTG == nil {
return fmt.Errorf("task group %q not in old allocation", oldAlloc.TaskGroup)
}
if len(oldTG.Services) == 0 {
// No old group services, simply add new group services
return c.RegisterGroup(newAlloc)
}
oldServices, err := makeAllocTaskServices(oldAlloc, oldTG)
if err != nil {
return err
}
newTG := newAlloc.Job.LookupTaskGroup(newAlloc.TaskGroup)
if newTG == nil {
return fmt.Errorf("task group %q not in new allocation", newAlloc.TaskGroup)
}
newServices, err := makeAllocTaskServices(newAlloc, newTG)
if err != nil {
return err
}
return c.UpdateTask(oldServices, newServices)
}
// RemoveGroup services with Consul. Removes all task group-level service
// entries and checks from Consul.
func (c *ServiceClient) RemoveGroup(alloc *structs.Allocation) error {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
}
if len(tg.Services) == 0 {
// noop
return nil
}
ts, err := makeAllocTaskServices(alloc, tg)
if err != nil {
return err
}
c.RemoveTask(ts)
return nil
}
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
// exec is nil and a script check exists an error is returned.
// RegisterWorkload with Consul. Adds all service entries and checks to Consul.
//
// If the service IP is set it used as the address in the service registration.
// Checks will always use the IP from the Task struct (host's IP).
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RegisterTask(task *TaskServices) error {
func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
// Fast path
numServices := len(task.Services)
numServices := len(workload.Services)
if numServices == 0 {
return nil
}
t := new(TaskRegistration)
t := new(ServiceRegistrations)
t.Services = make(map[string]*ServiceRegistration, numServices)
ops := &operations{}
for _, service := range task.Services {
sreg, err := c.serviceRegs(ops, service, task)
for _, service := range workload.Services {
sreg, err := c.serviceRegs(ops, service, workload)
if err != nil {
return err
}
t.Services[sreg.serviceID] = sreg
}
// Add the task to the allocation's registration
c.addTaskRegistration(task.AllocID, task.Name, t)
// Add the workload to the allocation's registration
c.addRegistrations(workload.AllocID, workload.Name(), t)
c.commit(ops)
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range task.Services {
serviceID := MakeTaskServiceID(task.AllocID, task.Name, service)
for _, service := range workload.Services {
serviceID := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter)
c.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter)
}
}
}
return nil
}
// UpdateTask in Consul. Does not alter the service if only checks have
// UpdateWorkload in Consul. Does not alter the service if only checks have
// changed.
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
ops := &operations{}
taskReg := new(TaskRegistration)
taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services))
regs := new(ServiceRegistrations)
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))
existingIDs := make(map[string]*structs.Service, len(old.Services))
for _, s := range old.Services {
existingIDs[MakeTaskServiceID(old.AllocID, old.Name, s)] = s
existingIDs[MakeAllocServiceID(old.AllocID, old.Name(), s)] = s
}
newIDs := make(map[string]*structs.Service, len(newTask.Services))
for _, s := range newTask.Services {
newIDs[MakeTaskServiceID(newTask.AllocID, newTask.Name, s)] = s
newIDs := make(map[string]*structs.Service, len(newWorkload.Services))
for _, s := range newWorkload.Services {
newIDs[MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
}
// Loop over existing Service IDs to see if they have been removed
@@ -997,8 +878,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
continue
}
oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary)
newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary)
oldHash := existingSvc.Hash(old.AllocID, old.Name(), old.Canary)
newHash := newSvc.Hash(newWorkload.AllocID, newWorkload.Name(), newWorkload.Canary)
if oldHash == newHash {
// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
@@ -1009,7 +890,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
serviceID: existingID,
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
}
taskReg.Services[existingID] = sreg
regs.Services[existingID] = sreg
// See if any checks were updated
existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks))
@@ -1028,7 +909,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
}
// New check on an unchanged service; add them now
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newTask)
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newWorkload)
if err != nil {
return err
}
@@ -1039,7 +920,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// Update all watched checks as CheckRestart fields aren't part of ID
if check.TriggersRestarts() {
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
}
}
@@ -1056,41 +937,41 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// Any remaining services should just be enqueued directly
for _, newSvc := range newIDs {
sreg, err := c.serviceRegs(ops, newSvc, newTask)
sreg, err := c.serviceRegs(ops, newSvc, newWorkload)
if err != nil {
return err
}
taskReg.Services[sreg.serviceID] = sreg
regs.Services[sreg.serviceID] = sreg
}
// Add the task to the allocation's registration
c.addTaskRegistration(newTask.AllocID, newTask.Name, taskReg)
c.addRegistrations(newWorkload.AllocID, newWorkload.Name(), regs)
c.commit(ops)
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range newIDs {
serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service)
serviceID := MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
}
}
}
return nil
}
// RemoveTask from Consul. Removes all service entries and checks.
// RemoveWorkload from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RemoveTask(task *TaskServices) {
func (c *ServiceClient) RemoveWorkload(workload *WorkloadServices) {
ops := operations{}
for _, service := range task.Services {
id := MakeTaskServiceID(task.AllocID, task.Name, service)
for _, service := range workload.Services {
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
@@ -1103,8 +984,8 @@ func (c *ServiceClient) RemoveTask(task *TaskServices) {
}
}
// Remove the task from the alloc's registrations
c.removeTaskRegistration(task.AllocID, task.Name)
// Remove the workload from the alloc's registrations
c.removeRegistration(workload.AllocID, workload.Name())
// Now add them to the deregistration fields; main Run loop will update
c.commit(&ops)
@@ -1203,23 +1084,23 @@ func (c *ServiceClient) Shutdown() error {
return nil
}
// addTaskRegistration adds the task registration for the given allocation.
func (c *ServiceClient) addTaskRegistration(allocID, taskName string, reg *TaskRegistration) {
// addRegistration adds the service registrations for the given allocation.
func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *ServiceRegistrations) {
c.allocRegistrationsLock.Lock()
defer c.allocRegistrationsLock.Unlock()
alloc, ok := c.allocRegistrations[allocID]
if !ok {
alloc = &AllocRegistration{
Tasks: make(map[string]*TaskRegistration),
Tasks: make(map[string]*ServiceRegistrations),
}
c.allocRegistrations[allocID] = alloc
}
alloc.Tasks[taskName] = reg
}
// removeTaskRegistration removes the task registration for the given allocation.
func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
// removeRegistrations removes the registration for the given allocation.
func (c *ServiceClient) removeRegistration(allocID, taskName string) {
c.allocRegistrationsLock.Lock()
defer c.allocRegistrationsLock.Unlock()
@@ -1249,11 +1130,11 @@ func makeAgentServiceID(role string, service *structs.Service) string {
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
}
// MakeTaskServiceID creates a unique ID for identifying a task service in
// MakeAllocServiceID creates a unique ID for identifying an alloc service in
// Consul.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
func MakeTaskServiceID(allocID, taskName string, service *structs.Service) string {
func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string {
return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}

View File

@@ -64,6 +64,9 @@ func TestConsul_Connect(t *testing.T) {
{
Name: "testconnect",
PortLabel: "9999",
Meta: map[string]string{
"alloc_id": "${NOMAD_ALLOC_ID}",
},
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{
Proxy: &structs.ConsulProxy{
@@ -76,10 +79,10 @@ func TestConsul_Connect(t *testing.T) {
// required by isNomadSidecar assertion below
serviceRegMap := map[string]*api.AgentServiceRegistration{
MakeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]): nil,
MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]): nil,
}
require.NoError(t, serviceClient.RegisterGroup(alloc))
require.NoError(t, serviceClient.RegisterWorkload(BuildAllocServices(mock.Node(), alloc, NoopRestarter())))
require.Eventually(t, func() bool {
services, err := consulClient.Agent().Services()
@@ -94,7 +97,7 @@ func TestConsul_Connect(t *testing.T) {
require.NoError(t, err)
require.Len(t, services, 2)
serviceID := MakeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
serviceID := MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
connectID := serviceID + "-sidecar-proxy"
require.Contains(t, services, serviceID)
@@ -123,88 +126,8 @@ func TestConsul_Connect(t *testing.T) {
"bind_address": "0.0.0.0",
"bind_port": float64(9998),
})
require.Equal(t, alloc.ID, agentService.Meta["alloc_id"])
time.Sleep(interval >> 2)
}
}
// TestConsul_Update08Alloc asserts that adding group services to a previously
// 0.8 alloc works.
//
// COMPAT(0.11) Only valid for upgrades from 0.8.
func TestConsul_Update08Alloc(t *testing.T) {
// Create an embedded Consul server
testconsul, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
if err != nil {
t.Fatalf("error starting test consul server: %v", err)
}
defer testconsul.Stop()
consulConfig := consulapi.DefaultConfig()
consulConfig.Address = testconsul.HTTPAddr
consulClient, err := consulapi.NewClient(consulConfig)
require.NoError(t, err)
serviceClient := NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
// Lower periodicInterval to ensure periodic syncing doesn't improperly
// remove Connect services.
const interval = 50 * time.Millisecond
serviceClient.periodicInterval = interval
// Disable deregistration probation to test syncing
serviceClient.deregisterProbationExpiry = time.Time{}
go serviceClient.Run()
defer serviceClient.Shutdown()
// Create new 0.10-style alloc
alloc := mock.Alloc()
alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{
{
Mode: "bridge",
IP: "10.0.0.1",
DynamicPorts: []structs.Port{
{
Label: "connect-proxy-testconnect",
Value: 9999,
To: 9998,
},
},
},
}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
tg.Services = []*structs.Service{
{
Name: "testconnect",
PortLabel: "9999",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{
Proxy: &structs.ConsulProxy{
LocalServicePort: 9000,
},
},
},
},
}
// Create old 0.8-style alloc from new alloc
oldAlloc := alloc.Copy()
oldAlloc.AllocatedResources = nil
oldAlloc.Job.LookupTaskGroup(alloc.TaskGroup).Services = nil
// Expect new services to get registered
require.NoError(t, serviceClient.UpdateGroup(oldAlloc, alloc))
// Assert the group and sidecar services are registered
require.Eventually(t, func() bool {
services, err := consulClient.Agent().Services()
require.NoError(t, err)
return len(services) == 2
}, 3*time.Second, 100*time.Millisecond)
}

View File

@@ -2,22 +2,28 @@ package consul
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
type TaskServices struct {
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be syncronized with Consul
type WorkloadServices struct {
AllocID string
// Name of the task
Name string
// Name of the task and task group the services are defined for. For
// group based services, Task will be empty
Task string
Group string
// Canary indicates whether or not the allocation is a canary
Canary bool
// Restarter allows restarting the task depending on the task's
// Restarter allows restarting the task or task group depending on the
// check_restart stanzas.
Restarter TaskRestarter
Restarter WorkloadRestarter
// Services and checks to register for the task.
Services []*structs.Service
@@ -26,41 +32,49 @@ type TaskServices struct {
Networks structs.Networks
// DriverExec is the script executor for the task's driver.
// For group services this is nil and script execution is managed by
// a tasklet in the taskrunner script_check_hook
DriverExec interfaces.ScriptExecutor
// DriverNetwork is the network specified by the driver and may be nil.
DriverNetwork *drivers.DriverNetwork
}
func NewTaskServices(alloc *structs.Allocation, task *structs.Task, restarter TaskRestarter, exec interfaces.ScriptExecutor, net *drivers.DriverNetwork) *TaskServices {
ts := TaskServices{
AllocID: alloc.ID,
Name: task.Name,
Restarter: restarter,
Services: task.Services,
DriverExec: exec,
DriverNetwork: net,
func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *WorkloadServices {
//TODO(schmichael) only support one network for now
net := alloc.AllocatedResources.Shared.Networks[0]
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
ws := &WorkloadServices{
AllocID: alloc.ID,
Group: alloc.TaskGroup,
Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services),
Networks: alloc.AllocatedResources.Shared.Networks,
//TODO(schmichael) there's probably a better way than hacking driver network
DriverNetwork: &drivers.DriverNetwork{
AutoAdvertise: true,
IP: net.IP,
// Copy PortLabels from group network
PortMap: net.PortLabels(),
},
Restarter: restarter,
DriverExec: nil,
}
if alloc.AllocatedResources != nil {
if tr, ok := alloc.AllocatedResources.Tasks[task.Name]; ok {
ts.Networks = tr.Networks
}
} else if task.Resources != nil {
// COMPAT(0.11): Remove in 0.11
ts.Networks = task.Resources.Networks
if alloc.DeploymentStatus != nil {
ws.Canary = alloc.DeploymentStatus.Canary
}
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary {
ts.Canary = true
}
return &ts
return ws
}
// Copy method for easing tests
func (t *TaskServices) Copy() *TaskServices {
newTS := new(TaskServices)
func (t *WorkloadServices) Copy() *WorkloadServices {
newTS := new(WorkloadServices)
*newTS = *t
// Deep copy Services
@@ -70,3 +84,11 @@ func (t *TaskServices) Copy() *TaskServices {
}
return newTS
}
func (w *WorkloadServices) Name() string {
if w.Task != "" {
return w.Task
}
return "group-" + w.Group
}

View File

@@ -0,0 +1,17 @@
package consul
import (
"context"
"github.com/hashicorp/nomad/nomad/structs"
)
func NoopRestarter() WorkloadRestarter {
return noopRestarter{}
}
type noopRestarter struct{}
func (noopRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
return nil
}

View File

@@ -20,15 +20,15 @@ import (
)
const (
// Ports used in testTask
// Ports used in testWorkload
xPort = 1234
yPort = 1235
)
func testTask() *TaskServices {
return &TaskServices{
func testWorkload() *WorkloadServices {
return &WorkloadServices{
AllocID: uuid.Generate(),
Name: "taskname",
Task: "taskname",
Restarter: &restartRecorder{},
Services: []*structs.Service{
{
@@ -48,7 +48,7 @@ func testTask() *TaskServices {
}
}
// restartRecorder is a minimal TaskRestarter implementation that simply
// restartRecorder is a minimal WorkloadRestarter implementation that simply
// counts how many restarts were triggered.
type restartRecorder struct {
restarts int64
@@ -63,7 +63,7 @@ func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent,
type testFakeCtx struct {
ServiceClient *ServiceClient
FakeConsul *MockAgent
Task *TaskServices
Workload *WorkloadServices
}
var errNoOps = fmt.Errorf("testing error: no pending operations")
@@ -85,10 +85,10 @@ func (t *testFakeCtx) syncOnce() error {
}
// setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul.
// A test Task is also provided.
// A test Workload is also provided.
func setupFake(t *testing.T) *testFakeCtx {
fc := NewMockAgent()
tt := testTask()
tw := testWorkload()
// by default start fake client being out of probation
sc := NewServiceClient(fc, testlog.HCLogger(t), true)
@@ -97,7 +97,7 @@ func setupFake(t *testing.T) *testFakeCtx {
return &testFakeCtx{
ServiceClient: sc,
FakeConsul: fc,
Task: tt,
Workload: tw,
}
}
@@ -105,35 +105,35 @@ func TestConsul_ChangeTags(t *testing.T) {
ctx := setupFake(t)
require := require.New(t)
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
// Validate the alloc registration
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
require.NoError(err)
require.NotNil(reg1, "Unexpected nil alloc registration")
require.Equal(1, reg1.NumServices())
require.Equal(0, reg1.NumChecks())
for _, v := range ctx.FakeConsul.services {
require.Equal(v.Name, ctx.Task.Services[0].Name)
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
require.Equal(v.Name, ctx.Workload.Services[0].Name)
require.Equal(v.Tags, ctx.Workload.Services[0].Tags)
}
// Update the task definition
origTask := ctx.Task.Copy()
ctx.Task.Services[0].Tags[0] = "newtag"
origWorkload := ctx.Workload.Copy()
ctx.Workload.Services[0].Tags[0] = "newtag"
// Register and sync the update
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
// Validate the metadata changed
for _, v := range ctx.FakeConsul.services {
require.Equal(v.Name, ctx.Task.Services[0].Name)
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
require.Equal(v.Name, ctx.Workload.Services[0].Name)
require.Equal(v.Tags, ctx.Workload.Services[0].Tags)
require.Equal("newtag", v.Tags[0])
}
}
@@ -145,7 +145,7 @@ func TestConsul_ChangePorts(t *testing.T) {
ctx := setupFake(t)
require := require.New(t)
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
@@ -170,13 +170,13 @@ func TestConsul_ChangePorts(t *testing.T) {
},
}
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
for _, v := range ctx.FakeConsul.services {
require.Equal(ctx.Task.Services[0].Name, v.Name)
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
require.Equal(ctx.Workload.Services[0].Name, v.Name)
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
require.Equal(xPort, v.Port)
}
@@ -205,9 +205,9 @@ func TestConsul_ChangePorts(t *testing.T) {
require.NotEmpty(origHTTPKey)
// Now update the PortLabel on the Service and Check c3
origTask := ctx.Task.Copy()
ctx.Task.Services[0].PortLabel = "y"
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
origWorkload := ctx.Workload.Copy()
ctx.Workload.Services[0].PortLabel = "y"
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
@@ -232,13 +232,13 @@ func TestConsul_ChangePorts(t *testing.T) {
},
}
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce())
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
for _, v := range ctx.FakeConsul.services {
require.Equal(ctx.Task.Services[0].Name, v.Name)
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
require.Equal(ctx.Workload.Services[0].Name, v.Name)
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
require.Equal(yPort, v.Port)
}
@@ -266,7 +266,7 @@ func TestConsul_ChangePorts(t *testing.T) {
// properly syncs with Consul.
func TestConsul_ChangeChecks(t *testing.T) {
ctx := setupFake(t)
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
@@ -279,7 +279,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
},
}
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -300,7 +300,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
// Query the allocs registrations and then again when we update. The IDs
// should change
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
@@ -317,8 +317,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
origServiceKey := ""
for k, v := range ctx.FakeConsul.services {
origServiceKey = k
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
if v.Name != ctx.Workload.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
}
if v.Port != xPort {
t.Errorf("expected Port x=%v but found: %v", xPort, v.Port)
@@ -335,8 +335,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
}
// Now add a check and modify the original
origTask := ctx.Task.Copy()
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
origWorkload := ctx.Workload.Copy()
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
@@ -356,7 +356,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
PortLabel: "x",
},
}
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -414,7 +414,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
}
// Check again and ensure the IDs changed
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
if err != nil {
t.Fatalf("Looking up alloc registration failed: %v", err)
}
@@ -449,8 +449,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
}
// Alter a CheckRestart and make sure the watcher is updated but nothing else
origTask = ctx.Task.Copy()
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
origWorkload = ctx.Workload.Copy()
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "c1",
Type: "tcp",
@@ -470,7 +470,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
PortLabel: "x",
},
}
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
if err := ctx.syncOnce(); err != nil {
@@ -502,7 +502,7 @@ func TestConsul_RegServices(t *testing.T) {
ctx := setupFake(t)
// Add a check w/restarting
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Name: "testcheck",
Type: "tcp",
@@ -513,7 +513,7 @@ func TestConsul_RegServices(t *testing.T) {
},
}
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -526,11 +526,11 @@ func TestConsul_RegServices(t *testing.T) {
}
for _, v := range ctx.FakeConsul.services {
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
if v.Name != ctx.Workload.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
}
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags)
}
if v.Port != xPort {
t.Errorf("expected Port=%d != %d", xPort, v.Port)
@@ -544,7 +544,7 @@ func TestConsul_RegServices(t *testing.T) {
// Assert the check update is properly formed
checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
if checkUpd.checkRestart.allocID != ctx.Task.AllocID {
if checkUpd.checkRestart.allocID != ctx.Workload.AllocID {
t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID)
}
if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected {
@@ -552,9 +552,9 @@ func TestConsul_RegServices(t *testing.T) {
}
// Make a change which will register a new service
ctx.Task.Services[0].Name = "taskname-service2"
ctx.Task.Services[0].Tags[0] = "tag3"
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
ctx.Workload.Services[0].Name = "taskname-service2"
ctx.Workload.Services[0].Tags[0] = "tag3"
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -575,7 +575,7 @@ func TestConsul_RegServices(t *testing.T) {
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
}
for _, v := range ctx.FakeConsul.services {
if reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
if reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
t.Errorf("expected Tags to differ, changes applied before sync()")
}
}
@@ -589,22 +589,22 @@ func TestConsul_RegServices(t *testing.T) {
}
found := false
for _, v := range ctx.FakeConsul.services {
if v.Name == ctx.Task.Services[0].Name {
if v.Name == ctx.Workload.Services[0].Name {
if found {
t.Fatalf("found new service name %q twice", v.Name)
}
found = true
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags)
}
}
}
if !found {
t.Fatalf("did not find new service %q", ctx.Task.Services[0].Name)
t.Fatalf("did not find new service %q", ctx.Workload.Services[0].Name)
}
// Remove the new task
ctx.ServiceClient.RemoveTask(ctx.Task)
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
if err := ctx.syncOnce(); err != nil {
t.Fatalf("unexpected error syncing task: %v", err)
}
@@ -744,7 +744,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
t.Parallel()
ctx := setupFake(t)
ctx.Task.Services = []*structs.Service{
ctx.Workload.Services = []*structs.Service{
{
Name: "auto-advertise-x",
PortLabel: "x",
@@ -785,7 +785,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
},
}
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
ctx.Workload.DriverNetwork = &drivers.DriverNetwork{
PortMap: map[string]int{
"x": 8888,
"y": 9999,
@@ -794,7 +794,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
AutoAdvertise: true,
}
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -808,14 +808,14 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
for _, v := range ctx.FakeConsul.services {
switch v.Name {
case ctx.Task.Services[0].Name: // x
case ctx.Workload.Services[0].Name: // x
// Since DriverNetwork.AutoAdvertise=true, driver ports should be used
if v.Port != ctx.Task.DriverNetwork.PortMap["x"] {
if v.Port != ctx.Workload.DriverNetwork.PortMap["x"] {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
v.Name, ctx.Workload.DriverNetwork.PortMap["x"], v.Port)
}
// The order of checks in Consul is not guaranteed to
// be the same as their order in the Task definition,
// be the same as their order in the Workload definition,
// so check in a loop
if expected := 2; len(v.Checks) != expected {
t.Errorf("expected %d checks but found %d", expected, len(v.Checks))
@@ -838,22 +838,22 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
t.Errorf("unexpected check %#v on service %q", c, v.Name)
}
}
case ctx.Task.Services[1].Name: // y
case ctx.Workload.Services[1].Name: // y
// Service should be container ip:port
if v.Address != ctx.Task.DriverNetwork.IP {
if v.Address != ctx.Workload.DriverNetwork.IP {
t.Errorf("expected service %s's address to be %s but found %s",
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
v.Name, ctx.Workload.DriverNetwork.IP, v.Address)
}
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
if v.Port != ctx.Workload.DriverNetwork.PortMap["y"] {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
v.Name, ctx.Workload.DriverNetwork.PortMap["x"], v.Port)
}
// Check should be host ip:port
if v.Checks[0].TCP != ":1235" { // yPort
t.Errorf("expected service %s check's port to be %d but found %s",
v.Name, yPort, v.Checks[0].TCP)
}
case ctx.Task.Services[2].Name: // y + host mode
case ctx.Workload.Services[2].Name: // y + host mode
if v.Port != yPort {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, yPort, v.Port)
@@ -871,7 +871,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
t.Parallel()
ctx := setupFake(t)
ctx.Task.Services = []*structs.Service{
ctx.Workload.Services = []*structs.Service{
{
Name: "auto-advertise-x",
PortLabel: "x",
@@ -889,7 +889,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
},
}
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
ctx.Workload.DriverNetwork = &drivers.DriverNetwork{
PortMap: map[string]int{
"x": 8888,
"y": 9999,
@@ -898,7 +898,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
AutoAdvertise: false,
}
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
@@ -912,23 +912,23 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
for _, v := range ctx.FakeConsul.services {
switch v.Name {
case ctx.Task.Services[0].Name: // x + auto
case ctx.Workload.Services[0].Name: // x + auto
// Since DriverNetwork.AutoAdvertise=false, host ports should be used
if v.Port != xPort {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, xPort, v.Port)
}
case ctx.Task.Services[1].Name: // y + driver mode
case ctx.Workload.Services[1].Name: // y + driver mode
// Service should be container ip:port
if v.Address != ctx.Task.DriverNetwork.IP {
if v.Address != ctx.Workload.DriverNetwork.IP {
t.Errorf("expected service %s's address to be %s but found %s",
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
v.Name, ctx.Workload.DriverNetwork.IP, v.Address)
}
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
if v.Port != ctx.Workload.DriverNetwork.PortMap["y"] {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
v.Name, ctx.Workload.DriverNetwork.PortMap["x"], v.Port)
}
case ctx.Task.Services[2].Name: // y + host mode
case ctx.Workload.Services[2].Name: // y + host mode
if v.Port != yPort {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, yPort, v.Port)
@@ -945,7 +945,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
t.Parallel()
ctx := setupFake(t)
ctx.Task.Services = []*structs.Service{
ctx.Workload.Services = []*structs.Service{
{
Name: "service-foo",
PortLabel: "x",
@@ -953,7 +953,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
},
}
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
ctx.Workload.DriverNetwork = &drivers.DriverNetwork{
PortMap: map[string]int{
"x": 8888,
"y": 9999,
@@ -973,7 +973,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
for _, v := range ctx.FakeConsul.services {
switch v.Name {
case ctx.Task.Services[0].Name:
case ctx.Workload.Services[0].Name:
if v.Port != port {
t.Errorf("expected service %s's port to be %d but found %d",
v.Name, port, v.Port)
@@ -985,31 +985,31 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
}
// Initial service should advertise host port x
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
t.Fatalf("unexpected error registering task: %v", err)
}
syncAndAssertPort(xPort)
// UpdateTask to use Host (shouldn't change anything)
origTask := ctx.Task.Copy()
ctx.Task.Services[0].AddressMode = structs.AddressModeHost
// UpdateWorkload to use Host (shouldn't change anything)
origWorkload := ctx.Workload.Copy()
ctx.Workload.Services[0].AddressMode = structs.AddressModeHost
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
t.Fatalf("unexpected error updating task: %v", err)
}
syncAndAssertPort(xPort)
// UpdateTask to use Driver (*should* change IP and port)
origTask = ctx.Task.Copy()
ctx.Task.Services[0].AddressMode = structs.AddressModeDriver
// UpdateWorkload to use Driver (*should* change IP and port)
origWorkload = ctx.Workload.Copy()
ctx.Workload.Services[0].AddressMode = structs.AddressModeDriver
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
t.Fatalf("unexpected error updating task: %v", err)
}
syncAndAssertPort(ctx.Task.DriverNetwork.PortMap["x"])
syncAndAssertPort(ctx.Workload.DriverNetwork.PortMap["x"])
}
// TestConsul_CanaryTags asserts CanaryTags are used when Canary=true
@@ -1019,10 +1019,10 @@ func TestConsul_CanaryTags(t *testing.T) {
ctx := setupFake(t)
canaryTags := []string{"tag1", "canary"}
ctx.Task.Canary = true
ctx.Task.Services[0].CanaryTags = canaryTags
ctx.Workload.Canary = true
ctx.Workload.Services[0].CanaryTags = canaryTags
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
@@ -1030,16 +1030,16 @@ func TestConsul_CanaryTags(t *testing.T) {
}
// Disable canary and assert tags are not the canary tags
origTask := ctx.Task.Copy()
ctx.Task.Canary = false
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
origWorkload := ctx.Workload.Copy()
ctx.Workload.Canary = false
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.NotEqual(canaryTags, service.Tags)
}
ctx.ServiceClient.RemoveTask(ctx.Task)
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 0)
}
@@ -1052,10 +1052,10 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
ctx := setupFake(t)
tags := []string{"tag1", "foo"}
ctx.Task.Canary = true
ctx.Task.Services[0].Tags = tags
ctx.Workload.Canary = true
ctx.Workload.Services[0].Tags = tags
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
@@ -1063,16 +1063,16 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
}
// Disable canary and assert tags dont change
origTask := ctx.Task.Copy()
ctx.Task.Canary = false
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
origWorkload := ctx.Workload.Copy()
ctx.Workload.Canary = false
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
for _, service := range ctx.FakeConsul.services {
require.Equal(tags, service.Tags)
}
ctx.ServiceClient.RemoveTask(ctx.Task)
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 0)
}
@@ -1402,7 +1402,7 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
ctx := setupFake(t)
require := require.New(t)
ctx.Task.Services = []*structs.Service{
ctx.Workload.Services = []*structs.Service{
{
Name: "best-service",
PortLabel: "x",
@@ -1439,20 +1439,20 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
},
}
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 3)
for _, v := range ctx.FakeConsul.services {
if v.Name == ctx.Task.Services[0].Name && v.Port == xPort {
require.ElementsMatch(v.Tags, ctx.Task.Services[0].Tags)
if v.Name == ctx.Workload.Services[0].Name && v.Port == xPort {
require.ElementsMatch(v.Tags, ctx.Workload.Services[0].Tags)
require.Len(v.Checks, 1)
} else if v.Name == ctx.Task.Services[1].Name && v.Port == yPort {
require.ElementsMatch(v.Tags, ctx.Task.Services[1].Tags)
} else if v.Name == ctx.Workload.Services[1].Name && v.Port == yPort {
require.ElementsMatch(v.Tags, ctx.Workload.Services[1].Tags)
require.Len(v.Checks, 1)
} else if v.Name == ctx.Task.Services[2].Name {
} else if v.Name == ctx.Workload.Services[2].Name {
require.Len(v.Checks, 0)
}
}
@@ -1467,8 +1467,8 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
remainingTask := testTask()
remainingTask.Services = []*structs.Service{
remainingWorkload := testWorkload()
remainingWorkload.Services = []*structs.Service{
{
Name: "remaining-service",
PortLabel: "x",
@@ -1482,16 +1482,16 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
remainingTask.Name, remainingTask.Services[0])
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
require.Len(ctx.FakeConsul.checks, 1)
explicitlyRemovedTask := testTask()
explicitlyRemovedTask.Services = []*structs.Service{
explicitlyRemovedWorkload := testWorkload()
explicitlyRemovedWorkload.Services = []*structs.Service{
{
Name: "explicitly-removed-service",
PortLabel: "y",
@@ -1505,18 +1505,18 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0])
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 2)
require.Len(ctx.FakeConsul.checks, 2)
// we register a task through nomad API then remove it out of band
outofbandTask := testTask()
outofbandTask.Services = []*structs.Service{
outofbandWorkload := testWorkload()
outofbandWorkload.Services = []*structs.Service{
{
Name: "unknown-service",
PortLabel: "x",
@@ -1530,39 +1530,39 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
outofbandTask.Name, outofbandTask.Services[0])
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 3)
// remove outofbandTask from local services so it appears unknown to client
// remove outofbandWorkload from local services so it appears unknown to client
require.Len(ctx.ServiceClient.services, 3)
require.Len(ctx.ServiceClient.checks, 3)
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
delete(ctx.ServiceClient.services, outofbandWorkloadServiceID)
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.Len(ctx.ServiceClient.services, 2)
require.Len(ctx.ServiceClient.checks, 2)
// Sync and ensure that explicitly removed service as well as outofbandTask were removed
// Sync and ensure that explicitly removed service as well as outofbandWorkload were removed
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
require.NoError(ctx.syncOnce())
require.NoError(ctx.ServiceClient.sync())
require.Len(ctx.FakeConsul.services, 1)
require.Len(ctx.FakeConsul.checks, 1)
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
}
// TestConsul_ServiceDeregistration_InProbation asserts that during initialization
@@ -1576,8 +1576,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(1 * time.Hour)
remainingTask := testTask()
remainingTask.Services = []*structs.Service{
remainingWorkload := testWorkload()
remainingWorkload.Services = []*structs.Service{
{
Name: "remaining-service",
PortLabel: "x",
@@ -1591,16 +1591,16 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
remainingTask.Name, remainingTask.Services[0])
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 1)
require.Len(ctx.FakeConsul.checks, 1)
explicitlyRemovedTask := testTask()
explicitlyRemovedTask.Services = []*structs.Service{
explicitlyRemovedWorkload := testWorkload()
explicitlyRemovedWorkload.Services = []*structs.Service{
{
Name: "explicitly-removed-service",
PortLabel: "y",
@@ -1614,18 +1614,18 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0])
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 2)
require.Len(ctx.FakeConsul.checks, 2)
// we register a task through nomad API then remove it out of band
outofbandTask := testTask()
outofbandTask.Services = []*structs.Service{
outofbandWorkload := testWorkload()
outofbandWorkload.Services = []*structs.Service{
{
Name: "unknown-service",
PortLabel: "x",
@@ -1639,39 +1639,39 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
outofbandTask.Name, outofbandTask.Services[0])
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
require.NoError(ctx.syncOnce())
require.Len(ctx.FakeConsul.services, 3)
// remove outofbandTask from local services so it appears unknown to client
// remove outofbandWorkload from local services so it appears unknown to client
require.Len(ctx.ServiceClient.services, 3)
require.Len(ctx.ServiceClient.checks, 3)
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
delete(ctx.ServiceClient.services, outofbandWorkloadServiceID)
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.Len(ctx.ServiceClient.services, 2)
require.Len(ctx.ServiceClient.checks, 2)
// Sync and ensure that explicitly removed service was removed, but outofbandTask remains
// Sync and ensure that explicitly removed service was removed, but outofbandWorkload remains
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
require.NoError(ctx.syncOnce())
require.NoError(ctx.ServiceClient.sync())
require.Len(ctx.FakeConsul.services, 2)
require.Len(ctx.FakeConsul.checks, 2)
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
require.Contains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
// after probation, outofband services and checks are removed
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
@@ -1680,12 +1680,12 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
require.Len(ctx.FakeConsul.services, 1)
require.Len(ctx.FakeConsul.checks, 1)
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
}

View File

@@ -4455,6 +4455,7 @@ func TestJobEndpoint_ValidateJob_ConsulConnect(t *testing.T) {
tg := j.TaskGroups[0]
tg.Services = tgServices
tg.Networks = nil
err := validateJob(j)
require.Error(t, err)

View File

@@ -3629,15 +3629,16 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup {
func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string {
group := j.LookupTaskGroup(groupName)
if group == nil {
return nil
return j.Meta
}
var meta map[string]string
task := group.LookupTask(taskName)
if task == nil {
return nil
if task != nil {
meta = helper.CopyMapStringString(task.Meta)
}
meta := helper.CopyMapStringString(task.Meta)
if meta == nil {
meta = make(map[string]string, len(group.Meta)+len(j.Meta))
}

View File

@@ -2565,6 +2565,51 @@ func TestJob_ExpandServiceNames(t *testing.T) {
}
func TestJob_CombinedTaskMeta(t *testing.T) {
j := &Job{
Meta: map[string]string{
"job_test": "job",
"group_test": "job",
"task_test": "job",
},
TaskGroups: []*TaskGroup{
{
Name: "group",
Meta: map[string]string{
"group_test": "group",
"task_test": "group",
},
Tasks: []*Task{
{
Name: "task",
Meta: map[string]string{
"task_test": "task",
},
},
},
},
},
}
require := require.New(t)
require.EqualValues(map[string]string{
"job_test": "job",
"group_test": "group",
"task_test": "task",
}, j.CombinedTaskMeta("group", "task"))
require.EqualValues(map[string]string{
"job_test": "job",
"group_test": "group",
"task_test": "group",
}, j.CombinedTaskMeta("group", ""))
require.EqualValues(map[string]string{
"job_test": "job",
"group_test": "job",
"task_test": "job",
}, j.CombinedTaskMeta("", "task"))
}
func TestPeriodicConfig_EnabledInvalid(t *testing.T) {
// Create a config that is enabled but with no interval specified.
p := &PeriodicConfig{Enabled: true}