diff --git a/nomad/client_endpoint.go b/nomad/client_endpoint.go index 229a7ec31..1860b49cd 100644 --- a/nomad/client_endpoint.go +++ b/nomad/client_endpoint.go @@ -61,6 +61,16 @@ func (c *ClientEndpoint) Register(args *structs.NodeRegisterRequest, reply *stru reply.EvalCreateIndex = evalIndex } + // Check if we need to setup a heartbeat + if !args.Node.TerminalStatus() { + ttl, err := c.srv.resetHeartbeatTimer(args.Node.ID) + if err != nil { + c.srv.logger.Printf("[ERR] nomad.client: heartbeat reset failed: %v", err) + return err + } + reply.HeartbeatTTL = ttl + } + // Set the reply index reply.Index = index return nil @@ -86,6 +96,9 @@ func (c *ClientEndpoint) Deregister(args *structs.NodeDeregisterRequest, reply * return err } + // Clear the heartbeat timer if any + c.srv.clearHeartbeatTimer(args.NodeID) + // Create the evaluations for this node evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index) if err != nil { diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go index 60a2c7076..3ecd37c4c 100644 --- a/nomad/heartbeat.go +++ b/nomad/heartbeat.go @@ -11,7 +11,7 @@ const ( // defaultHeartbeatTTL is the TTL value used for heartbeats // when they are first initialized. This should be longer than // the usual TTL since clients are switching to a new leader. - defaultHeartbeatTTL = 120 * time.Second + defaultHeartbeatTTL = 300 * time.Second // minHeartbeatTTL is the minimum heartbeat interval. minHeartbeatTTL = 15 * time.Second diff --git a/nomad/heartbeat_test.go b/nomad/heartbeat_test.go new file mode 100644 index 000000000..c6086a9a0 --- /dev/null +++ b/nomad/heartbeat_test.go @@ -0,0 +1,271 @@ +package nomad + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +func TestInitializeHeartbeatTimers(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + node := mock.Node() + state := s1.fsm.State() + err := state.RegisterNode(1, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Reset the heartbeat timers + err = s1.initializeHeartbeatTimers() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check that we have a timer + _, ok := s1.heartbeatTimers[node.ID] + if !ok { + t.Fatalf("missing heartbeat timer") + } +} + +func TestResetHeartbeatTimer(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create a new timer + ttl, err := s1.resetHeartbeatTimer("test") + if err != nil { + t.Fatalf("err: %v", err) + } + if ttl != minHeartbeatTTL { + t.Fatalf("bad: %#v", ttl) + } + + // Check that we have a timer + _, ok := s1.heartbeatTimers["test"] + if !ok { + t.Fatalf("missing heartbeat timer") + } +} + +func TestResetHeartbeatTimerLocked(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + s1.heartbeatTimersLock.Lock() + s1.resetHeartbeatTimerLocked("foo", 5*time.Millisecond) + s1.heartbeatTimersLock.Unlock() + + if _, ok := s1.heartbeatTimers["foo"]; !ok { + t.Fatalf("missing timer") + } + + time.Sleep(10 * time.Millisecond) + + if _, ok := s1.heartbeatTimers["foo"]; ok { + t.Fatalf("timer should be gone") + } +} + +func TestResetHeartbeatTimerLocked_Renew(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + s1.heartbeatTimersLock.Lock() + s1.resetHeartbeatTimerLocked("foo", 5*time.Millisecond) + s1.heartbeatTimersLock.Unlock() + + if _, ok := s1.heartbeatTimers["foo"]; !ok { + t.Fatalf("missing timer") + } + + time.Sleep(2 * time.Millisecond) + + // Renew the heartbeat + s1.heartbeatTimersLock.Lock() + s1.resetHeartbeatTimerLocked("foo", 5*time.Millisecond) + s1.heartbeatTimersLock.Unlock() + renew := time.Now() + + // Watch for invalidation + for time.Now().Sub(renew) < 20*time.Millisecond { + s1.heartbeatTimersLock.Lock() + _, ok := s1.heartbeatTimers["foo"] + s1.heartbeatTimersLock.Unlock() + if !ok { + end := time.Now() + if diff := end.Sub(renew); diff < 5*time.Millisecond { + t.Fatalf("early invalidate %v", diff) + } + return + } + time.Sleep(time.Millisecond) + } + t.Fatalf("should have expired") +} + +func TestInvalidateHeartbeat(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create a node + node := mock.Node() + state := s1.fsm.State() + err := state.RegisterNode(1, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + // This should cause a status update + s1.invalidateHeartbeat(node.ID) + + // Check it is updated + out, err := state.GetNodeByID(node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if !out.TerminalStatus() { + t.Fatalf("should update node: %#v", out) + } +} + +func TestClearHeartbeatTimer(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + s1.heartbeatTimersLock.Lock() + s1.resetHeartbeatTimerLocked("foo", 5*time.Millisecond) + s1.heartbeatTimersLock.Unlock() + + err := s1.clearHeartbeatTimer("foo") + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := s1.heartbeatTimers["foo"]; ok { + t.Fatalf("timer should be gone") + } +} + +func TestClearAllHeartbeatTimers(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + s1.heartbeatTimersLock.Lock() + s1.resetHeartbeatTimerLocked("foo", 10*time.Millisecond) + s1.resetHeartbeatTimerLocked("bar", 10*time.Millisecond) + s1.resetHeartbeatTimerLocked("baz", 10*time.Millisecond) + s1.heartbeatTimersLock.Unlock() + + err := s1.clearAllHeartbeatTimers() + if err != nil { + t.Fatalf("err: %v", err) + } + + if len(s1.heartbeatTimers) != 0 { + t.Fatalf("timers should be gone") + } +} + +func TestServer_HeartbeatTTL_Failover(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + + s2 := testServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + + s3 := testServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s3.Shutdown() + servers := []*Server{s1, s2, s3} + testJoin(t, s1, s2, s3) + + testutil.WaitForResult(func() (bool, error) { + peers, _ := s1.raftPeers.Peers() + return len(peers) == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + + // Find the leader + var leader *Server + for _, s := range servers { + // Check that s.heartbeatTimers is empty + if len(s.heartbeatTimers) != 0 { + t.Fatalf("should have no heartbeatTimers") + } + // Find the leader too + if s.IsLeader() { + leader = s + } + } + if leader == nil { + t.Fatalf("Should have a leader") + } + codec := rpcClient(t, leader) + + // Create the register request + node := mock.Node() + req := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Client.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Check that heartbeatTimers has the heartbeat ID + if _, ok := leader.heartbeatTimers[node.ID]; !ok { + t.Fatalf("missing heartbeat timer") + } + + // Shutdown the leader! + leader.Shutdown() + + // heartbeatTimers should be cleared on leader shutdown + if len(leader.heartbeatTimers) != 0 { + t.Fatalf("heartbeat timers should be empty on the shutdown leader") + } + + // Find the new leader + testutil.WaitForResult(func() (bool, error) { + leader = nil + for _, s := range servers { + if s.IsLeader() { + leader = s + } + } + if leader == nil { + return false, fmt.Errorf("Should have a new leader") + } + + // Ensure heartbeat timer is restored + if _, ok := leader.heartbeatTimers[node.ID]; !ok { + return false, fmt.Errorf("missing heartbeat timer") + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 18f4fc243..e0df1b337 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -267,6 +267,7 @@ type JobDeregisterResponse struct { // NodeUpdateResponse is used to respond to a node update type NodeUpdateResponse struct { + HeartbeatTTL time.Duration EvalIDs []string EvalCreateIndex uint64 NodeModifyIndex uint64