From 701a1bd9bcbbb9cc32a8a0aa71e112b60fcd762b Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 23 Mar 2016 13:19:45 -0700 Subject: [PATCH] Hooking up the executor in all drivers --- client/consul.go | 421 ----------------------------- client/consul_test.go | 364 ------------------------- client/driver/exec.go | 8 + client/driver/executor/executor.go | 4 +- client/driver/java.go | 10 +- client/driver/qemu.go | 8 + client/driver/raw_exec.go | 8 + client/driver/rkt.go | 8 + 8 files changed, 43 insertions(+), 788 deletions(-) delete mode 100644 client/consul.go delete mode 100644 client/consul_test.go diff --git a/client/consul.go b/client/consul.go deleted file mode 100644 index 8634c9693..000000000 --- a/client/consul.go +++ /dev/null @@ -1,421 +0,0 @@ -package client - -import ( - "crypto/tls" - "fmt" - "log" - "net/http" - "net/url" - "strings" - "sync" - "time" - - consul "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-multierror" - "github.com/hashicorp/nomad/nomad/structs" -) - -const ( - syncInterval = 5 * time.Second -) - -// consulApi is the interface which wraps the actual consul api client -type consulApi interface { - CheckRegister(check *consul.AgentCheckRegistration) error - CheckDeregister(checkID string) error - ServiceRegister(service *consul.AgentServiceRegistration) error - ServiceDeregister(ServiceID string) error - Services() (map[string]*consul.AgentService, error) - Checks() (map[string]*consul.AgentCheck, error) -} - -// consulApiClient is the actual implementation of the consulApi which -// talks to the consul agent -type consulApiClient struct { - client *consul.Client -} - -func (a *consulApiClient) CheckRegister(check *consul.AgentCheckRegistration) error { - return a.client.Agent().CheckRegister(check) -} - -func (a *consulApiClient) CheckDeregister(checkID string) error { - return a.client.Agent().CheckDeregister(checkID) -} - -func (a *consulApiClient) ServiceRegister(service *consul.AgentServiceRegistration) 
error { - return a.client.Agent().ServiceRegister(service) -} - -func (a *consulApiClient) ServiceDeregister(serviceID string) error { - return a.client.Agent().ServiceDeregister(serviceID) -} - -func (a *consulApiClient) Services() (map[string]*consul.AgentService, error) { - return a.client.Agent().Services() -} - -func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) { - return a.client.Agent().Checks() -} - -// trackedTask is a Task that we are tracking for changes in service and check -// definitions and keep them sycned with Consul Agent -type trackedTask struct { - task *structs.Task - alloc *structs.Allocation -} - -// ConsulService is the service which tracks tasks and syncs the services and -// checks defined in them with Consul Agent -type ConsulService struct { - client consulApi - logger *log.Logger - shutdownCh chan struct{} - node *structs.Node - - trackedTasks map[string]*trackedTask - serviceStates map[string]string - allocToService map[string][]string - trackedTaskLock sync.Mutex -} - -type consulServiceConfig struct { - logger *log.Logger - consulAddr string - token string - auth string - enableSSL bool - verifySSL bool - node *structs.Node -} - -// A factory method to create new consul service -func NewConsulService(config *consulServiceConfig) (*ConsulService, error) { - var err error - var c *consul.Client - cfg := consul.DefaultConfig() - cfg.Address = config.consulAddr - if config.token != "" { - cfg.Token = config.token - } - - if config.auth != "" { - var username, password string - if strings.Contains(config.auth, ":") { - split := strings.SplitN(config.auth, ":", 2) - username = split[0] - password = split[1] - } else { - username = config.auth - } - - cfg.HttpAuth = &consul.HttpBasicAuth{ - Username: username, - Password: password, - } - } - if config.enableSSL { - cfg.Scheme = "https" - } - if config.enableSSL && !config.verifySSL { - cfg.HttpClient.Transport = &http.Transport{ - TLSClientConfig: &tls.Config{ - 
InsecureSkipVerify: true, - }, - } - - } - if c, err = consul.NewClient(cfg); err != nil { - return nil, err - } - - consulService := ConsulService{ - client: &consulApiClient{client: c}, - logger: config.logger, - node: config.node, - trackedTasks: make(map[string]*trackedTask), - serviceStates: make(map[string]string), - allocToService: make(map[string][]string), - shutdownCh: make(chan struct{}), - } - - return &consulService, nil -} - -// Register starts tracking a task for changes to it's services and tasks and -// adds/removes services and checks associated with it. -func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error { - var mErr multierror.Error - c.trackedTaskLock.Lock() - tt := &trackedTask{task: task, alloc: alloc} - c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt - - // Delete any previously registered service as the same alloc is being - // re-registered. - for _, service := range c.allocToService[alloc.ID] { - delete(c.serviceStates, service) - } - c.trackedTaskLock.Unlock() - - for _, service := range task.Services { - // Track the services this alloc is registering. 
- c.allocToService[alloc.ID] = append(c.allocToService[alloc.ID], service.Name) - - c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name) - if err := c.registerService(service, task, alloc); err != nil { - mErr.Errors = append(mErr.Errors, err) - } - } - - return mErr.ErrorOrNil() -} - -// Deregister stops tracking a task for changes to it's services and checks and -// removes all the services and checks associated with the Task -func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error { - var mErr multierror.Error - c.trackedTaskLock.Lock() - delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name)) - delete(c.allocToService, alloc.ID) - c.trackedTaskLock.Unlock() - for _, service := range task.Services { - serviceID := alloc.Services[service.Name] - if serviceID == "" { - continue - } - c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name) - if err := c.deregisterService(serviceID); err != nil { - c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name) - mErr.Errors = append(mErr.Errors, err) - } - } - return mErr.ErrorOrNil() -} - -func (c *ConsulService) ShutDown() { - close(c.shutdownCh) -} - -// SyncWithConsul is a long lived function that performs calls to sync -// checks and services periodically with Consul Agent -func (c *ConsulService) SyncWithConsul() { - sync := time.After(syncInterval) - - for { - select { - case <-sync: - c.performSync() - sync = time.After(syncInterval) - case <-c.shutdownCh: - c.logger.Printf("[INFO] consul: shutting down consul service") - return - } - } -} - -// performSync syncs checks and services with Consul and removed tracked -// services which are no longer present in tasks -func (c *ConsulService) performSync() { - // Get the list of the services and that Consul knows about - srvcs, err := c.client.Services() - if err != nil { - return - } - chks, err := c.client.Checks() - if err != 
nil { - return - } - - // Filter the services and checks that isn't managed by consul - consulServices := c.filterConsulServices(srvcs) - consulChecks := c.filterConsulChecks(chks) - - knownChecks := make(map[string]struct{}) - knownServices := make(map[string]struct{}) - - c.trackedTaskLock.Lock() - tasks := make([]*trackedTask, 0, len(c.trackedTasks)) - for _, trackedTask := range c.trackedTasks { - tasks = append(tasks, trackedTask) - } - c.trackedTaskLock.Unlock() - - // Add services and checks which Consul doesn't know about - for _, trackedTask := range tasks { - for _, service := range trackedTask.task.Services { - serviceID := trackedTask.alloc.Services[service.Name] - - // Add new services which Consul agent isn't aware of - knownServices[serviceID] = struct{}{} - if _, ok := consulServices[serviceID]; !ok { - c.printLogMessage("[INFO] consul: perform sync, registering service %s with consul.", service.Name) - c.registerService(service, trackedTask.task, trackedTask.alloc) - continue - } - - // If a service has changed, re-register it with Consul agent - if service.Hash() != c.serviceStates[serviceID] { - c.printLogMessage("[INFO] consul: perform sync hash change, reregistering service %s with consul.", service.Name) - c.registerService(service, trackedTask.task, trackedTask.alloc) - continue - } - - // Add new checks that Consul isn't aware of - for _, check := range service.Checks { - checkID := check.Hash(serviceID) - knownChecks[checkID] = struct{}{} - if _, ok := consulChecks[checkID]; !ok { - host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel) - cr := c.makeCheck(serviceID, check, host, port) - if err := c.registerCheck(cr); err != nil { - c.printLogMessage("[DEBUG] consul: error registering check %q: %v", cr.ID, err) - } - - } - } - } - } - - // Remove services from the service tracker which no longer exists - for serviceID := range c.serviceStates { - if _, ok := knownServices[serviceID]; !ok { - delete(c.serviceStates, serviceID) 
- } - } - - // Remove services that are not present anymore - for _, consulService := range consulServices { - if _, ok := knownServices[consulService.ID]; !ok { - delete(c.serviceStates, consulService.ID) - c.printLogMessage("[INFO] consul: perform sync, deregistering service %v with consul", consulService.Service) - c.deregisterService(consulService.ID) - } - } - - // Remove checks that are not present anymore - for _, consulCheck := range consulChecks { - if _, ok := knownChecks[consulCheck.CheckID]; !ok { - c.deregisterCheck(consulCheck.CheckID) - } - } -} - -// registerService registers a Service with Consul -func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error { - var mErr multierror.Error - host, port := task.FindHostAndPortFor(service.PortLabel) - if host == "" || port == 0 { - return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name) - } - serviceID := alloc.Services[service.Name] - c.serviceStates[serviceID] = service.Hash() - - asr := &consul.AgentServiceRegistration{ - ID: serviceID, - Name: service.Name, - Tags: service.Tags, - Port: port, - Address: host, - } - - if err := c.client.ServiceRegister(asr); err != nil { - c.printLogMessage("[DEBUG] consul: error while registering service %v with consul: %v", service.Name, err) - mErr.Errors = append(mErr.Errors, err) - } - for _, check := range service.Checks { - cr := c.makeCheck(serviceID, check, host, port) - if err := c.registerCheck(cr); err != nil { - c.printLogMessage("[DEBUG] consul: error while registering check %v with consul: %v", check.Name, err) - mErr.Errors = append(mErr.Errors, err) - } - - } - return mErr.ErrorOrNil() -} - -// registerCheck registers a check with Consul -func (c *ConsulService) registerCheck(check *consul.AgentCheckRegistration) error { - c.printLogMessage("[INFO] consul: registering check with ID: %s for service: %s", check.ID, 
check.ServiceID) - return c.client.CheckRegister(check) -} - -// deregisterCheck de-registers a check with a specific ID from Consul -func (c *ConsulService) deregisterCheck(checkID string) error { - c.printLogMessage("[INFO] consul: removing check with ID: %v", checkID) - return c.client.CheckDeregister(checkID) -} - -// deregisterService de-registers a Service with a specific id from Consul -func (c *ConsulService) deregisterService(serviceID string) error { - delete(c.serviceStates, serviceID) - if err := c.client.ServiceDeregister(serviceID); err != nil { - return err - } - return nil -} - -// makeCheck creates a Consul Check Registration struct -func (c *ConsulService) makeCheck(serviceID string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration { - checkID := check.Hash(serviceID) - cr := &consul.AgentCheckRegistration{ - ID: checkID, - Name: check.Name, - ServiceID: serviceID, - } - cr.Interval = check.Interval.String() - cr.Timeout = check.Timeout.String() - - switch check.Type { - case structs.ServiceCheckHTTP: - if check.Protocol == "" { - check.Protocol = "http" - } - url := url.URL{ - Scheme: check.Protocol, - Host: fmt.Sprintf("%s:%d", ip, port), - Path: check.Path, - } - cr.HTTP = url.String() - case structs.ServiceCheckTCP: - cr.TCP = fmt.Sprintf("%s:%d", ip, port) - case structs.ServiceCheckScript: - cr.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types - } - return cr -} - -// filterConsulServices prunes out all the service whose ids are not prefixed -// with nomad- -func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService { - nomadServices := make(map[string]*consul.AgentService) - delete(srvcs, "consul") - for _, srv := range srvcs { - if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) { - nomadServices[srv.ID] = srv - } - } - return nomadServices - -} - -// filterConsulChecks prunes out all the 
consul checks which do not have -// services with id prefixed with noamd- -func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { - nomadChecks := make(map[string]*consul.AgentCheck) - for _, chk := range chks { - if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) { - nomadChecks[chk.CheckID] = chk - } - } - return nomadChecks - -} - -// printLogMessage prints log messages only when the node attributes have consul -// related information -func (c *ConsulService) printLogMessage(message string, v ...interface{}) { - if _, ok := c.node.Attributes["consul.version"]; ok { - c.logger.Println(fmt.Sprintf(message, v...)) - } -} diff --git a/client/consul_test.go b/client/consul_test.go deleted file mode 100644 index 83cf36fcd..000000000 --- a/client/consul_test.go +++ /dev/null @@ -1,364 +0,0 @@ -package client - -import ( - "fmt" - consul "github.com/hashicorp/consul/api" - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" - "log" - "os" - "reflect" - "testing" - "time" -) - -type mockConsulApiClient struct { - serviceRegisterCallCount int - checkRegisterCallCount int - checkDeregisterCallCount int - serviceDeregisterCallCount int -} - -func (a *mockConsulApiClient) CheckRegister(check *consul.AgentCheckRegistration) error { - a.checkRegisterCallCount += 1 - return nil -} - -func (a *mockConsulApiClient) CheckDeregister(checkID string) error { - a.checkDeregisterCallCount += 1 - return nil -} - -func (a *mockConsulApiClient) ServiceRegister(service *consul.AgentServiceRegistration) error { - a.serviceRegisterCallCount += 1 - return nil -} - -func (a *mockConsulApiClient) ServiceDeregister(serviceID string) error { - a.serviceDeregisterCallCount += 1 - return nil -} - -func (a *mockConsulApiClient) Services() (map[string]*consul.AgentService, error) { - return make(map[string]*consul.AgentService), nil -} - -func (a *mockConsulApiClient) Checks() 
(map[string]*consul.AgentCheck, error) { - return make(map[string]*consul.AgentCheck), nil -} - -func newConsulService() *ConsulService { - logger := log.New(os.Stdout, "logger: ", log.Lshortfile) - c, _ := NewConsulService(&consulServiceConfig{logger, "", "", "", false, false, &structs.Node{}}) - c.client = &mockConsulApiClient{} - return c -} - -func newTask() *structs.Task { - var services []*structs.Service - return &structs.Task{ - Name: "redis", - Services: services, - Resources: &structs.Resources{ - Networks: []*structs.NetworkResource{ - { - IP: "10.10.0.1", - DynamicPorts: []structs.Port{{"db", 20413}}, - }, - }, - }, - } -} - -func TestConsul_MakeChecks(t *testing.T) { - t.Parallel() - service := &structs.Service{ - Name: "Bar", - Checks: []*structs.ServiceCheck{ - { - Type: "http", - Path: "/foo/bar", - Interval: 10 * time.Second, - Timeout: 2 * time.Second, - }, - { - Type: "http", - Protocol: "https", - Path: "/foo/bar", - Interval: 10 * time.Second, - Timeout: 2 * time.Second, - }, - { - Type: "tcp", - Interval: 10 * time.Second, - Timeout: 2 * time.Second, - }, - }, - } - - c := newConsulService() - serviceID := fmt.Sprintf("%s-1234", structs.NomadConsulPrefix) - - check1 := c.makeCheck(serviceID, service.Checks[0], "10.10.0.1", 8090) - check2 := c.makeCheck(serviceID, service.Checks[1], "10.10.0.1", 8090) - check3 := c.makeCheck(serviceID, service.Checks[2], "10.10.0.1", 8090) - - if check1.HTTP != "http://10.10.0.1:8090/foo/bar" { - t.Fatalf("Invalid http url for check: %v", check1.HTTP) - } - - if check2.HTTP != "https://10.10.0.1:8090/foo/bar" { - t.Fatalf("Invalid http url for check: %v", check2.HTTP) - } - - if check3.TCP != "10.10.0.1:8090" { - t.Fatalf("Invalid tcp check: %v", check3.TCP) - } -} - -func TestConsul_InvalidPortLabelForService(t *testing.T) { - t.Parallel() - task := &structs.Task{ - Name: "foo", - Driver: "docker", - Resources: &structs.Resources{ - CPU: 500, - MemoryMB: 1024, - DiskMB: 1024, - IOPS: 10, - Networks: 
[]*structs.NetworkResource{ - { - Device: "eth0", - CIDR: "255.255.0.0/16", - MBits: 10, - ReservedPorts: []structs.Port{ - { - Label: "http", - Value: 8080, - }, - { - Label: "ssh", - Value: 2026, - }, - }, - }, - }, - }, - } - service := &structs.Service{ - Name: "foo", - Tags: []string{"a", "b"}, - PortLabel: "https", - Checks: make([]*structs.ServiceCheck, 0), - } - - c := newConsulService() - if err := c.registerService(service, task, mock.Alloc()); err == nil { - t.Fatalf("Service should be invalid") - } -} - -func TestConsul_Services_Deleted_From_Task(t *testing.T) { - t.Parallel() - c := newConsulService() - task := structs.Task{ - Name: "redis", - Services: []*structs.Service{ - &structs.Service{ - Name: "example-cache-redis", - Tags: []string{"global"}, - PortLabel: "db", - }, - }, - Resources: &structs.Resources{ - Networks: []*structs.NetworkResource{ - { - IP: "10.10.0.1", - DynamicPorts: []structs.Port{{"db", 20413}}, - }, - }, - }, - } - c.Register(&task, mock.Alloc()) - if len(c.serviceStates) != 1 { - t.Fatalf("Expected tracked services: %v, Actual: %v", 1, len(c.serviceStates)) - } - task.Services = []*structs.Service{} - - c.performSync() - if len(c.serviceStates) != 0 { - t.Fatalf("Expected tracked services: %v, Actual: %v", 0, len(c.serviceStates)) - } -} - -func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) { - t.Parallel() - c := newConsulService() - task := newTask() - s1 := structs.Service{ - Name: "example-cache-redis", - Tags: []string{"global"}, - PortLabel: "db", - } - task.Services = append(task.Services, &s1) - alloc := mock.Alloc() - serviceID := alloc.Services[s1.Name] - c.Register(task, alloc) - - s1.Tags = []string{"frontcache"} - - c.performSync() - - if len(c.serviceStates) != 1 { - t.Fatal("We should be tracking one service") - } - - if c.serviceStates[serviceID] != s1.Hash() { - t.Fatalf("Hash is %v, expected %v", c.serviceStates[serviceID], s1.Hash()) - } -} - -func TestConsul_AddCheck_To_Service(t 
*testing.T) { - t.Parallel() - apiClient := &mockConsulApiClient{} - c := newConsulService() - c.client = apiClient - task := newTask() - var checks []*structs.ServiceCheck - s1 := structs.Service{ - Name: "example-cache-redis", - Tags: []string{"global"}, - PortLabel: "db", - Checks: checks, - } - task.Services = append(task.Services, &s1) - c.Register(task, mock.Alloc()) - - check1 := structs.ServiceCheck{ - Name: "alive", - Type: "tcp", - Interval: 10 * time.Second, - Timeout: 5 * time.Second, - } - - s1.Checks = append(s1.Checks, &check1) - - c.performSync() - if apiClient.checkRegisterCallCount != 1 { - t.Fatalf("Expected number of check registrations: %v, Actual: %v", 1, apiClient.checkRegisterCallCount) - } -} - -func TestConsul_ModifyCheck(t *testing.T) { - t.Parallel() - apiClient := &mockConsulApiClient{} - c := newConsulService() - c.client = apiClient - task := newTask() - var checks []*structs.ServiceCheck - s1 := structs.Service{ - Name: "example-cache-redis", - Tags: []string{"global"}, - PortLabel: "db", - Checks: checks, - } - task.Services = append(task.Services, &s1) - c.Register(task, mock.Alloc()) - - check1 := structs.ServiceCheck{ - Name: "alive", - Type: "tcp", - Interval: 10 * time.Second, - Timeout: 5 * time.Second, - } - - s1.Checks = append(s1.Checks, &check1) - - c.performSync() - if apiClient.checkRegisterCallCount != 1 { - t.Fatalf("Expected number of check registrations: %v, Actual: %v", 1, apiClient.checkRegisterCallCount) - } - - check1.Timeout = 2 * time.Second - c.performSync() - if apiClient.checkRegisterCallCount != 2 { - t.Fatalf("Expected number of check registrations: %v, Actual: %v", 2, apiClient.checkRegisterCallCount) - } -} - -func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { - t.Parallel() - c := newConsulService() - srvs := map[string]*consul.AgentService{ - "foo-bar": { - ID: "foo-bar", - Service: "http-frontend", - Tags: []string{"global"}, - Port: 8080, - Address: "10.10.1.11", - }, - 
"nomad-registered-service-2121212": { - ID: "nomad-registered-service-2121212", - Service: "identity-service", - Tags: []string{"global"}, - Port: 8080, - Address: "10.10.1.11", - }, - } - - expSrvcs := map[string]*consul.AgentService{ - "nomad-registered-service-2121212": { - ID: "nomad-registered-service-2121212", - Service: "identity-service", - Tags: []string{"global"}, - Port: 8080, - Address: "10.10.1.11", - }, - } - - nomadServices := c.filterConsulServices(srvs) - if !reflect.DeepEqual(expSrvcs, nomadServices) { - t.Fatalf("Expected: %v, Actual: %v", expSrvcs, nomadServices) - } - - nomadServices = c.filterConsulServices(nil) - if len(nomadServices) != 0 { - t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(nomadServices)) - } - - chks := map[string]*consul.AgentCheck{ - "foo-bar-chk": { - CheckID: "foo-bar-chk", - ServiceID: "foo-bar", - Name: "alive", - }, - "212121212": { - CheckID: "212121212", - ServiceID: "nomad-registered-service-2121212", - Name: "ping", - }, - } - - expChks := map[string]*consul.AgentCheck{ - "212121212": { - CheckID: "212121212", - ServiceID: "nomad-registered-service-2121212", - Name: "ping", - }, - } - - nomadChecks := c.filterConsulChecks(chks) - if !reflect.DeepEqual(expChks, nomadChecks) { - t.Fatalf("Expected: %v, Actual: %v", expChks, nomadChecks) - } - - if len(nomadChecks) != 1 { - t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks)) - } - - nomadChecks = c.filterConsulChecks(nil) - if len(nomadChecks) != 0 { - t.Fatalf("Expected number of checks: %v, Actual: %v", 0, len(nomadChecks)) - } - -} diff --git a/client/driver/exec.go b/client/driver/exec.go index aeaf5f8f8..45cdf8191 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -133,6 +133,9 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := exec.RegisterServices(); err != nil { + d.logger.Printf("[ERR] 
driver.exec: error registering services with consul for task: %q: %v", task.Name, err) + } go h.run() return h, nil } @@ -269,6 +272,11 @@ func (h *execHandle) run() { } h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err) close(h.waitCh) + // Deregister the task's services from consul before tearing down the executor + if err := h.executor.DeregisterServices(); err != nil { + h.logger.Printf("[ERR] driver.exec: failed to deregister services: %v", err) + } + if err := h.executor.Exit(); err != nil { h.logger.Printf("[ERR] driver.exec: error destroying executor: %v", err) } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index c982d13fd..2827c32ef 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -342,7 +342,7 @@ func (e *UniversalExecutor) ShutDown() error { } func (e *UniversalExecutor) RegisterServices() error { - e.logger.Printf("registering services") + e.logger.Printf("executor: registering services") if e.consulService == nil { cs, err := consul.NewConsulService(e.ctx.ConsulConfig, e.logger) if err != nil { @@ -351,11 +351,11 @@ func (e *UniversalExecutor) RegisterServices() error { e.consulService = cs } err := e.consulService.SyncTask(e.ctx.Task) - e.logger.Printf("Finished registering services") return err } func (e *UniversalExecutor) DeregisterServices() error { + e.logger.Printf("executor: de-registering services and shutting down consul service") if e.consulService != nil { return e.consulService.Shutdown() } diff --git a/client/driver/java.go b/client/driver/java.go index 8a52d4beb..6a4ee76d0 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -194,7 +194,9 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } - + if err := h.executor.RegisterServices(); err != nil { + d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %q: %v", task.Name, err) + } go h.run() return h, nil } @@ -340,6 +342,12 @@ func (h 
*javaHandle) run() { } h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + + // Deregister the task's services from consul before tearing down the executor + if err := h.executor.DeregisterServices(); err != nil { + h.logger.Printf("[ERR] driver.java: failed to deregister services: %v", err) + } + h.executor.Exit() h.pluginClient.Kill() } diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 7b25e6df6..6f94ed469 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -218,6 +218,9 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, waitCh: make(chan *cstructs.WaitResult, 1), } + if err := h.executor.RegisterServices(); err != nil { + h.logger.Printf("[ERR] driver.qemu: error registering services: %v", err) + } go h.run() return h, nil } @@ -335,6 +338,11 @@ func (h *qemuHandle) run() { close(h.doneCh) h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + // Deregister the task's services from consul before tearing down the executor + if err := h.executor.DeregisterServices(); err != nil { + h.logger.Printf("[ERR] driver.qemu: failed to deregister services: %v", err) + } + h.executor.Exit() h.pluginClient.Kill() } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index ae58ff6f2..a4007c54c 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -122,6 +122,9 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := h.executor.RegisterServices(); err != nil { + h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul: %v", err) + } go h.run() return h, nil } @@ -236,6 +239,11 @@ func (h *rawExecHandle) run() { } h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + // Deregister the task's services from consul before tearing down the executor + if err := h.executor.DeregisterServices(); err != nil { + h.logger.Printf("[ERR] driver.raw_exec: failed to deregister services: %v", err) + } + if err 
:= h.executor.Exit(); err != nil { h.logger.Printf("[ERR] driver.raw_exec: error killing executor: %v", err) } diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 91bad22dc..599cf5ea4 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -263,6 +263,9 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := h.executor.RegisterServices(); err != nil { + h.logger.Printf("[ERR] driver.rkt: error registering services: %v", err) + } go h.run() return h, nil } @@ -358,6 +361,11 @@ func (h *rktHandle) run() { } h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err) close(h.waitCh) + // Deregister the task's services from consul before tearing down the executor + if err := h.executor.DeregisterServices(); err != nil { + h.logger.Printf("[ERR] driver.rkt: failed to deregister services: %v", err) + } + if err := h.executor.Exit(); err != nil { h.logger.Printf("[ERR] driver.rkt: error killing executor: %v", err) }