client: Allow operators to force a client to renew its identity. (#26277)

The Nomad client has its identity renewed according to the TTL,
which defaults to 24h. In certain situations, such as root keyring
rotation, operators may want to force clients to renew their
identities before the TTL threshold is met. This change introduces
a client HTTP and RPC endpoint that instructs the node to request a
new identity at its next heartbeat. This can be used via the API or
a new command.

While this adds a manual intervention step on top of any keyring
rotation, it dramatically reduces the initial feature complexity,
as it provides an asynchronous and efficient method of renewal that
utilises existing functionality.
Author: James Rasell
Date:   2025-07-16 15:56:00 +02:00 (committed by GitHub)
Parent: 8096ea4129
Commit: 953a149180
16 changed files with 574 additions and 8 deletions
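
For illustration only, a minimal in-process sketch of requesting a renewal
through the new client RPC, mirroring how the tests below exercise the
endpoint. The helper name and package are hypothetical; operators would
normally go through the HTTP API or the new command instead of holding a
reference to the running client.

package example

import (
	"fmt"

	"github.com/hashicorp/nomad/client"
	"github.com/hashicorp/nomad/nomad/structs"
)

// forceIdentityRenewal asks a running client to request a fresh identity at
// its next heartbeat. Nothing is renewed immediately; the client simply sets
// a flag that its next Node.UpdateStatus heartbeat consumes.
func forceIdentityRenewal(c *client.Client, token string) error {
	req := structs.NodeIdentityRenewReq{
		NodeID: c.NodeID(),
		QueryOptions: structs.QueryOptions{
			AuthToken: token, // requires node:write when ACLs are enabled
		},
	}
	var resp structs.NodeIdentityRenewResp
	if err := c.ClientRPC(structs.NodeIdentityRenewRPCMethod, &req, &resp); err != nil {
		return fmt.Errorf("requesting identity renewal: %w", err)
	}
	return nil
}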

View File

@@ -339,6 +339,11 @@ type Client struct {
// the servers. This is used to authenticate the client to the servers when
// performing RPC calls.
identity atomic.Value
// identityForceRenewal is used to force the client to renew its identity
// at the next heartbeat. It is set by an operator calling the node identity
// renew RPC method.
identityForceRenewal atomic.Bool
}
var (
@@ -402,6 +407,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
EnterpriseClient: newEnterpriseClient(logger),
allocrunnerFactory: cfg.AllocRunnerFactory,
identity: atomic.Value{},
identityForceRenewal: atomic.Bool{},
}
// we can't have this set in the default Config because of import cycles
@@ -968,6 +974,10 @@ func (c *Client) nodeIdentityToken() string {
// processes with a new node identity token.
func (c *Client) setNodeIdentityToken(token string) {
// It's a bit of a simple log line, but it is useful to know when the client
// has renewed or set its node identity token.
c.logger.Info("setting node identity token")
// Store the token on the client as the first step, so it's available for
// use by all RPCs immediately.
c.identity.Store(token)
@@ -2204,6 +2214,14 @@ func (c *Client) updateNodeStatus() error {
AuthToken: c.nodeAuthToken(),
},
}
// Check whether the client has been instructed to force a renewal of its
// identity, and set the flag on the request if so.
if c.identityForceRenewal.Load() {
c.logger.Debug("forcing identity renewal")
req.ForceIdentityRenewal = true
}
var resp structs.NodeUpdateResponse
if err := c.RPC("Node.UpdateStatus", &req, &resp); err != nil {
c.triggerDiscovery()
@@ -2226,7 +2244,17 @@ func (c *Client) updateNodeStatus() error {
c.heartbeatLock.Unlock()
c.logger.Trace("next heartbeat", "period", resp.HeartbeatTTL)
if resp.Index != 0 {
// The Nomad server returns an index greater than zero when a Raft update has
// occurred, indicating a change in the state of the persisted node object.
//
// This can happen when a Nomad server invalidates the node's heartbeat timer
// and marks the node as down. In that case we log, so the operator can see
// the client missed a heartbeat. If the server responded with a new identity,
// we assume the client did not miss a heartbeat; without that check, this
// line would appear on every identity renewal, which could confuse cluster
// operators.
if resp.Index != 0 && resp.SignedIdentity == nil {
c.logger.Debug("state updated", "node_status", req.Status)
// We have potentially missed our TTL; log how delayed we were
@@ -2276,6 +2304,10 @@ func (c *Client) handleNodeUpdateResponse(resp structs.NodeUpdateResponse) error
return fmt.Errorf("error saving client identity: %w", err)
}
c.setNodeIdentityToken(*resp.SignedIdentity)
// If the operator forced this renewal, reset the flag so that we don't
// keep renewing the identity on every heartbeat.
c.identityForceRenewal.Store(false)
}
// Convert []*NodeServerInfo to []*servers.Server
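
The changes above form a small set-and-consume loop: the Renew RPC sets
identityForceRenewal, the next Node.UpdateStatus heartbeat forwards it as
ForceIdentityRenewal, and the flag is cleared only once
handleNodeUpdateResponse has stored the new signed identity. A standalone
sketch of that pattern (not Nomad code, names are illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

// heartbeater illustrates the set-once/consume-on-next-tick pattern used by
// the client: an operator request flips the flag, the next heartbeat reads
// it, and the flag is cleared only after the renewal has been handled.
type heartbeater struct {
	forceRenewal atomic.Bool
}

func (h *heartbeater) requestRenewal() { h.forceRenewal.Store(true) }

func (h *heartbeater) heartbeat() {
	force := h.forceRenewal.Load()
	fmt.Println("heartbeat, force renewal requested:", force)
	if force {
		// Pretend the server returned a new signed identity; only then is the
		// flag cleared, mirroring handleNodeUpdateResponse above.
		h.forceRenewal.Store(false)
	}
}

func main() {
	h := &heartbeater{}
	h.heartbeat()      // false: normal heartbeat
	h.requestRenewal() // operator calls NodeIdentity.Renew
	h.heartbeat()      // true: request carries ForceIdentityRenewal, flag cleared
	h.heartbeat()      // false again
}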

View File

@@ -0,0 +1,33 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package client
import (
"github.com/hashicorp/nomad/nomad/structs"
)
type NodeIdentity struct {
c *Client
}
func newNodeIdentityEndpoint(c *Client) *NodeIdentity {
n := &NodeIdentity{c: c}
return n
}
func (n *NodeIdentity) Renew(args *structs.NodeIdentityRenewReq, _ *structs.NodeIdentityRenewResp) error {
// Check node write permissions.
if aclObj, err := n.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if !aclObj.AllowNodeWrite() {
return structs.ErrPermissionDenied
}
// Store the node identity renewal request on the client, so it can be
// picked up at the next heartbeat.
n.c.identityForceRenewal.Store(true)
return nil
}
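
The agent-side HTTP endpoint mentioned in the commit message is not part of
this excerpt. As a rough, hypothetical stand-in (handler name, routing, and
error mapping are all assumptions, not the real agent code), an HTTP handler
would translate the request into the same client RPC:

package example

import (
	"net/http"

	"github.com/hashicorp/nomad/client"
	"github.com/hashicorp/nomad/nomad/structs"
)

// renewIdentityHandler is a hypothetical stand-in for the agent HTTP endpoint;
// it only shows the translation from an HTTP request into the
// NodeIdentity.Renew client RPC.
func renewIdentityHandler(c *client.Client) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		args := structs.NodeIdentityRenewReq{
			NodeID: c.NodeID(),
			QueryOptions: structs.QueryOptions{
				// Forward the caller's ACL token; Renew enforces node:write.
				AuthToken: r.Header.Get("X-Nomad-Token"),
			},
		}
		var reply structs.NodeIdentityRenewResp
		if err := c.ClientRPC(structs.NodeIdentityRenewRPCMethod, &args, &reply); err != nil {
			// Simplified error mapping; the real agent distinguishes ACL
			// errors from server errors.
			http.Error(w, err.Error(), http.StatusForbidden)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}
}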

View File

@@ -0,0 +1,103 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package client
import (
"testing"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
)
func TestNodeIdentity_Renew(t *testing.T) {
ci.Parallel(t)
// Create a test ACL server and client and perform our node identity renewal
// tests against it.
testACLServer, testServerToken, testACLServerCleanup := nomad.TestACLServer(t, nil)
t.Cleanup(func() { testACLServerCleanup() })
testutil.WaitForLeader(t, testACLServer.RPC)
testACLClient, testACLClientCleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{testACLServer.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { _ = testACLClientCleanup() })
testutil.WaitForClientStatusWithToken(
t, testACLServer.RPC, testACLClient.NodeID(), testACLClient.Region(),
structs.NodeStatusReady, testServerToken.SecretID,
)
t.Run("acl_denied", func(t *testing.T) {
must.ErrorContains(
t,
testACLClient.ClientRPC(
structs.NodeIdentityRenewRPCMethod,
&structs.NodeIdentityRenewReq{},
&structs.NodeIdentityRenewResp{},
),
structs.ErrPermissionDenied.Error(),
)
})
t.Run("acl_valid", func(t *testing.T) {
aclPolicy := mock.NodePolicy(acl.PolicyWrite)
aclToken := mock.CreatePolicyAndToken(t, testACLServer.State(), 10, t.Name(), aclPolicy)
req := structs.NodeIdentityRenewReq{
NodeID: testACLClient.NodeID(),
QueryOptions: structs.QueryOptions{
AuthToken: aclToken.SecretID,
},
}
must.NoError(
t,
testACLClient.ClientRPC(
structs.NodeIdentityRenewRPCMethod,
&req,
&structs.NodeIdentityRenewResp{},
),
)
renewalVal := testACLClient.identityForceRenewal.Load()
must.True(t, renewalVal)
})
// Create a test non-ACL server and client and perform our node identity
// renewal tests against it.
testServer, testServerCleanup := nomad.TestServer(t, nil)
t.Cleanup(func() { testServerCleanup() })
testutil.WaitForLeader(t, testServer.RPC)
testClient, testClientCleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{testServer.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { _ = testClientCleanup() })
testutil.WaitForClient(t, testServer.RPC, testClient.NodeID(), testClient.Region())
t.Run("non_acl_valid", func(t *testing.T) {
must.NoError(
t,
testClient.ClientRPC(
structs.NodeIdentityRenewRPCMethod,
&structs.NodeIdentityRenewReq{
NodeID: testClient.NodeID(),
QueryOptions: structs.QueryOptions{},
},
&structs.NodeIdentityRenewResp{},
),
)
renewalVal := testClient.identityForceRenewal.Load()
must.True(t, renewalVal)
})
}
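
The tests above only assert that the flag was set; they do not wait for a
heartbeat to consume it. A hypothetical follow-up helper for package client
(not part of this change; it would need "errors" added to the imports) could
use testutil.WaitForResult to block until the renewal has been picked up:

// waitForRenewalConsumed blocks until a forced renewal has been consumed by a
// heartbeat, i.e. until the client has cleared identityForceRenewal again.
func waitForRenewalConsumed(t *testing.T, c *Client) {
	t.Helper()
	testutil.WaitForResult(func() (bool, error) {
		if c.identityForceRenewal.Load() {
			return false, errors.New("identity renewal still pending")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("renewal was not consumed: %v", err)
	})
}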

View File

@@ -22,13 +22,14 @@ import (
// rpcEndpoints holds the RPC endpoints
type rpcEndpoints struct {
ClientStats *ClientStats
CSI *CSI
FileSystem *FileSystem
Allocations *Allocations
Agent *Agent
NodeMeta *NodeMeta
HostVolume *HostVolume
ClientStats *ClientStats
CSI *CSI
FileSystem *FileSystem
Allocations *Allocations
Agent *Agent
NodeIdentity *NodeIdentity
NodeMeta *NodeMeta
HostVolume *HostVolume
}
// ClientRPC is used to make a local, client only RPC call
@@ -301,6 +302,7 @@ func (c *Client) setupClientRpc(rpcs map[string]interface{}) {
c.endpoints.FileSystem = NewFileSystemEndpoint(c)
c.endpoints.Allocations = NewAllocationsEndpoint(c)
c.endpoints.Agent = NewAgentEndpoint(c)
c.endpoints.NodeIdentity = newNodeIdentityEndpoint(c)
c.endpoints.NodeMeta = newNodeMetaEndpoint(c)
c.endpoints.HostVolume = newHostVolumesEndpoint(c)
c.setupClientRpcServer(c.rpcServer)
@@ -317,6 +319,7 @@ func (c *Client) setupClientRpcServer(server *rpc.Server) {
server.Register(c.endpoints.FileSystem)
server.Register(c.endpoints.Allocations)
server.Register(c.endpoints.Agent)
_ = server.Register(c.endpoints.NodeIdentity)
server.Register(c.endpoints.NodeMeta)
server.Register(c.endpoints.HostVolume)
}
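
Registration here goes through an *rpc.Server, which follows the standard
library net/rpc convention of publishing each exported method as
"TypeName.Method", so the new endpoint becomes reachable as
NodeIdentity.Renew (presumably the value of structs.NodeIdentityRenewRPCMethod).
A self-contained sketch of that dispatch, with illustrative types rather than
Nomad's:

package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// NodeIdentity mimics the shape of the client endpoint: an exported type with
// an exported two-argument method returning error, which net/rpc can serve.
type NodeIdentity struct{}

type RenewReq struct{ NodeID string }
type RenewResp struct{}

func (n *NodeIdentity) Renew(args *RenewReq, _ *RenewResp) error {
	fmt.Println("renew requested for node", args.NodeID)
	return nil
}

func main() {
	server := rpc.NewServer()
	if err := server.Register(&NodeIdentity{}); err != nil {
		panic(err)
	}

	// Wire a client to the server over an in-memory pipe and call the method
	// by its "TypeName.Method" name, much like ClientRPC does for local calls.
	cliConn, srvConn := net.Pipe()
	go server.ServeConn(srvConn)
	cli := rpc.NewClient(cliConn)
	defer cli.Close()

	var resp RenewResp
	if err := cli.Call("NodeIdentity.Renew", &RenewReq{NodeID: "example-node"}, &resp); err != nil {
		panic(err)
	}
}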