diff --git a/client/consul.go b/client/consul.go index 41da0abeb..c9fe4e107 100644 --- a/client/consul.go +++ b/client/consul.go @@ -10,4 +10,5 @@ import ( type ConsulServiceAPI interface { RegisterTask(allocID string, task *structs.Task, exec consul.ScriptExecutor) error RemoveTask(allocID string, task *structs.Task) + UpdateTask(allocID string, existing, newTask *structs.Task, exec consul.ScriptExecutor) error } diff --git a/client/task_runner.go b/client/task_runner.go index 64eb4570f..209336d70 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1386,15 +1386,27 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { updatedTask.Resources = update.TaskResources[updatedTask.Name] var mErr multierror.Error - var scriptExec consul.ScriptExecutor r.handleLock.Lock() if r.handle != nil { // Update will update resources and store the new kill timeout. if err := r.handle.Update(updatedTask); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err)) } - // Not all drivers support Exec (eg QEMU) - scriptExec, _ = r.handle.(consul.ScriptExecutor) + + //FIXME is there a better place to do this? used to be in executor + // Prepare services + interpolateServices(r.getTaskEnv(), updatedTask) + + // Not all drivers support Exec (eg QEMU), but RegisterTask + // handles nil ScriptExecutors + scriptExec, _ := r.handle.(consul.ScriptExecutor) + + // Since the handle exists, the task is running, so we need to + // update it in Consul (if the handle doesn't exist + // registration in Consul will happen when it's created) + if err := r.consul.UpdateTask(r.alloc.ID, r.task, updatedTask, scriptExec); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err)) + } } r.handleLock.Unlock() @@ -1403,21 +1415,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { r.restartTracker.SetPolicy(tg.RestartPolicy) } - // Deregister the old service+checks - r.consul.RemoveTask(r.alloc.ID, r.task) - // Store the updated alloc. r.alloc = update r.task = updatedTask - - //FIXME is there a better place to do this? used to be in executor - // Prepare services - interpolateServices(r.getTaskEnv(), r.task) - - // Register the new service+checks - if err := r.consul.RegisterTask(r.alloc.ID, r.task, scriptExec); err != nil { - mErr.Errors = append(mErr.Errors, fmt.Errorf("error registering updated task with consul: %v", err)) - } return mErr.ErrorOrNil() } diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 3538f2f5e..bc80acf70 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -88,17 +88,9 @@ type ServiceClient struct { // syncCh triggers a sync in the main Run loop syncCh chan struct{} - // services and checks to be registered - regServices map[string]*api.AgentServiceRegistration - regChecks map[string]*api.AgentCheckRegistration - - // services and checks to be unregisterd - deregServices map[string]struct{} - deregChecks map[string]struct{} - - // script checks to be run() after their corresponding check is - // registered - regScripts map[string]*scriptCheck + // pending service and check operations + pending *consulOps + opsLock sync.Mutex // script check cancel funcs to be called before their corresponding // check is removed. Only accessed in sync() so not covered by regLock @@ -126,11 +118,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient shutdownCh: make(chan struct{}), shutdownWait: defaultShutdownWait, syncCh: make(chan struct{}, 1), - regServices: make(map[string]*api.AgentServiceRegistration), - regChecks: make(map[string]*api.AgentCheckRegistration), - deregServices: make(map[string]struct{}), - deregChecks: make(map[string]struct{}), - regScripts: make(map[string]*scriptCheck), + pending: newConsulOps(), runningScripts: make(map[string]*scriptHandle), agentServices: make(map[string]struct{}, 8), agentChecks: make(map[string]struct{}, 8), @@ -182,67 +170,39 @@ func (c *ServiceClient) forceSync() { // sync enqueued operations. func (c *ServiceClient) sync() error { - // Shallow copy and reset the pending operations fields - c.regLock.Lock() - regServices := make(map[string]*api.AgentServiceRegistration, len(c.regServices)) - for k, v := range c.regServices { - regServices[k] = v - } - c.regServices = map[string]*api.AgentServiceRegistration{} - - regChecks := make(map[string]*api.AgentCheckRegistration, len(c.regChecks)) - for k, v := range c.regChecks { - regChecks[k] = v - } - c.regChecks = map[string]*api.AgentCheckRegistration{} - - regScripts := make(map[string]*scriptCheck, len(c.regScripts)) - for k, v := range c.regScripts { - regScripts[k] = v - } - c.regScripts = map[string]*scriptCheck{} - - deregServices := make(map[string]struct{}, len(c.deregServices)) - for k := range c.deregServices { - deregServices[k] = mark - } - c.deregServices = map[string]struct{}{} - - deregChecks := make(map[string]struct{}, len(c.deregChecks)) - for k := range c.deregChecks { - deregChecks[k] = mark - } - c.deregChecks = map[string]struct{}{} - c.regLock.Unlock() + c.opsLock.Lock() + ops := c.pending + c.pending = newConsulOps() + c.opsLock.Unlock() var err error - regServiceN, regCheckN, deregServiceN, deregCheckN := len(regServices), len(regChecks), len(deregServices), len(deregChecks) + msg := ops.String() // Register Services - for id, service := range regServices { + for id, service := range ops.regServices { if err = c.client.ServiceRegister(service); err != nil { goto ERROR } - delete(regServices, id) + delete(ops.regServices, id) } // Register Checks - for id, check := range regChecks { + for id, check := range ops.regChecks { if err = c.client.CheckRegister(check); err != nil { goto ERROR } - delete(regChecks, id) + delete(ops.regChecks, id) // Run the script for this check if one exists - if script, ok := regScripts[id]; ok { + if script, ok := ops.regScripts[id]; ok { // This check is a script check; run it c.runningScripts[id] = script.run() } } // Deregister Checks - for id := range deregChecks { + for id := range ops.deregChecks { if h, ok := c.runningScripts[id]; ok { // This check is a script check; stop it h.cancel() @@ -252,63 +212,28 @@ func (c *ServiceClient) sync() error { if err = c.client.CheckDeregister(id); err != nil { goto ERROR } - delete(deregChecks, id) + delete(ops.deregChecks, id) } // Deregister Services - for id := range deregServices { + for id := range ops.deregServices { if err = c.client.ServiceDeregister(id); err != nil { goto ERROR } - delete(deregServices, id) + delete(ops.deregServices, id) } - c.logger.Printf("[DEBUG] consul: registered %d services / %d checks; deregisterd %d services / %d checks", regServiceN, regCheckN, deregServiceN, deregCheckN) + c.logger.Printf("[DEBUG] consul: %s", msg) return nil //TODO Labels and gotos are nasty; move to a function? ERROR: - // An error occurred, repopulate the operation maps omitting any keys - // that have been updated while sync() ran. - c.regLock.Lock() - for id, service := range regServices { - if _, ok := c.regServices[id]; ok { - continue - } - if _, ok := c.deregServices[id]; ok { - continue - } - c.regServices[id] = service - } - for id, check := range regChecks { - if _, ok := c.regChecks[id]; ok { - continue - } - if _, ok := c.deregChecks[id]; ok { - continue - } - c.regChecks[id] = check - } - for id, script := range regScripts { - if _, ok := c.regScripts[id]; ok { - // a new version of this script was added, drop this one - continue - } - c.regScripts[id] = script - } - for id, _ := range deregServices { - if _, ok := c.regServices[id]; ok { - continue - } - c.deregServices[id] = mark - } - for id, _ := range deregChecks { - if _, ok := c.regChecks[id]; ok { - continue - } - c.deregChecks[id] = mark - } - c.regLock.Unlock() + // An error occurred, repopulate the operation maps but give + // precendence to new ops + c.opsLock.Lock() + ops.merge(c.pending) + c.pending = ops + c.opsLock.Unlock() return err } @@ -317,10 +242,9 @@ ERROR: // // Agents will be deregistered when Shutdown is called. func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error { - regs := make([]*api.AgentServiceRegistration, len(services)) - checks := make([]*api.AgentCheckRegistration, 0, len(services)) + ops := newConsulOps() - for i, service := range services { + for _, service := range services { id := makeAgentServiceID(role, service) host, rawport, err := net.SplitHostPort(service.PortLabel) if err != nil { @@ -337,7 +261,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) Address: host, Port: port, } - regs[i] = serviceReg + ops.regServices[id] = serviceReg for _, check := range service.Checks { checkID := createCheckID(id, check) @@ -360,22 +284,79 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) if err != nil { return fmt.Errorf("failed to add check %q: %v", check.Name, err) } - checks = append(checks, checkReg) + ops.regChecks[checkID] = checkReg } } // Now add them to the registration queue - c.enqueueRegs(regs, checks, nil) + c.opsLock.Lock() + c.pending.merge(ops) + c.opsLock.Unlock() // Record IDs for deregistering on shutdown c.agentLock.Lock() - for _, s := range regs { - c.agentServices[s.ID] = mark + for id := range ops.regServices { + c.agentServices[id] = mark } - for _, ch := range checks { - c.agentChecks[ch.ID] = mark + for id := range ops.regChecks { + c.agentChecks[id] = mark } c.agentLock.Unlock() + c.forceSync() + return nil +} + +type addrParser func(portLabel string) (string, int) + +// makeCheckReg adds a check reg to operations. +func (c *ServiceClient) makeCheckReg(ops *consulOps, check *structs.ServiceCheck, + service *api.AgentServiceRegistration, exec ScriptExecutor, parseAddr addrParser) error { + + checkID := createCheckID(service.ID, check) + if check.Type == structs.ServiceCheckScript { + if exec == nil { + return fmt.Errorf("driver doesn't support script checks") + } + ops.regScripts[checkID] = newScriptCheck( + checkID, check, exec, c.client, c.logger, c.shutdownCh) + } + host, port := service.Address, service.Port + if check.PortLabel != "" { + host, port = parseAddr(check.PortLabel) + } + checkReg, err := createCheckReg(service.ID, checkID, check, host, port) + if err != nil { + return fmt.Errorf("failed to add check %q: %v", check.Name, err) + } + ops.regChecks[checkID] = checkReg + return nil +} + +// serviceRegs creates service registrations, check registrations, and script +// checks from a service. +func (c *ServiceClient) serviceRegs(ops *consulOps, allocID string, service *structs.Service, + exec ScriptExecutor, task *structs.Task) error { + + id := makeTaskServiceID(allocID, task.Name, service) + host, port := task.FindHostAndPortFor(service.PortLabel) + serviceReg := &api.AgentServiceRegistration{ + ID: id, + Name: service.Name, + Tags: make([]string, len(service.Tags)), + Address: host, + Port: port, + } + // copy isn't strictly necessary but can avoid bugs especially + // with tests that may reuse Tasks + copy(serviceReg.Tags, service.Tags) + ops.regServices[id] = serviceReg + + for _, check := range service.Checks { + err := c.makeCheckReg(ops, check, serviceReg, exec, task.FindHostAndPortFor) + if err != nil { + return err + } + } return nil } @@ -384,48 +365,99 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) // // Actual communication with Consul is done asynchrously (see Run). func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec ScriptExecutor) error { - regs := make([]*api.AgentServiceRegistration, len(task.Services)) - checks := make([]*api.AgentCheckRegistration, 0, len(task.Services)*2) // just guess at size - var scriptChecks []*scriptCheck - - for i, service := range task.Services { - id := makeTaskServiceID(allocID, task.Name, service) - host, port := task.FindHostAndPortFor(service.PortLabel) - serviceReg := &api.AgentServiceRegistration{ - ID: id, - Name: service.Name, - Tags: make([]string, len(service.Tags)), - Address: host, - Port: port, + ops := newConsulOps() + for _, service := range task.Services { + if err := c.serviceRegs(ops, allocID, service, exec, task); err != nil { + return err } - // copy isn't strictly necessary but can avoid bugs especially - // with tests that may reuse Tasks - copy(serviceReg.Tags, service.Tags) - regs[i] = serviceReg - - for _, check := range service.Checks { - checkID := createCheckID(id, check) - if check.Type == structs.ServiceCheckScript { - if exec == nil { - return fmt.Errorf("driver %q doesn't support script checks", task.Driver) - } - scriptChecks = append(scriptChecks, newScriptCheck(checkID, check, exec, c.client, c.logger, c.shutdownCh)) - } - host, port := serviceReg.Address, serviceReg.Port - if check.PortLabel != "" { - host, port = task.FindHostAndPortFor(check.PortLabel) - } - checkReg, err := createCheckReg(id, checkID, check, host, port) - if err != nil { - return fmt.Errorf("failed to add check %q: %v", check.Name, err) - } - checks = append(checks, checkReg) - } - } // Now add them to the registration queue - c.enqueueRegs(regs, checks, scriptChecks) + c.opsLock.Lock() + c.pending.merge(ops) + c.opsLock.Unlock() + c.forceSync() + return nil +} + +// UpdateTask in Consul. Does not alter the service if only checks have +// changed. +func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec ScriptExecutor) error { + ops := newConsulOps() + + existingIDs := make(map[string]*structs.Service, len(existing.Services)) + for _, s := range existing.Services { + existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s + c.logger.Printf("[XXX] EXISTING: %s", makeTaskServiceID(allocID, existing.Name, s)) + } + newIDs := make(map[string]*structs.Service, len(newTask.Services)) + for _, s := range newTask.Services { + newIDs[makeTaskServiceID(allocID, newTask.Name, s)] = s + c.logger.Printf("[XXX] UPDATED : %s", makeTaskServiceID(allocID, newTask.Name, s)) + } + + parseAddr := newTask.FindHostAndPortFor + + // Loop over existing Service IDs to see if they have been removed or + // updated. + for existingID, existingSvc := range existingIDs { + newSvc, ok := newIDs[existingID] + if !ok { + // Existing sevice entry removed + ops.deregServices[existingID] = mark + for _, check := range existingSvc.Checks { + ops.deregChecks[createCheckID(existingID, check)] = mark + } + continue + } + + // Manipulating checks is cheap and easy, so just remove old and add new + for _, check := range existingSvc.Checks { + ops.deregChecks[createCheckID(existingID, check)] = mark + } + + // Register new checks + for _, check := range newSvc.Checks { + checkID := createCheckID(existingID, check) + // Don't deregister this check if it hasn't changed + delete(ops.deregChecks, checkID) + if check.Type == structs.ServiceCheckScript { + if exec == nil { + return fmt.Errorf("driver doesn't support script checks") + } + ops.regScripts[checkID] = newScriptCheck( + checkID, check, exec, c.client, c.logger, c.shutdownCh) + } + host, port := parseAddr(existingSvc.PortLabel) + if check.PortLabel != "" { + host, port = parseAddr(check.PortLabel) + } + checkReg, err := createCheckReg(existingID, checkID, check, host, port) + if err != nil { + return err + } + ops.regChecks[checkID] = checkReg + } + + // Service hasn't changed and checks are updated so don't + // process this service again later + delete(newIDs, existingID) + } + + // Any remaining services should just be enqueued directly + for _, newSvc := range newIDs { + err := c.serviceRegs(ops, allocID, newSvc, exec, newTask) + if err != nil { + return err + } + } + + // Finally enqueue the updates and force sync + c.opsLock.Lock() + c.pending.merge(ops) + c.opsLock.Unlock() + + c.forceSync() return nil } @@ -433,62 +465,20 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec Sc // // Actual communication with Consul is done asynchrously (see Run). func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) { - deregs := make([]string, len(task.Services)) - checks := make([]string, 0, len(task.Services)*2) // just guess at size + ops := newConsulOps() - for i, service := range task.Services { + for _, service := range task.Services { id := makeTaskServiceID(allocID, task.Name, service) - deregs[i] = id + ops.deregServices[id] = mark for _, check := range service.Checks { - checks = append(checks, createCheckID(id, check)) + ops.deregChecks[createCheckID(id, check)] = mark } } // Now add them to the deregistration fields; main Run loop will update - c.enqueueDeregs(deregs, checks) -} - -// enqueueRegs enqueues service and check registrations for the next time -// operations are sync'd to Consul. -func (c *ServiceClient) enqueueRegs(regs []*api.AgentServiceRegistration, checks []*api.AgentCheckRegistration, scriptChecks []*scriptCheck) { c.regLock.Lock() - for _, reg := range regs { - // Add reg - c.regServices[reg.ID] = reg - // Make sure it's not being removed - delete(c.deregServices, reg.ID) - } - for _, check := range checks { - // Add check - c.regChecks[check.ID] = check - // Make sure it's not being removed - delete(c.deregChecks, check.ID) - } - for _, script := range scriptChecks { - c.regScripts[script.id] = script - } - c.regLock.Unlock() - - c.forceSync() -} - -// enqueueDeregs enqueues service and check removals for the next time -// operations are sync'd to Consul. -func (c *ServiceClient) enqueueDeregs(deregs []string, checks []string) { - c.regLock.Lock() - for _, dereg := range deregs { - // Add dereg - c.deregServices[dereg] = mark - // Make sure it's not being added - delete(c.regServices, dereg) - } - for _, check := range checks { - // Add check for removal - c.deregChecks[check] = mark - // Make sure it's not being added - delete(c.regChecks, check) - } + c.pending.merge(ops) c.regLock.Unlock() c.forceSync() diff --git a/command/agent/consul/ops.go b/command/agent/consul/ops.go new file mode 100644 index 000000000..0592b9a48 --- /dev/null +++ b/command/agent/consul/ops.go @@ -0,0 +1,61 @@ +package consul + +import ( + "fmt" + + "github.com/hashicorp/consul/api" +) + +type consulOps struct { + // services and checks to be registered + regServices map[string]*api.AgentServiceRegistration + regChecks map[string]*api.AgentCheckRegistration + + // services and checks to be unregisterd + deregServices map[string]struct{} + deregChecks map[string]struct{} + + // script checks to be run() after their corresponding check is + // registered + regScripts map[string]*scriptCheck +} + +func newConsulOps() *consulOps { + return &consulOps{ + regServices: make(map[string]*api.AgentServiceRegistration), + regChecks: make(map[string]*api.AgentCheckRegistration), + deregServices: make(map[string]struct{}), + deregChecks: make(map[string]struct{}), + regScripts: make(map[string]*scriptCheck), + } +} + +// merge newer operations. New operations registrations override existing +// deregistrations. +func (c *consulOps) merge(newer *consulOps) { + for id, service := range newer.regServices { + delete(c.deregServices, id) + c.regServices[id] = service + } + for id, check := range newer.regChecks { + delete(c.deregChecks, id) + c.regChecks[id] = check + } + for id, script := range newer.regScripts { + c.regScripts[id] = script + } + for id, _ := range newer.deregServices { + delete(c.regServices, id) + c.deregServices[id] = mark + } + for id, _ := range newer.deregChecks { + delete(c.regChecks, id) + delete(c.regScripts, id) + c.deregChecks[id] = mark + } +} + +func (c *consulOps) String() string { + return fmt.Sprintf("registered %d services / %d checks; deregisterd %d services / %d checks", + len(c.regServices), len(c.regChecks), len(c.deregServices), len(c.deregChecks)) +} diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 59ea83d7b..16de58449 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -323,8 +323,8 @@ func TestConsul_ShutdownOK(t *testing.T) { } // Nothing should be enqueued anymore - enqueued := (len(ctx.ServiceClient.regServices) + len(ctx.ServiceClient.deregServices) + - len(ctx.ServiceClient.regChecks) + len(ctx.ServiceClient.deregChecks)) + enqueued := (len(ctx.ServiceClient.pending.regServices) + len(ctx.ServiceClient.pending.deregServices) + + len(ctx.ServiceClient.pending.regChecks) + len(ctx.ServiceClient.pending.deregChecks)) if enqueued > 0 { t.Errorf("%d operations still enqueued", enqueued) }