diff --git a/command/agent/agent.go b/command/agent/agent.go index b0e176aa0..860d10bd0 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -235,7 +235,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) { return nil, fmt.Errorf("server_service_name must be set when auto_advertise is enabled") } - // conf.ConsulConfig = a.config.Consul + conf.ConsulConfig = a.config.Consul return conf, nil } @@ -379,7 +379,7 @@ func (a *Agent) setupServer() error { } // Create the server - server, err := nomad.NewServer(conf) + server, err := nomad.NewServer(conf, a.consulSyncer) if err != nil { return fmt.Errorf("server setup failed: %v", err) } diff --git a/nomad/config.go b/nomad/config.go index d998045d9..d6405c5f6 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -176,6 +177,9 @@ type Config struct { // a new leader is elected, since we no longer know the status // of all the heartbeats. FailoverHeartbeatTTL time.Duration + + // ConsulConfig is this Agent's Consul configuration + ConsulConfig *config.ConsulConfig } // CheckVersion is used to check if the ProtocolVersion is valid diff --git a/nomad/server.go b/nomad/server.go index 1932d18b1..191cfc0a5 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -16,7 +16,11 @@ import ( "sync" "time" + consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/tlsutil" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" @@ -25,6 +29,12 @@ import ( ) const ( + // datacenterQueryFactor sets the max number of DCs that a Nomad + // Server will query to find bootstrap_expect servers. If + // bootstrap_expect is 3, then the Nomad Server bootstrapFn handler + // will search through up to 9 Consul DCs to find its quorum. + datacenterQueryFactor = 3 + raftState = "raft/" serfSnapshot = "serf/snapshot" snapshotsRetained = 2 @@ -116,6 +126,9 @@ type Server struct { heartbeatTimers map[string]*time.Timer heartbeatTimersLock sync.Mutex + // consulSyncer advertises this Nomad Agent with Consul + consulSyncer *consul.Syncer + // Worker used for processing workers []*Worker @@ -140,7 +153,7 @@ type endpoints struct { // NewServer is used to construct a new Nomad server from the // configuration, potentially returning an error -func NewServer(config *Config) (*Server, error) { +func NewServer(config *Config, consulSyncer *consul.Syncer) (*Server, error) { // Check the protocol version if err := config.CheckVersion(); err != nil { return nil, err @@ -172,6 +185,7 @@ func NewServer(config *Config) (*Server, error) { // Create the server s := &Server{ config: config, + consulSyncer: consulSyncer, connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil), logger: logger, rpcServer: rpc.NewServer(), @@ -218,6 +232,11 @@ func NewServer(config *Config) (*Server, error) { return nil, fmt.Errorf("Failed to start workers: %v", err) } + // Setup the Consul syncer + if err := s.setupConsulSyncer(); err != nil { + return nil, fmt.Errorf("failed to create server Consul syncer: %v") + } + // Monitor leadership changes go s.monitorLeadership() @@ -356,6 +375,91 @@ func (s *Server) Leave() error { return nil } +// setupConsulSyncer creates Server-mode consul.Syncer which periodically +// executes callbacks on a fixed interval. +func (s *Server) setupConsulSyncer() error { + // The bootstrapFn callback handler is used to periodically poll + // Consul to look up the Nomad Servers in Consul. In the event the + // server has been brought up without a `retry-join` configuration + // and this Server is partitioned from the rest of the cluster, + // periodically poll Consul to reattach this Server to other servers + // in the same region and automatically reform a quorum (assuming the + // correct number of servers required for quorum are present). + bootstrapFn := func() error { + // If the the number of Members in Serf is more than the + // bootstrap quorum, do nothing. + if len(s.Members()) < s.config.BootstrapExpect { + return nil + } + + s.logger.Printf("[TRACE] server.consul: lost contact with Nomad quorum, falling back to Consul for server list") + + consulCatalog := s.consulSyncer.ConsulClient().Catalog() + dcs, err := consulCatalog.Datacenters() + if err != nil { + return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err) + } + if len(dcs) > 2 { + // Query the local DC first, then shuffle the + // remaining DCs. If additional calls to bootstrapFn + // are necessary, this Nomad Server will eventually + // walk all datacenter until it finds enough hosts to + // form a quorum. + nearestDC := dcs[0] + otherDCs := make([]string, 0, len(dcs)) + otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)] + shuffleStrings(otherDCs) + + dcs = append([]string{nearestDC}, otherDCs...) + } + + nomadServerServiceName := s.config.ConsulConfig.ServerServiceName + var mErr multierror.Error + const defaultMaxNumNomadServers = 8 + nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) + for _, dc := range dcs { + opts := &consulapi.QueryOptions{ + AllowStale: true, + Datacenter: dc, + Near: "_agent", + WaitTime: consul.DefaultQueryWaitDuration, + } + consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err)) + continue + } + + for _, cs := range consulServices { + port := strconv.FormatInt(int64(cs.ServicePort), 10) + addr := cs.ServiceAddress + if addr == "" { + addr = cs.Address + } + serverAddr := net.JoinHostPort(addr, port) + nomadServerServices = append(nomadServerServices, serverAddr) + } + } + if len(nomadServerServices) == 0 { + if len(mErr.Errors) > 0 { + return mErr.ErrorOrNil() + } + + return fmt.Errorf("no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs) + } + numServersContacted, err := s.Join(nomadServerServices) + if err != nil { + return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) + } + s.logger.Printf("[INFO] successfully contacted %d Nomad Servers", numServersContacted) + + return nil + } + s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + + return nil +} + // setupRPC is used to setup the RPC listener func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { // Create endpoints diff --git a/nomad/server_test.go b/nomad/server_test.go index a44cb88da..e93f63305 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -3,11 +3,14 @@ package nomad import ( "fmt" "io/ioutil" + "log" "net" + "os" "sync/atomic" "testing" "time" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/testutil" ) @@ -63,8 +66,14 @@ func testServer(t *testing.T, cb func(*Config)) *Server { // Enable raft as leader if we have bootstrap on config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap + shutdownCh := make(chan struct{}) + consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags)) + if err != nil { + t.Fatalf("err: %v", err) + } + // Create server - server, err := NewServer(config) + server, err := NewServer(config, consulSyncer) if err != nil { t.Fatalf("err: %v", err) }