From 0d0fe0a408ddc164b8e663c7c0635d5dd12a311c Mon Sep 17 00:00:00 2001 From: Chris Aubuchon Date: Wed, 2 Dec 2015 13:55:29 -0600 Subject: [PATCH] Add cluster join command line options and configuration options --- command/agent/command.go | 94 ++++++++++++++++++++++++++++++++++- command/agent/command_test.go | 58 +++++++++++++++++++++ command/agent/config.go | 77 +++++++++++++++++----------- command/agent/config_test.go | 28 ++++++----- 4 files changed, 215 insertions(+), 42 deletions(-) diff --git a/command/agent/command.go b/command/agent/command.go index ecae14d81..796660724 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -71,6 +71,11 @@ func (c *Command) readConfig() *Config { // Server-only options flags.IntVar(&cmdConfig.Server.BootstrapExpect, "bootstrap-expect", 0, "") + flags.BoolVar(&cmdConfig.Server.RejoinAfterLeave, "rejoin", false, "") + flags.Var((*sliceflag.StringFlag)(&cmdConfig.Server.StartJoin), "join", "") + flags.Var((*sliceflag.StringFlag)(&cmdConfig.Server.RetryJoin), "retry-join", "") + flags.IntVar(&cmdConfig.Server.RetryMaxAttempts, "retry-max", 0, "") + flags.StringVar(&cmdConfig.Server.RetryInterval, "retry-interval", "", "") // Client-only options flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "") @@ -100,6 +105,15 @@ func (c *Command) readConfig() *Config { return nil } + if cmdConfig.Server.RetryInterval != "" { + dur, err := time.ParseDuration(cmdConfig.Server.RetryInterval) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error: %s", err)) + return nil + } + cmdConfig.Server.retryInterval = dur + } + // Split the servers. 
if servers != "" { cmdConfig.Client.Servers = strings.Split(servers, ",") @@ -358,6 +372,12 @@ func (c *Command) Run(args []string) int { } }() + // Join startup nodes if specified + if err := c.startupJoin(config); err != nil { + c.Ui.Error(err.Error()) + return 1 + } + // Compile agent information for output later info := make(map[string]string) info["client"] = strconv.FormatBool(config.Client.Enabled) @@ -396,12 +416,16 @@ func (c *Command) Run(args []string) int { // Enable log streaming logGate.Flush() + // Start retry join process + errCh := make(chan struct{}) + go c.retryJoin(config, errCh) + // Wait for exit - return c.handleSignals(config) + return c.handleSignals(config, errCh) } // handleSignals blocks until we get an exit-causing signal -func (c *Command) handleSignals(config *Config) int { +func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int { signalCh := make(chan os.Signal, 4) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) @@ -413,6 +437,8 @@ WAIT: sig = s case <-c.ShutdownCh: sig = os.Interrupt + case <-retryJoin: + return 1 } c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig)) @@ -559,6 +585,52 @@ func (c *Command) setupSCADA(config *Config) error { return nil } +func (c *Command) startupJoin(config *Config) error { + if len(config.Server.StartJoin) == 0 || !config.Server.Enabled { + return nil + } + + c.Ui.Output("Joining cluster...") + n, err := c.agent.server.Join(config.Server.StartJoin) + if err != nil { + return err + } + + c.Ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n)) + return nil +} + +// retryJoin is used to handle retrying a join until it succeeds or all retries +// are exhausted. 
+func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) { + if len(config.Server.RetryJoin) == 0 || !config.Server.Enabled { + return + } + + logger := c.agent.logger + logger.Printf("[INFO] agent: Joining cluster...") + + attempt := 0 + for { + n, err := c.agent.server.Join(config.Server.RetryJoin) + if err == nil { + logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n) + return + } + + attempt++ + if config.Server.RetryMaxAttempts > 0 && attempt > config.Server.RetryMaxAttempts { + logger.Printf("[ERROR] agent: max join retry exhausted, exiting") + close(errCh) + return + } + + logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, + config.Server.RetryInterval) + time.Sleep(config.Server.retryInterval) + } +} + func (c *Command) Synopsis() string { return "Runs a Nomad agent" } @@ -632,6 +704,24 @@ Server Options: bootstrapping the cluster. Once servers have joined eachother, Nomad initiates the bootstrap process. + -join=<address>
+ Address of an agent to join at start time. Can be specified + multiple times. + + -retry-join=<address>
+ Address of an agent to join at start time with retries enabled. + Can be specified multiple times. + + -retry-max=<num> + Maximum number of join attempts. Defaults to 0, which will retry + indefinitely. + + -retry-interval=<dur> + Time to wait between join attempts. + + -rejoin + Ignore a previous leave and attempt to rejoin the cluster. + Client Options: -client diff --git a/command/agent/command_test.go b/command/agent/command_test.go index c68da5a5a..a42261853 100644 --- a/command/agent/command_test.go +++ b/command/agent/command_test.go @@ -1,11 +1,14 @@ package agent import ( + "fmt" "io/ioutil" + "log" "os" "strings" "testing" + "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/cli" ) @@ -69,3 +72,58 @@ func TestCommand_Args(t *testing.T) { } } } + +func TestRetryJoin(t *testing.T) { + dir, agent := makeAgent(t, nil) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + tmpDir, err := ioutil.TempDir("", "nomad") + if err != nil { + t.Fatalf("err: %s", err) + } + defer os.RemoveAll(tmpDir) + + doneCh := make(chan struct{}) + shutdownCh := make(chan struct{}) + + defer func() { + close(shutdownCh) + <-doneCh + }() + + cmd := &Command{ + ShutdownCh: shutdownCh, + Ui: new(cli.MockUi), + } + + serfAddr := fmt.Sprintf( + "%s:%d", + agent.config.BindAddr, + agent.config.Ports.Serf) + + args := []string{ + "-server", + "-data-dir", tmpDir, + "-node", fmt.Sprintf(`"Node %d"`, getPort()), + "-retry-join", serfAddr, + "-retry-interval", "1s", + } + + go func() { + if code := cmd.Run(args); code != 0 { + log.Printf("bad: %d", code) + } + close(doneCh) + }() + + testutil.WaitForResult(func() (bool, error) { + mem := agent.server.Members() + if len(mem) != 2 { + return false, fmt.Errorf("bad :%#v", mem) + } + return true, nil + }, func(err error) { + t.Fatalf(err.Error()) + }) +} diff --git a/command/agent/config.go b/command/agent/config.go index 5f0bfdf78..25f7cd95e 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -9,6 +9,7 @@ import (
"path/filepath" "runtime" "strings" + "time" "github.com/hashicorp/hcl" client "github.com/hashicorp/nomad/client/config" @@ -181,6 +182,31 @@ type ServerConfig struct { // NodeGCThreshold contros how "old" a node must be to be collected by GC. NodeGCThreshold string `hcl:"node_gc_threshold"` + + // StartJoin is a list of addresses to attempt to join when the + // agent starts. If Serf is unable to communicate with any of these + // addresses, then the agent will error and exit. + StartJoin []string `hcl:"start_join"` + + // RetryJoin is a list of addresses to join with retry enabled. + RetryJoin []string `hcl:"retry_join"` + + // RetryMaxAttempts specifies the maximum number of times to retry joining a + // host on startup. This is useful for cases where we know the node will be + // online eventually. + RetryMaxAttempts int `hcl:"retry_max"` + + // RetryInterval specifies the amount of time to wait in between join + // attempts on agent start. The minimum allowed value is 1 second and + // the default is 30s. + RetryInterval string `hcl:"retry_interval"` + retryInterval time.Duration `hcl:"-"` + + // RejoinAfterLeave controls our interaction with the cluster after leave. + // When set to false (default), a leave causes Nomad to not rejoin + // the cluster until an explicit join is received. If this is set to + // true, we ignore the leave, and rejoin the cluster on start.
+ RejoinAfterLeave bool `hcl:"rejoin_after_leave"` } // Telemetry is the telemetry configuration for the server @@ -252,7 +278,9 @@ func DefaultConfig() *Config { NetworkSpeed: 100, }, Server: &ServerConfig{ - Enabled: false, + Enabled: false, + StartJoin: []string{}, + RetryJoin: []string{}, }, } } @@ -358,14 +386,6 @@ func (a *Config) Merge(b *Config) *Config { result.AdvertiseAddrs = result.AdvertiseAddrs.Merge(b.AdvertiseAddrs) } - // Apply the Atlas configuration - if result.Atlas == nil && b.Atlas != nil { - atlasConfig := *b.Atlas - result.Atlas = &atlasConfig - } else if b.Atlas != nil { - result.Atlas = result.Atlas.Merge(b.Atlas) - } - return &result } @@ -391,10 +411,30 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig { if b.NodeGCThreshold != "" { result.NodeGCThreshold = b.NodeGCThreshold } + if b.RetryMaxAttempts != 0 { + result.RetryMaxAttempts = b.RetryMaxAttempts + } + if b.RetryInterval != "" { + result.RetryInterval = b.RetryInterval + result.retryInterval = b.retryInterval + } + if b.RejoinAfterLeave { + result.RejoinAfterLeave = true + } // Add the schedulers result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...) + // Copy the start join addresses + result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) + result.StartJoin = append(result.StartJoin, a.StartJoin...) + result.StartJoin = append(result.StartJoin, b.StartJoin...) + + // Copy the retry join addresses + result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin)) + result.RetryJoin = append(result.RetryJoin, a.RetryJoin...) + result.RetryJoin = append(result.RetryJoin, b.RetryJoin...) + return &result } @@ -507,25 +547,6 @@ func (a *AdvertiseAddrs) Merge(b *AdvertiseAddrs) *AdvertiseAddrs { return &result } -// Merge merges two Atlas configurations together. 
-func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig { - var result AtlasConfig = *a - - if b.Infrastructure != "" { - result.Infrastructure = b.Infrastructure - } - if b.Token != "" { - result.Token = b.Token - } - if b.Join { - result.Join = true - } - if b.Endpoint != "" { - result.Endpoint = b.Endpoint - } - return &result -} - // LoadConfig loads the configuration at the given path, regardless if // its a file or directory. func LoadConfig(path string) (*Config, error) { diff --git a/command/agent/config_test.go b/command/agent/config_test.go index c13ff91f3..67edceff0 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "reflect" "testing" + "time" "github.com/hashicorp/nomad/nomad/structs" ) @@ -63,12 +64,6 @@ func TestConfig_Merge(t *testing.T) { RPC: "127.0.0.1", Serf: "127.0.0.1", }, - Atlas: &AtlasConfig{ - Infrastructure: "hashicorp/test1", - Token: "abc", - Join: false, - Endpoint: "foo", - }, } c2 := &Config{ @@ -114,6 +109,11 @@ func TestConfig_Merge(t *testing.T) { NumSchedulers: 2, EnabledSchedulers: []string{structs.JobTypeBatch}, NodeGCThreshold: "12h", + RejoinAfterLeave: true, + StartJoin: []string{"1.1.1.1"}, + RetryJoin: []string{"1.1.1.1"}, + RetryInterval: "10s", + retryInterval: time.Second * 10, }, Ports: &Ports{ HTTP: 20000, @@ -129,12 +129,6 @@ func TestConfig_Merge(t *testing.T) { RPC: "127.0.0.2", Serf: "127.0.0.2", }, - Atlas: &AtlasConfig{ - Infrastructure: "hashicorp/test2", - Token: "xyz", - Join: true, - Endpoint: "bar", - }, } result := c1.Merge(c2) @@ -384,6 +378,11 @@ func TestConfig_LoadConfigString(t *testing.T) { NumSchedulers: 2, EnabledSchedulers: []string{"test"}, NodeGCThreshold: "12h", + RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, + StartJoin: []string{"1.1.1.1", "2.2.2.2"}, + RetryInterval: "15s", + RejoinAfterLeave: true, + RetryMaxAttempts: 3, }, Telemetry: &Telemetry{ StatsiteAddr: "127.0.0.1:1234", @@ -457,6 +456,11 @@ server { num_schedulers = 2 
enabled_schedulers = ["test"] node_gc_threshold = "12h" + retry_join = [ "1.1.1.1", "2.2.2.2" ] + start_join = [ "1.1.1.1", "2.2.2.2" ] + retry_max = 3 + retry_interval = "15s" + rejoin_after_leave = true } telemetry { statsite_address = "127.0.0.1:1234"