diff --git a/client/client.go b/client/client.go index bd6a20216..8b753573b 100644 --- a/client/client.go +++ b/client/client.go @@ -1173,15 +1173,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error { // setupConsulClient creates a ConsulService func (c *Client) setupConsulClient() error { - cfg := consul.ConsulConfig{ - Addr: c.config.ReadDefault("consul.address", "127.0.0.1:8500"), - Token: c.config.Read("consul.token"), - Auth: c.config.Read("consul.auth"), - EnableSSL: c.config.ReadBoolDefault("consul.ssl", false), - VerifySSL: c.config.ReadBoolDefault("consul.verifyssl", true), - } - - cs, err := consul.NewConsulService(&cfg, c.logger, "") + cs, err := consul.NewConsulService(c.config.ConsulConfig, c.logger) c.consulService = cs return err } @@ -1216,12 +1208,14 @@ func (c *Client) syncConsul() { if taskState.State == structs.TaskStateRunning { if tr, ok := ar.tasks[taskName]; ok { for _, service := range tr.task.Services { - services[service.ID(ar.alloc.ID, tr.task.Name)] = struct{}{} + svcIdentifier := fmt.Sprintf("%s-%s", ar.alloc.ID, tr.task.Name) + services[service.ID(svcIdentifier)] = struct{}{} } } } } } + if err := c.consulService.KeepServices(services); err != nil { c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) } diff --git a/client/client_test.go b/client/client_test.go index 462a600fb..c429527ad 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -71,6 +72,7 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) { func testClient(t *testing.T, cb func(c *config.Config)) *Client { conf := DefaultConfig() conf.DevMode = true + conf.ConsulConfig = &consul.ConsulConfig{} if cb != nil { cb(conf) } diff --git a/client/config/config.go b/client/config/config.go index f5a0a6b0b..2b8a3c4aa 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -108,6 +109,9 @@ type Config struct { // Revision is the commit number of the Nomad client Revision string + + // ConsulConfig is the configuration to connect with Consul Agent + ConsulConfig *consul.ConsulConfig } func (c *Config) Copy() *Config { diff --git a/client/consul/sync.go b/client/consul/sync.go index b1bc11ffa..08da6e89d 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -22,10 +22,10 @@ type ConsulService struct { client *consul.Client availble bool - task *structs.Task - allocID string - delegateChecks map[string]struct{} - createCheck func(*structs.ServiceCheck, string) (Check, error) + serviceIdentifier string // serviceIdentifier is a token which identifies which task/alloc the service belongs to + delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul + createCheck func(*structs.ServiceCheck, string) (Check, error) + addrFinder func(portLabel string) (string, int) trackedServices map[string]*consul.AgentService trackedChecks map[string]*consul.AgentCheckRegistration @@ -60,7 +60,7 @@ const ( ) // NewConsulService returns a new ConsulService -func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) (*ConsulService, error) { +func NewConsulService(config *ConsulConfig, logger *log.Logger) (*ConsulService, error) { var err error var c *consul.Client cfg := consul.DefaultConfig() @@ -114,7 +114,6 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) } consulService := ConsulService{ client: c, - allocID: allocID, logger: logger, trackedServices: make(map[string]*consul.AgentService), trackedChecks: make(map[string]*consul.AgentCheckRegistration), @@ -133,15 +132,26 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c return c } -// SyncTask sync the services and task with consul -func (c *ConsulService) SyncTask(task *structs.Task) error { +// SetAddrFinder sets a function to find the host and port for a Service given its port label +func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService { + c.addrFinder = addrFinder + return c +} + +// SetServiceIdentifier sets the identifier of the services we are syncing with Consul +func (c *ConsulService) SetServiceIdentifier(serviceIdentifier string) *ConsulService { + c.serviceIdentifier = serviceIdentifier + return c +} + +// SyncServices sync the services with consul +func (c *ConsulService) SyncServices(services []*structs.Service) error { var mErr multierror.Error - c.task = task taskServices := make(map[string]*consul.AgentService) taskChecks := make(map[string]*consul.AgentCheckRegistration) // Register Services and Checks that we don't know about or has changed - for _, service := range task.Services { + for _, service := range services { srv, err := c.createService(service) if err != nil { mErr.Errors = append(mErr.Errors, err) @@ -296,11 +306,11 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con // createService creates a Consul AgentService from a Nomad Service func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) { srv := consul.AgentService{ - ID: service.ID(c.allocID, c.task.Name), + ID: service.ID(c.serviceIdentifier), Service: service.Name, Tags: service.Tags, } - host, port := c.task.FindHostAndPortFor(service.PortLabel) + host, port := c.addrFinder(service.PortLabel) if host != "" { srv.Address = host } @@ -350,7 +360,7 @@ func (c *ConsulService) PeriodicSync() { case <-sync.C: if err := c.performSync(); err != nil { if c.availble { - c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err) + c.logger.Printf("[DEBUG] consul: error in syncing services for %q: %v", c.serviceIdentifier, err) } c.availble = false } else { @@ -358,7 +368,7 @@ func (c *ConsulService) PeriodicSync() { } case <-c.shutdownCh: sync.Stop() - c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name) + c.logger.Printf("[INFO] consul: shutting down sync for %q", c.serviceIdentifier) return } } @@ -401,7 +411,8 @@ func (c *ConsulService) performSync() error { func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService { nomadServices := make(map[string]*consul.AgentService) for _, srv := range srvcs { - if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) { + if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) && + !strings.HasPrefix(srv.ID, structs.AgentServicePrefix) { nomadServices[srv.ID] = srv } } @@ -455,3 +466,9 @@ func (c *ConsulService) runCheck(check Check) { } } } + +// GenerateServiceIdentifier returns a service identifier based on an allocation +// id and task name +func GenerateServiceIdentifier(allocID string, taskName string) string { + return fmt.Sprintf("%s-%s", taskName, allocID) +} diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index bb1acfc3c..80907c787 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -12,6 +12,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + allocID = "12" +) + var ( logger = log.New(os.Stdout, "", log.LstdFlags) check1 = structs.ServiceCheck{ @@ -37,8 +41,7 @@ var ( ) func TestConsulServiceRegisterServices(t *testing.T) { - allocID := "12" - cs, err := NewConsulService(&ConsulConfig{}, logger, allocID) + cs, err := NewConsulService(&ConsulConfig{}, logger) if err != nil { t.Fatalf("Err: %v", err) } @@ -47,13 +50,15 @@ func TestConsulServiceRegisterServices(t *testing.T) { return } task := mockTask() - if err := cs.SyncTask(task); err != nil { + cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name)) + cs.SetAddrFinder(task.FindHostAndPortFor) + if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() - service1ID := service1.ID(allocID, task.Name) - service2ID := service2.ID(allocID, task.Name) + service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name)) + service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } @@ -63,8 +68,7 @@ func TestConsulServiceRegisterServices(t *testing.T) { } func TestConsulServiceUpdateService(t *testing.T) { - allocID := "12" - cs, err := NewConsulService(&ConsulConfig{}, logger, allocID) + cs, err := NewConsulService(&ConsulConfig{}, logger) if err != nil { t.Fatalf("Err: %v", err) } @@ -74,7 +78,9 @@ func TestConsulServiceUpdateService(t *testing.T) { } task := mockTask() - if err := cs.SyncTask(task); err != nil { + cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name)) + cs.SetAddrFinder(task.FindHostAndPortFor) + if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() @@ -82,12 +88,12 @@ func TestConsulServiceUpdateService(t *testing.T) { //Update Service defn 1 newTags := []string{"tag3"} task.Services[0].Tags = newTags - if err := cs.SyncTask(task); err != nil { + if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } // Make sure all the services and checks are still present - service1ID := service1.ID(allocID, task.Name) - service2ID := service2.ID(allocID, task.Name) + service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name)) + service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 664739f51..c95437dca 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -317,7 +317,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { // Re-syncing task with consul service if e.consulService != nil { - if err := e.consulService.SyncTask(task); err != nil { + if err := e.consulService.SyncServices(task.Services); err != nil { return err } } @@ -431,17 +431,19 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { e.logger.Printf("[INFO] executor: registering services") e.consulCtx = ctx if e.consulService == nil { - cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger, e.ctx.AllocID) + cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger) if err != nil { return err } cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) + cs.SetServiceIdentifier(consul.GenerateServiceIdentifier(e.ctx.AllocID, e.ctx.Task.Name)) + cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulService = cs } if e.ctx != nil { e.interpolateServices(e.ctx.Task) } - err := e.consulService.SyncTask(e.ctx.Task) + err := e.consulService.SyncServices(e.ctx.Task.Services) go e.consulService.PeriodicSync() return err } diff --git a/command/agent/agent.go b/command/agent/agent.go index db54da287..78c463913 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -8,11 +8,13 @@ import ( "os" "path/filepath" "runtime" + "strconv" "sync" "time" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,6 +29,11 @@ type Agent struct { logger *log.Logger logOutput io.Writer + consulService *consul.ConsulService // consulService registers the Nomad agent with the consul agent + consulConfig *consul.ConsulConfig // consulConfig is the consul configuration the Nomad client uses to connect with Consul agent + serverHTTPAddr string + clientHTTPAddr string + server *nomad.Server client *client.Client @@ -49,6 +56,10 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { shutdownCh: make(chan struct{}), } + // creating the consul client configuration that both the server and client + // uses + a.createConsulConfig() + if err := a.setupServer(); err != nil { return nil, err } @@ -58,6 +69,12 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { if a.client == nil && a.server == nil { return nil, fmt.Errorf("must have at least client or server mode enabled") } + if err := a.syncAgentServicesWithConsul(a.serverHTTPAddr, a.clientHTTPAddr); err != nil { + a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err) + } + if a.consulService != nil { + go a.consulService.PeriodicSync() + } return a, nil } @@ -140,6 +157,10 @@ func (a *Agent) serverConfig() (*nomad.Config, error) { if port := a.config.Ports.Serf; port != 0 { conf.SerfConfig.MemberlistConfig.BindPort = port } + a.serverHTTPAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP) + if a.config.AdvertiseAddrs.HTTP != "" { + a.serverHTTPAddr = a.config.AdvertiseAddrs.HTTP + } if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" { dur, err := time.ParseDuration(gcThreshold) @@ -226,6 +247,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { httpAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port) } conf.Node.HTTPAddr = httpAddr + a.clientHTTPAddr = httpAddr // Reserve resources on the node. r := conf.Node.Reserved @@ -242,6 +264,8 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { conf.Version = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease) conf.Revision = a.config.Revision + conf.ConsulConfig = a.consulConfig + return conf, nil } @@ -403,6 +427,12 @@ func (a *Agent) Shutdown() error { } } + if a.consulService != nil { + if err := a.consulService.Shutdown(); err != nil { + a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err) + } + } + a.logger.Println("[INFO] agent: shutdown complete") a.shutdown = true close(a.shutdownCh) @@ -445,3 +475,60 @@ func (a *Agent) Stats() map[string]map[string]string { } return stats } + +func (a *Agent) createConsulConfig() { + cfg := &consul.ConsulConfig{ + Addr: a.config.ConsulConfig.Addr, + Token: a.config.ConsulConfig.Token, + Auth: a.config.ConsulConfig.Auth, + EnableSSL: a.config.ConsulConfig.EnableSSL, + VerifySSL: a.config.ConsulConfig.VerifySSL, + CAFile: a.config.ConsulConfig.CAFile, + CertFile: a.config.ConsulConfig.CertFile, + KeyFile: a.config.ConsulConfig.KeyFile, + } + a.consulConfig = cfg +} + +// syncAgentServicesWithConsul syncs the client and server services with Consul +func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error { + cs, err := consul.NewConsulService(a.consulConfig, a.logger) + if err != nil { + return err + } + a.consulService = cs + var services []*structs.Service + if a.client != nil && a.config.ConsulConfig.ClientServiceName != "" { + if err != nil { + return err + } + clientService := &structs.Service{ + Name: a.config.ConsulConfig.ClientServiceName, + PortLabel: clientHttpAddr, + } + services = append(services, clientService) + cs.SetServiceIdentifier("agent-client") + } + if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" { + serverService := &structs.Service{ + Name: a.config.ConsulConfig.ServerServiceName, + PortLabel: serverHttpAddr, + } + services = append(services, serverService) + cs.SetServiceIdentifier("agent-server") + } + + cs.SetAddrFinder(func(portLabel string) (string, int) { + host, port, err := net.SplitHostPort(portLabel) + if err != nil { + return "", 0 + } + p, err := strconv.Atoi(port) + if err != nil { + return "", 0 + } + return host, p + }) + + return cs.SyncServices(services) +} diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 91a352758..ae470bf62 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -42,6 +42,7 @@ func makeAgent(t *testing.T, cb func(*Config)) (string, *Agent) { Serf: getPort(), } conf.NodeName = fmt.Sprintf("Node %d", conf.Ports.RPC) + conf.ConsulConfig = &ConsulConfig{} // Tighten the Serf timing config.SerfConfig.MemberlistConfig.SuspicionMult = 2 diff --git a/command/agent/command.go b/command/agent/command.go index 1ce19a933..d414808fc 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -58,10 +58,11 @@ func (c *Command) readConfig() *Config { // Make a new, empty config. cmdConfig := &Config{ - Atlas: &AtlasConfig{}, - Client: &ClientConfig{}, - Ports: &Ports{}, - Server: &ServerConfig{}, + Atlas: &AtlasConfig{}, + ConsulConfig: &ConsulConfig{}, + Client: &ClientConfig{}, + Ports: &Ports{}, + Server: &ServerConfig{}, } flags := flag.NewFlagSet("agent", flag.ContinueOnError) diff --git a/command/agent/config-test-fixtures/basic.hcl b/command/agent/config-test-fixtures/basic.hcl index 34eb66818..b62bc0804 100644 --- a/command/agent/config-test-fixtures/basic.hcl +++ b/command/agent/config-test-fixtures/basic.hcl @@ -81,3 +81,17 @@ atlas { http_api_response_headers { Access-Control-Allow-Origin = "*" } +consul { + server_service_name = "nomad-server" + client_service_name = "nomad-client" + addr = "127.0.0.1:9500" + token = "token1" + auth = "username:pass" + ssl = true + verify_ssl = false + ca_file = "/path/to/ca/file" + cert_file = "/path/to/cert/file" + key_file = "/path/to/key/file" + server_auto_join = true + client_auto_join = true +} diff --git a/command/agent/config.go b/command/agent/config.go index 6137ea4f0..7a0a47cba 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -82,6 +82,10 @@ type Config struct { // AtlasConfig is used to configure Atlas Atlas *AtlasConfig `mapstructure:"atlas"` + // ConsulConfig is used to configure Consul clients and register the nomad + // server and client services with Consul + ConsulConfig *ConsulConfig `mapstructure:"consul"` + // NomadConfig is used to override the default config. // This is largly used for testing purposes. NomadConfig *nomad.Config `mapstructure:"-" json:"-"` @@ -124,6 +128,57 @@ type AtlasConfig struct { Endpoint string `mapstructure:"endpoint"` } +// ConsulConfig is used to configure Consul clients and register the nomad +// server and client services with Consul +type ConsulConfig struct { + + // ServerServiceName is the name of the service that Nomad uses to register + // servers with Consul + ServerServiceName string `mapstructure:"server_service_name"` + + // ClientServiceName is the name of the service that Nomad uses to register + // clients with Consul + ClientServiceName string `mapstructure:"client_service_name"` + + // AutoRegister determines if Nomad will register the Nomad client and + // server agents with Consul + AutoRegister bool `mapstructure:"auto_register"` + + // Addr is the address of the local Consul agent + Addr string `mapstructure:"addr"` + + // Token is used to provide a per-request ACL token.This options overrides + // the agent's default token + Token string `mapstructure:"token"` + + // Auth is the information to use for http access to Consul agent + Auth string `mapstructure:"auth"` + + // EnableSSL sets the transport scheme to talk to the Consul agent as https + EnableSSL bool `mapstructure:"ssl"` + + // VerifySSL enables or disables SSL verification when the transport scheme + // for the consul api client is https + VerifySSL bool `mapstructure:"verify_ssl"` + + // CAFile is the path to the ca certificate used for Consul communication + CAFile string `mapstructure:"ca_file"` + + // CertFile is the path to the certificate for Consul communication + CertFile string `mapstructure:"cert_file"` + + // KeyFile is the path to the private key for Consul communication + KeyFile string `mapstructure:"key_file"` + + // ServerAutoJoin enables Nomad servers to find peers by querying Consul and + // joining them + ServerAutoJoin bool `mapstructure:"server_auto_join"` + + // ClientAutoJoin enables Nomad servers to find addresses of Nomad servers + // and register with them + ClientAutoJoin bool `mapstructure:"client_auto_join"` +} + // ClientConfig is configuration specific to the client mode type ClientConfig struct { // Enabled controls if we are a client @@ -368,6 +423,11 @@ func DefaultConfig() *Config { Addresses: &Addresses{}, AdvertiseAddrs: &AdvertiseAddrs{}, Atlas: &AtlasConfig{}, + ConsulConfig: &ConsulConfig{ + ServerServiceName: "nomad-server", + ClientServiceName: "nomad-client", + AutoRegister: true, + }, Client: &ClientConfig{ Enabled: false, NetworkSpeed: 100, @@ -512,6 +572,14 @@ func (c *Config) Merge(b *Config) *Config { result.Atlas = result.Atlas.Merge(b.Atlas) } + // Apply the Consul Configuration + if result.ConsulConfig == nil && b.ConsulConfig != nil { + consulConfig := *b.ConsulConfig + result.ConsulConfig = &consulConfig + } else if b.ConsulConfig != nil { + result.ConsulConfig = result.ConsulConfig.Merge(b.ConsulConfig) + } + // Merge config files lists result.Files = append(result.Files, b.Files...) @@ -710,6 +778,49 @@ func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig { return &result } +// Merge merges two Consul Configurations together. +func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig { + result := *a + + if b.ServerServiceName != "" { + result.ServerServiceName = b.ServerServiceName + } + if b.ClientServiceName != "" { + result.ClientServiceName = b.ClientServiceName + } + if b.Addr != "" { + result.Addr = b.Addr + } + if b.Token != "" { + result.Token = b.Token + } + if b.Auth != "" { + result.Auth = b.Auth + } + if b.EnableSSL { + result.EnableSSL = true + } + if b.VerifySSL { + result.VerifySSL = true + } + if b.CAFile != "" { + result.CAFile = b.CAFile + } + if b.CertFile != "" { + result.CertFile = b.CertFile + } + if b.KeyFile != "" { + result.KeyFile = b.KeyFile + } + if b.ServerAutoJoin { + result.ServerAutoJoin = true + } + if b.ClientAutoJoin { + result.ClientAutoJoin = true + } + return &result +} + func (r *Resources) Merge(b *Resources) *Resources { result := *r if b.CPU != 0 { diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 99bd06f07..69b6a626d 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -90,6 +90,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error { "disable_update_check", "disable_anonymous_signature", "atlas", + "consul", "http_api_response_headers", } if err := checkHCLKeys(list, valid); err != nil { @@ -109,6 +110,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error { delete(m, "server") delete(m, "telemetry") delete(m, "atlas") + delete(m, "consul") delete(m, "http_api_response_headers") // Decode the rest @@ -165,6 +167,13 @@ func parseConfig(result *Config, list *ast.ObjectList) error { } } + // Parse the consul config + if o := list.Filter("consul"); len(o.Items) > 0 { + if err := parseConsulConfig(&result.ConsulConfig, o); err != nil { + return multierror.Prefix(err, "consul ->") + } + } + // Parse out http_api_response_headers fields. These are in HCL as a list so // we need to iterate over them and merge them. if headersO := list.Filter("http_api_response_headers"); len(headersO.Items) > 0 { @@ -530,6 +539,49 @@ func parseAtlas(result **AtlasConfig, list *ast.ObjectList) error { return nil } +func parseConsulConfig(result **ConsulConfig, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'consul' block allowed") + } + + // Get our consul object + listVal := list.Items[0].Val + + // Check for invalid keys + valid := []string{ + "server_service_name", + "client_service_name", + "addr", + "token", + "auth", + "ssl", + "verify_ssl", + "ca_file", + "cert_file", + "key_file", + "client_auto_join", + "server_auto_join", + } + + if err := checkHCLKeys(listVal, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, listVal); err != nil { + return err + } + + var consulConfig ConsulConfig + if err := mapstructure.WeakDecode(m, &consulConfig); err != nil { + return err + } + + *result = &consulConfig + return nil +} + func checkHCLKeys(node ast.Node, valid []string) error { var list *ast.ObjectList switch n := node.(type) { diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 7c5c3c468..7b9f3d5b6 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -96,6 +96,20 @@ func TestConfig_Parse(t *testing.T) { Join: true, Endpoint: "127.0.0.1:1234", }, + ConsulConfig: &ConsulConfig{ + ServerServiceName: "nomad-server", + ClientServiceName: "nomad-client", + Addr: "127.0.0.1:9500", + Token: "token1", + Auth: "username:pass", + EnableSSL: true, + VerifySSL: false, + CAFile: "/path/to/ca/file", + CertFile: "/path/to/cert/file", + KeyFile: "/path/to/key/file", + ServerAutoJoin: true, + ClientAutoJoin: true, + }, HTTPAPIResponseHeaders: map[string]string{ "Access-Control-Allow-Origin": "*", }, diff --git a/command/check.go b/command/check.go new file mode 100644 index 000000000..0eba98c1e --- /dev/null +++ b/command/check.go @@ -0,0 +1,135 @@ +package command + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "time" +) + +const ( + HealthCritical = 2 + HealthWarn = 1 + HealthPass = 0 + HealthUnknown = 3 +) + +type AgentCheckCommand struct { + Meta +} + +func (c *AgentCheckCommand) Help() string { + helpText := ` +Usage: nomad check + + Display state of the Nomad agent. The exit code of the command is Nagios + compatible and could be used with alerting systems. + +General Options: + + ` + generalOptionsUsage() + ` + +Agent Check Options: + + -min-peers + Minimum number of peers that a server is expected to know. + + -min-servers + Minumum number of servers that a client is expected to know. +` + + return strings.TrimSpace(helpText) +} + +func (c *AgentCheckCommand) Synopsis() string { + return "Displays health of the local Nomad agent" +} + +func (c *AgentCheckCommand) Run(args []string) int { + var minPeers, minServers int + + flags := c.Meta.FlagSet("check", FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.IntVar(&minPeers, "min-peers", 0, "") + flags.IntVar(&minServers, "min-servers", 1, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("error initializing client: %s", err)) + return HealthCritical + } + + info, err := client.Agent().Self() + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to query agent info: %v", err)) + return HealthCritical + } + if stats, ok := info["stats"]; !ok && (reflect.TypeOf(stats).Kind() == reflect.Map) { + c.Ui.Error("error getting stats from the agent api") + return 1 + } + if _, ok := info["stats"]["nomad"]; ok { + return c.checkServerHealth(info["stats"], minPeers) + } + + if _, ok := info["stats"]["client"]; ok { + return c.checkClientHealth(info["stats"], minServers) + } + return HealthWarn +} + +// checkServerHealth returns the health of a server. +// TODO Add more rules for determining server health +func (c *AgentCheckCommand) checkServerHealth(info map[string]interface{}, minPeers int) int { + raft := info["raft"].(map[string]interface{}) + knownPeers, err := strconv.Atoi(raft["num_peers"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to get known peers: %v", err)) + return HealthCritical + } + + if knownPeers < minPeers { + c.Ui.Output(fmt.Sprintf("known peers: %v, is less than expected number of peers: %v", knownPeers, minPeers)) + return HealthCritical + } + return HealthPass +} + +// checkClientHealth retuns the health of a client +func (c *AgentCheckCommand) checkClientHealth(info map[string]interface{}, minServers int) int { + clientStats := info["client"].(map[string]interface{}) + knownServers, err := strconv.Atoi(clientStats["known_servers"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to get known servers: %v", err)) + return HealthCritical + } + + heartbeatTTL, err := time.ParseDuration(clientStats["heartbeat_ttl"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to parse heartbeat TTL: %v", err)) + return HealthCritical + } + + lastHeartbeat, err := time.ParseDuration(clientStats["last_heartbeat"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to parse last heartbeat: %v", err)) + return HealthCritical + } + + if lastHeartbeat > heartbeatTTL { + c.Ui.Output(fmt.Sprintf("last heartbeat was %q time ago, expected heartbeat ttl: %q", lastHeartbeat, heartbeatTTL)) + return HealthCritical + } + + if knownServers < minServers { + c.Ui.Output(fmt.Sprintf("known servers: %v, is less than expected number of servers: %v", knownServers, minServers)) + return HealthCritical + } + + return HealthPass +} diff --git a/command/check_test.go b/command/check_test.go new file mode 100644 index 000000000..00e6e53c3 --- /dev/null +++ b/command/check_test.go @@ -0,0 +1,29 @@ +package command + +import ( + "fmt" + "testing" + + "github.com/mitchellh/cli" +) + +func TestAgentCheckCommand_ServerHealth(t *testing.T) { + srv, _, url := testServer(t, nil) + defer srv.Stop() + + ui := new(cli.MockUi) + cmd := &AgentCheckCommand{Meta: Meta{Ui: ui}} + address := fmt.Sprintf("-address=%s", url) + + code := cmd.Run([]string{address}) + if code != HealthPass { + t.Fatalf("expected exit: %v, actual: %d", HealthPass, code) + } + + minPeers := fmt.Sprintf("-min-peers=%v", 3) + code = cmd.Run([]string{address, minPeers}) + if code != HealthCritical { + t.Fatalf("expected exitcode: %v, actual: %v", HealthCritical, code) + } + +} diff --git a/commands.go b/commands.go index 2374dea06..1ebc1bd76 100644 --- a/commands.go +++ b/commands.go @@ -30,7 +30,6 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, - "agent": func() (cli.Command, error) { return &agent.Command{ Revision: GitCommit, @@ -40,13 +39,16 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { ShutdownCh: make(chan struct{}), }, nil }, - "agent-info": func() (cli.Command, error) { return &command.AgentInfoCommand{ Meta: meta, }, nil }, - + "check": func() (cli.Command, error) { + return &command.AgentCheckCommand{ + Meta: meta, + }, nil + }, "client-config": func() (cli.Command, error) { return &command.ClientConfigCommand{ Meta: meta, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 18fa5f93b..f0bfc4510 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1474,6 +1474,10 @@ const ( NomadConsulPrefix = "nomad-registered-service" ) +var ( + AgentServicePrefix = fmt.Sprintf("%s-%s", NomadConsulPrefix, "agent") +) + // The Service model represents a Consul service defintion type Service struct { Name string // Name of the service, defaults to id @@ -1519,8 +1523,8 @@ func (s *Service) InitFields(job string, taskGroup string, task string) { } } -func (s *Service) ID(allocID string, taskName string) string { - return fmt.Sprintf("%s-%s-%s-%s", NomadConsulPrefix, allocID, taskName, s.Hash()) +func (s *Service) ID(identifier string) string { + return fmt.Sprintf("%s-%s-%s", NomadConsulPrefix, identifier, s.Hash()) } // Validate checks if the Check definition is valid