mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
Fix Service.AddressMode changes during task updates
This commit is contained in:
@@ -11,5 +11,5 @@ import (
|
||||
type ConsulServiceAPI interface {
|
||||
RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
|
||||
RemoveTask(allocID string, task *structs.Task)
|
||||
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error
|
||||
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
|
||||
}
|
||||
|
||||
@@ -55,18 +55,18 @@ func newMockConsulServiceClient() *mockConsulServiceClient {
|
||||
return &m
|
||||
}
|
||||
|
||||
func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor) error {
|
||||
func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T)", allocID, old, new, exec)
|
||||
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, nil))
|
||||
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash())
|
||||
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T)", allocID, task.Name, exec)
|
||||
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash())
|
||||
m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec, net))
|
||||
return nil
|
||||
}
|
||||
|
||||
63
client/driver/env/env.go
vendored
63
client/driver/env/env.go
vendored
@@ -14,10 +14,6 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// Network env vars
|
||||
/*
|
||||
*/
|
||||
|
||||
// A set of environment variables that are exported by each driver.
|
||||
const (
|
||||
// AllocDir is the environment variable with the path to the alloc directory
|
||||
@@ -61,15 +57,15 @@ const (
|
||||
Region = "NOMAD_REGION"
|
||||
|
||||
// AddrPrefix is the prefix for passing both dynamic and static port
|
||||
// allocations to tasks.
|
||||
// allocations to tasks
|
||||
// E.g $NOMAD_ADDR_http=127.0.0.1:80
|
||||
//
|
||||
// Deprecated: Use NOMAD_HOST_ADDR_ or NOMAD_DRIVER_ADDR_ instead as
|
||||
// this environment variable will only ever use the host IP. If a port
|
||||
// map is used this variable will be set to the Host IP and Driver's
|
||||
// Port which likely won't work in either context.
|
||||
AddrPrefix = "NOMAD_ADDR_"
|
||||
|
||||
// HostAddrPrefix is the prefix for passing both dynamic and static
|
||||
// port allocations to tasks with the host's IP address for cases where
|
||||
// the task advertises a different address.
|
||||
HostAddrPrefix = "NOMAD_HOST_ADDR_"
|
||||
|
||||
// IpPrefix is the prefix for passing the IP of a port allocation to a
|
||||
// task. This may not be the host's address depending on task
|
||||
// configuration.
|
||||
@@ -78,14 +74,33 @@ const (
|
||||
// PortPrefix is the prefix for passing the port allocation to a task.
|
||||
PortPrefix = "NOMAD_PORT_"
|
||||
|
||||
// HostIPPrefix is the prefix for passing the host's IP to a task for
|
||||
// HostAddrPrefix is the prefix for passing both dynamic and static
|
||||
// port allocations to tasks with the host's IP address for cases where
|
||||
// the task advertises a different address.
|
||||
HostAddrPrefix = "NOMAD_HOST_ADDR_"
|
||||
|
||||
// HostIpPrefix is the prefix for passing the host's IP to a task for
|
||||
// cases where the task advertises a different address.
|
||||
HostIPPrefix = "NOMAD_HOST_IP_"
|
||||
HostIpPrefix = "NOMAD_HOST_IP_"
|
||||
|
||||
// HostPortPrefix is the prefix for passing the host port when a portmap is
|
||||
// specified.
|
||||
HostPortPrefix = "NOMAD_HOST_PORT_"
|
||||
|
||||
// DriverAddrPrefix is the prefix for passing the address of port
|
||||
// allocations to tasks with the driver's IP address if on exists.
|
||||
// Service's will advertise this address if address_mode=true or
|
||||
// address_mode=auto and the driver determines it should be used.
|
||||
DriverAddrPrefix = "NOMAD_DRIVER_ADDR_"
|
||||
|
||||
// DriverIpPrefix is the prefix for environemnt variables containing
|
||||
// the driver's IP address if it returned one.
|
||||
DriverIpPrefix = "NOMAD_DRIVER_IP_"
|
||||
|
||||
// DriverPortPrefix is the prefix for environment variables containing
|
||||
// the driver's port if a port map is used.
|
||||
DriverPortPrefix = "NOMAD_DRIVER_PORT_"
|
||||
|
||||
// MetaPrefix is the prefix for passing task meta data.
|
||||
MetaPrefix = "NOMAD_META_"
|
||||
|
||||
@@ -494,31 +509,31 @@ func buildNetworkEnv(envMap map[string]string, nets structs.Networks, driverNet
|
||||
func buildPortEnv(envMap map[string]string, p structs.Port, ip string, driverNet *cstructs.DriverNetwork) {
|
||||
// Host IP, PORT, and ADDR
|
||||
portStr := strconv.Itoa(p.Value)
|
||||
envMap["NOMAD_HOST_IP_"+p.Label] = ip
|
||||
envMap["NOMAD_HOST_PORT_"+p.Label] = portStr
|
||||
envMap["NOMAD_HOST_ADDR_"+p.Label] = net.JoinHostPort(ip, portStr)
|
||||
envMap[HostIpPrefix+p.Label] = ip
|
||||
envMap[HostPortPrefix+p.Label] = portStr
|
||||
envMap[HostAddrPrefix+p.Label] = net.JoinHostPort(ip, portStr)
|
||||
|
||||
// Driver IP, PORT, and ADDR if available
|
||||
if driverNet != nil {
|
||||
driverPortStr := strconv.Itoa(driverNet.PortMap[p.Label])
|
||||
envMap["NOMAD_DRIVER_IP_"+p.Label] = driverNet.IP
|
||||
envMap["NOMAD_DRIVER_PORT_"+p.Label] = driverPortStr
|
||||
envMap["NOMAD_DRIVER_ADDR_"+p.Label] = net.JoinHostPort(driverNet.IP, driverPortStr)
|
||||
envMap[DriverIpPrefix+p.Label] = driverNet.IP
|
||||
envMap[DriverPortPrefix+p.Label] = driverPortStr
|
||||
envMap[DriverAddrPrefix+p.Label] = net.JoinHostPort(driverNet.IP, driverPortStr)
|
||||
}
|
||||
|
||||
// Auto IP, PORT, and ADDR (driver if set; otherwise host)
|
||||
if envMap["NOMAD_DRIVER_IP_"+p.Label] != "" {
|
||||
if envMap[DriverIpPrefix+p.Label] != "" {
|
||||
// Driver IP set, use it
|
||||
envMap[IpPrefix+p.Label] = envMap["NOMAD_DRIVER_IP_"+p.Label]
|
||||
envMap[IpPrefix+p.Label] = envMap[DriverIpPrefix+p.Label]
|
||||
} else {
|
||||
envMap[IpPrefix+p.Label] = envMap["NOMAD_HOST_IP_"+p.Label]
|
||||
envMap[IpPrefix+p.Label] = envMap[HostIpPrefix+p.Label]
|
||||
}
|
||||
|
||||
if envMap["NOMAD_DRIVER_PORT_"+p.Label] != "" {
|
||||
if envMap[DriverPortPrefix+p.Label] != "" {
|
||||
// PortMap set, use it
|
||||
envMap[PortPrefix+p.Label] = envMap["NOMAD_DRIVER_PORT_"+p.Label]
|
||||
envMap[PortPrefix+p.Label] = envMap[DriverPortPrefix+p.Label]
|
||||
} else {
|
||||
envMap[PortPrefix+p.Label] = envMap["NOMAD_HOST_PORT_"+p.Label]
|
||||
envMap[PortPrefix+p.Label] = envMap[HostPortPrefix+p.Label]
|
||||
}
|
||||
|
||||
// Address just joins the two (which doesn't make sense if IP is host
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
package structs
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"io"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// MemoryStats holds memory usage related stats
|
||||
type MemoryStats struct {
|
||||
RSS uint64
|
||||
@@ -147,7 +153,7 @@ func (d *DriverNetwork) Advertise() bool {
|
||||
return d != nil && d.AutoAdvertise
|
||||
}
|
||||
|
||||
// Copy a Network struct. If it is nil, nil is returned.
|
||||
// Copy a DriverNetwork struct. If it is nil, nil is returned.
|
||||
func (d *DriverNetwork) Copy() *DriverNetwork {
|
||||
if d == nil {
|
||||
return nil
|
||||
@@ -162,3 +168,19 @@ func (d *DriverNetwork) Copy() *DriverNetwork {
|
||||
AutoAdvertise: d.AutoAdvertise,
|
||||
}
|
||||
}
|
||||
|
||||
// Hash the contents of a DriverNetwork struct to detect changes. If it is nil,
|
||||
// an empty slice is returned.
|
||||
func (d *DriverNetwork) Hash() []byte {
|
||||
if d == nil {
|
||||
return []byte{}
|
||||
}
|
||||
h := md5.New()
|
||||
io.WriteString(h, d.IP)
|
||||
io.WriteString(h, strconv.FormatBool(d.AutoAdvertise))
|
||||
for k, v := range d.PortMap {
|
||||
io.WriteString(h, k)
|
||||
io.WriteString(h, strconv.Itoa(v))
|
||||
}
|
||||
return h.Sum(nil)
|
||||
}
|
||||
|
||||
@@ -88,6 +88,10 @@ type TaskRunner struct {
|
||||
// envBuilder is used to build the task's environment
|
||||
envBuilder *env.Builder
|
||||
|
||||
// driverNet is the network information returned by the driver
|
||||
driverNet *cstructs.DriverNetwork
|
||||
driverNetLock sync.Mutex
|
||||
|
||||
// updateCh is used to receive updated versions of the allocation
|
||||
updateCh chan *structs.Allocation
|
||||
|
||||
@@ -167,6 +171,7 @@ type taskRunnerState struct {
|
||||
TaskDirBuilt bool
|
||||
PayloadRendered bool
|
||||
CreatedResources *driver.CreatedResources
|
||||
DriverNetwork *cstructs.DriverNetwork
|
||||
}
|
||||
|
||||
func (s *taskRunnerState) Hash() []byte {
|
||||
@@ -178,6 +183,7 @@ func (s *taskRunnerState) Hash() []byte {
|
||||
io.WriteString(h, fmt.Sprintf("%v", s.TaskDirBuilt))
|
||||
io.WriteString(h, fmt.Sprintf("%v", s.PayloadRendered))
|
||||
h.Write(s.CreatedResources.Hash())
|
||||
h.Write(s.DriverNetwork.Hash())
|
||||
|
||||
return h.Sum(nil)
|
||||
}
|
||||
@@ -312,6 +318,7 @@ func (r *TaskRunner) RestoreState() (string, error) {
|
||||
r.taskDirBuilt = snap.TaskDirBuilt
|
||||
r.payloadRendered = snap.PayloadRendered
|
||||
r.setCreatedResources(snap.CreatedResources)
|
||||
r.driverNet = snap.DriverNetwork
|
||||
|
||||
if r.task.Vault != nil {
|
||||
// Read the token from the secret directory
|
||||
@@ -337,6 +344,10 @@ func (r *TaskRunner) RestoreState() (string, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Add the restored network driver to the environment
|
||||
r.envBuilder.SetDriverNetwork(r.driverNet)
|
||||
|
||||
// Open a connection to the driver handle
|
||||
ctx := driver.NewExecContext(r.taskDir, r.envBuilder.Build())
|
||||
handle, err := d.Open(ctx, snap.HandleID)
|
||||
|
||||
@@ -351,8 +362,7 @@ func (r *TaskRunner) RestoreState() (string, error) {
|
||||
restartReason = pre06ScriptCheckReason
|
||||
}
|
||||
|
||||
//FIXME don't pass nil here
|
||||
if err := r.registerServices(d, handle, nil); err != nil {
|
||||
if err := r.registerServices(d, handle, r.driverNet); err != nil {
|
||||
// Don't hard fail here as there's a chance this task
|
||||
// registered with Consul properly when it initial
|
||||
// started.
|
||||
@@ -422,6 +432,10 @@ func (r *TaskRunner) SaveState() error {
|
||||
}
|
||||
r.handleLock.Unlock()
|
||||
|
||||
r.driverNetLock.Lock()
|
||||
snap.DriverNetwork = r.driverNet.Copy()
|
||||
r.driverNetLock.Unlock()
|
||||
|
||||
// If nothing has changed avoid the write
|
||||
h := snap.Hash()
|
||||
if bytes.Equal(h, r.persistedHash) {
|
||||
@@ -1356,6 +1370,11 @@ func (r *TaskRunner) startTask() error {
|
||||
r.handle = sresp.Handle
|
||||
r.handleLock.Unlock()
|
||||
|
||||
// Need to persist the driver network between restarts
|
||||
r.driverNetLock.Lock()
|
||||
r.driverNet = sresp.Network
|
||||
r.driverNetLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1505,7 +1524,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
|
||||
// Merge in the task resources
|
||||
updatedTask.Resources = update.TaskResources[updatedTask.Name]
|
||||
|
||||
// Update the task's environment
|
||||
// Update the task's environment for interpolating in services/checks
|
||||
r.envBuilder.UpdateTask(update, updatedTask)
|
||||
|
||||
var mErr multierror.Error
|
||||
@@ -1549,7 +1568,10 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol
|
||||
exec = h
|
||||
}
|
||||
interpolateServices(r.envBuilder.Build(), new)
|
||||
return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
|
||||
r.driverNetLock.Lock()
|
||||
net := r.driverNet.Copy()
|
||||
r.driverNetLock.Unlock()
|
||||
return r.consul.UpdateTask(r.alloc.ID, old, new, exec, net)
|
||||
}
|
||||
|
||||
// handleDestroy kills the task handle. In the case that killing fails,
|
||||
|
||||
@@ -273,7 +273,10 @@ func (c *ServiceClient) sync() error {
|
||||
// Add Nomad services missing from Consul
|
||||
for id, locals := range c.services {
|
||||
if remotes, ok := consulServices[id]; ok {
|
||||
if locals.Port == remotes.Port {
|
||||
// Make sure Port and Address are stable since
|
||||
// PortLabel and AddressMode aren't included in the
|
||||
// service ID.
|
||||
if locals.Port == remotes.Port && locals.Address == remotes.Address {
|
||||
// Already exists in Consul; skip
|
||||
continue
|
||||
}
|
||||
@@ -295,7 +298,7 @@ func (c *ServiceClient) sync() error {
|
||||
continue
|
||||
}
|
||||
if !isNomadService(check.ServiceID) {
|
||||
// Not managed by Nomad, skip
|
||||
// Service not managed by Nomad, skip
|
||||
continue
|
||||
}
|
||||
// Unknown Nomad managed check; kill
|
||||
@@ -309,6 +312,7 @@ func (c *ServiceClient) sync() error {
|
||||
|
||||
// Add Nomad checks missing from Consul
|
||||
for id, check := range c.checks {
|
||||
c.logger.Printf("[DEBUG] consul.sync: registering check %q -> %s --- exists %p", id, check.Name, consulChecks[id])
|
||||
if check, ok := consulChecks[id]; ok {
|
||||
if _, changed := portsChanged[check.ServiceID]; !changed {
|
||||
// Already in Consul and ports didn't change; skipping
|
||||
@@ -370,7 +374,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
|
||||
ops.regServices = append(ops.regServices, serviceReg)
|
||||
|
||||
for _, check := range service.Checks {
|
||||
checkID := createCheckID(id, check)
|
||||
checkID := makeCheckID(id, check)
|
||||
if check.Type == structs.ServiceCheckScript {
|
||||
return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name)
|
||||
}
|
||||
@@ -436,8 +440,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st
|
||||
ip, port := task.Resources.Networks.Port(service.PortLabel)
|
||||
if addrMode == structs.AddressModeDriver {
|
||||
if net == nil {
|
||||
//FIXME oof this is a doozy of an error condition... wording?
|
||||
return fmt.Errorf("service %s cannot use driver's IP as it is unset", service.Name)
|
||||
return fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name)
|
||||
}
|
||||
ip = net.IP
|
||||
port = net.PortMap[service.PortLabel]
|
||||
@@ -453,6 +456,11 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st
|
||||
// with tests that may reuse Tasks
|
||||
copy(serviceReg.Tags, service.Tags)
|
||||
ops.regServices = append(ops.regServices, serviceReg)
|
||||
return c.checkRegs(ops, allocID, id, service, task, exec, net)
|
||||
}
|
||||
|
||||
func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, service *structs.Service,
|
||||
task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
|
||||
|
||||
for _, check := range service.Checks {
|
||||
if check.TLSSkipVerify && !c.skipVerifySupport {
|
||||
@@ -460,7 +468,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st
|
||||
check.Name, task.Name, allocID)
|
||||
continue
|
||||
}
|
||||
checkID := createCheckID(id, check)
|
||||
checkID := makeCheckID(serviceID, check)
|
||||
if check.Type == structs.ServiceCheckScript {
|
||||
if exec == nil {
|
||||
return fmt.Errorf("driver doesn't support script checks")
|
||||
@@ -471,14 +479,13 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st
|
||||
}
|
||||
|
||||
// Checks should always use the host ip:port
|
||||
//FIXME right?!
|
||||
portLabel := check.PortLabel
|
||||
if portLabel == "" {
|
||||
// Default to the service's port label
|
||||
portLabel = service.PortLabel
|
||||
}
|
||||
ip, port := task.Resources.Networks.Port(portLabel)
|
||||
checkReg, err := createCheckReg(id, checkID, check, ip, port)
|
||||
checkReg, err := createCheckReg(serviceID, checkID, check, ip, port)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
|
||||
}
|
||||
@@ -507,7 +514,9 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec dr
|
||||
|
||||
// UpdateTask in Consul. Does not alter the service if only checks have
|
||||
// changed.
|
||||
func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error {
|
||||
//
|
||||
// DriverNetwork must not change between invocations for the same allocation.
|
||||
func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
|
||||
ops := &operations{}
|
||||
|
||||
existingIDs := make(map[string]*structs.Service, len(existing.Services))
|
||||
@@ -527,12 +536,15 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
|
||||
// Existing sevice entry removed
|
||||
ops.deregServices = append(ops.deregServices, existingID)
|
||||
for _, check := range existingSvc.Checks {
|
||||
ops.deregChecks = append(ops.deregChecks, createCheckID(existingID, check))
|
||||
ops.deregChecks = append(ops.deregChecks, makeCheckID(existingID, check))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if newSvc.PortLabel == existingSvc.PortLabel {
|
||||
// PortLabel and AddressMode aren't included in the ID, so we
|
||||
// have to compare manually.
|
||||
serviceUnchanged := newSvc.PortLabel == existingSvc.PortLabel && newSvc.AddressMode == existingSvc.AddressMode
|
||||
if serviceUnchanged {
|
||||
// Service exists and hasn't changed, don't add it later
|
||||
delete(newIDs, existingID)
|
||||
}
|
||||
@@ -540,15 +552,21 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
|
||||
// Check to see what checks were updated
|
||||
existingChecks := make(map[string]struct{}, len(existingSvc.Checks))
|
||||
for _, check := range existingSvc.Checks {
|
||||
existingChecks[createCheckID(existingID, check)] = struct{}{}
|
||||
existingChecks[makeCheckID(existingID, check)] = struct{}{}
|
||||
}
|
||||
|
||||
// Register new checks
|
||||
for _, check := range newSvc.Checks {
|
||||
checkID := createCheckID(existingID, check)
|
||||
checkID := makeCheckID(existingID, check)
|
||||
if _, exists := existingChecks[checkID]; exists {
|
||||
// Check exists, so don't remove it
|
||||
delete(existingChecks, checkID)
|
||||
} else if serviceUnchanged {
|
||||
// New check on an unchanged service; add them now
|
||||
err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -560,8 +578,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
|
||||
|
||||
// Any remaining services should just be enqueued directly
|
||||
for _, newSvc := range newIDs {
|
||||
//FIXME driver.Network needed
|
||||
err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, nil)
|
||||
err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -582,7 +599,7 @@ func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
|
||||
ops.deregServices = append(ops.deregServices, id)
|
||||
|
||||
for _, check := range service.Checks {
|
||||
ops.deregChecks = append(ops.deregChecks, createCheckID(id, check))
|
||||
ops.deregChecks = append(ops.deregChecks, makeCheckID(id, check))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -679,8 +696,8 @@ func makeTaskServiceID(allocID, taskName string, service *structs.Service) strin
|
||||
return strings.Join(parts, "-")
|
||||
}
|
||||
|
||||
// createCheckID creates a unique ID for a check.
|
||||
func createCheckID(serviceID string, check *structs.ServiceCheck) string {
|
||||
// makeCheckID creates a unique ID for a check.
|
||||
func makeCheckID(serviceID string, check *structs.ServiceCheck) string {
|
||||
return check.Hash(serviceID)
|
||||
}
|
||||
|
||||
|
||||
@@ -241,7 +241,7 @@ func TestConsul_ChangeTags(t *testing.T) {
|
||||
origTask := ctx.Task
|
||||
ctx.Task = testTask()
|
||||
ctx.Task.Services[0].Tags[0] = "newtag"
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil); err != nil {
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, nil); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
@@ -383,7 +383,7 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
// Removed PortLabel; should default to service's (y)
|
||||
},
|
||||
}
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil {
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx, nil); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
@@ -447,6 +447,106 @@ func TestConsul_ChangePorts(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestConsul_ChangeChecks asserts that updating only the checks on a service
|
||||
// properly syncs with Consul.
|
||||
func TestConsul_ChangeChecks(t *testing.T) {
|
||||
ctx := setupFake()
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "c1",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
PortLabel: "x",
|
||||
},
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, nil); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
if n := len(ctx.FakeConsul.services); n != 1 {
|
||||
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
||||
}
|
||||
|
||||
origServiceKey := ""
|
||||
for k, v := range ctx.FakeConsul.services {
|
||||
origServiceKey = k
|
||||
if v.Name != ctx.Task.Services[0].Name {
|
||||
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
|
||||
}
|
||||
if v.Port != xPort {
|
||||
t.Errorf("expected Port x=%v but found: %v", xPort, v.Port)
|
||||
}
|
||||
}
|
||||
|
||||
if n := len(ctx.FakeConsul.checks); n != 1 {
|
||||
t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
|
||||
}
|
||||
for _, v := range ctx.FakeConsul.checks {
|
||||
if v.Name != "c1" {
|
||||
t.Fatalf("expected check c1 but found %q", v.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Now add a check
|
||||
origTask := ctx.Task.Copy()
|
||||
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||
{
|
||||
Name: "c1",
|
||||
Type: "tcp",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
PortLabel: "x",
|
||||
},
|
||||
{
|
||||
Name: "c2",
|
||||
Type: "http",
|
||||
Path: "/",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
PortLabel: "x",
|
||||
},
|
||||
}
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx, nil); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
if n := len(ctx.FakeConsul.services); n != 1 {
|
||||
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
||||
}
|
||||
|
||||
if _, ok := ctx.FakeConsul.services[origServiceKey]; !ok {
|
||||
t.Errorf("unexpected key change; was: %q -- but found %#v", origServiceKey, ctx.FakeConsul.services)
|
||||
}
|
||||
|
||||
if n := len(ctx.FakeConsul.checks); n != 2 {
|
||||
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
|
||||
}
|
||||
|
||||
for k, v := range ctx.FakeConsul.checks {
|
||||
switch v.Name {
|
||||
case "c1":
|
||||
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
|
||||
t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
|
||||
}
|
||||
case "c2":
|
||||
if expected := fmt.Sprintf("http://:%d/", xPort); v.HTTP != expected {
|
||||
t.Errorf("expected Port x=%v but found: %v", expected, v.HTTP)
|
||||
}
|
||||
default:
|
||||
t.Errorf("Unkown check: %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestConsul_RegServices tests basic service registration.
|
||||
func TestConsul_RegServices(t *testing.T) {
|
||||
ctx := setupFake()
|
||||
@@ -829,7 +929,7 @@ func TestConsul_CancelScript(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil {
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx, nil); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
@@ -880,7 +980,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
},
|
||||
{
|
||||
Name: "weird-y-check",
|
||||
Type: "tcp",
|
||||
Type: "http",
|
||||
Interval: time.Second,
|
||||
Timeout: time.Second,
|
||||
PortLabel: "y",
|
||||
@@ -912,8 +1012,8 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
"x": 8888,
|
||||
"y": 9999,
|
||||
},
|
||||
IP: "172.18.0.2",
|
||||
AutoUseIP: true,
|
||||
IP: "172.18.0.2",
|
||||
AutoAdvertise: true,
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, net); err != nil {
|
||||
@@ -931,19 +1031,34 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
switch v.Name {
|
||||
case ctx.Task.Services[0].Name: // x
|
||||
// Since DriverNetwork.AutoUseIP=true, driver ports should be used
|
||||
// Since DriverNetwork.AutoAdvertise=true, driver ports should be used
|
||||
if v.Port != net.PortMap["x"] {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, net.PortMap["x"], v.Port)
|
||||
}
|
||||
// Checks should always use host port though
|
||||
if v.Checks[0].TCP != ":1234" { // xPort
|
||||
t.Errorf("exepcted service %s check 1's port to be %d but found %q",
|
||||
v.Name, xPort, v.Checks[0].TCP)
|
||||
// The order of checks in Consul is not guaranteed to
|
||||
// be the same as their order in the Task definition,
|
||||
// so check in a loop
|
||||
if expected := 2; len(v.Checks) != expected {
|
||||
t.Errorf("expected %d checks but found %d", len(v.Checks))
|
||||
}
|
||||
if v.Checks[1].TCP != ":1235" { // yPort
|
||||
t.Errorf("exepcted service %s check 2's port to be %d but found %q",
|
||||
v.Name, yPort, v.Checks[1].TCP)
|
||||
for _, c := range v.Checks {
|
||||
// No name on AgentServiceChecks, use type
|
||||
switch {
|
||||
case c.TCP != "":
|
||||
// Checks should always use host port though
|
||||
if c.TCP != ":1234" { // xPort
|
||||
t.Errorf("exepcted service %s check 1's port to be %d but found %q",
|
||||
v.Name, xPort, c.TCP)
|
||||
}
|
||||
case c.HTTP != "":
|
||||
if c.HTTP != "http://:1235" { // yPort
|
||||
t.Errorf("exepcted service %s check 2's port to be %d but found %q",
|
||||
v.Name, yPort, c.HTTP)
|
||||
}
|
||||
default:
|
||||
t.Errorf("unexpected check %#v on service %q", c, v.Name)
|
||||
}
|
||||
}
|
||||
case ctx.Task.Services[1].Name: // y
|
||||
// Service should be container ip:port
|
||||
@@ -1000,8 +1115,8 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
||||
"x": 8888,
|
||||
"y": 9999,
|
||||
},
|
||||
IP: "172.18.0.2",
|
||||
AutoUseIP: false,
|
||||
IP: "172.18.0.2",
|
||||
AutoAdvertise: false,
|
||||
}
|
||||
|
||||
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, net); err != nil {
|
||||
@@ -1013,13 +1128,13 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
||||
}
|
||||
|
||||
if n := len(ctx.FakeConsul.services); n != 3 {
|
||||
t.Fatalf("expected 2 services but found: %d", n)
|
||||
t.Fatalf("expected 3 services but found: %d", n)
|
||||
}
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
switch v.Name {
|
||||
case ctx.Task.Services[0].Name: // x + auto
|
||||
// Since DriverNetwork.AutoUseIP=false, host ports should be used
|
||||
// Since DriverNetwork.AutoAdvertise=false, host ports should be used
|
||||
if v.Port != xPort {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, xPort, v.Port)
|
||||
@@ -1044,3 +1159,75 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestConsul_DriverNetwork_Change asserts that if a driver network is
|
||||
// specified and a service updates its use its properly updated in Consul.
|
||||
func TestConsul_DriverNetwork_Change(t *testing.T) {
|
||||
ctx := setupFake()
|
||||
|
||||
ctx.Task.Services = []*structs.Service{
|
||||
{
|
||||
Name: "service-foo",
|
||||
PortLabel: "x",
|
||||
AddressMode: structs.AddressModeAuto,
|
||||
},
|
||||
}
|
||||
|
||||
net := &cstructs.DriverNetwork{
|
||||
PortMap: map[string]int{
|
||||
"x": 8888,
|
||||
"y": 9999,
|
||||
},
|
||||
IP: "172.18.0.2",
|
||||
AutoAdvertise: false,
|
||||
}
|
||||
|
||||
syncAndAssertPort := func(port int) {
|
||||
if err := ctx.syncOnce(); err != nil {
|
||||
t.Fatalf("unexpected error syncing task: %v", err)
|
||||
}
|
||||
|
||||
if n := len(ctx.FakeConsul.services); n != 1 {
|
||||
t.Fatalf("expected 1 service but found: %d", n)
|
||||
}
|
||||
|
||||
for _, v := range ctx.FakeConsul.services {
|
||||
switch v.Name {
|
||||
case ctx.Task.Services[0].Name:
|
||||
if v.Port != port {
|
||||
t.Errorf("expected service %s's port to be %d but found %d",
|
||||
v.Name, port, v.Port)
|
||||
}
|
||||
default:
|
||||
t.Errorf("unexpected service name: %q", v.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initial service should advertise host port x
|
||||
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, net); err != nil {
|
||||
t.Fatalf("unexpected error registering task: %v", err)
|
||||
}
|
||||
|
||||
syncAndAssertPort(xPort)
|
||||
|
||||
// UpdateTask to use Host (shouldn't change anything)
|
||||
orig := ctx.Task.Copy()
|
||||
ctx.Task.Services[0].AddressMode = structs.AddressModeHost
|
||||
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx, net); err != nil {
|
||||
t.Fatalf("unexpected error updating task: %v", err)
|
||||
}
|
||||
|
||||
syncAndAssertPort(xPort)
|
||||
|
||||
// UpdateTask to use Driver (*should* change IP and port)
|
||||
orig = ctx.Task.Copy()
|
||||
ctx.Task.Services[0].AddressMode = structs.AddressModeDriver
|
||||
|
||||
if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx, net); err != nil {
|
||||
t.Fatalf("unexpected error updating task: %v", err)
|
||||
}
|
||||
|
||||
syncAndAssertPort(net.PortMap["x"])
|
||||
}
|
||||
|
||||
@@ -2580,6 +2580,7 @@ func (s *Service) Hash() string {
|
||||
io.WriteString(h, s.Name)
|
||||
io.WriteString(h, strings.Join(s.Tags, ""))
|
||||
io.WriteString(h, s.PortLabel)
|
||||
io.WriteString(h, s.AddressMode)
|
||||
return fmt.Sprintf("%x", h.Sum(nil))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user