connect: register group services with Consul

Fixes #6042

Add new task group service hook for registering group services like
Connect-enabled services.

Does not yet support checks.
This commit is contained in:
Michael Schurter
2019-08-14 15:02:00 -07:00
parent b8ebfb8c12
commit eeacb87f3b
9 changed files with 542 additions and 6 deletions

View File

@@ -722,6 +722,12 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
copy(tags, service.Tags)
}
// newConnect returns (nil, nil) if there's no Connect-enabled service.
connect, err := newConnect(service.Name, service.Connect, task.Networks)
if err != nil {
return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err)
}
// Build the Consul Service registration request
serviceReg := &api.AgentServiceRegistration{
ID: id,
@@ -733,6 +739,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
Meta: map[string]string{
"external-source": "nomad",
},
Connect: connect, // will be nil if no Connect stanza
}
ops.regServices = append(ops.regServices, serviceReg)
@@ -807,6 +814,117 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
return checkIDs, nil
}
//TODO(schmichael) remove
type noopRestarter struct{}
func (noopRestarter) Restart(context.Context, *structs.TaskEvent, bool) error { return nil }
// makeAllocTaskServices creates a TaskServices struct for a group service.
//
//TODO(schmichael) rename TaskServices and refactor this into a New method
func makeAllocTaskServices(alloc *structs.Allocation, tg *structs.TaskGroup) (*TaskServices, error) {
if n := len(alloc.AllocatedResources.Shared.Networks); n == 0 {
return nil, fmt.Errorf("unable to register a group service without a group network")
}
//TODO(schmichael) only support one network for now
net := alloc.AllocatedResources.Shared.Networks[0]
ts := &TaskServices{
AllocID: alloc.ID,
Name: "group-" + alloc.TaskGroup,
Services: tg.Services,
Networks: alloc.AllocatedResources.Shared.Networks,
//TODO(schmichael) there's probably a better way than hacking driver network
DriverNetwork: &drivers.DriverNetwork{
AutoAdvertise: true,
IP: net.IP,
// Copy PortLabels from group network
PortMap: net.PortLabels(),
},
// unsupported for group services
Restarter: noopRestarter{},
DriverExec: nil,
}
if alloc.DeploymentStatus != nil {
ts.Canary = alloc.DeploymentStatus.Canary
}
return ts, nil
}
// RegisterGroup services with Consul. Adds all task group-level service
// entries and checks to Consul.
func (c *ServiceClient) RegisterGroup(alloc *structs.Allocation) error {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
}
if len(tg.Services) == 0 {
// noop
return nil
}
ts, err := makeAllocTaskServices(alloc, tg)
if err != nil {
return err
}
return c.RegisterTask(ts)
}
// UpdateGroup services with Consul. Updates all task group-level service
// entries and checks to Consul.
func (c *ServiceClient) UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error {
oldTG := oldAlloc.Job.LookupTaskGroup(oldAlloc.TaskGroup)
if oldTG == nil {
return fmt.Errorf("task group %q not in old allocation", oldAlloc.TaskGroup)
}
oldServices, err := makeAllocTaskServices(oldAlloc, oldTG)
if err != nil {
return err
}
newTG := newAlloc.Job.LookupTaskGroup(newAlloc.TaskGroup)
if newTG == nil {
return fmt.Errorf("task group %q not in new allocation", newAlloc.TaskGroup)
}
newServices, err := makeAllocTaskServices(newAlloc, newTG)
if err != nil {
return err
}
return c.UpdateTask(oldServices, newServices)
}
// RemoveGroup services with Consul. Removes all task group-level service
// entries and checks from Consul.
func (c *ServiceClient) RemoveGroup(alloc *structs.Allocation) error {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
}
if len(tg.Services) == 0 {
// noop
return nil
}
ts, err := makeAllocTaskServices(alloc, tg)
if err != nil {
return err
}
c.RemoveTask(ts)
return nil
}
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
// exec is nil and a script check exists an error is returned.
//
@@ -1314,3 +1432,81 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
}
}
// newConnect creates a new Consul AgentServiceConnect struct based on a Nomad
// Connect struct. If the nomad Connect struct is nil, nil will be returned to
// disable Connect for this service.
func newConnect(serviceName string, nc *structs.ConsulConnect, networks structs.Networks) (*api.AgentServiceConnect, error) {
if nc == nil {
// No Connect stanza, returning nil is fine
return nil, nil
}
cc := &api.AgentServiceConnect{
Native: nc.Native,
}
if nc.SidecarService == nil {
return cc, nil
}
net, port, err := getConnectPort(serviceName, networks)
if err != nil {
return nil, err
}
// Bind to netns IP(s):port
proxyConfig := map[string]interface{}{}
if nc.SidecarService.Proxy != nil && nc.SidecarService.Proxy.Config != nil {
proxyConfig = nc.SidecarService.Proxy.Config
}
proxyConfig["bind_address"] = "0.0.0.0"
proxyConfig["bind_port"] = port.To
// Advertise host IP:port
cc.SidecarService = &api.AgentServiceRegistration{
Address: net.IP,
Port: port.Value,
// Automatically configure the proxy to bind to all addresses
// within the netns.
Proxy: &api.AgentServiceConnectProxyConfig{
Config: proxyConfig,
},
}
// If no further proxy settings were explicitly configured, exit early
if nc.SidecarService.Proxy == nil {
return cc, nil
}
numUpstreams := len(nc.SidecarService.Proxy.Upstreams)
if numUpstreams == 0 {
return cc, nil
}
upstreams := make([]api.Upstream, numUpstreams)
for i, nu := range nc.SidecarService.Proxy.Upstreams {
upstreams[i].DestinationName = nu.DestinationName
upstreams[i].LocalBindPort = nu.LocalBindPort
}
cc.SidecarService.Proxy.Upstreams = upstreams
return cc, nil
}
// getConnectPort returns the network and port for the Connect proxy sidecar
// defined for this service. An error is returned if the network and port
// cannot be determined.
func getConnectPort(serviceName string, networks structs.Networks) (*structs.NetworkResource, structs.Port, error) {
if n := len(networks); n != 1 {
return nil, structs.Port{}, fmt.Errorf("Connect only supported with exactly 1 network (found %d)", n)
}
port, ok := networks[0].PortForService(serviceName)
if !ok {
return nil, structs.Port{}, fmt.Errorf("No Connect port defined for service %q", serviceName)
}
return networks[0], port, nil
}

