From 2c1611c8ccc22b430766f7171dcc8e28edb56fcc Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 30 Mar 2016 23:20:24 -0700 Subject: [PATCH 01/17] Added parsing logic for config block --- command/agent/config.go | 51 +++++++++++++++++++++++++++++++++++ command/agent/config_parse.go | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/command/agent/config.go b/command/agent/config.go index 6137ea4f0..76079483d 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -82,6 +82,10 @@ type Config struct { // AtlasConfig is used to configure Atlas Atlas *AtlasConfig `mapstructure:"atlas"` + // ConsulConfig is used to configure Consul clients and register the nomad + // server and client services with Consul + ConsulConfig *ConsulConfig `mapstructure:"consul"` + // NomadConfig is used to override the default config. // This is largly used for testing purposes. NomadConfig *nomad.Config `mapstructure:"-" json:"-"` @@ -124,6 +128,53 @@ type AtlasConfig struct { Endpoint string `mapstructure:"endpoint"` } +// ConsulConfig is used to configure Consul clients and register the nomad +// server and client services with Consul +type ConsulConfig struct { + + // ServerServiceName is the name of the service that Nomad uses to register + // servers with Consul + ServerServiceName string `mapstructure:"server_service_name"` + + // ClientServiceName is the name of the service that Nomad uses to register + // clients with Consul + ClientServiceName string `mapstructure:"client_service_name"` + + // Addr is the address of the local Consul agent + Addr string `mapstructure:"addr"` + + // Token is used to provide a per-request ACL token.This options overrides + // the agent's default token + Token string `mapstructure:"token"` + + // Auth is the information to use for http access to Consul agent + Auth string `mapstructure:"auth"` + + // EnableSSL sets the transport scheme to talk to the Consul agent as https + EnableSSL bool `mapstructure:"ssl"` + + // VerifySSL enables or disables SSL verification when the transport scheme + // for the consul api client is https + VerifySSL bool `mapstructure:"verify_ssl"` + + // CAFile is the path to the ca certificate used for Consul communication + CAFile string `mapstructure:"ca_file"` + + // CertFile is the path to the certificate for Consul communication + CertFile string `mapstructure:"cert_file"` + + // KeyFile is the path to the private key for Consul communication + KeyFile string `mapstructure:"key_file"` + + // ServerAutoJoin enables Nomad servers to find peers by querying Consul and + // joining them + ServerAutoJoin bool `mapstructure:"server_auto_join"` + + // ClientAutoJoin enables Nomad servers to find addresses of Nomad servers + // and register with them + ClientAutoJoin bool `mapstructure:"client_auto_join"` +} + // ClientConfig is configuration specific to the client mode type ClientConfig struct { // Enabled controls if we are a client diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 99bd06f07..9e64b4026 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -90,6 +90,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error { "disable_update_check", "disable_anonymous_signature", "atlas", + "consul", "http_api_response_headers", } if err := checkHCLKeys(list, valid); err != nil { @@ -109,6 +110,7 @@ func parseConfig(result *Config, list *ast.ObjectList) error { delete(m, "server") delete(m, "telemetry") delete(m, "atlas") + delete(m, "consul") delete(m, "http_api_response_headers") // Decode the rest @@ -165,6 +167,13 @@ func parseConfig(result *Config, list *ast.ObjectList) error { } } + // Parse the consul config + if o := list.Filter("consul"); len(o.Items) > 0 { + if err := parseConsulConfig(&result.ConsulConfig, o); err != nil { + return multierror.Prefix(err, "consul ->") + } + } + // Parse out http_api_response_headers fields. These are in HCL as a list so // we need to iterate over them and merge them. if headersO := list.Filter("http_api_response_headers"); len(headersO.Items) > 0 { @@ -530,6 +539,47 @@ func parseAtlas(result **AtlasConfig, list *ast.ObjectList) error { return nil } +func parseConsulConfig(result **ConsulConfig, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'consul' block allowed") + } + + // Get our consul object + listVal := list.Items[0].Val + + // Check for invalid keys + valid := []string{ + "server_service_name", + "client_service_name", + "addr", + "token", + "auth", + "ssl", + "verify_ssl", + "ca_file", + "cert_file", + "key_file", + } + + if err := checkHCLKeys(listVal, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, listVal); err != nil { + return err + } + + var consulConfig ConsulConfig + if err := mapstructure.WeakDecode(m, &consulConfig); err != nil { + return err + } + + *result = &consulConfig + return nil +} + func checkHCLKeys(node ast.Node, valid []string) error { var list *ast.ObjectList switch n := node.(type) { From d32757b03e0d825bc3edcd206be602bf0081ac4c Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sun, 10 Apr 2016 01:13:25 -0700 Subject: [PATCH 02/17] Added a check command --- command/check.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ commands.go | 8 +++-- 2 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 command/check.go diff --git a/command/check.go b/command/check.go new file mode 100644 index 000000000..8bc559f98 --- /dev/null +++ b/command/check.go @@ -0,0 +1,93 @@ +package command + +import ( + "fmt" + "strconv" + "strings" +) + +const ( + HealthCritical = 1 + HealthWarn = 2 + HealthPass = 0 +) + +type AgentCheckCommand struct { + Meta +} + +func (c *AgentCheckCommand) Help() string { + helpText := ` +Usage: nomad check + + Display state of the Nomad agent. The exit code of the command is Nagios + compatible and could be used with alerting systems. + +General Options: + + ` + generalOptionsUsage() + ` + +Agent Check Options: + + -min-peers + Minimum number of peers that a server is expected to know. +` + + return strings.TrimSpace(helpText) +} + +func (c *AgentCheckCommand) Synopsis() string { + return "Displays health of the local Nomad agent" +} + +func (c *AgentCheckCommand) Run(args []string) int { + var minPeers int + + flags := c.Meta.FlagSet("check", FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.IntVar(&minPeers, "min-peers", 0, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("error initializing client: %s", err)) + return HealthCritical + } + + info, err := client.Agent().Self() + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to query agent info: %v", err)) + return HealthCritical + } + if _, ok := info["stats"]["nomad"]; ok { + return c.checkServerHealth(info["stats"], minPeers) + } + + if _, ok := info["client"]; ok { + return c.checkClientHealth(info) + } + return HealthWarn +} + +// checkServerHealth returns the health of a server +func (c *AgentCheckCommand) checkServerHealth(info map[string]interface{}, minPeers int) int { + raft := info["raft"].(map[string]interface{}) + knownPeers, err := strconv.Atoi(raft["num_peers"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to get known peers: %v", err)) + return HealthCritical + } + + if knownPeers < minPeers { + c.Ui.Output(fmt.Sprintf("known peers: %v, is less than expected number of peers: %v", knownPeers, minPeers)) + return HealthCritical + } + return HealthPass +} + +func (c *AgentCheckCommand) checkClientHealth(info map[string]map[string]interface{}) int { + return HealthPass +} diff --git a/commands.go b/commands.go index 2374dea06..1ebc1bd76 100644 --- a/commands.go +++ b/commands.go @@ -30,7 +30,6 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, - "agent": func() (cli.Command, error) { return &agent.Command{ Revision: GitCommit, @@ -40,13 +39,16 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { ShutdownCh: make(chan struct{}), }, nil }, - "agent-info": func() (cli.Command, error) { return &command.AgentInfoCommand{ Meta: meta, }, nil }, - + "check": func() (cli.Command, error) { + return &command.AgentCheckCommand{ + Meta: meta, + }, nil + }, "client-config": func() (cli.Command, error) { return &command.ClientConfigCommand{ Meta: meta, From e5f0d7e5317cd375d4d55e506665c8155800dd78 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sun, 10 Apr 2016 01:45:10 -0700 Subject: [PATCH 03/17] Adding logic for checking health of client --- command/check.go | 51 +++++++++++++++++++++++++++++++++++++------ command/check_test.go | 29 ++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 7 deletions(-) create mode 100644 command/check_test.go diff --git a/command/check.go b/command/check.go index 8bc559f98..d35e5d268 100644 --- a/command/check.go +++ b/command/check.go @@ -4,12 +4,14 @@ import ( "fmt" "strconv" "strings" + "time" ) const ( - HealthCritical = 1 - HealthWarn = 2 + HealthCritical = 2 + HealthWarn = 1 HealthPass = 0 + HealthUnknown = 3 ) type AgentCheckCommand struct { @@ -31,6 +33,9 @@ Agent Check Options: -min-peers Minimum number of peers that a server is expected to know. + + -min-servers + Minumum number of servers that a client is expected to know. ` return strings.TrimSpace(helpText) @@ -41,11 +46,12 @@ func (c *AgentCheckCommand) Synopsis() string { } func (c *AgentCheckCommand) Run(args []string) int { - var minPeers int + var minPeers, minServers int flags := c.Meta.FlagSet("check", FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.IntVar(&minPeers, "min-peers", 0, "") + flags.IntVar(&minServers, "min-servers", 1, "") if err := flags.Parse(args); err != nil { return 1 @@ -66,13 +72,14 @@ func (c *AgentCheckCommand) Run(args []string) int { return c.checkServerHealth(info["stats"], minPeers) } - if _, ok := info["client"]; ok { - return c.checkClientHealth(info) + if _, ok := info["stats"]["client"]; ok { + return c.checkClientHealth(info["stats"], minServers) } return HealthWarn } -// checkServerHealth returns the health of a server +// checkServerHealth returns the health of a server. +// TODO Add more rules for determining server health func (c *AgentCheckCommand) checkServerHealth(info map[string]interface{}, minPeers int) int { raft := info["raft"].(map[string]interface{}) knownPeers, err := strconv.Atoi(raft["num_peers"].(string)) @@ -88,6 +95,36 @@ func (c *AgentCheckCommand) checkServerHealth(info map[string]interface{}, minPe return HealthPass } -func (c *AgentCheckCommand) checkClientHealth(info map[string]map[string]interface{}) int { +// checkClientHealth retuns the health of a client +func (c *AgentCheckCommand) checkClientHealth(info map[string]interface{}, minServers int) int { + clientStats := info["client"].(map[string]interface{}) + knownServers, err := strconv.Atoi(clientStats["known_servers"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to get known servers: %v", err)) + return HealthCritical + } + + heartbeatTTL, err := time.ParseDuration(clientStats["heartbeat_ttl"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to parse heartbeat TTL: %v", err)) + return HealthCritical + } + + lastHeartbeat, err := time.ParseDuration(clientStats["last_heartbeat"].(string)) + if err != nil { + c.Ui.Output(fmt.Sprintf("unable to parse last heartbeat: %v", err)) + return HealthCritical + } + + if lastHeartbeat > heartbeatTTL { + c.Ui.Output(fmt.Sprintf("last heartbeat was %q time ago, expected heartbeat ttl: %q", lastHeartbeat, heartbeatTTL)) + return HealthCritical + } + + if knownServers < minServers { + c.Ui.Output(fmt.Sprintf("known servers: %v, is less than expected number of servers: %v", knownServers, minServers)) + return HealthCritical + } + return HealthPass } diff --git a/command/check_test.go b/command/check_test.go new file mode 100644 index 000000000..00e6e53c3 --- /dev/null +++ b/command/check_test.go @@ -0,0 +1,29 @@ +package command + +import ( + "fmt" + "testing" + + "github.com/mitchellh/cli" +) + +func TestAgentCheckCommand_ServerHealth(t *testing.T) { + srv, _, url := testServer(t, nil) + defer srv.Stop() + + ui := new(cli.MockUi) + cmd := &AgentCheckCommand{Meta: Meta{Ui: ui}} + address := fmt.Sprintf("-address=%s", url) + + code := cmd.Run([]string{address}) + if code != HealthPass { + t.Fatalf("expected exit: %v, actual: %d", HealthPass, code) + } + + minPeers := fmt.Sprintf("-min-peers=%v", 3) + code = cmd.Run([]string{address, minPeers}) + if code != HealthCritical { + t.Fatalf("expected exitcode: %v, actual: %v", HealthCritical, code) + } + +} From 2663ef9d6a3f5d30b5ea672bc6084da7f4893ac2 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 12 Apr 2016 01:55:19 -0400 Subject: [PATCH 04/17] Refactored the signature of NewConsulService --- client/client.go | 2 +- client/consul/sync.go | 35 +++++++++++++++++++++--------- client/consul/sync_test.go | 12 ++++++---- client/driver/executor/executor.go | 9 +++++--- 4 files changed, 40 insertions(+), 18 deletions(-) diff --git a/client/client.go b/client/client.go index bd6a20216..6fb3ad34e 100644 --- a/client/client.go +++ b/client/client.go @@ -1181,7 +1181,7 @@ func (c *Client) setupConsulClient() error { VerifySSL: c.config.ReadBoolDefault("consul.verifyssl", true), } - cs, err := consul.NewConsulService(&cfg, c.logger, "") + cs, err := consul.NewConsulService(&cfg, c.logger) c.consulService = cs return err } diff --git a/client/consul/sync.go b/client/consul/sync.go index b1bc11ffa..e6092c0e0 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -22,10 +22,11 @@ type ConsulService struct { client *consul.Client availble bool - task *structs.Task + taskName string allocID string delegateChecks map[string]struct{} createCheck func(*structs.ServiceCheck, string) (Check, error) + addrFinder func(string) (string, int) trackedServices map[string]*consul.AgentService trackedChecks map[string]*consul.AgentCheckRegistration @@ -60,7 +61,7 @@ const ( ) // NewConsulService returns a new ConsulService -func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) (*ConsulService, error) { +func NewConsulService(config *ConsulConfig, logger *log.Logger) (*ConsulService, error) { var err error var c *consul.Client cfg := consul.DefaultConfig() @@ -114,7 +115,6 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) } consulService := ConsulService{ client: c, - allocID: allocID, logger: logger, trackedServices: make(map[string]*consul.AgentService), trackedChecks: make(map[string]*consul.AgentCheckRegistration), @@ -133,15 +133,30 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c return c } +// SetAllocID sets the allocID +func (c *ConsulService) SetAllocID(allocID string) *ConsulService { + c.allocID = allocID + return c +} + +func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService { + c.addrFinder = addrFinder + return c +} + +func (c *ConsulService) SetTaskName(taskName string) *ConsulService { + c.taskName = taskName + return c +} + // SyncTask sync the services and task with consul -func (c *ConsulService) SyncTask(task *structs.Task) error { +func (c *ConsulService) SyncServices(services []*structs.Service) error { var mErr multierror.Error - c.task = task taskServices := make(map[string]*consul.AgentService) taskChecks := make(map[string]*consul.AgentCheckRegistration) // Register Services and Checks that we don't know about or has changed - for _, service := range task.Services { + for _, service := range services { srv, err := c.createService(service) if err != nil { mErr.Errors = append(mErr.Errors, err) @@ -296,11 +311,11 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con // createService creates a Consul AgentService from a Nomad Service func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) { srv := consul.AgentService{ - ID: service.ID(c.allocID, c.task.Name), + ID: service.ID(c.allocID, c.taskName), Service: service.Name, Tags: service.Tags, } - host, port := c.task.FindHostAndPortFor(service.PortLabel) + host, port := c.addrFinder(service.PortLabel) if host != "" { srv.Address = host } @@ -350,7 +365,7 @@ func (c *ConsulService) PeriodicSync() { case <-sync.C: if err := c.performSync(); err != nil { if c.availble { - c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err) + c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.taskName, err) } c.availble = false } else { @@ -358,7 +373,7 @@ func (c *ConsulService) PeriodicSync() { } case <-c.shutdownCh: sync.Stop() - c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name) + c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.taskName) return } } diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index bb1acfc3c..c69a0caeb 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -12,6 +12,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + allocID = "12" +) + var ( logger = log.New(os.Stdout, "", log.LstdFlags) check1 = structs.ServiceCheck{ @@ -37,11 +41,11 @@ var ( ) func TestConsulServiceRegisterServices(t *testing.T) { - allocID := "12" - cs, err := NewConsulService(&ConsulConfig{}, logger, allocID) + cs, err := NewConsulService(&ConsulConfig{}, logger) if err != nil { t.Fatalf("Err: %v", err) } + cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return @@ -63,11 +67,11 @@ func TestConsulServiceRegisterServices(t *testing.T) { } func TestConsulServiceUpdateService(t *testing.T) { - allocID := "12" - cs, err := NewConsulService(&ConsulConfig{}, logger, allocID) + cs, err := NewConsulService(&ConsulConfig{}, logger) if err != nil { t.Fatalf("Err: %v", err) } + cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 664739f51..506f728d9 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -317,7 +317,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { // Re-syncing task with consul service if e.consulService != nil { - if err := e.consulService.SyncTask(task); err != nil { + if err := e.consulService.SyncServices(task.Services); err != nil { return err } } @@ -431,17 +431,20 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { e.logger.Printf("[INFO] executor: registering services") e.consulCtx = ctx if e.consulService == nil { - cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger, e.ctx.AllocID) + cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger) if err != nil { return err } cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) + cs.SetAllocID(e.ctx.AllocID) + cs.SetTaskName(e.ctx.Task.Name) + cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulService = cs } if e.ctx != nil { e.interpolateServices(e.ctx.Task) } - err := e.consulService.SyncTask(e.ctx.Task) + err := e.consulService.SyncServices(e.ctx.Task.Services) go e.consulService.PeriodicSync() return err } From 50b03354f2ae85c5a544fbe60fc337f6b7672ca1 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 13:29:56 -0700 Subject: [PATCH 05/17] Fixed the consul tests --- client/consul/sync.go | 4 +++- client/consul/sync_test.go | 10 +++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/client/consul/sync.go b/client/consul/sync.go index e6092c0e0..e1d16b9c9 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -139,17 +139,19 @@ func (c *ConsulService) SetAllocID(allocID string) *ConsulService { return c } +// SetAddrFinder sets a function to find the host and port for a Service func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService { c.addrFinder = addrFinder return c } +// SetTaskName sets the task name whose services we are syncing with Consul func (c *ConsulService) SetTaskName(taskName string) *ConsulService { c.taskName = taskName return c } -// SyncTask sync the services and task with consul +// SyncServices sync the services with consul func (c *ConsulService) SyncServices(services []*structs.Service) error { var mErr multierror.Error taskServices := make(map[string]*consul.AgentService) diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index c69a0caeb..df6a7d778 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -51,7 +51,9 @@ func TestConsulServiceRegisterServices(t *testing.T) { return } task := mockTask() - if err := cs.SyncTask(task); err != nil { + cs.SetTaskName(task.Name) + cs.SetAddrFinder(task.FindHostAndPortFor) + if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() @@ -78,7 +80,9 @@ func TestConsulServiceUpdateService(t *testing.T) { } task := mockTask() - if err := cs.SyncTask(task); err != nil { + cs.SetTaskName(task.Name) + cs.SetAddrFinder(task.FindHostAndPortFor) + if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() @@ -86,7 +90,7 @@ func TestConsulServiceUpdateService(t *testing.T) { //Update Service defn 1 newTags := []string{"tag3"} task.Services[0].Tags = newTags - if err := cs.SyncTask(task); err != nil { + if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } // Make sure all the services and checks are still present From 7b720afd867679b38e310a283be5c98064f29203 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 15:22:00 -0700 Subject: [PATCH 06/17] Added tests for parsing consul client configuration --- command/agent/config-test-fixtures/basic.hcl | 14 ++++++++++++++ command/agent/config_parse.go | 2 ++ command/agent/config_parse_test.go | 14 ++++++++++++++ 3 files changed, 30 insertions(+) diff --git a/command/agent/config-test-fixtures/basic.hcl b/command/agent/config-test-fixtures/basic.hcl index 34eb66818..b62bc0804 100644 --- a/command/agent/config-test-fixtures/basic.hcl +++ b/command/agent/config-test-fixtures/basic.hcl @@ -81,3 +81,17 @@ atlas { http_api_response_headers { Access-Control-Allow-Origin = "*" } +consul { + server_service_name = "nomad-server" + client_service_name = "nomad-client" + addr = "127.0.0.1:9500" + token = "token1" + auth = "username:pass" + ssl = true + verify_ssl = false + ca_file = "/path/to/ca/file" + cert_file = "/path/to/cert/file" + key_file = "/path/to/key/file" + server_auto_join = true + client_auto_join = true +} diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 9e64b4026..69b6a626d 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -560,6 +560,8 @@ func parseConsulConfig(result **ConsulConfig, list *ast.ObjectList) error { "ca_file", "cert_file", "key_file", + "client_auto_join", + "server_auto_join", } if err := checkHCLKeys(listVal, valid); err != nil { diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 7c5c3c468..7b9f3d5b6 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -96,6 +96,20 @@ func TestConfig_Parse(t *testing.T) { Join: true, Endpoint: "127.0.0.1:1234", }, + ConsulConfig: &ConsulConfig{ + ServerServiceName: "nomad-server", + ClientServiceName: "nomad-client", + Addr: "127.0.0.1:9500", + Token: "token1", + Auth: "username:pass", + EnableSSL: true, + VerifySSL: false, + CAFile: "/path/to/ca/file", + CertFile: "/path/to/cert/file", + KeyFile: "/path/to/key/file", + ServerAutoJoin: true, + ClientAutoJoin: true, + }, HTTPAPIResponseHeaders: map[string]string{ "Access-Control-Allow-Origin": "*", }, From a2237a6057ba9c6349159dbc1d32b4cd94567200 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 15:24:37 -0700 Subject: [PATCH 07/17] Implemented registering client and server services --- client/client.go | 10 ++++- client/config/config.go | 6 +++ command/agent/agent.go | 89 ++++++++++++++++++++++++++++++++++++++++ command/agent/command.go | 9 ++-- command/agent/config.go | 51 +++++++++++++++++++++++ 5 files changed, 160 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index 6fb3ad34e..0d1b4cbcd 100644 --- a/client/client.go +++ b/client/client.go @@ -1186,7 +1186,7 @@ func (c *Client) setupConsulClient() error { return err } -// syncConsul removes services of tasks which are no longer in running state +// syncConsul removes services of tasks which are no longer in running state and func (c *Client) syncConsul() { sync := time.NewTicker(consulSyncInterval) for { @@ -1222,6 +1222,14 @@ func (c *Client) syncConsul() { } } } + + // Add the client service + clientService := &structs.Service{ + Name: c.config.ClientServiceName, + PortLabel: "clienthttpaddr", + } + services[clientService.ID("agent", "client")] = struct{}{} + if err := c.consulService.KeepServices(services); err != nil { c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) } diff --git a/client/config/config.go b/client/config/config.go index f5a0a6b0b..64a1dd53d 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -108,6 +109,11 @@ type Config struct { // Revision is the commit number of the Nomad client Revision string + + ClientServiceName string + + // ConsulConfig is the configuration to connect with Consul Agent + ConsulConfig *consul.ConsulConfig } func (c *Config) Copy() *Config { diff --git a/command/agent/agent.go b/command/agent/agent.go index db54da287..5932d1124 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -8,11 +8,13 @@ import ( "os" "path/filepath" "runtime" + "strconv" "sync" "time" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,6 +29,11 @@ type Agent struct { logger *log.Logger logOutput io.Writer + consulService *consul.ConsulService + consulConfig *consul.ConsulConfig + serverHTTPAddr string + clientHTTPAddr string + server *nomad.Server client *client.Client @@ -49,6 +56,8 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { shutdownCh: make(chan struct{}), } + a.createConsulConfig() + if err := a.setupServer(); err != nil { return nil, err } @@ -58,6 +67,11 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { if a.client == nil && a.server == nil { return nil, fmt.Errorf("must have at least client or server mode enabled") } + if err := a.syncAgentServicesWithConsul(a.serverHTTPAddr, a.clientHTTPAddr); err != nil { + a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err) + } else { + go a.consulService.PeriodicSync() + } return a, nil } @@ -140,6 +154,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) { if port := a.config.Ports.Serf; port != 0 { conf.SerfConfig.MemberlistConfig.BindPort = port } + a.serverHTTPAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP) if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" { dur, err := time.ParseDuration(gcThreshold) @@ -226,6 +241,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { httpAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port) } conf.Node.HTTPAddr = httpAddr + a.clientHTTPAddr = httpAddr // Reserve resources on the node. r := conf.Node.Reserved @@ -242,6 +258,8 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { conf.Version = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease) conf.Revision = a.config.Revision + conf.ConsulConfig = a.consulConfig + return conf, nil } @@ -403,6 +421,12 @@ func (a *Agent) Shutdown() error { } } + if a.consulService != nil { + if err := a.consulService.Shutdown(); err != nil { + a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err) + } + } + a.logger.Println("[INFO] agent: shutdown complete") a.shutdown = true close(a.shutdownCh) @@ -445,3 +469,68 @@ func (a *Agent) Stats() map[string]map[string]string { } return stats } + +func (a *Agent) createConsulConfig() { + cfg := &consul.ConsulConfig{ + Addr: a.config.ConsulConfig.Addr, + Token: a.config.ConsulConfig.Token, + Auth: a.config.ConsulConfig.Auth, + EnableSSL: a.config.ConsulConfig.EnableSSL, + VerifySSL: a.config.ConsulConfig.VerifySSL, + CAFile: a.config.ConsulConfig.CAFile, + CertFile: a.config.ConsulConfig.CertFile, + KeyFile: a.config.ConsulConfig.KeyFile, + } + a.consulConfig = cfg +} + +func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error { + cs, err := consul.NewConsulService(a.consulConfig, a.logger) + if err != nil { + return err + } + a.consulService = cs + var services []*structs.Service + addrs := make(map[string]string) + if a.client != nil && a.config.ConsulConfig.ClientServiceName != "" { + if err != nil { + return err + } + clientService := &structs.Service{ + Name: a.config.ConsulConfig.ClientServiceName, + PortLabel: "clienthttpaddr", + } + addrs["clienthttpaddr"] = clientHttpAddr + services = append(services, clientService) + cs.SetTaskName("client") + } + if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" { + serverService := &structs.Service{ + Name: a.config.ConsulConfig.ServerServiceName, + PortLabel: "serverhttpaddr", + } + addrs["serverhttpaddr"] = serverHttpAddr + services = append(services, serverService) + cs.SetTaskName("server") + } + + cs.SetAddrFinder(func(portLabel string) (string, int) { + addr := addrs[portLabel] + if addr == "" { + return "", 0 + } + host, port, err := net.SplitHostPort(addr) + if err != nil { + return "", 0 + } + p, err := strconv.Atoi(port) + if err != nil { + return "", 0 + } + return host, p + }) + + cs.SetAllocID("agent") + + return cs.SyncServices(services) +} diff --git a/command/agent/command.go b/command/agent/command.go index 1ce19a933..d414808fc 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -58,10 +58,11 @@ func (c *Command) readConfig() *Config { // Make a new, empty config. cmdConfig := &Config{ - Atlas: &AtlasConfig{}, - Client: &ClientConfig{}, - Ports: &Ports{}, - Server: &ServerConfig{}, + Atlas: &AtlasConfig{}, + ConsulConfig: &ConsulConfig{}, + Client: &ClientConfig{}, + Ports: &Ports{}, + Server: &ServerConfig{}, } flags := flag.NewFlagSet("agent", flag.ContinueOnError) diff --git a/command/agent/config.go b/command/agent/config.go index 76079483d..c58345b5c 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -563,6 +563,14 @@ func (c *Config) Merge(b *Config) *Config { result.Atlas = result.Atlas.Merge(b.Atlas) } + // Apply the Consul Configuration + if result.ConsulConfig == nil && b.ConsulConfig != nil { + consulConfig := *b.ConsulConfig + result.ConsulConfig = &consulConfig + } else if b.ConsulConfig != nil { + result.ConsulConfig = result.ConsulConfig.Merge(b.ConsulConfig) + } + // Merge config files lists result.Files = append(result.Files, b.Files...) @@ -761,6 +769,49 @@ func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig { return &result } +// Merge merges two Consul Configurations together. +func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig { + result := *a + + if b.ServerServiceName != "" { + result.ServerServiceName = b.ServerServiceName + } + if b.ClientServiceName != "" { + result.ClientServiceName = b.ClientServiceName + } + if b.Addr != "" { + result.Addr = b.Addr + } + if b.Token != "" { + result.Token = b.Token + } + if b.Auth != "" { + result.Auth = b.Auth + } + if b.EnableSSL { + result.EnableSSL = true + } + if b.VerifySSL { + result.VerifySSL = true + } + if b.CAFile != "" { + result.CAFile = b.CAFile + } + if b.CertFile != "" { + result.CertFile = b.CertFile + } + if b.KeyFile != "" { + result.KeyFile = b.KeyFile + } + if b.ServerAutoJoin { + result.ServerAutoJoin = true + } + if b.ClientAutoJoin { + result.ClientAutoJoin = true + } + return &result +} + func (r *Resources) Merge(b *Resources) *Resources { result := *r if b.CPU != 0 { From 4266e25e3c9f66141c6edea1f3316778d6a29834 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 16:10:57 -0700 Subject: [PATCH 08/17] Using consul config from client config instead of reading from client options --- client/client.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/client/client.go b/client/client.go index 0d1b4cbcd..65d16add7 100644 --- a/client/client.go +++ b/client/client.go @@ -1173,15 +1173,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error { // setupConsulClient creates a ConsulService func (c *Client) setupConsulClient() error { - cfg := consul.ConsulConfig{ - Addr: c.config.ReadDefault("consul.address", "127.0.0.1:8500"), - Token: c.config.Read("consul.token"), - Auth: c.config.Read("consul.auth"), - EnableSSL: c.config.ReadBoolDefault("consul.ssl", false), - VerifySSL: c.config.ReadBoolDefault("consul.verifyssl", true), - } - - cs, err := consul.NewConsulService(&cfg, c.logger) + cs, err := consul.NewConsulService(c.config.ConsulConfig, c.logger) c.consulService = cs return err } From de660c4acf3e2892c3f0211da2c6e790ffcfe8b8 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 16:24:43 -0700 Subject: [PATCH 09/17] Removed allocID and task name from consul service --- client/client.go | 6 ++++-- client/consul/sync.go | 27 ++++++++++----------------- client/consul/sync_test.go | 14 ++++++-------- client/driver/executor/executor.go | 3 +-- command/agent/agent.go | 7 +++---- nomad/structs/structs.go | 4 ++-- 6 files changed, 26 insertions(+), 35 deletions(-) diff --git a/client/client.go b/client/client.go index 65d16add7..1ca63c45b 100644 --- a/client/client.go +++ b/client/client.go @@ -1208,7 +1208,8 @@ func (c *Client) syncConsul() { if taskState.State == structs.TaskStateRunning { if tr, ok := ar.tasks[taskName]; ok { for _, service := range tr.task.Services { - services[service.ID(ar.alloc.ID, tr.task.Name)] = struct{}{} + svcIdentifier := fmt.Sprintf("%s-%s", ar.alloc.ID, tr.task.Name) + services[service.ID(svcIdentifier)] = struct{}{} } } } @@ -1220,7 +1221,8 @@ func (c *Client) syncConsul() { Name: c.config.ClientServiceName, PortLabel: "clienthttpaddr", } - services[clientService.ID("agent", "client")] = struct{}{} + svcIdentifier := fmt.Sprintf("%s-%s", "agent", "client") + services[clientService.ID(svcIdentifier)] = struct{}{} if err := c.consulService.KeepServices(services); err != nil { c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) diff --git a/client/consul/sync.go b/client/consul/sync.go index e1d16b9c9..e6d857eb1 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -22,11 +22,10 @@ type ConsulService struct { client *consul.Client availble bool - taskName string - allocID string - delegateChecks map[string]struct{} - createCheck func(*structs.ServiceCheck, string) (Check, error) - addrFinder func(string) (string, int) + serviceIdentifier string + delegateChecks map[string]struct{} + createCheck func(*structs.ServiceCheck, string) (Check, error) + addrFinder func(string) (string, int) trackedServices map[string]*consul.AgentService trackedChecks map[string]*consul.AgentCheckRegistration @@ -133,21 +132,15 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c return c } -// SetAllocID sets the allocID -func (c *ConsulService) SetAllocID(allocID string) *ConsulService { - c.allocID = allocID - return c -} - // SetAddrFinder sets a function to find the host and port for a Service func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService { c.addrFinder = addrFinder return c } -// SetTaskName sets the task name whose services we are syncing with Consul -func (c *ConsulService) SetTaskName(taskName string) *ConsulService { - c.taskName = taskName +// SetServiceIdentifier sets the identifier of the services we are syncing with Consul +func (c *ConsulService) SetServiceIdentifier(serviceIdentifier string) *ConsulService { + c.serviceIdentifier = serviceIdentifier return c } @@ -313,7 +306,7 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con // createService creates a Consul AgentService from a Nomad Service func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) { srv := consul.AgentService{ - ID: service.ID(c.allocID, c.taskName), + ID: service.ID(c.serviceIdentifier), Service: service.Name, Tags: service.Tags, } @@ -367,7 +360,7 @@ func (c *ConsulService) PeriodicSync() { case <-sync.C: if err := c.performSync(); err != nil { if c.availble { - c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.taskName, err) + c.logger.Printf("[DEBUG] consul: error in syncing services for %q: %v", c.serviceIdentifier, err) } c.availble = false } else { @@ -375,7 +368,7 @@ func (c *ConsulService) PeriodicSync() { } case <-c.shutdownCh: sync.Stop() - c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.taskName) + c.logger.Printf("[INFO] consul: shutting down sync for %q", c.serviceIdentifier) return } } diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index df6a7d778..d4fb5fad4 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -45,21 +45,20 @@ func TestConsulServiceRegisterServices(t *testing.T) { if err != nil { t.Fatalf("Err: %v", err) } - cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return } task := mockTask() - cs.SetTaskName(task.Name) + cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() - service1ID := service1.ID(allocID, task.Name) - service2ID := service2.ID(allocID, task.Name) + service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) + service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } @@ -73,14 +72,13 @@ func TestConsulServiceUpdateService(t *testing.T) { if err != nil { t.Fatalf("Err: %v", err) } - cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return } task := mockTask() - cs.SetTaskName(task.Name) + cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) @@ -94,8 +92,8 @@ func TestConsulServiceUpdateService(t *testing.T) { t.Fatalf("err: %v", err) } // Make sure all the services and checks are still present - service1ID := service1.ID(allocID, task.Name) - service2ID := service2.ID(allocID, task.Name) + service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) + service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 506f728d9..9552c2892 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -436,8 +436,7 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { return err } cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) - cs.SetAllocID(e.ctx.AllocID) - cs.SetTaskName(e.ctx.Task.Name) + cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", e.ctx.AllocID, e.ctx.Task.Name)) cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulService = cs } diff --git a/command/agent/agent.go b/command/agent/agent.go index 5932d1124..31a7f5acd 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -484,6 +484,7 @@ func (a *Agent) createConsulConfig() { a.consulConfig = cfg } +// syncAgentServicesWithConsul syncs the client and server services with Consul func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error { cs, err := consul.NewConsulService(a.consulConfig, a.logger) if err != nil { @@ -502,7 +503,7 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd } addrs["clienthttpaddr"] = clientHttpAddr services = append(services, clientService) - cs.SetTaskName("client") + cs.SetServiceIdentifier("agent-client") } if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" { serverService := &structs.Service{ @@ -511,7 +512,7 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd } addrs["serverhttpaddr"] = serverHttpAddr services = append(services, serverService) - cs.SetTaskName("server") + cs.SetServiceIdentifier("agent-server") } cs.SetAddrFinder(func(portLabel string) (string, int) { @@ -530,7 +531,5 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd return host, p }) - cs.SetAllocID("agent") - return cs.SyncServices(services) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 18fa5f93b..7f3428b3e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1519,8 +1519,8 @@ func (s *Service) InitFields(job string, taskGroup string, task string) { } } -func (s *Service) ID(allocID string, taskName string) string { - return fmt.Sprintf("%s-%s-%s-%s", NomadConsulPrefix, allocID, taskName, s.Hash()) +func (s *Service) ID(identifier string) string { + return fmt.Sprintf("%s-%s-%s", NomadConsulPrefix, identifier, s.Hash()) } // Validate checks if the Check definition is valid From 269a28d5b3e55e4a00e4473f91152c1f4e1cf99d Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 16:58:14 -0700 Subject: [PATCH 10/17] Fixing the client test --- client/client_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/client_test.go b/client/client_test.go index 462a600fb..c429527ad 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -71,6 +72,7 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) { func testClient(t *testing.T, cb func(c *config.Config)) *Client { conf := DefaultConfig() conf.DevMode = true + conf.ConsulConfig = &consul.ConsulConfig{} if cb != nil { cb(conf) } From 22169446767b8d258440266dd5a039aa3c981012 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 11 May 2016 17:26:53 -0700 Subject: [PATCH 11/17] Fixed an agent test --- client/client.go | 2 +- command/agent/agent_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 1ca63c45b..5169cc489 100644 --- a/client/client.go +++ b/client/client.go @@ -1178,7 +1178,7 @@ func (c *Client) setupConsulClient() error { return err } -// syncConsul removes services of tasks which are no longer in running state and +// syncConsul removes services of tasks which are no longer in running state func (c *Client) syncConsul() { sync := time.NewTicker(consulSyncInterval) for { diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 91a352758..ae470bf62 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -42,6 +42,7 @@ func makeAgent(t *testing.T, cb func(*Config)) (string, *Agent) { Serf: getPort(), } conf.NodeName = fmt.Sprintf("Node %d", conf.Ports.RPC) + conf.ConsulConfig = &ConsulConfig{} // Tighten the Serf timing config.SerfConfig.MemberlistConfig.SuspicionMult = 2 From b2c18168a48ecddbe504c388dc00ca48eae1289a Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 13 May 2016 10:19:49 -0700 Subject: [PATCH 12/17] Making Nomad register services with consul in dev mode --- client/consul/sync.go | 3 ++- command/agent/agent.go | 3 ++- command/agent/config.go | 9 +++++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/client/consul/sync.go b/client/consul/sync.go index e6d857eb1..99255624a 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -411,7 +411,8 @@ func (c *ConsulService) performSync() error { func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService { nomadServices := make(map[string]*consul.AgentService) for _, srv := range srvcs { - if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) { + if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) && + !strings.HasPrefix(srv.ID, fmt.Sprintf("%s-%s", structs.NomadConsulPrefix, "agent")) { nomadServices[srv.ID] = srv } } diff --git a/command/agent/agent.go b/command/agent/agent.go index 31a7f5acd..d2aab1c1b 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -69,7 +69,8 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { } if err := a.syncAgentServicesWithConsul(a.serverHTTPAddr, a.clientHTTPAddr); err != nil { a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err) - } else { + } + if a.consulService != nil { go a.consulService.PeriodicSync() } return a, nil diff --git a/command/agent/config.go b/command/agent/config.go index c58345b5c..7a0a47cba 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -140,6 +140,10 @@ type ConsulConfig struct { // clients with Consul ClientServiceName string `mapstructure:"client_service_name"` + // AutoRegister determines if Nomad will register the Nomad client and + // server agents with Consul + AutoRegister bool `mapstructure:"auto_register"` + // Addr is the address of the local Consul agent Addr string `mapstructure:"addr"` @@ -419,6 +423,11 @@ func DefaultConfig() *Config { Addresses: &Addresses{}, AdvertiseAddrs: &AdvertiseAddrs{}, Atlas: &AtlasConfig{}, + ConsulConfig: &ConsulConfig{ + ServerServiceName: "nomad-server", + ClientServiceName: "nomad-client", + AutoRegister: true, + }, Client: &ClientConfig{ Enabled: false, NetworkSpeed: 100, From e3fffa75aa85abfe918752cb736a39a586af47c1 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 13 May 2016 10:34:21 -0700 Subject: [PATCH 13/17] Removing addition of the client service while reconciling services --- client/client.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/client/client.go b/client/client.go index 5169cc489..8b753573b 100644 --- a/client/client.go +++ b/client/client.go @@ -1216,14 +1216,6 @@ func (c *Client) syncConsul() { } } - // Add the client service - clientService := &structs.Service{ - Name: c.config.ClientServiceName, - PortLabel: "clienthttpaddr", - } - svcIdentifier := fmt.Sprintf("%s-%s", "agent", "client") - services[clientService.ID(svcIdentifier)] = struct{}{} - if err := c.consulService.KeepServices(services); err != nil { c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) } From 8c5aa0fe65a39c22a0678c28a0a22b02cb4dba62 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sat, 14 May 2016 00:36:26 -0700 Subject: [PATCH 14/17] Added some docs --- client/config/config.go | 2 -- client/consul/sync.go | 14 ++++++++++---- command/agent/agent.go | 19 +++++++------------ 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/client/config/config.go b/client/config/config.go index 64a1dd53d..2b8a3c4aa 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -110,8 +110,6 @@ type Config struct { // Revision is the commit number of the Nomad client Revision string - ClientServiceName string - // ConsulConfig is the configuration to connect with Consul Agent ConsulConfig *consul.ConsulConfig } diff --git a/client/consul/sync.go b/client/consul/sync.go index 99255624a..9713adc72 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -22,10 +22,10 @@ type ConsulService struct { client *consul.Client availble bool - serviceIdentifier string - delegateChecks map[string]struct{} + serviceIdentifier string // serviceIdentifier is a token which identifies which task/alloc the service belongs to + delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul createCheck func(*structs.ServiceCheck, string) (Check, error) - addrFinder func(string) (string, int) + addrFinder func(portLabel string) (string, int) trackedServices map[string]*consul.AgentService trackedChecks map[string]*consul.AgentCheckRegistration @@ -132,7 +132,7 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c return c } -// SetAddrFinder sets a function to find the host and port for a Service +// SetAddrFinder sets a function to find the host and port for a Service given its port label func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService { c.addrFinder = addrFinder return c @@ -466,3 +466,9 @@ func (c *ConsulService) runCheck(check Check) { } } } + +// generateServiceIdentifier returns a service identifier based on an allocation +// id and task name +func generateServiceIdentifier(allocID string, taskName string) string { + return fmt.Sprintf("%s-%s", taskName, allocID) +} diff --git a/command/agent/agent.go b/command/agent/agent.go index d2aab1c1b..288058fd3 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -29,8 +29,8 @@ type Agent struct { logger *log.Logger logOutput io.Writer - consulService *consul.ConsulService - consulConfig *consul.ConsulConfig + consulService *consul.ConsulService // consulService registers the Nomad agent with the consul agent + consulConfig *consul.ConsulConfig // consulConfig is the consul configuration the Nomad client uses to connect with Consul agent serverHTTPAddr string clientHTTPAddr string @@ -56,6 +56,8 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) { shutdownCh: make(chan struct{}), } + // creating the consul client configuration that both the server and client + // uses a.createConsulConfig() if err := a.setupServer(); err != nil { @@ -493,35 +495,28 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd } a.consulService = cs var services []*structs.Service - addrs := make(map[string]string) if a.client != nil && a.config.ConsulConfig.ClientServiceName != "" { if err != nil { return err } clientService := &structs.Service{ Name: a.config.ConsulConfig.ClientServiceName, - PortLabel: "clienthttpaddr", + PortLabel: clientHttpAddr, } - addrs["clienthttpaddr"] = clientHttpAddr services = append(services, clientService) cs.SetServiceIdentifier("agent-client") } if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" { serverService := &structs.Service{ Name: a.config.ConsulConfig.ServerServiceName, - PortLabel: "serverhttpaddr", + PortLabel: serverHttpAddr, } - addrs["serverhttpaddr"] = serverHttpAddr services = append(services, serverService) cs.SetServiceIdentifier("agent-server") } cs.SetAddrFinder(func(portLabel string) (string, int) { - addr := addrs[portLabel] - if addr == "" { - return "", 0 - } - host, port, err := net.SplitHostPort(addr) + host, port, err := net.SplitHostPort(portLabel) if err != nil { return "", 0 } From 522eebcbc1d3fc42af9e319bb337f074e7633675 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sat, 14 May 2016 00:43:25 -0700 Subject: [PATCH 15/17] Using a helper method to create service identifiers --- client/consul/sync.go | 6 +++--- client/consul/sync_test.go | 12 ++++++------ client/driver/executor/executor.go | 2 +- nomad/structs/structs.go | 4 ++++ 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/client/consul/sync.go b/client/consul/sync.go index 9713adc72..08da6e89d 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -412,7 +412,7 @@ func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentServi nomadServices := make(map[string]*consul.AgentService) for _, srv := range srvcs { if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) && - !strings.HasPrefix(srv.ID, fmt.Sprintf("%s-%s", structs.NomadConsulPrefix, "agent")) { + !strings.HasPrefix(srv.ID, structs.AgentServicePrefix) { nomadServices[srv.ID] = srv } } @@ -467,8 +467,8 @@ func (c *ConsulService) runCheck(check Check) { } } -// generateServiceIdentifier returns a service identifier based on an allocation +// GenerateServiceIdentifier returns a service identifier based on an allocation // id and task name -func generateServiceIdentifier(allocID string, taskName string) string { +func GenerateServiceIdentifier(allocID string, taskName string) string { return fmt.Sprintf("%s-%s", taskName, allocID) } diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index d4fb5fad4..80907c787 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -50,15 +50,15 @@ func TestConsulServiceRegisterServices(t *testing.T) { return } task := mockTask() - cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name)) + cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() - service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) - service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) + service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name)) + service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } @@ -78,7 +78,7 @@ func TestConsulServiceUpdateService(t *testing.T) { } task := mockTask() - cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name)) + cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) @@ -92,8 +92,8 @@ func TestConsulServiceUpdateService(t *testing.T) { t.Fatalf("err: %v", err) } // Make sure all the services and checks are still present - service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) - service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) + service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name)) + service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 9552c2892..c95437dca 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -436,7 +436,7 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { return err } cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) - cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", e.ctx.AllocID, e.ctx.Task.Name)) + cs.SetServiceIdentifier(consul.GenerateServiceIdentifier(e.ctx.AllocID, e.ctx.Task.Name)) cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulService = cs } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7f3428b3e..f0bfc4510 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1474,6 +1474,10 @@ const ( NomadConsulPrefix = "nomad-registered-service" ) +var ( + AgentServicePrefix = fmt.Sprintf("%s-%s", NomadConsulPrefix, "agent") +) + // The Service model represents a Consul service defintion type Service struct { Name string // Name of the service, defaults to id From f2de4f20193f014e8bcab3031edfe15b13ff8998 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sat, 14 May 2016 01:09:05 -0700 Subject: [PATCH 16/17] Using advertise addr for the http address --- command/agent/agent.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/command/agent/agent.go b/command/agent/agent.go index 288058fd3..78c463913 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -158,6 +158,9 @@ func (a *Agent) serverConfig() (*nomad.Config, error) { conf.SerfConfig.MemberlistConfig.BindPort = port } a.serverHTTPAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP) + if a.config.AdvertiseAddrs.HTTP != "" { + a.serverHTTPAddr = a.config.AdvertiseAddrs.HTTP + } if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" { dur, err := time.ParseDuration(gcThreshold) From 9c60703e5251cf39f08cc4f59264c7261725cea6 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sat, 14 May 2016 01:20:51 -0700 Subject: [PATCH 17/17] Testing if stats is present and it is a map --- command/check.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/command/check.go b/command/check.go index d35e5d268..0eba98c1e 100644 --- a/command/check.go +++ b/command/check.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "reflect" "strconv" "strings" "time" @@ -68,6 +69,10 @@ func (c *AgentCheckCommand) Run(args []string) int { c.Ui.Output(fmt.Sprintf("unable to query agent info: %v", err)) return HealthCritical } + if stats, ok := info["stats"]; !ok && (reflect.TypeOf(stats).Kind() == reflect.Map) { + c.Ui.Error("error getting stats from the agent api") + return 1 + } if _, ok := info["stats"]["nomad"]; ok { return c.checkServerHealth(info["stats"], minPeers) }