heartbeat: use leader's ACL token when failing heartbeat (#24241)
In #23838 we updated the `Node.Update` RPC handler we use for heartbeats to be more strict about requiring node secrets. But when a node goes down, it's the leader that sends the request to mark the node down via `Node.Update` (to itself), and this request was missing the leader ACL needed to authenticate to itself.

Add the leader ACL to the request and update the RPC handler test for disconnected clients to use ACLs, which would have detected this bug. Also add a note to the `Authenticate` comment about how that authentication path requires the leader ACL.

Fixes: https://github.com/hashicorp/nomad/issues/24231
Ref: https://hashicorp.atlassian.net/browse/NET-11384
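In short, the missed-heartbeat path has to carry the leader's own ACL token rather than a node secret. The following is a simplified, self-contained sketch of that pattern, with stand-in types; only the `WriteRequest` fields (`Region`, `AuthToken`) and `getLeaderAcl()` mirror the real code in the diff below, everything else is illustrative.

```go
// Simplified sketch, not the real Nomad types.
package main

import "fmt"

// WriteRequest stands in for structs.WriteRequest.
type WriteRequest struct {
	Region    string
	AuthToken string
}

// nodeDownRequest stands in for the node-status update the leader sends to
// itself when a client misses its heartbeat TTL.
type nodeDownRequest struct {
	NodeID       string
	Status       string
	WriteRequest WriteRequest
}

// server stands in for the Nomad server running the heartbeat timers.
type server struct {
	region    string
	leaderAcl string
}

// getLeaderAcl returns the leader's own ACL token.
func (s *server) getLeaderAcl() string { return s.leaderAcl }

// invalidateHeartbeat builds the "mark node down" request. The fix is the
// AuthToken field: without it, the stricter authentication from #23838
// rejects the leader's own request and the node is never marked down.
func (s *server) invalidateHeartbeat(nodeID string) nodeDownRequest {
	return nodeDownRequest{
		NodeID: nodeID,
		Status: "down",
		WriteRequest: WriteRequest{
			Region:    s.region,
			AuthToken: s.getLeaderAcl(), // the one-line fix
		},
	}
}

func main() {
	s := &server{region: "global", leaderAcl: "example-leader-acl"}
	fmt.Printf("%+v\n", s.invalidateHeartbeat("node-1"))
}
```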
.changelog/24241.txt (new file, 3 additions)
@@ -0,0 +1,3 @@
+```release-note:bug
+heartbeat: Fixed a bug where failed nodes would not be marked down
+```
@@ -93,6 +93,10 @@ func NewAuthenticator(cfg *AuthenticatorConfig) *Authenticator {
 // an ephemeral ACLToken makes the original of the credential clear to RPC
 // handlers, who may have different behavior for internal vs external origins.
 //
+// Note: when making a server-to-server RPC that authenticates with this method,
+// the RPC *must* include the leader's ACL token. Use AuthenticateServerOnly for
+// requests that don't have access to the leader's ACL token.
+//
 // Note: when called on the follower we'll be making stale queries, so it's
 // possible if the follower is behind that the leader will get a different value
 // if an ACL token or allocation's WI has just been created.
@@ -163,7 +163,8 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) {
     Status: structs.NodeStatusDown,
     NodeEvent: structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).SetMessage(NodeHeartbeatEventMissed),
     WriteRequest: structs.WriteRequest{
         Region: h.srv.config.Region,
+        AuthToken: h.srv.getLeaderAcl(),
     },
 }

@@ -952,7 +952,7 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 // Setup server with tighter heartbeat so we don't have to wait so long
 // for nodes to go down.
 heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
-s, cleanupS := TestServer(t, func(c *Config) {
+s, rootToken, cleanupS := TestACLServer(t, func(c *Config) {
     c.MinHeartbeatTTL = heartbeatTTL
     c.HeartbeatGrace = 2 * heartbeatTTL
 })
@@ -1001,7 +1001,8 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 go heartbeat(heartbeatCtx)

 // Wait for node to be ready.
-testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
+testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+    structs.NodeStatusReady, rootToken.SecretID)

 // Register job with Disconnect.LostAfter
 job := version.jobSpec(time.Hour)
@@ -1018,6 +1019,7 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
     WriteRequest: structs.WriteRequest{
         Region: "global",
         Namespace: job.Namespace,
+        AuthToken: rootToken.SecretID,
     },
 }
 var jobResp structs.JobRegisterResponse
@@ -1025,15 +1027,16 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 must.NoError(t, err)

 // Wait for alloc to be pending in the server.
-testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
     structs.AllocClientStatusPending: 1,
-})
+}, rootToken.SecretID)

 // Get allocs that node should run.
 allocsReq := &structs.NodeSpecificRequest{
     NodeID: node.ID,
     QueryOptions: structs.QueryOptions{
         Region: "global",
+        AuthToken: rootToken.SecretID,
     },
 }
 var allocsResp structs.NodeAllocsResponse
@@ -1058,17 +1061,18 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 must.NoError(t, err)

 // Wait for alloc to be running in the server.
-testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
     structs.AllocClientStatusRunning: 1,
-})
+}, rootToken.SecretID)

 // Stop heartbeat and wait for the client to be disconnected and the alloc
 // to be unknown.
 cancelHeartbeat()
-testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
-testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+    structs.NodeStatusDisconnected, rootToken.SecretID)
+testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
     structs.AllocClientStatusUnknown: 1,
-})
+}, rootToken.SecretID)

 // Restart heartbeat to reconnect node.
 heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background())
@@ -1081,7 +1085,8 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 // allocs status with the server so the scheduler have the necessary
 // information to avoid unnecessary placements.
 time.Sleep(3 * heartbeatTTL)
-testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)
+testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+    structs.NodeStatusInit, rootToken.SecretID)

 // Get allocs that node should run.
 // The node should only have one alloc assigned until it updates its allocs
@@ -1089,7 +1094,8 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 allocsReq = &structs.NodeSpecificRequest{
     NodeID: node.ID,
     QueryOptions: structs.QueryOptions{
         Region: "global",
+        AuthToken: rootToken.SecretID,
     },
 }
 err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
@@ -1104,10 +1110,12 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 // - client status is ready.
 // - only 1 alloc and the alloc is running.
 // - all evals are terminal, so cluster is in a stable state.
-testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
-testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+    structs.NodeStatusReady, rootToken.SecretID)
+testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
     structs.AllocClientStatusRunning: 1,
-})
+}, rootToken.SecretID)

 testutil.WaitForResult(func() (bool, error) {
     state := s.fsm.State()
     ws := memdb.NewWatchSet()
@@ -193,8 +193,15 @@ func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
     WaitForClientStatus(t, rpc, nodeID, region, structs.NodeStatusReady)
 }

-// WaitForClientStatus blocks until the client is in the expected status.
-func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID string, region string, status string) {
+// WaitForClientStatus blocks until the client is in the expected status
+func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID, region, status string) {
+    t.Helper()
+    WaitForClientStatusWithToken(t, rpc, nodeID, region, status, "")
+}
+
+// WaitForClientStatusWithToken blocks until the client is in the expected
+// status, for use with ACLs enabled
+func WaitForClientStatusWithToken(t testing.TB, rpc rpcFn, nodeID, region, status, token string) {
     t.Helper()

     if region == "" {
@@ -202,8 +209,11 @@ func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID string, region string,
     }
     WaitForResult(func() (bool, error) {
         req := structs.NodeSpecificRequest{
             NodeID: nodeID,
-            QueryOptions: structs.QueryOptions{Region: region},
+            QueryOptions: structs.QueryOptions{
+                Region: region,
+                AuthToken: token,
+            },
         }
         var out structs.SingleNodeResponse

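One design note on the test helpers: the token-less `WaitForClientStatus` now just delegates to `WaitForClientStatusWithToken` with an empty token, so existing non-ACL callers keep working unchanged and only ACL-enabled tests need to pass a secret. A minimal sketch of that wrapper pattern, using stand-in names rather than the real helpers:

```go
package main

import "fmt"

// waitForClientStatusWithToken stands in for the token-aware helper added in
// this change: it polls the node's status with the given ACL token until the
// node reports the expected status (here it only prints what it would do).
func waitForClientStatusWithToken(nodeID, region, status, token string) {
	fmt.Printf("wait for %s in %s to be %q (token set: %t)\n",
		nodeID, region, status, token != "")
}

// waitForClientStatus keeps the old, token-less signature and delegates with
// an empty token, so callers in non-ACL tests stay unchanged.
func waitForClientStatus(nodeID, region, status string) {
	waitForClientStatusWithToken(nodeID, region, status, "")
}

func main() {
	// ACLs disabled: old call style.
	waitForClientStatus("node-1", "global", "ready")
	// ACLs enabled: pass the root token's secret explicitly.
	waitForClientStatusWithToken("node-1", "global", "ready", "example-secret")
}
```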