From 3f6ebb3d5af21d3cee20cd01f10a0476eab41d11 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 22 Aug 2015 17:17:13 -0700 Subject: [PATCH] nomad: no heartbeat for nodes in terminal status --- nomad/heartbeat.go | 167 +++++++++++++++++++++++++++++++++++++++ nomad/leader.go | 21 +++++ nomad/server.go | 8 ++ nomad/structs/structs.go | 11 +++ 4 files changed, 207 insertions(+) create mode 100644 nomad/heartbeat.go diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go new file mode 100644 index 000000000..60a2c7076 --- /dev/null +++ b/nomad/heartbeat.go @@ -0,0 +1,167 @@ +package nomad + +import ( + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // defaultHeartbeatTTL is the TTL value used for heartbeats + // when they are first initialized. This should be longer than + // the usual TTL since clients are switching to a new leader. + defaultHeartbeatTTL = 120 * time.Second + + // minHeartbeatTTL is the minimum heartbeat interval. + minHeartbeatTTL = 15 * time.Second + + // maxHeartbeatsPerSecond is the targeted maximum rate of heartbeats. + // As the cluster size grows, we simply increase the heartbeat TTL + // to approach this value. + maxHeartbeatsPerSecond = 50.0 +) + +// initializeHeartbeatTimers is used when a leader is newly elected to create +// a new map to track heartbeat expiration and to reset all the timers from +// the previously known set of timers. +func (s *Server) initializeHeartbeatTimers() error { + // Scan all nodes and reset their timer + snap, err := s.fsm.State().Snapshot() + if err != nil { + return err + } + + // Get an iterator over nodes + iter, err := snap.Nodes() + if err != nil { + return err + } + + s.heartbeatTimersLock.Lock() + defer s.heartbeatTimersLock.Unlock() + + // Handle each node + for { + raw := iter.Next() + if raw == nil { + break + } + node := raw.(*structs.Node) + if node.TerminalStatus() { + continue + } + s.resetHeartbeatTimerLocked(node.ID, defaultHeartbeatTTL) + } + return nil +} + +// resetHeartbeatTimer is used to reset the TTL of a heartbeat. +// This can be used for new heartbeats and existing ones. +func (s *Server) resetHeartbeatTimer(id string) (time.Duration, error) { + s.heartbeatTimersLock.Lock() + defer s.heartbeatTimersLock.Unlock() + + // Compute the target TTL value + n := len(s.heartbeatTimers) + ttl := rateScaledInterval(maxHeartbeatsPerSecond, minHeartbeatTTL, n) + + // Reset the TTL + s.resetHeartbeatTimerLocked(id, ttl) + return ttl, nil +} + +// resetHeartbeatTimerLocked is used to reset a heartbeat timer +// assuming the heartbeatTimerLock is already held +func (s *Server) resetHeartbeatTimerLocked(id string, ttl time.Duration) { + // Ensure a timer map exists + if s.heartbeatTimers == nil { + s.heartbeatTimers = make(map[string]*time.Timer) + } + + // Adjust the given TTL by adding an additional 10% grace period. + // This is to compensate for network and processing delays. + // The contract is that a heartbeat is not expired before the TTL, + // but there is no explicit promise about the upper bound so this is allowable. + ttl = ttl + (ttl / 10) + + // Renew the heartbeat timer if it exists + if timer, ok := s.heartbeatTimers[id]; ok { + timer.Reset(ttl) + return + } + + // Create a new timer to track expiration of thi sheartbeat + timer := time.AfterFunc(ttl, func() { + s.invalidateHeartbeat(id) + }) + s.heartbeatTimers[id] = timer +} + +// invalidateHeartbeat is invoked when a heartbeat TTL is reached and we +// need to invalidate the heartbeat. +func (s *Server) invalidateHeartbeat(id string) { + defer metrics.MeasureSince([]string{"nomad", "heartbeat", "invalidate"}, time.Now()) + // Clear the heartbeat timer + s.heartbeatTimersLock.Lock() + delete(s.heartbeatTimers, id) + s.heartbeatTimersLock.Unlock() + s.logger.Printf("[DEBUG] nomad.heartbeat: node '%s' TTL expired", id) + + // Make a request to update the node status + req := structs.NodeUpdateStatusRequest{ + NodeID: id, + Status: structs.NodeStatusDown, + WriteRequest: structs.WriteRequest{ + Region: s.config.Region, + }, + } + var resp structs.NodeUpdateResponse + if err := s.endpoints.Client.UpdateStatus(&req, &resp); err != nil { + s.logger.Printf("[ERR] nomad.heartbeat: update status failed: %v", err) + } +} + +// clearHeartbeatTimer is used to clear the heartbeat time for +// a single heartbeat. This is used when a heartbeat is destroyed +// explicitly and no longer needed. +func (s *Server) clearHeartbeatTimer(id string) error { + s.heartbeatTimersLock.Lock() + defer s.heartbeatTimersLock.Unlock() + + if timer, ok := s.heartbeatTimers[id]; ok { + timer.Stop() + delete(s.heartbeatTimers, id) + } + return nil +} + +// clearAllHeartbeatTimers is used when a leader is stepping +// down and we no longer need to track any heartbeat timers. +func (s *Server) clearAllHeartbeatTimers() error { + s.heartbeatTimersLock.Lock() + defer s.heartbeatTimersLock.Unlock() + + for _, t := range s.heartbeatTimers { + t.Stop() + } + s.heartbeatTimers = nil + return nil +} + +// heartbeatStats is a long running routine used to capture +// the number of active heartbeats being tracked +func (s *Server) heartbeatStats() { + for { + select { + case <-time.After(5 * time.Second): + s.heartbeatTimersLock.Lock() + num := len(s.heartbeatTimers) + s.heartbeatTimersLock.Unlock() + metrics.SetGauge([]string{"nomad", "heartbeat", "active"}, float32(num)) + + case <-s.shutdownCh: + return + } + } +} diff --git a/nomad/leader.go b/nomad/leader.go index 84ed0e0cf..55e0dd048 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -117,6 +117,20 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Reap any failed evaluations go s.reapFailedEvaluations(stopCh) + + // Setup the heartbeat timers. This is done both when starting up or when + // a leader fail over happens. Since the timers are maintained by the leader + // node, effectively this means all the timers are renewed at the time of failover. + // The TTL contract is that the session will not be expired before the TTL, + // so expiring it later is allowable. + // + // This MUST be done after the initial barrier to ensure the latest Nodes + // are available to be initialized. Otherwise initialization may use stale + // data. + if err := s.initializeHeartbeatTimers(); err != nil { + s.logger.Printf("[ERR] nomad: heartbeat timer setup failed: %v", err) + return err + } return nil } @@ -222,6 +236,13 @@ func (s *Server) revokeLeadership() error { // Disable the eval broker, since it is only useful as a leader s.evalBroker.SetEnabled(false) + + // Clear the heartbeat timers on either shutdown or step down, + // since we are no longer responsible for TTL expirations. + if err := s.clearAllHeartbeatTimers(); err != nil { + s.logger.Printf("[ERR] nomad: clearing heartbeat timers failed: %v", err) + return err + } return nil } diff --git a/nomad/server.go b/nomad/server.go index cef105615..769ceed7d 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -110,6 +110,11 @@ type Server struct { // plans that are waiting to be assessed by the leader planQueue *PlanQueue + // heartbeatTimers track the expiration time of each heartbeat that has + // a TTL. On expiration, the node status is updated to be 'down'. + heartbeatTimers map[string]*time.Timer + heartbeatTimersLock sync.Mutex + left bool shutdown bool shutdownCh chan struct{} @@ -204,6 +209,9 @@ func NewServer(config *Config) (*Server, error) { // Emit metrics for the plan queue go planQueue.EmitStats(time.Second, s.shutdownCh) + // Emit metrics + go s.heartbeatStats() + // Done return s, nil } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b92999408..18f4fc243 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -390,6 +390,17 @@ type Node struct { ModifyIndex uint64 } +// TerminalStatus returns if the current status is terminal and +// will no longer transition. +func (n *Node) TerminalStatus() bool { + switch n.Status { + case NodeStatusDown: + return true + default: + return false + } +} + // Resources is used to define the resources available // on a client type Resources struct {