View File

@@ -0,0 +1,99 @@
package consul
import (
"io/ioutil"
"testing"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestConsul_Connect(t *testing.T) {
// Create an embedded Consul server
testconsul, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
if err != nil {
t.Fatalf("error starting test consul server: %v", err)
}
defer testconsul.Stop()
consulConfig := consulapi.DefaultConfig()
consulConfig.Address = testconsul.HTTPAddr
consulClient, err := consulapi.NewClient(consulConfig)
require.NoError(t, err)
serviceClient := NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
go serviceClient.Run()
alloc := mock.Alloc()
alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{
{
Mode: "bridge",
IP: "10.0.0.1",
DynamicPorts: []structs.Port{
{
Label: "connect-proxy-testconnect",
Value: 9999,
To: 9998,
},
},
},
}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
tg.Services = []*structs.Service{
{
Name: "testconnect",
PortLabel: "9999",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
},
},
}
require.NoError(t, serviceClient.RegisterGroup(alloc))
require.Eventually(t, func() bool {
services, err := consulClient.Agent().Services()
require.NoError(t, err)
return len(services) == 2
}, 3*time.Second, 100*time.Millisecond)
services, err := consulClient.Agent().Services()
require.NoError(t, err)
require.Len(t, services, 2)
serviceID := makeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0], false)
connectID := serviceID + "-sidecar-proxy"
require.Contains(t, services, serviceID)
agentService := services[serviceID]
require.Equal(t, agentService.Service, "testconnect")
require.Equal(t, agentService.Address, "10.0.0.1")
require.Equal(t, agentService.Port, 9999)
require.Nil(t, agentService.Connect)
require.Nil(t, agentService.Proxy)
require.Contains(t, services, connectID)
connectService := services[connectID]
require.Equal(t, connectService.Service, "testconnect-sidecar-proxy")
require.Equal(t, connectService.Address, "10.0.0.1")
require.Equal(t, connectService.Port, 9999)
require.Nil(t, connectService.Connect)
require.Equal(t, connectService.Proxy.DestinationServiceName, "testconnect")
require.Equal(t, connectService.Proxy.DestinationServiceID, serviceID)
require.Equal(t, connectService.Proxy.LocalServiceAddress, "127.0.0.1")
require.Equal(t, connectService.Proxy.LocalServicePort, 9999)
require.Equal(t, connectService.Proxy.Config, map[string]interface{}{
"bind_address": "0.0.0.0",
"bind_port": float64(9998),
})
}