diff --git a/client/client.go b/client/client.go index d993903c2..74abd0c16 100644 --- a/client/client.go +++ b/client/client.go @@ -15,6 +15,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" consulapi "github.com/hashicorp/consul/api" @@ -333,6 +334,11 @@ type Client struct { // users is a pool of dynamic workload users users dynamic.Pool + + // identity is the node identity token that has been generated and signed by + // the servers. This is used to authenticate the client to the servers when + // performing RPC calls. + identity atomic.Value } var ( @@ -395,6 +401,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie getter: getter.New(cfg.Artifact, logger), EnterpriseClient: newEnterpriseClient(logger), allocrunnerFactory: cfg.AllocRunnerFactory, + identity: atomic.Value{}, } // we can't have this set in the default Config because of import cycles @@ -604,6 +611,23 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far") } + // Attempt to pull the node identity from the state database. If the client + // is starting for the first time, this will be empty, so avoid an + // unnecessary set call to the client atomic. This needs to happen before we + // start heartbeating to avoid unnecessary identity generation and load on + // the Nomad servers. + // + // If the DB returns an error, it is more than likely that the full + // restoration will fail. It isn't terminal for us at this point though, as + // we can generate a new identity on registration. + clientIdentity, err := c.stateDB.GetNodeIdentity() + if err != nil { + logger.Error("failed to get client identity from state", "error", err) + } + if clientIdentity != "" { + c.setNodeIdentityToken(clientIdentity) + } + // Register and then start heartbeating to the servers. c.shutdownGroup.Go(c.registerAndHeartbeat) @@ -903,11 +927,57 @@ func (c *Client) NodeID() string { return c.GetConfig().Node.ID } -// secretNodeID returns the secret node ID for the given client +// secretNodeID returns the secret node ID for the given client. This is no +// longer used as the primary authentication method for Nomad clients. In fully +// upgraded clusters, the node identity token is used instead. It will still be +// used if the client has been upgraded, but the Nomad server has not. Most +// callers should use the nodeAuthToken function instead of this as it correctly +// handles both authentication token methods. There are some limited places +// where the secret node ID is still used on the RPC request object such as +// "Node.GetClientAllocs". func (c *Client) secretNodeID() string { return c.GetConfig().Node.SecretID } +// nodeAuthToken will return the authentication token for the client. This will +// return the node identity token if it is set, otherwise it will return the +// secret node ID. +// +// The callers of this should be moved to nodeIdentityToken in Nomad 1.13 when +// all clients should be using the node identity token. +func (c *Client) nodeAuthToken() string { + if nID := c.nodeIdentityToken(); nID != "" { + return nID + } + return c.secretNodeID() +} + +// nodeIdentityToken returns the node identity token for the given client. If +// the client is coming up for the first time, restarting, or is in a cluster +// where the Nomad servers have not been upgraded to support the node identity, +// this will be empty. 
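An empty token is not an error; RPCs simply fall back to the secret node ID until an identity is restored from the state DB or delivered by the servers.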
Callers should use the nodeAuthToken function instead of
+// this as it correctly handles both authentication token methods.
+func (c *Client) nodeIdentityToken() string {
+	if v := c.identity.Load(); v != nil {
+		return v.(string)
+	}
+	return ""
+}
+
+// setNodeIdentityToken stores a new node identity token and propagates it to
+// the client backend processes that use it for RPC authentication.
+func (c *Client) setNodeIdentityToken(token string) {
+
+	// Store the token on the client as the first step, so it's available for
+	// use by all RPCs immediately.
+	c.identity.Store(token)
+
+	// Update the Nomad service registration handler and workload identity
+	// signer processes.
+	assertAndSetNodeIdentityToken(c.nomadService, token)
+	assertAndSetNodeIdentityToken(c.widsigner, token)
+}
+
 // Shutdown is used to tear down the client
 func (c *Client) Shutdown() error {
 	c.shutdownLock.Lock()
@@ -1954,7 +2024,7 @@ func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
 			NodeEvents: nodeEvents,
 			WriteRequest: structs.WriteRequest{
 				Region:    c.Region(),
-				AuthToken: c.secretNodeID(),
+				AuthToken: c.nodeAuthToken(),
 			},
 		}
 		var resp structs.EmitNodeEventsResponse
@@ -2053,7 +2123,7 @@ func (c *Client) getRegistrationToken() string {

 	select {
 	case <-c.registeredCh:
-		return c.secretNodeID()
+		return c.nodeAuthToken()
 	default:
 		// If we haven't yet closed the registeredCh we're either starting for
 		// the 1st time or we've just restarted. Check the local state to see if
@@ -2065,7 +2135,7 @@ func (c *Client) getRegistrationToken() string {
 		}
 		if registration != nil && registration.HasRegistered {
 			c.registeredOnce.Do(func() { close(c.registeredCh) })
-			return c.secretNodeID()
+			return c.nodeAuthToken()
 		}
 	}
 	return ""
@@ -2086,6 +2156,11 @@ func (c *Client) registerNode(authToken string) error {
 		return err
 	}

+	// Handle the response now, so any signed node identity is stored before we unblock RPCs.
+	if err := c.handleNodeUpdateResponse(resp); err != nil {
+		return err
+	}
+
 	// Signal that we've registered once so that RPCs sent from the client can
 	// send authenticated requests. Persist this information in the state so
 	// that we don't block restoring running allocs when restarting while
@@ -2100,11 +2175,6 @@ func (c *Client) registerNode(authToken string) error {
 		close(c.registeredCh)
 	})

-	err := c.handleNodeUpdateResponse(resp)
-	if err != nil {
-		return err
-	}
-
 	// Update the node status to ready after we register.
 	c.UpdateConfig(func(c *config.Config) {
 		c.Node.Status = structs.NodeStatusReady
@@ -2131,7 +2201,7 @@ func (c *Client) updateNodeStatus() error {
 		Status: structs.NodeStatusReady,
 		WriteRequest: structs.WriteRequest{
 			Region:    c.Region(),
-			AuthToken: c.secretNodeID(),
+			AuthToken: c.nodeAuthToken(),
 		},
 	}
 	var resp structs.NodeUpdateResponse
@@ -2175,9 +2245,8 @@ func (c *Client) updateNodeStatus() error {
 		}
 	})

-	err := c.handleNodeUpdateResponse(resp)
-	if err != nil {
-		return fmt.Errorf("heartbeat response returned no valid servers")
+	if err := c.handleNodeUpdateResponse(resp); err != nil {
+		return fmt.Errorf("failed to handle node update response: %w", err)
 	}

 	// If there's no Leader in the response we may be talking to a partitioned
@@ -2195,6 +2264,20 @@ func (c *Client) handleNodeUpdateResponse(resp structs.NodeUpdateResponse) error
 	// rebalance rate.
 	c.servers.SetNumNodes(resp.NumNodes)

+	// If the response includes a new identity, set it and save it to the state
+	// DB.
+	//
+	// In the unlikely event that we cannot write the identity to the state DB,
+	// we do not want to set the client identity token.
That would mean the
+	// client memory state and persistent state DB are out of sync. Instead, we
+	// return an error and wait until the next heartbeat to try again.
+	if resp.SignedIdentity != nil {
+		if err := c.stateDB.PutNodeIdentity(*resp.SignedIdentity); err != nil {
+			return fmt.Errorf("error saving client identity: %w", err)
+		}
+		c.setNodeIdentityToken(*resp.SignedIdentity)
+	}
+
 	// Convert []*NodeServerInfo to []*servers.Server
 	nomadServers := make([]*servers.Server, 0, len(resp.Servers))
 	for _, s := range resp.Servers {
@@ -2277,7 +2360,7 @@ func (c *Client) allocSync() {
 					Alloc: toSync,
 					WriteRequest: structs.WriteRequest{
 						Region:    c.Region(),
-						AuthToken: c.secretNodeID(),
+						AuthToken: c.nodeAuthToken(),
 					},
 				}

@@ -2336,6 +2419,27 @@ type allocUpdates struct {

 // watchAllocations is used to scan for updates to allocations
 func (c *Client) watchAllocations(updates chan *allocUpdates) {
+
+	// The request object is generated as soon as this function is called, but
+	// the RPC can block on the register channel being closed. If we are
+	// starting for the first time and have not yet obtained our identity, the
+	// authentication token could be set to an empty string. This will result in
+	// a failed RPC when the call is unblocked.
+	//
+	// Although this will be quickly retried, we want to ensure that we do not
+	// write errors to the logs or perform calls we know will fail if we can
+	// avoid it. Therefore, we wait for the registered channel to be closed,
+	// indicating the client has registered and has an identity token.
+	//
+	// This is a prevalent problem when the Nomad agent is run in development
+	// mode, as the server needs to start and have its encrypter ready before
+	// it can generate identities.
+	select {
+	case <-c.shutdownCh:
+		return
+	case <-c.registeredCh:
+	}
+
 	// The request and response for getting the map of allocations that should
 	// be running on the Node to their AllocModifyIndex which is incremented
 	// when the allocation is updated by the servers.
@@ -2352,7 +2456,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
 			// After the first request, only require monotonically
 			// increasing state.
 			AllowStale: false,
-			AuthToken:  c.secretNodeID(),
+			AuthToken:  c.nodeAuthToken(),
 		},
 	}
 	var resp structs.NodeClientAllocsResponse
@@ -2363,7 +2467,7 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
 		QueryOptions: structs.QueryOptions{
 			Region:     c.Region(),
 			AllowStale: true,
-			AuthToken:  c.secretNodeID(),
+			AuthToken:  c.nodeAuthToken(),
 		},
 	}
 	var allocsResp structs.AllocsGetResponse
@@ -2373,6 +2477,9 @@ OUTER:
 		// Get the allocation modify index map, blocking for updates. We will
 		// use this to determine exactly what allocations need to be downloaded
 		// in full.
+		// Refresh the auth token in case the node identity was renewed.
+		req.AuthToken = c.nodeAuthToken()
+
 		resp = structs.NodeClientAllocsResponse{}
 		err := c.RPC("Node.GetClientAllocs", &req, &resp)
 		if err != nil {
@@ -2463,6 +2570,7 @@ OUTER:
 		// Pull the allocations that need to be updated.
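+		// The allocsReq auth token is refreshed below for the same reason: the
+		// node identity may have been renewed since the request was built.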
allocsReq.AllocIDs = pull allocsReq.MinQueryIndex = pullIndex - 1 + allocsReq.AuthToken = c.nodeAuthToken() allocsResp = structs.AllocsGetResponse{} if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil { c.logger.Error("error querying updated allocations", "error", err) diff --git a/client/client_test.go b/client/client_test.go index d3dafd194..cfc3cd369 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -23,6 +23,7 @@ import ( trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/client/servers" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" cstate "github.com/hashicorp/nomad/client/state" @@ -30,6 +31,7 @@ import ( "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pluginutils/catalog" "github.com/hashicorp/nomad/helper/pluginutils/singleton" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad" @@ -1393,6 +1395,38 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) { } } +func TestClient_nodeAuthToken(t *testing.T) { + ci.Parallel(t) + + testClient, testClientCleanup := TestClient(t, func(c *config.Config) { + c.Node.ID = uuid.Generate() + }) + defer func() { + _ = testClientCleanup() + }() + + must.Eq(t, testClient.GetConfig().Node.SecretID, testClient.nodeAuthToken()) + + testClient.setNodeIdentityToken("my-identity-token") + must.Eq(t, "my-identity-token", testClient.nodeAuthToken()) +} + +func TestClient_setNodeIdentityToken(t *testing.T) { + ci.Parallel(t) + + testClient, testClientCleanup := TestClient(t, func(c *config.Config) { + c.Node.ID = uuid.Generate() + }) + defer func() { + _ = testClientCleanup() + }() + + must.Eq(t, "", testClient.nodeIdentityToken()) + + testClient.setNodeIdentityToken("my-identity-token") + must.Eq(t, "my-identity-token", testClient.nodeIdentityToken()) +} + // TestClient_ServerList tests client methods that interact with the internal // nomad server list. func TestClient_ServerList(t *testing.T) { @@ -1419,6 +1453,56 @@ func TestClient_ServerList(t *testing.T) { } } +func TestClient_handleNodeUpdateResponse(t *testing.T) { + ci.Parallel(t) + + testClient, testClientCleanup := TestClient(t, func(c *config.Config) { + c.StateDBFactory = func(logger hclog.Logger, stateDir string) (state.StateDB, error) { + return cstate.NewMemDB(logger), nil + } + }) + defer func() { + _ = testClientCleanup() + }() + + // Assert our starting state, so we can ensure we are not testing for values + // that already exist. + must.Eq(t, 0, testClient.servers.NumNodes()) + must.Eq(t, 0, testClient.servers.NumServers()) + must.Eq(t, []*servers.Server{}, testClient.servers.GetServers()) + must.Eq(t, "", testClient.nodeIdentityToken()) + + stateIdentity, err := testClient.stateDB.GetNodeIdentity() + must.NoError(t, err) + must.Eq(t, "", stateIdentity) + + updateResp := structs.NodeUpdateResponse{ + NumNodes: 1010, + Servers: []*structs.NodeServerInfo{ + {RPCAdvertiseAddr: "10.0.0.1:4647", Datacenter: "dc1"}, + {RPCAdvertiseAddr: "10.0.0.2:4647", Datacenter: "dc1"}, + {RPCAdvertiseAddr: "10.0.0.3:4647", Datacenter: "dc1"}, + }, + SignedIdentity: pointer.Of("node-identity"), + } + + // Perform the update and test the outcome. 
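+	// handleNodeUpdateResponse should register the returned servers, set the
+	// in-memory identity token, and persist the identity to the state DB.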
must.NoError(t, testClient.handleNodeUpdateResponse(updateResp))
+
+	must.Eq(t, 1010, testClient.servers.NumNodes())
+	must.Eq(t, 3, testClient.servers.NumServers())
+	must.SliceContainsAllEqual(t, []*servers.Server{
+		{Addr: &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 4647}},
+		{Addr: &net.TCPAddr{IP: net.ParseIP("10.0.0.2"), Port: 4647}},
+		{Addr: &net.TCPAddr{IP: net.ParseIP("10.0.0.3"), Port: 4647}},
+	}, testClient.servers.GetServers())
+	must.Eq(t, "node-identity", testClient.nodeIdentityToken())
+
+	stateIdentity, err = testClient.stateDB.GetNodeIdentity()
+	must.NoError(t, err)
+	must.Eq(t, "node-identity", stateIdentity)
+}
+
 func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
 	ci.Parallel(t)
diff --git a/client/drain.go b/client/drain.go
index a1bb34e8c..6644bb8bd 100644
--- a/client/drain.go
+++ b/client/drain.go
@@ -34,7 +34,9 @@ func (c *Client) DrainSelf() error {
 		MarkEligible: false,
 		Meta:         map[string]string{"message": "shutting down"},
 		WriteRequest: structs.WriteRequest{
-			Region: c.Region(), AuthToken: c.secretNodeID()},
+			Region:    c.Region(),
+			AuthToken: c.nodeAuthToken(),
+		},
 	}
 	if drainSpec.Deadline > 0 {
 		drainReq.DrainStrategy.ForceDeadline = now.Add(drainSpec.Deadline)
@@ -94,7 +96,9 @@ func (c *Client) pollServerForDrainStatus(ctx context.Context, interval time.Dur
 		NodeID:   c.NodeID(),
 		SecretID: c.secretNodeID(),
 		QueryOptions: structs.QueryOptions{
-			Region: c.Region(), AuthToken: c.secretNodeID()},
+			Region:    c.Region(),
+			AuthToken: c.nodeAuthToken(),
+		},
 	}
 	var statusResp structs.SingleNodeResponse
diff --git a/client/identity.go b/client/identity.go
new file mode 100644
index 000000000..b108aadf4
--- /dev/null
+++ b/client/identity.go
@@ -0,0 +1,21 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package client
+
+// NodeIdentityHandler is an interface that allows setting a node identity
+// token. The client uses this to inform its subsystems about a new node
+// identity that it should use for RPC calls.
+type NodeIdentityHandler interface {
+	SetNodeIdentityToken(token string)
+}
+
+// assertAndSetNodeIdentityToken expects that the passed value implements
+// NodeIdentityHandler and calls SetNodeIdentityToken on it; nil values are a
+// no-op. A non-nil value that does not implement the interface is a programming
+// error and will panic; compile-time assertions in the test file cover this.
+func assertAndSetNodeIdentityToken(impl any, token string) {
+	if impl != nil {
+		impl.(NodeIdentityHandler).SetNodeIdentityToken(token)
+	}
+}
diff --git a/client/identity_test.go b/client/identity_test.go
new file mode 100644
index 000000000..6f2863b6e
--- /dev/null
+++ b/client/identity_test.go
@@ -0,0 +1,32 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package client
+
+import (
+	"testing"
+
+	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/client/serviceregistration/nsd"
+	"github.com/hashicorp/nomad/client/widmgr"
+	"github.com/shoenig/test/must"
+)
+
+var (
+	_ NodeIdentityHandler = (*widmgr.Signer)(nil)
+	_ NodeIdentityHandler = (*nsd.ServiceRegistrationHandler)(nil)
+)
+
+func Test_assertAndSetNodeIdentityToken(t *testing.T) {
+	ci.Parallel(t)
+
+	// Call the function with a non-nil object that implements the interface and
+	// verify that SetNodeIdentityToken is called with the expected token.
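+	// Nil handlers are skipped by the nil guard in assertAndSetNodeIdentityToken,
+	// so only the non-nil path is exercised here.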
testImpl := &testHandler{}
+	assertAndSetNodeIdentityToken(testImpl, "test-token")
+	must.Eq(t, "test-token", testImpl.t)
+}
+
+type testHandler struct{ t string }
+
+func (t *testHandler) SetNodeIdentityToken(token string) { t.t = token }
diff --git a/client/rpc.go b/client/rpc.go
index 9d3441119..69db60c45 100644
--- a/client/rpc.go
+++ b/client/rpc.go
@@ -493,7 +493,7 @@ func resolveServer(s string) (net.Addr, error) {
 func (c *Client) Ping(srv net.Addr) error {
 	pingRequest := &structs.GenericRequest{
 		QueryOptions: structs.QueryOptions{
-			AuthToken: c.secretNodeID(),
+			AuthToken: c.nodeAuthToken(),
 		},
 	}
 	var reply struct{}
diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go
index 45a5aaf14..3cf97a2fb 100644
--- a/client/serviceregistration/nsd/nsd.go
+++ b/client/serviceregistration/nsd/nsd.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/hashicorp/go-hclog"
@@ -33,6 +34,11 @@ type ServiceRegistrationHandler struct {
 	// registering new ones.
 	registrationEnabled bool

+	// nodeAuthToken is the token the node is using for RPC authentication with
+	// the servers. This is an atomic value as the node identity is periodically
+	// renewed, meaning this value is updated while potentially being read.
+	nodeAuthToken atomic.Value
+
 	// shutDownCh coordinates shutting down the handler and any long-running
 	// processes, such as the RPC retry.
 	shutDownCh chan struct{}
@@ -102,6 +108,11 @@ func NewServiceRegistrationHandler(log hclog.Logger, cfg *ServiceRegistrationHan
 	return s
 }

+// SetNodeIdentityToken fulfills the NodeIdentityHandler interface, allowing
+// the client to update the node identity token used for RPC calls when it is
+// renewed.
+func (s *ServiceRegistrationHandler) SetNodeIdentityToken(token string) { s.nodeAuthToken.Store(token) }
+
 func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistration.WorkloadServices) error {
 	// Check whether we are enabled or not first. Hitting this likely means
 	// there is a bug within the implicit constraint, or process using it, as
@@ -148,7 +159,7 @@ func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistrat
 		Services: registrations,
 		WriteRequest: structs.WriteRequest{
 			Region:    s.cfg.Region,
-			AuthToken: s.cfg.NodeSecret,
+			AuthToken: s.authToken(),
 		},
 	}
@@ -201,7 +212,7 @@ func (s *ServiceRegistrationHandler) removeWorkload(
 		WriteRequest: structs.WriteRequest{
 			Region:    s.cfg.Region,
 			Namespace: workload.ProviderNamespace,
-			AuthToken: s.cfg.NodeSecret,
+			AuthToken: s.authToken(),
 		},
 	}
@@ -390,3 +401,14 @@ func (s *ServiceRegistrationHandler) generateNomadServiceRegistration(
 			Port: port,
 		}, nil
 }
+
+// authToken returns the current authentication token used for RPC calls. It
+// uses the node identity token if one is set, otherwise it falls back to the
+// node secret. This handles the case where the node is upgraded before the
+// Nomad servers; the fallback should be removed in Nomad 1.13.
+func (s *ServiceRegistrationHandler) authToken() string {
+	if id := s.nodeAuthToken.Load(); id != nil {
+		return id.(string)
+	}
+	return s.cfg.NodeSecret
+}
diff --git a/client/widmgr/signer.go b/client/widmgr/signer.go
index 2102d5324..a137c4598 100644
--- a/client/widmgr/signer.go
+++ b/client/widmgr/signer.go
@@ -5,6 +5,7 @@ package widmgr

 import (
 	"fmt"
+	"sync/atomic"

 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -34,9 +35,10 @@
 // Signer fetches and validates workload identities.
type Signer struct {
-	nodeSecret string
-	region     string
-	rpc        RPCer
+	nodeSecret        string
+	nodeIdentityToken atomic.Value
+	region            string
+	rpc               RPCer
 }

 // NewSigner workload identity manager.
@@ -48,6 +50,11 @@ func NewSigner(c SignerConfig) *Signer {
 	}
 }

+// SetNodeIdentityToken fulfills the NodeIdentityHandler interface, allowing
+// the client to update the node identity token used for RPC calls when it is
+// renewed.
+func (s *Signer) SetNodeIdentityToken(token string) { s.nodeIdentityToken.Store(token) }
+
 // SignIdentities wraps the Alloc.SignIdentities RPC and retrieves signed
 // workload identities. The minIndex should be set to the lowest allocation
 // CreateIndex to ensure that the server handling the request isn't so stale
@@ -62,6 +69,15 @@ func (s *Signer) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentity
 		return nil, fmt.Errorf("no identities to sign")
 	}

+	// Default to using the node secret, but use the node identity token if it
+	// is set. This handles the case where the node is upgraded before the
+	// Nomad servers; the fallback should be removed in Nomad 1.13.
+	authToken := s.nodeSecret
+
+	if id := s.nodeIdentityToken.Load(); id != nil {
+		authToken = id.(string)
+	}
+
 	args := structs.AllocIdentitiesRequest{
 		Identities: req,
 		QueryOptions: structs.QueryOptions{
@@ -73,7 +89,7 @@ func (s *Signer) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentity
 			// Server to block at least until the Allocation is created.
 			MinQueryIndex: minIndex - 1,
 			AllowStale:    true,
-			AuthToken:     s.nodeSecret,
+			AuthToken:     authToken,
 		},
 	}
 	reply := structs.AllocIdentitiesResponse{}
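The prefer-identity, fall-back-to-secret logic above now exists in three variants: Client.nodeAuthToken, ServiceRegistrationHandler.authToken, and the inline block in Signer.SignIdentities. The following is a minimal standalone sketch of the shared shape; the names are hypothetical and it is not code from this change:

package main

import (
	"fmt"
	"sync/atomic"
)

// tokenSource mirrors the shape used above: a static secret fixed at
// construction, plus a renewable identity held in an atomic.Value so the
// heartbeat goroutine can update it while RPC goroutines read it without
// locking.
type tokenSource struct {
	secret   string
	identity atomic.Value // always stores a string
}

// set records a newly signed identity token.
func (t *tokenSource) set(tok string) { t.identity.Store(tok) }

// token prefers the identity token and falls back to the static secret when
// no identity has been issued yet (first start, or servers not yet upgraded).
func (t *tokenSource) token() string {
	if v := t.identity.Load(); v != nil && v.(string) != "" {
		return v.(string)
	}
	return t.secret
}

func main() {
	ts := &tokenSource{secret: "node-secret"}
	fmt.Println(ts.token()) // node-secret
	ts.set("signed-identity")
	fmt.Println(ts.token()) // signed-identity
}

Holding the renewable token in an atomic.Value keeps the read path lock-free, which suits this workload: the token is read on every RPC but written only when the servers issue a new identity.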