Add cluster join command line options and configuration options

This commit is contained in:
Chris Aubuchon
2015-12-02 13:55:29 -06:00
parent d5df69bde9
commit 0d0fe0a408
4 changed files with 215 additions and 42 deletions

View File

@@ -71,6 +71,11 @@ func (c *Command) readConfig() *Config {
// Server-only options
flags.IntVar(&cmdConfig.Server.BootstrapExpect, "bootstrap-expect", 0, "")
flags.BoolVar(&cmdConfig.Server.RejoinAfterLeave, "rejoin", false, "")
flags.Var((*sliceflag.StringFlag)(&cmdConfig.Server.StartJoin), "join", "")
flags.Var((*sliceflag.StringFlag)(&cmdConfig.Server.RetryJoin), "retry-join", "")
flags.IntVar(&cmdConfig.Server.RetryMaxAttempts, "retry-max", 0, "")
flags.StringVar(&cmdConfig.Server.RetryInterval, "retry-interval", "", "")
// Client-only options
flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "")
@@ -100,6 +105,15 @@ func (c *Command) readConfig() *Config {
return nil
}
if cmdConfig.Server.RetryInterval != "" {
dur, err := time.ParseDuration(cmdConfig.Server.RetryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdConfig.Server.retryInterval = dur
}
// Split the servers.
if servers != "" {
cmdConfig.Client.Servers = strings.Split(servers, ",")
@@ -358,6 +372,12 @@ func (c *Command) Run(args []string) int {
}
}()
// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
c.Ui.Error(err.Error())
return 1
}
// Compile agent information for output later
info := make(map[string]string)
info["client"] = strconv.FormatBool(config.Client.Enabled)
@@ -396,12 +416,16 @@ func (c *Command) Run(args []string) int {
// Enable log streaming
logGate.Flush()
// Start retry join process
errCh := make(chan struct{})
go c.retryJoin(config, errCh)
// Wait for exit
return c.handleSignals(config)
return c.handleSignals(config, errCh)
}
// handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config) int {
func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int {
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
@@ -413,6 +437,8 @@ WAIT:
sig = s
case <-c.ShutdownCh:
sig = os.Interrupt
case <-retryJoin:
return 1
}
c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig))
@@ -559,6 +585,52 @@ func (c *Command) setupSCADA(config *Config) error {
return nil
}
func (c *Command) startupJoin(config *Config) error {
if len(config.Server.StartJoin) == 0 || !config.Server.Enabled {
return nil
}
c.Ui.Output("Joining cluster...")
n, err := c.agent.server.Join(config.Server.StartJoin)
if err != nil {
return err
}
c.Ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
return nil
}
// retryJoin is used to handle retrying a join until it succeeds or all retries
// are exhausted.
func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
if len(config.Server.RetryJoin) == 0 || !config.Server.Enabled {
return
}
logger := c.agent.logger
logger.Printf("[INFO] agent: Joining cluster...")
attempt := 0
for {
n, err := c.agent.server.Join(config.Server.RetryJoin)
if err == nil {
logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
attempt++
if config.Server.RetryMaxAttempts > 0 && attempt > config.Server.RetryMaxAttempts {
logger.Printf("[ERROR] agent: max join retry exhausted, exiting")
close(errCh)
return
}
logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
config.Server.RetryInterval)
time.Sleep(config.Server.retryInterval)
}
}
func (c *Command) Synopsis() string {
return "Runs a Nomad agent"
}
@@ -632,6 +704,24 @@ Server Options:
bootstrapping the cluster. Once <num> servers have joined eachother,
Nomad initiates the bootstrap process.
-join=<address>
Address of an agent to join at start time. Can be specified
multiple times.
-retry-join=<address>
Address of an agent to join at start time with retries enabled.
Can be specified multiple times.
-retry-max=<num>
Maximum number of join attempts. Defaults to 0, which will retry
indefinitely.
-retry-interval=<dur>
Time to wait between join attempts.
-rejoin
Ignore a previous leave and attempts to rejoin the cluster.
Client Options:
-client

View File

@@ -1,11 +1,14 @@
package agent
import (
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"testing"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/cli"
)
@@ -69,3 +72,58 @@ func TestCommand_Args(t *testing.T) {
}
}
}
func TestRetryJoin(t *testing.T) {
dir, agent := makeAgent(t, nil)
defer os.RemoveAll(dir)
defer agent.Shutdown()
tmpDir, err := ioutil.TempDir("", "nomad")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)
doneCh := make(chan struct{})
shutdownCh := make(chan struct{})
defer func() {
close(shutdownCh)
<-doneCh
}()
cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}
serfAddr := fmt.Sprintf(
"%s:%d",
agent.config.BindAddr,
agent.config.Ports.Serf)
args := []string{
"-server",
"-data-dir", tmpDir,
"-node", fmt.Sprintf(`"Node %d"`, getPort()),
"-retry-join", serfAddr,
"-retry-interval", "1s",
}
go func() {
if code := cmd.Run(args); code != 0 {
log.Printf("bad: %d", code)
}
close(doneCh)
}()
testutil.WaitForResult(func() (bool, error) {
mem := agent.server.Members()
if len(mem) != 2 {
return false, fmt.Errorf("bad :%#v", mem)
}
return true, nil
}, func(err error) {
t.Fatalf(err.Error())
})
}

