diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go index 7c17b7e36..2bd727180 100644 --- a/client/allochealth/tracker.go +++ b/client/allochealth/tracker.go @@ -9,9 +9,8 @@ import ( "github.com/hashicorp/consul/api" hclog "github.com/hashicorp/go-hclog" - cconsul "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" cstructs "github.com/hashicorp/nomad/client/structs" - "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -56,7 +55,7 @@ type Tracker struct { allocUpdates *cstructs.AllocListener // consulClient is used to look up the state of the task's checks - consulClient cconsul.ConsulServiceAPI + consulClient serviceregistration.Handler // healthy is used to signal whether we have determined the allocation to be // healthy or unhealthy @@ -93,7 +92,7 @@ type Tracker struct { // listener and consul API object are given so that the watcher can detect // health changes. func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.Allocation, - allocUpdates *cstructs.AllocListener, consulClient cconsul.ConsulServiceAPI, + allocUpdates *cstructs.AllocListener, consulClient serviceregistration.Handler, minHealthyTime time.Duration, useChecks bool) *Tracker { // Do not create a named sub-logger as the hook controlling @@ -377,7 +376,7 @@ func (t *Tracker) watchConsulEvents() { consulChecksErr := false // allocReg are the registered objects in Consul for the allocation - var allocReg *consul.AllocRegistration + var allocReg *serviceregistration.AllocRegistration OUTER: for { @@ -482,7 +481,7 @@ OUTER: type taskHealthState struct { task *structs.Task state *structs.TaskState - taskRegistrations *consul.ServiceRegistrations + taskRegistrations *serviceregistration.ServiceRegistrations } // event takes the deadline time for the allocation to be healthy and the update diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go index f4aec166d..f40d016d5 100644 --- a/client/allochealth/tracker_test.go +++ b/client/allochealth/tracker_test.go @@ -8,9 +8,9 @@ import ( "time" consulapi "github.com/hashicorp/consul/api" - "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" cstructs "github.com/hashicorp/nomad/client/structs" - agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -39,9 +39,9 @@ func TestTracker_Checks_Healthy(t *testing.T) { Name: task.Services[0].Checks[0].Name, Status: consulapi.HealthPassing, } - taskRegs := map[string]*agentconsul.ServiceRegistrations{ + taskRegs := map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*agentconsul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ task.Services[0].Name: { Service: &consulapi.AgentService{ ID: "foo", @@ -59,13 +59,13 @@ func TestTracker_Checks_Healthy(t *testing.T) { // Don't reply on the first call var called uint64 - consul := consul.NewMockConsulServiceClient(t, logger) - consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + consul := regMock.NewServiceRegistrationHandler(logger) + consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) { if atomic.AddUint64(&called, 1) == 1 { return nil, nil } - reg := &agentconsul.AllocRegistration{ + reg := &serviceregistration.AllocRegistration{ Tasks: taskRegs, } @@ -111,7 +111,7 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) { b := cstructs.NewAllocBroadcaster(logger) defer b.Close() - consul := consul.NewMockConsulServiceClient(t, logger) + consul := regMock.NewServiceRegistrationHandler(logger) ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() @@ -152,7 +152,7 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) { b := cstructs.NewAllocBroadcaster(logger) defer b.Close() - consul := consul.NewMockConsulServiceClient(t, logger) + consul := regMock.NewServiceRegistrationHandler(logger) ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() @@ -199,9 +199,9 @@ func TestTracker_Checks_Unhealthy(t *testing.T) { Name: task.Services[0].Checks[1].Name, Status: consulapi.HealthCritical, } - taskRegs := map[string]*agentconsul.ServiceRegistrations{ + taskRegs := map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*agentconsul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ task.Services[0].Name: { Service: &consulapi.AgentService{ ID: "foo", @@ -219,13 +219,13 @@ func TestTracker_Checks_Unhealthy(t *testing.T) { // Don't reply on the first call var called uint64 - consul := consul.NewMockConsulServiceClient(t, logger) - consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + consul := regMock.NewServiceRegistrationHandler(logger) + consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) { if atomic.AddUint64(&called, 1) == 1 { return nil, nil } - reg := &agentconsul.AllocRegistration{ + reg := &serviceregistration.AllocRegistration{ Tasks: taskRegs, } @@ -341,9 +341,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) { Name: task.Services[0].Checks[0].Name, Status: consulapi.HealthPassing, } - taskRegs := map[string]*agentconsul.ServiceRegistrations{ + taskRegs := map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*agentconsul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ task.Services[0].Name: { Service: &consulapi.AgentService{ ID: "foo", @@ -361,13 +361,13 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) { // Don't reply on the first call var called uint64 - consul := consul.NewMockConsulServiceClient(t, logger) - consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + consul := regMock.NewServiceRegistrationHandler(logger) + consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) { if atomic.AddUint64(&called, 1) == 1 { return nil, nil } - reg := &agentconsul.AllocRegistration{ + reg := &serviceregistration.AllocRegistration{ Tasks: taskRegs, } @@ -480,9 +480,9 @@ func TestTracker_Checks_OnUpdate(t *testing.T) { Name: task.Services[0].Checks[0].Name, Status: tc.consulResp, } - taskRegs := map[string]*agentconsul.ServiceRegistrations{ + taskRegs := map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*agentconsul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ task.Services[0].Name: { Service: &consulapi.AgentService{ ID: "foo", @@ -503,13 +503,13 @@ func TestTracker_Checks_OnUpdate(t *testing.T) { // Don't reply on the first call var called uint64 - consul := consul.NewMockConsulServiceClient(t, logger) - consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + consul := regMock.NewServiceRegistrationHandler(logger) + consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) { if atomic.AddUint64(&called, 1) == 1 { return nil, nil } - reg := &agentconsul.AllocRegistration{ + reg := &serviceregistration.AllocRegistration{ Tasks: taskRegs, } diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index c846b0916..17971c72b 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -22,6 +22,7 @@ import ( cinterfaces "github.com/hashicorp/nomad/client/interfaces" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + "github.com/hashicorp/nomad/client/serviceregistration" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" @@ -63,7 +64,7 @@ type allocRunner struct { // consulClient is the client used by the consul service hook for // registering services and checks - consulClient consul.ConsulServiceAPI + consulClient serviceregistration.Handler // consulProxiesClient is the client used by the envoy version hook for // looking up supported envoy versions of the consul agent. diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 749f72078..23d59dc69 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -11,9 +11,9 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/allochealth" "github.com/hashicorp/nomad/client/allocwatcher" - cconsul "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" - "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -577,9 +577,9 @@ func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { }) // Get consul client operations - consulClient := conf.Consul.(*cconsul.MockConsulServiceClient) + consulClient := conf.Consul.(*regMock.ServiceRegistrationHandler) consulOpts := consulClient.GetOps() - var groupRemoveOp cconsul.MockConsulOp + var groupRemoveOp regMock.Operation for _, op := range consulOpts { // Grab the first deregistration request if op.Op == "remove" && op.Name == "group-web" { @@ -1030,12 +1030,12 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) { defer cleanup() // Only return the check as healthy after a duration - consulClient := conf.Consul.(*cconsul.MockConsulServiceClient) - consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { - return &consul.AllocRegistration{ - Tasks: map[string]*consul.ServiceRegistrations{ + consulClient := conf.Consul.(*regMock.ServiceRegistrationHandler) + consulClient.AllocRegistrationsFn = func(allocID string) (*serviceregistration.AllocRegistration, error) { + return &serviceregistration.AllocRegistration{ + Tasks: map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*consul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ "123": { Service: &api.AgentService{Service: "fakeservice"}, Checks: []*api.AgentCheck{checkUnhealthy}, @@ -1352,12 +1352,12 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() - consulClient := conf.Consul.(*cconsul.MockConsulServiceClient) - consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { - return &consul.AllocRegistration{ - Tasks: map[string]*consul.ServiceRegistrations{ + consulClient := conf.Consul.(*regMock.ServiceRegistrationHandler) + consulClient.AllocRegistrationsFn = func(allocID string) (*serviceregistration.AllocRegistration, error) { + return &serviceregistration.AllocRegistration{ + Tasks: map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*consul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ "123": { Service: &api.AgentService{Service: "fakeservice"}, Checks: []*api.AgentCheck{checkHealthy}, diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index 41b5dabc8..63d06fd3f 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/client/consul" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -125,7 +125,7 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) { // - removal during exited is de-duped due to prekill // - removal during stop is de-duped due to prekill // 1 removal group during stop - consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps() + consulOps := conf2.Consul.(*regMock.ServiceRegistrationHandler).GetOps() require.Len(t, consulOps, 2) for _, op := range consulOps { require.Equal(t, "remove", op.Op) diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go index 343d4eec5..d1500c906 100644 --- a/client/allocrunner/config.go +++ b/client/allocrunner/config.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + "github.com/hashicorp/nomad/client/serviceregistration" cstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" @@ -31,7 +32,7 @@ type Config struct { StateDB cstate.StateDB // Consul is the Consul client used to register task services and checks - Consul consul.ConsulServiceAPI + Consul serviceregistration.Handler // ConsulProxies is the Consul client used to lookup supported envoy versions // of the Consul agent. diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 69eae41e8..1d5a62053 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -7,7 +7,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" @@ -27,7 +27,7 @@ type groupServiceHook struct { allocID string group string restarter agentconsul.WorkloadRestarter - consulClient consul.ConsulServiceAPI + consulClient serviceregistration.Handler consulNamespace string prerun bool deregistered bool @@ -51,7 +51,7 @@ type groupServiceHook struct { type groupServiceHookConfig struct { alloc *structs.Allocation - consul consul.ConsulServiceAPI + consul serviceregistration.Handler consulNamespace string restarter agentconsul.WorkloadRestarter taskEnvBuilder *taskenv.Builder @@ -217,7 +217,7 @@ func (h *groupServiceHook) deregister() { } } -func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices { +func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices { // Interpolate with the task's environment interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services) @@ -227,15 +227,15 @@ func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices { } // Create task services struct with request's driver metadata - return &agentconsul.WorkloadServices{ - AllocID: h.allocID, - Group: h.group, - ConsulNamespace: h.consulNamespace, - Restarter: h.restarter, - Services: interpolatedServices, - Networks: h.networks, - NetworkStatus: netStatus, - Ports: h.ports, - Canary: h.canary, + return &serviceregistration.WorkloadServices{ + AllocID: h.allocID, + Group: h.group, + Namespace: h.consulNamespace, + Restarter: h.restarter, + Services: interpolatedServices, + Networks: h.networks, + NetworkStatus: netStatus, + Ports: h.ports, + Canary: h.canary, } } diff --git a/client/allocrunner/groupservice_hook_test.go b/client/allocrunner/groupservice_hook_test.go index 61d9a38b4..4fae46a30 100644 --- a/client/allocrunner/groupservice_hook_test.go +++ b/client/allocrunner/groupservice_hook_test.go @@ -8,7 +8,7 @@ import ( consulapi "github.com/hashicorp/consul/api" ctestutil "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" @@ -35,7 +35,7 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) { PortLabel: "9999", }} logger := testlog.HCLogger(t) - consulClient := consul.NewMockConsulServiceClient(t, logger) + consulClient := regMock.NewServiceRegistrationHandler(logger) h := newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, @@ -71,7 +71,7 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) { alloc.Job.TaskGroups[0].ShutdownDelay = helper.TimeToPtr(10 * time.Second) logger := testlog.HCLogger(t) - consulClient := consul.NewMockConsulServiceClient(t, logger) + consulClient := regMock.NewServiceRegistrationHandler(logger) h := newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, @@ -106,7 +106,7 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { alloc := mock.ConnectAlloc() logger := testlog.HCLogger(t) - consulClient := consul.NewMockConsulServiceClient(t, logger) + consulClient := regMock.NewServiceRegistrationHandler(logger) h := newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, @@ -152,7 +152,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { } logger := testlog.HCLogger(t) - consulClient := consul.NewMockConsulServiceClient(t, logger) + consulClient := regMock.NewServiceRegistrationHandler(logger) h := newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, @@ -196,7 +196,7 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) { } logger := testlog.HCLogger(t) - consulClient := consul.NewMockConsulServiceClient(t, logger) + consulClient := regMock.NewServiceRegistrationHandler(logger) h := newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go index feabfba2c..4c2bef43a 100644 --- a/client/allocrunner/health_hook.go +++ b/client/allocrunner/health_hook.go @@ -9,7 +9,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allochealth" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -33,7 +33,7 @@ type allocHealthWatcherHook struct { healthSetter healthSetter // consul client used to monitor health checks - consul consul.ConsulServiceAPI + consul serviceregistration.Handler // listener is given to trackers to listen for alloc updates and closed // when the alloc is destroyed. @@ -68,7 +68,7 @@ type allocHealthWatcherHook struct { } func newAllocHealthWatcherHook(logger log.Logger, alloc *structs.Allocation, hs healthSetter, - listener *cstructs.AllocListener, consul consul.ConsulServiceAPI) interfaces.RunnerHook { + listener *cstructs.AllocListener, consul serviceregistration.Handler) interfaces.RunnerHook { // Neither deployments nor migrations care about the health of // non-service jobs so never watch their health diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go index 62c148925..0f11e072c 100644 --- a/client/allocrunner/health_hook_test.go +++ b/client/allocrunner/health_hook_test.go @@ -7,9 +7,9 @@ import ( consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" cstructs "github.com/hashicorp/nomad/client/structs" - agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -92,7 +92,7 @@ func TestHealthHook_PrerunPostrun(t *testing.T) { b := cstructs.NewAllocBroadcaster(logger) defer b.Close() - consul := consul.NewMockConsulServiceClient(t, logger) + consul := regMock.NewServiceRegistrationHandler(logger) hs := &mockHealthSetter{} h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul) @@ -130,7 +130,7 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) { b := cstructs.NewAllocBroadcaster(logger) defer b.Close() - consul := consul.NewMockConsulServiceClient(t, logger) + consul := regMock.NewServiceRegistrationHandler(logger) hs := &mockHealthSetter{} h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook) @@ -169,7 +169,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) { b := cstructs.NewAllocBroadcaster(logger) defer b.Close() - consul := consul.NewMockConsulServiceClient(t, logger) + consul := regMock.NewServiceRegistrationHandler(logger) hs := &mockHealthSetter{} h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook) @@ -210,7 +210,7 @@ func TestHealthHook_Postrun(t *testing.T) { b := cstructs.NewAllocBroadcaster(logger) defer b.Close() - consul := consul.NewMockConsulServiceClient(t, logger) + consul := regMock.NewServiceRegistrationHandler(logger) hs := &mockHealthSetter{} h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul).(*allocHealthWatcherHook) @@ -243,9 +243,9 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) { Name: task.Services[0].Checks[0].Name, Status: consulapi.HealthPassing, } - taskRegs := map[string]*agentconsul.ServiceRegistrations{ + taskRegs := map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*agentconsul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ task.Services[0].Name: { Service: &consulapi.AgentService{ ID: "foo", @@ -263,14 +263,14 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) { // Don't reply on the first call called := false - consul := consul.NewMockConsulServiceClient(t, logger) - consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + consul := regMock.NewServiceRegistrationHandler(logger) + consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) { if !called { called = true return nil, nil } - reg := &agentconsul.AllocRegistration{ + reg := &serviceregistration.AllocRegistration{ Tasks: taskRegs, } @@ -331,9 +331,9 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) { Name: task.Services[0].Checks[1].Name, Status: consulapi.HealthCritical, } - taskRegs := map[string]*agentconsul.ServiceRegistrations{ + taskRegs := map[string]*serviceregistration.ServiceRegistrations{ task.Name: { - Services: map[string]*agentconsul.ServiceRegistration{ + Services: map[string]*serviceregistration.ServiceRegistration{ task.Services[0].Name: { Service: &consulapi.AgentService{ ID: "foo", @@ -351,14 +351,14 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) { // Don't reply on the first call called := false - consul := consul.NewMockConsulServiceClient(t, logger) - consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + consul := regMock.NewServiceRegistrationHandler(logger) + consul.AllocRegistrationsFn = func(string) (*serviceregistration.AllocRegistration, error) { if !called { called = true return nil, nil } - reg := &agentconsul.AllocRegistration{ + reg := &serviceregistration.AllocRegistration{ Tasks: taskRegs, } diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go index a3025927b..712b5c236 100644 --- a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go +++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go @@ -17,8 +17,8 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocdir" ifs "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/client/taskenv" - agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" @@ -451,9 +451,9 @@ func (h *envoyBootstrapHook) grpcAddress(env map[string]string) string { } func (h *envoyBootstrapHook) proxyServiceID(group string, service *structs.Service) string { - // Note, it is critical the ID here matches what is actually registered in Consul. - // See: WorkloadServices.Name in structs.go - return agentconsul.MakeAllocServiceID(h.alloc.ID, "group-"+group, service) + // Note, it is critical the ID here matches what is actually registered in + // Consul. See: WorkloadServices.Name in serviceregistration/workload.go. + return serviceregistration.MakeAllocServiceID(h.alloc.ID, "group-"+group, service) } // newEnvoyBootstrapArgs is used to prepare for the invocation of the diff --git a/client/allocrunner/taskrunner/script_check_hook.go b/client/allocrunner/taskrunner/script_check_hook.go index 332ec6673..e556b6f72 100644 --- a/client/allocrunner/taskrunner/script_check_hook.go +++ b/client/allocrunner/taskrunner/script_check_hook.go @@ -10,7 +10,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" @@ -26,7 +26,7 @@ const defaultShutdownWait = time.Minute type scriptCheckHookConfig struct { alloc *structs.Allocation task *structs.Task - consul consul.ConsulServiceAPI + consul serviceregistration.Handler logger log.Logger shutdownWait time.Duration } @@ -34,7 +34,7 @@ type scriptCheckHookConfig struct { // scriptCheckHook implements a task runner hook for running script // checks in the context of a task type scriptCheckHook struct { - consul consul.ConsulServiceAPI + consul serviceregistration.Handler consulNamespace string alloc *structs.Allocation task *structs.Task @@ -182,7 +182,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck { if check.Type != structs.ServiceCheckScript { continue } - serviceID := agentconsul.MakeAllocServiceID( + serviceID := serviceregistration.MakeAllocServiceID( h.alloc.ID, h.task.Name, service) sc := newScriptCheck(&scriptCheckConfig{ consulNamespace: h.consulNamespace, @@ -222,7 +222,7 @@ func (h *scriptCheckHook) newScriptChecks() map[string]*scriptCheck { continue } groupTaskName := "group-" + tg.Name - serviceID := agentconsul.MakeAllocServiceID( + serviceID := serviceregistration.MakeAllocServiceID( h.alloc.ID, groupTaskName, service) sc := newScriptCheck(&scriptCheckConfig{ consulNamespace: h.consulNamespace, diff --git a/client/allocrunner/taskrunner/script_check_hook_test.go b/client/allocrunner/taskrunner/script_check_hook_test.go index 0d50c4fc0..5463a0b2e 100644 --- a/client/allocrunner/taskrunner/script_check_hook_test.go +++ b/client/allocrunner/taskrunner/script_check_hook_test.go @@ -10,7 +10,8 @@ import ( "github.com/hashicorp/consul/api" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" @@ -226,7 +227,7 @@ func TestScript_Exec_Codes(t *testing.T) { func TestScript_TaskEnvInterpolation(t *testing.T) { logger := testlog.HCLogger(t) - consulClient := consul.NewMockConsulServiceClient(t, logger) + consulClient := regMock.NewServiceRegistrationHandler(logger) exec, cancel := newBlockingScriptExec() defer cancel() @@ -262,7 +263,7 @@ func TestScript_TaskEnvInterpolation(t *testing.T) { scHook.driverExec = exec expectedSvc := svcHook.getWorkloadServices().Services[0] - expected := agentconsul.MakeCheckID(agentconsul.MakeAllocServiceID( + expected := agentconsul.MakeCheckID(serviceregistration.MakeAllocServiceID( alloc.ID, task.Name, expectedSvc), expectedSvc.Checks[0]) actual := scHook.newScriptChecks() @@ -278,7 +279,7 @@ func TestScript_TaskEnvInterpolation(t *testing.T) { svcHook.taskEnv = env expectedSvc = svcHook.getWorkloadServices().Services[0] - expected = agentconsul.MakeCheckID(agentconsul.MakeAllocServiceID( + expected = agentconsul.MakeCheckID(serviceregistration.MakeAllocServiceID( alloc.ID, task.Name, expectedSvc), expectedSvc.Checks[0]) actual = scHook.newScriptChecks() diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index ecc022c40..9684317e5 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -8,7 +8,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" @@ -24,7 +24,7 @@ var _ interfaces.TaskUpdateHook = &serviceHook{} type serviceHookConfig struct { alloc *structs.Allocation task *structs.Task - consulServices consul.ConsulServiceAPI + consulServices serviceregistration.Handler consulNamespace string // Restarter is a subset of the TaskLifecycle interface @@ -37,7 +37,7 @@ type serviceHook struct { allocID string taskName string consulNamespace string - consulServices consul.ConsulServiceAPI + consulServices serviceregistration.Handler restarter agentconsul.WorkloadRestarter logger log.Logger @@ -193,21 +193,21 @@ func (h *serviceHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, return nil } -func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices { +func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadServices { // Interpolate with the task's environment interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services) // Create task services struct with request's driver metadata - return &agentconsul.WorkloadServices{ - AllocID: h.allocID, - Task: h.taskName, - ConsulNamespace: h.consulNamespace, - Restarter: h.restarter, - Services: interpolatedServices, - DriverExec: h.driverExec, - DriverNetwork: h.driverNet, - Networks: h.networks, - Canary: h.canary, - Ports: h.ports, + return &serviceregistration.WorkloadServices{ + AllocID: h.allocID, + Task: h.taskName, + Namespace: h.consulNamespace, + Restarter: h.restarter, + Services: interpolatedServices, + DriverExec: h.driverExec, + DriverNetwork: h.driverNet, + Networks: h.networks, + Canary: h.canary, + Ports: h.ports, } } diff --git a/client/allocrunner/taskrunner/service_hook_test.go b/client/allocrunner/taskrunner/service_hook_test.go index bdae6bfd0..4489e7220 100644 --- a/client/allocrunner/taskrunner/service_hook_test.go +++ b/client/allocrunner/taskrunner/service_hook_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/client/consul" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/stretchr/testify/require" @@ -20,7 +20,7 @@ var _ interfaces.TaskUpdateHook = (*serviceHook)(nil) func TestUpdate_beforePoststart(t *testing.T) { alloc := mock.Alloc() logger := testlog.HCLogger(t) - c := consul.NewMockConsulServiceClient(t, logger) + c := regMock.NewServiceRegistrationHandler(logger) hook := newServiceHook(serviceHookConfig{ alloc: alloc, @@ -56,7 +56,7 @@ func Test_serviceHook_multipleDeRegisterCall(t *testing.T) { alloc := mock.Alloc() logger := testlog.HCLogger(t) - c := consul.NewMockConsulServiceClient(t, logger) + c := regMock.NewServiceRegistrationHandler(logger) hook := newServiceHook(serviceHookConfig{ alloc: alloc, diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 3cf751cf1..41938aea3 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -25,6 +25,7 @@ import ( cinterfaces "github.com/hashicorp/nomad/client/interfaces" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + "github.com/hashicorp/nomad/client/serviceregistration" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" @@ -166,7 +167,7 @@ type TaskRunner struct { // consulClient is the client used by the consul service hook for // registering services and checks - consulServiceClient consul.ConsulServiceAPI + consulServiceClient serviceregistration.Handler // consulProxiesClient is the client used by the envoy version hook for // asking consul what version of envoy nomad should inject into the connect @@ -248,7 +249,7 @@ type Config struct { Logger log.Logger // Consul is the client to use for managing Consul service registrations - Consul consul.ConsulServiceAPI + Consul serviceregistration.Handler // ConsulProxies is the client to use for looking up supported envoy versions // from Consul. diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 77741c802..139808d91 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -24,6 +24,7 @@ import ( consulapi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" cstate "github.com/hashicorp/nomad/client/state" ctestutil "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/client/vaultclient" @@ -109,7 +110,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri Task: thisTask, TaskDir: taskDir, Logger: clientConf.Logger, - Consul: consulapi.NewMockConsulServiceClient(t, logger), + Consul: regMock.NewServiceRegistrationHandler(logger), ConsulSI: consulapi.NewMockServiceIdentitiesClient(), Vault: vaultclient.NewMockVaultClient(), StateDB: cstate.NoopDB{}, @@ -939,7 +940,7 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) { tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient) + mockConsul := conf.Consul.(*regMock.ServiceRegistrationHandler) // Wait for the task to start testWaitForTaskToStart(t, tr) @@ -1027,7 +1028,7 @@ func TestTaskRunner_NoShutdownDelay(t *testing.T) { tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient) + mockConsul := conf.Consul.(*regMock.ServiceRegistrationHandler) testWaitForTaskToStart(t, tr) @@ -2479,7 +2480,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) - consul := conf.Consul.(*consulapi.MockConsulServiceClient) + consul := conf.Consul.(*regMock.ServiceRegistrationHandler) consulOps := consul.GetOps() require.Len(t, consulOps, 4) diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index da850719e..4fde2cd02 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" @@ -62,7 +63,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu Logger: clientConf.Logger, ClientConfig: clientConf, StateDB: state.NoopDB{}, - Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger), + Consul: mock.NewServiceRegistrationHandler(clientConf.Logger), ConsulSI: consul.NewMockServiceIdentitiesClient(), Vault: vaultclient.NewMockVaultClient(), StateUpdater: &MockStateUpdater{}, diff --git a/client/client.go b/client/client.go index 826453170..c70a1547d 100644 --- a/client/client.go +++ b/client/client.go @@ -39,6 +39,7 @@ import ( "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/servers" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" @@ -227,7 +228,7 @@ type Client struct { // consulService is Nomad's custom Consul client for managing services // and checks. - consulService consulApi.ConsulServiceAPI + consulService serviceregistration.Handler // consulProxies is Nomad's custom Consul client for looking up supported // envoy versions @@ -322,7 +323,7 @@ var ( // registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place // of the client's normal RPC handlers. This allows server tests to override // the behavior of the client. -func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService consulApi.ConsulServiceAPI, rpcs map[string]interface{}) (*Client, error) { +func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) { // Create the tls wrapper var tlsWrap tlsutil.RegionWrapper if cfg.TLSConfig.EnableRPC { diff --git a/client/client_test.go b/client/client_test.go index d27ab0ed0..30fbf384b 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -14,8 +14,9 @@ import ( memdb "github.com/hashicorp/go-memdb" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/client/config" - consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/fingerprint" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" + cstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pluginutils/catalog" "github.com/hashicorp/nomad/helper/pluginutils/singleton" @@ -29,8 +30,6 @@ import ( psstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" - - cstate "github.com/hashicorp/nomad/client/state" "github.com/stretchr/testify/require" ) @@ -615,7 +614,7 @@ func TestClient_SaveRestoreState(t *testing.T) { logger := testlog.HCLogger(t) c1.config.Logger = logger consulCatalog := consul.NewMockCatalog(logger) - mockService := consulApi.NewMockConsulServiceClient(t, logger) + mockService := regMock.NewServiceRegistrationHandler(logger) // ensure we use non-shutdown driver instances c1.config.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", c1.config.Options, nil) diff --git a/client/consul/consul.go b/client/consul/consul.go index 9459e375d..05f77418c 100644 --- a/client/consul/consul.go +++ b/client/consul/consul.go @@ -1,33 +1,9 @@ package consul import ( - "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" ) -// ConsulServiceAPI is the interface the Nomad Client uses to register and -// remove services and checks from Consul. -// -// ACL requirements -// - service:write -type ConsulServiceAPI interface { - // RegisterWorkload with Consul. Adds all service entries and checks to Consul. - RegisterWorkload(*consul.WorkloadServices) error - - // RemoveWorkload from Consul. Removes all service entries and checks. - RemoveWorkload(*consul.WorkloadServices) - - // UpdateWorkload in Consul. Does not alter the service if only checks have - // changed. - UpdateWorkload(old, newTask *consul.WorkloadServices) error - - // AllocRegistrations returns the registrations for the given allocation. - AllocRegistrations(allocID string) (*consul.AllocRegistration, error) - - // UpdateTTL is used to update the TTL of a check. - UpdateTTL(id, namespace, output, status string) error -} - // TokenDeriverFunc takes an allocation and a set of tasks and derives a // service identity token for each. Requests go through nomad server. type TokenDeriverFunc func(*structs.Allocation, []string) (map[string]string, error) diff --git a/client/consul/consul_testing.go b/client/consul/consul_testing.go deleted file mode 100644 index 605bf96cf..000000000 --- a/client/consul/consul_testing.go +++ /dev/null @@ -1,113 +0,0 @@ -package consul - -import ( - "fmt" - "sync" - "time" - - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/command/agent/consul" - testing "github.com/mitchellh/go-testing-interface" -) - -// MockConsulOp represents the register/deregister operations. -type MockConsulOp struct { - Op string // add, remove, or update - AllocID string - Name string // task or group name - OccurredAt time.Time -} - -func NewMockConsulOp(op, allocID, name string) MockConsulOp { - switch op { - case "add", "remove", "update", "alloc_registrations", - "add_group", "remove_group", "update_group", "update_ttl": - default: - panic(fmt.Errorf("invalid consul op: %s", op)) - } - return MockConsulOp{ - Op: op, - AllocID: allocID, - Name: name, - OccurredAt: time.Now(), - } -} - -// MockConsulServiceClient implements the ConsulServiceAPI interface to record -// and log task registration/deregistration. -type MockConsulServiceClient struct { - ops []MockConsulOp - mu sync.Mutex - - logger log.Logger - - // AllocRegistrationsFn allows injecting return values for the - // AllocRegistrations function. - AllocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error) -} - -func NewMockConsulServiceClient(t testing.T, logger log.Logger) *MockConsulServiceClient { - logger = logger.Named("mock_consul") - m := MockConsulServiceClient{ - ops: make([]MockConsulOp, 0, 20), - logger: logger, - } - return &m -} - -func (m *MockConsulServiceClient) UpdateWorkload(old, newSvcs *consul.WorkloadServices) error { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Trace("UpdateWorkload", "alloc_id", newSvcs.AllocID, "name", newSvcs.Name(), - "old_services", len(old.Services), "new_services", len(newSvcs.Services), - ) - m.ops = append(m.ops, NewMockConsulOp("update", newSvcs.AllocID, newSvcs.Name())) - return nil -} - -func (m *MockConsulServiceClient) RegisterWorkload(svcs *consul.WorkloadServices) error { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Trace("RegisterWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(), - "services", len(svcs.Services), - ) - m.ops = append(m.ops, NewMockConsulOp("add", svcs.AllocID, svcs.Name())) - return nil -} - -func (m *MockConsulServiceClient) RemoveWorkload(svcs *consul.WorkloadServices) { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Trace("RemoveWorkload", "alloc_id", svcs.AllocID, "name", svcs.Name(), - "services", len(svcs.Services), - ) - m.ops = append(m.ops, NewMockConsulOp("remove", svcs.AllocID, svcs.Name())) -} - -func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Trace("AllocRegistrations", "alloc_id", allocID) - m.ops = append(m.ops, NewMockConsulOp("alloc_registrations", allocID, "")) - - if m.AllocRegistrationsFn != nil { - return m.AllocRegistrationsFn(allocID) - } - - return nil, nil -} - -func (m *MockConsulServiceClient) UpdateTTL(checkID, namespace, output, status string) error { - // TODO(tgross): this method is here so we can implement the - // interface but the locking we need for testing creates a lot - // of opportunities for deadlocks in testing that will never - // appear in live code. - m.logger.Trace("UpdateTTL", "check_id", checkID, "namespace", namespace, "status", status) - return nil -} - -func (m *MockConsulServiceClient) GetOps() []MockConsulOp { - m.mu.Lock() - defer m.mu.Unlock() - return m.ops -} diff --git a/client/serviceregistration/address.go b/client/serviceregistration/address.go new file mode 100644 index 000000000..4a67c94ab --- /dev/null +++ b/client/serviceregistration/address.go @@ -0,0 +1,136 @@ +package serviceregistration + +import ( + "fmt" + "strconv" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" +) + +// GetAddress returns the IP and port to use for a service or check. If no port +// label is specified (an empty value), zero values are returned because no +// address could be resolved. +func GetAddress( + addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork, + ports structs.AllocatedPorts, netStatus *structs.AllocNetworkStatus) (string, int, error) { + + switch addrMode { + case structs.AddressModeAuto: + if driverNet.Advertise() { + addrMode = structs.AddressModeDriver + } else { + addrMode = structs.AddressModeHost + } + return GetAddress(addrMode, portLabel, networks, driverNet, ports, netStatus) + case structs.AddressModeHost: + if portLabel == "" { + if len(networks) != 1 { + // If no networks are specified return zero + // values. Consul will advertise the host IP + // with no port. This is the pre-0.7.1 behavior + // some people rely on. + return "", 0, nil + } + + return networks[0].IP, 0, nil + } + + // Default path: use host ip:port + // Try finding port in the AllocatedPorts struct first + // Check in Networks struct for backwards compatibility if not found + mapping, ok := ports.Get(portLabel) + if !ok { + mapping = networks.Port(portLabel) + if mapping.Value > 0 { + return mapping.HostIP, mapping.Value, nil + } + + // If port isn't a label, try to parse it as a literal port number + port, err := strconv.Atoi(portLabel) + if err != nil { + // Don't include Atoi error message as user likely + // never intended it to be a numeric and it creates a + // confusing error message + return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel) + } + if port <= 0 { + return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel) + } + + // A number was given which will use the Consul agent's address and the given port + // Returning a blank string as an address will use the Consul agent's address + return "", port, nil + } + return mapping.HostIP, mapping.Value, nil + + case structs.AddressModeDriver: + // Require a driver network if driver address mode is used + if driverNet == nil { + return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`) + } + + // If no port label is specified just return the IP + if portLabel == "" { + return driverNet.IP, 0, nil + } + + // If the port is a label, use the driver's port (not the host's) + if port, ok := ports.Get(portLabel); ok { + return driverNet.IP, port.To, nil + } + + // Check if old style driver portmap is used + if port, ok := driverNet.PortMap[portLabel]; ok { + return driverNet.IP, port, nil + } + + // If port isn't a label, try to parse it as a literal port number + port, err := strconv.Atoi(portLabel) + if err != nil { + // Don't include Atoi error message as user likely + // never intended it to be a numeric and it creates a + // confusing error message + return "", 0, fmt.Errorf("invalid port label %q: port labels in driver address_mode must be numeric or in the driver's port map", portLabel) + } + if port <= 0 { + return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel) + } + + return driverNet.IP, port, nil + + case structs.AddressModeAlloc: + if netStatus == nil { + return "", 0, fmt.Errorf(`cannot use address_mode="alloc": no allocation network status reported`) + } + + // If no port label is specified just return the IP + if portLabel == "" { + return netStatus.Address, 0, nil + } + + // If port is a label and is found then return it + if port, ok := ports.Get(portLabel); ok { + // Use port.To value unless not set + if port.To > 0 { + return netStatus.Address, port.To, nil + } + return netStatus.Address, port.Value, nil + } + + // Check if port is a literal number + port, err := strconv.Atoi(portLabel) + if err != nil { + // User likely specified wrong port label here + return "", 0, fmt.Errorf("invalid port %q: port label not found or is not numeric", portLabel) + } + if port <= 0 { + return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel) + } + return netStatus.Address, port, nil + + default: + // Shouldn't happen due to validation, but enforce invariants + return "", 0, fmt.Errorf("invalid address mode %q", addrMode) + } +} diff --git a/client/serviceregistration/address_test.go b/client/serviceregistration/address_test.go new file mode 100644 index 000000000..fbad6acd1 --- /dev/null +++ b/client/serviceregistration/address_test.go @@ -0,0 +1,361 @@ +package serviceregistration + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/stretchr/testify/require" +) + +func Test_GetAddress(t *testing.T) { + const HostIP = "127.0.0.1" + + testCases := []struct { + name string + + // Parameters + mode string + portLabel string + host map[string]int // will be converted to structs.Networks + driver *drivers.DriverNetwork + ports structs.AllocatedPorts + status *structs.AllocNetworkStatus + + // Results + expectedIP string + expectedPort int + expectedErr string + }{ + // Valid Configurations + { + name: "ExampleService", + mode: structs.AddressModeAuto, + portLabel: "db", + host: map[string]int{"db": 12435}, + driver: &drivers.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + expectedIP: HostIP, + expectedPort: 12435, + }, + { + name: "host", + mode: structs.AddressModeHost, + portLabel: "db", + host: map[string]int{"db": 12345}, + driver: &drivers.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + expectedIP: HostIP, + expectedPort: 12345, + }, + { + name: "driver", + mode: structs.AddressModeDriver, + portLabel: "db", + host: map[string]int{"db": 12345}, + driver: &drivers.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + expectedIP: "10.1.2.3", + expectedPort: 6379, + }, + { + name: "AutoDriver", + mode: structs.AddressModeAuto, + portLabel: "db", + host: map[string]int{"db": 12345}, + driver: &drivers.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + AutoAdvertise: true, + }, + expectedIP: "10.1.2.3", + expectedPort: 6379, + }, + { + name: "DriverCustomPort", + mode: structs.AddressModeDriver, + portLabel: "7890", + host: map[string]int{"db": 12345}, + driver: &drivers.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + expectedIP: "10.1.2.3", + expectedPort: 7890, + }, + + // Invalid Configurations + { + name: "DriverWithoutNetwork", + mode: structs.AddressModeDriver, + portLabel: "db", + host: map[string]int{"db": 12345}, + driver: nil, + expectedErr: "no driver network exists", + }, + { + name: "DriverBadPort", + mode: structs.AddressModeDriver, + portLabel: "bad-port-label", + host: map[string]int{"db": 12345}, + driver: &drivers.DriverNetwork{ + PortMap: map[string]int{"db": 6379}, + IP: "10.1.2.3", + }, + expectedErr: "invalid port", + }, + { + name: "DriverZeroPort", + mode: structs.AddressModeDriver, + portLabel: "0", + driver: &drivers.DriverNetwork{ + IP: "10.1.2.3", + }, + expectedErr: "invalid port", + }, + { + name: "HostBadPort", + mode: structs.AddressModeHost, + portLabel: "bad-port-label", + expectedErr: "invalid port", + }, + { + name: "InvalidMode", + mode: "invalid-mode", + portLabel: "80", + expectedErr: "invalid address mode", + }, + { + name: "NoPort_AutoMode", + mode: structs.AddressModeAuto, + expectedIP: HostIP, + }, + { + name: "NoPort_HostMode", + mode: structs.AddressModeHost, + expectedIP: HostIP, + }, + { + name: "NoPort_DriverMode", + mode: structs.AddressModeDriver, + driver: &drivers.DriverNetwork{ + IP: "10.1.2.3", + }, + expectedIP: "10.1.2.3", + }, + + // Scenarios using port 0.12 networking fields (NetworkStatus, AllocatedPortMapping) + { + name: "ExampleServer_withAllocatedPorts", + mode: structs.AddressModeAuto, + portLabel: "db", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12435, + To: 6379, + HostIP: HostIP, + }, + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: HostIP, + expectedPort: 12435, + }, + { + name: "Host_withAllocatedPorts", + mode: structs.AddressModeHost, + portLabel: "db", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12345, + To: 6379, + HostIP: HostIP, + }, + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: HostIP, + expectedPort: 12345, + }, + { + name: "Driver_withAllocatedPorts", + mode: structs.AddressModeDriver, + portLabel: "db", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12345, + To: 6379, + HostIP: HostIP, + }, + }, + driver: &drivers.DriverNetwork{ + IP: "10.1.2.3", + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: "10.1.2.3", + expectedPort: 6379, + }, + { + name: "AutoDriver_withAllocatedPorts", + mode: structs.AddressModeAuto, + portLabel: "db", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12345, + To: 6379, + HostIP: HostIP, + }, + }, + driver: &drivers.DriverNetwork{ + IP: "10.1.2.3", + AutoAdvertise: true, + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: "10.1.2.3", + expectedPort: 6379, + }, + { + name: "DriverCustomPort_withAllocatedPorts", + mode: structs.AddressModeDriver, + portLabel: "7890", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12345, + To: 6379, + HostIP: HostIP, + }, + }, + driver: &drivers.DriverNetwork{ + IP: "10.1.2.3", + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: "10.1.2.3", + expectedPort: 7890, + }, + { + name: "Host_MultiHostInterface", + mode: structs.AddressModeAuto, + portLabel: "db", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12345, + To: 6379, + HostIP: "127.0.0.100", + }, + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: "127.0.0.100", + expectedPort: 12345, + }, + { + name: "Alloc", + mode: structs.AddressModeAlloc, + portLabel: "db", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12345, + To: 6379, + HostIP: HostIP, + }, + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: "172.26.0.1", + expectedPort: 6379, + }, + { + name: "Alloc no to value", + mode: structs.AddressModeAlloc, + portLabel: "db", + ports: []structs.AllocatedPortMapping{ + { + Label: "db", + Value: 12345, + HostIP: HostIP, + }, + }, + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: "172.26.0.1", + expectedPort: 12345, + }, + { + name: "AllocCustomPort", + mode: structs.AddressModeAlloc, + portLabel: "6379", + status: &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "172.26.0.1", + }, + expectedIP: "172.26.0.1", + expectedPort: 6379, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + // Convert host port map into a structs.Networks. + networks := []*structs.NetworkResource{ + { + IP: HostIP, + ReservedPorts: make([]structs.Port, len(tc.host)), + }, + } + + i := 0 + for label, port := range tc.host { + networks[0].ReservedPorts[i].Label = label + networks[0].ReservedPorts[i].Value = port + i++ + } + + // Run the GetAddress function. + actualIP, actualPort, actualErr := GetAddress( + tc.mode, tc.portLabel, networks, tc.driver, tc.ports, tc.status) + + // Assert the results + require.Equal(t, tc.expectedIP, actualIP, "IP mismatch") + require.Equal(t, tc.expectedPort, actualPort, "Port mismatch") + if tc.expectedErr == "" { + require.Nil(t, actualErr) + } else { + require.Error(t, actualErr) + require.Contains(t, actualErr.Error(), tc.expectedErr) + } + }) + } +} diff --git a/client/serviceregistration/id.go b/client/serviceregistration/id.go new file mode 100644 index 000000000..edcabd4c6 --- /dev/null +++ b/client/serviceregistration/id.go @@ -0,0 +1,27 @@ +package serviceregistration + +import ( + "fmt" + + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // nomadServicePrefix is the prefix that scopes all Nomad registered + // services (both agent and task entries). + nomadServicePrefix = "_nomad" + + // nomadTaskPrefix is the prefix that scopes Nomad registered services + // for tasks. + nomadTaskPrefix = nomadServicePrefix + "-task-" +) + +// MakeAllocServiceID creates a unique ID for identifying an alloc service in +// a service registration provider. Both Nomad and Consul solutions use the +// same ID format to provide consistency. +// +// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http +func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string { + return fmt.Sprintf("%s%s-%s-%s-%s", + nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel) +} diff --git a/client/serviceregistration/id_test.go b/client/serviceregistration/id_test.go new file mode 100644 index 000000000..b41085a9f --- /dev/null +++ b/client/serviceregistration/id_test.go @@ -0,0 +1,36 @@ +package serviceregistration + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func Test_MakeAllocServiceID(t *testing.T) { + testCases := []struct { + inputAllocID string + inputTaskName string + inputService *structs.Service + expectedOutput string + name string + }{ + { + inputAllocID: "7ac7c672-1824-6f06-644c-4c249e1578b9", + inputTaskName: "cache", + inputService: &structs.Service{ + Name: "redis", + PortLabel: "db", + }, + expectedOutput: "_nomad-task-7ac7c672-1824-6f06-644c-4c249e1578b9-cache-redis-db", + name: "generic 1", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := MakeAllocServiceID(tc.inputAllocID, tc.inputTaskName, tc.inputService) + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +} diff --git a/client/serviceregistration/mock/mock.go b/client/serviceregistration/mock/mock.go new file mode 100644 index 000000000..899c4f824 --- /dev/null +++ b/client/serviceregistration/mock/mock.go @@ -0,0 +1,125 @@ +package mock + +import ( + "fmt" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/serviceregistration" +) + +// Ensure that the mock handler implements the service registration handler +// interface. +var _ serviceregistration.Handler = (*ServiceRegistrationHandler)(nil) + +// ServiceRegistrationHandler is the mock implementation of the +// serviceregistration.Handler interface and can be used for testing. +type ServiceRegistrationHandler struct { + log hclog.Logger + + // ops tracks the requested operations by the caller during the entire + // lifecycle of the ServiceRegistrationHandler. The mutex should be used + // whenever interacting with this. + mu sync.Mutex + ops []Operation + + // AllocRegistrationsFn allows injecting return values for the + // AllocRegistrations function. + AllocRegistrationsFn func(allocID string) (*serviceregistration.AllocRegistration, error) +} + +// NewServiceRegistrationHandler returns a ready to use +// ServiceRegistrationHandler for testing. +func NewServiceRegistrationHandler(log hclog.Logger) *ServiceRegistrationHandler { + return &ServiceRegistrationHandler{ + ops: make([]Operation, 0, 20), + log: log.Named("mock_service_registration"), + } +} + +func (h *ServiceRegistrationHandler) RegisterWorkload(services *serviceregistration.WorkloadServices) error { + h.mu.Lock() + defer h.mu.Unlock() + + h.log.Trace("RegisterWorkload", "alloc_id", services.AllocID, + "name", services.Name(), "services", len(services.Services)) + + h.ops = append(h.ops, newOperation("add", services.AllocID, services.Name())) + return nil +} + +func (h *ServiceRegistrationHandler) RemoveWorkload(services *serviceregistration.WorkloadServices) { + h.mu.Lock() + defer h.mu.Unlock() + + h.log.Trace("RemoveWorkload", "alloc_id", services.AllocID, + "name", services.Name(), "services", len(services.Services)) + + h.ops = append(h.ops, newOperation("remove", services.AllocID, services.Name())) +} + +func (h *ServiceRegistrationHandler) UpdateWorkload(old, newServices *serviceregistration.WorkloadServices) error { + h.mu.Lock() + defer h.mu.Unlock() + + h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocID, "name", newServices.Name(), + "old_services", len(old.Services), "new_services", len(newServices.Services)) + + h.ops = append(h.ops, newOperation("update", newServices.AllocID, newServices.Name())) + return nil +} + +func (h *ServiceRegistrationHandler) AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) { + h.mu.Lock() + defer h.mu.Unlock() + + h.log.Trace("AllocRegistrations", "alloc_id", allocID) + h.ops = append(h.ops, newOperation("alloc_registrations", allocID, "")) + + if h.AllocRegistrationsFn != nil { + return h.AllocRegistrationsFn(allocID) + } + return nil, nil +} + +func (h *ServiceRegistrationHandler) UpdateTTL(checkID, namespace, output, status string) error { + // TODO(tgross): this method is here so we can implement the + // interface but the locking we need for testing creates a lot + // of opportunities for deadlocks in testing that will never + // appear in live code. + h.log.Trace("UpdateTTL", "check_id", checkID, "namespace", namespace, "status", status) + return nil +} + +// GetOps returns all stored operations within the handler. +func (h *ServiceRegistrationHandler) GetOps() []Operation { + h.mu.Lock() + defer h.mu.Unlock() + + return h.ops +} + +// Operation represents the register/deregister operations. +type Operation struct { + Op string // add, remove, or update + AllocID string + Name string // task or group name + OccurredAt time.Time +} + +// newOperation generates a new Operation for the given parameters. +func newOperation(op, allocID, name string) Operation { + switch op { + case "add", "remove", "update", "alloc_registrations", + "add_group", "remove_group", "update_group", "update_ttl": + default: + panic(fmt.Errorf("invalid consul op: %s", op)) + } + return Operation{ + Op: op, + AllocID: allocID, + Name: name, + OccurredAt: time.Now(), + } +} diff --git a/client/serviceregistration/service_registration.go b/client/serviceregistration/service_registration.go new file mode 100644 index 000000000..4981ff1cf --- /dev/null +++ b/client/serviceregistration/service_registration.go @@ -0,0 +1,157 @@ +package serviceregistration + +import ( + "context" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Handler is the interface the Nomad Client uses to register, update and +// remove services and checks from service registration providers. Currently, +// Consul and Nomad are supported providers. +// +// When utilising Consul, the ACL "service:write" is required. It supports all +// functionality and is the OG/GOAT. +// +// When utilising Nomad, the client secret ID is used for authorisation. It +// currently supports service registrations only. +type Handler interface { + + // RegisterWorkload adds all service entries and checks to the backend + // provider. Whilst callers attempt to ensure WorkloadServices.Services is + // not empty before calling this function, implementations should also + // perform this. + RegisterWorkload(workload *WorkloadServices) error + + // RemoveWorkload all service entries and checks from the backend provider + // that are found within the passed WorkloadServices object. Whilst callers + // attempt to ensure WorkloadServices.Services is not empty before calling + // this function, implementations should also perform this. + RemoveWorkload(workload *WorkloadServices) + + // UpdateWorkload removes workload as specified by the old parameter, and + // adds workload as specified by the new parameter. Callers do not perform + // any deduplication on both objects, it is therefore the responsibility of + // the implementation. + UpdateWorkload(old, newTask *WorkloadServices) error + + // AllocRegistrations returns the registrations for the given allocation. + AllocRegistrations(allocID string) (*AllocRegistration, error) + + // UpdateTTL is used to update the TTL of an individual service + // registration check. + UpdateTTL(id, namespace, output, status string) error +} + +// WorkloadRestarter allows the checkWatcher to restart tasks or entire task +// groups. +type WorkloadRestarter interface { + Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error +} + +// AllocRegistration holds the status of services registered for a particular +// allocations by task. +type AllocRegistration struct { + // Tasks maps the name of a task to its registered services and checks. + Tasks map[string]*ServiceRegistrations +} + +// Copy performs a deep copy of the AllocRegistration object. +func (a *AllocRegistration) Copy() *AllocRegistration { + c := &AllocRegistration{ + Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)), + } + + for k, v := range a.Tasks { + c.Tasks[k] = v.copy() + } + + return c +} + +// NumServices returns the number of registered services. +func (a *AllocRegistration) NumServices() int { + if a == nil { + return 0 + } + + total := 0 + for _, treg := range a.Tasks { + for _, sreg := range treg.Services { + if sreg.Service != nil { + total++ + } + } + } + + return total +} + +// NumChecks returns the number of registered checks. +func (a *AllocRegistration) NumChecks() int { + if a == nil { + return 0 + } + + total := 0 + for _, treg := range a.Tasks { + for _, sreg := range treg.Services { + total += len(sreg.Checks) + } + } + + return total +} + +// ServiceRegistrations holds the status of services registered for a +// particular task or task group. +type ServiceRegistrations struct { + Services map[string]*ServiceRegistration +} + +func (t *ServiceRegistrations) copy() *ServiceRegistrations { + c := &ServiceRegistrations{ + Services: make(map[string]*ServiceRegistration, len(t.Services)), + } + + for k, v := range t.Services { + c.Services[k] = v.copy() + } + + return c +} + +// ServiceRegistration holds the status of a registered Consul Service and its +// Checks. +type ServiceRegistration struct { + // serviceID and checkIDs are internal fields that track just the IDs of the + // services/checks registered in Consul. It is used to materialize the other + // fields when queried. + ServiceID string + CheckIDs map[string]struct{} + + // CheckOnUpdate is a map of checkIDs and the associated OnUpdate value + // from the ServiceCheck It is used to determine how a reported checks + // status should be evaluated. + CheckOnUpdate map[string]string + + // Service is the AgentService registered in Consul. + Service *api.AgentService + + // Checks is the status of the registered checks. + Checks []*api.AgentCheck +} + +func (s *ServiceRegistration) copy() *ServiceRegistration { + // Copy does not copy the external fields but only the internal fields. + // This is so that the caller of AllocRegistrations can not access the + // internal fields and that method uses these fields to populate the + // external fields. + return &ServiceRegistration{ + ServiceID: s.ServiceID, + CheckIDs: helper.CopyMapStringStruct(s.CheckIDs), + CheckOnUpdate: helper.CopyMapStringString(s.CheckOnUpdate), + } +} diff --git a/client/serviceregistration/service_registration_test.go b/client/serviceregistration/service_registration_test.go new file mode 100644 index 000000000..dc6589886 --- /dev/null +++ b/client/serviceregistration/service_registration_test.go @@ -0,0 +1,53 @@ +package serviceregistration + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestAllocRegistration_Copy(t *testing.T) { + testCases := []struct { + inputAllocRegistration *AllocRegistration + name string + }{ + { + inputAllocRegistration: &AllocRegistration{ + Tasks: map[string]*ServiceRegistrations{}, + }, + name: "empty tasks map", + }, + { + inputAllocRegistration: &AllocRegistration{ + Tasks: map[string]*ServiceRegistrations{ + "cache": { + Services: map[string]*ServiceRegistration{ + "redis-db": { + ServiceID: "service-id-1", + CheckIDs: map[string]struct{}{ + "check-id-1": {}, + "check-id-2": {}, + "check-id-3": {}, + }, + CheckOnUpdate: map[string]string{ + "check-id-1": structs.OnUpdateIgnore, + "check-id-2": structs.OnUpdateRequireHealthy, + "check-id-3": structs.OnUpdateIgnoreWarn, + }, + }, + }, + }, + }, + }, + name: "non-empty tasks map", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := tc.inputAllocRegistration.Copy() + require.Equal(t, tc.inputAllocRegistration, actualOutput) + }) + } +} diff --git a/client/serviceregistration/workload.go b/client/serviceregistration/workload.go new file mode 100644 index 000000000..064f4fa06 --- /dev/null +++ b/client/serviceregistration/workload.go @@ -0,0 +1,95 @@ +package serviceregistration + +import ( + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" +) + +// WorkloadServices describes services defined in either a Task or TaskGroup +// that need to be syncronized with a service registration provider. +type WorkloadServices struct { + AllocID string + + // Group in which the service belongs for a group-level service, or the + // group in which task belongs for a task-level service. + Group string + + // Task in which the service belongs for task-level service. Will be empty + // for a group-level service. + Task string + + // JobID provides additional context for providers regarding which job + // caused this registration. + JobID string + + // Canary indicates whether, or not the allocation is a canary. This is + // used to build the correct tags mapping. + Canary bool + + // Namespace is the provider namespace in which services will be + // registered, if the provider supports this functionality. + Namespace string + + // Restarter allows restarting the task or task group depending on the + // check_restart stanzas. + Restarter WorkloadRestarter + + // Services and checks to register for the task. + Services []*structs.Service + + // Networks from the task's resources stanza. + // TODO: remove and use Ports + Networks structs.Networks + + // NetworkStatus from alloc if network namespace is created. + // Can be nil. + NetworkStatus *structs.AllocNetworkStatus + + // AllocatedPorts is the list of port mappings. + Ports structs.AllocatedPorts + + // DriverExec is the script executor for the task's driver. For group + // services this is nil and script execution is managed by a tasklet in the + // taskrunner script_check_hook. + DriverExec interfaces.ScriptExecutor + + // DriverNetwork is the network specified by the driver and may be nil. + DriverNetwork *drivers.DriverNetwork +} + +// RegistrationProvider identifies the service registration provider for the +// WorkloadServices. +func (ws *WorkloadServices) RegistrationProvider() string { + + // Protect against an empty array; it would be embarrassing to panic here. + if len(ws.Services) == 0 { + return "" + } + + // Note(jrasell): a Nomad task group can only currently utilise a single + // service provider for all services included within it. In the event we + // remove this restriction, this will need to change along which a lot of + // other logic. + return ws.Services[0].Provider +} + +// Copy method for easing tests. +func (ws *WorkloadServices) Copy() *WorkloadServices { + newTS := new(WorkloadServices) + *newTS = *ws + + // Deep copy Services + newTS.Services = make([]*structs.Service, len(ws.Services)) + for i := range ws.Services { + newTS.Services[i] = ws.Services[i].Copy() + } + return newTS +} + +func (ws *WorkloadServices) Name() string { + if ws.Task != "" { + return ws.Task + } + return "group-" + ws.Group +} diff --git a/client/serviceregistration/workload_test.go b/client/serviceregistration/workload_test.go new file mode 100644 index 000000000..3152e7492 --- /dev/null +++ b/client/serviceregistration/workload_test.go @@ -0,0 +1,49 @@ +package serviceregistration + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestWorkloadServices_RegistrationProvider(t *testing.T) { + testCases := []struct { + inputWorkloadServices *WorkloadServices + expectedOutput string + name string + }{ + { + inputWorkloadServices: &WorkloadServices{ + Services: nil, + }, + expectedOutput: "", + name: "nil panic check", + }, + { + inputWorkloadServices: &WorkloadServices{ + Services: []*structs.Service{ + {Provider: structs.ServiceProviderNomad}, + }, + }, + expectedOutput: "nomad", + name: "nomad provider", + }, + { + inputWorkloadServices: &WorkloadServices{ + Services: []*structs.Service{ + {Provider: structs.ServiceProviderConsul}, + }, + }, + expectedOutput: "consul", + name: "consul provider", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := tc.inputWorkloadServices.RegistrationProvider() + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +} diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go index 96df3fbad..22c70581b 100644 --- a/client/state/upgrade_int_test.go +++ b/client/state/upgrade_int_test.go @@ -14,10 +14,10 @@ import ( "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/allocwatcher" clientconfig "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" . "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper/boltdd" @@ -206,7 +206,7 @@ func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Al Logger: clientConf.Logger, ClientConfig: clientConf, StateDB: db, - Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger), + Consul: regMock.NewServiceRegistrationHandler(clientConf.Logger), Vault: vaultclient.NewMockVaultClient(), StateUpdater: &allocrunner.MockStateUpdater{}, PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, diff --git a/client/testing.go b/client/testing.go index 94681f76e..564f4273d 100644 --- a/client/testing.go +++ b/client/testing.go @@ -7,9 +7,9 @@ import ( "time" "github.com/hashicorp/nomad/client/config" - consulapi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/servers" + "github.com/hashicorp/nomad/client/serviceregistration/mock" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pluginutils/catalog" "github.com/hashicorp/nomad/helper/pluginutils/singleton" @@ -53,7 +53,7 @@ func TestClientWithRPCs(t testing.T, cb func(c *config.Config), rpcs map[string] conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, conf.PluginLoader) } mockCatalog := agentconsul.NewMockCatalog(logger) - mockService := consulapi.NewMockConsulServiceClient(t, logger) + mockService := mock.NewServiceRegistrationHandler(logger) client, err := NewClient(conf, mockCatalog, nil, mockService, rpcs) if err != nil { cleanup() diff --git a/command/agent/consul/group_test.go b/command/agent/consul/group_test.go index a76aac73e..dd7431f7f 100644 --- a/command/agent/consul/group_test.go +++ b/command/agent/consul/group_test.go @@ -7,6 +7,7 @@ import ( consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -92,7 +93,7 @@ func TestConsul_Connect(t *testing.T) { require.NoError(t, err) require.Len(t, services, 2) - serviceID := MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]) + serviceID := serviceregistration.MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]) connectID := serviceID + "-sidecar-proxy" require.Contains(t, services, serviceID) diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index afd06d92e..1d888b81e 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -14,14 +14,13 @@ import ( "time" "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/helper/envoy" - "github.com/pkg/errors" - "github.com/hashicorp/consul/api" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/envoy" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/drivers" + "github.com/pkg/errors" ) const ( @@ -370,109 +369,6 @@ func (o operations) String() string { return fmt.Sprintf("<%d, %d, %d, %d>", len(o.regServices), len(o.regChecks), len(o.deregServices), len(o.deregChecks)) } -// AllocRegistration holds the status of services registered for a particular -// allocations by task. -type AllocRegistration struct { - // Tasks maps the name of a task to its registered services and checks - Tasks map[string]*ServiceRegistrations -} - -func (a *AllocRegistration) copy() *AllocRegistration { - c := &AllocRegistration{ - Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)), - } - - for k, v := range a.Tasks { - c.Tasks[k] = v.copy() - } - - return c -} - -// NumServices returns the number of registered services -func (a *AllocRegistration) NumServices() int { - if a == nil { - return 0 - } - - total := 0 - for _, treg := range a.Tasks { - for _, sreg := range treg.Services { - if sreg.Service != nil { - total++ - } - } - } - - return total -} - -// NumChecks returns the number of registered checks -func (a *AllocRegistration) NumChecks() int { - if a == nil { - return 0 - } - - total := 0 - for _, treg := range a.Tasks { - for _, sreg := range treg.Services { - total += len(sreg.Checks) - } - } - - return total -} - -// ServiceRegistrations holds the status of services registered for a particular -// task or task group. -type ServiceRegistrations struct { - Services map[string]*ServiceRegistration -} - -func (t *ServiceRegistrations) copy() *ServiceRegistrations { - c := &ServiceRegistrations{ - Services: make(map[string]*ServiceRegistration, len(t.Services)), - } - - for k, v := range t.Services { - c.Services[k] = v.copy() - } - - return c -} - -// ServiceRegistration holds the status of a registered Consul Service and its -// Checks. -type ServiceRegistration struct { - // serviceID and checkIDs are internal fields that track just the IDs of the - // services/checks registered in Consul. It is used to materialize the other - // fields when queried. - serviceID string - checkIDs map[string]struct{} - - // CheckOnUpdate is a map of checkIDs and the associated OnUpdate value - // from the ServiceCheck It is used to determine how a reported checks - // status should be evaluated. - CheckOnUpdate map[string]string - - // Service is the AgentService registered in Consul. - Service *api.AgentService - - // Checks is the status of the registered checks. - Checks []*api.AgentCheck -} - -func (s *ServiceRegistration) copy() *ServiceRegistration { - // Copy does not copy the external fields but only the internal fields. This - // is so that the caller of AllocRegistrations can not access the internal - // fields and that method uses these fields to populate the external fields. - return &ServiceRegistration{ - serviceID: s.serviceID, - checkIDs: helper.CopyMapStringStruct(s.checkIDs), - CheckOnUpdate: helper.CopyMapStringString(s.CheckOnUpdate), - } -} - // ServiceClient handles task and agent service registration with Consul. type ServiceClient struct { agentAPI AgentAPI @@ -503,7 +399,7 @@ type ServiceClient struct { // allocRegistrations stores the services and checks that are registered // with Consul by allocation ID. - allocRegistrations map[string]*AllocRegistration + allocRegistrations map[string]*serviceregistration.AllocRegistration allocRegistrationsLock sync.RWMutex // Nomad agent services and checks that are recorded so they can be removed @@ -550,7 +446,7 @@ func NewServiceClient(agentAPI AgentAPI, namespacesClient *NamespacesClient, log checks: make(map[string]*api.AgentCheckRegistration), explicitlyDeregisteredServices: make(map[string]bool), explicitlyDeregisteredChecks: make(map[string]bool), - allocRegistrations: make(map[string]*AllocRegistration), + allocRegistrations: make(map[string]*serviceregistration.AllocRegistration), agentServices: make(map[string]struct{}), agentChecks: make(map[string]struct{}), checkWatcher: newCheckWatcher(logger, agentAPI, namespacesClient), @@ -1033,14 +929,15 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) // serviceRegs creates service registrations, check registrations, and script // checks from a service. It returns a service registration object with the // service and check IDs populated. -func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, workload *WorkloadServices) ( - *ServiceRegistration, error) { +func (c *ServiceClient) serviceRegs( + ops *operations, service *structs.Service, workload *serviceregistration.WorkloadServices) ( + *serviceregistration.ServiceRegistration, error) { // Get the services ID - id := MakeAllocServiceID(workload.AllocID, workload.Name(), service) - sreg := &ServiceRegistration{ - serviceID: id, - checkIDs: make(map[string]struct{}, len(service.Checks)), + id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) + sreg := &serviceregistration.ServiceRegistration{ + ServiceID: id, + CheckIDs: make(map[string]struct{}, len(service.Checks)), CheckOnUpdate: make(map[string]string, len(service.Checks)), } @@ -1051,7 +948,8 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w } // Determine the address to advertise based on the mode - ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus) + ip, port, err := serviceregistration.GetAddress( + addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus) if err != nil { return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err) } @@ -1135,7 +1033,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w Kind: kind, ID: id, Name: service.Name, - Namespace: workload.ConsulNamespace, + Namespace: workload.Namespace, Tags: tags, EnableTagOverride: service.EnableTagOverride, Address: ip, @@ -1152,7 +1050,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w return nil, err } for _, registration := range checkRegs { - sreg.checkIDs[registration.ID] = struct{}{} + sreg.CheckIDs[registration.ID] = struct{}{} ops.regChecks = append(ops.regChecks, registration) } @@ -1161,7 +1059,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w // checkRegs creates check registrations for the given service func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, - workload *WorkloadServices, sreg *ServiceRegistration) ([]*api.AgentCheckRegistration, error) { + workload *serviceregistration.WorkloadServices, sreg *serviceregistration.ServiceRegistration) ([]*api.AgentCheckRegistration, error) { registrations := make([]*api.AgentCheckRegistration, 0, len(service.Checks)) for _, check := range service.Checks { @@ -1181,14 +1079,15 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, } var err error - ip, port, err = getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus) + ip, port, err = serviceregistration.GetAddress( + addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus) if err != nil { return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err) } } checkID := MakeCheckID(serviceID, check) - registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.ConsulNamespace) + registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.Namespace) if err != nil { return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err) } @@ -1205,15 +1104,15 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, // Checks will always use the IP from the Task struct (host's IP). // // Actual communication with Consul is done asynchronously (see Run). -func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error { +func (c *ServiceClient) RegisterWorkload(workload *serviceregistration.WorkloadServices) error { // Fast path numServices := len(workload.Services) if numServices == 0 { return nil } - t := new(ServiceRegistrations) - t.Services = make(map[string]*ServiceRegistration, numServices) + t := new(serviceregistration.ServiceRegistrations) + t.Services = make(map[string]*serviceregistration.ServiceRegistration, numServices) ops := &operations{} for _, service := range workload.Services { @@ -1221,7 +1120,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error { if err != nil { return err } - t.Services[sreg.serviceID] = sreg + t.Services[sreg.ServiceID] = sreg } // Add the workload to the allocation's registration @@ -1232,7 +1131,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error { // Start watching checks. Done after service registrations are built // since an error building them could leak watches. for _, service := range workload.Services { - serviceID := MakeAllocServiceID(workload.AllocID, workload.Name(), service) + serviceID := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) for _, check := range service.Checks { if check.TriggersRestarts() { checkID := MakeCheckID(serviceID, check) @@ -1247,19 +1146,19 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error { // changed. // // DriverNetwork must not change between invocations for the same allocation. -func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error { +func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.WorkloadServices) error { ops := new(operations) - regs := new(ServiceRegistrations) - regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services)) + regs := new(serviceregistration.ServiceRegistrations) + regs.Services = make(map[string]*serviceregistration.ServiceRegistration, len(newWorkload.Services)) newIDs := make(map[string]*structs.Service, len(newWorkload.Services)) for _, s := range newWorkload.Services { - newIDs[MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s + newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s } // Loop over existing Services to see if they have been removed for _, existingSvc := range old.Services { - existingID := MakeAllocServiceID(old.AllocID, old.Name(), existingSvc) + existingID := serviceregistration.MakeAllocServiceID(old.AllocID, old.Name(), existingSvc) newSvc, ok := newIDs[existingID] if !ok { @@ -1285,9 +1184,9 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error } // Service still exists so add it to the task's registration - sreg := &ServiceRegistration{ - serviceID: existingID, - checkIDs: make(map[string]struct{}, len(newSvc.Checks)), + sreg := &serviceregistration.ServiceRegistration{ + ServiceID: existingID, + CheckIDs: make(map[string]struct{}, len(newSvc.Checks)), CheckOnUpdate: make(map[string]string, len(newSvc.Checks)), } regs.Services[existingID] = sreg @@ -1305,7 +1204,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error // Check is still required. Remove it from the map so it doesn't get // deleted later. delete(existingChecks, checkID) - sreg.checkIDs[checkID] = struct{}{} + sreg.CheckIDs[checkID] = struct{}{} sreg.CheckOnUpdate[checkID] = check.OnUpdate } @@ -1316,7 +1215,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error } for _, registration := range checkRegs { - sreg.checkIDs[registration.ID] = struct{}{} + sreg.CheckIDs[registration.ID] = struct{}{} sreg.CheckOnUpdate[registration.ID] = check.OnUpdate ops.regChecks = append(ops.regChecks, registration) } @@ -1345,7 +1244,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error return err } - regs.Services[sreg.serviceID] = sreg + regs.Services[sreg.ServiceID] = sreg } // Add the task to the allocation's registration @@ -1370,11 +1269,11 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error // RemoveWorkload from Consul. Removes all service entries and checks. // // Actual communication with Consul is done asynchronously (see Run). -func (c *ServiceClient) RemoveWorkload(workload *WorkloadServices) { +func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadServices) { ops := operations{} for _, service := range workload.Services { - id := MakeAllocServiceID(workload.AllocID, workload.Name(), service) + id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) ops.deregServices = append(ops.deregServices, id) for _, check := range service.Checks { @@ -1406,7 +1305,7 @@ func normalizeNamespace(namespace string) string { // AllocRegistrations returns the registrations for the given allocation. If the // allocation has no registrations, the response is a nil object. -func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, error) { +func (c *ServiceClient) AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) { // Get the internal struct using the lock c.allocRegistrationsLock.RLock() regInternal, ok := c.allocRegistrations[allocID] @@ -1416,7 +1315,7 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, } // Copy so we don't expose internal structs - reg := regInternal.copy() + reg := regInternal.Copy() c.allocRegistrationsLock.RUnlock() // Get the list of all namespaces created so we can iterate them. @@ -1451,7 +1350,7 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, for _, treg := range reg.Tasks { for serviceID, sreg := range treg.Services { sreg.Service = services[serviceID] - for checkID := range sreg.checkIDs { + for checkID := range sreg.CheckIDs { if check, ok := checks[checkID]; ok { sreg.Checks = append(sreg.Checks, check) } @@ -1547,14 +1446,14 @@ func (c *ServiceClient) Shutdown() error { } // addRegistration adds the service registrations for the given allocation. -func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *ServiceRegistrations) { +func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *serviceregistration.ServiceRegistrations) { c.allocRegistrationsLock.Lock() defer c.allocRegistrationsLock.Unlock() alloc, ok := c.allocRegistrations[allocID] if !ok { - alloc = &AllocRegistration{ - Tasks: make(map[string]*ServiceRegistrations), + alloc = &serviceregistration.AllocRegistration{ + Tasks: make(map[string]*serviceregistration.ServiceRegistrations), } c.allocRegistrations[allocID] = alloc } @@ -1592,14 +1491,6 @@ func makeAgentServiceID(role string, service *structs.Service) string { return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false)) } -// MakeAllocServiceID creates a unique ID for identifying an alloc service in -// Consul. -// -// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http -func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string { - return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel) -} - // MakeCheckID creates a unique ID for a check. // // Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d @@ -1768,127 +1659,3 @@ func getNomadSidecar(id string, services map[string]*api.AgentService) *api.Agen sidecarID := id + sidecarSuffix return services[sidecarID] } - -// getAddress returns the IP and port to use for a service or check. If no port -// label is specified (an empty value), zero values are returned because no -// address could be resolved. -func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork, ports structs.AllocatedPorts, netStatus *structs.AllocNetworkStatus) (string, int, error) { - switch addrMode { - case structs.AddressModeAuto: - if driverNet.Advertise() { - addrMode = structs.AddressModeDriver - } else { - addrMode = structs.AddressModeHost - } - return getAddress(addrMode, portLabel, networks, driverNet, ports, netStatus) - case structs.AddressModeHost: - if portLabel == "" { - if len(networks) != 1 { - // If no networks are specified return zero - // values. Consul will advertise the host IP - // with no port. This is the pre-0.7.1 behavior - // some people rely on. - return "", 0, nil - } - - return networks[0].IP, 0, nil - } - - // Default path: use host ip:port - // Try finding port in the AllocatedPorts struct first - // Check in Networks struct for backwards compatibility if not found - mapping, ok := ports.Get(portLabel) - if !ok { - mapping = networks.Port(portLabel) - if mapping.Value > 0 { - return mapping.HostIP, mapping.Value, nil - } - - // If port isn't a label, try to parse it as a literal port number - port, err := strconv.Atoi(portLabel) - if err != nil { - // Don't include Atoi error message as user likely - // never intended it to be a numeric and it creates a - // confusing error message - return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel) - } - if port <= 0 { - return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel) - } - - // A number was given which will use the Consul agent's address and the given port - // Returning a blank string as an address will use the Consul agent's address - return "", port, nil - } - return mapping.HostIP, mapping.Value, nil - - case structs.AddressModeDriver: - // Require a driver network if driver address mode is used - if driverNet == nil { - return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`) - } - - // If no port label is specified just return the IP - if portLabel == "" { - return driverNet.IP, 0, nil - } - - // If the port is a label, use the driver's port (not the host's) - if port, ok := ports.Get(portLabel); ok { - return driverNet.IP, port.To, nil - } - - // Check if old style driver portmap is used - if port, ok := driverNet.PortMap[portLabel]; ok { - return driverNet.IP, port, nil - } - - // If port isn't a label, try to parse it as a literal port number - port, err := strconv.Atoi(portLabel) - if err != nil { - // Don't include Atoi error message as user likely - // never intended it to be a numeric and it creates a - // confusing error message - return "", 0, fmt.Errorf("invalid port label %q: port labels in driver address_mode must be numeric or in the driver's port map", portLabel) - } - if port <= 0 { - return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel) - } - - return driverNet.IP, port, nil - - case structs.AddressModeAlloc: - if netStatus == nil { - return "", 0, fmt.Errorf(`cannot use address_mode="alloc": no allocation network status reported`) - } - - // If no port label is specified just return the IP - if portLabel == "" { - return netStatus.Address, 0, nil - } - - // If port is a label and is found then return it - if port, ok := ports.Get(portLabel); ok { - // Use port.To value unless not set - if port.To > 0 { - return netStatus.Address, port.To, nil - } - return netStatus.Address, port.Value, nil - } - - // Check if port is a literal number - port, err := strconv.Atoi(portLabel) - if err != nil { - // User likely specified wrong port label here - return "", 0, fmt.Errorf("invalid port %q: port label not found or is not numeric", portLabel) - } - if port <= 0 { - return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel) - } - return netStatus.Address, port, nil - - default: - // Shouldn't happen due to validation, but enforce invariants - return "", 0, fmt.Errorf("invalid address mode %q", addrMode) - } -} diff --git a/command/agent/consul/service_client_test.go b/command/agent/consul/service_client_test.go index 9cacaa38d..e1dcc7a91 100644 --- a/command/agent/consul/service_client_test.go +++ b/command/agent/consul/service_client_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" @@ -393,7 +394,7 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) { sc := NewServiceClient(mockAgent, namespacesClient, logger, true) allocID := uuid.Generate() - ws := &WorkloadServices{ + ws := &serviceregistration.WorkloadServices{ AllocID: allocID, Task: "taskname", Restarter: &restartRecorder{}, @@ -444,7 +445,7 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) { } // Update - wsUpdate := new(WorkloadServices) + wsUpdate := new(serviceregistration.WorkloadServices) *wsUpdate = *ws wsUpdate.Services[0].Checks[0].OnUpdate = structs.OnUpdateRequireHealthy diff --git a/command/agent/consul/structs.go b/command/agent/consul/structs.go index 1163d8c59..d8e9210ce 100644 --- a/command/agent/consul/structs.go +++ b/command/agent/consul/structs.go @@ -1,64 +1,22 @@ package consul import ( - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" ) -// WorkloadServices describes services defined in either a Task or TaskGroup -// that need to be syncronized with Consul. -type WorkloadServices struct { - AllocID string - - // Name of the task and task group the services are defined for. For - // group based services, Task will be empty. - Task string - Group string - - // Canary indicates whether or not the allocation is a canary. - Canary bool - - // ConsulNamespace is the consul namespace in which services will be registered. - ConsulNamespace string - - // Restarter allows restarting the task or task group depending on the - // check_restart stanzas. - Restarter WorkloadRestarter - - // Services and checks to register for the task. - Services []*structs.Service - - // Networks from the task's resources stanza. - // TODO: remove and use Ports - Networks structs.Networks - - // NetworkStatus from alloc if network namespace is created. - // Can be nil. - NetworkStatus *structs.AllocNetworkStatus - - // AllocatedPorts is the list of port mappings. - Ports structs.AllocatedPorts - - // DriverExec is the script executor for the task's driver. - // For group services this is nil and script execution is managed by - // a tasklet in the taskrunner script_check_hook. - DriverExec interfaces.ScriptExecutor - - // DriverNetwork is the network specified by the driver and may be nil. - DriverNetwork *drivers.DriverNetwork -} - -func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *WorkloadServices { +func BuildAllocServices( + node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *serviceregistration.WorkloadServices { //TODO(schmichael) only support one network for now net := alloc.AllocatedResources.Shared.Networks[0] tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - ws := &WorkloadServices{ + ws := &serviceregistration.WorkloadServices{ AllocID: alloc.ID, Group: alloc.TaskGroup, Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services), @@ -82,24 +40,3 @@ func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter return ws } - -// Copy method for easing tests -func (ws *WorkloadServices) Copy() *WorkloadServices { - newTS := new(WorkloadServices) - *newTS = *ws - - // Deep copy Services - newTS.Services = make([]*structs.Service, len(ws.Services)) - for i := range ws.Services { - newTS.Services[i] = ws.Services[i].Copy() - } - return newTS -} - -func (ws *WorkloadServices) Name() string { - if ws.Task != "" { - return ws.Task - } - - return "group-" + ws.Group -} diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index b3f035ad3..3a5d29c11 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -11,12 +11,12 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" "github.com/kr/pretty" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -26,8 +26,8 @@ const ( yPort = 1235 ) -func testWorkload() *WorkloadServices { - return &WorkloadServices{ +func testWorkload() *serviceregistration.WorkloadServices { + return &serviceregistration.WorkloadServices{ AllocID: uuid.Generate(), Task: "taskname", Restarter: &restartRecorder{}, @@ -65,7 +65,7 @@ func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent, type testFakeCtx struct { ServiceClient *ServiceClient FakeConsul *MockAgent - Workload *WorkloadServices + Workload *serviceregistration.WorkloadServices } var errNoOps = fmt.Errorf("testing error: no pending operations") @@ -502,8 +502,8 @@ func TestConsul_ChangeChecks(t *testing.T) { t.Fatalf("service ID changed") } - for newID := range sreg.checkIDs { - if _, ok := otherServiceReg.checkIDs[newID]; ok { + for newID := range sreg.CheckIDs { + if _, ok := otherServiceReg.CheckIDs[newID]; ok { t.Fatalf("check IDs should change") } } @@ -1349,361 +1349,6 @@ func TestCreateCheckReg_GRPC(t *testing.T) { require.Equal(t, expected, actual) } -// TestGetAddress asserts Nomad uses the correct ip and port for services and -// checks depending on port labels, driver networks, and address mode. -func TestGetAddress(t *testing.T) { - const HostIP = "127.0.0.1" - - cases := []struct { - Name string - - // Parameters - Mode string - PortLabel string - Host map[string]int // will be converted to structs.Networks - Driver *drivers.DriverNetwork - Ports structs.AllocatedPorts - Status *structs.AllocNetworkStatus - - // Results - ExpectedIP string - ExpectedPort int - ExpectedErr string - }{ - // Valid Configurations - { - Name: "ExampleService", - Mode: structs.AddressModeAuto, - PortLabel: "db", - Host: map[string]int{"db": 12435}, - Driver: &drivers.DriverNetwork{ - PortMap: map[string]int{"db": 6379}, - IP: "10.1.2.3", - }, - ExpectedIP: HostIP, - ExpectedPort: 12435, - }, - { - Name: "Host", - Mode: structs.AddressModeHost, - PortLabel: "db", - Host: map[string]int{"db": 12345}, - Driver: &drivers.DriverNetwork{ - PortMap: map[string]int{"db": 6379}, - IP: "10.1.2.3", - }, - ExpectedIP: HostIP, - ExpectedPort: 12345, - }, - { - Name: "Driver", - Mode: structs.AddressModeDriver, - PortLabel: "db", - Host: map[string]int{"db": 12345}, - Driver: &drivers.DriverNetwork{ - PortMap: map[string]int{"db": 6379}, - IP: "10.1.2.3", - }, - ExpectedIP: "10.1.2.3", - ExpectedPort: 6379, - }, - { - Name: "AutoDriver", - Mode: structs.AddressModeAuto, - PortLabel: "db", - Host: map[string]int{"db": 12345}, - Driver: &drivers.DriverNetwork{ - PortMap: map[string]int{"db": 6379}, - IP: "10.1.2.3", - AutoAdvertise: true, - }, - ExpectedIP: "10.1.2.3", - ExpectedPort: 6379, - }, - { - Name: "DriverCustomPort", - Mode: structs.AddressModeDriver, - PortLabel: "7890", - Host: map[string]int{"db": 12345}, - Driver: &drivers.DriverNetwork{ - PortMap: map[string]int{"db": 6379}, - IP: "10.1.2.3", - }, - ExpectedIP: "10.1.2.3", - ExpectedPort: 7890, - }, - - // Invalid Configurations - { - Name: "DriverWithoutNetwork", - Mode: structs.AddressModeDriver, - PortLabel: "db", - Host: map[string]int{"db": 12345}, - Driver: nil, - ExpectedErr: "no driver network exists", - }, - { - Name: "DriverBadPort", - Mode: structs.AddressModeDriver, - PortLabel: "bad-port-label", - Host: map[string]int{"db": 12345}, - Driver: &drivers.DriverNetwork{ - PortMap: map[string]int{"db": 6379}, - IP: "10.1.2.3", - }, - ExpectedErr: "invalid port", - }, - { - Name: "DriverZeroPort", - Mode: structs.AddressModeDriver, - PortLabel: "0", - Driver: &drivers.DriverNetwork{ - IP: "10.1.2.3", - }, - ExpectedErr: "invalid port", - }, - { - Name: "HostBadPort", - Mode: structs.AddressModeHost, - PortLabel: "bad-port-label", - ExpectedErr: "invalid port", - }, - { - Name: "InvalidMode", - Mode: "invalid-mode", - PortLabel: "80", - ExpectedErr: "invalid address mode", - }, - { - Name: "NoPort_AutoMode", - Mode: structs.AddressModeAuto, - ExpectedIP: HostIP, - }, - { - Name: "NoPort_HostMode", - Mode: structs.AddressModeHost, - ExpectedIP: HostIP, - }, - { - Name: "NoPort_DriverMode", - Mode: structs.AddressModeDriver, - Driver: &drivers.DriverNetwork{ - IP: "10.1.2.3", - }, - ExpectedIP: "10.1.2.3", - }, - - // Scenarios using port 0.12 networking fields (NetworkStatus, AllocatedPortMapping) - { - Name: "ExampleServer_withAllocatedPorts", - Mode: structs.AddressModeAuto, - PortLabel: "db", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12435, - To: 6379, - HostIP: HostIP, - }, - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: HostIP, - ExpectedPort: 12435, - }, - { - Name: "Host_withAllocatedPorts", - Mode: structs.AddressModeHost, - PortLabel: "db", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12345, - To: 6379, - HostIP: HostIP, - }, - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: HostIP, - ExpectedPort: 12345, - }, - { - Name: "Driver_withAllocatedPorts", - Mode: structs.AddressModeDriver, - PortLabel: "db", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12345, - To: 6379, - HostIP: HostIP, - }, - }, - Driver: &drivers.DriverNetwork{ - IP: "10.1.2.3", - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: "10.1.2.3", - ExpectedPort: 6379, - }, - { - Name: "AutoDriver_withAllocatedPorts", - Mode: structs.AddressModeAuto, - PortLabel: "db", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12345, - To: 6379, - HostIP: HostIP, - }, - }, - Driver: &drivers.DriverNetwork{ - IP: "10.1.2.3", - AutoAdvertise: true, - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: "10.1.2.3", - ExpectedPort: 6379, - }, - { - Name: "DriverCustomPort_withAllocatedPorts", - Mode: structs.AddressModeDriver, - PortLabel: "7890", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12345, - To: 6379, - HostIP: HostIP, - }, - }, - Driver: &drivers.DriverNetwork{ - IP: "10.1.2.3", - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: "10.1.2.3", - ExpectedPort: 7890, - }, - { - Name: "Host_MultiHostInterface", - Mode: structs.AddressModeAuto, - PortLabel: "db", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12345, - To: 6379, - HostIP: "127.0.0.100", - }, - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: "127.0.0.100", - ExpectedPort: 12345, - }, - { - Name: "Alloc", - Mode: structs.AddressModeAlloc, - PortLabel: "db", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12345, - To: 6379, - HostIP: HostIP, - }, - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: "172.26.0.1", - ExpectedPort: 6379, - }, - { - Name: "Alloc no to value", - Mode: structs.AddressModeAlloc, - PortLabel: "db", - Ports: []structs.AllocatedPortMapping{ - { - Label: "db", - Value: 12345, - HostIP: HostIP, - }, - }, - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: "172.26.0.1", - ExpectedPort: 12345, - }, - { - Name: "AllocCustomPort", - Mode: structs.AddressModeAlloc, - PortLabel: "6379", - Status: &structs.AllocNetworkStatus{ - InterfaceName: "eth0", - Address: "172.26.0.1", - }, - ExpectedIP: "172.26.0.1", - ExpectedPort: 6379, - }, - } - - for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - // convert host port map into a structs.Networks - networks := []*structs.NetworkResource{ - { - IP: HostIP, - ReservedPorts: make([]structs.Port, len(tc.Host)), - }, - } - - i := 0 - for label, port := range tc.Host { - networks[0].ReservedPorts[i].Label = label - networks[0].ReservedPorts[i].Value = port - i++ - } - - // Run getAddress - ip, port, err := getAddress(tc.Mode, tc.PortLabel, networks, tc.Driver, tc.Ports, tc.Status) - - // Assert the results - assert.Equal(t, tc.ExpectedIP, ip, "IP mismatch") - assert.Equal(t, tc.ExpectedPort, port, "Port mismatch") - if tc.ExpectedErr == "" { - assert.Nil(t, err) - } else { - if err == nil { - t.Fatalf("expected error containing %q but err=nil", tc.ExpectedErr) - } else { - assert.Contains(t, err.Error(), tc.ExpectedErr) - } - } - }) - } -} - func TestConsul_ServiceName_Duplicates(t *testing.T) { t.Parallel() ctx := setupFake(t) @@ -1789,7 +1434,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID, + remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID, remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) @@ -1812,7 +1457,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, + explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) @@ -1837,7 +1482,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID, + outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID, outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload)) @@ -1898,7 +1543,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID, + remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID, remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) @@ -1921,7 +1566,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, + explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) @@ -1946,7 +1591,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID, + outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID, outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))