From 4117c8b3a2eb9247e8885a1855bd6b07a22039c3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 16 Jun 2017 16:35:16 -0700 Subject: [PATCH] Fix Service.AddressMode changes during task updates --- client/consul.go | 2 +- client/consul_test.go | 8 +- client/driver/env/env.go | 63 +++++---- client/structs/structs.go | 24 +++- client/task_runner.go | 30 +++- command/agent/consul/client.go | 53 ++++--- command/agent/consul/unit_test.go | 223 +++++++++++++++++++++++++++--- nomad/structs/structs.go | 1 + 8 files changed, 334 insertions(+), 70 deletions(-) diff --git a/client/consul.go b/client/consul.go index 25f61882f..5635bc362 100644 --- a/client/consul.go +++ b/client/consul.go @@ -11,5 +11,5 @@ import ( type ConsulServiceAPI interface { RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error RemoveTask(allocID string, task *structs.Task) - UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error + UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error } diff --git a/client/consul_test.go b/client/consul_test.go index 8bb782668..b8f282ebd 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -55,18 +55,18 @@ func newMockConsulServiceClient() *mockConsulServiceClient { return &m } -func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor) error { +func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T)", allocID, old, new, exec) - m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, nil)) + m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash()) + m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net)) return nil } func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T)", allocID, task.Name, exec) + m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash()) m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec, net)) return nil } diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 7b4af98e5..96dccd879 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -14,10 +14,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// Network env vars -/* - */ - // A set of environment variables that are exported by each driver. const ( // AllocDir is the environment variable with the path to the alloc directory @@ -61,15 +57,15 @@ const ( Region = "NOMAD_REGION" // AddrPrefix is the prefix for passing both dynamic and static port - // allocations to tasks. + // allocations to tasks // E.g $NOMAD_ADDR_http=127.0.0.1:80 + // + // Deprecated: Use NOMAD_HOST_ADDR_ or NOMAD_DRIVER_ADDR_ instead as + // this environment variable will only ever use the host IP. If a port + // map is used this variable will be set to the Host IP and Driver's + // Port which likely won't work in either context. AddrPrefix = "NOMAD_ADDR_" - // HostAddrPrefix is the prefix for passing both dynamic and static - // port allocations to tasks with the host's IP address for cases where - // the task advertises a different address. - HostAddrPrefix = "NOMAD_HOST_ADDR_" - // IpPrefix is the prefix for passing the IP of a port allocation to a // task. This may not be the host's address depending on task // configuration. @@ -78,14 +74,33 @@ const ( // PortPrefix is the prefix for passing the port allocation to a task. PortPrefix = "NOMAD_PORT_" - // HostIPPrefix is the prefix for passing the host's IP to a task for + // HostAddrPrefix is the prefix for passing both dynamic and static + // port allocations to tasks with the host's IP address for cases where + // the task advertises a different address. + HostAddrPrefix = "NOMAD_HOST_ADDR_" + + // HostIpPrefix is the prefix for passing the host's IP to a task for // cases where the task advertises a different address. - HostIPPrefix = "NOMAD_HOST_IP_" + HostIpPrefix = "NOMAD_HOST_IP_" // HostPortPrefix is the prefix for passing the host port when a portmap is // specified. HostPortPrefix = "NOMAD_HOST_PORT_" + // DriverAddrPrefix is the prefix for passing the address of port + // allocations to tasks with the driver's IP address if on exists. + // Service's will advertise this address if address_mode=true or + // address_mode=auto and the driver determines it should be used. + DriverAddrPrefix = "NOMAD_DRIVER_ADDR_" + + // DriverIpPrefix is the prefix for environemnt variables containing + // the driver's IP address if it returned one. + DriverIpPrefix = "NOMAD_DRIVER_IP_" + + // DriverPortPrefix is the prefix for environment variables containing + // the driver's port if a port map is used. + DriverPortPrefix = "NOMAD_DRIVER_PORT_" + // MetaPrefix is the prefix for passing task meta data. MetaPrefix = "NOMAD_META_" @@ -494,31 +509,31 @@ func buildNetworkEnv(envMap map[string]string, nets structs.Networks, driverNet func buildPortEnv(envMap map[string]string, p structs.Port, ip string, driverNet *cstructs.DriverNetwork) { // Host IP, PORT, and ADDR portStr := strconv.Itoa(p.Value) - envMap["NOMAD_HOST_IP_"+p.Label] = ip - envMap["NOMAD_HOST_PORT_"+p.Label] = portStr - envMap["NOMAD_HOST_ADDR_"+p.Label] = net.JoinHostPort(ip, portStr) + envMap[HostIpPrefix+p.Label] = ip + envMap[HostPortPrefix+p.Label] = portStr + envMap[HostAddrPrefix+p.Label] = net.JoinHostPort(ip, portStr) // Driver IP, PORT, and ADDR if available if driverNet != nil { driverPortStr := strconv.Itoa(driverNet.PortMap[p.Label]) - envMap["NOMAD_DRIVER_IP_"+p.Label] = driverNet.IP - envMap["NOMAD_DRIVER_PORT_"+p.Label] = driverPortStr - envMap["NOMAD_DRIVER_ADDR_"+p.Label] = net.JoinHostPort(driverNet.IP, driverPortStr) + envMap[DriverIpPrefix+p.Label] = driverNet.IP + envMap[DriverPortPrefix+p.Label] = driverPortStr + envMap[DriverAddrPrefix+p.Label] = net.JoinHostPort(driverNet.IP, driverPortStr) } // Auto IP, PORT, and ADDR (driver if set; otherwise host) - if envMap["NOMAD_DRIVER_IP_"+p.Label] != "" { + if envMap[DriverIpPrefix+p.Label] != "" { // Driver IP set, use it - envMap[IpPrefix+p.Label] = envMap["NOMAD_DRIVER_IP_"+p.Label] + envMap[IpPrefix+p.Label] = envMap[DriverIpPrefix+p.Label] } else { - envMap[IpPrefix+p.Label] = envMap["NOMAD_HOST_IP_"+p.Label] + envMap[IpPrefix+p.Label] = envMap[HostIpPrefix+p.Label] } - if envMap["NOMAD_DRIVER_PORT_"+p.Label] != "" { + if envMap[DriverPortPrefix+p.Label] != "" { // PortMap set, use it - envMap[PortPrefix+p.Label] = envMap["NOMAD_DRIVER_PORT_"+p.Label] + envMap[PortPrefix+p.Label] = envMap[DriverPortPrefix+p.Label] } else { - envMap[PortPrefix+p.Label] = envMap["NOMAD_HOST_PORT_"+p.Label] + envMap[PortPrefix+p.Label] = envMap[HostPortPrefix+p.Label] } // Address just joins the two (which doesn't make sense if IP is host diff --git a/client/structs/structs.go b/client/structs/structs.go index ac1d0bcef..0673ce951 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -1,5 +1,11 @@ package structs +import ( + "crypto/md5" + "io" + "strconv" +) + // MemoryStats holds memory usage related stats type MemoryStats struct { RSS uint64 @@ -147,7 +153,7 @@ func (d *DriverNetwork) Advertise() bool { return d != nil && d.AutoAdvertise } -// Copy a Network struct. If it is nil, nil is returned. +// Copy a DriverNetwork struct. If it is nil, nil is returned. func (d *DriverNetwork) Copy() *DriverNetwork { if d == nil { return nil @@ -162,3 +168,19 @@ func (d *DriverNetwork) Copy() *DriverNetwork { AutoAdvertise: d.AutoAdvertise, } } + +// Hash the contents of a DriverNetwork struct to detect changes. If it is nil, +// an empty slice is returned. +func (d *DriverNetwork) Hash() []byte { + if d == nil { + return []byte{} + } + h := md5.New() + io.WriteString(h, d.IP) + io.WriteString(h, strconv.FormatBool(d.AutoAdvertise)) + for k, v := range d.PortMap { + io.WriteString(h, k) + io.WriteString(h, strconv.Itoa(v)) + } + return h.Sum(nil) +} diff --git a/client/task_runner.go b/client/task_runner.go index d1b45331f..db494c94c 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -88,6 +88,10 @@ type TaskRunner struct { // envBuilder is used to build the task's environment envBuilder *env.Builder + // driverNet is the network information returned by the driver + driverNet *cstructs.DriverNetwork + driverNetLock sync.Mutex + // updateCh is used to receive updated versions of the allocation updateCh chan *structs.Allocation @@ -167,6 +171,7 @@ type taskRunnerState struct { TaskDirBuilt bool PayloadRendered bool CreatedResources *driver.CreatedResources + DriverNetwork *cstructs.DriverNetwork } func (s *taskRunnerState) Hash() []byte { @@ -178,6 +183,7 @@ func (s *taskRunnerState) Hash() []byte { io.WriteString(h, fmt.Sprintf("%v", s.TaskDirBuilt)) io.WriteString(h, fmt.Sprintf("%v", s.PayloadRendered)) h.Write(s.CreatedResources.Hash()) + h.Write(s.DriverNetwork.Hash()) return h.Sum(nil) } @@ -312,6 +318,7 @@ func (r *TaskRunner) RestoreState() (string, error) { r.taskDirBuilt = snap.TaskDirBuilt r.payloadRendered = snap.PayloadRendered r.setCreatedResources(snap.CreatedResources) + r.driverNet = snap.DriverNetwork if r.task.Vault != nil { // Read the token from the secret directory @@ -337,6 +344,10 @@ func (r *TaskRunner) RestoreState() (string, error) { return "", err } + // Add the restored network driver to the environment + r.envBuilder.SetDriverNetwork(r.driverNet) + + // Open a connection to the driver handle ctx := driver.NewExecContext(r.taskDir, r.envBuilder.Build()) handle, err := d.Open(ctx, snap.HandleID) @@ -351,8 +362,7 @@ func (r *TaskRunner) RestoreState() (string, error) { restartReason = pre06ScriptCheckReason } - //FIXME don't pass nil here - if err := r.registerServices(d, handle, nil); err != nil { + if err := r.registerServices(d, handle, r.driverNet); err != nil { // Don't hard fail here as there's a chance this task // registered with Consul properly when it initial // started. @@ -422,6 +432,10 @@ func (r *TaskRunner) SaveState() error { } r.handleLock.Unlock() + r.driverNetLock.Lock() + snap.DriverNetwork = r.driverNet.Copy() + r.driverNetLock.Unlock() + // If nothing has changed avoid the write h := snap.Hash() if bytes.Equal(h, r.persistedHash) { @@ -1356,6 +1370,11 @@ func (r *TaskRunner) startTask() error { r.handle = sresp.Handle r.handleLock.Unlock() + // Need to persist the driver network between restarts + r.driverNetLock.Lock() + r.driverNet = sresp.Network + r.driverNetLock.Unlock() + return nil } @@ -1505,7 +1524,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { // Merge in the task resources updatedTask.Resources = update.TaskResources[updatedTask.Name] - // Update the task's environment + // Update the task's environment for interpolating in services/checks r.envBuilder.UpdateTask(update, updatedTask) var mErr multierror.Error @@ -1549,7 +1568,10 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol exec = h } interpolateServices(r.envBuilder.Build(), new) - return r.consul.UpdateTask(r.alloc.ID, old, new, exec) + r.driverNetLock.Lock() + net := r.driverNet.Copy() + r.driverNetLock.Unlock() + return r.consul.UpdateTask(r.alloc.ID, old, new, exec, net) } // handleDestroy kills the task handle. In the case that killing fails, diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 59dd03d6c..5d5c04ec8 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -273,7 +273,10 @@ func (c *ServiceClient) sync() error { // Add Nomad services missing from Consul for id, locals := range c.services { if remotes, ok := consulServices[id]; ok { - if locals.Port == remotes.Port { + // Make sure Port and Address are stable since + // PortLabel and AddressMode aren't included in the + // service ID. + if locals.Port == remotes.Port && locals.Address == remotes.Address { // Already exists in Consul; skip continue } @@ -295,7 +298,7 @@ func (c *ServiceClient) sync() error { continue } if !isNomadService(check.ServiceID) { - // Not managed by Nomad, skip + // Service not managed by Nomad, skip continue } // Unknown Nomad managed check; kill @@ -309,6 +312,7 @@ func (c *ServiceClient) sync() error { // Add Nomad checks missing from Consul for id, check := range c.checks { + c.logger.Printf("[DEBUG] consul.sync: registering check %q -> %s --- exists %p", id, check.Name, consulChecks[id]) if check, ok := consulChecks[id]; ok { if _, changed := portsChanged[check.ServiceID]; !changed { // Already in Consul and ports didn't change; skipping @@ -370,7 +374,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) ops.regServices = append(ops.regServices, serviceReg) for _, check := range service.Checks { - checkID := createCheckID(id, check) + checkID := makeCheckID(id, check) if check.Type == structs.ServiceCheckScript { return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name) } @@ -436,8 +440,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st ip, port := task.Resources.Networks.Port(service.PortLabel) if addrMode == structs.AddressModeDriver { if net == nil { - //FIXME oof this is a doozy of an error condition... wording? - return fmt.Errorf("service %s cannot use driver's IP as it is unset", service.Name) + return fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name) } ip = net.IP port = net.PortMap[service.PortLabel] @@ -453,6 +456,11 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st // with tests that may reuse Tasks copy(serviceReg.Tags, service.Tags) ops.regServices = append(ops.regServices, serviceReg) + return c.checkRegs(ops, allocID, id, service, task, exec, net) +} + +func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, service *structs.Service, + task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { for _, check := range service.Checks { if check.TLSSkipVerify && !c.skipVerifySupport { @@ -460,7 +468,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st check.Name, task.Name, allocID) continue } - checkID := createCheckID(id, check) + checkID := makeCheckID(serviceID, check) if check.Type == structs.ServiceCheckScript { if exec == nil { return fmt.Errorf("driver doesn't support script checks") @@ -471,14 +479,13 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st } // Checks should always use the host ip:port - //FIXME right?! portLabel := check.PortLabel if portLabel == "" { // Default to the service's port label portLabel = service.PortLabel } ip, port := task.Resources.Networks.Port(portLabel) - checkReg, err := createCheckReg(id, checkID, check, ip, port) + checkReg, err := createCheckReg(serviceID, checkID, check, ip, port) if err != nil { return fmt.Errorf("failed to add check %q: %v", check.Name, err) } @@ -507,7 +514,9 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec dr // UpdateTask in Consul. Does not alter the service if only checks have // changed. -func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error { +// +// DriverNetwork must not change between invocations for the same allocation. +func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { ops := &operations{} existingIDs := make(map[string]*structs.Service, len(existing.Services)) @@ -527,12 +536,15 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Existing sevice entry removed ops.deregServices = append(ops.deregServices, existingID) for _, check := range existingSvc.Checks { - ops.deregChecks = append(ops.deregChecks, createCheckID(existingID, check)) + ops.deregChecks = append(ops.deregChecks, makeCheckID(existingID, check)) } continue } - if newSvc.PortLabel == existingSvc.PortLabel { + // PortLabel and AddressMode aren't included in the ID, so we + // have to compare manually. + serviceUnchanged := newSvc.PortLabel == existingSvc.PortLabel && newSvc.AddressMode == existingSvc.AddressMode + if serviceUnchanged { // Service exists and hasn't changed, don't add it later delete(newIDs, existingID) } @@ -540,15 +552,21 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Check to see what checks were updated existingChecks := make(map[string]struct{}, len(existingSvc.Checks)) for _, check := range existingSvc.Checks { - existingChecks[createCheckID(existingID, check)] = struct{}{} + existingChecks[makeCheckID(existingID, check)] = struct{}{} } // Register new checks for _, check := range newSvc.Checks { - checkID := createCheckID(existingID, check) + checkID := makeCheckID(existingID, check) if _, exists := existingChecks[checkID]; exists { // Check exists, so don't remove it delete(existingChecks, checkID) + } else if serviceUnchanged { + // New check on an unchanged service; add them now + err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) + if err != nil { + return err + } } } @@ -560,8 +578,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Any remaining services should just be enqueued directly for _, newSvc := range newIDs { - //FIXME driver.Network needed - err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, nil) + err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net) if err != nil { return err } @@ -582,7 +599,7 @@ func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) { ops.deregServices = append(ops.deregServices, id) for _, check := range service.Checks { - ops.deregChecks = append(ops.deregChecks, createCheckID(id, check)) + ops.deregChecks = append(ops.deregChecks, makeCheckID(id, check)) } } @@ -679,8 +696,8 @@ func makeTaskServiceID(allocID, taskName string, service *structs.Service) strin return strings.Join(parts, "-") } -// createCheckID creates a unique ID for a check. -func createCheckID(serviceID string, check *structs.ServiceCheck) string { +// makeCheckID creates a unique ID for a check. +func makeCheckID(serviceID string, check *structs.ServiceCheck) string { return check.Hash(serviceID) } diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 022603873..8a42cd5fb 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -241,7 +241,7 @@ func TestConsul_ChangeTags(t *testing.T) { origTask := ctx.Task ctx.Task = testTask() ctx.Task.Services[0].Tags[0] = "newtag" - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil); err != nil { + if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, nil); err != nil { t.Fatalf("unexpected error registering task: %v", err) } if err := ctx.syncOnce(); err != nil { @@ -383,7 +383,7 @@ func TestConsul_ChangePorts(t *testing.T) { // Removed PortLabel; should default to service's (y) }, } - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil { + if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx, nil); err != nil { t.Fatalf("unexpected error registering task: %v", err) } if err := ctx.syncOnce(); err != nil { @@ -447,6 +447,106 @@ func TestConsul_ChangePorts(t *testing.T) { } } +// TestConsul_ChangeChecks asserts that updating only the checks on a service +// properly syncs with Consul. +func TestConsul_ChangeChecks(t *testing.T) { + ctx := setupFake() + ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ + { + Name: "c1", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + PortLabel: "x", + }, + } + + if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, nil); err != nil { + t.Fatalf("unexpected error registering task: %v", err) + } + + if err := ctx.syncOnce(); err != nil { + t.Fatalf("unexpected error syncing task: %v", err) + } + + if n := len(ctx.FakeConsul.services); n != 1 { + t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) + } + + origServiceKey := "" + for k, v := range ctx.FakeConsul.services { + origServiceKey = k + if v.Name != ctx.Task.Services[0].Name { + t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) + } + if v.Port != xPort { + t.Errorf("expected Port x=%v but found: %v", xPort, v.Port) + } + } + + if n := len(ctx.FakeConsul.checks); n != 1 { + t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks) + } + for _, v := range ctx.FakeConsul.checks { + if v.Name != "c1" { + t.Fatalf("expected check c1 but found %q", v.Name) + } + } + + // Now add a check + origTask := ctx.Task.Copy() + ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ + { + Name: "c1", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + PortLabel: "x", + }, + { + Name: "c2", + Type: "http", + Path: "/", + Interval: time.Second, + Timeout: time.Second, + PortLabel: "x", + }, + } + if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx, nil); err != nil { + t.Fatalf("unexpected error registering task: %v", err) + } + if err := ctx.syncOnce(); err != nil { + t.Fatalf("unexpected error syncing task: %v", err) + } + + if n := len(ctx.FakeConsul.services); n != 1 { + t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) + } + + if _, ok := ctx.FakeConsul.services[origServiceKey]; !ok { + t.Errorf("unexpected key change; was: %q -- but found %#v", origServiceKey, ctx.FakeConsul.services) + } + + if n := len(ctx.FakeConsul.checks); n != 2 { + t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks) + } + + for k, v := range ctx.FakeConsul.checks { + switch v.Name { + case "c1": + if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { + t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) + } + case "c2": + if expected := fmt.Sprintf("http://:%d/", xPort); v.HTTP != expected { + t.Errorf("expected Port x=%v but found: %v", expected, v.HTTP) + } + default: + t.Errorf("Unkown check: %q", k) + } + } +} + // TestConsul_RegServices tests basic service registration. func TestConsul_RegServices(t *testing.T) { ctx := setupFake() @@ -829,7 +929,7 @@ func TestConsul_CancelScript(t *testing.T) { }, } - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil { + if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx, nil); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -880,7 +980,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { }, { Name: "weird-y-check", - Type: "tcp", + Type: "http", Interval: time.Second, Timeout: time.Second, PortLabel: "y", @@ -912,8 +1012,8 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { "x": 8888, "y": 9999, }, - IP: "172.18.0.2", - AutoUseIP: true, + IP: "172.18.0.2", + AutoAdvertise: true, } if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, net); err != nil { @@ -931,19 +1031,34 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { for _, v := range ctx.FakeConsul.services { switch v.Name { case ctx.Task.Services[0].Name: // x - // Since DriverNetwork.AutoUseIP=true, driver ports should be used + // Since DriverNetwork.AutoAdvertise=true, driver ports should be used if v.Port != net.PortMap["x"] { t.Errorf("expected service %s's port to be %d but found %d", v.Name, net.PortMap["x"], v.Port) } - // Checks should always use host port though - if v.Checks[0].TCP != ":1234" { // xPort - t.Errorf("exepcted service %s check 1's port to be %d but found %q", - v.Name, xPort, v.Checks[0].TCP) + // The order of checks in Consul is not guaranteed to + // be the same as their order in the Task definition, + // so check in a loop + if expected := 2; len(v.Checks) != expected { + t.Errorf("expected %d checks but found %d", len(v.Checks)) } - if v.Checks[1].TCP != ":1235" { // yPort - t.Errorf("exepcted service %s check 2's port to be %d but found %q", - v.Name, yPort, v.Checks[1].TCP) + for _, c := range v.Checks { + // No name on AgentServiceChecks, use type + switch { + case c.TCP != "": + // Checks should always use host port though + if c.TCP != ":1234" { // xPort + t.Errorf("exepcted service %s check 1's port to be %d but found %q", + v.Name, xPort, c.TCP) + } + case c.HTTP != "": + if c.HTTP != "http://:1235" { // yPort + t.Errorf("exepcted service %s check 2's port to be %d but found %q", + v.Name, yPort, c.HTTP) + } + default: + t.Errorf("unexpected check %#v on service %q", c, v.Name) + } } case ctx.Task.Services[1].Name: // y // Service should be container ip:port @@ -1000,8 +1115,8 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { "x": 8888, "y": 9999, }, - IP: "172.18.0.2", - AutoUseIP: false, + IP: "172.18.0.2", + AutoAdvertise: false, } if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, net); err != nil { @@ -1013,13 +1128,13 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { } if n := len(ctx.FakeConsul.services); n != 3 { - t.Fatalf("expected 2 services but found: %d", n) + t.Fatalf("expected 3 services but found: %d", n) } for _, v := range ctx.FakeConsul.services { switch v.Name { case ctx.Task.Services[0].Name: // x + auto - // Since DriverNetwork.AutoUseIP=false, host ports should be used + // Since DriverNetwork.AutoAdvertise=false, host ports should be used if v.Port != xPort { t.Errorf("expected service %s's port to be %d but found %d", v.Name, xPort, v.Port) @@ -1044,3 +1159,75 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { } } } + +// TestConsul_DriverNetwork_Change asserts that if a driver network is +// specified and a service updates its use its properly updated in Consul. +func TestConsul_DriverNetwork_Change(t *testing.T) { + ctx := setupFake() + + ctx.Task.Services = []*structs.Service{ + { + Name: "service-foo", + PortLabel: "x", + AddressMode: structs.AddressModeAuto, + }, + } + + net := &cstructs.DriverNetwork{ + PortMap: map[string]int{ + "x": 8888, + "y": 9999, + }, + IP: "172.18.0.2", + AutoAdvertise: false, + } + + syncAndAssertPort := func(port int) { + if err := ctx.syncOnce(); err != nil { + t.Fatalf("unexpected error syncing task: %v", err) + } + + if n := len(ctx.FakeConsul.services); n != 1 { + t.Fatalf("expected 1 service but found: %d", n) + } + + for _, v := range ctx.FakeConsul.services { + switch v.Name { + case ctx.Task.Services[0].Name: + if v.Port != port { + t.Errorf("expected service %s's port to be %d but found %d", + v.Name, port, v.Port) + } + default: + t.Errorf("unexpected service name: %q", v.Name) + } + } + } + + // Initial service should advertise host port x + if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, net); err != nil { + t.Fatalf("unexpected error registering task: %v", err) + } + + syncAndAssertPort(xPort) + + // UpdateTask to use Host (shouldn't change anything) + orig := ctx.Task.Copy() + ctx.Task.Services[0].AddressMode = structs.AddressModeHost + + if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx, net); err != nil { + t.Fatalf("unexpected error updating task: %v", err) + } + + syncAndAssertPort(xPort) + + // UpdateTask to use Driver (*should* change IP and port) + orig = ctx.Task.Copy() + ctx.Task.Services[0].AddressMode = structs.AddressModeDriver + + if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx, net); err != nil { + t.Fatalf("unexpected error updating task: %v", err) + } + + syncAndAssertPort(net.PortMap["x"]) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e81294fda..51cca9efa 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2580,6 +2580,7 @@ func (s *Service) Hash() string { io.WriteString(h, s.Name) io.WriteString(h, strings.Join(s.Tags, "")) io.WriteString(h, s.PortLabel) + io.WriteString(h, s.AddressMode) return fmt.Sprintf("%x", h.Sum(nil)) }