client: improve group service stanza interpolation and check_restart support (#6586)

Interpolation can now be done on group service stanzas. Note that some task runtime specific information that was previously available when the service was registered poststart of a task is no longer available.

The check_restart stanza for checks defined on group services will now properly restart the allocation upon check failures if configured.
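To make the user-facing effect concrete, here is a rough sketch of what the client now does for a group service, using the helpers touched by this change (taskenv.NewBuilder with a nil task and the new taskenv.InterpolateServices). The service name and the "environment" meta key are illustrative and not part of this diff:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	node := mock.Node()
	alloc := mock.Alloc()
	alloc.Job.Meta["environment"] = "prod" // illustrative meta key

	// A group-level service may now reference runtime variables. Task-specific
	// variables are not available because interpolation happens outside of any
	// single task.
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	tg.Services = []*structs.Service{{
		Name:      "web-${NOMAD_META_environment}",
		PortLabel: "http",
	}}

	// Build a task-less environment (note the nil task argument), the same way
	// the group service hook does in this change.
	env := taskenv.NewBuilder(node, alloc, nil, alloc.Job.Region).
		SetAllocDir("/tmp/alloc").
		Build()

	// InterpolateServices is the helper added in client/taskenv/services.go.
	services := taskenv.InterpolateServices(env, tg.Services)
	fmt.Println(services[0].Name) // "web-prod"
}
```

Group checks that define check_restart are then wired to the allocation runner's new Restart method (see below), so exceeding the restart limit restarts the whole group rather than a single task.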
@@ -419,7 +419,7 @@ OUTER:
type taskHealthState struct {
task *structs.Task
state *structs.TaskState
taskRegistrations *consul.TaskRegistration
taskRegistrations *consul.ServiceRegistrations
}

// event takes the deadline time for the allocation to be healthy and the update

@@ -22,6 +22,7 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"

@@ -1001,6 +1002,39 @@ func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent
return tr.Restart(context.TODO(), taskEvent, false)
}

// Restart satisfies the WorkloadRestarter interface restarts all task runners
// concurrently
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
waitCh := make(chan struct{})
var err *multierror.Error
var errMutex sync.Mutex

go func() {
var wg sync.WaitGroup
defer close(waitCh)
for tn, tr := range ar.tasks {
wg.Add(1)
go func(taskName string, r agentconsul.WorkloadRestarter) {
defer wg.Done()
e := r.Restart(ctx, event, failure)
if e != nil {
errMutex.Lock()
defer errMutex.Unlock()
err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e))
}
}(tn, tr)
}
wg.Wait()
}()

select {
case <-waitCh:
case <-ctx.Done():
}

return err.ErrorOrNil()
}

// RestartAll signalls all task runners in the allocation to restart and passes
// a copy of the task event to each restart event.
// Returns any errors in a concatenated form.

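The Restart method above is the piece that makes check_restart on group services effective: the Consul check watcher holds a WorkloadRestarter (renamed from TaskRestarter later in this diff), and for group services that restarter is the allocation runner itself, so an unhealthy check restarts every task in the group. A minimal sketch of that contract, with a purely illustrative stand-in for the alloc runner:

```go
package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// WorkloadRestarter mirrors the interface used by the check watcher.
type WorkloadRestarter interface {
	Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
}

// groupRestarter stands in for *allocRunner; the real implementation fans the
// restart out to every task runner, as shown in the hunk above.
type groupRestarter struct{}

func (groupRestarter) Restart(_ context.Context, event *structs.TaskEvent, failure bool) error {
	fmt.Printf("restarting group (failure=%v): %s\n", failure, event.RestartReason)
	return nil
}

func main() {
	// Roughly what the check watcher does once a watched check exceeds its
	// check_restart limit; such restarts are always treated as failures.
	event := structs.NewTaskEvent(structs.TaskRestartSignal).
		SetRestartReason("health check failed")
	var r WorkloadRestarter = groupRestarter{}
	_ = r.Restart(context.Background(), event, true)
}
```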
@@ -7,6 +7,7 @@ import (
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
)
|
||||
@@ -125,7 +126,13 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
||||
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
|
||||
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
|
||||
newNetworkHook(hookLogger, ns, alloc, nm, nc),
|
||||
newGroupServiceHook(hookLogger, alloc, ar.consulClient),
|
||||
newGroupServiceHook(groupServiceHookConfig{
|
||||
alloc: alloc,
|
||||
consul: ar.consulClient,
|
||||
restarter: ar,
|
||||
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
|
||||
logger: hookLogger,
|
||||
}),
|
||||
newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
|
||||
}
|
||||
|
||||
|
||||
@@ -528,7 +528,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
|
||||
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
|
||||
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
|
||||
return &consul.AllocRegistration{
|
||||
Tasks: map[string]*consul.TaskRegistration{
|
||||
Tasks: map[string]*consul.ServiceRegistrations{
|
||||
task.Name: {
|
||||
Services: map[string]*consul.ServiceRegistration{
|
||||
"123": {
|
||||
@@ -847,7 +847,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
|
||||
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
|
||||
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
|
||||
return &consul.AllocRegistration{
|
||||
Tasks: map[string]*consul.TaskRegistration{
|
||||
Tasks: map[string]*consul.ServiceRegistrations{
|
||||
task.Name: {
|
||||
Services: map[string]*consul.ServiceRegistration{
|
||||
"123": {
|
||||
|
||||
@@ -33,6 +33,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
|
||||
// 5. Assert task and logmon are cleaned up
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Job.TaskGroups[0].Services = []*structs.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
PortLabel: "8888",
|
||||
},
|
||||
}
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
@@ -117,13 +123,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
|
||||
// 2 removals (canary+noncanary) during prekill
|
||||
// 2 removals (canary+noncanary) during exited
|
||||
// 2 removals (canary+noncanary) during stop
|
||||
// 1 remove group during stop
|
||||
// 2 removals (canary+noncanary) group during stop
|
||||
consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps()
|
||||
require.Len(t, consulOps, 7)
|
||||
for _, op := range consulOps[:6] {
|
||||
require.Len(t, consulOps, 8)
|
||||
for _, op := range consulOps {
|
||||
require.Equal(t, "remove", op.Op)
|
||||
}
|
||||
require.Equal(t, "remove_group", consulOps[6].Op)
|
||||
|
||||
// Assert terminated task event was emitted
|
||||
events := ar2.AllocState().TaskStates[task.Name].Events
|
||||
|
||||
@@ -3,30 +3,63 @@ package allocrunner
import (
"sync"

hclog "github.com/hashicorp/go-hclog"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

// groupServiceHook manages task group Consul service registration and
// deregistration.
type groupServiceHook struct {
alloc *structs.Allocation
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
mu sync.Mutex

logger log.Logger

// The following fields may be updated
canary bool
services []*structs.Service
networks structs.Networks
taskEnvBuilder *taskenv.Builder

// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
mu sync.Mutex
}

func newGroupServiceHook(logger hclog.Logger, alloc *structs.Allocation, consulClient consul.ConsulServiceAPI) *groupServiceHook {
type groupServiceHookConfig struct {
alloc *structs.Allocation
consul consul.ConsulServiceAPI
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
logger log.Logger
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
h := &groupServiceHook{
alloc: alloc,
consulClient: consulClient,
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services

if cfg.alloc.AllocatedResources != nil {
h.networks = cfg.alloc.AllocatedResources.Shared.Networks
}

if cfg.alloc.DeploymentStatus != nil {
h.canary = cfg.alloc.DeploymentStatus.Canary
}
h.logger = logger.Named(h.Name())
return h
}

@@ -41,14 +74,39 @@ func (h *groupServiceHook) Prerun() error {
|
||||
h.prerun = true
|
||||
h.mu.Unlock()
|
||||
}()
|
||||
return h.consulClient.RegisterGroup(h.alloc)
|
||||
|
||||
if len(h.services) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
services := h.getWorkloadServices()
|
||||
return h.consulClient.RegisterWorkload(services)
|
||||
}
|
||||
|
||||
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
oldAlloc := h.alloc
|
||||
h.alloc = req.Alloc
|
||||
oldWorkloadServices := h.getWorkloadServices()
|
||||
|
||||
// Store new updated values out of request
|
||||
canary := false
|
||||
if req.Alloc.DeploymentStatus != nil {
|
||||
canary = req.Alloc.DeploymentStatus.Canary
|
||||
}
|
||||
|
||||
var networks structs.Networks
|
||||
if req.Alloc.AllocatedResources != nil {
|
||||
networks = req.Alloc.AllocatedResources.Shared.Networks
|
||||
}
|
||||
|
||||
// Update group service hook fields
|
||||
h.networks = networks
|
||||
h.services = req.Alloc.Job.LookupTaskGroup(h.group).Services
|
||||
h.canary = canary
|
||||
h.taskEnvBuilder.UpdateTask(req.Alloc, nil)
|
||||
|
||||
// Create new task services struct with those new values
|
||||
newWorkloadServices := h.getWorkloadServices()
|
||||
|
||||
if !h.prerun {
|
||||
// Update called before Prerun. Update alloc and exit to allow
|
||||
@@ -56,11 +114,57 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return h.consulClient.UpdateGroup(oldAlloc, h.alloc)
|
||||
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
|
||||
}
|
||||
|
||||
func (h *groupServiceHook) Postrun() error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.consulClient.RemoveGroup(h.alloc)
|
||||
h.deregister()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *groupServiceHook) driverNet() *drivers.DriverNetwork {
|
||||
if len(h.networks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
//TODO(schmichael) only support one network for now
|
||||
net := h.networks[0]
|
||||
//TODO(schmichael) there's probably a better way than hacking driver network
|
||||
return &drivers.DriverNetwork{
|
||||
AutoAdvertise: true,
|
||||
IP: net.IP,
|
||||
// Copy PortLabels from group network
|
||||
PortMap: net.PortLabels(),
|
||||
}
|
||||
}
|
||||
|
||||
// deregister services from Consul.
|
||||
func (h *groupServiceHook) deregister() {
|
||||
if len(h.services) > 0 {
|
||||
workloadServices := h.getWorkloadServices()
|
||||
h.consulClient.RemoveWorkload(workloadServices)
|
||||
|
||||
// Canary flag may be getting flipped when the alloc is being
|
||||
// destroyed, so remove both variations of the service
|
||||
workloadServices.Canary = !workloadServices.Canary
|
||||
h.consulClient.RemoveWorkload(workloadServices)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
|
||||
// Interpolate with the task's environment
|
||||
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
|
||||
|
||||
// Create task services struct with request's driver metadata
|
||||
return &agentconsul.WorkloadServices{
|
||||
AllocID: h.allocID,
|
||||
Group: h.group,
|
||||
Restarter: h.restarter,
|
||||
Services: interpolatedServices,
|
||||
DriverNetwork: h.driverNet(),
|
||||
Networks: h.networks,
|
||||
Canary: h.canary,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package allocrunner
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
ctestutil "github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
@@ -22,10 +27,20 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Job.TaskGroups[0].Services = []*structs.Service{{
|
||||
Name: "foo",
|
||||
PortLabel: "9999",
|
||||
}}
|
||||
logger := testlog.HCLogger(t)
|
||||
consulClient := consul.NewMockConsulServiceClient(t, logger)
|
||||
|
||||
h := newGroupServiceHook(logger, alloc, consulClient)
|
||||
h := newGroupServiceHook(groupServiceHookConfig{
|
||||
alloc: alloc,
|
||||
consul: consulClient,
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
})
|
||||
require.NoError(t, h.Prerun())
|
||||
|
||||
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
|
||||
@@ -34,10 +49,10 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
|
||||
require.NoError(t, h.Postrun())
|
||||
|
||||
ops := consulClient.GetOps()
|
||||
require.Len(t, ops, 3)
|
||||
require.Equal(t, "add_group", ops[0].Op)
|
||||
require.Equal(t, "update_group", ops[1].Op)
|
||||
require.Equal(t, "remove_group", ops[2].Op)
|
||||
require.Len(t, ops, 4)
|
||||
require.Equal(t, "add", ops[0].Op)
|
||||
require.Equal(t, "update", ops[1].Op)
|
||||
require.Equal(t, "remove", ops[2].Op)
|
||||
}
|
||||
|
||||
// TestGroupServiceHook_GroupServices asserts group service hooks with group
|
||||
@@ -49,7 +64,13 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
|
||||
logger := testlog.HCLogger(t)
|
||||
consulClient := consul.NewMockConsulServiceClient(t, logger)
|
||||
|
||||
h := newGroupServiceHook(logger, alloc, consulClient)
|
||||
h := newGroupServiceHook(groupServiceHookConfig{
|
||||
alloc: alloc,
|
||||
consul: consulClient,
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
})
|
||||
require.NoError(t, h.Prerun())
|
||||
|
||||
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
|
||||
@@ -58,18 +79,19 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
|
||||
require.NoError(t, h.Postrun())
|
||||
|
||||
ops := consulClient.GetOps()
|
||||
require.Len(t, ops, 3)
|
||||
require.Equal(t, "add_group", ops[0].Op)
|
||||
require.Equal(t, "update_group", ops[1].Op)
|
||||
require.Equal(t, "remove_group", ops[2].Op)
|
||||
require.Len(t, ops, 4)
|
||||
require.Equal(t, "add", ops[0].Op)
|
||||
require.Equal(t, "update", ops[1].Op)
|
||||
require.Equal(t, "remove", ops[2].Op)
|
||||
}
|
||||
|
||||
// TestGroupServiceHook_Error asserts group service hooks with group
|
||||
// services but no group network returns an error.
|
||||
func TestGroupServiceHook_Error(t *testing.T) {
|
||||
func TestGroupServiceHook_NoNetwork(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Job.TaskGroups[0].Networks = []*structs.NetworkResource{}
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
tg.Services = []*structs.Service{
|
||||
{
|
||||
@@ -82,15 +104,147 @@ func TestGroupServiceHook_Error(t *testing.T) {
|
||||
}
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
// No need to set Consul client or call Run. This hould fail before
|
||||
// attempting to register.
|
||||
consulClient := agentconsul.NewServiceClient(nil, logger, false)
|
||||
consulClient := consul.NewMockConsulServiceClient(t, logger)
|
||||
|
||||
h := newGroupServiceHook(logger, alloc, consulClient)
|
||||
require.Error(t, h.Prerun())
|
||||
h := newGroupServiceHook(groupServiceHookConfig{
|
||||
alloc: alloc,
|
||||
consul: consulClient,
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
})
|
||||
require.NoError(t, h.Prerun())
|
||||
|
||||
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
|
||||
require.Error(t, h.Update(req))
|
||||
require.NoError(t, h.Update(req))
|
||||
|
||||
require.NoError(t, h.Postrun())
|
||||
|
||||
ops := consulClient.GetOps()
|
||||
require.Len(t, ops, 4)
|
||||
require.Equal(t, "add", ops[0].Op)
|
||||
require.Equal(t, "update", ops[1].Op)
|
||||
require.Equal(t, "remove", ops[2].Op)
|
||||
}
|
||||
|
||||
func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Job.TaskGroups[0].Networks = []*structs.NetworkResource{}
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
tg.Services = []*structs.Service{
|
||||
{
|
||||
Name: "testconnect",
|
||||
PortLabel: "9999",
|
||||
Connect: &structs.ConsulConnect{
|
||||
SidecarService: &structs.ConsulSidecarService{},
|
||||
},
|
||||
},
|
||||
}
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
consulClient := consul.NewMockConsulServiceClient(t, logger)
|
||||
|
||||
h := newGroupServiceHook(groupServiceHookConfig{
|
||||
alloc: alloc,
|
||||
consul: consulClient,
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
})
|
||||
|
||||
services := h.getWorkloadServices()
|
||||
require.Len(t, services.Services, 1)
|
||||
}
|
||||
|
||||
// TestGroupServiceHook_Update08Alloc asserts that adding group services to a previously
|
||||
// 0.8 alloc works.
|
||||
//
|
||||
// COMPAT(0.11) Only valid for upgrades from 0.8.
|
||||
func TestGroupServiceHook_Update08Alloc(t *testing.T) {
|
||||
// Create an embedded Consul server
|
||||
testconsul, err := ctestutil.NewTestServerConfig(func(c *ctestutil.TestServerConfig) {
|
||||
// If -v wasn't specified squelch consul logging
|
||||
if !testing.Verbose() {
|
||||
c.Stdout = ioutil.Discard
|
||||
c.Stderr = ioutil.Discard
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("error starting test consul server: %v", err)
|
||||
}
|
||||
defer testconsul.Stop()
|
||||
|
||||
consulConfig := consulapi.DefaultConfig()
|
||||
consulConfig.Address = testconsul.HTTPAddr
|
||||
consulClient, err := consulapi.NewClient(consulConfig)
|
||||
require.NoError(t, err)
|
||||
serviceClient := agentconsul.NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
|
||||
|
||||
// Lower periodicInterval to ensure periodic syncing doesn't improperly
|
||||
// remove Connect services.
|
||||
//const interval = 50 * time.Millisecond
|
||||
//serviceClient.periodicInterval = interval
|
||||
|
||||
// Disable deregistration probation to test syncing
|
||||
//serviceClient.deregisterProbationExpiry = time.Time{}
|
||||
|
||||
go serviceClient.Run()
|
||||
defer serviceClient.Shutdown()
|
||||
|
||||
// Create new 0.10-style alloc
|
||||
alloc := mock.Alloc()
|
||||
alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{
|
||||
{
|
||||
Mode: "bridge",
|
||||
IP: "10.0.0.1",
|
||||
DynamicPorts: []structs.Port{
|
||||
{
|
||||
Label: "connect-proxy-testconnect",
|
||||
Value: 9999,
|
||||
To: 9998,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
tg.Services = []*structs.Service{
|
||||
{
|
||||
Name: "testconnect",
|
||||
PortLabel: "9999",
|
||||
Connect: &structs.ConsulConnect{
|
||||
SidecarService: &structs.ConsulSidecarService{
|
||||
Proxy: &structs.ConsulProxy{
|
||||
LocalServicePort: 9000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Create old 0.8-style alloc from new alloc
|
||||
oldAlloc := alloc.Copy()
|
||||
oldAlloc.AllocatedResources = nil
|
||||
oldAlloc.Job.LookupTaskGroup(alloc.TaskGroup).Services = nil
|
||||
|
||||
// Create the group service hook
|
||||
h := newGroupServiceHook(groupServiceHookConfig{
|
||||
alloc: oldAlloc,
|
||||
consul: serviceClient,
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), oldAlloc, nil, oldAlloc.Job.Region),
|
||||
logger: testlog.HCLogger(t),
|
||||
})
|
||||
|
||||
require.NoError(t, h.Prerun())
|
||||
require.NoError(t, h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc}))
|
||||
|
||||
// Assert the group and sidecar services are registered
|
||||
require.Eventually(t, func() bool {
|
||||
services, err := consulClient.Agent().Services()
|
||||
require.NoError(t, err)
|
||||
return len(services) == 2
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
require.Error(t, h.Postrun())
|
||||
}
|
||||
|
||||
@@ -243,7 +243,7 @@ func TestHealthHook_SetHealth(t *testing.T) {
|
||||
Name: task.Services[0].Checks[0].Name,
|
||||
Status: consulapi.HealthPassing,
|
||||
}
|
||||
taskRegs := map[string]*agentconsul.TaskRegistration{
|
||||
taskRegs := map[string]*agentconsul.ServiceRegistrations{
|
||||
task.Name: {
|
||||
Services: map[string]*agentconsul.ServiceRegistration{
|
||||
task.Services[0].Name: {
|
||||
|
||||
@@ -80,7 +80,7 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskP
|
||||
// it to the secrets directory like Vault tokens.
|
||||
fn := filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")
|
||||
|
||||
id := agentconsul.MakeTaskServiceID(h.alloc.ID, "group-"+tg.Name, service)
|
||||
id := agentconsul.MakeAllocServiceID(h.alloc.ID, "group-"+tg.Name, service)
|
||||
h.logger.Debug("bootstrapping envoy", "sidecar_for", service.Name, "boostrap_file", fn, "sidecar_for_id", id, "grpc_addr", grpcAddr)
|
||||
|
||||
// Since Consul services are registered asynchronously with this task
|
||||
|
||||
@@ -86,7 +86,7 @@ func TestTaskRunner_EnvoyBootstrapHook_Ok(t *testing.T) {
|
||||
consulClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), logger, true)
|
||||
go consulClient.Run()
|
||||
defer consulClient.Shutdown()
|
||||
require.NoError(t, consulClient.RegisterGroup(alloc))
|
||||
require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter())))
|
||||
|
||||
// Run Connect bootstrap Hook
|
||||
h := newEnvoyBootstrapHook(alloc, testconsul.HTTPAddr, logger)
|
||||
|
||||
@@ -179,7 +179,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
|
||||
if check.Type != structs.ServiceCheckScript {
|
||||
continue
|
||||
}
|
||||
serviceID := agentconsul.MakeTaskServiceID(
|
||||
serviceID := agentconsul.MakeAllocServiceID(
|
||||
h.alloc.ID, h.task.Name, service)
|
||||
sc := newScriptCheck(&scriptCheckConfig{
|
||||
allocID: h.alloc.ID,
|
||||
@@ -213,7 +213,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck {
|
||||
continue
|
||||
}
|
||||
groupTaskName := "group-" + tg.Name
|
||||
serviceID := agentconsul.MakeTaskServiceID(
|
||||
serviceID := agentconsul.MakeAllocServiceID(
|
||||
h.alloc.ID, groupTaskName, service)
|
||||
sc := newScriptCheck(&scriptCheckConfig{
|
||||
allocID: h.alloc.ID,
|
||||
|
||||
@@ -27,7 +27,7 @@ type serviceHookConfig struct {
|
||||
consul consul.ConsulServiceAPI
|
||||
|
||||
// Restarter is a subset of the TaskLifecycle interface
|
||||
restarter agentconsul.TaskRestarter
|
||||
restarter agentconsul.WorkloadRestarter
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
@@ -36,7 +36,7 @@ type serviceHook struct {
|
||||
consul consul.ConsulServiceAPI
|
||||
allocID string
|
||||
taskName string
|
||||
restarter agentconsul.TaskRestarter
|
||||
restarter agentconsul.WorkloadRestarter
|
||||
logger log.Logger
|
||||
|
||||
// The following fields may be updated
|
||||
@@ -97,9 +97,9 @@ func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststa
|
||||
h.taskEnv = req.TaskEnv
|
||||
|
||||
// Create task services struct with request's driver metadata
|
||||
taskServices := h.getTaskServices()
|
||||
workloadServices := h.getWorkloadServices()
|
||||
|
||||
return h.consul.RegisterTask(taskServices)
|
||||
return h.consul.RegisterWorkload(workloadServices)
|
||||
}
|
||||
|
||||
func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
|
||||
@@ -108,7 +108,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
|
||||
|
||||
// Create old task services struct with request's driver metadata as it
|
||||
// can't change due to Updates
|
||||
oldTaskServices := h.getTaskServices()
|
||||
oldWorkloadServices := h.getWorkloadServices()
|
||||
|
||||
// Store new updated values out of request
|
||||
canary := false
|
||||
@@ -142,9 +142,9 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
|
||||
h.canary = canary
|
||||
|
||||
// Create new task services struct with those new values
|
||||
newTaskServices := h.getTaskServices()
|
||||
newWorkloadServices := h.getWorkloadServices()
|
||||
|
||||
return h.consul.UpdateTask(oldTaskServices, newTaskServices)
|
||||
return h.consul.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
|
||||
}
|
||||
|
||||
func (h *serviceHook) PreKilling(ctx context.Context, req *interfaces.TaskPreKillRequest, resp *interfaces.TaskPreKillResponse) error {
|
||||
@@ -176,13 +176,13 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in
|
||||
|
||||
// deregister services from Consul.
|
||||
func (h *serviceHook) deregister() {
|
||||
taskServices := h.getTaskServices()
|
||||
h.consul.RemoveTask(taskServices)
|
||||
workloadServices := h.getWorkloadServices()
|
||||
h.consul.RemoveWorkload(workloadServices)
|
||||
|
||||
// Canary flag may be getting flipped when the alloc is being
|
||||
// destroyed, so remove both variations of the service
|
||||
taskServices.Canary = !taskServices.Canary
|
||||
h.consul.RemoveTask(taskServices)
|
||||
workloadServices.Canary = !workloadServices.Canary
|
||||
h.consul.RemoveWorkload(workloadServices)
|
||||
|
||||
}
|
||||
|
||||
@@ -193,14 +193,14 @@ func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
|
||||
func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices {
|
||||
// Interpolate with the task's environment
|
||||
interpolatedServices := interpolateServices(h.taskEnv, h.services)
|
||||
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)
|
||||
|
||||
// Create task services struct with request's driver metadata
|
||||
return &agentconsul.TaskServices{
|
||||
return &agentconsul.WorkloadServices{
|
||||
AllocID: h.allocID,
|
||||
Name: h.taskName,
|
||||
Task: h.taskName,
|
||||
Restarter: h.restarter,
|
||||
Services: interpolatedServices,
|
||||
DriverExec: h.driverExec,
|
||||
@@ -209,62 +209,3 @@ func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
|
||||
Canary: h.canary,
|
||||
}
|
||||
}
|
||||
|
||||
// interpolateServices returns an interpolated copy of services and checks with
|
||||
// values from the task's environment.
|
||||
func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service {
|
||||
// Guard against not having a valid taskEnv. This can be the case if the
|
||||
// PreKilling or Exited hook is run before Poststart.
|
||||
if taskEnv == nil || len(services) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
interpolated := make([]*structs.Service, len(services))
|
||||
|
||||
for i, origService := range services {
|
||||
// Create a copy as we need to reinterpolate every time the
|
||||
// environment changes
|
||||
service := origService.Copy()
|
||||
|
||||
for _, check := range service.Checks {
|
||||
check.Name = taskEnv.ReplaceEnv(check.Name)
|
||||
check.Type = taskEnv.ReplaceEnv(check.Type)
|
||||
check.Command = taskEnv.ReplaceEnv(check.Command)
|
||||
check.Args = taskEnv.ParseAndReplace(check.Args)
|
||||
check.Path = taskEnv.ReplaceEnv(check.Path)
|
||||
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
|
||||
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
|
||||
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
|
||||
check.Method = taskEnv.ReplaceEnv(check.Method)
|
||||
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
|
||||
if len(check.Header) > 0 {
|
||||
header := make(map[string][]string, len(check.Header))
|
||||
for k, vs := range check.Header {
|
||||
newVals := make([]string, len(vs))
|
||||
for i, v := range vs {
|
||||
newVals[i] = taskEnv.ReplaceEnv(v)
|
||||
}
|
||||
header[taskEnv.ReplaceEnv(k)] = newVals
|
||||
}
|
||||
check.Header = header
|
||||
}
|
||||
}
|
||||
|
||||
service.Name = taskEnv.ReplaceEnv(service.Name)
|
||||
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
|
||||
service.Tags = taskEnv.ParseAndReplace(service.Tags)
|
||||
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
|
||||
|
||||
if len(service.Meta) > 0 {
|
||||
meta := make(map[string]string, len(service.Meta))
|
||||
for k, v := range service.Meta {
|
||||
meta[k] = taskEnv.ReplaceEnv(v)
|
||||
}
|
||||
service.Meta = meta
|
||||
}
|
||||
|
||||
interpolated[i] = service
|
||||
}
|
||||
|
||||
return interpolated
|
||||
}
|
||||
|
||||
@@ -1,12 +1,7 @@
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Statically assert the stats hook implements the expected interfaces
|
||||
@@ -14,80 +9,3 @@ var _ interfaces.TaskPoststartHook = (*serviceHook)(nil)
|
||||
var _ interfaces.TaskExitedHook = (*serviceHook)(nil)
|
||||
var _ interfaces.TaskPreKillHook = (*serviceHook)(nil)
|
||||
var _ interfaces.TaskUpdateHook = (*serviceHook)(nil)
|
||||
|
||||
// TestTaskRunner_ServiceHook_InterpolateServices asserts that all service
|
||||
// and check fields are properly interpolated.
|
||||
func TestTaskRunner_ServiceHook_InterpolateServices(t *testing.T) {
|
||||
t.Parallel()
|
||||
services := []*structs.Service{
|
||||
{
|
||||
Name: "${name}",
|
||||
PortLabel: "${portlabel}",
|
||||
Tags: []string{"${tags}"},
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "${checkname}",
|
||||
Type: "${checktype}",
|
||||
Command: "${checkcmd}",
|
||||
Args: []string{"${checkarg}"},
|
||||
Path: "${checkstr}",
|
||||
Protocol: "${checkproto}",
|
||||
PortLabel: "${checklabel}",
|
||||
InitialStatus: "${checkstatus}",
|
||||
Method: "${checkmethod}",
|
||||
Header: map[string][]string{
|
||||
"${checkheaderk}": {"${checkheaderv}"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
env := &taskenv.TaskEnv{
|
||||
EnvMap: map[string]string{
|
||||
"name": "name",
|
||||
"portlabel": "portlabel",
|
||||
"tags": "tags",
|
||||
"checkname": "checkname",
|
||||
"checktype": "checktype",
|
||||
"checkcmd": "checkcmd",
|
||||
"checkarg": "checkarg",
|
||||
"checkstr": "checkstr",
|
||||
"checkpath": "checkpath",
|
||||
"checkproto": "checkproto",
|
||||
"checklabel": "checklabel",
|
||||
"checkstatus": "checkstatus",
|
||||
"checkmethod": "checkmethod",
|
||||
"checkheaderk": "checkheaderk",
|
||||
"checkheaderv": "checkheaderv",
|
||||
},
|
||||
}
|
||||
|
||||
interpolated := interpolateServices(env, services)
|
||||
|
||||
exp := []*structs.Service{
|
||||
{
|
||||
Name: "name",
|
||||
PortLabel: "portlabel",
|
||||
Tags: []string{"tags"},
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "checkname",
|
||||
Type: "checktype",
|
||||
Command: "checkcmd",
|
||||
Args: []string{"checkarg"},
|
||||
Path: "checkstr",
|
||||
Protocol: "checkproto",
|
||||
PortLabel: "checklabel",
|
||||
InitialStatus: "checkstatus",
|
||||
Method: "checkmethod",
|
||||
Header: map[string][]string{
|
||||
"checkheaderk": {"checkheaderv"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
require.Equal(t, exp, interpolated)
|
||||
}
|
||||
|
||||
@@ -2,18 +2,14 @@ package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// ConsulServiceAPI is the interface the Nomad Client uses to register and
|
||||
// remove services and checks from Consul.
|
||||
type ConsulServiceAPI interface {
|
||||
RegisterGroup(*structs.Allocation) error
|
||||
RemoveGroup(*structs.Allocation) error
|
||||
UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error
|
||||
RegisterTask(*consul.TaskServices) error
|
||||
RemoveTask(*consul.TaskServices)
|
||||
UpdateTask(old, newTask *consul.TaskServices) error
|
||||
RegisterWorkload(*consul.WorkloadServices) error
|
||||
RemoveWorkload(*consul.WorkloadServices)
|
||||
UpdateWorkload(old, newTask *consul.WorkloadServices) error
|
||||
AllocRegistrations(allocID string) (*consul.AllocRegistration, error)
|
||||
UpdateTTL(id, output, status string) error
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
testing "github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
@@ -54,60 +53,33 @@ func NewMockConsulServiceClient(t testing.T, logger log.Logger) *MockConsulServi
|
||||
return &m
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) RegisterGroup(alloc *structs.Allocation) error {
|
||||
func (m *MockConsulServiceClient) UpdateWorkload(old, newSvcs *consul.WorkloadServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
m.logger.Trace("RegisterGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services))
|
||||
m.ops = append(m.ops, NewMockConsulOp("add_group", alloc.ID, alloc.TaskGroup))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) UpdateGroup(_, alloc *structs.Allocation) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
m.logger.Trace("UpdateGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services))
|
||||
m.ops = append(m.ops, NewMockConsulOp("update_group", alloc.ID, alloc.TaskGroup))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) RemoveGroup(alloc *structs.Allocation) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
m.logger.Trace("RemoveGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services))
|
||||
m.ops = append(m.ops, NewMockConsulOp("remove_group", alloc.ID, alloc.TaskGroup))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) UpdateTask(old, newSvcs *consul.TaskServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Trace("UpdateTask", "alloc_id", newSvcs.AllocID, "task", newSvcs.Name,
|
||||
m.logger.Trace("UpdateWorkload", "alloc_id", newSvcs.AllocID, "name", newSvcs.Name(),
|
||||
"old_services", len(old.Services), "new_services", len(newSvcs.Services),
|
||||
)
|
||||
m.ops = append(m.ops, NewMockConsulOp("update", newSvcs.AllocID, newSvcs.Name))
|
||||
m.ops = append(m.ops, NewMockConsulOp("update", newSvcs.AllocID, newSvcs.Name()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) RegisterTask(task *consul.TaskServices) error {
|
||||
func (m *MockConsulServiceClient) RegisterWorkload(svcs *consul.WorkloadServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Trace("RegisterTask", "alloc_id", task.AllocID, "task", task.Name,
|
||||
"services", len(task.Services),
|
||||
m.logger.Trace("RegisterWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(),
|
||||
"services", len(svcs.Services),
|
||||
)
|
||||
m.ops = append(m.ops, NewMockConsulOp("add", task.AllocID, task.Name))
|
||||
m.ops = append(m.ops, NewMockConsulOp("add", svcs.AllocID, svcs.Name()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) RemoveTask(task *consul.TaskServices) {
|
||||
func (m *MockConsulServiceClient) RemoveWorkload(svcs *consul.WorkloadServices) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Trace("RemoveTask", "alloc_id", task.AllocID, "task", task.Name,
|
||||
"services", len(task.Services),
|
||||
m.logger.Trace("RemoveWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(),
|
||||
"services", len(svcs.Services),
|
||||
)
|
||||
m.ops = append(m.ops, NewMockConsulOp("remove", task.AllocID, task.Name))
|
||||
m.ops = append(m.ops, NewMockConsulOp("remove", svcs.AllocID, svcs.Name()))
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) {
|
||||
|
||||
@@ -547,6 +547,9 @@ func (b *Builder) SetDeviceHookEnv(hookName string, envs map[string]string) *Bui
|
||||
// setTask is called from NewBuilder to populate task related environment
|
||||
// variables.
|
||||
func (b *Builder) setTask(task *structs.Task) *Builder {
|
||||
if task == nil {
|
||||
return b
|
||||
}
|
||||
b.taskName = task.Name
|
||||
b.envvars = make(map[string]string, len(task.Env))
|
||||
for k, v := range task.Env {
|
||||
|
||||
@@ -832,3 +832,18 @@ func TestEnvironment_SetPortMapEnvs(t *testing.T) {
|
||||
}
|
||||
require.Equal(t, expected, envs)
|
||||
}
|
||||
|
||||
func TestEnvironment_TasklessBuilder(t *testing.T) {
|
||||
node := mock.Node()
|
||||
alloc := mock.Alloc()
|
||||
alloc.Job.Meta["jobt"] = "foo"
|
||||
alloc.Job.TaskGroups[0].Meta["groupt"] = "bar"
|
||||
require := require.New(t)
|
||||
var taskEnv *TaskEnv
|
||||
require.NotPanics(func() {
|
||||
taskEnv = NewBuilder(node, alloc, nil, "global").SetAllocDir("/tmp/alloc").Build()
|
||||
})
|
||||
|
||||
require.Equal("foo", taskEnv.ReplaceEnv("${NOMAD_META_jobt}"))
|
||||
require.Equal("bar", taskEnv.ReplaceEnv("${NOMAD_META_groupt}"))
|
||||
}
|
||||
|
||||
client/taskenv/services.go (new file, 64 lines)
@@ -0,0 +1,64 @@
|
||||
package taskenv
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// InterpolateServices returns an interpolated copy of services and checks with
|
||||
// values from the task's environment.
|
||||
func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*structs.Service {
|
||||
// Guard against not having a valid taskEnv. This can be the case if the
|
||||
// PreKilling or Exited hook is run before Poststart.
|
||||
if taskEnv == nil || len(services) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
interpolated := make([]*structs.Service, len(services))
|
||||
|
||||
for i, origService := range services {
|
||||
// Create a copy as we need to reinterpolate every time the
|
||||
// environment changes
|
||||
service := origService.Copy()
|
||||
|
||||
for _, check := range service.Checks {
|
||||
check.Name = taskEnv.ReplaceEnv(check.Name)
|
||||
check.Type = taskEnv.ReplaceEnv(check.Type)
|
||||
check.Command = taskEnv.ReplaceEnv(check.Command)
|
||||
check.Args = taskEnv.ParseAndReplace(check.Args)
|
||||
check.Path = taskEnv.ReplaceEnv(check.Path)
|
||||
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
|
||||
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
|
||||
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
|
||||
check.Method = taskEnv.ReplaceEnv(check.Method)
|
||||
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
|
||||
if len(check.Header) > 0 {
|
||||
header := make(map[string][]string, len(check.Header))
|
||||
for k, vs := range check.Header {
|
||||
newVals := make([]string, len(vs))
|
||||
for i, v := range vs {
|
||||
newVals[i] = taskEnv.ReplaceEnv(v)
|
||||
}
|
||||
header[taskEnv.ReplaceEnv(k)] = newVals
|
||||
}
|
||||
check.Header = header
|
||||
}
|
||||
}
|
||||
|
||||
service.Name = taskEnv.ReplaceEnv(service.Name)
|
||||
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
|
||||
service.Tags = taskEnv.ParseAndReplace(service.Tags)
|
||||
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
|
||||
|
||||
if len(service.Meta) > 0 {
|
||||
meta := make(map[string]string, len(service.Meta))
|
||||
for k, v := range service.Meta {
|
||||
meta[k] = taskEnv.ReplaceEnv(v)
|
||||
}
|
||||
service.Meta = meta
|
||||
}
|
||||
|
||||
interpolated[i] = service
|
||||
}
|
||||
|
||||
return interpolated
|
||||
}
|
||||
client/taskenv/services_test.go (new file, 85 lines)
@@ -0,0 +1,85 @@
|
||||
package taskenv
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestInterpolateServices asserts that all service
|
||||
// and check fields are properly interpolated.
|
||||
func TestInterpolateServices(t *testing.T) {
|
||||
t.Parallel()
|
||||
services := []*structs.Service{
|
||||
{
|
||||
Name: "${name}",
|
||||
PortLabel: "${portlabel}",
|
||||
Tags: []string{"${tags}"},
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "${checkname}",
|
||||
Type: "${checktype}",
|
||||
Command: "${checkcmd}",
|
||||
Args: []string{"${checkarg}"},
|
||||
Path: "${checkstr}",
|
||||
Protocol: "${checkproto}",
|
||||
PortLabel: "${checklabel}",
|
||||
InitialStatus: "${checkstatus}",
|
||||
Method: "${checkmethod}",
|
||||
Header: map[string][]string{
|
||||
"${checkheaderk}": {"${checkheaderv}"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
env := &TaskEnv{
|
||||
EnvMap: map[string]string{
|
||||
"name": "name",
|
||||
"portlabel": "portlabel",
|
||||
"tags": "tags",
|
||||
"checkname": "checkname",
|
||||
"checktype": "checktype",
|
||||
"checkcmd": "checkcmd",
|
||||
"checkarg": "checkarg",
|
||||
"checkstr": "checkstr",
|
||||
"checkpath": "checkpath",
|
||||
"checkproto": "checkproto",
|
||||
"checklabel": "checklabel",
|
||||
"checkstatus": "checkstatus",
|
||||
"checkmethod": "checkmethod",
|
||||
"checkheaderk": "checkheaderk",
|
||||
"checkheaderv": "checkheaderv",
|
||||
},
|
||||
}
|
||||
|
||||
interpolated := InterpolateServices(env, services)
|
||||
|
||||
exp := []*structs.Service{
|
||||
{
|
||||
Name: "name",
|
||||
PortLabel: "portlabel",
|
||||
Tags: []string{"tags"},
|
||||
Checks: []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "checkname",
|
||||
Type: "checktype",
|
||||
Command: "checkcmd",
|
||||
Args: []string{"checkarg"},
|
||||
Path: "checkstr",
|
||||
Protocol: "checkproto",
|
||||
PortLabel: "checklabel",
|
||||
InitialStatus: "checkstatus",
|
||||
Method: "checkmethod",
|
||||
Header: map[string][]string{
|
||||
"checkheaderk": {"checkheaderv"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
require.Equal(t, exp, interpolated)
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
@@ -22,8 +22,8 @@ type ChecksAPI interface {
|
||||
Checks() (map[string]*api.AgentCheck, error)
|
||||
}
|
||||
|
||||
// TaskRestarter allows the checkWatcher to restart tasks.
|
||||
type TaskRestarter interface {
|
||||
// WorkloadRestarter allows the checkWatcher to restart tasks or entire task groups.
|
||||
type WorkloadRestarter interface {
|
||||
Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ type checkRestart struct {
|
||||
checkName string
|
||||
taskKey string // composite of allocID + taskName for uniqueness
|
||||
|
||||
task TaskRestarter
|
||||
task WorkloadRestarter
|
||||
grace time.Duration
|
||||
interval time.Duration
|
||||
timeLimit time.Duration
|
||||
@@ -114,7 +114,7 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string)
|
||||
|
||||
// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
|
||||
// to be called in a goroutine.
|
||||
func asyncRestart(ctx context.Context, logger log.Logger, task TaskRestarter, event *structs.TaskEvent) {
|
||||
func asyncRestart(ctx context.Context, logger log.Logger, task WorkloadRestarter, event *structs.TaskEvent) {
|
||||
// Check watcher restarts are always failures
|
||||
const failure = true
|
||||
|
||||
@@ -292,7 +292,7 @@ func (w *checkWatcher) Run(ctx context.Context) {
|
||||
}
|
||||
|
||||
// Watch a check and restart its task if unhealthy.
|
||||
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter TaskRestarter) {
|
||||
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter WorkloadRestarter) {
|
||||
if !check.TriggersRestarts() {
|
||||
// Not watched, noop
|
||||
return
|
||||
|
||||
@@ -115,12 +115,12 @@ type operations struct {
|
||||
// allocations by task.
|
||||
type AllocRegistration struct {
|
||||
// Tasks maps the name of a task to its registered services and checks
|
||||
Tasks map[string]*TaskRegistration
|
||||
Tasks map[string]*ServiceRegistrations
|
||||
}
|
||||
|
||||
func (a *AllocRegistration) copy() *AllocRegistration {
|
||||
c := &AllocRegistration{
|
||||
Tasks: make(map[string]*TaskRegistration, len(a.Tasks)),
|
||||
Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)),
|
||||
}
|
||||
|
||||
for k, v := range a.Tasks {
|
||||
@@ -164,14 +164,14 @@ func (a *AllocRegistration) NumChecks() int {
|
||||
return total
|
||||
}
|
||||
|
||||
// TaskRegistration holds the status of services registered for a particular
|
||||
// task.
|
||||
type TaskRegistration struct {
|
||||
// ServiceRegistrations holds the status of services registered for a particular
|
||||
// task or task group.
|
||||
type ServiceRegistrations struct {
|
||||
Services map[string]*ServiceRegistration
|
||||
}
|
||||
|
||||
func (t *TaskRegistration) copy() *TaskRegistration {
|
||||
c := &TaskRegistration{
|
||||
func (t *ServiceRegistrations) copy() *ServiceRegistrations {
|
||||
c := &ServiceRegistrations{
|
||||
Services: make(map[string]*ServiceRegistration, len(t.Services)),
|
||||
}
|
||||
|
||||
@@ -675,11 +675,11 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
|
||||
// serviceRegs creates service registrations, check registrations, and script
|
||||
// checks from a service. It returns a service registration object with the
|
||||
// service and check IDs populated.
|
||||
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, task *TaskServices) (
|
||||
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, workload *WorkloadServices) (
|
||||
*ServiceRegistration, error) {
|
||||
|
||||
// Get the services ID
|
||||
id := MakeTaskServiceID(task.AllocID, task.Name, service)
|
||||
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
|
||||
sreg := &ServiceRegistration{
|
||||
serviceID: id,
|
||||
checkIDs: make(map[string]struct{}, len(service.Checks)),
|
||||
@@ -692,14 +692,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
|
||||
}
|
||||
|
||||
// Determine the address to advertise based on the mode
|
||||
ip, port, err := getAddress(addrMode, service.PortLabel, task.Networks, task.DriverNetwork)
|
||||
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
|
||||
}
|
||||
|
||||
// Determine whether to use tags or canary_tags
|
||||
var tags []string
|
||||
if task.Canary && len(service.CanaryTags) > 0 {
|
||||
if workload.Canary && len(service.CanaryTags) > 0 {
|
||||
tags = make([]string, len(service.CanaryTags))
|
||||
copy(tags, service.CanaryTags)
|
||||
} else {
|
||||
@@ -708,7 +708,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
|
||||
}
|
||||
|
||||
// newConnect returns (nil, nil) if there's no Connect-enabled service.
|
||||
connect, err := newConnect(service.Name, service.Connect, task.Networks)
|
||||
connect, err := newConnect(service.Name, service.Connect, workload.Networks)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err)
|
||||
}
|
||||
@@ -734,7 +734,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
|
||||
ops.regServices = append(ops.regServices, serviceReg)
|
||||
|
||||
// Build the check registrations
|
||||
checkIDs, err := c.checkRegs(ops, id, service, task)
|
||||
checkIDs, err := c.checkRegs(ops, id, service, workload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -747,7 +747,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
|
||||
// checkRegs registers the checks for the given service and returns the
|
||||
// registered check ids.
|
||||
func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *structs.Service,
|
||||
task *TaskServices) ([]string, error) {
|
||||
workload *WorkloadServices) ([]string, error) {
|
||||
|
||||
// Fast path
|
||||
numChecks := len(service.Checks)
|
||||
@@ -782,7 +782,7 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
|
||||
addrMode = structs.AddressModeHost
|
||||
}
|
||||
|
||||
ip, port, err := getAddress(addrMode, portLabel, task.Networks, task.DriverNetwork)
|
||||
ip, port, err := getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
|
||||
}
|
||||
@@ -796,186 +796,67 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
|
||||
return checkIDs, nil
|
||||
}
|
||||
|
||||
//TODO(schmichael) remove
|
||||
type noopRestarter struct{}
|
||||
|
||||
func (noopRestarter) Restart(context.Context, *structs.TaskEvent, bool) error { return nil }
|
||||
|
||||
// makeAllocTaskServices creates a TaskServices struct for a group service.
|
||||
//
|
||||
//TODO(schmichael) rename TaskServices and refactor this into a New method
|
||||
func makeAllocTaskServices(alloc *structs.Allocation, tg *structs.TaskGroup) (*TaskServices, error) {
|
||||
//COMPAT(0.11) AllocatedResources is only nil when upgrading directly
|
||||
// from 0.8.
|
||||
if alloc.AllocatedResources == nil || len(alloc.AllocatedResources.Shared.Networks) == 0 {
|
||||
return nil, fmt.Errorf("unable to register a group service without a group network")
|
||||
}
|
||||
|
||||
//TODO(schmichael) only support one network for now
|
||||
net := alloc.AllocatedResources.Shared.Networks[0]
|
||||
|
||||
ts := &TaskServices{
|
||||
AllocID: alloc.ID,
|
||||
Name: "group-" + alloc.TaskGroup,
|
||||
Services: tg.Services,
|
||||
Networks: alloc.AllocatedResources.Shared.Networks,
|
||||
|
||||
//TODO(schmichael) there's probably a better way than hacking driver network
|
||||
DriverNetwork: &drivers.DriverNetwork{
|
||||
AutoAdvertise: true,
|
||||
IP: net.IP,
|
||||
// Copy PortLabels from group network
|
||||
PortMap: net.PortLabels(),
|
||||
},
|
||||
|
||||
// unsupported for group services
|
||||
Restarter: noopRestarter{},
|
||||
DriverExec: nil,
|
||||
}
|
||||
|
||||
if alloc.DeploymentStatus != nil {
|
||||
ts.Canary = alloc.DeploymentStatus.Canary
|
||||
}
|
||||
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
// RegisterGroup services with Consul. Adds all task group-level service
|
||||
// entries and checks to Consul.
|
||||
func (c *ServiceClient) RegisterGroup(alloc *structs.Allocation) error {
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
if tg == nil {
|
||||
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
|
||||
}
|
||||
|
||||
if len(tg.Services) == 0 {
|
||||
// noop
|
||||
return nil
|
||||
}
|
||||
|
||||
ts, err := makeAllocTaskServices(alloc, tg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.RegisterTask(ts)
|
||||
}
|
||||
|
||||
// UpdateGroup services with Consul. Updates all task group-level service
|
||||
// entries and checks to Consul.
|
||||
func (c *ServiceClient) UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error {
|
||||
oldTG := oldAlloc.Job.LookupTaskGroup(oldAlloc.TaskGroup)
|
||||
if oldTG == nil {
|
||||
return fmt.Errorf("task group %q not in old allocation", oldAlloc.TaskGroup)
|
||||
}
|
||||
|
||||
if len(oldTG.Services) == 0 {
|
||||
// No old group services, simply add new group services
|
||||
return c.RegisterGroup(newAlloc)
|
||||
}
|
||||
|
||||
oldServices, err := makeAllocTaskServices(oldAlloc, oldTG)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newTG := newAlloc.Job.LookupTaskGroup(newAlloc.TaskGroup)
|
||||
if newTG == nil {
|
||||
return fmt.Errorf("task group %q not in new allocation", newAlloc.TaskGroup)
|
||||
}
|
||||
|
||||
newServices, err := makeAllocTaskServices(newAlloc, newTG)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.UpdateTask(oldServices, newServices)
|
||||
}
|
||||
|
||||
// RemoveGroup services with Consul. Removes all task group-level service
|
||||
// entries and checks from Consul.
|
||||
func (c *ServiceClient) RemoveGroup(alloc *structs.Allocation) error {
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
if tg == nil {
|
||||
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
|
||||
}
|
||||
|
||||
if len(tg.Services) == 0 {
|
||||
// noop
|
||||
return nil
|
||||
}
|
||||
ts, err := makeAllocTaskServices(alloc, tg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.RemoveTask(ts)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
|
||||
// exec is nil and a script check exists an error is returned.
|
||||
// RegisterWorkload with Consul. Adds all service entries and checks to Consul.
|
||||
//
|
||||
// If the service IP is set it used as the address in the service registration.
|
||||
// Checks will always use the IP from the Task struct (host's IP).
|
||||
//
|
||||
// Actual communication with Consul is done asynchronously (see Run).
|
||||
func (c *ServiceClient) RegisterTask(task *TaskServices) error {
|
||||
func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
|
||||
// Fast path
|
||||
numServices := len(task.Services)
|
||||
numServices := len(workload.Services)
|
||||
if numServices == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
t := new(TaskRegistration)
|
||||
t := new(ServiceRegistrations)
|
||||
t.Services = make(map[string]*ServiceRegistration, numServices)
|
||||
|
||||
ops := &operations{}
|
||||
for _, service := range task.Services {
|
||||
sreg, err := c.serviceRegs(ops, service, task)
|
||||
for _, service := range workload.Services {
|
||||
sreg, err := c.serviceRegs(ops, service, workload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.Services[sreg.serviceID] = sreg
|
||||
}
|
||||
|
||||
// Add the task to the allocation's registration
|
||||
c.addTaskRegistration(task.AllocID, task.Name, t)
|
||||
// Add the workload to the allocation's registration
|
||||
c.addRegistrations(workload.AllocID, workload.Name(), t)
|
||||
|
||||
c.commit(ops)
|
||||
|
||||
// Start watching checks. Done after service registrations are built
|
||||
// since an error building them could leak watches.
|
||||
for _, service := range task.Services {
|
||||
serviceID := MakeTaskServiceID(task.AllocID, task.Name, service)
|
||||
for _, service := range workload.Services {
|
||||
serviceID := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
|
||||
for _, check := range service.Checks {
|
||||
if check.TriggersRestarts() {
|
||||
checkID := MakeCheckID(serviceID, check)
|
||||
c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter)
|
||||
c.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTask in Consul. Does not alter the service if only checks have
|
||||
// UpdateWorkload in Consul. Does not alter the service if only checks have
|
||||
// changed.
|
||||
//
|
||||
// DriverNetwork must not change between invocations for the same allocation.
|
||||
func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
|
||||
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
|
||||
ops := &operations{}
|
||||
|
||||
taskReg := new(TaskRegistration)
|
||||
taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services))
|
||||
regs := new(ServiceRegistrations)
|
||||
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))
|
||||
|
||||
existingIDs := make(map[string]*structs.Service, len(old.Services))
|
||||
for _, s := range old.Services {
|
||||
existingIDs[MakeTaskServiceID(old.AllocID, old.Name, s)] = s
|
||||
existingIDs[MakeAllocServiceID(old.AllocID, old.Name(), s)] = s
|
||||
}
|
||||
newIDs := make(map[string]*structs.Service, len(newTask.Services))
|
||||
for _, s := range newTask.Services {
|
||||
newIDs[MakeTaskServiceID(newTask.AllocID, newTask.Name, s)] = s
|
||||
newIDs := make(map[string]*structs.Service, len(newWorkload.Services))
|
||||
for _, s := range newWorkload.Services {
|
||||
newIDs[MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
|
||||
}
|
||||
|
||||
// Loop over existing Service IDs to see if they have been removed
|
||||
@@ -997,8 +878,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
|
||||
continue
|
||||
}
|
||||
|
||||
oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary)
|
||||
newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary)
|
||||
oldHash := existingSvc.Hash(old.AllocID, old.Name(), old.Canary)
|
||||
newHash := newSvc.Hash(newWorkload.AllocID, newWorkload.Name(), newWorkload.Canary)
|
||||
if oldHash == newHash {
|
||||
// Service exists and hasn't changed, don't re-add it later
|
||||
delete(newIDs, existingID)
|
||||
@@ -1009,7 +890,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
|
||||
serviceID: existingID,
|
||||
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
|
||||
}
|
||||
taskReg.Services[existingID] = sreg
|
||||
regs.Services[existingID] = sreg
|
||||
|
||||
// See if any checks were updated
|
||||
existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks))
|
||||
@@ -1028,7 +909,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
|
||||
}
|
||||
|
||||
// New check on an unchanged service; add them now
|
||||
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newTask)
|
||||
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newWorkload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1039,7 +920,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
|
||||
|
||||
// Update all watched checks as CheckRestart fields aren't part of ID
|
||||
if check.TriggersRestarts() {
|
||||
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
|
||||
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1056,41 +937,41 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
|
||||
|
||||
// Any remaining services should just be enqueued directly
|
||||
for _, newSvc := range newIDs {
|
||||
sreg, err := c.serviceRegs(ops, newSvc, newTask)
|
||||
sreg, err := c.serviceRegs(ops, newSvc, newWorkload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskReg.Services[sreg.serviceID] = sreg
|
||||
regs.Services[sreg.serviceID] = sreg
|
||||
}
|
||||
|
||||
// Add the task to the allocation's registration
|
||||
c.addTaskRegistration(newTask.AllocID, newTask.Name, taskReg)
|
||||
c.addRegistrations(newWorkload.AllocID, newWorkload.Name(), regs)
|
||||
|
||||
c.commit(ops)
|
||||
|
||||
// Start watching checks. Done after service registrations are built
|
||||
// since an error building them could leak watches.
|
||||
for _, service := range newIDs {
|
||||
serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service)
|
||||
serviceID := MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), service)
|
||||
for _, check := range service.Checks {
|
||||
if check.TriggersRestarts() {
|
||||
checkID := MakeCheckID(serviceID, check)
|
||||
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
|
||||
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
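UpdateWorkload follows the same old/new pattern the tests below use: copy the current WorkloadServices, mutate it, and hand both to the client so that only the changed services and checks are re-registered. A minimal sketch, assuming a workload with at least one service; the tag value is arbitrary.

func exampleUpdateWorkload(sc *ServiceClient, ws *WorkloadServices) error {
	old := ws.Copy()
	// Mutate the live definition; only the delta is re-registered.
	ws.Services[0].Tags = append(ws.Services[0].Tags, "extra-tag")
	return sc.UpdateWorkload(old, ws)
}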
// RemoveTask from Consul. Removes all service entries and checks.
// RemoveWorkload from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RemoveTask(task *TaskServices) {
func (c *ServiceClient) RemoveWorkload(workload *WorkloadServices) {
	ops := operations{}

	for _, service := range task.Services {
		id := MakeTaskServiceID(task.AllocID, task.Name, service)
	for _, service := range workload.Services {
		id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
		ops.deregServices = append(ops.deregServices, id)

		for _, check := range service.Checks {
@@ -1103,8 +984,8 @@ func (c *ServiceClient) RemoveTask(task *TaskServices) {
		}
	}

	// Remove the task from the alloc's registrations
	c.removeTaskRegistration(task.AllocID, task.Name)
	// Remove the workload from the alloc's registrations
	c.removeRegistration(workload.AllocID, workload.Name())

	// Now add them to the deregistration fields; main Run loop will update
	c.commit(&ops)
@@ -1203,23 +1084,23 @@ func (c *ServiceClient) Shutdown() error {
	return nil
}

// addTaskRegistration adds the task registration for the given allocation.
func (c *ServiceClient) addTaskRegistration(allocID, taskName string, reg *TaskRegistration) {
// addRegistrations adds the service registrations for the given allocation.
func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *ServiceRegistrations) {
	c.allocRegistrationsLock.Lock()
	defer c.allocRegistrationsLock.Unlock()

	alloc, ok := c.allocRegistrations[allocID]
	if !ok {
		alloc = &AllocRegistration{
			Tasks: make(map[string]*TaskRegistration),
			Tasks: make(map[string]*ServiceRegistrations),
		}
		c.allocRegistrations[allocID] = alloc
	}
	alloc.Tasks[taskName] = reg
}

// removeTaskRegistration removes the task registration for the given allocation.
func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
// removeRegistration removes the registration for the given allocation.
func (c *ServiceClient) removeRegistration(allocID, taskName string) {
	c.allocRegistrationsLock.Lock()
	defer c.allocRegistrationsLock.Unlock()

@@ -1249,11 +1130,11 @@ func makeAgentServiceID(role string, service *structs.Service) string {
	return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
}

// MakeTaskServiceID creates a unique ID for identifying a task service in
// MakeAllocServiceID creates a unique ID for identifying an alloc service in
// Consul.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
func MakeTaskServiceID(allocID, taskName string, service *structs.Service) string {
func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string {
	return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}
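The same ID helper now serves task- and group-level services; for a group service the task-name slot carries the synthetic group name (see the Name() method added later in this diff). A hedged example, assuming nomadTaskPrefix is the "_nomad-task-" prefix shown in the comment above and using illustrative values:

func exampleAllocServiceID() string {
	return MakeAllocServiceID(
		"b4e61df9-b095-d64e-f241-23860da1375f", // alloc ID
		"group-web",                            // workload.Name() for an assumed group "web"
		&structs.Service{Name: "redis-http", PortLabel: "http"},
	)
	// With the prefix above this yields:
	// _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-group-web-redis-http-http
}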
@@ -64,6 +64,9 @@ func TestConsul_Connect(t *testing.T) {
|
||||
{
|
||||
Name: "testconnect",
|
||||
PortLabel: "9999",
|
||||
Meta: map[string]string{
|
||||
"alloc_id": "${NOMAD_ALLOC_ID}",
|
||||
},
|
||||
Connect: &structs.ConsulConnect{
|
||||
SidecarService: &structs.ConsulSidecarService{
|
||||
Proxy: &structs.ConsulProxy{
|
||||
@@ -76,10 +79,10 @@ func TestConsul_Connect(t *testing.T) {
|
||||
|
||||
// required by isNomadSidecar assertion below
|
||||
serviceRegMap := map[string]*api.AgentServiceRegistration{
|
||||
MakeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]): nil,
|
||||
MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]): nil,
|
||||
}
|
||||
|
||||
require.NoError(t, serviceClient.RegisterGroup(alloc))
|
||||
require.NoError(t, serviceClient.RegisterWorkload(BuildAllocServices(mock.Node(), alloc, NoopRestarter())))
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
services, err := consulClient.Agent().Services()
|
||||
@@ -94,7 +97,7 @@ func TestConsul_Connect(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, services, 2)
|
||||
|
||||
serviceID := MakeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
|
||||
serviceID := MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
|
||||
connectID := serviceID + "-sidecar-proxy"
|
||||
|
||||
require.Contains(t, services, serviceID)
|
||||
@@ -123,88 +126,8 @@ func TestConsul_Connect(t *testing.T) {
|
||||
"bind_address": "0.0.0.0",
|
||||
"bind_port": float64(9998),
|
||||
})
|
||||
require.Equal(t, alloc.ID, agentService.Meta["alloc_id"])
|
||||
|
||||
time.Sleep(interval >> 2)
|
||||
}
|
||||
}
|
||||
|
||||
// TestConsul_Update08Alloc asserts that adding group services to a previously
|
||||
// 0.8 alloc works.
|
||||
//
|
||||
// COMPAT(0.11) Only valid for upgrades from 0.8.
|
||||
func TestConsul_Update08Alloc(t *testing.T) {
|
||||
// Create an embedded Consul server
|
||||
testconsul, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
|
||||
// If -v wasn't specified squelch consul logging
|
||||
if !testing.Verbose() {
|
||||
c.Stdout = ioutil.Discard
|
||||
c.Stderr = ioutil.Discard
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("error starting test consul server: %v", err)
|
||||
}
|
||||
defer testconsul.Stop()
|
||||
|
||||
consulConfig := consulapi.DefaultConfig()
|
||||
consulConfig.Address = testconsul.HTTPAddr
|
||||
consulClient, err := consulapi.NewClient(consulConfig)
|
||||
require.NoError(t, err)
|
||||
serviceClient := NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
|
||||
|
||||
// Lower periodicInterval to ensure periodic syncing doesn't improperly
|
||||
// remove Connect services.
|
||||
const interval = 50 * time.Millisecond
|
||||
serviceClient.periodicInterval = interval
|
||||
|
||||
// Disable deregistration probation to test syncing
|
||||
serviceClient.deregisterProbationExpiry = time.Time{}
|
||||
|
||||
go serviceClient.Run()
|
||||
defer serviceClient.Shutdown()
|
||||
|
||||
// Create new 0.10-style alloc
|
||||
alloc := mock.Alloc()
|
||||
alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{
|
||||
{
|
||||
Mode: "bridge",
|
||||
IP: "10.0.0.1",
|
||||
DynamicPorts: []structs.Port{
|
||||
{
|
||||
Label: "connect-proxy-testconnect",
|
||||
Value: 9999,
|
||||
To: 9998,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
tg.Services = []*structs.Service{
|
||||
{
|
||||
Name: "testconnect",
|
||||
PortLabel: "9999",
|
||||
Connect: &structs.ConsulConnect{
|
||||
SidecarService: &structs.ConsulSidecarService{
|
||||
Proxy: &structs.ConsulProxy{
|
||||
LocalServicePort: 9000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Create old 0.8-style alloc from new alloc
|
||||
oldAlloc := alloc.Copy()
|
||||
oldAlloc.AllocatedResources = nil
|
||||
oldAlloc.Job.LookupTaskGroup(alloc.TaskGroup).Services = nil
|
||||
|
||||
// Expect new services to get registered
|
||||
require.NoError(t, serviceClient.UpdateGroup(oldAlloc, alloc))
|
||||
|
||||
// Assert the group and sidecar services are registered
|
||||
require.Eventually(t, func() bool {
|
||||
services, err := consulClient.Agent().Services()
|
||||
require.NoError(t, err)
|
||||
return len(services) == 2
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
@@ -2,22 +2,28 @@ package consul

import (
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/drivers"
)

type TaskServices struct {
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be synchronized with Consul
type WorkloadServices struct {
	AllocID string

	// Name of the task
	Name string
	// Name of the task and task group the services are defined for. For
	// group based services, Task will be empty
	Task  string
	Group string

	// Canary indicates whether or not the allocation is a canary
	Canary bool

	// Restarter allows restarting the task depending on the task's
	// Restarter allows restarting the task or task group depending on the
	// check_restart stanzas.
	Restarter TaskRestarter
	Restarter WorkloadRestarter

	// Services and checks to register for the task.
	Services []*structs.Service
@@ -26,41 +32,49 @@ type TaskServices struct {
	Networks structs.Networks

	// DriverExec is the script executor for the task's driver.
	// For group services this is nil and script execution is managed by
	// a tasklet in the taskrunner script_check_hook
	DriverExec interfaces.ScriptExecutor

	// DriverNetwork is the network specified by the driver and may be nil.
	DriverNetwork *drivers.DriverNetwork
}

func NewTaskServices(alloc *structs.Allocation, task *structs.Task, restarter TaskRestarter, exec interfaces.ScriptExecutor, net *drivers.DriverNetwork) *TaskServices {
	ts := TaskServices{
		AllocID:       alloc.ID,
		Name:          task.Name,
		Restarter:     restarter,
		Services:      task.Services,
		DriverExec:    exec,
		DriverNetwork: net,
func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *WorkloadServices {

	//TODO(schmichael) only support one network for now
	net := alloc.AllocatedResources.Shared.Networks[0]

	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)

	ws := &WorkloadServices{
		AllocID:  alloc.ID,
		Group:    alloc.TaskGroup,
		Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services),
		Networks: alloc.AllocatedResources.Shared.Networks,

		//TODO(schmichael) there's probably a better way than hacking driver network
		DriverNetwork: &drivers.DriverNetwork{
			AutoAdvertise: true,
			IP:            net.IP,
			// Copy PortLabels from group network
			PortMap: net.PortLabels(),
		},

		Restarter:  restarter,
		DriverExec: nil,
	}

	if alloc.AllocatedResources != nil {
		if tr, ok := alloc.AllocatedResources.Tasks[task.Name]; ok {
			ts.Networks = tr.Networks
		}
	} else if task.Resources != nil {
		// COMPAT(0.11): Remove in 0.11
		ts.Networks = task.Resources.Networks
	if alloc.DeploymentStatus != nil {
		ws.Canary = alloc.DeploymentStatus.Canary
	}

	if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary {
		ts.Canary = true
	}

	return &ts
	return ws
}
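Because BuildAllocServices runs the group services through taskenv interpolation, runtime variables in a group service stanza resolve before registration. A minimal sketch, assuming the allocation carries a shared (group) network as the tests in this diff set up; the service values are illustrative.

func exampleInterpolatedGroupServices(alloc *structs.Allocation) *WorkloadServices {
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	tg.Services = []*structs.Service{{
		Name:      "web",
		PortLabel: "http",
		Meta:      map[string]string{"alloc_id": "${NOMAD_ALLOC_ID}"},
	}}

	// The returned services have ${NOMAD_ALLOC_ID} replaced with the
	// concrete allocation ID, mirroring the TestConsul_Connect assertion.
	return BuildAllocServices(mock.Node(), alloc, NoopRestarter())
}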
// Copy method for easing tests
func (t *TaskServices) Copy() *TaskServices {
	newTS := new(TaskServices)
func (t *WorkloadServices) Copy() *WorkloadServices {
	newTS := new(WorkloadServices)
	*newTS = *t

	// Deep copy Services
@@ -70,3 +84,11 @@ func (t *TaskServices) Copy() *TaskServices {
	}
	return newTS
}

func (w *WorkloadServices) Name() string {
	if w.Task != "" {
		return w.Task
	}

	return "group-" + w.Group
}
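Name() is what feeds the alloc service IDs and check watches above: the task name when set, otherwise a synthetic group name. A tiny illustrative example:

func exampleWorkloadNames() (string, string) {
	taskWS := &WorkloadServices{Task: "redis"}
	groupWS := &WorkloadServices{Group: "cache"}
	return taskWS.Name(), groupWS.Name() // "redis", "group-cache"
}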
17 command/agent/consul/testing.go Normal file
@@ -0,0 +1,17 @@
package consul

import (
	"context"

	"github.com/hashicorp/nomad/nomad/structs"
)

func NoopRestarter() WorkloadRestarter {
	return noopRestarter{}
}

type noopRestarter struct{}

func (noopRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
	return nil
}
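NoopRestarter is the trivial WorkloadRestarter for tests; any value with this Restart signature satisfies the interface. A hedged sketch of a counting variant, similar in spirit to the test-only restartRecorder further down (imports for context, sync/atomic, and structs assumed):

type countingRestarter struct {
	restarts int64
}

func (c *countingRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
	// Record the restart request instead of acting on it.
	atomic.AddInt64(&c.restarts, 1)
	return nil
}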
@@ -20,15 +20,15 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Ports used in testTask
|
||||
// Ports used in testWorkload
|
||||
xPort = 1234
|
||||
yPort = 1235
|
||||
)
|
||||
|
||||
func testTask() *TaskServices {
|
||||
return &TaskServices{
|
||||
func testWorkload() *WorkloadServices {
|
||||
return &WorkloadServices{
|
||||
AllocID: uuid.Generate(),
|
||||
Name: "taskname",
|
||||
Task: "taskname",
|
||||
Restarter: &restartRecorder{},
|
||||
Services: []*structs.Service{
|
||||
{
|
||||
@@ -48,7 +48,7 @@ func testTask() *TaskServices {
|
||||
}
|
||||
}
|
||||
|
||||
// restartRecorder is a minimal TaskRestarter implementation that simply
|
||||
// restartRecorder is a minimal WorkloadRestarter implementation that simply
|
||||
// counts how many restarts were triggered.
|
||||
type restartRecorder struct {
|
||||
restarts int64
|
||||
@@ -63,7 +63,7 @@ func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent,
|
||||
type testFakeCtx struct {
|
||||
ServiceClient *ServiceClient
|
||||
FakeConsul *MockAgent
|
||||
Task *TaskServices
|
||||
Workload *WorkloadServices
|
||||
}
|
||||
|
||||
var errNoOps = fmt.Errorf("testing error: no pending operations")
|
||||
@@ -85,10 +85,10 @@ func (t *testFakeCtx) syncOnce() error {
|
||||
}
|
||||
|
||||
// setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul.
|
||||
// A test Task is also provided.
|
||||
// A test Workload is also provided.
|
||||
func setupFake(t *testing.T) *testFakeCtx {
|
||||
fc := NewMockAgent()
|
||||
tt := testTask()
|
||||
tw := testWorkload()
|
||||
|
||||
// by default start fake client being out of probation
|
||||
sc := NewServiceClient(fc, testlog.HCLogger(t), true)
|
||||
@@ -97,7 +97,7 @@ func setupFake(t *testing.T) *testFakeCtx {
|
||||
return &testFakeCtx{
|
||||
ServiceClient: sc,
|
||||
FakeConsul: fc,
|
||||
Task: tt,
|
||||
Workload: tw,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,35 +105,35 @@ func TestConsul_ChangeTags(t *testing.T) {
|
||||
ctx := setupFake(t)
|
||||
require := require.New(t)
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
// Validate the alloc registration
|
||||
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
|
||||
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
|
||||
require.NoError(err)
|
||||
require.NotNil(reg1, "Unexpected nil alloc registration")
|
||||
require.Equal(1, reg1.NumServices())
|
||||
require.Equal(0, reg1.NumChecks())
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
require.Equal(v.Name, ctx.Task.Services[0].Name)
|
||||
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
|
||||
require.Equal(v.Name, ctx.Workload.Services[0].Name)
|
||||
require.Equal(v.Tags, ctx.Workload.Services[0].Tags)
|
||||
}
|
||||
|
||||
// Update the task definition
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Services[0].Tags[0] = "newtag"
|
||||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Services[0].Tags[0] = "newtag"
|
||||
|
||||
// Register and sync the update
|
||||
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
// Validate the metadata changed
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
require.Equal(v.Name, ctx.Task.Services[0].Name)
|
||||
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
|
||||
require.Equal(v.Name, ctx.Workload.Services[0].Name)
|
||||
require.Equal(v.Tags, ctx.Workload.Services[0].Tags)
|
||||
require.Equal("newtag", v.Tags[0])
|
||||
}
|
||||
}
|
||||
@@ -145,7 +145,7 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
ctx := setupFake(t)
|
||||
require := require.New(t)
|
||||
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "c1",
|
||||
Type: "tcp",
|
||||
@@ -170,13 +170,13 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
require.Equal(ctx.Task.Services[0].Name, v.Name)
|
||||
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
|
||||
require.Equal(ctx.Workload.Services[0].Name, v.Name)
|
||||
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
|
||||
require.Equal(xPort, v.Port)
|
||||
}
|
||||
|
||||
@@ -205,9 +205,9 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
require.NotEmpty(origHTTPKey)
|
||||
|
||||
// Now update the PortLabel on the Service and Check c3
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Services[0].PortLabel = "y"
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Services[0].PortLabel = "y"
|
||||
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "c1",
|
||||
Type: "tcp",
|
||||
@@ -232,13 +232,13 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
require.Equal(ctx.Task.Services[0].Name, v.Name)
|
||||
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
|
||||
require.Equal(ctx.Workload.Services[0].Name, v.Name)
|
||||
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
|
||||
require.Equal(yPort, v.Port)
|
||||
}
|
||||
|
||||
@@ -266,7 +266,7 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
// properly syncs with Consul.
|
||||
func TestConsul_ChangeChecks(t *testing.T) {
|
||||
ctx := setupFake(t)
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "c1",
|
||||
Type: "tcp",
|
||||
@@ -279,7 +279,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
@@ -300,7 +300,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
|
||||
// Query the allocs registrations and then again when we update. The IDs
|
||||
// should change
|
||||
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
|
||||
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
|
||||
if err != nil {
|
||||
t.Fatalf("Looking up alloc registration failed: %v", err)
|
||||
}
|
||||
@@ -317,8 +317,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
origServiceKey := ""
|
||||
for k, v := range ctx.FakeConsul.services {
|
||||
origServiceKey = k
|
||||
if v.Name != ctx.Task.Services[0].Name {
|
||||
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
|
||||
if v.Name != ctx.Workload.Services[0].Name {
|
||||
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
|
||||
}
|
||||
if v.Port != xPort {
|
||||
t.Errorf("expected Port x=%v but found: %v", xPort, v.Port)
|
||||
@@ -335,8 +335,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
}
|
||||
|
||||
// Now add a check and modify the original
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "c1",
|
||||
Type: "tcp",
|
||||
@@ -356,7 +356,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
PortLabel: "x",
|
||||
},
|
||||
}
|
||||
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
@@ -414,7 +414,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
}
|
||||
|
||||
// Check again and ensure the IDs changed
|
||||
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
|
||||
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
|
||||
if err != nil {
|
||||
t.Fatalf("Looking up alloc registration failed: %v", err)
|
||||
}
|
||||
@@ -449,8 +449,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
}
|
||||
|
||||
// Alter a CheckRestart and make sure the watcher is updated but nothing else
|
||||
origTask = ctx.Task.Copy()
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
origWorkload = ctx.Workload.Copy()
|
||||
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "c1",
|
||||
Type: "tcp",
|
||||
@@ -470,7 +470,7 @@ func TestConsul_ChangeChecks(t *testing.T) {
|
||||
PortLabel: "x",
|
||||
},
|
||||
}
|
||||
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
@@ -502,7 +502,7 @@ func TestConsul_RegServices(t *testing.T) {
|
||||
ctx := setupFake(t)
|
||||
|
||||
// Add a check w/restarting
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "testcheck",
|
||||
Type: "tcp",
|
||||
@@ -513,7 +513,7 @@ func TestConsul_RegServices(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
@@ -526,11 +526,11 @@ func TestConsul_RegServices(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
if v.Name != ctx.Task.Services[0].Name {
|
||||
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
|
||||
if v.Name != ctx.Workload.Services[0].Name {
|
||||
t.Errorf("expected Name=%q != %q", ctx.Workload.Services[0].Name, v.Name)
|
||||
}
|
||||
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
|
||||
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
|
||||
if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
|
||||
t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags)
|
||||
}
|
||||
if v.Port != xPort {
|
||||
t.Errorf("expected Port=%d != %d", xPort, v.Port)
|
||||
@@ -544,7 +544,7 @@ func TestConsul_RegServices(t *testing.T) {
|
||||
|
||||
// Assert the check update is properly formed
|
||||
checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
||||
if checkUpd.checkRestart.allocID != ctx.Task.AllocID {
|
||||
if checkUpd.checkRestart.allocID != ctx.Workload.AllocID {
|
||||
t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID)
|
||||
}
|
||||
if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected {
|
||||
@@ -552,9 +552,9 @@ func TestConsul_RegServices(t *testing.T) {
|
||||
}
|
||||
|
||||
// Make a change which will register a new service
|
||||
ctx.Task.Services[0].Name = "taskname-service2"
|
||||
ctx.Task.Services[0].Tags[0] = "tag3"
|
||||
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
||||
ctx.Workload.Services[0].Name = "taskname-service2"
|
||||
ctx.Workload.Services[0].Tags[0] = "tag3"
|
||||
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
@@ -575,7 +575,7 @@ func TestConsul_RegServices(t *testing.T) {
|
||||
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
||||
}
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
if reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
|
||||
if reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
|
||||
t.Errorf("expected Tags to differ, changes applied before sync()")
|
||||
}
|
||||
}
|
||||
@@ -589,22 +589,22 @@ func TestConsul_RegServices(t *testing.T) {
|
||||
}
|
||||
found := false
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
if v.Name == ctx.Task.Services[0].Name {
|
||||
if v.Name == ctx.Workload.Services[0].Name {
|
||||
if found {
|
||||
t.Fatalf("found new service name %q twice", v.Name)
|
||||
}
|
||||
found = true
|
||||
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
|
||||
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
|
||||
if !reflect.DeepEqual(v.Tags, ctx.Workload.Services[0].Tags) {
|
||||
t.Errorf("expected Tags=%v != %v", ctx.Workload.Services[0].Tags, v.Tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatalf("did not find new service %q", ctx.Task.Services[0].Name)
|
||||
t.Fatalf("did not find new service %q", ctx.Workload.Services[0].Name)
|
||||
}
|
||||
|
||||
// Remove the new task
|
||||
ctx.ServiceClient.RemoveTask(ctx.Task)
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
@@ -744,7 +744,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := setupFake(t)
|
||||
|
||||
ctx.Task.Services = []*structs.Service{
|
||||
ctx.Workload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "auto-advertise-x",
|
||||
PortLabel: "x",
|
||||
@@ -785,7 +785,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
|
||||
ctx.Workload.DriverNetwork = &drivers.DriverNetwork{
|
||||
PortMap: map[string]int{
|
||||
"x": 8888,
|
||||
"y": 9999,
|
||||
@@ -794,7 +794,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
AutoAdvertise: true,
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
@@ -808,14 +808,14 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
switch v.Name {
|
||||
case ctx.Task.Services[0].Name: // x
|
||||
case ctx.Workload.Services[0].Name: // x
|
||||
// Since DriverNetwork.AutoAdvertise=true, driver ports should be used
|
||||
if v.Port != ctx.Task.DriverNetwork.PortMap["x"] {
|
||||
if v.Port != ctx.Workload.DriverNetwork.PortMap["x"] {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
|
||||
v.Name, ctx.Workload.DriverNetwork.PortMap["x"], v.Port)
|
||||
}
|
||||
// The order of checks in Consul is not guaranteed to
|
||||
// be the same as their order in the Task definition,
|
||||
// be the same as their order in the Workload definition,
|
||||
// so check in a loop
|
||||
if expected := 2; len(v.Checks) != expected {
|
||||
t.Errorf("expected %d checks but found %d", expected, len(v.Checks))
|
||||
@@ -838,22 +838,22 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
t.Errorf("unexpected check %#v on service %q", c, v.Name)
|
||||
}
|
||||
}
|
||||
case ctx.Task.Services[1].Name: // y
|
||||
case ctx.Workload.Services[1].Name: // y
|
||||
// Service should be container ip:port
|
||||
if v.Address != ctx.Task.DriverNetwork.IP {
|
||||
if v.Address != ctx.Workload.DriverNetwork.IP {
|
||||
t.Errorf("expected service %s's address to be %s but found %s",
|
||||
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
|
||||
v.Name, ctx.Workload.DriverNetwork.IP, v.Address)
|
||||
}
|
||||
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
|
||||
if v.Port != ctx.Workload.DriverNetwork.PortMap["y"] {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
|
||||
v.Name, ctx.Workload.DriverNetwork.PortMap["x"], v.Port)
|
||||
}
|
||||
// Check should be host ip:port
|
||||
if v.Checks[0].TCP != ":1235" { // yPort
|
||||
t.Errorf("expected service %s check's port to be %d but found %s",
|
||||
v.Name, yPort, v.Checks[0].TCP)
|
||||
}
|
||||
case ctx.Task.Services[2].Name: // y + host mode
|
||||
case ctx.Workload.Services[2].Name: // y + host mode
|
||||
if v.Port != yPort {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, yPort, v.Port)
|
||||
@@ -871,7 +871,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := setupFake(t)
|
||||
|
||||
ctx.Task.Services = []*structs.Service{
|
||||
ctx.Workload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "auto-advertise-x",
|
||||
PortLabel: "x",
|
||||
@@ -889,7 +889,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
|
||||
ctx.Workload.DriverNetwork = &drivers.DriverNetwork{
|
||||
PortMap: map[string]int{
|
||||
"x": 8888,
|
||||
"y": 9999,
|
||||
@@ -898,7 +898,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
||||
AutoAdvertise: false,
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
@@ -912,23 +912,23 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
switch v.Name {
|
||||
case ctx.Task.Services[0].Name: // x + auto
|
||||
case ctx.Workload.Services[0].Name: // x + auto
|
||||
// Since DriverNetwork.AutoAdvertise=false, host ports should be used
|
||||
if v.Port != xPort {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, xPort, v.Port)
|
||||
}
|
||||
case ctx.Task.Services[1].Name: // y + driver mode
|
||||
case ctx.Workload.Services[1].Name: // y + driver mode
|
||||
// Service should be container ip:port
|
||||
if v.Address != ctx.Task.DriverNetwork.IP {
|
||||
if v.Address != ctx.Workload.DriverNetwork.IP {
|
||||
t.Errorf("expected service %s's address to be %s but found %s",
|
||||
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
|
||||
v.Name, ctx.Workload.DriverNetwork.IP, v.Address)
|
||||
}
|
||||
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
|
||||
if v.Port != ctx.Workload.DriverNetwork.PortMap["y"] {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
|
||||
v.Name, ctx.Workload.DriverNetwork.PortMap["x"], v.Port)
|
||||
}
|
||||
case ctx.Task.Services[2].Name: // y + host mode
|
||||
case ctx.Workload.Services[2].Name: // y + host mode
|
||||
if v.Port != yPort {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, yPort, v.Port)
|
||||
@@ -945,7 +945,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := setupFake(t)
|
||||
|
||||
ctx.Task.Services = []*structs.Service{
|
||||
ctx.Workload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "service-foo",
|
||||
PortLabel: "x",
|
||||
@@ -953,7 +953,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
|
||||
ctx.Workload.DriverNetwork = &drivers.DriverNetwork{
|
||||
PortMap: map[string]int{
|
||||
"x": 8888,
|
||||
"y": 9999,
|
||||
@@ -973,7 +973,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
switch v.Name {
|
||||
case ctx.Task.Services[0].Name:
|
||||
case ctx.Workload.Services[0].Name:
|
||||
if v.Port != port {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, port, v.Port)
|
||||
@@ -985,31 +985,31 @@ func TestConsul_DriverNetwork_Change(t *testing.T) {
|
||||
}
|
||||
|
||||
// Initial service should advertise host port x
|
||||
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.RegisterWorkload(ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
syncAndAssertPort(xPort)
|
||||
|
||||
// UpdateTask to use Host (shouldn't change anything)
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Services[0].AddressMode = structs.AddressModeHost
|
||||
// UpdateWorkload to use Host (shouldn't change anything)
|
||||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Services[0].AddressMode = structs.AddressModeHost
|
||||
|
||||
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error updating task: %v", err)
|
||||
}
|
||||
|
||||
syncAndAssertPort(xPort)
|
||||
|
||||
// UpdateTask to use Driver (*should* change IP and port)
|
||||
origTask = ctx.Task.Copy()
|
||||
ctx.Task.Services[0].AddressMode = structs.AddressModeDriver
|
||||
// UpdateWorkload to use Driver (*should* change IP and port)
|
||||
origWorkload = ctx.Workload.Copy()
|
||||
ctx.Workload.Services[0].AddressMode = structs.AddressModeDriver
|
||||
|
||||
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
||||
if err := ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload); err != nil {
|
||||
t.Fatalf("unexpected error updating task: %v", err)
|
||||
}
|
||||
|
||||
syncAndAssertPort(ctx.Task.DriverNetwork.PortMap["x"])
|
||||
syncAndAssertPort(ctx.Workload.DriverNetwork.PortMap["x"])
|
||||
}
|
||||
|
||||
// TestConsul_CanaryTags asserts CanaryTags are used when Canary=true
|
||||
@@ -1019,10 +1019,10 @@ func TestConsul_CanaryTags(t *testing.T) {
|
||||
ctx := setupFake(t)
|
||||
|
||||
canaryTags := []string{"tag1", "canary"}
|
||||
ctx.Task.Canary = true
|
||||
ctx.Task.Services[0].CanaryTags = canaryTags
|
||||
ctx.Workload.Canary = true
|
||||
ctx.Workload.Services[0].CanaryTags = canaryTags
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
@@ -1030,16 +1030,16 @@ func TestConsul_CanaryTags(t *testing.T) {
|
||||
}
|
||||
|
||||
// Disable canary and assert tags are not the canary tags
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
||||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.NotEqual(canaryTags, service.Tags)
|
||||
}
|
||||
|
||||
ctx.ServiceClient.RemoveTask(ctx.Task)
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 0)
|
||||
}
|
||||
@@ -1052,10 +1052,10 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
|
||||
ctx := setupFake(t)
|
||||
|
||||
tags := []string{"tag1", "foo"}
|
||||
ctx.Task.Canary = true
|
||||
ctx.Task.Services[0].Tags = tags
|
||||
ctx.Workload.Canary = true
|
||||
ctx.Workload.Services[0].Tags = tags
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
@@ -1063,16 +1063,16 @@ func TestConsul_CanaryTags_NoTags(t *testing.T) {
|
||||
}
|
||||
|
||||
// Disable canary and assert tags dont change
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
||||
origWorkload := ctx.Workload.Copy()
|
||||
ctx.Workload.Canary = false
|
||||
require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
for _, service := range ctx.FakeConsul.services {
|
||||
require.Equal(tags, service.Tags)
|
||||
}
|
||||
|
||||
ctx.ServiceClient.RemoveTask(ctx.Task)
|
||||
ctx.ServiceClient.RemoveWorkload(ctx.Workload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 0)
|
||||
}
|
||||
@@ -1402,7 +1402,7 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
|
||||
ctx := setupFake(t)
|
||||
require := require.New(t)
|
||||
|
||||
ctx.Task.Services = []*structs.Service{
|
||||
ctx.Workload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "best-service",
|
||||
PortLabel: "x",
|
||||
@@ -1439,20 +1439,20 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
if v.Name == ctx.Task.Services[0].Name && v.Port == xPort {
|
||||
require.ElementsMatch(v.Tags, ctx.Task.Services[0].Tags)
|
||||
if v.Name == ctx.Workload.Services[0].Name && v.Port == xPort {
|
||||
require.ElementsMatch(v.Tags, ctx.Workload.Services[0].Tags)
|
||||
require.Len(v.Checks, 1)
|
||||
} else if v.Name == ctx.Task.Services[1].Name && v.Port == yPort {
|
||||
require.ElementsMatch(v.Tags, ctx.Task.Services[1].Tags)
|
||||
} else if v.Name == ctx.Workload.Services[1].Name && v.Port == yPort {
|
||||
require.ElementsMatch(v.Tags, ctx.Workload.Services[1].Tags)
|
||||
require.Len(v.Checks, 1)
|
||||
} else if v.Name == ctx.Task.Services[2].Name {
|
||||
} else if v.Name == ctx.Workload.Services[2].Name {
|
||||
require.Len(v.Checks, 0)
|
||||
}
|
||||
}
|
||||
@@ -1467,8 +1467,8 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
||||
|
||||
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
|
||||
|
||||
remainingTask := testTask()
|
||||
remainingTask.Services = []*structs.Service{
|
||||
remainingWorkload := testWorkload()
|
||||
remainingWorkload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "remaining-service",
|
||||
PortLabel: "x",
|
||||
@@ -1482,16 +1482,16 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
|
||||
remainingTask.Name, remainingTask.Services[0])
|
||||
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
|
||||
remainingWorkload.Name(), remainingWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
explicitlyRemovedTask := testTask()
|
||||
explicitlyRemovedTask.Services = []*structs.Service{
|
||||
explicitlyRemovedWorkload := testWorkload()
|
||||
explicitlyRemovedWorkload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "explicitly-removed-service",
|
||||
PortLabel: "y",
|
||||
@@ -1505,18 +1505,18 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
|
||||
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0])
|
||||
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
|
||||
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
// we register a task through nomad API then remove it out of band
|
||||
outofbandTask := testTask()
|
||||
outofbandTask.Services = []*structs.Service{
|
||||
outofbandWorkload := testWorkload()
|
||||
outofbandWorkload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "unknown-service",
|
||||
PortLabel: "x",
|
||||
@@ -1530,39 +1530,39 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
|
||||
outofbandTask.Name, outofbandTask.Services[0])
|
||||
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
|
||||
outofbandWorkload.Name(), outofbandWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
// remove outofbandTask from local services so it appears unknown to client
|
||||
// remove outofbandWorkload from local services so it appears unknown to client
|
||||
require.Len(ctx.ServiceClient.services, 3)
|
||||
require.Len(ctx.ServiceClient.checks, 3)
|
||||
|
||||
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
|
||||
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
delete(ctx.ServiceClient.services, outofbandWorkloadServiceID)
|
||||
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
|
||||
|
||||
require.Len(ctx.ServiceClient.services, 2)
|
||||
require.Len(ctx.ServiceClient.checks, 2)
|
||||
|
||||
// Sync and ensure that explicitly removed service as well as outofbandTask were removed
|
||||
// Sync and ensure that explicitly removed service as well as outofbandWorkload were removed
|
||||
|
||||
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
|
||||
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
||||
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
|
||||
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
|
||||
}
|
||||
|
||||
// TestConsul_ServiceDeregistration_InProbation asserts that during initialization
|
||||
@@ -1576,8 +1576,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
||||
|
||||
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(1 * time.Hour)
|
||||
|
||||
remainingTask := testTask()
|
||||
remainingTask.Services = []*structs.Service{
|
||||
remainingWorkload := testWorkload()
|
||||
remainingWorkload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "remaining-service",
|
||||
PortLabel: "x",
|
||||
@@ -1591,16 +1591,16 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
|
||||
remainingTask.Name, remainingTask.Services[0])
|
||||
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
|
||||
remainingWorkload.Name(), remainingWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
explicitlyRemovedTask := testTask()
|
||||
explicitlyRemovedTask.Services = []*structs.Service{
|
||||
explicitlyRemovedWorkload := testWorkload()
|
||||
explicitlyRemovedWorkload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "explicitly-removed-service",
|
||||
PortLabel: "y",
|
||||
@@ -1614,18 +1614,18 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
|
||||
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0])
|
||||
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
|
||||
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
|
||||
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
// we register a task through nomad API then remove it out of band
|
||||
outofbandTask := testTask()
|
||||
outofbandTask.Services = []*structs.Service{
|
||||
outofbandWorkload := testWorkload()
|
||||
outofbandWorkload.Services = []*structs.Service{
|
||||
{
|
||||
Name: "unknown-service",
|
||||
PortLabel: "x",
|
||||
@@ -1639,39 +1639,39 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
|
||||
outofbandTask.Name, outofbandTask.Services[0])
|
||||
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
|
||||
outofbandWorkload.Name(), outofbandWorkload.Services[0])
|
||||
|
||||
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
|
||||
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
|
||||
require.NoError(ctx.syncOnce())
|
||||
|
||||
require.Len(ctx.FakeConsul.services, 3)
|
||||
|
||||
// remove outofbandTask from local services so it appears unknown to client
|
||||
// remove outofbandWorkload from local services so it appears unknown to client
|
||||
require.Len(ctx.ServiceClient.services, 3)
|
||||
require.Len(ctx.ServiceClient.checks, 3)
|
||||
|
||||
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
|
||||
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
delete(ctx.ServiceClient.services, outofbandWorkloadServiceID)
|
||||
delete(ctx.ServiceClient.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
|
||||
|
||||
require.Len(ctx.ServiceClient.services, 2)
|
||||
require.Len(ctx.ServiceClient.checks, 2)
|
||||
|
||||
// Sync and ensure that explicitly removed service was removed, but outofbandTask remains
|
||||
// Sync and ensure that explicitly removed service was removed, but outofbandWorkload remains
|
||||
|
||||
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
|
||||
ctx.ServiceClient.RemoveWorkload(explicitlyRemovedWorkload)
|
||||
require.NoError(ctx.syncOnce())
|
||||
require.NoError(ctx.ServiceClient.sync())
|
||||
require.Len(ctx.FakeConsul.services, 2)
|
||||
require.Len(ctx.FakeConsul.checks, 2)
|
||||
|
||||
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
||||
require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
||||
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
|
||||
require.Contains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
|
||||
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
|
||||
|
||||
// after probation, outofband services and checks are removed
|
||||
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
|
||||
@@ -1680,12 +1680,12 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
||||
require.Len(ctx.FakeConsul.services, 1)
|
||||
require.Len(ctx.FakeConsul.checks, 1)
|
||||
|
||||
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
||||
require.Contains(ctx.FakeConsul.services, remainingWorkloadServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, outofbandWorkloadServiceID)
|
||||
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedWorkloadServiceID)
|
||||
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
||||
require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingWorkloadServiceID, remainingWorkload.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandWorkloadServiceID, outofbandWorkload.Services[0].Checks[0]))
|
||||
require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedWorkloadServiceID, explicitlyRemovedWorkload.Services[0].Checks[0]))
|
||||
|
||||
}
|
||||
|
||||
@@ -4455,6 +4455,7 @@ func TestJobEndpoint_ValidateJob_ConsulConnect(t *testing.T) {

	tg := j.TaskGroups[0]
	tg.Services = tgServices
	tg.Networks = nil

	err := validateJob(j)
	require.Error(t, err)

@@ -3629,15 +3629,16 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup {
func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string {
	group := j.LookupTaskGroup(groupName)
	if group == nil {
		return nil
		return j.Meta
	}

	var meta map[string]string

	task := group.LookupTask(taskName)
	if task == nil {
		return nil
	if task != nil {
		meta = helper.CopyMapStringString(task.Meta)
	}

	meta := helper.CopyMapStringString(task.Meta)
	if meta == nil {
		meta = make(map[string]string, len(group.Meta)+len(j.Meta))
	}
@@ -2565,6 +2565,51 @@ func TestJob_ExpandServiceNames(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestJob_CombinedTaskMeta(t *testing.T) {
|
||||
j := &Job{
|
||||
Meta: map[string]string{
|
||||
"job_test": "job",
|
||||
"group_test": "job",
|
||||
"task_test": "job",
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: "group",
|
||||
Meta: map[string]string{
|
||||
"group_test": "group",
|
||||
"task_test": "group",
|
||||
},
|
||||
Tasks: []*Task{
|
||||
{
|
||||
Name: "task",
|
||||
Meta: map[string]string{
|
||||
"task_test": "task",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
require := require.New(t)
|
||||
require.EqualValues(map[string]string{
|
||||
"job_test": "job",
|
||||
"group_test": "group",
|
||||
"task_test": "task",
|
||||
}, j.CombinedTaskMeta("group", "task"))
|
||||
require.EqualValues(map[string]string{
|
||||
"job_test": "job",
|
||||
"group_test": "group",
|
||||
"task_test": "group",
|
||||
}, j.CombinedTaskMeta("group", ""))
|
||||
require.EqualValues(map[string]string{
|
||||
"job_test": "job",
|
||||
"group_test": "job",
|
||||
"task_test": "job",
|
||||
}, j.CombinedTaskMeta("", "task"))
|
||||
|
||||
}
|
||||
|
||||
func TestPeriodicConfig_EnabledInvalid(t *testing.T) {
|
||||
// Create a config that is enabled but with no interval specified.
|
||||
p := &PeriodicConfig{Enabled: true}
|
||||