client: Handle identities from servers and use for RPC auth. (#26218)

Nomad servers, if upgraded, can return node identities as part of
the register and update/heartbeat response objects. The Nomad
client will now handle this and store it as appropriate within its
memory and statedb.

The client will now use any stored identity for RPC authentication
with a fallback to the secretID. This supports upgrade paths where
the Nomad clients are updated before the Nomad servers.
This commit is contained in:
James Rasell
2025-07-14 15:24:43 +02:00
committed by GitHub
parent 2f30205102
commit 8096ea4129
8 changed files with 312 additions and 25 deletions

View File

@@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
consulapi "github.com/hashicorp/consul/api"
@@ -333,6 +334,11 @@ type Client struct {
// users is a pool of dynamic workload users
users dynamic.Pool
// identity is the node identity token that has been generated and signed by
// the servers. This is used to authenticate the client to the servers when
// performing RPC calls.
identity atomic.Value
}
var (
@@ -395,6 +401,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
getter: getter.New(cfg.Artifact, logger),
EnterpriseClient: newEnterpriseClient(logger),
allocrunnerFactory: cfg.AllocRunnerFactory,
identity: atomic.Value{},
}
// we can't have this set in the default Config because of import cycles
@@ -604,6 +611,23 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
}
// Attempt to pull the node identity from the state database. If the client
// is starting for the first time, this will be empty, so avoid an
// unnecessary set call to the client atomic. This needs to happen before we
// start heartbeating to avoid unnecessary identity generation and load on
// the Nomad servers.
//
// If the DB returns an error, it is more than likely that the full
// restoration will fail. It isn't terminal for us at this point though, as
// we can generate a new identity on registration.
clientIdentity, err := c.stateDB.GetNodeIdentity()
if err != nil {
logger.Error("failed to get client identity from state", "error", err)
}
if clientIdentity != "" {
c.setNodeIdentityToken(clientIdentity)
}
// Register and then start heartbeating to the servers.
c.shutdownGroup.Go(c.registerAndHeartbeat)
@@ -903,11 +927,57 @@ func (c *Client) NodeID() string {
return c.GetConfig().Node.ID
}
// secretNodeID returns the node's secret ID. It is retained only as a
// fallback authentication credential: fully upgraded clusters authenticate
// with the node identity token instead, but this value is still needed when
// the client has been upgraded ahead of the Nomad servers. Most callers
// should prefer nodeAuthToken, which selects the correct credential
// automatically; a few RPC request objects (such as "Node.GetClientAllocs")
// still carry the secret node ID directly.
func (c *Client) secretNodeID() string {
	node := c.GetConfig().Node
	return node.SecretID
}
// nodeAuthToken returns the credential the client should use to authenticate
// RPC requests: the node identity token when one has been issued, otherwise
// the secret node ID.
//
// Once all clients are expected to hold identity tokens (Nomad 1.13), callers
// should be moved to nodeIdentityToken directly.
func (c *Client) nodeAuthToken() string {
	token := c.nodeIdentityToken()
	if token == "" {
		token = c.secretNodeID()
	}
	return token
}
// nodeIdentityToken returns the signed node identity token, or the empty
// string when none has been set. The token is absent when the client is
// coming up for the first time, restarting before state restore has run, or
// talking to Nomad servers that have not been upgraded to issue node
// identities. Callers should use nodeAuthToken instead of this, as it
// correctly handles both authentication token methods.
func (c *Client) nodeIdentityToken() string {
	// The comma-ok assertion yields the zero value ("") both when the
	// atomic.Value has never been stored (Load returns nil) and when it holds
	// a non-string, instead of panicking as a bare assertion would.
	token, _ := c.identity.Load().(string)
	return token
}
// setNodeIdentityToken stores a newly issued node identity token on the
// client and fans it out to the subsystems that authenticate their own RPC
// calls.
func (c *Client) setNodeIdentityToken(token string) {
	// Make the token visible to all RPC call sites first, so it is available
	// immediately.
	c.identity.Store(token)

	// Propagate to the Nomad service registration handler and the workload
	// identity signer.
	for _, handler := range []any{c.nomadService, c.widsigner} {
		assertAndSetNodeIdentityToken(handler, token)
	}
}
// Shutdown is used to tear down the client
func (c *Client) Shutdown() error {
c.shutdownLock.Lock()
@@ -1954,7 +2024,7 @@ func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{
Region: c.Region(),
AuthToken: c.secretNodeID(),
AuthToken: c.nodeAuthToken(),
},
}
var resp structs.EmitNodeEventsResponse
@@ -2053,7 +2123,7 @@ func (c *Client) getRegistrationToken() string {
select {
case <-c.registeredCh:
return c.secretNodeID()
return c.nodeAuthToken()
default:
// If we haven't yet closed the registeredCh we're either starting for
// the 1st time or we've just restarted. Check the local state to see if
@@ -2065,7 +2135,7 @@ func (c *Client) getRegistrationToken() string {
}
if registration != nil && registration.HasRegistered {
c.registeredOnce.Do(func() { close(c.registeredCh) })
return c.secretNodeID()
return c.nodeAuthToken()
}
}
return ""
@@ -2086,6 +2156,11 @@ func (c *Client) registerNode(authToken string) error {
return err
}
//
if err := c.handleNodeUpdateResponse(resp); err != nil {
return err
}
// Signal that we've registered once so that RPCs sent from the client can
// send authenticated requests. Persist this information in the state so
// that we don't block restoring running allocs when restarting while
@@ -2100,11 +2175,6 @@ func (c *Client) registerNode(authToken string) error {
close(c.registeredCh)
})
err := c.handleNodeUpdateResponse(resp)
if err != nil {
return err
}
// Update the node status to ready after we register.
c.UpdateConfig(func(c *config.Config) {
c.Node.Status = structs.NodeStatusReady
@@ -2131,7 +2201,7 @@ func (c *Client) updateNodeStatus() error {
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{
Region: c.Region(),
AuthToken: c.secretNodeID(),
AuthToken: c.nodeAuthToken(),
},
}
var resp structs.NodeUpdateResponse
@@ -2175,9 +2245,8 @@ func (c *Client) updateNodeStatus() error {
}
})
err := c.handleNodeUpdateResponse(resp)
if err != nil {
return fmt.Errorf("heartbeat response returned no valid servers")
if err := c.handleNodeUpdateResponse(resp); err != nil {
return fmt.Errorf("failed to handle node update response: %w", err)
}
// If there's no Leader in the response we may be talking to a partitioned
@@ -2195,6 +2264,20 @@ func (c *Client) handleNodeUpdateResponse(resp structs.NodeUpdateResponse) error
// rebalance rate.
c.servers.SetNumNodes(resp.NumNodes)
// If the response includes a new identity, set it and save it to the state
// DB.
//
// In the unlikely event that we cannot write the identity to the state DB,
// we do not want to set the client identity token. That would mean the
// client memory state and persistent state DB are out of sync. Instead, we
// return an error and wait until the next heartbeat to try again.
if resp.SignedIdentity != nil {
if err := c.stateDB.PutNodeIdentity(*resp.SignedIdentity); err != nil {
return fmt.Errorf("error saving client identity: %w", err)
}
c.setNodeIdentityToken(*resp.SignedIdentity)
}
// Convert []*NodeServerInfo to []*servers.Server
nomadServers := make([]*servers.Server, 0, len(resp.Servers))
for _, s := range resp.Servers {
@@ -2277,7 +2360,7 @@ func (c *Client) allocSync() {
Alloc: toSync,
WriteRequest: structs.WriteRequest{
Region: c.Region(),
AuthToken: c.secretNodeID(),
AuthToken: c.nodeAuthToken(),
},
}
@@ -2336,6 +2419,27 @@ type allocUpdates struct {
// watchAllocations is used to scan for updates to allocations
func (c *Client) watchAllocations(updates chan *allocUpdates) {
// The request object is generated as soon as this function is called, but
// the RPC can block on the register channel being closed. If we are
// starting for the first time and have not got our identity, the
// authentication token could be set to an empty string. This will result in
// a failed RPC when the call is unblocked.
//
// Although this will be quickly retried, we want to ensure that we do not
// throw errors into the logs or perform calls we know will fail if we can
// avoid it. Therefore, we wait for the registered channel to be closed,
// indicating the client has registered and has an identity token.
//
// This is a prevalent problem when the Nomad agent is run in development
// mode, as the server needs to start and have its encrypter ready, before
// it can generate identities.
select {
case <-c.shutdownCh:
return
case <-c.registeredCh:
}
// The request and response for getting the map of allocations that should
// be running on the Node to their AllocModifyIndex which is incremented
// when the allocation is updated by the servers.
@@ -2352,7 +2456,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
// After the first request, only require monotonically
// increasing state.
AllowStale: false,
AuthToken: c.secretNodeID(),
AuthToken: c.nodeAuthToken(),
},
}
var resp structs.NodeClientAllocsResponse
@@ -2363,7 +2467,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
QueryOptions: structs.QueryOptions{
Region: c.Region(),
AllowStale: true,
AuthToken: c.secretNodeID(),
AuthToken: c.nodeAuthToken(),
},
}
var allocsResp structs.AllocsGetResponse
@@ -2373,6 +2477,9 @@ OUTER:
// Get the allocation modify index map, blocking for updates. We will
// use this to determine exactly what allocations need to be downloaded
// in full.
req.AuthToken = c.nodeAuthToken()
resp = structs.NodeClientAllocsResponse{}
err := c.RPC("Node.GetClientAllocs", &req, &resp)
if err != nil {
@@ -2463,6 +2570,7 @@ OUTER:
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
allocsReq.MinQueryIndex = pullIndex - 1
allocsReq.AuthToken = c.nodeAuthToken()
allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Error("error querying updated allocations", "error", err)

View File

@@ -23,6 +23,7 @@ import (
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/servers"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
cstate "github.com/hashicorp/nomad/client/state"
@@ -30,6 +31,7 @@ import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad"
@@ -1393,6 +1395,38 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
}
}
// TestClient_nodeAuthToken verifies that nodeAuthToken falls back to the node
// secret ID until an identity token is set, after which the identity token
// takes precedence.
func TestClient_nodeAuthToken(t *testing.T) {
	ci.Parallel(t)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.Node.ID = uuid.Generate()
	})
	defer func() { _ = cleanup() }()

	// With no identity token set, the secret ID is used.
	must.Eq(t, client.GetConfig().Node.SecretID, client.nodeAuthToken())

	// Once an identity token is stored it wins over the secret ID.
	client.setNodeIdentityToken("my-identity-token")
	must.Eq(t, "my-identity-token", client.nodeAuthToken())
}
// TestClient_setNodeIdentityToken verifies that the identity token starts
// empty and that setNodeIdentityToken makes it readable via
// nodeIdentityToken.
func TestClient_setNodeIdentityToken(t *testing.T) {
	ci.Parallel(t)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.Node.ID = uuid.Generate()
	})
	defer func() { _ = cleanup() }()

	// A fresh client has no identity token.
	must.Eq(t, "", client.nodeIdentityToken())

	// Storing a token makes it visible.
	client.setNodeIdentityToken("my-identity-token")
	must.Eq(t, "my-identity-token", client.nodeIdentityToken())
}
// TestClient_ServerList tests client methods that interact with the internal
// nomad server list.
func TestClient_ServerList(t *testing.T) {
@@ -1419,6 +1453,56 @@ func TestClient_ServerList(t *testing.T) {
}
}
// TestClient_handleNodeUpdateResponse exercises handleNodeUpdateResponse with
// a response carrying a node count, a server list, and a signed identity, and
// asserts all three are applied: the server pool is updated, the identity is
// set in client memory, and the identity is persisted to the state DB.
func TestClient_handleNodeUpdateResponse(t *testing.T) {
	ci.Parallel(t)

	// Use an in-memory state DB so the persisted identity can be read back
	// without touching disk.
	testClient, testClientCleanup := TestClient(t, func(c *config.Config) {
		c.StateDBFactory = func(logger hclog.Logger, stateDir string) (state.StateDB, error) {
			return cstate.NewMemDB(logger), nil
		}
	})
	defer func() {
		_ = testClientCleanup()
	}()

	// Assert our starting state, so we can ensure we are not testing for values
	// that already exist.
	must.Eq(t, 0, testClient.servers.NumNodes())
	must.Eq(t, 0, testClient.servers.NumServers())
	must.Eq(t, []*servers.Server{}, testClient.servers.GetServers())
	must.Eq(t, "", testClient.nodeIdentityToken())

	// The state DB must also start without a persisted identity.
	stateIdentity, err := testClient.stateDB.GetNodeIdentity()
	must.NoError(t, err)
	must.Eq(t, "", stateIdentity)

	// Build a response resembling what a server returns on registration or
	// heartbeat, including a signed identity.
	updateResp := structs.NodeUpdateResponse{
		NumNodes: 1010,
		Servers: []*structs.NodeServerInfo{
			{RPCAdvertiseAddr: "10.0.0.1:4647", Datacenter: "dc1"},
			{RPCAdvertiseAddr: "10.0.0.2:4647", Datacenter: "dc1"},
			{RPCAdvertiseAddr: "10.0.0.3:4647", Datacenter: "dc1"},
		},
		SignedIdentity: pointer.Of("node-identity"),
	}

	// Perform the update and test the outcome.
	must.NoError(t, testClient.handleNodeUpdateResponse(updateResp))
	must.Eq(t, 1010, testClient.servers.NumNodes())
	must.Eq(t, 3, testClient.servers.NumServers())
	must.SliceContainsAllEqual(t, []*servers.Server{
		{Addr: &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 4647}},
		{Addr: &net.TCPAddr{IP: net.ParseIP("10.0.0.2"), Port: 4647}},
		{Addr: &net.TCPAddr{IP: net.ParseIP("10.0.0.3"), Port: 4647}},
	}, testClient.servers.GetServers())

	// The identity must be both in client memory and persisted to the state
	// DB so it survives a restart.
	must.Eq(t, "node-identity", testClient.nodeIdentityToken())
	stateIdentity, err = testClient.stateDB.GetNodeIdentity()
	must.NoError(t, err)
	must.Eq(t, "node-identity", stateIdentity)
}
func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
ci.Parallel(t)

