From ccd9c14dd2dbee9d85782835e7ead643a974fbe4 Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Wed, 22 Jan 2020 08:21:33 -0500
Subject: [PATCH 1/6] extract leader step function

---
 nomad/leader.go | 60 ++++++++++++++++++++++++++-----------------------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/nomad/leader.go b/nomad/leader.go
index ec406a4b4..d22cb2d14 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -59,37 +59,41 @@ var defaultSchedulerConfig = &structs.SchedulerConfiguration{
 func (s *Server) monitorLeadership() {
 	var weAreLeaderCh chan struct{}
 	var leaderLoop sync.WaitGroup
+
+	leaderStep := func(isLeader bool) {
+		switch {
+		case isLeader:
+			if weAreLeaderCh != nil {
+				s.logger.Error("attempted to start the leader loop while running")
+				return
+			}
+
+			weAreLeaderCh = make(chan struct{})
+			leaderLoop.Add(1)
+			go func(ch chan struct{}) {
+				defer leaderLoop.Done()
+				s.leaderLoop(ch)
+			}(weAreLeaderCh)
+			s.logger.Info("cluster leadership acquired")
+
+		default:
+			if weAreLeaderCh == nil {
+				s.logger.Error("attempted to stop the leader loop while not running")
+				return
+			}
+
+			s.logger.Debug("shutting down leader loop")
+			close(weAreLeaderCh)
+			leaderLoop.Wait()
+			weAreLeaderCh = nil
+			s.logger.Info("cluster leadership lost")
+		}
+	}
+
 	for {
 		select {
 		case isLeader := <-s.leaderCh:
-			switch {
-			case isLeader:
-				if weAreLeaderCh != nil {
-					s.logger.Error("attempted to start the leader loop while running")
-					continue
-				}
-
-				weAreLeaderCh = make(chan struct{})
-				leaderLoop.Add(1)
-				go func(ch chan struct{}) {
-					defer leaderLoop.Done()
-					s.leaderLoop(ch)
-				}(weAreLeaderCh)
-				s.logger.Info("cluster leadership acquired")
-
-			default:
-				if weAreLeaderCh == nil {
-					s.logger.Error("attempted to stop the leader loop while not running")
-					continue
-				}
-
-				s.logger.Debug("shutting down leader loop")
-				close(weAreLeaderCh)
-				leaderLoop.Wait()
-				weAreLeaderCh = nil
-				s.logger.Info("cluster leadership lost")
-			}
-
+			leaderStep(isLeader)
 		case <-s.shutdownCh:
 			return
 		}

From 2810bf3b6753eaa841f8f7f5e6bcee9ecc856aa9 Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Wed, 22 Jan 2020 10:55:44 -0500
Subject: [PATCH 2/6] Handle Nomad leadership flapping

Fixes a deadlock in leadership handling when leadership flaps.

Raft propagates leadership transitions to Nomad through a NotifyCh channel.
Raft blocks when writing to this channel, so the channel must be buffered or
aggressively consumed[1]. Otherwise, Raft blocks indefinitely in
`raft.runLeader` until the channel is consumed[2] and does not move on to
executing follower related logic (in `raft.runFollower`).

While the Raft `runLeader` defer function is blocked, Raft cannot process
any other operations. For example, the `run{Leader|Follower}` methods
consume `raft.applyCh`, so while the `runLeader` defer is blocked, all Raft
log applications and configuration lookups block indefinitely.

Sadly, `leaderLoop` and `establishLeadership` make a few Raft calls!
`establishLeadership` attempts to auto-create the autopilot/scheduler
config [3]; and `leaderLoop` attempts to check the Raft configuration [4].
All of these calls occur without a timeout. Thus, if leadership flaps
quickly while `leaderLoop`/`establishLeadership` is running and hits any of
these Raft calls, the Raft handler _deadlocks_ forever.

Depending on how many times it flapped and where exactly we get stuck, I
suspect it's possible to get into the following state:

* Agent metrics/stats HTTP and RPC calls hang as they check the Raft
  configuration
* `raft.State` remains in the Leader state, and the server attempts to
  handle RPC calls (e.g.
  node/alloc updates) and these hang as well

As we create goroutines per RPC call, the number of goroutines grows over
time and may trigger out-of-memory errors in addition to missed updates.

[1] https://github.com/hashicorp/raft/blob/d90d6d6bdacf1b35d66940b07be515b074d89e88/config.go#L190-L193
[2] https://github.com/hashicorp/raft/blob/d90d6d6bdacf1b35d66940b07be515b074d89e88/raft.go#L425-L436
[3] https://github.com/hashicorp/nomad/blob/2a89e477465adbe6a88987f0dcb9fe80145d7b2f/nomad/leader.go#L198-L202
[4] https://github.com/hashicorp/nomad/blob/2a89e477465adbe6a88987f0dcb9fe80145d7b2f/nomad/leader.go#L877
---
 nomad/leader.go    | 21 +++++++++++++-
 nomad/server.go    |  4 +--
 nomad/util.go      | 65 +++++++++++++++++++++++++++++++++++++++++
 nomad/util_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 159 insertions(+), 3 deletions(-)

diff --git a/nomad/leader.go b/nomad/leader.go
index d22cb2d14..d91b16158 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -90,10 +90,29 @@ func (s *Server) monitorLeadership() {
 		}
 	}
 
+	wasLeader := false
 	for {
 		select {
 		case isLeader := <-s.leaderCh:
-			leaderStep(isLeader)
+			if wasLeader != isLeader {
+				wasLeader = isLeader
+				// normal case where we went through a transition
+				leaderStep(isLeader)
+			} else if wasLeader && isLeader {
+				// Server lost but then gained leadership immediately.
+				// During this time, this server may have received
+				// Raft transitions that haven't been applied to the FSM
+				// yet.
+				// Ensure that the FSM has caught up and eval queues are refreshed
+				s.logger.Error("cluster leadership flapped, lost and gained leadership immediately. Leadership flaps indicate cluster-wide problems (e.g. networking).")
+
+				leaderStep(false)
+				leaderStep(true)
+			} else {
+				// Server gained but lost leadership immediately
+				// before it reacted; nothing to do, move on
+				s.logger.Error("cluster leadership flapped, gained and lost leadership immediately. Leadership flaps indicate cluster-wide problems (e.g. networking).")
+			}
 		case <-s.shutdownCh:
 			return
 		}

diff --git a/nomad/server.go b/nomad/server.go
index a902beb41..6b7765edf 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error {
 		}
 	}
 
-	// Setup the leader channel
+	// Setup the leader channel; it keeps only the latest leadership transition
 	leaderCh := make(chan bool, 1)
 	s.config.RaftConfig.NotifyCh = leaderCh
-	s.leaderCh = leaderCh
+	s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh)
 
 	// Setup the Raft store
 	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)

diff --git a/nomad/util.go b/nomad/util.go
index e2772a73c..f3de97b9e 100644
--- a/nomad/util.go
+++ b/nomad/util.go
@@ -301,3 +301,68 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {
 
 	return alloc, nil
 }
+
+// dropButLastChannel returns a channel that drops all but the last value from sourceCh.
+//
+// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
+//
+// This function propagates values to the result quickly and drops intermediate
+// messages on a best-effort basis. The Go scheduler may delay delivery or
+// result in extra deliveries.
+//
+// Consider this example:
+//
+// ```
+// src := make(chan bool)
+// dst := dropButLastChannel(src, nil)
+//
+// go func() {
+//	src <- true
+//	src <- false
+// }()
+//
+// // v can be `true` here but is very unlikely
+// v := <-dst
+// ```
+//
+func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
+	// buffer the most recent result
+	dst := make(chan bool)
+
+	go func() {
+		lv := false
+
+	DEQUE_SOURCE:
+		// wait for first message
+		select {
+		case lv = <-sourceCh:
+			goto ENQUEUE_DST
+		case <-shutdownCh:
+			return
+		}
+
+	ENQUEUE_DST:
+		// prioritize draining the source first; dequeue without blocking
+		for {
+			select {
+			case lv = <-sourceCh:
+			default:
+				break ENQUEUE_DST
+			}
+		}
+
+		// attempt to enqueue but keep monitoring source channel
+		select {
+		case lv = <-sourceCh:
+			goto ENQUEUE_DST
+		case dst <- lv:
+			// enqueued value; back to dequeuing from source
+			goto DEQUE_SOURCE
+		case <-shutdownCh:
+			return
+		}
+	}()
+
+	return dst
+
+}

diff --git a/nomad/util_test.go b/nomad/util_test.go
index b1df2e523..bca35c879 100644
--- a/nomad/util_test.go
+++ b/nomad/util_test.go
@@ -4,6 +4,7 @@ import (
 	"net"
 	"reflect"
 	"testing"
+	"time"
 
 	version "github.com/hashicorp/go-version"
 	"github.com/hashicorp/nomad/helper/uuid"
@@ -258,3 +259,74 @@ func TestMaxUint64(t *testing.T) {
 		t.Fatalf("bad")
 	}
 }
+
+func TestDropButLastChannelDropsValues(t *testing.T) {
+	sourceCh := make(chan bool)
+	shutdownCh := make(chan struct{})
+
+	dstCh := dropButLastChannel(sourceCh, shutdownCh)
+
+	// timeout duration for any channel propagation delay
+	timeoutDuration := 5 * time.Millisecond
+
+	// test that dstCh doesn't emit anything initially
+	select {
+	case <-dstCh:
+		require.Fail(t, "received a message unexpectedly")
+	case <-time.After(timeoutDuration):
+		// yay, no message - this could have been a default: case, but
+		// we wait to give the goroutine a chance to take effect
+	}
+
+	sourceCh <- false
+	select {
+	case v := <-dstCh:
+		require.False(t, v, "unexpected value from dstCh")
+	case <-time.After(timeoutDuration):
+		require.Fail(t, "timed out waiting for source->dstCh propagation")
+	}
+
+	// channel is drained now
+	select {
+	case v := <-dstCh:
+		require.Failf(t, "received a message unexpectedly", "value: %v", v)
+	case <-time.After(timeoutDuration):
+		// yay, no message - this could have been a default: case, but
+		// we wait to give the goroutine a chance to take effect
+	}
+
+	// now enqueue many messages and ensure only the last one is received
+	// enqueueing should be fast!
+	sourceCh <- false
+	sourceCh <- false
+	sourceCh <- false
+	sourceCh <- false
+	sourceCh <- true
+
+	// I suspect that dstCh may contain a stale (i.e. `false`) value if the Go
+	// runtime executes this select before the goroutine dequeues the last value.
+	//
+	// However, it never failed in tests - so leaving it for now to see if it ever fails;
+	// if/when the test fails, we can learn how much of an issue it is and adjust
+	select {
+	case v := <-dstCh:
+		require.True(t, v, "unexpected value from dstCh")
+	case <-time.After(timeoutDuration):
+		require.Fail(t, "timed out waiting for source->dstCh propagation")
+	}
+
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- false
+	select {
+	case v := <-dstCh:
+		require.False(t, v, "unexpected value from dstCh")
+	case <-time.After(timeoutDuration):
+		require.Fail(t, "timed out waiting for source->dstCh propagation")
+	}
+
+	close(shutdownCh)
+}

From 09124007aa52d680216e1350081db022a90f4eaf Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Tue, 28 Jan 2020 09:06:52 -0500
Subject: [PATCH 3/6] include test and address review comments

---
 nomad/leader.go    | 27 ++++++++++++-------------
 nomad/util_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 14 deletions(-)

diff --git a/nomad/leader.go b/nomad/leader.go
index d91b16158..f72a52614 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -61,8 +61,7 @@ func (s *Server) monitorLeadership() {
 	var leaderLoop sync.WaitGroup
 
 	leaderStep := func(isLeader bool) {
-		switch {
-		case isLeader:
+		if isLeader {
 			if weAreLeaderCh != nil {
 				s.logger.Error("attempted to start the leader loop while running")
 				return
@@ -75,19 +74,19 @@ func (s *Server) monitorLeadership() {
 				s.leaderLoop(ch)
 			}(weAreLeaderCh)
 			s.logger.Info("cluster leadership acquired")
-
-		default:
-			if weAreLeaderCh == nil {
-				s.logger.Error("attempted to stop the leader loop while not running")
-				return
-			}
-
-			s.logger.Debug("shutting down leader loop")
-			close(weAreLeaderCh)
-			leaderLoop.Wait()
-			weAreLeaderCh = nil
-			s.logger.Info("cluster leadership lost")
+			return
 		}
+
+		if weAreLeaderCh == nil {
+			s.logger.Error("attempted to stop the leader loop while not running")
+			return
+		}
+
+		s.logger.Debug("shutting down leader loop")
+		close(weAreLeaderCh)
+		leaderLoop.Wait()
+		weAreLeaderCh = nil
+		s.logger.Info("cluster leadership lost")
 	}
 
 	wasLeader := false

diff --git a/nomad/util_test.go b/nomad/util_test.go
index bca35c879..145ef68d8 100644
--- a/nomad/util_test.go
+++ b/nomad/util_test.go
@@ -330,3 +330,52 @@ func TestDropButLastChannelDropsValues(t *testing.T) {
 
 	close(shutdownCh)
 }
+
+// TestDropButLastChannel_DeliversMessages asserts that the last
+// message is always delivered and some messages may be dropped, but
+// no new messages are ever introduced.
+// In a tight loop, receivers may observe some intermediate messages.
+func TestDropButLastChannel_DeliversMessages(t *testing.T) {
+	sourceCh := make(chan bool)
+	shutdownCh := make(chan struct{})
+
+	dstCh := dropButLastChannel(sourceCh, shutdownCh)
+
+	// timeout duration for any channel propagation delay
+	timeoutDuration := 5 * time.Millisecond
+
+	sentMessages := 100
+	go func() {
+		for i := 0; i < sentMessages-1; i++ {
+			sourceCh <- true
+		}
+		sourceCh <- false
+	}()
+
+	receivedTrue, receivedFalse := 0, 0
+	var lastReceived *bool
+
+RECEIVE_LOOP:
+	for {
+		select {
+		case v := <-dstCh:
+			lastReceived = &v
+			if v {
+				receivedTrue++
+			} else {
+				receivedFalse++
+			}
+
+		case <-time.After(timeoutDuration):
+			break RECEIVE_LOOP
+		}
+	}
+
+	t.Logf("receiver got %v out of %v true messages, and %v out of %v false messages",
+		receivedTrue, sentMessages-1, receivedFalse, 1)
+
+	require.NotNil(t, lastReceived)
+	require.False(t, *lastReceived)
+	require.Equal(t, 1, receivedFalse)
+	require.LessOrEqual(t, receivedTrue, sentMessages-1)
+}

From 97f20bddf46632107384c6892ac9a720904bf5aa Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Tue, 28 Jan 2020 09:38:51 -0500
Subject: [PATCH 4/6] handle channel close signal

Always deliver the last value, then send the close signal.
---
 nomad/util.go      | 34 +++++++++++++++++++++++++---
 nomad/util_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 3 deletions(-)

diff --git a/nomad/util.go b/nomad/util.go
index f3de97b9e..8b8594661 100644
--- a/nomad/util.go
+++ b/nomad/util.go
@@ -330,12 +330,21 @@ func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan b
 	dst := make(chan bool)
 
 	go func() {
+		// last value received
 		lv := false
+		// ok is false once sourceCh has been closed
+		ok := false
+		// whether a message was received since the last delivery to destination
+		messageReceived := false
 
 	DEQUE_SOURCE:
 		// wait for first message
 		select {
-		case lv = <-sourceCh:
+		case lv, ok = <-sourceCh:
+			if !ok {
+				goto SOURCE_CLOSED
+			}
+			messageReceived = true
 			goto ENQUEUE_DST
 		case <-shutdownCh:
 			return
@@ -345,7 +354,11 @@ func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan b
 		// prioritize draining the source first; dequeue without blocking
 		for {
 			select {
-			case lv = <-sourceCh:
+			case lv, ok = <-sourceCh:
+				if !ok {
+					goto SOURCE_CLOSED
+				}
+				messageReceived = true
 			default:
 				break ENQUEUE_DST
 			}
@@ -353,14 +366,29 @@ func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan b
 
 		// attempt to enqueue but keep monitoring source channel
 		select {
-		case lv = <-sourceCh:
+		case lv, ok = <-sourceCh:
+			if !ok {
+				goto SOURCE_CLOSED
+			}
+			messageReceived = true
 			goto ENQUEUE_DST
 		case dst <- lv:
+			messageReceived = false
 			// enqueued value; back to dequeuing from source
 			goto DEQUE_SOURCE
 		case <-shutdownCh:
 			return
 		}
+
+	SOURCE_CLOSED:
+		if messageReceived {
+			select {
+			case dst <- lv:
+			case <-shutdownCh:
+				return
+			}
+		}
+		close(dst)
 	}()
 
 	return dst

diff --git a/nomad/util_test.go b/nomad/util_test.go
index 145ef68d8..67bd5cab9 100644
--- a/nomad/util_test.go
+++ b/nomad/util_test.go
@@ -379,3 +379,55 @@ RECEIVE_LOOP:
 	require.Equal(t, 1, receivedFalse)
 	require.LessOrEqual(t, receivedTrue, sentMessages-1)
 }
+
+// TestDropButLastChannel_DeliversMessages_Close asserts that the last
+// message is always delivered and some messages may be dropped, but no
+// new messages are ever introduced, even when the source channel is closed.
+func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) {
+	sourceCh := make(chan bool)
+	shutdownCh := make(chan struct{})
+
+	dstCh := dropButLastChannel(sourceCh, shutdownCh)
+
+	// timeout duration for any channel propagation delay
+	timeoutDuration := 5 * time.Millisecond
+
+	sentMessages := 100
+	go func() {
+		for i := 0; i < sentMessages-1; i++ {
+			sourceCh <- true
+		}
+		sourceCh <- false
+		close(sourceCh)
+	}()
+
+	receivedTrue, receivedFalse := 0, 0
+	var lastReceived *bool
+
+RECEIVE_LOOP:
+	for {
+		select {
+		case v, ok := <-dstCh:
+			if !ok {
+				break RECEIVE_LOOP
+			}
+			lastReceived = &v
+			if v {
+				receivedTrue++
+			} else {
+				receivedFalse++
+			}
+
+		case <-time.After(timeoutDuration):
+			require.Fail(t, "timed out while waiting for messages")
+		}
+	}
+
+	t.Logf("receiver got %v out of %v true messages, and %v out of %v false messages",
+		receivedTrue, sentMessages-1, receivedFalse, 1)
+
+	require.NotNil(t, lastReceived)
+	require.False(t, *lastReceived)
+	require.Equal(t, 1, receivedFalse)
+	require.LessOrEqual(t, receivedTrue, sentMessages-1)
+}

From 94a75b474400aed7e1b3c56b1a373205cef151bb Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Tue, 28 Jan 2020 09:49:36 -0500
Subject: [PATCH 5/6] tweak leadership flapping log messages

---
 nomad/leader.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nomad/leader.go b/nomad/leader.go
index f72a52614..4a85613ee 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -103,14 +103,14 @@ func (s *Server) monitorLeadership() {
 				// Raft transitions that haven't been applied to the FSM
 				// yet.
 				// Ensure that the FSM has caught up and eval queues are refreshed
-				s.logger.Error("cluster leadership flapped, lost and gained leadership immediately. Leadership flaps indicate cluster-wide problems (e.g. networking).")
+				s.logger.Warn("cluster leadership lost and gained leadership immediately. Could indicate network issues, memory paging, or high CPU load.")
 
 				leaderStep(false)
 				leaderStep(true)
 			} else {
 				// Server gained but lost leadership immediately
 				// before it reacted; nothing to do, move on
-				s.logger.Error("cluster leadership flapped, gained and lost leadership immediately. Leadership flaps indicate cluster-wide problems (e.g. networking).")
+				s.logger.Warn("cluster leadership gained and lost leadership immediately. Could indicate network issues, memory paging, or high CPU load.")
 			}
 		case <-s.shutdownCh:
 			return

From 8ae03c32faec70ffa0197780df7c779675cd55b5 Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Tue, 28 Jan 2020 09:53:48 -0500
Subject: [PATCH 6/6] tests: defer closing shutdownCh

---
 nomad/util_test.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/nomad/util_test.go b/nomad/util_test.go
index 67bd5cab9..bb6f68620 100644
--- a/nomad/util_test.go
+++ b/nomad/util_test.go
@@ -263,6 +263,7 @@ func TestMaxUint64(t *testing.T) {
 func TestDropButLastChannelDropsValues(t *testing.T) {
 	sourceCh := make(chan bool)
 	shutdownCh := make(chan struct{})
+	defer close(shutdownCh)
 
 	dstCh := dropButLastChannel(sourceCh, shutdownCh)
 
@@ -327,8 +328,6 @@ func TestDropButLastChannelDropsValues(t *testing.T) {
 	case <-time.After(timeoutDuration):
 		require.Fail(t, "timed out waiting for source->dstCh propagation")
 	}
-
-	close(shutdownCh)
 }
 
 // TestDropButLastChannel_DeliversMessages asserts that the last
@@ -338,6 +337,7 @@ func TestDropButLastChannelDropsValues(t *testing.T) {
 func TestDropButLastChannel_DeliversMessages(t *testing.T) {
 	sourceCh := make(chan bool)
 	shutdownCh := make(chan struct{})
+	defer close(shutdownCh)
 
 	dstCh := dropButLastChannel(sourceCh, shutdownCh)
 
@@ -386,6 +386,7 @@ RECEIVE_LOOP:
 func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) {
 	sourceCh := make(chan bool)
 	shutdownCh := make(chan struct{})
+	defer close(shutdownCh)
 
 	dstCh := dropButLastChannel(sourceCh, shutdownCh)
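
Reviewer note: to see the coalescing behavior end to end, here is a minimal,
hypothetical sketch - not part of the patch series; the test name, timings,
and assertions are made up - that drives `dropButLastChannel` the way
`monitorLeadership` consumes `s.leaderCh`. It assumes the patch 4 version of
`dropButLastChannel` is available in the same package. A burst of transitions
simulates Raft leadership flapping; none of the sends block the producer, and
the consumer still observes the final state.

```go
package nomad

import (
	"testing"
	"time"
)

// TestDropButLastChannel_FlapBurst is a hypothetical usage sketch: a burst
// of leadership transitions is coalesced so that a slow consumer always
// observes the final state, while the producer (standing in for raft's
// NotifyCh writer) never blocks on an unread intermediate value.
func TestDropButLastChannel_FlapBurst(t *testing.T) {
	sourceCh := make(chan bool) // stands in for raft's NotifyCh
	shutdownCh := make(chan struct{})
	defer close(shutdownCh)

	dstCh := dropButLastChannel(sourceCh, shutdownCh)

	// Simulate rapid flapping: gained, lost, gained. The forwarding
	// goroutine drains sourceCh aggressively, so these sends return
	// quickly even though nobody is reading dstCh yet.
	go func() {
		sourceCh <- true
		sourceCh <- false
		sourceCh <- true
	}()

	// A slow consumer: intermediate values may be dropped, but the last
	// value sent (true) is delivered eventually.
	var last bool
	deadline := time.After(100 * time.Millisecond)
RECEIVE_LOOP:
	for {
		select {
		case v := <-dstCh:
			last = v
		case <-deadline:
			break RECEIVE_LOOP
		}
	}

	if !last {
		t.Fatalf("expected final leadership state true, got %v", last)
	}
}
```

With the pre-patch wiring (`s.leaderCh = leaderCh`, buffered with capacity
one), the later sends in such a burst would block once the buffer already
held an unconsumed transition - exactly the stall described in patch 2.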