View File

@@ -9,6 +9,7 @@ import (
"path/filepath"
"runtime"
"strings"
"time"
"github.com/hashicorp/hcl"
client "github.com/hashicorp/nomad/client/config"
@@ -181,6 +182,31 @@ type ServerConfig struct {
// NodeGCThreshold contros how "old" a node must be to be collected by GC.
NodeGCThreshold string `hcl:"node_gc_threshold"`
// StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
StartJoin []string `hcl:"start_join"`
// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `hcl:"retry_join"`
// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttempts int `hcl:"retry_max"`
// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryInterval string `hcl:"retry_interval"`
retryInterval time.Duration `hcl:"-"`
// RejoinAfterLeave controls our interaction with the cluster after leave.
// When set to false (default), a leave causes Consul to not rejoin
// the cluster until an explicit join is received. If this is set to
// true, we ignore the leave, and rejoin the cluster on start.
RejoinAfterLeave bool `hcl:"rejoin_after_leave"`
}
// Telemetry is the telemetry configuration for the server
@@ -252,7 +278,9 @@ func DefaultConfig() *Config {
NetworkSpeed: 100,
},
Server: &ServerConfig{
Enabled: false,
Enabled: false,
StartJoin: []string{},
RetryJoin: []string{},
},
}
}
@@ -358,14 +386,6 @@ func (a *Config) Merge(b *Config) *Config {
result.AdvertiseAddrs = result.AdvertiseAddrs.Merge(b.AdvertiseAddrs)
}
// Apply the Atlas configuration
if result.Atlas == nil && b.Atlas != nil {
atlasConfig := *b.Atlas
result.Atlas = &atlasConfig
} else if b.Atlas != nil {
result.Atlas = result.Atlas.Merge(b.Atlas)
}
return &result
}
@@ -391,10 +411,30 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.NodeGCThreshold != "" {
result.NodeGCThreshold = b.NodeGCThreshold
}
if b.RetryMaxAttempts != 0 {
result.RetryMaxAttempts = b.RetryMaxAttempts
}
if b.RetryInterval != "" {
result.RetryInterval = b.RetryInterval
result.retryInterval = b.retryInterval
}
if b.RejoinAfterLeave {
result.RejoinAfterLeave = true
}
// Add the schedulers
result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...)
// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
result.StartJoin = append(result.StartJoin, a.StartJoin...)
result.StartJoin = append(result.StartJoin, b.StartJoin...)
// Copy the retry join addresses
result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)
return &result
}
@@ -507,25 +547,6 @@ func (a *AdvertiseAddrs) Merge(b *AdvertiseAddrs) *AdvertiseAddrs {
return &result
}
// Merge merges two Atlas configurations together.
func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig {
var result AtlasConfig = *a
if b.Infrastructure != "" {
result.Infrastructure = b.Infrastructure
}
if b.Token != "" {
result.Token = b.Token
}
if b.Join {
result.Join = true
}
if b.Endpoint != "" {
result.Endpoint = b.Endpoint
}
return &result
}
// LoadConfig loads the configuration at the given path, regardless if
// its a file or directory.
func LoadConfig(path string) (*Config, error) {

View File

@@ -6,6 +6,7 @@ import (
"path/filepath"
"reflect"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -63,12 +64,6 @@ func TestConfig_Merge(t *testing.T) {
RPC: "127.0.0.1",
Serf: "127.0.0.1",
},
Atlas: &AtlasConfig{
Infrastructure: "hashicorp/test1",
Token: "abc",
Join: false,
Endpoint: "foo",
},
}
c2 := &Config{
@@ -114,6 +109,11 @@ func TestConfig_Merge(t *testing.T) {
NumSchedulers: 2,
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",
RejoinAfterLeave: true,
StartJoin: []string{"1.1.1.1"},
RetryJoin: []string{"1.1.1.1"},
RetryInterval: "10s",
retryInterval: time.Second * 10,
},
Ports: &Ports{
HTTP: 20000,
@@ -129,12 +129,6 @@ func TestConfig_Merge(t *testing.T) {
RPC: "127.0.0.2",
Serf: "127.0.0.2",
},
Atlas: &AtlasConfig{
Infrastructure: "hashicorp/test2",
Token: "xyz",
Join: true,
Endpoint: "bar",
},
}
result := c1.Merge(c2)
@@ -384,6 +378,11 @@ func TestConfig_LoadConfigString(t *testing.T) {
NumSchedulers: 2,
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
},
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.1:1234",
@@ -457,6 +456,11 @@ server {
num_schedulers = 2
enabled_schedulers = ["test"]
node_gc_threshold = "12h"
retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3
retry_interval = "15s"
rejoin_after_leave = true
}
telemetry {
statsite_address = "127.0.0.1:1234"