From d06155e70169b5e8bf7fc050b15b9448621de941 Mon Sep 17 00:00:00 2001 From: DerekStrickland Date: Tue, 22 Feb 2022 06:13:14 -0500 Subject: [PATCH] NodeStatusDisconnected: support state transitions for new node status --- nomad/heartbeat.go | 22 ++++++++++++++++++++++ nomad/node_endpoint.go | 8 ++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go index 579d8d652..995ca21f2 100644 --- a/nomad/heartbeat.go +++ b/nomad/heartbeat.go @@ -161,12 +161,34 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) { Region: h.config.Region, }, } + + if h.shouldDisconnect(id) { + req.Status = structs.NodeStatusDisconnected + } + var resp structs.NodeUpdateResponse if err := h.staticEndpoints.Node.UpdateStatus(&req, &resp); err != nil { h.logger.Error("update node status failed", "error", err) } } +func (h *nodeHeartbeater) shouldDisconnect(id string) bool { + allocs, err := h.State().AllocsByNode(nil, id) + if err != nil { + h.logger.Error("error retrieving allocs by node", "error", err) + return false + } + + now := time.Now().UTC() + for _, alloc := range allocs { + if alloc.DisconnectTimeout(now).After(now) { + return true + } + } + + return false +} + // clearHeartbeatTimer is used to clear the heartbeat time for // a single heartbeat. This is used when a heartbeat is destroyed // explicitly and no longer needed. diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 2511cadaa..810196a43 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -487,7 +487,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Check if we should trigger evaluations transitionToReady := transitionedToReady(args.Status, node.Status) - if structs.ShouldDrainNode(args.Status) || transitionToReady { + if structs.ShouldDrainNode(args.Status) || transitionToReady || args.Status == structs.NodeStatusDisconnected { evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) if err != nil { n.logger.Error("eval creation failed", "error", err) @@ -546,6 +546,9 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct } } + case structs.NodeStatusDisconnected: + n.logger.Trace(fmt.Sprintf("heartbeat reset skipped for disconnected node %q", args.NodeID)) + default: ttl, err := n.srv.resetHeartbeatTimer(args.NodeID) if err != nil { @@ -572,7 +575,8 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct func transitionedToReady(newStatus, oldStatus string) bool { initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady - return initToReady || terminalToReady + disconnectedToReady := oldStatus == structs.NodeStatusDisconnected && newStatus == structs.NodeStatusReady + return initToReady || terminalToReady || disconnectedToReady } // UpdateDrain is used to update the drain mode of a client node