diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 2517ee3ae..db8064ed7 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -121,6 +121,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir), newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient), newNetworkHook(hookLogger, ns, ar.Alloc(), nm, nc), + newGroupServiceHook(hookLogger, ar.Alloc(), ar.consulClient), } return nil diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index a438266a4..981d68ed9 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -117,11 +117,13 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) { // 2 removals (canary+noncanary) during prekill // 2 removals (canary+noncanary) during exited // 2 removals (canary+noncanary) during stop + // 1 remove group during stop consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps() - require.Len(t, consulOps, 6) - for _, op := range consulOps { + require.Len(t, consulOps, 7) + for _, op := range consulOps[:6] { require.Equal(t, "remove", op.Op) } + require.Equal(t, "remove_group", consulOps[6].Op) // Assert terminated task event was emitted events := ar2.AllocState().TaskStates[task.Name].Events diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go new file mode 100644 index 000000000..b9c65b0bf --- /dev/null +++ b/client/allocrunner/groupservice_hook.go @@ -0,0 +1,66 @@ +package allocrunner + +import ( + "sync" + + hclog "github.com/hashicorp/go-hclog" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/nomad/structs" +) + +// groupServiceHook manages task group Consul service registration and +// deregistration. +type groupServiceHook struct { + alloc *structs.Allocation + consulClient consul.ConsulServiceAPI + prerun bool + mu sync.Mutex + + logger log.Logger +} + +func newGroupServiceHook(logger hclog.Logger, alloc *structs.Allocation, consulClient consul.ConsulServiceAPI) *groupServiceHook { + h := &groupServiceHook{ + alloc: alloc, + consulClient: consulClient, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*groupServiceHook) Name() string { + return "group_services" +} + +func (h *groupServiceHook) Prerun() error { + h.mu.Lock() + defer func() { + // Mark prerun as true to unblock Updates + h.prerun = true + h.mu.Unlock() + }() + return h.consulClient.RegisterGroup(h.alloc) +} + +func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { + h.mu.Lock() + defer h.mu.Unlock() + oldAlloc := h.alloc + h.alloc = req.Alloc + + if !h.prerun { + // Update called before Prerun. Update alloc and exit to allow + // Prerun to do initial registration. + return nil + } + + return h.consulClient.UpdateGroup(oldAlloc, h.alloc) +} + +func (h *groupServiceHook) Postrun() error { + h.mu.Lock() + defer h.mu.Unlock() + return h.consulClient.RemoveGroup(h.alloc) +} diff --git a/client/allocrunner/groupservice_hook_test.go b/client/allocrunner/groupservice_hook_test.go new file mode 100644 index 000000000..3aa540e1d --- /dev/null +++ b/client/allocrunner/groupservice_hook_test.go @@ -0,0 +1,119 @@ +package allocrunner + +import ( + "testing" + + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/consul" + agentconsul "github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil) +var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil) +var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil) + +// TestGroupServiceHook_NoGroupServices asserts calling group service hooks +// without group services does not error. +func TestGroupServiceHook_NoGroupServices(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + logger := testlog.HCLogger(t) + consulClient := consul.NewMockConsulServiceClient(t, logger) + + h := newGroupServiceHook(logger, alloc, consulClient) + require.NoError(t, h.Prerun()) + + req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + require.NoError(t, h.Update(req)) + + require.NoError(t, h.Postrun()) + + ops := consulClient.GetOps() + require.Len(t, ops, 3) + require.Equal(t, "add_group", ops[0].Op) + require.Equal(t, "update_group", ops[1].Op) + require.Equal(t, "remove_group", ops[2].Op) +} + +// TestGroupServiceHook_GroupServices asserts group service hooks with group +// services does not error. +func TestGroupServiceHook_GroupServices(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{ + { + Mode: "bridge", + IP: "10.0.0.1", + DynamicPorts: []structs.Port{ + { + Label: "connect-proxy-testconnect", + Value: 9999, + To: 9998, + }, + }, + }, + } + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + tg.Services = []*structs.Service{ + { + Name: "testconnect", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{}, + }, + }, + } + logger := testlog.HCLogger(t) + consulClient := consul.NewMockConsulServiceClient(t, logger) + + h := newGroupServiceHook(logger, alloc, consulClient) + require.NoError(t, h.Prerun()) + + req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + require.NoError(t, h.Update(req)) + + require.NoError(t, h.Postrun()) + + ops := consulClient.GetOps() + require.Len(t, ops, 3) + require.Equal(t, "add_group", ops[0].Op) + require.Equal(t, "update_group", ops[1].Op) + require.Equal(t, "remove_group", ops[2].Op) +} + +// TestGroupServiceHook_Error asserts group service hooks with group +// services but no group network returns an error. +func TestGroupServiceHook_Error(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + tg.Services = []*structs.Service{ + { + Name: "testconnect", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{}, + }, + }, + } + logger := testlog.HCLogger(t) + + // No need to set Consul client or call Run. This hould fail before + // attempting to register. + consulClient := agentconsul.NewServiceClient(nil, logger, false) + + h := newGroupServiceHook(logger, alloc, consulClient) + require.Error(t, h.Prerun()) + + req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + require.Error(t, h.Update(req)) + + require.Error(t, h.Postrun()) +} diff --git a/client/consul/consul.go b/client/consul/consul.go index 69f2f6e67..a789c4c34 100644 --- a/client/consul/consul.go +++ b/client/consul/consul.go @@ -2,11 +2,15 @@ package consul import ( "github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/nomad/structs" ) // ConsulServiceAPI is the interface the Nomad Client uses to register and // remove services and checks from Consul. type ConsulServiceAPI interface { + RegisterGroup(*structs.Allocation) error + RemoveGroup(*structs.Allocation) error + UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error RegisterTask(*consul.TaskServices) error RemoveTask(*consul.TaskServices) UpdateTask(old, newTask *consul.TaskServices) error diff --git a/client/consul/consul_testing.go b/client/consul/consul_testing.go index cdadc37c0..e38cfc6c1 100644 --- a/client/consul/consul_testing.go +++ b/client/consul/consul_testing.go @@ -7,6 +7,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/nomad/structs" testing "github.com/mitchellh/go-testing-interface" ) @@ -14,17 +15,20 @@ import ( type MockConsulOp struct { Op string // add, remove, or update AllocID string - Task string + Name string // task or group name } -func NewMockConsulOp(op, allocID, task string) MockConsulOp { - if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" { +func NewMockConsulOp(op, allocID, name string) MockConsulOp { + switch op { + case "add", "remove", "update", "alloc_registrations", + "add_group", "remove_group", "update_group": + default: panic(fmt.Errorf("invalid consul op: %s", op)) } return MockConsulOp{ Op: op, AllocID: allocID, - Task: task, + Name: name, } } @@ -50,6 +54,33 @@ func NewMockConsulServiceClient(t testing.T, logger log.Logger) *MockConsulServi return &m } +func (m *MockConsulServiceClient) RegisterGroup(alloc *structs.Allocation) error { + m.mu.Lock() + defer m.mu.Unlock() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + m.logger.Trace("RegisterGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services)) + m.ops = append(m.ops, NewMockConsulOp("add_group", alloc.ID, alloc.TaskGroup)) + return nil +} + +func (m *MockConsulServiceClient) UpdateGroup(_, alloc *structs.Allocation) error { + m.mu.Lock() + defer m.mu.Unlock() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + m.logger.Trace("UpdateGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services)) + m.ops = append(m.ops, NewMockConsulOp("update_group", alloc.ID, alloc.TaskGroup)) + return nil +} + +func (m *MockConsulServiceClient) RemoveGroup(alloc *structs.Allocation) error { + m.mu.Lock() + defer m.mu.Unlock() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + m.logger.Trace("RemoveGroup", "alloc_id", alloc.ID, "num_services", len(tg.Services)) + m.ops = append(m.ops, NewMockConsulOp("remove_group", alloc.ID, alloc.TaskGroup)) + return nil +} + func (m *MockConsulServiceClient) UpdateTask(old, newSvcs *consul.TaskServices) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 9b85fe5f2..a28d6340c 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -722,6 +722,12 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t copy(tags, service.Tags) } + // newConnect returns (nil, nil) if there's no Connect-enabled service. + connect, err := newConnect(service.Name, service.Connect, task.Networks) + if err != nil { + return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err) + } + // Build the Consul Service registration request serviceReg := &api.AgentServiceRegistration{ ID: id, @@ -733,6 +739,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t Meta: map[string]string{ "external-source": "nomad", }, + Connect: connect, // will be nil if no Connect stanza } ops.regServices = append(ops.regServices, serviceReg) @@ -807,6 +814,117 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st return checkIDs, nil } +//TODO(schmichael) remove +type noopRestarter struct{} + +func (noopRestarter) Restart(context.Context, *structs.TaskEvent, bool) error { return nil } + +// makeAllocTaskServices creates a TaskServices struct for a group service. +// +//TODO(schmichael) rename TaskServices and refactor this into a New method +func makeAllocTaskServices(alloc *structs.Allocation, tg *structs.TaskGroup) (*TaskServices, error) { + if n := len(alloc.AllocatedResources.Shared.Networks); n == 0 { + return nil, fmt.Errorf("unable to register a group service without a group network") + } + + //TODO(schmichael) only support one network for now + net := alloc.AllocatedResources.Shared.Networks[0] + + ts := &TaskServices{ + AllocID: alloc.ID, + Name: "group-" + alloc.TaskGroup, + Services: tg.Services, + Networks: alloc.AllocatedResources.Shared.Networks, + + //TODO(schmichael) there's probably a better way than hacking driver network + DriverNetwork: &drivers.DriverNetwork{ + AutoAdvertise: true, + IP: net.IP, + // Copy PortLabels from group network + PortMap: net.PortLabels(), + }, + + // unsupported for group services + Restarter: noopRestarter{}, + DriverExec: nil, + } + + if alloc.DeploymentStatus != nil { + ts.Canary = alloc.DeploymentStatus.Canary + } + + return ts, nil +} + +// RegisterGroup services with Consul. Adds all task group-level service +// entries and checks to Consul. +func (c *ServiceClient) RegisterGroup(alloc *structs.Allocation) error { + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup) + } + + if len(tg.Services) == 0 { + // noop + return nil + } + + ts, err := makeAllocTaskServices(alloc, tg) + if err != nil { + return err + } + + return c.RegisterTask(ts) +} + +// UpdateGroup services with Consul. Updates all task group-level service +// entries and checks to Consul. +func (c *ServiceClient) UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error { + oldTG := oldAlloc.Job.LookupTaskGroup(oldAlloc.TaskGroup) + if oldTG == nil { + return fmt.Errorf("task group %q not in old allocation", oldAlloc.TaskGroup) + } + + oldServices, err := makeAllocTaskServices(oldAlloc, oldTG) + if err != nil { + return err + } + + newTG := newAlloc.Job.LookupTaskGroup(newAlloc.TaskGroup) + if newTG == nil { + return fmt.Errorf("task group %q not in new allocation", newAlloc.TaskGroup) + } + + newServices, err := makeAllocTaskServices(newAlloc, newTG) + if err != nil { + return err + } + + return c.UpdateTask(oldServices, newServices) +} + +// RemoveGroup services with Consul. Removes all task group-level service +// entries and checks from Consul. +func (c *ServiceClient) RemoveGroup(alloc *structs.Allocation) error { + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup) + } + + if len(tg.Services) == 0 { + // noop + return nil + } + ts, err := makeAllocTaskServices(alloc, tg) + if err != nil { + return err + } + + c.RemoveTask(ts) + + return nil +} + // RegisterTask with Consul. Adds all service entries and checks to Consul. If // exec is nil and a script check exists an error is returned. // @@ -1314,3 +1432,81 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet return "", 0, fmt.Errorf("invalid address mode %q", addrMode) } } + +// newConnect creates a new Consul AgentServiceConnect struct based on a Nomad +// Connect struct. If the nomad Connect struct is nil, nil will be returned to +// disable Connect for this service. +func newConnect(serviceName string, nc *structs.ConsulConnect, networks structs.Networks) (*api.AgentServiceConnect, error) { + if nc == nil { + // No Connect stanza, returning nil is fine + return nil, nil + } + + cc := &api.AgentServiceConnect{ + Native: nc.Native, + } + + if nc.SidecarService == nil { + return cc, nil + } + + net, port, err := getConnectPort(serviceName, networks) + if err != nil { + return nil, err + } + + // Bind to netns IP(s):port + proxyConfig := map[string]interface{}{} + if nc.SidecarService.Proxy != nil && nc.SidecarService.Proxy.Config != nil { + proxyConfig = nc.SidecarService.Proxy.Config + } + proxyConfig["bind_address"] = "0.0.0.0" + proxyConfig["bind_port"] = port.To + + // Advertise host IP:port + cc.SidecarService = &api.AgentServiceRegistration{ + Address: net.IP, + Port: port.Value, + + // Automatically configure the proxy to bind to all addresses + // within the netns. + Proxy: &api.AgentServiceConnectProxyConfig{ + Config: proxyConfig, + }, + } + + // If no further proxy settings were explicitly configured, exit early + if nc.SidecarService.Proxy == nil { + return cc, nil + } + + numUpstreams := len(nc.SidecarService.Proxy.Upstreams) + if numUpstreams == 0 { + return cc, nil + } + + upstreams := make([]api.Upstream, numUpstreams) + for i, nu := range nc.SidecarService.Proxy.Upstreams { + upstreams[i].DestinationName = nu.DestinationName + upstreams[i].LocalBindPort = nu.LocalBindPort + } + cc.SidecarService.Proxy.Upstreams = upstreams + + return cc, nil +} + +// getConnectPort returns the network and port for the Connect proxy sidecar +// defined for this service. An error is returned if the network and port +// cannot be determined. +func getConnectPort(serviceName string, networks structs.Networks) (*structs.NetworkResource, structs.Port, error) { + if n := len(networks); n != 1 { + return nil, structs.Port{}, fmt.Errorf("Connect only supported with exactly 1 network (found %d)", n) + } + + port, ok := networks[0].PortForService(serviceName) + if !ok { + return nil, structs.Port{}, fmt.Errorf("No Connect port defined for service %q", serviceName) + } + + return networks[0], port, nil +} diff --git a/command/agent/consul/group_test.go b/command/agent/consul/group_test.go new file mode 100644 index 000000000..e38331228 --- /dev/null +++ b/command/agent/consul/group_test.go @@ -0,0 +1,99 @@ +package consul + +import ( + "io/ioutil" + "testing" + "time" + + consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestConsul_Connect(t *testing.T) { + // Create an embedded Consul server + testconsul, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) { + // If -v wasn't specified squelch consul logging + if !testing.Verbose() { + c.Stdout = ioutil.Discard + c.Stderr = ioutil.Discard + } + }) + if err != nil { + t.Fatalf("error starting test consul server: %v", err) + } + defer testconsul.Stop() + + consulConfig := consulapi.DefaultConfig() + consulConfig.Address = testconsul.HTTPAddr + consulClient, err := consulapi.NewClient(consulConfig) + require.NoError(t, err) + serviceClient := NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true) + go serviceClient.Run() + + alloc := mock.Alloc() + alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{ + { + Mode: "bridge", + IP: "10.0.0.1", + DynamicPorts: []structs.Port{ + { + Label: "connect-proxy-testconnect", + Value: 9999, + To: 9998, + }, + }, + }, + } + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + tg.Services = []*structs.Service{ + { + Name: "testconnect", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + SidecarService: &structs.ConsulSidecarService{}, + }, + }, + } + + require.NoError(t, serviceClient.RegisterGroup(alloc)) + + require.Eventually(t, func() bool { + services, err := consulClient.Agent().Services() + require.NoError(t, err) + return len(services) == 2 + }, 3*time.Second, 100*time.Millisecond) + + services, err := consulClient.Agent().Services() + require.NoError(t, err) + require.Len(t, services, 2) + + serviceID := makeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0], false) + connectID := serviceID + "-sidecar-proxy" + + require.Contains(t, services, serviceID) + agentService := services[serviceID] + require.Equal(t, agentService.Service, "testconnect") + require.Equal(t, agentService.Address, "10.0.0.1") + require.Equal(t, agentService.Port, 9999) + require.Nil(t, agentService.Connect) + require.Nil(t, agentService.Proxy) + + require.Contains(t, services, connectID) + connectService := services[connectID] + require.Equal(t, connectService.Service, "testconnect-sidecar-proxy") + require.Equal(t, connectService.Address, "10.0.0.1") + require.Equal(t, connectService.Port, 9999) + require.Nil(t, connectService.Connect) + require.Equal(t, connectService.Proxy.DestinationServiceName, "testconnect") + require.Equal(t, connectService.Proxy.DestinationServiceID, serviceID) + require.Equal(t, connectService.Proxy.LocalServiceAddress, "127.0.0.1") + require.Equal(t, connectService.Proxy.LocalServicePort, 9999) + require.Equal(t, connectService.Proxy.Config, map[string]interface{}{ + "bind_address": "0.0.0.0", + "bind_port": float64(9998), + }) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9792f4a74..29f39fff3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2151,6 +2151,24 @@ func (n *NetworkResource) PortLabels() map[string]int { return labelValues } +// ConnectPort returns the Connect port for the given service. Returns false if +// no port was found for a service with that name. +func (n *NetworkResource) PortForService(serviceName string) (Port, bool) { + label := fmt.Sprintf("%s-%s", ConnectProxyPrefix, serviceName) + for _, port := range n.ReservedPorts { + if port.Label == label { + return port, true + } + } + for _, port := range n.DynamicPorts { + if port.Label == label { + return port, true + } + } + + return Port{}, false +} + // Networks defined for a task on the Resources struct. type Networks []*NetworkResource