mirror of
https://github.com/kemko/nomad.git
synced 2026-01-03 08:55:43 +03:00
Merge pull request #3890 from hashicorp/b-heartbeat
Heartbeat improvements and handling failures during establishing leadership
This commit is contained in:
@@ -1171,6 +1171,7 @@ func (c *Client) registerNode() error {
|
||||
// Update the node status to ready after we register.
|
||||
c.configLock.Lock()
|
||||
node.Status = structs.NodeStatusReady
|
||||
c.config.Node.Status = structs.NodeStatusReady
|
||||
c.configLock.Unlock()
|
||||
|
||||
c.logger.Printf("[INFO] client: node registration complete")
|
||||
|
||||
@@ -443,6 +443,14 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
|
||||
w.outstandingBatch = true
|
||||
|
||||
time.AfterFunc(perJobEvalBatchPeriod, func() {
|
||||
// If the timer has been created and then we shutdown, we need to no-op
|
||||
// the evaluation creation.
|
||||
select {
|
||||
case <-w.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Create the eval
|
||||
evalCreateIndex, err := w.createEvaluation(w.getEval())
|
||||
if err != nil {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
@@ -9,6 +10,18 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// heartbeatNotLeader is the error string returned when the heartbeat request
|
||||
// couldn't be completed since the server is not the leader.
|
||||
heartbeatNotLeader = "failed to reset heartbeat since server is not leader"
|
||||
)
|
||||
|
||||
var (
|
||||
// heartbeatNotLeaderErr is the error returned when the heartbeat request
|
||||
// couldn't be completed since the server is not the leader.
|
||||
heartbeatNotLeaderErr = errors.New(heartbeatNotLeader)
|
||||
)
|
||||
|
||||
// initializeHeartbeatTimers is used when a leader is newly elected to create
|
||||
// a new map to track heartbeat expiration and to reset all the timers from
|
||||
// the previously known set of timers.
|
||||
@@ -50,6 +63,14 @@ func (s *Server) resetHeartbeatTimer(id string) (time.Duration, error) {
|
||||
s.heartbeatTimersLock.Lock()
|
||||
defer s.heartbeatTimersLock.Unlock()
|
||||
|
||||
// Do not create a timer for the node since we are not the leader. This
|
||||
// check avoids the race in which leadership is lost but a timer is created
|
||||
// on this server since it was servicing an RPC during a leadership loss.
|
||||
if !s.IsLeader() {
|
||||
s.logger.Printf("[DEBUG] nomad.heartbeat: ignoring resetting node %q TTL since this node is not the leader", id)
|
||||
return 0, heartbeatNotLeaderErr
|
||||
}
|
||||
|
||||
// Compute the target TTL value
|
||||
n := len(s.heartbeatTimers)
|
||||
ttl := lib.RateScaledInterval(s.config.MaxHeartbeatsPerSecond, s.config.MinHeartbeatTTL, n)
|
||||
@@ -89,6 +110,15 @@ func (s *Server) invalidateHeartbeat(id string) {
|
||||
s.heartbeatTimersLock.Lock()
|
||||
delete(s.heartbeatTimers, id)
|
||||
s.heartbeatTimersLock.Unlock()
|
||||
|
||||
// Do not invalidate the node since we are not the leader. This check avoids
|
||||
// the race in which leadership is lost but a timer is created on this
|
||||
// server since it was servicing an RPC during a leadership loss.
|
||||
if !s.IsLeader() {
|
||||
s.logger.Printf("[DEBUG] nomad.heartbeat: ignoring node %q TTL since this node is not the leader", id)
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.Printf("[WARN] nomad.heartbeat: node '%s' TTL expired", id)
|
||||
|
||||
// Make a request to update the node status
|
||||
|
||||
@@ -10,9 +10,10 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestInitializeHeartbeatTimers(t *testing.T) {
|
||||
func TestHeartbeat_InitializeHeartbeatTimers(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -38,7 +39,7 @@ func TestInitializeHeartbeatTimers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestResetHeartbeatTimer(t *testing.T) {
|
||||
func TestHeartbeat_ResetHeartbeatTimer(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -60,7 +61,24 @@ func TestResetHeartbeatTimer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestResetHeartbeatTimerLocked(t *testing.T) {
|
||||
func TestHeartbeat_ResetHeartbeatTimer_Nonleader(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 3 // Won't become leader
|
||||
c.DevDisableBootstrap = true
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
|
||||
require.False(s1.IsLeader())
|
||||
|
||||
// Create a new timer
|
||||
_, err := s1.resetHeartbeatTimer("test")
|
||||
require.NotNil(err)
|
||||
require.EqualError(err, heartbeatNotLeader)
|
||||
}
|
||||
|
||||
func TestHeartbeat_ResetHeartbeatTimerLocked(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -81,7 +99,7 @@ func TestResetHeartbeatTimerLocked(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
|
||||
func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -120,7 +138,7 @@ func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
|
||||
t.Fatalf("should have expired")
|
||||
}
|
||||
|
||||
func TestInvalidateHeartbeat(t *testing.T) {
|
||||
func TestHeartbeat_InvalidateHeartbeat(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -148,7 +166,7 @@ func TestInvalidateHeartbeat(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClearHeartbeatTimer(t *testing.T) {
|
||||
func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -168,7 +186,7 @@ func TestClearHeartbeatTimer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClearAllHeartbeatTimers(t *testing.T) {
|
||||
func TestHeartbeat_ClearAllHeartbeatTimers(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -190,7 +208,7 @@ func TestClearAllHeartbeatTimers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_HeartbeatTTL_Failover(t *testing.T) {
|
||||
func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
@@ -253,9 +271,11 @@ func TestServer_HeartbeatTTL_Failover(t *testing.T) {
|
||||
leader.Shutdown()
|
||||
|
||||
// heartbeatTimers should be cleared on leader shutdown
|
||||
if len(leader.heartbeatTimers) != 0 {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return len(leader.heartbeatTimers) == 0, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("heartbeat timers should be empty on the shutdown leader")
|
||||
}
|
||||
})
|
||||
|
||||
// Find the new leader
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
|
||||
@@ -107,8 +107,16 @@ RECONCILE:
|
||||
if !establishedLeader {
|
||||
if err := s.establishLeadership(stopCh); err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err)
|
||||
|
||||
// Immediately revoke leadership since we didn't successfully
|
||||
// establish leadership.
|
||||
if err := s.revokeLeadership(); err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to revoke leadership: %v", err)
|
||||
}
|
||||
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
establishedLeader = true
|
||||
defer func() {
|
||||
if err := s.revokeLeadership(); err != nil {
|
||||
@@ -157,6 +165,8 @@ WAIT:
|
||||
// previously inflight transactions have been committed and that our
|
||||
// state is up-to-date.
|
||||
func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "leader", "establish_leadership"}, time.Now())
|
||||
|
||||
// Generate a leader ACL token. This will allow the leader to issue work
|
||||
// that requires a valid ACL token.
|
||||
s.setLeaderAcl(uuid.Generate())
|
||||
@@ -639,6 +649,8 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
|
||||
// revokeLeadership is invoked once we step down as leader.
|
||||
// This is used to cleanup any state that may be specific to a leader.
|
||||
func (s *Server) revokeLeadership() error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())
|
||||
|
||||
// Clear the leader token since we are no longer the leader.
|
||||
s.setLeaderAcl("")
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLeader_LeftServer(t *testing.T) {
|
||||
@@ -979,3 +980,19 @@ func TestLeader_RollRaftServer(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return s1.evalBroker.Enabled(), nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have finished establish leader loop")
|
||||
})
|
||||
|
||||
require.Nil(t, s1.revokeLeadership())
|
||||
require.Nil(t, s1.revokeLeadership())
|
||||
require.Nil(t, s1.revokeLeadership())
|
||||
}
|
||||
|
||||
20
vendor/github.com/hashicorp/consul/agent/consul/autopilot/autopilot.go
generated
vendored
20
vendor/github.com/hashicorp/consul/agent/consul/autopilot/autopilot.go
generated
vendored
@@ -38,8 +38,10 @@ type Autopilot struct {
|
||||
clusterHealth OperatorHealthReply
|
||||
clusterHealthLock sync.RWMutex
|
||||
|
||||
enabled bool
|
||||
removeDeadCh chan struct{}
|
||||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
@@ -62,6 +64,14 @@ func NewAutopilot(logger *log.Logger, delegate Delegate, interval, healthInterva
|
||||
}
|
||||
|
||||
func (a *Autopilot) Start() {
|
||||
a.shutdownLock.Lock()
|
||||
defer a.shutdownLock.Unlock()
|
||||
|
||||
// Nothing to do
|
||||
if a.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
a.shutdownCh = make(chan struct{})
|
||||
a.waitGroup = sync.WaitGroup{}
|
||||
a.clusterHealth = OperatorHealthReply{}
|
||||
@@ -69,11 +79,21 @@ func (a *Autopilot) Start() {
|
||||
a.waitGroup.Add(2)
|
||||
go a.run()
|
||||
go a.serverHealthLoop()
|
||||
a.enabled = true
|
||||
}
|
||||
|
||||
func (a *Autopilot) Stop() {
|
||||
a.shutdownLock.Lock()
|
||||
defer a.shutdownLock.Unlock()
|
||||
|
||||
// Nothing to do
|
||||
if !a.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
close(a.shutdownCh)
|
||||
a.waitGroup.Wait()
|
||||
a.enabled = false
|
||||
}
|
||||
|
||||
// run periodically looks for nonvoting servers to promote and dead servers to remove.
|
||||
|
||||
2
vendor/vendor.json
vendored
2
vendor/vendor.json
vendored
@@ -123,7 +123,7 @@
|
||||
{"path":"github.com/hashicorp/consul-template/template","checksumSHA1":"N9qobVzScLbTEnGE7MgFnnTbGBw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
|
||||
{"path":"github.com/hashicorp/consul-template/version","checksumSHA1":"NB5+D4AuCNV9Bsqh3YFdPi4AJ6U=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
|
||||
{"path":"github.com/hashicorp/consul-template/watch","checksumSHA1":"b4+Y+02pY2Y5620F9ALzKg8Zmdw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
|
||||
{"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"/nyemJLkxBXKqI9xpLFyTyvOaYY=","revision":"bfeb09983befa337a3b2ebbafb7567913773e40b","revisionTime":"2018-01-23T20:52:17Z"},
|
||||
{"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"d1ede2c93dec7b4580e37ef41d24371abab9d9e9","revisionTime":"2018-02-21T18:19:48Z"},
|
||||
{"path":"github.com/hashicorp/consul/api","checksumSHA1":"XLfcIX2qpRr0o26aFMjCOzvw6jo=","revision":"51ea240df8476e02215d53fbfad5838bf0d44d21","revisionTime":"2017-10-16T16:22:40Z"},
|
||||
{"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"XTQIYV+DPUVRKpVp0+y/78bWH3I=","revision":"d08ab9fd199434e5220276356ecf9617cfec1eb2","revisionTime":"2017-12-18T20:26:35Z"},
|
||||
{"path":"github.com/hashicorp/consul/lib","checksumSHA1":"HGljdtVaqi/e3DgIHymLRLfPYhw=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"},
|
||||
|
||||
Reference in New Issue
Block a user