Merge pull request #6977 from hashicorp/b-leadership-flapping-2
Handle Nomad leadership flapping (attempt 2)
@@ -59,37 +59,59 @@ var defaultSchedulerConfig = &structs.SchedulerConfiguration{
 func (s *Server) monitorLeadership() {
     var weAreLeaderCh chan struct{}
     var leaderLoop sync.WaitGroup
+
+    leaderStep := func(isLeader bool) {
+        if isLeader {
+            if weAreLeaderCh != nil {
+                s.logger.Error("attempted to start the leader loop while running")
+                return
+            }
+
+            weAreLeaderCh = make(chan struct{})
+            leaderLoop.Add(1)
+            go func(ch chan struct{}) {
+                defer leaderLoop.Done()
+                s.leaderLoop(ch)
+            }(weAreLeaderCh)
+            s.logger.Info("cluster leadership acquired")
+            return
+        }
+
+        if weAreLeaderCh == nil {
+            s.logger.Error("attempted to stop the leader loop while not running")
+            return
+        }
+
+        s.logger.Debug("shutting down leader loop")
+        close(weAreLeaderCh)
+        leaderLoop.Wait()
+        weAreLeaderCh = nil
+        s.logger.Info("cluster leadership lost")
+    }
+
+    wasLeader := false
     for {
         select {
         case isLeader := <-s.leaderCh:
-            switch {
-            case isLeader:
-                if weAreLeaderCh != nil {
-                    s.logger.Error("attempted to start the leader loop while running")
-                    continue
-                }
+            if wasLeader != isLeader {
+                wasLeader = isLeader
+                // normal case where we went through a transition
+                leaderStep(isLeader)
+            } else if wasLeader && isLeader {
+                // Server lost but then gained leadership immediately.
+                // During this time, this server may have received
+                // Raft transitions that haven't been applied to the FSM
+                // yet.
+                // Ensure that the FSM has caught up and eval queues are refreshed
+                s.logger.Warn("cluster leadership lost and regained immediately. Could indicate network issues, memory paging, or high CPU load.")

-                weAreLeaderCh = make(chan struct{})
-                leaderLoop.Add(1)
-                go func(ch chan struct{}) {
-                    defer leaderLoop.Done()
-                    s.leaderLoop(ch)
-                }(weAreLeaderCh)
-                s.logger.Info("cluster leadership acquired")
-
-            default:
-                if weAreLeaderCh == nil {
-                    s.logger.Error("attempted to stop the leader loop while not running")
-                    continue
-                }
-
-                s.logger.Debug("shutting down leader loop")
-                close(weAreLeaderCh)
-                leaderLoop.Wait()
-                weAreLeaderCh = nil
-                s.logger.Info("cluster leadership lost")
+                leaderStep(false)
+                leaderStep(true)
+            } else {
+                // Server gained but lost leadership immediately
+                // before it reacted; nothing to do, move on
+                s.logger.Warn("cluster leadership gained and lost immediately. Could indicate network issues, memory paging, or high CPU load.")
             }

         case <-s.shutdownCh:
             return
         }
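For reference, the new branch above reduces to a small decision table over (wasLeader, isLeader). The following is a hypothetical, side-effect-free distillation; handleNotification is illustrative only and not part of this change, and each returned value stands for one leaderStep call:

func handleNotification(wasLeader, isLeader bool) []bool {
    switch {
    case wasLeader != isLeader:
        // normal transition: one step with the new state
        return []bool{isLeader}
    case wasLeader && isLeader:
        // lost and regained before reacting: restart the leader loop so the
        // FSM and eval queues are refreshed
        return []bool{false, true}
    default:
        // gained and lost before reacting: nothing to do
        return nil
    }
}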
@@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error {
         }
     }

-    // Setup the leader channel
+    // Setup the leader channel; dropButLastChannel keeps only the latest leadership notification
     leaderCh := make(chan bool, 1)
     s.config.RaftConfig.NotifyCh = leaderCh
-    s.leaderCh = leaderCh
+    s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh)

     // Setup the Raft store
     s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
@@ -301,3 +301,96 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {

     return alloc, nil
 }
+
+// dropButLastChannel returns a channel that drops all but the last value from sourceCh.
+//
+// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
+//
+// This function propagates values to the result channel quickly and drops intermediate
+// messages on a best-effort basis. The Go scheduler may delay delivery or result in
+// extra deliveries.
+//
+// Consider this example:
+//
+// ```
+// src := make(chan bool)
+// dst := dropButLastChannel(src, nil)
+//
+// go func() {
+//     src <- true
+//     src <- false
+// }()
+//
+// // v can be `true` here, but that is very unlikely
+// v := <-dst
+// ```
+//
+func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
+    // dst delivers only the most recent result
+    dst := make(chan bool)
+
+    go func() {
+        // lv is the last value received
+        lv := false
+        // ok is false once sourceCh is closed
+        ok := false
+        // messageReceived is true when a message arrived since the last delivery to dst
+        messageReceived := false
+
+    DEQUE_SOURCE:
+        // wait for the first message
+        select {
+        case lv, ok = <-sourceCh:
+            if !ok {
+                goto SOURCE_CLOSED
+            }
+            messageReceived = true
+            goto ENQUEUE_DST
+        case <-shutdownCh:
+            return
+        }
+
+    ENQUEUE_DST:
+        // prioritize draining the source first: dequeue without blocking
+        for {
+            select {
+            case lv, ok = <-sourceCh:
+                if !ok {
+                    goto SOURCE_CLOSED
+                }
+                messageReceived = true
+            default:
+                break ENQUEUE_DST
+            }
+        }
+
+        // attempt to enqueue but keep monitoring the source channel
+        select {
+        case lv, ok = <-sourceCh:
+            if !ok {
+                goto SOURCE_CLOSED
+            }
+            messageReceived = true
+            goto ENQUEUE_DST
+        case dst <- lv:
+            messageReceived = false
+            // enqueued value; back to dequeuing from source
+            goto DEQUE_SOURCE
+        case <-shutdownCh:
+            return
+        }
+
+    SOURCE_CLOSED:
+        if messageReceived {
+            select {
+            case dst <- lv:
+            case <-shutdownCh:
+                return
+            }
+        }
+        close(dst)
+    }()
+
+    return dst
+
+}
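A minimal usage sketch for the helper above (illustrative only, not part of the change; it assumes the snippet runs in the same package as the unexported function and that "fmt" is imported). Because no reader is waiting on dst while both values are sent, a lost-then-regained flap collapses to the final value:

src := make(chan bool)
dst := dropButLastChannel(src, nil) // a nil shutdownCh simply disables the shutdown case

src <- false // leadership lost ...
src <- true  // ... and regained before the consumer read dst

// The consumer sees only the final value; this is the "lost and regained
// immediately" case that monitorLeadership resolves with leaderStep(false)
// followed by leaderStep(true).
fmt.Println(<-dst) // true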
@@ -4,6 +4,7 @@ import (
     "net"
     "reflect"
     "testing"
+    "time"

     version "github.com/hashicorp/go-version"
     "github.com/hashicorp/nomad/helper/uuid"
@@ -258,3 +259,176 @@ func TestMaxUint64(t *testing.T) {
         t.Fatalf("bad")
     }
 }
+
+func TestDropButLastChannelDropsValues(t *testing.T) {
+    sourceCh := make(chan bool)
+    shutdownCh := make(chan struct{})
+    defer close(shutdownCh)
+
+    dstCh := dropButLastChannel(sourceCh, shutdownCh)
+
+    // timeout duration for any channel propagation delay
+    timeoutDuration := 5 * time.Millisecond
+
+    // test that dstCh doesn't emit anything initially
+    select {
+    case <-dstCh:
+        require.Fail(t, "received a message unexpectedly")
+    case <-time.After(timeoutDuration):
+        // no message, as expected - a `default:` case would do here, but
+        // waiting gives the goroutine a chance to run
+    }
+
+    sourceCh <- false
+    select {
+    case v := <-dstCh:
+        require.False(t, v, "unexpected value from dstCh")
+    case <-time.After(timeoutDuration):
+        require.Fail(t, "timed out waiting for source->dstCh propagation")
+    }
+
+    // channel is drained now
+    select {
+    case v := <-dstCh:
+        require.Failf(t, "received a message unexpectedly", "value: %v", v)
+    case <-time.After(timeoutDuration):
+        // no message, as expected - a `default:` case would do here, but
+        // waiting gives the goroutine a chance to run
+    }
+
+    // now enqueue many messages and ensure only the last one is received;
+    // enqueueing should be fast!
+    sourceCh <- false
+    sourceCh <- false
+    sourceCh <- false
+    sourceCh <- false
+    sourceCh <- true
+
+    // dstCh may contain a stale (i.e. `false`) value if the runtime executes
+    // this select before the implementation goroutine dequeues the last value.
+    //
+    // However, this has never failed in testing, so it is left as-is to see if
+    // it ever fails; if/when the test fails, we can learn how much of an issue
+    // it is and adjust.
+    select {
+    case v := <-dstCh:
+        require.True(t, v, "unexpected value from dstCh")
+    case <-time.After(timeoutDuration):
+        require.Fail(t, "timed out waiting for source->dstCh propagation")
+    }
+
+    sourceCh <- true
+    sourceCh <- true
+    sourceCh <- true
+    sourceCh <- true
+    sourceCh <- true
+    sourceCh <- false
+    select {
+    case v := <-dstCh:
+        require.False(t, v, "unexpected value from dstCh")
+    case <-time.After(timeoutDuration):
+        require.Fail(t, "timed out waiting for source->dstCh propagation")
+    }
+}
+
+// TestDropButLastChannel_DeliversMessages asserts that the last message is
+// always delivered, and that while some messages may be dropped, no new
+// messages are ever introduced.
+// In a tight loop, receivers may observe some intermediate messages.
+func TestDropButLastChannel_DeliversMessages(t *testing.T) {
+    sourceCh := make(chan bool)
+    shutdownCh := make(chan struct{})
+    defer close(shutdownCh)
+
+    dstCh := dropButLastChannel(sourceCh, shutdownCh)
+
+    // timeout duration for any channel propagation delay
+    timeoutDuration := 5 * time.Millisecond
+
+    sentMessages := 100
+    go func() {
+        for i := 0; i < sentMessages-1; i++ {
+            sourceCh <- true
+        }
+        sourceCh <- false
+    }()
+
+    receivedTrue, receivedFalse := 0, 0
+    var lastReceived *bool
+
+RECEIVE_LOOP:
+    for {
+        select {
+        case v := <-dstCh:
+            lastReceived = &v
+            if v {
+                receivedTrue++
+            } else {
+                receivedFalse++
+            }
+
+        case <-time.After(timeoutDuration):
+            break RECEIVE_LOOP
+        }
+    }
+
+    t.Logf("receiver got %v out of %v true messages, and %v out of %v false messages",
+        receivedTrue, sentMessages-1, receivedFalse, 1)
+
+    require.NotNil(t, lastReceived)
+    require.False(t, *lastReceived)
+    require.Equal(t, 1, receivedFalse)
+    require.LessOrEqual(t, receivedTrue, sentMessages-1)
+}
+
+// TestDropButLastChannel_DeliversMessages_Close asserts that the last message
+// is always delivered, and that while some messages may be dropped, no new
+// messages are ever introduced, even when the source channel is closed.
+func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) {
+    sourceCh := make(chan bool)
+    shutdownCh := make(chan struct{})
+    defer close(shutdownCh)
+
+    dstCh := dropButLastChannel(sourceCh, shutdownCh)
+
+    // timeout duration for any channel propagation delay
+    timeoutDuration := 5 * time.Millisecond
+
+    sentMessages := 100
+    go func() {
+        for i := 0; i < sentMessages-1; i++ {
+            sourceCh <- true
+        }
+        sourceCh <- false
+        close(sourceCh)
+    }()
+
+    receivedTrue, receivedFalse := 0, 0
+    var lastReceived *bool
+
+RECEIVE_LOOP:
+    for {
+        select {
+        case v, ok := <-dstCh:
+            if !ok {
+                break RECEIVE_LOOP
+            }
+            lastReceived = &v
+            if v {
+                receivedTrue++
+            } else {
+                receivedFalse++
+            }
+
+        case <-time.After(timeoutDuration):
+            require.Fail(t, "timed out while waiting for messages")
+        }
+    }
+
+    t.Logf("receiver got %v out of %v true messages, and %v out of %v false messages",
+        receivedTrue, sentMessages-1, receivedFalse, 1)
+
+    require.NotNil(t, lastReceived)
+    require.False(t, *lastReceived)
+    require.Equal(t, 1, receivedFalse)
+    require.LessOrEqual(t, receivedTrue, sentMessages-1)
+}