client: refactor common service registration objects from Consul.

This commit performs refactoring to pull out common service
registration objects into a new `client/serviceregistration`
package. This new package will form the base point for all
client specific service registration functionality.

The Consul specific implementation is not moved as it also
includes non-service registration implementations; this reduces
the blast radius of the changes as well.
This commit is contained in:
James Rasell
2022-03-15 09:38:30 +01:00
parent 066747ce79
commit 6e8f32a290
38 changed files with 1245 additions and 987 deletions

View File

@@ -7,6 +7,7 @@ import (
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -92,7 +93,7 @@ func TestConsul_Connect(t *testing.T) {
require.NoError(t, err)
require.Len(t, services, 2)
serviceID := MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
serviceID := serviceregistration.MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
connectID := serviceID + "-sidecar-proxy"
require.Contains(t, services, serviceID)

View File

@@ -14,14 +14,13 @@ import (
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/pkg/errors"
"github.com/hashicorp/consul/api"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/pkg/errors"
)
const (
@@ -370,109 +369,6 @@ func (o operations) String() string {
return fmt.Sprintf("<%d, %d, %d, %d>", len(o.regServices), len(o.regChecks), len(o.deregServices), len(o.deregChecks))
}
// AllocRegistration holds the status of services registered for a particular
// allocations by task.
type AllocRegistration struct {
// Tasks maps the name of a task to its registered services and checks
Tasks map[string]*ServiceRegistrations
}
func (a *AllocRegistration) copy() *AllocRegistration {
c := &AllocRegistration{
Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)),
}
for k, v := range a.Tasks {
c.Tasks[k] = v.copy()
}
return c
}
// NumServices returns the number of registered services
func (a *AllocRegistration) NumServices() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
if sreg.Service != nil {
total++
}
}
}
return total
}
// NumChecks returns the number of registered checks
func (a *AllocRegistration) NumChecks() int {
if a == nil {
return 0
}
total := 0
for _, treg := range a.Tasks {
for _, sreg := range treg.Services {
total += len(sreg.Checks)
}
}
return total
}
// ServiceRegistrations holds the status of services registered for a particular
// task or task group.
type ServiceRegistrations struct {
Services map[string]*ServiceRegistration
}
func (t *ServiceRegistrations) copy() *ServiceRegistrations {
c := &ServiceRegistrations{
Services: make(map[string]*ServiceRegistration, len(t.Services)),
}
for k, v := range t.Services {
c.Services[k] = v.copy()
}
return c
}
// ServiceRegistration holds the status of a registered Consul Service and its
// Checks.
type ServiceRegistration struct {
// serviceID and checkIDs are internal fields that track just the IDs of the
// services/checks registered in Consul. It is used to materialize the other
// fields when queried.
serviceID string
checkIDs map[string]struct{}
// CheckOnUpdate is a map of checkIDs and the associated OnUpdate value
// from the ServiceCheck It is used to determine how a reported checks
// status should be evaluated.
CheckOnUpdate map[string]string
// Service is the AgentService registered in Consul.
Service *api.AgentService
// Checks is the status of the registered checks.
Checks []*api.AgentCheck
}
func (s *ServiceRegistration) copy() *ServiceRegistration {
// Copy does not copy the external fields but only the internal fields. This
// is so that the caller of AllocRegistrations can not access the internal
// fields and that method uses these fields to populate the external fields.
return &ServiceRegistration{
serviceID: s.serviceID,
checkIDs: helper.CopyMapStringStruct(s.checkIDs),
CheckOnUpdate: helper.CopyMapStringString(s.CheckOnUpdate),
}
}
// ServiceClient handles task and agent service registration with Consul.
type ServiceClient struct {
agentAPI AgentAPI
@@ -503,7 +399,7 @@ type ServiceClient struct {
// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
allocRegistrations map[string]*AllocRegistration
allocRegistrations map[string]*serviceregistration.AllocRegistration
allocRegistrationsLock sync.RWMutex
// Nomad agent services and checks that are recorded so they can be removed
@@ -550,7 +446,7 @@ func NewServiceClient(agentAPI AgentAPI, namespacesClient *NamespacesClient, log
checks: make(map[string]*api.AgentCheckRegistration),
explicitlyDeregisteredServices: make(map[string]bool),
explicitlyDeregisteredChecks: make(map[string]bool),
allocRegistrations: make(map[string]*AllocRegistration),
allocRegistrations: make(map[string]*serviceregistration.AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, agentAPI, namespacesClient),
@@ -1033,14 +929,15 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
// serviceRegs creates service registrations, check registrations, and script
// checks from a service. It returns a service registration object with the
// service and check IDs populated.
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, workload *WorkloadServices) (
*ServiceRegistration, error) {
func (c *ServiceClient) serviceRegs(
ops *operations, service *structs.Service, workload *serviceregistration.WorkloadServices) (
*serviceregistration.ServiceRegistration, error) {
// Get the services ID
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
sreg := &ServiceRegistration{
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
sreg := &serviceregistration.ServiceRegistration{
ServiceID: id,
CheckIDs: make(map[string]struct{}, len(service.Checks)),
CheckOnUpdate: make(map[string]string, len(service.Checks)),
}
@@ -1051,7 +948,8 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
}
// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
ip, port, err := serviceregistration.GetAddress(
addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
@@ -1135,7 +1033,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
Kind: kind,
ID: id,
Name: service.Name,
Namespace: workload.ConsulNamespace,
Namespace: workload.Namespace,
Tags: tags,
EnableTagOverride: service.EnableTagOverride,
Address: ip,
@@ -1152,7 +1050,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
return nil, err
}
for _, registration := range checkRegs {
sreg.checkIDs[registration.ID] = struct{}{}
sreg.CheckIDs[registration.ID] = struct{}{}
ops.regChecks = append(ops.regChecks, registration)
}
@@ -1161,7 +1059,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
// checkRegs creates check registrations for the given service
func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
workload *WorkloadServices, sreg *ServiceRegistration) ([]*api.AgentCheckRegistration, error) {
workload *serviceregistration.WorkloadServices, sreg *serviceregistration.ServiceRegistration) ([]*api.AgentCheckRegistration, error) {
registrations := make([]*api.AgentCheckRegistration, 0, len(service.Checks))
for _, check := range service.Checks {
@@ -1181,14 +1079,15 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
}
var err error
ip, port, err = getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
ip, port, err = serviceregistration.GetAddress(
addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
}
}
checkID := MakeCheckID(serviceID, check)
registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.ConsulNamespace)
registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.Namespace)
if err != nil {
return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
@@ -1205,15 +1104,15 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
// Checks will always use the IP from the Task struct (host's IP).
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
func (c *ServiceClient) RegisterWorkload(workload *serviceregistration.WorkloadServices) error {
// Fast path
numServices := len(workload.Services)
if numServices == 0 {
return nil
}
t := new(ServiceRegistrations)
t.Services = make(map[string]*ServiceRegistration, numServices)
t := new(serviceregistration.ServiceRegistrations)
t.Services = make(map[string]*serviceregistration.ServiceRegistration, numServices)
ops := &operations{}
for _, service := range workload.Services {
@@ -1221,7 +1120,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
if err != nil {
return err
}
t.Services[sreg.serviceID] = sreg
t.Services[sreg.ServiceID] = sreg
}
// Add the workload to the allocation's registration
@@ -1232,7 +1131,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range workload.Services {
serviceID := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
serviceID := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := MakeCheckID(serviceID, check)
@@ -1247,19 +1146,19 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
// changed.
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.WorkloadServices) error {
ops := new(operations)
regs := new(ServiceRegistrations)
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))
regs := new(serviceregistration.ServiceRegistrations)
regs.Services = make(map[string]*serviceregistration.ServiceRegistration, len(newWorkload.Services))
newIDs := make(map[string]*structs.Service, len(newWorkload.Services))
for _, s := range newWorkload.Services {
newIDs[MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
}
// Loop over existing Services to see if they have been removed
for _, existingSvc := range old.Services {
existingID := MakeAllocServiceID(old.AllocID, old.Name(), existingSvc)
existingID := serviceregistration.MakeAllocServiceID(old.AllocID, old.Name(), existingSvc)
newSvc, ok := newIDs[existingID]
if !ok {
@@ -1285,9 +1184,9 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
}
// Service still exists so add it to the task's registration
sreg := &ServiceRegistration{
serviceID: existingID,
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
sreg := &serviceregistration.ServiceRegistration{
ServiceID: existingID,
CheckIDs: make(map[string]struct{}, len(newSvc.Checks)),
CheckOnUpdate: make(map[string]string, len(newSvc.Checks)),
}
regs.Services[existingID] = sreg
@@ -1305,7 +1204,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
// Check is still required. Remove it from the map so it doesn't get
// deleted later.
delete(existingChecks, checkID)
sreg.checkIDs[checkID] = struct{}{}
sreg.CheckIDs[checkID] = struct{}{}
sreg.CheckOnUpdate[checkID] = check.OnUpdate
}
@@ -1316,7 +1215,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
}
for _, registration := range checkRegs {
sreg.checkIDs[registration.ID] = struct{}{}
sreg.CheckIDs[registration.ID] = struct{}{}
sreg.CheckOnUpdate[registration.ID] = check.OnUpdate
ops.regChecks = append(ops.regChecks, registration)
}
@@ -1345,7 +1244,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
return err
}
regs.Services[sreg.serviceID] = sreg
regs.Services[sreg.ServiceID] = sreg
}
// Add the task to the allocation's registration
@@ -1370,11 +1269,11 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
// RemoveWorkload from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RemoveWorkload(workload *WorkloadServices) {
func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadServices) {
ops := operations{}
for _, service := range workload.Services {
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
@@ -1406,7 +1305,7 @@ func normalizeNamespace(namespace string) string {
// AllocRegistrations returns the registrations for the given allocation. If the
// allocation has no registrations, the response is a nil object.
func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, error) {
func (c *ServiceClient) AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) {
// Get the internal struct using the lock
c.allocRegistrationsLock.RLock()
regInternal, ok := c.allocRegistrations[allocID]
@@ -1416,7 +1315,7 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
}
// Copy so we don't expose internal structs
reg := regInternal.copy()
reg := regInternal.Copy()
c.allocRegistrationsLock.RUnlock()
// Get the list of all namespaces created so we can iterate them.
@@ -1451,7 +1350,7 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
for _, treg := range reg.Tasks {
for serviceID, sreg := range treg.Services {
sreg.Service = services[serviceID]
for checkID := range sreg.checkIDs {
for checkID := range sreg.CheckIDs {
if check, ok := checks[checkID]; ok {
sreg.Checks = append(sreg.Checks, check)
}
@@ -1547,14 +1446,14 @@ func (c *ServiceClient) Shutdown() error {
}
// addRegistration adds the service registrations for the given allocation.
func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *ServiceRegistrations) {
func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *serviceregistration.ServiceRegistrations) {
c.allocRegistrationsLock.Lock()
defer c.allocRegistrationsLock.Unlock()
alloc, ok := c.allocRegistrations[allocID]
if !ok {
alloc = &AllocRegistration{
Tasks: make(map[string]*ServiceRegistrations),
alloc = &serviceregistration.AllocRegistration{
Tasks: make(map[string]*serviceregistration.ServiceRegistrations),
}
c.allocRegistrations[allocID] = alloc
}
@@ -1592,14 +1491,6 @@ func makeAgentServiceID(role string, service *structs.Service) string {
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
}
// MakeAllocServiceID creates a unique ID for identifying an alloc service in
// Consul.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string {
return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}
// MakeCheckID creates a unique ID for a check.
//
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
@@ -1768,127 +1659,3 @@ func getNomadSidecar(id string, services map[string]*api.AgentService) *api.Agen
sidecarID := id + sidecarSuffix
return services[sidecarID]
}
// getAddress returns the IP and port to use for a service or check. If no port
// label is specified (an empty value), zero values are returned because no
// address could be resolved.
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork, ports structs.AllocatedPorts, netStatus *structs.AllocNetworkStatus) (string, int, error) {
switch addrMode {
case structs.AddressModeAuto:
if driverNet.Advertise() {
addrMode = structs.AddressModeDriver
} else {
addrMode = structs.AddressModeHost
}
return getAddress(addrMode, portLabel, networks, driverNet, ports, netStatus)
case structs.AddressModeHost:
if portLabel == "" {
if len(networks) != 1 {
// If no networks are specified return zero
// values. Consul will advertise the host IP
// with no port. This is the pre-0.7.1 behavior
// some people rely on.
return "", 0, nil
}
return networks[0].IP, 0, nil
}
// Default path: use host ip:port
// Try finding port in the AllocatedPorts struct first
// Check in Networks struct for backwards compatibility if not found
mapping, ok := ports.Get(portLabel)
if !ok {
mapping = networks.Port(portLabel)
if mapping.Value > 0 {
return mapping.HostIP, mapping.Value, nil
}
// If port isn't a label, try to parse it as a literal port number
port, err := strconv.Atoi(portLabel)
if err != nil {
// Don't include Atoi error message as user likely
// never intended it to be a numeric and it creates a
// confusing error message
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
// A number was given which will use the Consul agent's address and the given port
// Returning a blank string as an address will use the Consul agent's address
return "", port, nil
}
return mapping.HostIP, mapping.Value, nil
case structs.AddressModeDriver:
// Require a driver network if driver address mode is used
if driverNet == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`)
}
// If no port label is specified just return the IP
if portLabel == "" {
return driverNet.IP, 0, nil
}
// If the port is a label, use the driver's port (not the host's)
if port, ok := ports.Get(portLabel); ok {
return driverNet.IP, port.To, nil
}
// Check if old style driver portmap is used
if port, ok := driverNet.PortMap[portLabel]; ok {
return driverNet.IP, port, nil
}
// If port isn't a label, try to parse it as a literal port number
port, err := strconv.Atoi(portLabel)
if err != nil {
// Don't include Atoi error message as user likely
// never intended it to be a numeric and it creates a
// confusing error message
return "", 0, fmt.Errorf("invalid port label %q: port labels in driver address_mode must be numeric or in the driver's port map", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return driverNet.IP, port, nil
case structs.AddressModeAlloc:
if netStatus == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="alloc": no allocation network status reported`)
}
// If no port label is specified just return the IP
if portLabel == "" {
return netStatus.Address, 0, nil
}
// If port is a label and is found then return it
if port, ok := ports.Get(portLabel); ok {
// Use port.To value unless not set
if port.To > 0 {
return netStatus.Address, port.To, nil
}
return netStatus.Address, port.Value, nil
}
// Check if port is a literal number
port, err := strconv.Atoi(portLabel)
if err != nil {
// User likely specified wrong port label here
return "", 0, fmt.Errorf("invalid port %q: port label not found or is not numeric", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return netStatus.Address, port, nil
default:
// Shouldn't happen due to validation, but enforce invariants
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
}
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
@@ -393,7 +394,7 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) {
sc := NewServiceClient(mockAgent, namespacesClient, logger, true)
allocID := uuid.Generate()
ws := &WorkloadServices{
ws := &serviceregistration.WorkloadServices{
AllocID: allocID,
Task: "taskname",
Restarter: &restartRecorder{},
@@ -444,7 +445,7 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) {
}
// Update
wsUpdate := new(WorkloadServices)
wsUpdate := new(serviceregistration.WorkloadServices)
*wsUpdate = *ws
wsUpdate.Services[0].Checks[0].OnUpdate = structs.OnUpdateRequireHealthy

View File

@@ -1,64 +1,22 @@
package consul
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be syncronized with Consul.
type WorkloadServices struct {
AllocID string
// Name of the task and task group the services are defined for. For
// group based services, Task will be empty.
Task string
Group string
// Canary indicates whether or not the allocation is a canary.
Canary bool
// ConsulNamespace is the consul namespace in which services will be registered.
ConsulNamespace string
// Restarter allows restarting the task or task group depending on the
// check_restart stanzas.
Restarter WorkloadRestarter
// Services and checks to register for the task.
Services []*structs.Service
// Networks from the task's resources stanza.
// TODO: remove and use Ports
Networks structs.Networks
// NetworkStatus from alloc if network namespace is created.
// Can be nil.
NetworkStatus *structs.AllocNetworkStatus
// AllocatedPorts is the list of port mappings.
Ports structs.AllocatedPorts
// DriverExec is the script executor for the task's driver.
// For group services this is nil and script execution is managed by
// a tasklet in the taskrunner script_check_hook.
DriverExec interfaces.ScriptExecutor
// DriverNetwork is the network specified by the driver and may be nil.
DriverNetwork *drivers.DriverNetwork
}
func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *WorkloadServices {
func BuildAllocServices(
node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *serviceregistration.WorkloadServices {
//TODO(schmichael) only support one network for now
net := alloc.AllocatedResources.Shared.Networks[0]
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
ws := &WorkloadServices{
ws := &serviceregistration.WorkloadServices{
AllocID: alloc.ID,
Group: alloc.TaskGroup,
Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services),
@@ -82,24 +40,3 @@ func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter
return ws
}
// Copy method for easing tests
func (ws *WorkloadServices) Copy() *WorkloadServices {
newTS := new(WorkloadServices)
*newTS = *ws
// Deep copy Services
newTS.Services = make([]*structs.Service, len(ws.Services))
for i := range ws.Services {
newTS.Services[i] = ws.Services[i].Copy()
}
return newTS
}
func (ws *WorkloadServices) Name() string {
if ws.Task != "" {
return ws.Task
}
return "group-" + ws.Group
}

View File

@@ -11,12 +11,12 @@ import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -26,8 +26,8 @@ const (
yPort = 1235
)
func testWorkload() *WorkloadServices {
return &WorkloadServices{
func testWorkload() *serviceregistration.WorkloadServices {
return &serviceregistration.WorkloadServices{
AllocID: uuid.Generate(),
Task: "taskname",
Restarter: &restartRecorder{},
@@ -65,7 +65,7 @@ func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent,
type testFakeCtx struct {
ServiceClient *ServiceClient
FakeConsul *MockAgent
Workload *WorkloadServices
Workload *serviceregistration.WorkloadServices
}
var errNoOps = fmt.Errorf("testing error: no pending operations")
@@ -502,8 +502,8 @@ func TestConsul_ChangeChecks(t *testing.T) {
t.Fatalf("service ID changed")
}
for newID := range sreg.checkIDs {
if _, ok := otherServiceReg.checkIDs[newID]; ok {
for newID := range sreg.CheckIDs {
if _, ok := otherServiceReg.CheckIDs[newID]; ok {
t.Fatalf("check IDs should change")
}
}
@@ -1349,361 +1349,6 @@ func TestCreateCheckReg_GRPC(t *testing.T) {
require.Equal(t, expected, actual)
}
// TestGetAddress asserts Nomad uses the correct ip and port for services and
// checks depending on port labels, driver networks, and address mode.
func TestGetAddress(t *testing.T) {
const HostIP = "127.0.0.1"
cases := []struct {
Name string
// Parameters
Mode string
PortLabel string
Host map[string]int // will be converted to structs.Networks
Driver *drivers.DriverNetwork
Ports structs.AllocatedPorts
Status *structs.AllocNetworkStatus
// Results
ExpectedIP string
ExpectedPort int
ExpectedErr string
}{
// Valid Configurations
{
Name: "ExampleService",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Host: map[string]int{"db": 12435},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: HostIP,
ExpectedPort: 12435,
},
{
Name: "Host",
Mode: structs.AddressModeHost,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: HostIP,
ExpectedPort: 12345,
},
{
Name: "Driver",
Mode: structs.AddressModeDriver,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "AutoDriver",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
AutoAdvertise: true,
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "DriverCustomPort",
Mode: structs.AddressModeDriver,
PortLabel: "7890",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 7890,
},
// Invalid Configurations
{
Name: "DriverWithoutNetwork",
Mode: structs.AddressModeDriver,
PortLabel: "db",
Host: map[string]int{"db": 12345},
Driver: nil,
ExpectedErr: "no driver network exists",
},
{
Name: "DriverBadPort",
Mode: structs.AddressModeDriver,
PortLabel: "bad-port-label",
Host: map[string]int{"db": 12345},
Driver: &drivers.DriverNetwork{
PortMap: map[string]int{"db": 6379},
IP: "10.1.2.3",
},
ExpectedErr: "invalid port",
},
{
Name: "DriverZeroPort",
Mode: structs.AddressModeDriver,
PortLabel: "0",
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
ExpectedErr: "invalid port",
},
{
Name: "HostBadPort",
Mode: structs.AddressModeHost,
PortLabel: "bad-port-label",
ExpectedErr: "invalid port",
},
{
Name: "InvalidMode",
Mode: "invalid-mode",
PortLabel: "80",
ExpectedErr: "invalid address mode",
},
{
Name: "NoPort_AutoMode",
Mode: structs.AddressModeAuto,
ExpectedIP: HostIP,
},
{
Name: "NoPort_HostMode",
Mode: structs.AddressModeHost,
ExpectedIP: HostIP,
},
{
Name: "NoPort_DriverMode",
Mode: structs.AddressModeDriver,
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
ExpectedIP: "10.1.2.3",
},
// Scenarios using port 0.12 networking fields (NetworkStatus, AllocatedPortMapping)
{
Name: "ExampleServer_withAllocatedPorts",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12435,
To: 6379,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: HostIP,
ExpectedPort: 12435,
},
{
Name: "Host_withAllocatedPorts",
Mode: structs.AddressModeHost,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: HostIP,
ExpectedPort: 12345,
},
{
Name: "Driver_withAllocatedPorts",
Mode: structs.AddressModeDriver,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "AutoDriver_withAllocatedPorts",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
AutoAdvertise: true,
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 6379,
},
{
Name: "DriverCustomPort_withAllocatedPorts",
Mode: structs.AddressModeDriver,
PortLabel: "7890",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Driver: &drivers.DriverNetwork{
IP: "10.1.2.3",
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "10.1.2.3",
ExpectedPort: 7890,
},
{
Name: "Host_MultiHostInterface",
Mode: structs.AddressModeAuto,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: "127.0.0.100",
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "127.0.0.100",
ExpectedPort: 12345,
},
{
Name: "Alloc",
Mode: structs.AddressModeAlloc,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
To: 6379,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "172.26.0.1",
ExpectedPort: 6379,
},
{
Name: "Alloc no to value",
Mode: structs.AddressModeAlloc,
PortLabel: "db",
Ports: []structs.AllocatedPortMapping{
{
Label: "db",
Value: 12345,
HostIP: HostIP,
},
},
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "172.26.0.1",
ExpectedPort: 12345,
},
{
Name: "AllocCustomPort",
Mode: structs.AddressModeAlloc,
PortLabel: "6379",
Status: &structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "172.26.0.1",
},
ExpectedIP: "172.26.0.1",
ExpectedPort: 6379,
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
// convert host port map into a structs.Networks
networks := []*structs.NetworkResource{
{
IP: HostIP,
ReservedPorts: make([]structs.Port, len(tc.Host)),
},
}
i := 0
for label, port := range tc.Host {
networks[0].ReservedPorts[i].Label = label
networks[0].ReservedPorts[i].Value = port
i++
}
// Run getAddress
ip, port, err := getAddress(tc.Mode, tc.PortLabel, networks, tc.Driver, tc.Ports, tc.Status)
// Assert the results
assert.Equal(t, tc.ExpectedIP, ip, "IP mismatch")
assert.Equal(t, tc.ExpectedPort, port, "Port mismatch")
if tc.ExpectedErr == "" {
assert.Nil(t, err)
} else {
if err == nil {
t.Fatalf("expected error containing %q but err=nil", tc.ExpectedErr)
} else {
assert.Contains(t, err.Error(), tc.ExpectedErr)
}
}
})
}
}
func TestConsul_ServiceName_Duplicates(t *testing.T) {
t.Parallel()
ctx := setupFake(t)
@@ -1789,7 +1434,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
@@ -1812,7 +1457,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
@@ -1837,7 +1482,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
@@ -1898,7 +1543,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
remainingWorkloadServiceID := MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
@@ -1921,7 +1566,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
explicitlyRemovedWorkloadServiceID := MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
@@ -1946,7 +1591,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
outofbandWorkloadServiceID := MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))