View File

@@ -34,7 +34,9 @@ func (c *Client) DrainSelf() error {
MarkEligible: false,
Meta: map[string]string{"message": "shutting down"},
WriteRequest: structs.WriteRequest{
Region: c.Region(), AuthToken: c.secretNodeID()},
Region: c.Region(),
AuthToken: c.nodeAuthToken(),
},
}
if drainSpec.Deadline > 0 {
drainReq.DrainStrategy.ForceDeadline = now.Add(drainSpec.Deadline)
@@ -94,7 +96,9 @@ func (c *Client) pollServerForDrainStatus(ctx context.Context, interval time.Dur
NodeID: c.NodeID(),
SecretID: c.secretNodeID(),
QueryOptions: structs.QueryOptions{
Region: c.Region(), AuthToken: c.secretNodeID()},
Region: c.Region(),
AuthToken: c.nodeAuthToken(),
},
}
var statusResp structs.SingleNodeResponse

21
client/identity.go Normal file
View File

@@ -0,0 +1,21 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package client
// NodeIdentityHandler is implemented by client subsystems that authenticate
// their own RPC calls and therefore need to be told whenever the client
// receives a new node identity token to use.
type NodeIdentityHandler interface {
	SetNodeIdentityToken(token string)
}

// assertAndSetNodeIdentityToken forwards the token to impl, which is expected
// to implement NodeIdentityHandler. A nil impl is ignored; a non-nil impl
// that does not implement the interface is a programming error and will
// panic. The test file performs test assertions on the known
// implementations.
func assertAndSetNodeIdentityToken(impl any, token string) {
	if impl == nil {
		return
	}
	impl.(NodeIdentityHandler).SetNodeIdentityToken(token)
}

