From e10f3942e799464e306eb818268c958212295f41 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 23 Jul 2015 21:44:17 -0700 Subject: [PATCH] nomad: add eval broker, configurable nack timeout --- nomad/config.go | 8 ++++++++ nomad/eval_broker.go | 8 -------- nomad/eval_broker_test.go | 6 +++--- nomad/server.go | 12 +++++++++++- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/nomad/config.go b/nomad/config.go index 72c792abe..8b593bad2 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -107,6 +107,13 @@ type Config struct { // that are force removed, as well as intermittent unavailability during // leader election. ReconcileInterval time.Duration + + // EvalNackTimeout controls how long we allow a sub-scheduler to + // work on an evaluation before we consider it failed and Nack it. + // This allows that evaluation to be handed to another sub-scheduler + // to work on. Defaults to 60 seconds. This should be long enough that + // no evaluation hits it unless the sub-scheduler has failed. + EvalNackTimeout time.Duration } // CheckVersion is used to check if the ProtocolVersion is valid @@ -137,6 +144,7 @@ func DefaultConfig() *Config { RPCAddr: DefaultRPCAddr, SerfConfig: serf.DefaultConfig(), ReconcileInterval: 60 * time.Second, + EvalNackTimeout: 60 * time.Second, } // Increase our reap interval to 3 days instead of 24h. diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 0abb1122a..8507b1a0e 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -10,14 +10,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -const ( - // DefaultNackTimeout is the default amount of time between when - // a message is dequeued and no Ack/Nack is received before it is - // assumed failed and Nack'd automatically. This is used to recover - // from the failure of a sub-scheduler. - DefaultNackTimeout = 60 * time.Second -) - // EvalBroker is used to manage brokering of evaluations. When an evaluation is // created, due to a change in a job specification or a node, we put it into the // broker. The broker sorts by evaluations by priority and scheduler type. This diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 6173dee98..83f8cddfc 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -16,7 +16,7 @@ var ( func testBroker(t *testing.T, timeout time.Duration) *EvalBroker { if timeout == 0 { - timeout = DefaultNackTimeout + timeout = 5 * time.Second } b, err := NewEvalBroker(timeout) if err != nil { @@ -246,9 +246,9 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) { counter -= 1 } - // The odds are less than 1/1024 that + // The odds are less than 1/65536 that // we see the same sequence 10 times in a row - if counter >= 10 || counter <= -10 { + if counter >= 16 || counter <= -16 { t.Fatalf("unlikely sequence: %d", counter) } } diff --git a/nomad/server.go b/nomad/server.go index 52a7ead72..3e65ba290 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -91,6 +91,10 @@ type Server struct { // eventCh is used to receive events from the serf cluster eventCh chan serf.Event + // evalBroker is used to manage the in-progress evaluations + // that are waiting to be brokered to a sub-scheduler + evalBroker *EvalBroker + left bool shutdown bool shutdownCh chan struct{} @@ -121,6 +125,12 @@ func NewServer(config *Config) (*Server, error) { // Create a logger logger := log.New(config.LogOutput, "", log.LstdFlags) + // Create an eval broker + evalBroker, err := NewEvalBroker(config.EvalNackTimeout) + if err != nil { + return nil, err + } + // Create the server s := &Server{ config: config, @@ -131,6 +141,7 @@ func NewServer(config *Config) (*Server, error) { localPeers: make(map[string]*serverParts), reconcileCh: make(chan serf.Member, 32), eventCh: make(chan serf.Event, 256), + evalBroker: evalBroker, shutdownCh: make(chan struct{}), } @@ -148,7 +159,6 @@ func NewServer(config *Config) (*Server, error) { } // Initialize the wan Serf - var err error s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot) if err != nil { s.Shutdown()