mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
wait for service registration cleanup until allocs marked lost (#26424)
When a node misses a heartbeat and is marked down, Nomad deletes service registration instances for that node. But if the node then successfully heartbeats before its allocations are marked lost, the services are never restored. The node is unaware that it has missed a heartbeat and there's no anti-entropy on the node in any case. We already delete services when the plan applier marks allocations as stopped, so deleting the services when the node goes down is only an optimization to more quickly divert service traffic. But because the state after a plan apply is the "canonical" view of allocation health, this breaks correctness. Remove the code path that deletes services from nodes when nodes go down. Retain the state store code that deletes services when allocs are marked terminal by the plan applier. Also add a path in the state store to delete services when allocs are marked terminal by the client. This gets back some of the optimization but avoids the correctness bug because marking the allocation client-terminal is a one way operation. Fixes: https://github.com/hashicorp/nomad/issues/16983
This commit is contained in:
3
.changelog/26424.txt
Normal file
3
.changelog/26424.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:bug
|
||||
services: Fixed a bug where Nomad services were deleted if a node missed heartbeats and recovered before allocs were migrated
|
||||
```
|
||||
@@ -1525,62 +1525,6 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) {
|
||||
require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr)
|
||||
}
|
||||
|
||||
func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
testServer, serverCleanup := TestServer(t, nil)
|
||||
defer serverCleanup()
|
||||
testutil.WaitForLeader(t, testServer.RPC)
|
||||
testutil.WaitForKeyring(t, testServer.RPC, testServer.config.Region)
|
||||
|
||||
// Create a node and upsert this into state.
|
||||
node := mock.Node()
|
||||
must.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))
|
||||
|
||||
// Generate service registrations, ensuring the nodeID is set to the
|
||||
// generated node from above.
|
||||
services := mock.ServiceRegistrations()
|
||||
|
||||
for _, s := range services {
|
||||
s.NodeID = node.ID
|
||||
}
|
||||
|
||||
// Upsert the service registrations into state.
|
||||
must.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
|
||||
|
||||
// Check the service registrations are in state as we expect, so we can
|
||||
// have confidence in the rest of the test.
|
||||
ws := memdb.NewWatchSet()
|
||||
nodeRegs, err := testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
|
||||
must.NoError(t, err)
|
||||
must.Len(t, 2, nodeRegs)
|
||||
must.Eq(t, nodeRegs[0].NodeID, node.ID)
|
||||
must.Eq(t, nodeRegs[1].NodeID, node.ID)
|
||||
|
||||
// Generate and trigger a node down status update. This mimics what happens
|
||||
// when the node fails its heart-beating.
|
||||
args := structs.NodeUpdateStatusRequest{
|
||||
NodeID: node.ID,
|
||||
Status: structs.NodeStatusDown,
|
||||
WriteRequest: structs.WriteRequest{Region: "global", AuthToken: node.SecretID},
|
||||
}
|
||||
|
||||
var reply structs.NodeUpdateResponse
|
||||
|
||||
nodeEndpoint := NewNodeEndpoint(testServer, nil)
|
||||
must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
|
||||
|
||||
// Query our state, to ensure the node service registrations have been
|
||||
// removed.
|
||||
nodeRegs, err = testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
|
||||
must.NoError(t, err)
|
||||
must.Len(t, 0, nodeRegs)
|
||||
|
||||
// Re-send the status update, to ensure we get no error if service
|
||||
// registrations have already been removed
|
||||
must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
|
||||
}
|
||||
|
||||
func TestNode_UpdateStatus_Identity(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
|
||||
@@ -1163,11 +1163,6 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, req *structs.NodeUpdateStatus
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
// Deregister any services on the node in the same transaction
|
||||
if copyNode.Status == structs.NodeStatusDown {
|
||||
s.deleteServiceRegistrationByNodeIDTxn(txn, txn.Index, copyNode.ID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -4077,10 +4072,8 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
|
||||
return fmt.Errorf("setting job status failed: %v", err)
|
||||
}
|
||||
|
||||
if copyAlloc.ClientTerminalStatus() {
|
||||
if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, copyAlloc.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.deregisterServicesForTerminalAllocs(txn, index, copyAlloc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -4296,6 +4289,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.deregisterServicesForTerminalAllocs(txn, index, alloc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := txn.Insert("allocs", alloc); err != nil {
|
||||
return fmt.Errorf("alloc insert failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -185,6 +185,20 @@ func (s *StateStore) deleteServiceRegistrationByAllocIDTxn(
|
||||
return nil
|
||||
}
|
||||
|
||||
// deregisterServicesForTerminalAllocs deletes service registration instances
|
||||
// for allocations that have been marked client-terminal. This allows us to
|
||||
// remove services for an alloc even if the client hooks fail or the node goes
|
||||
// down.
|
||||
func (s *StateStore) deregisterServicesForTerminalAllocs(
|
||||
txn *txn, index uint64, alloc *structs.Allocation) error {
|
||||
|
||||
if !alloc.ClientTerminalStatus() {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.deleteServiceRegistrationByAllocIDTxn(txn, index, alloc.ID)
|
||||
}
|
||||
|
||||
// GetServiceRegistrations returns an iterator that contains all service
|
||||
// registrations stored within state. This is primarily useful when performing
|
||||
// listings which use the namespace wildcard operator. The caller is
|
||||
|
||||
@@ -639,3 +639,69 @@ func TestStateStore_GetServiceRegistrationsByNodeID(t *testing.T) {
|
||||
must.NoError(t, err)
|
||||
must.Len(t, 1, serviceRegs)
|
||||
}
|
||||
|
||||
func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
store := testStateStore(t)
|
||||
index, _ := store.LatestIndex()
|
||||
|
||||
alloc0 := mock.Alloc()
|
||||
alloc1 := mock.Alloc()
|
||||
|
||||
services := mock.ServiceRegistrations()
|
||||
services[0].AllocID = alloc0.ID
|
||||
services[1].AllocID = alloc1.ID
|
||||
|
||||
node := mock.Node()
|
||||
node.ID = services[0].NodeID
|
||||
alloc0.NodeID = node.ID
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index,
|
||||
[]*structs.Allocation{alloc0, alloc1}))
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertServiceRegistrations(
|
||||
structs.MsgTypeTestSetup, index, services))
|
||||
|
||||
// node gets marked lost, but this doesn't delete services
|
||||
node = node.Copy()
|
||||
node.Status = structs.NodeStatusDown
|
||||
services, err := store.GetServiceRegistrationsByNodeID(nil, node.ID)
|
||||
must.NoError(t, err)
|
||||
must.Len(t, 1, services)
|
||||
|
||||
// client marks alloc complete, so we clear the service
|
||||
alloc0 = alloc0.Copy()
|
||||
alloc0.ClientStatus = structs.AllocClientStatusComplete
|
||||
index++
|
||||
must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index,
|
||||
[]*structs.Allocation{alloc0}))
|
||||
|
||||
iter, err := store.GetServiceRegistrationsByAllocID(nil, alloc0.ID)
|
||||
must.NoError(t, err)
|
||||
must.Nil(t, iter.Next())
|
||||
|
||||
// scheduler/plan marks alloc lost, so we clear the service
|
||||
planAlloc := new(structs.Allocation)
|
||||
*planAlloc = *alloc1
|
||||
planAlloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
planAlloc.ClientStatus = structs.AllocClientStatusLost
|
||||
diff := structs.AllocationDiff(*planAlloc)
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index,
|
||||
&structs.ApplyPlanResultsRequest{
|
||||
AllocUpdateRequest: structs.AllocUpdateRequest{
|
||||
AllocsStopped: []*structs.AllocationDiff{&diff},
|
||||
},
|
||||
}))
|
||||
|
||||
iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID)
|
||||
must.NoError(t, err)
|
||||
must.Nil(t, iter.Next())
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user