32
client/identity_test.go Normal file
View File

@@ -0,0 +1,32 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package client
import (
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/serviceregistration/nsd"
"github.com/hashicorp/nomad/client/widmgr"
"github.com/shoenig/test/must"
)
var (
_ NodeIdentityHandler = (*widmgr.Signer)(nil)
_ NodeIdentityHandler = (*nsd.ServiceRegistrationHandler)(nil)
)
// Test_assertAndSetNodeIdentityToken checks that the helper forwards the
// token to a non-nil NodeIdentityHandler implementation.
func Test_assertAndSetNodeIdentityToken(t *testing.T) {
	ci.Parallel(t)

	// Call the function with a non-nil object that implements the interface
	// and verify that SetNodeIdentityToken received the expected token.
	handler := &testHandler{}
	assertAndSetNodeIdentityToken(handler, "test-token")
	must.Eq(t, "test-token", handler.t)
}

// testHandler records the most recent token it was handed.
type testHandler struct{ t string }

func (t *testHandler) SetNodeIdentityToken(token string) { t.t = token }

View File

@@ -493,7 +493,7 @@ func resolveServer(s string) (net.Addr, error) {
func (c *Client) Ping(srv net.Addr) error {
pingRequest := &structs.GenericRequest{
QueryOptions: structs.QueryOptions{
AuthToken: c.secretNodeID(),
AuthToken: c.nodeAuthToken(),
},
}
var reply struct{}

View File

@@ -9,6 +9,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
@@ -33,6 +34,11 @@ type ServiceRegistrationHandler struct {
// registering new ones.
registrationEnabled bool
// nodeAuthToken is the token the node is using for RPC authentication with
// the servers. This is an atomic value as the node identity is periodically
// renewed, meaning this value is updated while potentially being read.
nodeAuthToken atomic.Value
// shutDownCh coordinates shutting down the handler and any long-running
// processes, such as the RPC retry.
shutDownCh chan struct{}
@@ -102,6 +108,11 @@ func NewServiceRegistrationHandler(log hclog.Logger, cfg *ServiceRegistrationHan
return s
}
// SetNodeIdentityToken fulfills the NodeIdentityHandler interface so the
// client can hand the handler a renewed node identity token to use for its
// RPC calls.
func (s *ServiceRegistrationHandler) SetNodeIdentityToken(token string) {
	s.nodeAuthToken.Store(token)
}
func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistration.WorkloadServices) error {
// Check whether we are enabled or not first. Hitting this likely means
// there is a bug within the implicit constraint, or process using it, as
@@ -148,7 +159,7 @@ func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistrat
Services: registrations,
WriteRequest: structs.WriteRequest{
Region: s.cfg.Region,
AuthToken: s.cfg.NodeSecret,
AuthToken: s.authToken(),
},
}
@@ -201,7 +212,7 @@ func (s *ServiceRegistrationHandler) removeWorkload(
WriteRequest: structs.WriteRequest{
Region: s.cfg.Region,
Namespace: workload.ProviderNamespace,
AuthToken: s.cfg.NodeSecret,
AuthToken: s.authToken(),
},
}
@@ -390,3 +401,14 @@ func (s *ServiceRegistrationHandler) generateNomadServiceRegistration(
Port: port,
}, nil
}
// authToken returns the current authentication token used for RPC calls. It
// uses the node identity token when one has been stored, otherwise it falls
// back to the node secret. The fallback handles the case where the node is
// upgraded before the Nomad servers and should be removed in Nomad 1.13.
func (s *ServiceRegistrationHandler) authToken() string {
	// The comma-ok assertion falls through to the node secret when the
	// atomic.Value has never been stored, instead of panicking as a bare
	// assertion on a non-string would.
	if token, ok := s.nodeAuthToken.Load().(string); ok {
		return token
	}
	return s.cfg.NodeSecret
}

View File

@@ -5,6 +5,7 @@ package widmgr
import (
"fmt"
"sync/atomic"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
@@ -34,9 +35,10 @@ type SignerConfig struct {
// Signer fetches and validates workload identities.
type Signer struct {
nodeSecret string
region string
rpc RPCer
nodeSecret string
nodeIdentityToken atomic.Value
region string
rpc RPCer
}
// NewSigner workload identity manager.
@@ -48,6 +50,11 @@ func NewSigner(c SignerConfig) *Signer {
}
}
// SetNodeIdentityToken fulfills the NodeIdentityHandler interface so the
// client can hand the signer a renewed node identity token to use for its
// RPC calls.
func (s *Signer) SetNodeIdentityToken(token string) {
	s.nodeIdentityToken.Store(token)
}
// SignIdentities wraps the Alloc.SignIdentities RPC and retrieves signed
// workload identities. The minIndex should be set to the lowest allocation
// CreateIndex to ensure that the server handling the request isn't so stale
@@ -62,6 +69,15 @@ func (s *Signer) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentity
return nil, fmt.Errorf("no identities to sign")
}
// Default to using the node secret, but if the node identity token is set,
// this will be used instead. This handles the case where the node is
// upgraded before the Nomad servers and should be removed in Nomad 1.13.
authToken := s.nodeSecret
if id := s.nodeIdentityToken.Load(); id != nil {
authToken = id.(string)
}
args := structs.AllocIdentitiesRequest{
Identities: req,
QueryOptions: structs.QueryOptions{
@@ -73,7 +89,7 @@ func (s *Signer) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentity
// Server to block at least until the Allocation is created.
MinQueryIndex: minIndex - 1,
AllowStale: true,
AuthToken: s.nodeSecret,
AuthToken: authToken,
},
}
reply := structs.AllocIdentitiesResponse{}