From 86d2cdcf8089ab3cc0251a4f660d2468e5038de9 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Thu, 21 Sep 2023 17:31:27 +0200 Subject: [PATCH] client: split identity_hook across allocrunner and taskrunner (#18431) This commit splits identity_hook between the allocrunner and taskrunner. The allocrunner-level part of the hook signs each task identity, and the taskrunner-level part picks it up and stores secrets for each task. The code revamps the WIDMgr, which is now split into 2 interfaces: IdentityManager which manages renewals of signatures and handles sending updates to subscribers via Watch method, and IdentitySigner which only does the signing. This work is necessary for having a unified Consul login workflow that comes with the new Consul integration. A new, allocrunner-level consul_hook will now be the only hook doing Consul authentication. --- client/allocrunner/alloc_runner.go | 13 +- client/allocrunner/alloc_runner_hooks.go | 1 + client/allocrunner/identity_hook.go | 49 +++ client/allocrunner/identity_hook_test.go | 87 ++++ .../allocrunner/taskrunner/identity_hook.go | 229 ++-------- .../taskrunner/identity_hook_test.go | 192 ++------- client/allocrunner/taskrunner/logmon_hook.go | 4 +- client/allocrunner/taskrunner/task_runner.go | 9 +- .../taskrunner/task_runner_test.go | 13 +- client/client.go | 10 +- client/config/arconfig.go | 7 +- client/structs/allochook.go | 5 + client/widmgr/mock.go | 93 ++++ client/widmgr/signer.go | 110 +++++ client/widmgr/widmgr.go | 408 +++++++++++++++--- client/widmgr/widmgr_test.go | 4 +- nomad/structs/structs.go | 4 + 17 files changed, 829 insertions(+), 409 deletions(-) create mode 100644 client/allocrunner/identity_hook.go create mode 100644 client/allocrunner/identity_hook_test.go create mode 100644 client/widmgr/mock.go create mode 100644 client/widmgr/signer.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index a84fb312a..b4f52193a 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -206,8 +206,11 @@ type allocRunner struct { // partitions is an interface for managing cpuset partitions partitions cinterfaces.CPUPartitions - // widmgr fetches workload identities - widmgr *widmgr.WIDMgr + // widsigner signs workload identities + widsigner widmgr.IdentitySigner + + // widmgr manages workload identity signatures + widmgr widmgr.IdentityManager } // NewAllocRunner returns a new allocation runner. @@ -251,7 +254,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e wranglers: config.Wranglers, partitions: config.Partitions, hookResources: cstructs.NewAllocHookResources(), - widmgr: config.WIDMgr, + widsigner: config.WIDSigner, } // Create the logger based on the allocation ID @@ -269,6 +272,10 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e ar.shutdownDelayCtx = shutdownDelayCtx ar.shutdownDelayCancelFn = shutdownDelayCancel + // initialize the workload identity manager + widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.logger) + ar.widmgr = widmgr + // Initialize the runners hooks. if err := ar.initRunnerHooks(config.ClientConfig); err != nil { return nil, err diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 76ae3710c..1a609f12e 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -118,6 +118,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { // directory path exists for other hooks. alloc := ar.Alloc() ar.runnerHooks = []interfaces.RunnerHook{ + newIdentityHook(hookLogger, ar.widmgr), newAllocDirHook(hookLogger, ar.allocDir), newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher), newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir), diff --git a/client/allocrunner/identity_hook.go b/client/allocrunner/identity_hook.go new file mode 100644 index 000000000..c9842fcad --- /dev/null +++ b/client/allocrunner/identity_hook.go @@ -0,0 +1,49 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package allocrunner + +import ( + "context" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/widmgr" +) + +type identityHook struct { + widmgr widmgr.IdentityManager + logger log.Logger +} + +func newIdentityHook(logger log.Logger, widmgr widmgr.IdentityManager) *identityHook { + h := &identityHook{ + widmgr: widmgr, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*identityHook) Name() string { + return "identity" +} + +func (h *identityHook) Prerun() error { + // run the renewal + if err := h.widmgr.Run(); err != nil { + return err + } + + return nil +} + +// Stop implements interfaces.TaskStopHook +func (h *identityHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error { + h.widmgr.Shutdown() + return nil +} + +// Shutdown implements interfaces.ShutdownHook +func (h *identityHook) Shutdown() { + h.widmgr.Shutdown() +} diff --git a/client/allocrunner/identity_hook_test.go b/client/allocrunner/identity_hook_test.go new file mode 100644 index 000000000..5098b053d --- /dev/null +++ b/client/allocrunner/identity_hook_test.go @@ -0,0 +1,87 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package allocrunner + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/client/widmgr" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" +) + +// statically assert network hook implements the expected interfaces +var _ interfaces.RunnerPrerunHook = (*identityHook)(nil) +var _ interfaces.ShutdownHook = (*identityHook)(nil) +var _ interfaces.TaskStopHook = (*identityHook)(nil) + +func TestIdentityHook_Prerun(t *testing.T) { + ci.Parallel(t) + + ttl := 30 * time.Second + + wid := &structs.WorkloadIdentity{ + Name: "testing", + Audience: []string{"consul.io"}, + Env: true, + File: true, + TTL: ttl, + } + + alloc := mock.Alloc() + task := alloc.LookupTask("web") + task.Identity = wid + task.Identities = []*structs.WorkloadIdentity{wid} + + allocrunner, stopAR := TestAllocRunnerFromAlloc(t, alloc) + defer stopAR() + + logger := testlog.HCLogger(t) + + // setup mock signer and WIDMgr + mockSigner := widmgr.NewMockWIDSigner(task.Identities) + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, logger) + allocrunner.widmgr = mockWIDMgr + allocrunner.widsigner = mockSigner + + // do the initial signing + _, err := mockSigner.SignIdentities(1, []*structs.WorkloadIdentityRequest{ + { + AllocID: alloc.ID, + TaskName: task.Name, + IdentityName: task.Identities[0].Name, + }, + }) + must.NoError(t, err) + + start := time.Now() + hook := newIdentityHook(logger, mockWIDMgr) + must.Eq(t, hook.Name(), "identity") + must.NoError(t, hook.Prerun()) + + time.Sleep(time.Second) // give goroutines a moment to run + sid, err := hook.widmgr.Get(cstructs.TaskIdentity{ + TaskName: task.Name, + IdentityName: task.Identities[0].Name}, + ) + must.Nil(t, err) + must.Eq(t, sid.IdentityName, task.Identity.Name) + must.NotEq(t, sid.JWT, "") + + // pad expiry time with a second to be safe + must.Between(t, + start.Add(ttl).Add(-1*time.Second).Unix(), + sid.Expiration.Unix(), + start.Add(ttl).Add(1*time.Second).Unix(), + ) + + must.NoError(t, hook.Stop(context.Background(), nil, nil)) +} diff --git a/client/allocrunner/taskrunner/identity_hook.go b/client/allocrunner/taskrunner/identity_hook.go index 8de9344b6..c277abe3e 100644 --- a/client/allocrunner/taskrunner/identity_hook.go +++ b/client/allocrunner/taskrunner/identity_hook.go @@ -7,13 +7,13 @@ import ( "context" "fmt" "path/filepath" - "time" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" - "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/client/widmgr" "github.com/hashicorp/nomad/helper/users" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,12 +27,6 @@ const ( wiTokenFile = "nomad_token" ) -// IdentitySigner is the interface needed to retrieve signed identities for -// workload identities. At runtime it is implemented by *widmgr.WIDMgr. -type IdentitySigner interface { - SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) -} - // tokenSetter provides methods for exposing workload identities to other // internal Nomad components. type tokenSetter interface { @@ -45,23 +39,15 @@ type identityHook struct { tokenDir string envBuilder *taskenv.Builder ts tokenSetter - widmgr IdentitySigner + widmgr widmgr.IdentityManager logger log.Logger - // minWait is the minimum amount of time to wait before renewing. Settable to - // ease testing. - minWait time.Duration - stopCtx context.Context stop context.CancelFunc } func newIdentityHook(tr *TaskRunner, logger log.Logger) *identityHook { - // Create a context for the renew loop. This context will be canceled when - // the task is stopped or agent is shutting down, unlike Prestart's ctx which - // is not intended for use after Prestart is returns. stopCtx, stop := context.WithCancel(context.Background()) - h := &identityHook{ alloc: tr.Alloc(), task: tr.Task(), @@ -69,7 +55,6 @@ func newIdentityHook(tr *TaskRunner, logger log.Logger) *identityHook { envBuilder: tr.envBuilder, ts: tr, widmgr: tr.widmgr, - minWait: 10 * time.Second, stopCtx: stopCtx, stop: stop, } @@ -88,39 +73,43 @@ func (h *identityHook) Prestart(context.Context, *interfaces.TaskPrestartRequest return err } - signedWIDs, err := h.getIdentities() - if err != nil { - return fmt.Errorf("error fetching alternate identities: %w", err) - } - + // Start token watcher loops for _, widspec := range h.task.Identities { - signedWID := signedWIDs[widspec.Name] - if signedWID == nil { - // The only way to hit this should be a bug as it indicates the server - // did not sign an identity for a task on this alloc. - return fmt.Errorf("missing workload identity %q", widspec.Name) - } - - if err := h.setAltToken(widspec, signedWID.JWT); err != nil { - return err - } + w := widspec + go h.watchIdentity(w) } - // Start token renewal loop - go h.renew(h.alloc.CreateIndex, signedWIDs) - return nil } -// Stop implements interfaces.TaskStopHook -func (h *identityHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error { - h.stop() - return nil -} +func (h *identityHook) watchIdentity(wid *structs.WorkloadIdentity) { + id := cstructs.TaskIdentity{TaskName: h.task.Name, IdentityName: wid.Name} + signedIdentitiesChan, stopWatching := h.widmgr.Watch(id) + defer stopWatching() -// Shutdown implements interfaces.ShutdownHook -func (h *identityHook) Shutdown() { - h.stop() + for { + select { + case signedWID, ok := <-signedIdentitiesChan: + h.logger.Trace("receiving renewed identity", "identity_name", wid.Name) + if !ok { + // Chan was closed, stop watching + h.logger.Trace("identity watch closed", "task", h.task.Name, "identity", wid.Name) + return + } + + if signedWID == nil { + // The only way to hit this should be a bug as it indicates the server + // did not sign an identity for a task on this alloc. + h.logger.Error("missing workload identity %q", wid.Name) + } + + if err := h.setAltToken(wid, signedWID.JWT); err != nil { + h.logger.Error(err.Error()) + } + case <-h.stopCtx.Done(): + return + } + } } // setDefaultToken adds the Nomad token to the task's environment and writes it to a @@ -163,151 +152,13 @@ func (h *identityHook) setAltToken(widspec *structs.WorkloadIdentity, rawJWT str return nil } -// getIdentities calls Alloc.SignIdentities to get all of the identities for -// this workload signed. If there are no identities to be signed then (nil, -// nil) is returned. -func (h *identityHook) getIdentities() (map[string]*structs.SignedWorkloadIdentity, error) { - - if len(h.task.Identities) == 0 { - return nil, nil - } - - req := make([]*structs.WorkloadIdentityRequest, len(h.task.Identities)) - for i, widspec := range h.task.Identities { - req[i] = &structs.WorkloadIdentityRequest{ - AllocID: h.alloc.ID, - TaskName: h.task.Name, - IdentityName: widspec.Name, - } - } - - // Get signed workload identities - signedWIDs, err := h.widmgr.SignIdentities(h.alloc.CreateIndex, req) - if err != nil { - return nil, err - } - - // Index initial workload identities by name - widMap := make(map[string]*structs.SignedWorkloadIdentity, len(signedWIDs)) - for _, wid := range signedWIDs { - widMap[wid.IdentityName] = wid - } - - return widMap, nil +// Stop implements interfaces.TaskStopHook +func (h *identityHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error { + h.stop() + return nil } -// renew fetches new signed workload identity tokens before the existing tokens -// expire. -func (h *identityHook) renew(createIndex uint64, signedWIDs map[string]*structs.SignedWorkloadIdentity) { - wids := h.task.Identities - if len(wids) == 0 { - h.logger.Trace("no workload identities to renew") - return - } - - var reqs []*structs.WorkloadIdentityRequest - renewNow := false - minExp := time.Now().Add(30 * time.Hour) // set high default expiration - widMap := make(map[string]*structs.WorkloadIdentity, len(wids)) // Identity.Name -> Identity - - for _, wid := range wids { - if wid.TTL == 0 { - // No ttl, so no need to renew it - continue - } - - widMap[wid.Name] = wid - - reqs = append(reqs, &structs.WorkloadIdentityRequest{ - AllocID: h.alloc.ID, - TaskName: h.task.Name, - IdentityName: wid.Name, - }) - - sid, ok := signedWIDs[wid.Name] - if !ok { - // Missing a signature, treat this case as already expired so we get a - // token ASAP - h.logger.Trace("missing token for identity", "identity", wid.Name) - renewNow = true - continue - } - - if sid.Expiration.Before(minExp) { - minExp = sid.Expiration - } - } - - if len(reqs) == 0 { - h.logger.Trace("no workload identities expire") - return - } - - var wait time.Duration - if !renewNow { - wait = helper.ExpiryToRenewTime(minExp, time.Now, h.minWait) - } - - timer, timerStop := helper.NewStoppedTimer() - defer timerStop() - - var retry uint64 - - for err := h.stopCtx.Err(); err == nil; { - h.logger.Debug("waiting to renew identities", "num", len(reqs), "wait", wait) - timer.Reset(wait) - select { - case <-timer.C: - h.logger.Trace("getting new signed identities", "num", len(reqs)) - case <-h.stopCtx.Done(): - return - } - - // Renew all tokens together since its cheap - tokens, err := h.widmgr.SignIdentities(createIndex, reqs) - if err != nil { - retry++ - wait = helper.Backoff(h.minWait, time.Hour, retry) + helper.RandomStagger(h.minWait) - h.logger.Error("error renewing workload identities", "error", err, "next", wait) - continue - } - - if len(tokens) == 0 { - retry++ - wait = helper.Backoff(h.minWait, time.Hour, retry) + helper.RandomStagger(h.minWait) - h.logger.Error("error renewing workload identities", "error", "no tokens", "next", wait) - continue - } - - // Reset next expiration time - minExp = time.Time{} - - for _, token := range tokens { - widspec, ok := widMap[token.IdentityName] - if !ok { - // Bug: Every requested workload identity should either have a signed - // identity or rejection. - h.logger.Warn("bug: unexpected workload identity received", "identity", token.IdentityName) - continue - } - - if err := h.setAltToken(widspec, token.JWT); err != nil { - // Set minExp using retry's backoff logic - minExp = time.Now().Add(helper.Backoff(h.minWait, time.Hour, retry+1) + helper.RandomStagger(h.minWait)) - h.logger.Error("error setting new workload identity", "error", err, "identity", token.IdentityName) - continue - } - - // Set next expiration time - if minExp.IsZero() { - minExp = token.Expiration - } else if token.Expiration.Before(minExp) { - minExp = token.Expiration - } - } - - // Success! Set next renewal and reset retries - wait = helper.ExpiryToRenewTime(minExp, time.Now, h.minWait) - retry = 0 - } +// Shutdown implements interfaces.ShutdownHook +func (h *identityHook) Shutdown() { + h.stop() } diff --git a/client/allocrunner/taskrunner/identity_hook_test.go b/client/allocrunner/taskrunner/identity_hook_test.go index 7f6a8dc1d..4d24a84d1 100644 --- a/client/allocrunner/taskrunner/identity_hook_test.go +++ b/client/allocrunner/taskrunner/identity_hook_test.go @@ -5,20 +5,15 @@ package taskrunner import ( "context" - "crypto/ed25519" - "fmt" "path/filepath" - "slices" "testing" "time" - "github.com/go-jose/go-jose/v3" - "github.com/go-jose/go-jose/v3/jwt" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/taskenv" + "github.com/hashicorp/nomad/client/widmgr" "github.com/hashicorp/nomad/helper/testlog" - "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -31,92 +26,6 @@ var _ interfaces.ShutdownHook = (*identityHook)(nil) // See task_runner_test.go:TestTaskRunner_IdentityHook -// MockWIDMgr allows TaskRunner unit tests to avoid having to setup a Server, -// Client, and Allocation. -type MockWIDMgr struct { - // wids maps identity names to workload identities. If wids is non-nil then - // SignIdentities will use it to find expirations or reject invalid identity - // names - wids map[string]*structs.WorkloadIdentity - - key ed25519.PrivateKey - keyID string -} - -func NewMockWIDMgr(wids []*structs.WorkloadIdentity) *MockWIDMgr { - _, privKey, err := ed25519.GenerateKey(nil) - if err != nil { - panic(err) - } - m := &MockWIDMgr{ - key: privKey, - keyID: uuid.Generate(), - } - - if wids != nil { - m.setWIDs(wids) - } - - return m -} - -// setWIDs is a test helper to use Task.Identities in the MockWIDMgr for -// sharing TTLs and validating names. -func (m *MockWIDMgr) setWIDs(wids []*structs.WorkloadIdentity) { - m.wids = make(map[string]*structs.WorkloadIdentity, len(wids)) - for _, wid := range wids { - m.wids[wid.Name] = wid - } -} - -func (m *MockWIDMgr) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) { - swids := make([]*structs.SignedWorkloadIdentity, 0, len(req)) - for _, idReq := range req { - // Set test values for default claims - claims := &structs.IdentityClaims{ - Namespace: "default", - JobID: "test", - AllocationID: idReq.AllocID, - TaskName: idReq.TaskName, - } - claims.ID = uuid.Generate() - - // If test has set workload identities. Lookup claims or reject unknown - // identity. - if m.wids != nil { - wid, ok := m.wids[idReq.IdentityName] - if !ok { - return nil, fmt.Errorf("unknown identity: %q", idReq.IdentityName) - } - - claims.Audience = slices.Clone(wid.Audience) - - if wid.TTL > 0 { - claims.Expiry = jwt.NewNumericDate(time.Now().Add(wid.TTL)) - } - } - - opts := (&jose.SignerOptions{}).WithHeader("kid", m.keyID).WithType("JWT") - sig, err := jose.NewSigner(jose.SigningKey{Algorithm: jose.EdDSA, Key: m.key}, opts) - if err != nil { - return nil, fmt.Errorf("error creating signer: %w", err) - } - token, err := jwt.Signed(sig).Claims(claims).CompactSerialize() - if err != nil { - return nil, fmt.Errorf("error signing: %w", err) - } - - swid := &structs.SignedWorkloadIdentity{ - WorkloadIdentityRequest: *idReq, - JWT: token, - Expiration: claims.Expiry.Time(), - } - - swids = append(swids, swid) - } - return swids, nil -} - // MockTokenSetter is a mock implementation of tokenSetter which is satisfied // by TaskRunner at runtime. type MockTokenSetter struct { @@ -135,7 +44,7 @@ func TestIdentityHook_RenewAll(t *testing.T) { // checking that tokens were rotated. Therefore the time must be long enough // to generate new tokens. Since no Raft or IO (outside of potentially // writing 1 token file) is performed, this should be relatively fast. - ttl := 8 * time.Second + ttl := 3 * time.Second node := mock.Node() alloc := mock.Alloc() @@ -158,28 +67,46 @@ func TestIdentityHook_RenewAll(t *testing.T) { secretsDir := t.TempDir() - widmgr := NewMockWIDMgr(task.Identities) - mockTR := &MockTokenSetter{} stopCtx, stop := context.WithCancel(context.Background()) t.Cleanup(stop) + // setup mock signer and WIDMgr + mockSigner := widmgr.NewMockWIDSigner(task.Identities) + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, testlog.HCLogger(t)) + mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s + + // do the initial signing + for _, i := range task.Identities { + _, err := mockSigner.SignIdentities(1, []*structs.WorkloadIdentityRequest{ + { + AllocID: alloc.ID, + TaskName: task.Name, + IdentityName: i.Name, + }, + }) + must.NoError(t, err) + } + h := &identityHook{ alloc: alloc, task: task, tokenDir: secretsDir, envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region), ts: mockTR, - widmgr: widmgr, - minWait: time.Second, + widmgr: mockWIDMgr, logger: testlog.HCLogger(t), stopCtx: stopCtx, stop: stop, } + // do the initial renewal and start the loop + must.NoError(t, h.widmgr.Run()) + start := time.Now() must.NoError(t, h.Prestart(context.Background(), nil, nil)) + time.Sleep(time.Second) // goroutines in the Prestart hook must run first before we Build the EnvMap env := h.envBuilder.Build().EnvMap // Assert initial tokens were set in Prestart @@ -219,7 +146,7 @@ func TestIdentityHook_RenewAll(t *testing.T) { func TestIdentityHook_RenewOne(t *testing.T) { ci.Parallel(t) - ttl := 8 * time.Second + ttl := 3 * time.Second node := mock.Node() alloc := mock.Alloc() @@ -242,28 +169,46 @@ func TestIdentityHook_RenewOne(t *testing.T) { secretsDir := t.TempDir() - widmgr := NewMockWIDMgr(task.Identities) - mockTR := &MockTokenSetter{} stopCtx, stop := context.WithCancel(context.Background()) t.Cleanup(stop) + // setup mock signer and WIDMgr + mockSigner := widmgr.NewMockWIDSigner(task.Identities) + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, testlog.HCLogger(t)) + mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s + + // do the initial signing + for _, i := range task.Identities { + _, err := mockSigner.SignIdentities(1, []*structs.WorkloadIdentityRequest{ + { + AllocID: alloc.ID, + TaskName: task.Name, + IdentityName: i.Name, + }, + }) + must.NoError(t, err) + } + h := &identityHook{ alloc: alloc, task: task, tokenDir: secretsDir, envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region), ts: mockTR, - widmgr: widmgr, - minWait: time.Second, + widmgr: mockWIDMgr, logger: testlog.HCLogger(t), stopCtx: stopCtx, stop: stop, } + // do the initial renewal and start the loop + must.NoError(t, h.widmgr.Run()) + start := time.Now() must.NoError(t, h.Prestart(context.Background(), nil, nil)) + time.Sleep(time.Second) // goroutines in the Prestart hook must run first before we Build the EnvMap env := h.envBuilder.Build().EnvMap // Assert initial tokens were set in Prestart @@ -318,8 +263,6 @@ func TestIdentityHook_ErrorWriting(t *testing.T) { tokenDir: "/this-should-not-exist", envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region), ts: &MockTokenSetter{}, - widmgr: NewMockWIDMgr(nil), - minWait: time.Second, logger: testlog.HCLogger(t), stopCtx: stopCtx, stop: stop, @@ -329,44 +272,3 @@ func TestIdentityHook_ErrorWriting(t *testing.T) { err := h.Prestart(context.Background(), nil, nil) must.ErrorContains(t, err, "failed to write nomad token") } - -// TestIdentityHook_GetIdentitiesMismatch asserts that if SignIdentities() does -// not return enough identities then Prestart fails. -func TestIdentityHook_GetIdentitiesMismatch(t *testing.T) { - ci.Parallel(t) - - alloc := mock.Alloc() - task := alloc.LookupTask("web") - task.Identities = []*structs.WorkloadIdentity{ - { - Name: "consul", - Audience: []string{"consul"}, - TTL: time.Minute, - }, - } - node := mock.Node() - stopCtx, stop := context.WithCancel(context.Background()) - t.Cleanup(stop) - - wids := []*structs.WorkloadIdentity{ - { - Name: "not-consul", - }, - } - h := &identityHook{ - alloc: alloc, - task: task, - tokenDir: t.TempDir(), - envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region), - ts: &MockTokenSetter{}, - widmgr: NewMockWIDMgr(wids), - minWait: time.Second, - logger: testlog.HCLogger(t), - stopCtx: stopCtx, - stop: stop, - } - - // Prestart should fail when trying to write the default identity file - err := h.Prestart(context.Background(), nil, nil) - must.ErrorContains(t, err, "error fetching alternate identities") -} diff --git a/client/allocrunner/taskrunner/logmon_hook.go b/client/allocrunner/taskrunner/logmon_hook.go index a3d14d25b..27bba4f25 100644 --- a/client/allocrunner/taskrunner/logmon_hook.go +++ b/client/allocrunner/taskrunner/logmon_hook.go @@ -20,8 +20,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" bstructs "github.com/hashicorp/nomad/plugins/base/structs" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" - "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -122,7 +122,7 @@ func (h *logmonHook) Prestart(ctx context.Context, attempts := 0 for { err := h.prestartOneLoop(ctx, req) - if err == bstructs.ErrPluginShutdown || grpc.Code(err) == codes.Unavailable { + if err == bstructs.ErrPluginShutdown || status.Code(err) == codes.Unavailable { h.logger.Warn("logmon shutdown while making request", "error", err) if attempts > 3 { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 53c3bf393..9cf953c03 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -34,6 +34,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/client/widmgr" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" @@ -267,8 +268,8 @@ type TaskRunner struct { // system features like cgroups wranglers cinterfaces.ProcessWranglers - // widmgr fetches workload identities - widmgr IdentitySigner + // widmgr manages workload identities + widmgr widmgr.IdentityManager } type Config struct { @@ -342,8 +343,8 @@ type Config struct { // allocrunner hooks AllocHookResources *cstructs.AllocHookResources - // WIDMgr fetches workload identities - WIDMgr IdentitySigner + // WIDMgr manages workload identities + WIDMgr widmgr.IdentityManager } func NewTaskRunner(config *Config) (*TaskRunner, error) { diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index b32a82741..d3a3aa679 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -30,6 +30,7 @@ import ( cstate "github.com/hashicorp/nomad/client/state" ctestutil "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/client/widmgr" agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" "github.com/hashicorp/nomad/drivers/rawexec" @@ -116,6 +117,9 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri nomadRegMock := regMock.NewServiceRegistrationHandler(logger) wrapperMock := wrapper.NewHandlerWrapper(logger, consulRegMock, nomadRegMock) + task := alloc.LookupTask(taskName) + widsigner := widmgr.NewMockWIDSigner(task.Identities) + var vaultFunc vaultclient.VaultClientFunc if vault != nil { vaultFunc = func(_ string) (vaultclient.VaultClient, error) { return vault, nil } @@ -141,7 +145,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri ServiceRegWrapper: wrapperMock, Getter: getter.TestSandbox(t), Wranglers: proclib.MockWranglers(t), - WIDMgr: NewMockWIDMgr(nil), + WIDMgr: widmgr.NewWIDMgr(widsigner, alloc, logger), } return conf, trCleanup @@ -153,6 +157,13 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string) (*TaskRunner, *Config, func()) { config, cleanup := testTaskRunnerConfig(t, alloc, taskName, nil) + // This is usually handled by the identity hook in the alloc runner, so it + // must be called manually when testing a task runner in isolation. + if config.WIDMgr != nil { + err := config.WIDMgr.Run() + must.NoError(t, err) + } + tr, err := NewTaskRunner(config) require.NoError(t, err) go tr.Run() diff --git a/client/client.go b/client/client.go index 9bba11c58..a4eb7b82f 100644 --- a/client/client.go +++ b/client/client.go @@ -335,8 +335,8 @@ type Client struct { // partitions is used for managing cpuset partitioning on linux systems partitions cgroupslib.Partition - // widmgr retrieves workload identities - widmgr *widmgr.WIDMgr + // widsigner signs workload identities + widsigner widmgr.IdentitySigner } var ( @@ -445,8 +445,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie return nil, fmt.Errorf("node setup failed: %v", err) } - // Add workload identity manager after node secret has been generated/loaded - c.widmgr = widmgr.New(widmgr.Config{ + // Add workload identity signer after node secret has been generated/loaded + c.widsigner = widmgr.NewSigner(widmgr.SignerConfig{ NodeSecret: c.secretNodeID(), Region: cfg.Region, RPC: c, @@ -2762,7 +2762,7 @@ func (c *Client) newAllocRunnerConfig( StateDB: c.stateDB, StateUpdater: c, VaultFunc: c.VaultClient, - WIDMgr: c.widmgr, + WIDSigner: c.widsigner, Wranglers: c.wranglers, Partitions: c.partitions, } diff --git a/client/config/arconfig.go b/client/config/arconfig.go index d47221a33..3b5603aa7 100644 --- a/client/config/arconfig.go +++ b/client/config/arconfig.go @@ -114,8 +114,11 @@ type AllocRunnerConfig struct { // Partitions is an interface for managing cpuset partitions. Partitions interfaces.CPUPartitions - // WIDMgr fetches workload identities - WIDMgr *widmgr.WIDMgr + // WIDSigner fetches workload identities + WIDSigner widmgr.IdentitySigner + + // WIDMgr manages workload identities + WIDMgr widmgr.IdentityManager } // PrevAllocWatcher allows AllocRunners to wait for a previous allocation to diff --git a/client/structs/allochook.go b/client/structs/allochook.go index 010b61a17..8a3655f27 100644 --- a/client/structs/allochook.go +++ b/client/structs/allochook.go @@ -10,6 +10,11 @@ import ( "github.com/hashicorp/nomad/helper" ) +type TaskIdentity struct { + TaskName string + IdentityName string +} + // AllocHookResources contains data that is provided by AllocRunner Hooks for // consumption by TaskRunners. This should be instantiated once in the // AllocRunner and then only accessed via getters and setters that hold the diff --git a/client/widmgr/mock.go b/client/widmgr/mock.go new file mode 100644 index 000000000..4ff47061e --- /dev/null +++ b/client/widmgr/mock.go @@ -0,0 +1,93 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package widmgr + +import ( + "crypto/ed25519" + "fmt" + "slices" + "time" + + "github.com/go-jose/go-jose/v3" + "github.com/go-jose/go-jose/v3/jwt" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" +) + +// MockWIDSigner allows TaskRunner unit tests to avoid having to setup a Server, +// Client, and Allocation. +type MockWIDSigner struct { + // wids maps identity names to workload identities. If wids is non-nil then + // SignIdentities will use it to find expirations or reject invalid identity + // names + wids map[string]*structs.WorkloadIdentity + key ed25519.PrivateKey + keyID string +} + +func NewMockWIDSigner(wids []*structs.WorkloadIdentity) *MockWIDSigner { + _, privKey, err := ed25519.GenerateKey(nil) + if err != nil { + panic(err) + } + m := &MockWIDSigner{ + key: privKey, + keyID: uuid.Generate(), + } + if wids != nil { + m.setWIDs(wids) + } + return m +} + +// setWIDs is a test helper to use Task.Identities in the MockWIDSigner for +// sharing TTLs and validating names. +func (m *MockWIDSigner) setWIDs(wids []*structs.WorkloadIdentity) { + m.wids = make(map[string]*structs.WorkloadIdentity, len(wids)) + for _, wid := range wids { + m.wids[wid.Name] = wid + } +} +func (m *MockWIDSigner) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) { + swids := make([]*structs.SignedWorkloadIdentity, 0, len(req)) + for _, idReq := range req { + // Set test values for default claims + claims := &structs.IdentityClaims{ + Namespace: "default", + JobID: "test", + AllocationID: idReq.AllocID, + TaskName: idReq.TaskName, + } + claims.ID = uuid.Generate() + // If test has set workload identities. Lookup claims or reject unknown + // identity. + if m.wids != nil { + wid, ok := m.wids[idReq.IdentityName] + if !ok { + return nil, fmt.Errorf("unknown identity: %q", idReq.IdentityName) + } + claims.Audience = slices.Clone(wid.Audience) + if wid.TTL > 0 { + claims.Expiry = jwt.NewNumericDate(time.Now().Add(wid.TTL)) + } + } + opts := (&jose.SignerOptions{}).WithHeader("kid", m.keyID).WithType("JWT") + sig, err := jose.NewSigner(jose.SigningKey{Algorithm: jose.EdDSA, Key: m.key}, opts) + if err != nil { + return nil, fmt.Errorf("error creating signer: %w", err) + } + token, err := jwt.Signed(sig).Claims(claims).CompactSerialize() + if err != nil { + return nil, fmt.Errorf("error signing: %w", err) + } + swid := &structs.SignedWorkloadIdentity{ + WorkloadIdentityRequest: *idReq, + JWT: token, + Expiration: claims.Expiry.Time(), + } + swids = append(swids, swid) + } + return swids, nil +} diff --git a/client/widmgr/signer.go b/client/widmgr/signer.go new file mode 100644 index 000000000..2102d5324 --- /dev/null +++ b/client/widmgr/signer.go @@ -0,0 +1,110 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package widmgr + +import ( + "fmt" + + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/structs" +) + +type RPCer interface { + RPC(method string, args any, reply any) error +} + +// IdentitySigner is the interface needed to retrieve signed identities for +// workload identities. At runtime it is implemented by *widmgr.Signer. +type IdentitySigner interface { + SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) +} + +// SignerConfig wraps the configuration parameters the workload identity manager +// needs. +type SignerConfig struct { + // NodeSecret is the node's secret token + NodeSecret string + + // Region of the node + Region string + + RPC RPCer +} + +// Signer fetches and validates workload identities. +type Signer struct { + nodeSecret string + region string + rpc RPCer +} + +// NewSigner workload identity manager. +func NewSigner(c SignerConfig) *Signer { + return &Signer{ + nodeSecret: c.NodeSecret, + region: c.Region, + rpc: c.RPC, + } +} + +// SignIdentities wraps the Alloc.SignIdentities RPC and retrieves signed +// workload identities. The minIndex should be set to the lowest allocation +// CreateIndex to ensure that the server handling the request isn't so stale +// that it doesn't know the allocation exist (and therefore rejects the signing +// requests). +// +// Since a single rejection causes an error to be returned, SignIdentities +// should currently only be used when requesting signed identities for a single +// allocation. +func (s *Signer) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) { + if len(req) == 0 { + return nil, fmt.Errorf("no identities to sign") + } + + args := structs.AllocIdentitiesRequest{ + Identities: req, + QueryOptions: structs.QueryOptions{ + Region: s.region, + + // Unlike other RPCs, this one doesn't care about "subsequent + // modifications" after an index. We only want to ensure the state + // isn't too stale to know about this alloc, so we instruct the + // Server to block at least until the Allocation is created. + MinQueryIndex: minIndex - 1, + AllowStale: true, + AuthToken: s.nodeSecret, + }, + } + reply := structs.AllocIdentitiesResponse{} + if err := s.rpc.RPC("Alloc.SignIdentities", &args, &reply); err != nil { + return nil, err + } + + if n := len(reply.Rejections); n == 1 { + return nil, fmt.Errorf( + "%d/%d signing request was rejected: %v", + n, len(req), reply.Rejections[0].Reason, + ) + } else if n > 1 { + var mErr *multierror.Error + for _, r := range reply.Rejections { + mErr = multierror.Append( + fmt.Errorf( + "%d/%d signing request was rejected: %v", + n, len(req), r.Reason, + )) + } + return nil, mErr + } + + if len(reply.SignedIdentities) == 0 { + return nil, fmt.Errorf("empty signed identity response") + } + + if exp, act := len(reply.SignedIdentities), len(req); exp != act { + return nil, fmt.Errorf("expected %d signed identities but received %d", exp, act) + } + + return reply.SignedIdentities, nil +} diff --git a/client/widmgr/widmgr.go b/client/widmgr/widmgr.go index eb17adc50..758dd57a1 100644 --- a/client/widmgr/widmgr.go +++ b/client/widmgr/widmgr.go @@ -4,80 +4,376 @@ package widmgr import ( + "context" "fmt" + "slices" + "sync" + "time" + "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) -type RPCer interface { - RPC(method string, args any, reply any) error +// IdentityManager defines a manager responsible for signing and renewing +// signed identities. At runtime it is implemented by *widmgr.WIDMgr. +type IdentityManager interface { + Run() error + Get(cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity, error) + Watch(cstructs.TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func()) + Shutdown() } -// Config wraps the configuration parameters the workload identity manager -// needs. -type Config struct { - // NodeSecret is the node's secret token - NodeSecret string - - // Region of the node - Region string - - RPC RPCer -} - -// WIDMgr fetches and validates workload identities. type WIDMgr struct { - nodeSecret string - region string - rpc RPCer + allocID string + minIndex uint64 + widSpecs map[string][]*structs.WorkloadIdentity // task -> WI + signer IdentitySigner + + // lastToken are the last retrieved signed workload identifiers keyed by + // TaskIdentity + lastToken map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity + lastTokenLock sync.RWMutex + + // watchers is a map of task identities to slices of channels (each identity + // can have multiple watchers) + watchers map[cstructs.TaskIdentity][]chan *structs.SignedWorkloadIdentity + watchersLock sync.Mutex + + // minWait is the minimum amount of time to wait before renewing. Settable to + // ease testing. + minWait time.Duration + + stopCtx context.Context + stop context.CancelFunc + + logger hclog.Logger } -// New workload identity manager. -func New(c Config) *WIDMgr { +func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, logger hclog.Logger) *WIDMgr { + widspecs := map[string][]*structs.WorkloadIdentity{} + tg := a.Job.LookupTaskGroup(a.TaskGroup) + for _, task := range tg.Tasks { + // Omit default identity as it does not expire + if len(task.Identities) > 0 { + widspecs[task.Name] = helper.CopySlice(task.Identities) + } + } + + // Create a context for the renew loop. This context will be canceled when + // the allocation is stopped or agent is shutting down + stopCtx, stop := context.WithCancel(context.Background()) + return &WIDMgr{ - nodeSecret: c.NodeSecret, - region: c.Region, - rpc: c.RPC, + allocID: a.ID, + minIndex: a.CreateIndex, + widSpecs: widspecs, + signer: signer, + minWait: 10 * time.Second, + lastToken: map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity{}, + watchers: map[cstructs.TaskIdentity][]chan *structs.SignedWorkloadIdentity{}, + stopCtx: stopCtx, + stop: stop, + logger: logger.Named("widmgr"), } } -// SignIdentities wraps the Alloc.SignIdentities RPC and retrieves signed -// workload identities. The minIndex should be set to the lowest allocation -// CreateIndex to ensure that the server handling the request isn't so stale -// that it doesn't know the allocation exist (and therefore rejects the signing -// requests). +// SetMinWait sets the minimum time for renewals +func (m *WIDMgr) SetMinWait(t time.Duration) { + m.minWait = t +} + +// Run blocks until identities are initially signed and then renews them in a +// goroutine. The goroutine is stopped when WIDMgr.Shutdown is called. // -// Since a single rejection causes an error to be returned, SignIdentities -// should currently only be used when requesting signed identities for a single -// allocation. -func (m *WIDMgr) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) { - args := structs.AllocIdentitiesRequest{ - Identities: req, - QueryOptions: structs.QueryOptions{ - Region: m.region, - MinQueryIndex: minIndex - 1, - AllowStale: true, - AuthToken: m.nodeSecret, - }, - } - reply := structs.AllocIdentitiesResponse{} - if err := m.rpc.RPC("Alloc.SignIdentities", &args, &reply); err != nil { - return nil, err +// If an error is returned the identities could not be fetched and the renewal +// goroutine was not started. +func (m *WIDMgr) Run() error { + if len(m.widSpecs) == 0 { + m.logger.Debug("no workload identities to retrieve or renew") + return nil } - if n := len(reply.Rejections); n == 1 { - return nil, fmt.Errorf("%d/%d signing request was rejected", n, len(req)) - } else if n > 1 { - return nil, fmt.Errorf("%d/%d signing requests were rejected", n, len(req)) + m.logger.Debug("retrieving and renewing workload identities", "num_identities", len(m.widSpecs)) + + if err := m.getIdentities(); err != nil { + return fmt.Errorf("failed to fetch signed identities: %w", err) } - if len(reply.SignedIdentities) == 0 { - return nil, fmt.Errorf("empty signed identity response") - } + go m.renew() - if exp, act := len(reply.SignedIdentities), len(req); exp != act { - return nil, fmt.Errorf("expected %d signed identities but received %d", exp, act) - } - - return reply.SignedIdentities, nil + return nil +} + +// Get retrieves the latest signed identity or returns an error. It must be +// called after Run and does not block. +// +// For retrieving tokens which might be renewed callers should use Watch +// instead to avoid missing new tokens retrieved by Run between Get and Watch +// calls. +func (m *WIDMgr) Get(id cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity, error) { + token := m.get(id) + if token == nil { + // This is an error as every identity should have a token by the time Get + // is called. + return nil, fmt.Errorf("unable to find token for task %q and identity %q", id.TaskName, id.IdentityName) + } + + return token, nil +} + +func (m *WIDMgr) get(id cstructs.TaskIdentity) *structs.SignedWorkloadIdentity { + m.lastTokenLock.RLock() + defer m.lastTokenLock.RUnlock() + + return m.lastToken[id] +} + +// Watch returns a channel that sends new signed identities until it is closed +// due to shutdown. Must be called after Run. +// +// The caller must call the returned func to stop watching and ensure the +// watched id actually exists, otherwise the channel never returns a result. +func (m *WIDMgr) Watch(id cstructs.TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func()) { + // If Shutdown has been called return a closed chan + if m.stopCtx.Err() != nil { + c := make(chan *structs.SignedWorkloadIdentity) + close(c) + return c, func() {} + } + + m.watchersLock.Lock() + defer m.watchersLock.Unlock() + + // Buffer of 1 so sends don't block on receives + c := make(chan *structs.SignedWorkloadIdentity, 1) + m.watchers[id] = make([]chan *structs.SignedWorkloadIdentity, 0) + m.watchers[id] = append(m.watchers[id], c) + + // Create a cancel func for watchers to deregister when they exit. + cancel := func() { + m.watchersLock.Lock() + defer m.watchersLock.Unlock() + + m.watchers[id] = slices.DeleteFunc( + m.watchers[id], + func(ch chan *structs.SignedWorkloadIdentity) bool { return ch == c }, + ) + } + + // Prime chan with latest token to avoid a race condition where consumers + // could miss a token update between Get and Watch calls. + if token := m.get(id); token != nil { + c <- token + } + + return c, cancel +} + +// Shutdown stops renewal and closes all watch chans. +func (m *WIDMgr) Shutdown() { + m.watchersLock.Lock() + defer m.watchersLock.Unlock() + + m.stop() + + for _, w := range m.watchers { + for _, c := range w { + close(c) + } + } +} + +// getIdentities fetches all signed identities or returns an error. +func (m *WIDMgr) getIdentities() error { + if len(m.widSpecs) == 0 { + return nil + } + + m.lastTokenLock.Lock() + defer m.lastTokenLock.Unlock() + + reqs := make([]*structs.WorkloadIdentityRequest, 0, len(m.widSpecs)) + for taskName, widspecs := range m.widSpecs { + for _, widspec := range widspecs { + reqs = append(reqs, &structs.WorkloadIdentityRequest{ + AllocID: m.allocID, + TaskName: taskName, + IdentityName: widspec.Name, + }) + } + } + + // Get signed workload identities + signedWIDs, err := m.signer.SignIdentities(m.minIndex, reqs) + if err != nil { + return err + } + + // Index initial workload identities by name + m.lastToken = make(map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity, len(signedWIDs)) + for _, swid := range signedWIDs { + id := cstructs.TaskIdentity{ + TaskName: swid.TaskName, + IdentityName: swid.IdentityName, + } + + m.lastToken[id] = swid + } + // TODO: Persist signed identity token to client state + return nil +} + +// renew fetches new signed workload identity tokens before the existing tokens +// expire. +func (m *WIDMgr) renew() { + if len(m.widSpecs) == 0 { + return + } + + reqs := make([]*structs.WorkloadIdentityRequest, 0, len(m.widSpecs)) + for taskName, widspecs := range m.widSpecs { + for _, widspec := range widspecs { + if widspec.TTL == 0 { + continue + } + reqs = append(reqs, &structs.WorkloadIdentityRequest{ + AllocID: m.allocID, + TaskName: taskName, + IdentityName: widspec.Name, + }) + } + } + + if len(reqs) == 0 { + m.logger.Trace("no workload identities expire") + return + } + + renewNow := false + minExp := time.Time{} + + for taskName, wids := range m.widSpecs { + for _, wid := range wids { + if wid.TTL == 0 { + // No ttl, so no need to renew it + continue + } + + //FIXME make this less ugly + token := m.get(cstructs.TaskIdentity{ + TaskName: taskName, + IdentityName: wid.Name, + }) + if token == nil { + // Missing a signature, treat this case as already expired so + // we get a token ASAP + m.logger.Debug("missing token for identity", "identity", wid.Name) + renewNow = true + continue + } + + if minExp.IsZero() || token.Expiration.Before(minExp) { + minExp = token.Expiration + } + } + } + + var wait time.Duration + if !renewNow { + wait = helper.ExpiryToRenewTime(minExp, time.Now, m.minWait) + } + + timer, timerStop := helper.NewStoppedTimer() + defer timerStop() + + var retry uint64 + + for { + // we need to handle stopCtx.Err() and manually stop the subscribers + if err := m.stopCtx.Err(); err != nil { + // close watchers and shutdown + m.Shutdown() + return + } + + m.logger.Debug("waiting to renew identities", "num", len(reqs), "wait", wait) + timer.Reset(wait) + select { + case <-timer.C: + m.logger.Trace("getting new signed identities", "num", len(reqs)) + case <-m.stopCtx.Done(): + // close watchers and shutdown + m.Shutdown() + return + } + + // Renew all tokens together since its cheap + // FIXME this will have to be revisited once we support identity change modes + tokens, err := m.signer.SignIdentities(m.minIndex, reqs) + if err != nil { + retry++ + wait = helper.Backoff(m.minWait, time.Hour, retry) + helper.RandomStagger(m.minWait) + m.logger.Error("error renewing workload identities", "error", err, "next", wait) + continue + } + + if len(tokens) == 0 { + retry++ + wait = helper.Backoff(m.minWait, time.Hour, retry) + helper.RandomStagger(m.minWait) + m.logger.Error("error renewing workload identities", "error", "no tokens", "next", wait) + continue + } + + // Reset next expiration time + minExp = time.Time{} + + for _, token := range tokens { + id := cstructs.TaskIdentity{ + TaskName: token.TaskName, + IdentityName: token.IdentityName, + } + + // Set for getters + m.lastTokenLock.Lock() + m.lastToken[id] = token + m.lastTokenLock.Unlock() + + // Send to watchers + m.watchersLock.Lock() + m.send(id, token) + m.watchersLock.Unlock() + + // Set next expiration time + if minExp.IsZero() || token.Expiration.Before(minExp) { + minExp = token.Expiration + } + } + + // Success! Set next renewal and reset retries + wait = helper.ExpiryToRenewTime(minExp, time.Now, m.minWait) + retry = 0 + } +} + +// send must be called while holding the m.watchersLock +func (m *WIDMgr) send(id cstructs.TaskIdentity, token *structs.SignedWorkloadIdentity) { + w, ok := m.watchers[id] + if !ok { + // No watchers + return + } + + for _, c := range w { + // Pop any unreceived tokens + select { + case <-c: + default: + } + + // Send new token, should never block since this is the only sender and + // watchersLock is held + c <- token + } } diff --git a/client/widmgr/widmgr_test.go b/client/widmgr/widmgr_test.go index 3e9592bca..83c0aa88e 100644 --- a/client/widmgr/widmgr_test.go +++ b/client/widmgr/widmgr_test.go @@ -29,14 +29,14 @@ func TestWIDMgr(t *testing.T) { }) t.Cleanup(ta.Shutdown) - mgr := widmgr.New(widmgr.Config{ + mgr := widmgr.NewSigner(widmgr.SignerConfig{ NodeSecret: uuid.Generate(), // not checked when ACLs disabled Region: "global", RPC: ta, }) _, err := mgr.SignIdentities(1, nil) - must.ErrorContains(t, err, "no identities requested") + must.ErrorContains(t, err, "no identities to sign") _, err = mgr.SignIdentities(1, []*structs.WorkloadIdentityRequest{ { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 77247f950..b54cc5c17 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -11337,6 +11337,10 @@ func NewIdentityClaims(job *Job, alloc *Allocation, taskName string, wid *Worklo return nil } + if wid == nil { + return nil + } + jwtnow := jwt.NewNumericDate(now.UTC()) claims := &IdentityClaims{ Namespace: alloc.Namespace,