client: split identity_hook across allocrunner and taskrunner (#18431)
This commit splits the identity_hook between the allocrunner and the taskrunner. The allocrunner-level part of the hook signs each task identity, and the taskrunner-level part picks the signatures up and stores secrets for each task. The code revamps the WIDMgr, which is now split into two interfaces: IdentityManager, which manages renewals of signatures and sends updates to subscribers via the Watch method, and IdentitySigner, which only does the signing. This work is necessary for the unified Consul login workflow that comes with the new Consul integration: a new, allocrunner-level consul_hook will now be the only hook doing Consul authentication.
committed by GitHub
parent cf8dde0850
commit 86d2cdcf80
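For orientation, the two interfaces that come out of this split are both in client/widmgr; their full definitions appear in the diff below:

// IdentitySigner is the narrow signing interface. At runtime it is
// implemented by *widmgr.Signer, which wraps the Alloc.SignIdentities RPC.
type IdentitySigner interface {
    SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error)
}

// IdentityManager manages renewals and fans updates out to subscribers.
// At runtime it is implemented by *widmgr.WIDMgr.
type IdentityManager interface {
    Run() error
    Get(cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity, error)
    Watch(cstructs.TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func())
    Shutdown()
}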
@@ -206,8 +206,11 @@ type allocRunner struct {
    // partitions is an interface for managing cpuset partitions
    partitions cinterfaces.CPUPartitions

    // widmgr fetches workload identities
    widmgr *widmgr.WIDMgr
    // widsigner signs workload identities
    widsigner widmgr.IdentitySigner

    // widmgr manages workload identity signatures
    widmgr widmgr.IdentityManager
}

// NewAllocRunner returns a new allocation runner.
@@ -251,7 +254,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e
        wranglers: config.Wranglers,
        partitions: config.Partitions,
        hookResources: cstructs.NewAllocHookResources(),
        widmgr: config.WIDMgr,
        widsigner: config.WIDSigner,
    }

    // Create the logger based on the allocation ID
@@ -269,6 +272,10 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e
    ar.shutdownDelayCtx = shutdownDelayCtx
    ar.shutdownDelayCancelFn = shutdownDelayCancel

    // initialize the workload identity manager
    widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.logger)
    ar.widmgr = widmgr

    // Initialize the runners hooks.
    if err := ar.initRunnerHooks(config.ClientConfig); err != nil {
        return nil, err

@@ -118,6 +118,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
    // directory path exists for other hooks.
    alloc := ar.Alloc()
    ar.runnerHooks = []interfaces.RunnerHook{
        newIdentityHook(hookLogger, ar.widmgr),
        newAllocDirHook(hookLogger, ar.allocDir),
        newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
        newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
client/allocrunner/identity_hook.go (new file, 49 lines)
@@ -0,0 +1,49 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package allocrunner

import (
    "context"

    log "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/nomad/client/allocrunner/interfaces"
    "github.com/hashicorp/nomad/client/widmgr"
)

type identityHook struct {
    widmgr widmgr.IdentityManager
    logger log.Logger
}

func newIdentityHook(logger log.Logger, widmgr widmgr.IdentityManager) *identityHook {
    h := &identityHook{
        widmgr: widmgr,
    }
    h.logger = logger.Named(h.Name())
    return h
}

func (*identityHook) Name() string {
    return "identity"
}

func (h *identityHook) Prerun() error {
    // run the renewal
    if err := h.widmgr.Run(); err != nil {
        return err
    }

    return nil
}

// Stop implements interfaces.TaskStopHook
func (h *identityHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error {
    h.widmgr.Shutdown()
    return nil
}

// Shutdown implements interfaces.ShutdownHook
func (h *identityHook) Shutdown() {
    h.widmgr.Shutdown()
}
client/allocrunner/identity_hook_test.go (new file, 87 lines)
@@ -0,0 +1,87 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package allocrunner

import (
    "context"
    "testing"
    "time"

    "github.com/hashicorp/nomad/ci"
    "github.com/hashicorp/nomad/client/allocrunner/interfaces"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/client/widmgr"
    "github.com/hashicorp/nomad/helper/testlog"
    "github.com/hashicorp/nomad/nomad/mock"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/shoenig/test/must"
)

// statically assert network hook implements the expected interfaces
var _ interfaces.RunnerPrerunHook = (*identityHook)(nil)
var _ interfaces.ShutdownHook = (*identityHook)(nil)
var _ interfaces.TaskStopHook = (*identityHook)(nil)

func TestIdentityHook_Prerun(t *testing.T) {
    ci.Parallel(t)

    ttl := 30 * time.Second

    wid := &structs.WorkloadIdentity{
        Name: "testing",
        Audience: []string{"consul.io"},
        Env: true,
        File: true,
        TTL: ttl,
    }

    alloc := mock.Alloc()
    task := alloc.LookupTask("web")
    task.Identity = wid
    task.Identities = []*structs.WorkloadIdentity{wid}

    allocrunner, stopAR := TestAllocRunnerFromAlloc(t, alloc)
    defer stopAR()

    logger := testlog.HCLogger(t)

    // setup mock signer and WIDMgr
    mockSigner := widmgr.NewMockWIDSigner(task.Identities)
    mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, logger)
    allocrunner.widmgr = mockWIDMgr
    allocrunner.widsigner = mockSigner

    // do the initial signing
    _, err := mockSigner.SignIdentities(1, []*structs.WorkloadIdentityRequest{
        {
            AllocID: alloc.ID,
            TaskName: task.Name,
            IdentityName: task.Identities[0].Name,
        },
    })
    must.NoError(t, err)

    start := time.Now()
    hook := newIdentityHook(logger, mockWIDMgr)
    must.Eq(t, hook.Name(), "identity")
    must.NoError(t, hook.Prerun())

    time.Sleep(time.Second) // give goroutines a moment to run
    sid, err := hook.widmgr.Get(cstructs.TaskIdentity{
        TaskName: task.Name,
        IdentityName: task.Identities[0].Name},
    )
    must.Nil(t, err)
    must.Eq(t, sid.IdentityName, task.Identity.Name)
    must.NotEq(t, sid.JWT, "")

    // pad expiry time with a second to be safe
    must.Between(t,
        start.Add(ttl).Add(-1*time.Second).Unix(),
        sid.Expiration.Unix(),
        start.Add(ttl).Add(1*time.Second).Unix(),
    )

    must.NoError(t, hook.Stop(context.Background(), nil, nil))
}
@@ -7,13 +7,13 @@ import (
    "context"
    "fmt"
    "path/filepath"
    "time"

    log "github.com/hashicorp/go-hclog"

    "github.com/hashicorp/nomad/client/allocrunner/interfaces"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/client/taskenv"
    "github.com/hashicorp/nomad/helper"
    "github.com/hashicorp/nomad/client/widmgr"
    "github.com/hashicorp/nomad/helper/users"
    "github.com/hashicorp/nomad/nomad/structs"
)
@@ -27,12 +27,6 @@ const (
    wiTokenFile = "nomad_token"
)

// IdentitySigner is the interface needed to retrieve signed identities for
// workload identities. At runtime it is implemented by *widmgr.WIDMgr.
type IdentitySigner interface {
    SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error)
}

// tokenSetter provides methods for exposing workload identities to other
// internal Nomad components.
type tokenSetter interface {
@@ -45,23 +39,15 @@ type identityHook struct {
    tokenDir string
    envBuilder *taskenv.Builder
    ts tokenSetter
    widmgr IdentitySigner
    widmgr widmgr.IdentityManager
    logger log.Logger

    // minWait is the minimum amount of time to wait before renewing. Settable to
    // ease testing.
    minWait time.Duration

    stopCtx context.Context
    stop context.CancelFunc
}

func newIdentityHook(tr *TaskRunner, logger log.Logger) *identityHook {
    // Create a context for the renew loop. This context will be canceled when
    // the task is stopped or agent is shutting down, unlike Prestart's ctx which
    // is not intended for use after Prestart is returns.
    stopCtx, stop := context.WithCancel(context.Background())

    h := &identityHook{
        alloc: tr.Alloc(),
        task: tr.Task(),
@@ -69,7 +55,6 @@ func newIdentityHook(tr *TaskRunner, logger log.Logger) *identityHook {
        envBuilder: tr.envBuilder,
        ts: tr,
        widmgr: tr.widmgr,
        minWait: 10 * time.Second,
        stopCtx: stopCtx,
        stop: stop,
    }
@@ -88,39 +73,43 @@ func (h *identityHook) Prestart(context.Context, *interfaces.TaskPrestartRequest
        return err
    }

    signedWIDs, err := h.getIdentities()
    if err != nil {
        return fmt.Errorf("error fetching alternate identities: %w", err)
    }

    // Start token watcher loops
    for _, widspec := range h.task.Identities {
        signedWID := signedWIDs[widspec.Name]
        if signedWID == nil {
            // The only way to hit this should be a bug as it indicates the server
            // did not sign an identity for a task on this alloc.
            return fmt.Errorf("missing workload identity %q", widspec.Name)
        }

        if err := h.setAltToken(widspec, signedWID.JWT); err != nil {
            return err
        }
        w := widspec
        go h.watchIdentity(w)
    }

    // Start token renewal loop
    go h.renew(h.alloc.CreateIndex, signedWIDs)

    return nil
}

// Stop implements interfaces.TaskStopHook
func (h *identityHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error {
    h.stop()
    return nil
}
func (h *identityHook) watchIdentity(wid *structs.WorkloadIdentity) {
    id := cstructs.TaskIdentity{TaskName: h.task.Name, IdentityName: wid.Name}
    signedIdentitiesChan, stopWatching := h.widmgr.Watch(id)
    defer stopWatching()

// Shutdown implements interfaces.ShutdownHook
func (h *identityHook) Shutdown() {
    h.stop()
    for {
        select {
        case signedWID, ok := <-signedIdentitiesChan:
            h.logger.Trace("receiving renewed identity", "identity_name", wid.Name)
            if !ok {
                // Chan was closed, stop watching
                h.logger.Trace("identity watch closed", "task", h.task.Name, "identity", wid.Name)
                return
            }

            if signedWID == nil {
                // The only way to hit this should be a bug as it indicates the server
                // did not sign an identity for a task on this alloc.
                h.logger.Error("missing workload identity %q", wid.Name)
            }

            if err := h.setAltToken(wid, signedWID.JWT); err != nil {
                h.logger.Error(err.Error())
            }
        case <-h.stopCtx.Done():
            return
        }
    }
}

// setDefaultToken adds the Nomad token to the task's environment and writes it to a
@@ -163,151 +152,13 @@ func (h *identityHook) setAltToken(widspec *structs.WorkloadIdentity, rawJWT str
    return nil
}

// getIdentities calls Alloc.SignIdentities to get all of the identities for
// this workload signed. If there are no identities to be signed then (nil,
// nil) is returned.
func (h *identityHook) getIdentities() (map[string]*structs.SignedWorkloadIdentity, error) {

    if len(h.task.Identities) == 0 {
        return nil, nil
    }

    req := make([]*structs.WorkloadIdentityRequest, len(h.task.Identities))
    for i, widspec := range h.task.Identities {
        req[i] = &structs.WorkloadIdentityRequest{
            AllocID: h.alloc.ID,
            TaskName: h.task.Name,
            IdentityName: widspec.Name,
        }
    }

    // Get signed workload identities
    signedWIDs, err := h.widmgr.SignIdentities(h.alloc.CreateIndex, req)
    if err != nil {
        return nil, err
    }

    // Index initial workload identities by name
    widMap := make(map[string]*structs.SignedWorkloadIdentity, len(signedWIDs))
    for _, wid := range signedWIDs {
        widMap[wid.IdentityName] = wid
    }

    return widMap, nil
// Stop implements interfaces.TaskStopHook
func (h *identityHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error {
    h.stop()
    return nil
}

// renew fetches new signed workload identity tokens before the existing tokens
// expire.
func (h *identityHook) renew(createIndex uint64, signedWIDs map[string]*structs.SignedWorkloadIdentity) {
    wids := h.task.Identities
    if len(wids) == 0 {
        h.logger.Trace("no workload identities to renew")
        return
    }

    var reqs []*structs.WorkloadIdentityRequest
    renewNow := false
    minExp := time.Now().Add(30 * time.Hour) // set high default expiration
    widMap := make(map[string]*structs.WorkloadIdentity, len(wids)) // Identity.Name -> Identity

    for _, wid := range wids {
        if wid.TTL == 0 {
            // No ttl, so no need to renew it
            continue
        }

        widMap[wid.Name] = wid

        reqs = append(reqs, &structs.WorkloadIdentityRequest{
            AllocID: h.alloc.ID,
            TaskName: h.task.Name,
            IdentityName: wid.Name,
        })

        sid, ok := signedWIDs[wid.Name]
        if !ok {
            // Missing a signature, treat this case as already expired so we get a
            // token ASAP
            h.logger.Trace("missing token for identity", "identity", wid.Name)
            renewNow = true
            continue
        }

        if sid.Expiration.Before(minExp) {
            minExp = sid.Expiration
        }
    }

    if len(reqs) == 0 {
        h.logger.Trace("no workload identities expire")
        return
    }

    var wait time.Duration
    if !renewNow {
        wait = helper.ExpiryToRenewTime(minExp, time.Now, h.minWait)
    }

    timer, timerStop := helper.NewStoppedTimer()
    defer timerStop()

    var retry uint64

    for err := h.stopCtx.Err(); err == nil; {
        h.logger.Debug("waiting to renew identities", "num", len(reqs), "wait", wait)
        timer.Reset(wait)
        select {
        case <-timer.C:
            h.logger.Trace("getting new signed identities", "num", len(reqs))
        case <-h.stopCtx.Done():
            return
        }

        // Renew all tokens together since its cheap
        tokens, err := h.widmgr.SignIdentities(createIndex, reqs)
        if err != nil {
            retry++
            wait = helper.Backoff(h.minWait, time.Hour, retry) + helper.RandomStagger(h.minWait)
            h.logger.Error("error renewing workload identities", "error", err, "next", wait)
            continue
        }

        if len(tokens) == 0 {
            retry++
            wait = helper.Backoff(h.minWait, time.Hour, retry) + helper.RandomStagger(h.minWait)
            h.logger.Error("error renewing workload identities", "error", "no tokens", "next", wait)
            continue
        }

        // Reset next expiration time
        minExp = time.Time{}

        for _, token := range tokens {
            widspec, ok := widMap[token.IdentityName]
            if !ok {
                // Bug: Every requested workload identity should either have a signed
                // identity or rejection.
                h.logger.Warn("bug: unexpected workload identity received", "identity", token.IdentityName)
                continue
            }

            if err := h.setAltToken(widspec, token.JWT); err != nil {
                // Set minExp using retry's backoff logic
                minExp = time.Now().Add(helper.Backoff(h.minWait, time.Hour, retry+1) + helper.RandomStagger(h.minWait))
                h.logger.Error("error setting new workload identity", "error", err, "identity", token.IdentityName)
                continue
            }

            // Set next expiration time
            if minExp.IsZero() {
                minExp = token.Expiration
            } else if token.Expiration.Before(minExp) {
                minExp = token.Expiration
            }
        }

        // Success! Set next renewal and reset retries
        wait = helper.ExpiryToRenewTime(minExp, time.Now, h.minWait)
        retry = 0
    }
// Shutdown implements interfaces.ShutdownHook
func (h *identityHook) Shutdown() {
    h.stop()
}
@@ -5,20 +5,15 @@ package taskrunner

import (
    "context"
    "crypto/ed25519"
    "fmt"
    "path/filepath"
    "slices"
    "testing"
    "time"

    "github.com/go-jose/go-jose/v3"
    "github.com/go-jose/go-jose/v3/jwt"
    "github.com/hashicorp/nomad/ci"
    "github.com/hashicorp/nomad/client/allocrunner/interfaces"
    "github.com/hashicorp/nomad/client/taskenv"
    "github.com/hashicorp/nomad/client/widmgr"
    "github.com/hashicorp/nomad/helper/testlog"
    "github.com/hashicorp/nomad/helper/uuid"
    "github.com/hashicorp/nomad/nomad/mock"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/testutil"
@@ -31,92 +26,6 @@ var _ interfaces.ShutdownHook = (*identityHook)(nil)

// See task_runner_test.go:TestTaskRunner_IdentityHook

// MockWIDMgr allows TaskRunner unit tests to avoid having to setup a Server,
// Client, and Allocation.
type MockWIDMgr struct {
    // wids maps identity names to workload identities. If wids is non-nil then
    // SignIdentities will use it to find expirations or reject invalid identity
    // names
    wids map[string]*structs.WorkloadIdentity

    key ed25519.PrivateKey
    keyID string
}

func NewMockWIDMgr(wids []*structs.WorkloadIdentity) *MockWIDMgr {
    _, privKey, err := ed25519.GenerateKey(nil)
    if err != nil {
        panic(err)
    }
    m := &MockWIDMgr{
        key: privKey,
        keyID: uuid.Generate(),
    }

    if wids != nil {
        m.setWIDs(wids)
    }

    return m
}

// setWIDs is a test helper to use Task.Identities in the MockWIDMgr for
// sharing TTLs and validating names.
func (m *MockWIDMgr) setWIDs(wids []*structs.WorkloadIdentity) {
    m.wids = make(map[string]*structs.WorkloadIdentity, len(wids))
    for _, wid := range wids {
        m.wids[wid.Name] = wid
    }
}

func (m *MockWIDMgr) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) {
    swids := make([]*structs.SignedWorkloadIdentity, 0, len(req))
    for _, idReq := range req {
        // Set test values for default claims
        claims := &structs.IdentityClaims{
            Namespace: "default",
            JobID: "test",
            AllocationID: idReq.AllocID,
            TaskName: idReq.TaskName,
        }
        claims.ID = uuid.Generate()

        // If test has set workload identities. Lookup claims or reject unknown
        // identity.
        if m.wids != nil {
            wid, ok := m.wids[idReq.IdentityName]
            if !ok {
                return nil, fmt.Errorf("unknown identity: %q", idReq.IdentityName)
            }

            claims.Audience = slices.Clone(wid.Audience)

            if wid.TTL > 0 {
                claims.Expiry = jwt.NewNumericDate(time.Now().Add(wid.TTL))
            }
        }

        opts := (&jose.SignerOptions{}).WithHeader("kid", m.keyID).WithType("JWT")
        sig, err := jose.NewSigner(jose.SigningKey{Algorithm: jose.EdDSA, Key: m.key}, opts)
        if err != nil {
            return nil, fmt.Errorf("error creating signer: %w", err)
        }
        token, err := jwt.Signed(sig).Claims(claims).CompactSerialize()
        if err != nil {
            return nil, fmt.Errorf("error signing: %w", err)
        }

        swid := &structs.SignedWorkloadIdentity{
            WorkloadIdentityRequest: *idReq,
            JWT: token,
            Expiration: claims.Expiry.Time(),
        }

        swids = append(swids, swid)
    }
    return swids, nil
}

// MockTokenSetter is a mock implementation of tokenSetter which is satisfied
// by TaskRunner at runtime.
type MockTokenSetter struct {
@@ -135,7 +44,7 @@ func TestIdentityHook_RenewAll(t *testing.T) {
    // checking that tokens were rotated. Therefore the time must be long enough
    // to generate new tokens. Since no Raft or IO (outside of potentially
    // writing 1 token file) is performed, this should be relatively fast.
    ttl := 8 * time.Second
    ttl := 3 * time.Second

    node := mock.Node()
    alloc := mock.Alloc()
@@ -158,28 +67,46 @@ func TestIdentityHook_RenewAll(t *testing.T) {

    secretsDir := t.TempDir()

    widmgr := NewMockWIDMgr(task.Identities)

    mockTR := &MockTokenSetter{}

    stopCtx, stop := context.WithCancel(context.Background())
    t.Cleanup(stop)

    // setup mock signer and WIDMgr
    mockSigner := widmgr.NewMockWIDSigner(task.Identities)
    mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, testlog.HCLogger(t))
    mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s

    // do the initial signing
    for _, i := range task.Identities {
        _, err := mockSigner.SignIdentities(1, []*structs.WorkloadIdentityRequest{
            {
                AllocID: alloc.ID,
                TaskName: task.Name,
                IdentityName: i.Name,
            },
        })
        must.NoError(t, err)
    }

    h := &identityHook{
        alloc: alloc,
        task: task,
        tokenDir: secretsDir,
        envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region),
        ts: mockTR,
        widmgr: widmgr,
        minWait: time.Second,
        widmgr: mockWIDMgr,
        logger: testlog.HCLogger(t),
        stopCtx: stopCtx,
        stop: stop,
    }

    // do the initial renewal and start the loop
    must.NoError(t, h.widmgr.Run())

    start := time.Now()
    must.NoError(t, h.Prestart(context.Background(), nil, nil))
    time.Sleep(time.Second) // goroutines in the Prestart hook must run first before we Build the EnvMap
    env := h.envBuilder.Build().EnvMap

    // Assert initial tokens were set in Prestart
@@ -219,7 +146,7 @@ func TestIdentityHook_RenewAll(t *testing.T) {
func TestIdentityHook_RenewOne(t *testing.T) {
    ci.Parallel(t)

    ttl := 8 * time.Second
    ttl := 3 * time.Second

    node := mock.Node()
    alloc := mock.Alloc()
@@ -242,28 +169,46 @@ func TestIdentityHook_RenewOne(t *testing.T) {

    secretsDir := t.TempDir()

    widmgr := NewMockWIDMgr(task.Identities)

    mockTR := &MockTokenSetter{}

    stopCtx, stop := context.WithCancel(context.Background())
    t.Cleanup(stop)

    // setup mock signer and WIDMgr
    mockSigner := widmgr.NewMockWIDSigner(task.Identities)
    mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, testlog.HCLogger(t))
    mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s

    // do the initial signing
    for _, i := range task.Identities {
        _, err := mockSigner.SignIdentities(1, []*structs.WorkloadIdentityRequest{
            {
                AllocID: alloc.ID,
                TaskName: task.Name,
                IdentityName: i.Name,
            },
        })
        must.NoError(t, err)
    }

    h := &identityHook{
        alloc: alloc,
        task: task,
        tokenDir: secretsDir,
        envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region),
        ts: mockTR,
        widmgr: widmgr,
        minWait: time.Second,
        widmgr: mockWIDMgr,
        logger: testlog.HCLogger(t),
        stopCtx: stopCtx,
        stop: stop,
    }

    // do the initial renewal and start the loop
    must.NoError(t, h.widmgr.Run())

    start := time.Now()
    must.NoError(t, h.Prestart(context.Background(), nil, nil))
    time.Sleep(time.Second) // goroutines in the Prestart hook must run first before we Build the EnvMap
    env := h.envBuilder.Build().EnvMap

    // Assert initial tokens were set in Prestart
@@ -318,8 +263,6 @@ func TestIdentityHook_ErrorWriting(t *testing.T) {
        tokenDir: "/this-should-not-exist",
        envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region),
        ts: &MockTokenSetter{},
        widmgr: NewMockWIDMgr(nil),
        minWait: time.Second,
        logger: testlog.HCLogger(t),
        stopCtx: stopCtx,
        stop: stop,
@@ -329,44 +272,3 @@ func TestIdentityHook_ErrorWriting(t *testing.T) {
    err := h.Prestart(context.Background(), nil, nil)
    must.ErrorContains(t, err, "failed to write nomad token")
}

// TestIdentityHook_GetIdentitiesMismatch asserts that if SignIdentities() does
// not return enough identities then Prestart fails.
func TestIdentityHook_GetIdentitiesMismatch(t *testing.T) {
    ci.Parallel(t)

    alloc := mock.Alloc()
    task := alloc.LookupTask("web")
    task.Identities = []*structs.WorkloadIdentity{
        {
            Name: "consul",
            Audience: []string{"consul"},
            TTL: time.Minute,
        },
    }
    node := mock.Node()
    stopCtx, stop := context.WithCancel(context.Background())
    t.Cleanup(stop)

    wids := []*structs.WorkloadIdentity{
        {
            Name: "not-consul",
        },
    }
    h := &identityHook{
        alloc: alloc,
        task: task,
        tokenDir: t.TempDir(),
        envBuilder: taskenv.NewBuilder(node, alloc, task, alloc.Job.Region),
        ts: &MockTokenSetter{},
        widmgr: NewMockWIDMgr(wids),
        minWait: time.Second,
        logger: testlog.HCLogger(t),
        stopCtx: stopCtx,
        stop: stop,
    }

    // Prestart should fail when trying to write the default identity file
    err := h.Prestart(context.Background(), nil, nil)
    must.ErrorContains(t, err, "error fetching alternate identities")
}
@@ -20,8 +20,8 @@ import (
    "github.com/hashicorp/nomad/nomad/structs"
    bstructs "github.com/hashicorp/nomad/plugins/base/structs"
    pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

const (
@@ -122,7 +122,7 @@ func (h *logmonHook) Prestart(ctx context.Context,
    attempts := 0
    for {
        err := h.prestartOneLoop(ctx, req)
        if err == bstructs.ErrPluginShutdown || grpc.Code(err) == codes.Unavailable {
        if err == bstructs.ErrPluginShutdown || status.Code(err) == codes.Unavailable {
            h.logger.Warn("logmon shutdown while making request", "error", err)

            if attempts > 3 {
@@ -34,6 +34,7 @@ import (
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/client/taskenv"
    "github.com/hashicorp/nomad/client/vaultclient"
    "github.com/hashicorp/nomad/client/widmgr"
    "github.com/hashicorp/nomad/helper"
    "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
    "github.com/hashicorp/nomad/helper/pluginutils/hclutils"
@@ -267,8 +268,8 @@ type TaskRunner struct {
    // system features like cgroups
    wranglers cinterfaces.ProcessWranglers

    // widmgr fetches workload identities
    widmgr IdentitySigner
    // widmgr manages workload identities
    widmgr widmgr.IdentityManager
}

type Config struct {
@@ -342,8 +343,8 @@ type Config struct {
    // allocrunner hooks
    AllocHookResources *cstructs.AllocHookResources

    // WIDMgr fetches workload identities
    WIDMgr IdentitySigner
    // WIDMgr manages workload identities
    WIDMgr widmgr.IdentityManager
}

func NewTaskRunner(config *Config) (*TaskRunner, error) {
@@ -30,6 +30,7 @@ import (
    cstate "github.com/hashicorp/nomad/client/state"
    ctestutil "github.com/hashicorp/nomad/client/testutil"
    "github.com/hashicorp/nomad/client/vaultclient"
    "github.com/hashicorp/nomad/client/widmgr"
    agentconsul "github.com/hashicorp/nomad/command/agent/consul"
    mockdriver "github.com/hashicorp/nomad/drivers/mock"
    "github.com/hashicorp/nomad/drivers/rawexec"
@@ -116,6 +117,9 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
    nomadRegMock := regMock.NewServiceRegistrationHandler(logger)
    wrapperMock := wrapper.NewHandlerWrapper(logger, consulRegMock, nomadRegMock)

    task := alloc.LookupTask(taskName)
    widsigner := widmgr.NewMockWIDSigner(task.Identities)

    var vaultFunc vaultclient.VaultClientFunc
    if vault != nil {
        vaultFunc = func(_ string) (vaultclient.VaultClient, error) { return vault, nil }
@@ -141,7 +145,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
        ServiceRegWrapper: wrapperMock,
        Getter: getter.TestSandbox(t),
        Wranglers: proclib.MockWranglers(t),
        WIDMgr: NewMockWIDMgr(nil),
        WIDMgr: widmgr.NewWIDMgr(widsigner, alloc, logger),
    }

    return conf, trCleanup
@@ -153,6 +157,13 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string) (*TaskRunner, *Config, func()) {
    config, cleanup := testTaskRunnerConfig(t, alloc, taskName, nil)

    // This is usually handled by the identity hook in the alloc runner, so it
    // must be called manually when testing a task runner in isolation.
    if config.WIDMgr != nil {
        err := config.WIDMgr.Run()
        must.NoError(t, err)
    }

    tr, err := NewTaskRunner(config)
    require.NoError(t, err)
    go tr.Run()
@@ -335,8 +335,8 @@ type Client struct {
    // partitions is used for managing cpuset partitioning on linux systems
    partitions cgroupslib.Partition

    // widmgr retrieves workload identities
    widmgr *widmgr.WIDMgr
    // widsigner signs workload identities
    widsigner widmgr.IdentitySigner
}

var (
@@ -445,8 +445,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
    return nil, fmt.Errorf("node setup failed: %v", err)
}

    // Add workload identity manager after node secret has been generated/loaded
    c.widmgr = widmgr.New(widmgr.Config{
    // Add workload identity signer after node secret has been generated/loaded
    c.widsigner = widmgr.NewSigner(widmgr.SignerConfig{
        NodeSecret: c.secretNodeID(),
        Region: cfg.Region,
        RPC: c,
@@ -2762,7 +2762,7 @@ func (c *Client) newAllocRunnerConfig(
        StateDB: c.stateDB,
        StateUpdater: c,
        VaultFunc: c.VaultClient,
        WIDMgr: c.widmgr,
        WIDSigner: c.widsigner,
        Wranglers: c.wranglers,
        Partitions: c.partitions,
    }
@@ -114,8 +114,11 @@ type AllocRunnerConfig struct {
    // Partitions is an interface for managing cpuset partitions.
    Partitions interfaces.CPUPartitions

    // WIDMgr fetches workload identities
    WIDMgr *widmgr.WIDMgr
    // WIDSigner fetches workload identities
    WIDSigner widmgr.IdentitySigner

    // WIDMgr manages workload identities
    WIDMgr widmgr.IdentityManager
}

// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to
@@ -10,6 +10,11 @@ import (
    "github.com/hashicorp/nomad/helper"
)

type TaskIdentity struct {
    TaskName string
    IdentityName string
}

// AllocHookResources contains data that is provided by AllocRunner Hooks for
// consumption by TaskRunners. This should be instantiated once in the
// AllocRunner and then only accessed via getters and setters that hold the
client/widmgr/mock.go (new file, 93 lines)
@@ -0,0 +1,93 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package widmgr

import (
    "crypto/ed25519"
    "fmt"
    "slices"
    "time"

    "github.com/go-jose/go-jose/v3"
    "github.com/go-jose/go-jose/v3/jwt"

    "github.com/hashicorp/nomad/helper/uuid"
    "github.com/hashicorp/nomad/nomad/structs"
)

// MockWIDSigner allows TaskRunner unit tests to avoid having to setup a Server,
// Client, and Allocation.
type MockWIDSigner struct {
    // wids maps identity names to workload identities. If wids is non-nil then
    // SignIdentities will use it to find expirations or reject invalid identity
    // names
    wids map[string]*structs.WorkloadIdentity
    key ed25519.PrivateKey
    keyID string
}

func NewMockWIDSigner(wids []*structs.WorkloadIdentity) *MockWIDSigner {
    _, privKey, err := ed25519.GenerateKey(nil)
    if err != nil {
        panic(err)
    }
    m := &MockWIDSigner{
        key: privKey,
        keyID: uuid.Generate(),
    }
    if wids != nil {
        m.setWIDs(wids)
    }
    return m
}

// setWIDs is a test helper to use Task.Identities in the MockWIDSigner for
// sharing TTLs and validating names.
func (m *MockWIDSigner) setWIDs(wids []*structs.WorkloadIdentity) {
    m.wids = make(map[string]*structs.WorkloadIdentity, len(wids))
    for _, wid := range wids {
        m.wids[wid.Name] = wid
    }
}
func (m *MockWIDSigner) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) {
    swids := make([]*structs.SignedWorkloadIdentity, 0, len(req))
    for _, idReq := range req {
        // Set test values for default claims
        claims := &structs.IdentityClaims{
            Namespace: "default",
            JobID: "test",
            AllocationID: idReq.AllocID,
            TaskName: idReq.TaskName,
        }
        claims.ID = uuid.Generate()
        // If test has set workload identities. Lookup claims or reject unknown
        // identity.
        if m.wids != nil {
            wid, ok := m.wids[idReq.IdentityName]
            if !ok {
                return nil, fmt.Errorf("unknown identity: %q", idReq.IdentityName)
            }
            claims.Audience = slices.Clone(wid.Audience)
            if wid.TTL > 0 {
                claims.Expiry = jwt.NewNumericDate(time.Now().Add(wid.TTL))
            }
        }
        opts := (&jose.SignerOptions{}).WithHeader("kid", m.keyID).WithType("JWT")
        sig, err := jose.NewSigner(jose.SigningKey{Algorithm: jose.EdDSA, Key: m.key}, opts)
        if err != nil {
            return nil, fmt.Errorf("error creating signer: %w", err)
        }
        token, err := jwt.Signed(sig).Claims(claims).CompactSerialize()
        if err != nil {
            return nil, fmt.Errorf("error signing: %w", err)
        }
        swid := &structs.SignedWorkloadIdentity{
            WorkloadIdentityRequest: *idReq,
            JWT: token,
            Expiration: claims.Expiry.Time(),
        }
        swids = append(swids, swid)
    }
    return swids, nil
}
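A minimal usage sketch of the mock signer, mirroring the tests in this commit; the task and alloc values are assumed to come from the nomad/mock fixtures used above, and error handling is elided:

// Sign a single identity directly, as identity_hook_test.go does.
mockSigner := widmgr.NewMockWIDSigner(task.Identities)
swids, err := mockSigner.SignIdentities(1, []*structs.WorkloadIdentityRequest{
    {AllocID: alloc.ID, TaskName: task.Name, IdentityName: task.Identities[0].Name},
})
// Each signed identity carries the JWT and an Expiration derived from the
// identity's TTL (check err before use).
_ = swids[0].JWT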
client/widmgr/signer.go (new file, 110 lines)
@@ -0,0 +1,110 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package widmgr

import (
    "fmt"

    "github.com/hashicorp/go-multierror"
    "github.com/hashicorp/nomad/nomad/structs"
)

type RPCer interface {
    RPC(method string, args any, reply any) error
}

// IdentitySigner is the interface needed to retrieve signed identities for
// workload identities. At runtime it is implemented by *widmgr.Signer.
type IdentitySigner interface {
    SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error)
}

// SignerConfig wraps the configuration parameters the workload identity manager
// needs.
type SignerConfig struct {
    // NodeSecret is the node's secret token
    NodeSecret string

    // Region of the node
    Region string

    RPC RPCer
}

// Signer fetches and validates workload identities.
type Signer struct {
    nodeSecret string
    region string
    rpc RPCer
}

// NewSigner workload identity manager.
func NewSigner(c SignerConfig) *Signer {
    return &Signer{
        nodeSecret: c.NodeSecret,
        region: c.Region,
        rpc: c.RPC,
    }
}

// SignIdentities wraps the Alloc.SignIdentities RPC and retrieves signed
// workload identities. The minIndex should be set to the lowest allocation
// CreateIndex to ensure that the server handling the request isn't so stale
// that it doesn't know the allocation exist (and therefore rejects the signing
// requests).
//
// Since a single rejection causes an error to be returned, SignIdentities
// should currently only be used when requesting signed identities for a single
// allocation.
func (s *Signer) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) {
    if len(req) == 0 {
        return nil, fmt.Errorf("no identities to sign")
    }

    args := structs.AllocIdentitiesRequest{
        Identities: req,
        QueryOptions: structs.QueryOptions{
            Region: s.region,

            // Unlike other RPCs, this one doesn't care about "subsequent
            // modifications" after an index. We only want to ensure the state
            // isn't too stale to know about this alloc, so we instruct the
            // Server to block at least until the Allocation is created.
            MinQueryIndex: minIndex - 1,
            AllowStale: true,
            AuthToken: s.nodeSecret,
        },
    }
    reply := structs.AllocIdentitiesResponse{}
    if err := s.rpc.RPC("Alloc.SignIdentities", &args, &reply); err != nil {
        return nil, err
    }

    if n := len(reply.Rejections); n == 1 {
        return nil, fmt.Errorf(
            "%d/%d signing request was rejected: %v",
            n, len(req), reply.Rejections[0].Reason,
        )
    } else if n > 1 {
        var mErr *multierror.Error
        for _, r := range reply.Rejections {
            mErr = multierror.Append(
                fmt.Errorf(
                    "%d/%d signing request was rejected: %v",
                    n, len(req), r.Reason,
                ))
        }
        return nil, mErr
    }

    if len(reply.SignedIdentities) == 0 {
        return nil, fmt.Errorf("empty signed identity response")
    }

    if exp, act := len(reply.SignedIdentities), len(req); exp != act {
        return nil, fmt.Errorf("expected %d signed identities but received %d", exp, act)
    }

    return reply.SignedIdentities, nil
}
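A rough sketch of constructing and using the signer, mirroring how NewClient wires it up in the client.go hunk above; nodeSecret and rpcHandler are hypothetical variables assumed to be in scope (anything satisfying RPCer works for the latter):

signer := widmgr.NewSigner(widmgr.SignerConfig{
    NodeSecret: nodeSecret, // the node's secret token (hypothetical variable)
    Region:     "global",
    RPC:        rpcHandler, // hypothetical: implements RPC(method, args, reply)
})
// minIndex is the allocation's CreateIndex, so a stale server blocks until it
// knows about the alloc instead of rejecting the signing request.
swids, err := signer.SignIdentities(alloc.CreateIndex, reqs)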
@@ -4,80 +4,376 @@
package widmgr

import (
    "context"
    "fmt"
    "slices"
    "sync"
    "time"

    "github.com/hashicorp/go-hclog"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/helper"
    "github.com/hashicorp/nomad/nomad/structs"
)

type RPCer interface {
    RPC(method string, args any, reply any) error
// IdentityManager defines a manager responsible for signing and renewing
// signed identities. At runtime it is implemented by *widmgr.WIDMgr.
type IdentityManager interface {
    Run() error
    Get(cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity, error)
    Watch(cstructs.TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func())
    Shutdown()
}

// Config wraps the configuration parameters the workload identity manager
// needs.
type Config struct {
    // NodeSecret is the node's secret token
    NodeSecret string

    // Region of the node
    Region string

    RPC RPCer
}

// WIDMgr fetches and validates workload identities.
type WIDMgr struct {
    nodeSecret string
    region string
    rpc RPCer
    allocID string
    minIndex uint64
    widSpecs map[string][]*structs.WorkloadIdentity // task -> WI
    signer IdentitySigner

    // lastToken are the last retrieved signed workload identifiers keyed by
    // TaskIdentity
    lastToken map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity
    lastTokenLock sync.RWMutex

    // watchers is a map of task identities to slices of channels (each identity
    // can have multiple watchers)
    watchers map[cstructs.TaskIdentity][]chan *structs.SignedWorkloadIdentity
    watchersLock sync.Mutex

    // minWait is the minimum amount of time to wait before renewing. Settable to
    // ease testing.
    minWait time.Duration

    stopCtx context.Context
    stop context.CancelFunc

    logger hclog.Logger
}

// New workload identity manager.
func New(c Config) *WIDMgr {
func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, logger hclog.Logger) *WIDMgr {
    widspecs := map[string][]*structs.WorkloadIdentity{}
    tg := a.Job.LookupTaskGroup(a.TaskGroup)
    for _, task := range tg.Tasks {
        // Omit default identity as it does not expire
        if len(task.Identities) > 0 {
            widspecs[task.Name] = helper.CopySlice(task.Identities)
        }
    }

    // Create a context for the renew loop. This context will be canceled when
    // the allocation is stopped or agent is shutting down
    stopCtx, stop := context.WithCancel(context.Background())

    return &WIDMgr{
        nodeSecret: c.NodeSecret,
        region: c.Region,
        rpc: c.RPC,
        allocID: a.ID,
        minIndex: a.CreateIndex,
        widSpecs: widspecs,
        signer: signer,
        minWait: 10 * time.Second,
        lastToken: map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity{},
        watchers: map[cstructs.TaskIdentity][]chan *structs.SignedWorkloadIdentity{},
        stopCtx: stopCtx,
        stop: stop,
        logger: logger.Named("widmgr"),
    }
}

// SignIdentities wraps the Alloc.SignIdentities RPC and retrieves signed
// workload identities. The minIndex should be set to the lowest allocation
// CreateIndex to ensure that the server handling the request isn't so stale
// that it doesn't know the allocation exist (and therefore rejects the signing
// requests).
// SetMinWait sets the minimum time for renewals
func (m *WIDMgr) SetMinWait(t time.Duration) {
    m.minWait = t
}

// Run blocks until identities are initially signed and then renews them in a
// goroutine. The goroutine is stopped when WIDMgr.Shutdown is called.
//
// Since a single rejection causes an error to be returned, SignIdentities
// should currently only be used when requesting signed identities for a single
// allocation.
func (m *WIDMgr) SignIdentities(minIndex uint64, req []*structs.WorkloadIdentityRequest) ([]*structs.SignedWorkloadIdentity, error) {
    args := structs.AllocIdentitiesRequest{
        Identities: req,
        QueryOptions: structs.QueryOptions{
            Region: m.region,
            MinQueryIndex: minIndex - 1,
            AllowStale: true,
            AuthToken: m.nodeSecret,
        },
    }
    reply := structs.AllocIdentitiesResponse{}
    if err := m.rpc.RPC("Alloc.SignIdentities", &args, &reply); err != nil {
        return nil, err
// If an error is returned the identities could not be fetched and the renewal
// goroutine was not started.
func (m *WIDMgr) Run() error {
    if len(m.widSpecs) == 0 {
        m.logger.Debug("no workload identities to retrieve or renew")
        return nil
    }

    if n := len(reply.Rejections); n == 1 {
        return nil, fmt.Errorf("%d/%d signing request was rejected", n, len(req))
    } else if n > 1 {
        return nil, fmt.Errorf("%d/%d signing requests were rejected", n, len(req))
    m.logger.Debug("retrieving and renewing workload identities", "num_identities", len(m.widSpecs))

    if err := m.getIdentities(); err != nil {
        return fmt.Errorf("failed to fetch signed identities: %w", err)
    }

    if len(reply.SignedIdentities) == 0 {
        return nil, fmt.Errorf("empty signed identity response")
    }
    go m.renew()

    if exp, act := len(reply.SignedIdentities), len(req); exp != act {
        return nil, fmt.Errorf("expected %d signed identities but received %d", exp, act)
    }

    return reply.SignedIdentities, nil
    return nil
}

// Get retrieves the latest signed identity or returns an error. It must be
// called after Run and does not block.
//
// For retrieving tokens which might be renewed callers should use Watch
// instead to avoid missing new tokens retrieved by Run between Get and Watch
// calls.
func (m *WIDMgr) Get(id cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity, error) {
    token := m.get(id)
    if token == nil {
        // This is an error as every identity should have a token by the time Get
        // is called.
        return nil, fmt.Errorf("unable to find token for task %q and identity %q", id.TaskName, id.IdentityName)
    }

    return token, nil
}

func (m *WIDMgr) get(id cstructs.TaskIdentity) *structs.SignedWorkloadIdentity {
    m.lastTokenLock.RLock()
    defer m.lastTokenLock.RUnlock()

    return m.lastToken[id]
}

// Watch returns a channel that sends new signed identities until it is closed
// due to shutdown. Must be called after Run.
//
// The caller must call the returned func to stop watching and ensure the
// watched id actually exists, otherwise the channel never returns a result.
func (m *WIDMgr) Watch(id cstructs.TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func()) {
    // If Shutdown has been called return a closed chan
    if m.stopCtx.Err() != nil {
        c := make(chan *structs.SignedWorkloadIdentity)
        close(c)
        return c, func() {}
    }

    m.watchersLock.Lock()
    defer m.watchersLock.Unlock()

    // Buffer of 1 so sends don't block on receives
    c := make(chan *structs.SignedWorkloadIdentity, 1)
    m.watchers[id] = make([]chan *structs.SignedWorkloadIdentity, 0)
    m.watchers[id] = append(m.watchers[id], c)

    // Create a cancel func for watchers to deregister when they exit.
    cancel := func() {
        m.watchersLock.Lock()
        defer m.watchersLock.Unlock()

        m.watchers[id] = slices.DeleteFunc(
            m.watchers[id],
            func(ch chan *structs.SignedWorkloadIdentity) bool { return ch == c },
        )
    }

    // Prime chan with latest token to avoid a race condition where consumers
    // could miss a token update between Get and Watch calls.
    if token := m.get(id); token != nil {
        c <- token
    }

    return c, cancel
}

// Shutdown stops renewal and closes all watch chans.
func (m *WIDMgr) Shutdown() {
    m.watchersLock.Lock()
    defer m.watchersLock.Unlock()

    m.stop()

    for _, w := range m.watchers {
        for _, c := range w {
            close(c)
        }
    }
}

// getIdentities fetches all signed identities or returns an error.
func (m *WIDMgr) getIdentities() error {
    if len(m.widSpecs) == 0 {
        return nil
    }

    m.lastTokenLock.Lock()
    defer m.lastTokenLock.Unlock()

    reqs := make([]*structs.WorkloadIdentityRequest, 0, len(m.widSpecs))
    for taskName, widspecs := range m.widSpecs {
        for _, widspec := range widspecs {
            reqs = append(reqs, &structs.WorkloadIdentityRequest{
                AllocID: m.allocID,
                TaskName: taskName,
                IdentityName: widspec.Name,
            })
        }
    }

    // Get signed workload identities
    signedWIDs, err := m.signer.SignIdentities(m.minIndex, reqs)
    if err != nil {
        return err
    }

    // Index initial workload identities by name
    m.lastToken = make(map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity, len(signedWIDs))
    for _, swid := range signedWIDs {
        id := cstructs.TaskIdentity{
            TaskName: swid.TaskName,
            IdentityName: swid.IdentityName,
        }

        m.lastToken[id] = swid
    }
    // TODO: Persist signed identity token to client state
    return nil
}

// renew fetches new signed workload identity tokens before the existing tokens
// expire.
func (m *WIDMgr) renew() {
    if len(m.widSpecs) == 0 {
        return
    }

    reqs := make([]*structs.WorkloadIdentityRequest, 0, len(m.widSpecs))
    for taskName, widspecs := range m.widSpecs {
        for _, widspec := range widspecs {
            if widspec.TTL == 0 {
                continue
            }
            reqs = append(reqs, &structs.WorkloadIdentityRequest{
                AllocID: m.allocID,
                TaskName: taskName,
                IdentityName: widspec.Name,
            })
        }
    }

    if len(reqs) == 0 {
        m.logger.Trace("no workload identities expire")
        return
    }

    renewNow := false
    minExp := time.Time{}

    for taskName, wids := range m.widSpecs {
        for _, wid := range wids {
            if wid.TTL == 0 {
                // No ttl, so no need to renew it
                continue
            }

            //FIXME make this less ugly
            token := m.get(cstructs.TaskIdentity{
                TaskName: taskName,
                IdentityName: wid.Name,
            })
            if token == nil {
                // Missing a signature, treat this case as already expired so
                // we get a token ASAP
                m.logger.Debug("missing token for identity", "identity", wid.Name)
                renewNow = true
                continue
            }

            if minExp.IsZero() || token.Expiration.Before(minExp) {
                minExp = token.Expiration
            }
        }
    }

    var wait time.Duration
    if !renewNow {
        wait = helper.ExpiryToRenewTime(minExp, time.Now, m.minWait)
    }

    timer, timerStop := helper.NewStoppedTimer()
    defer timerStop()

    var retry uint64

    for {
        // we need to handle stopCtx.Err() and manually stop the subscribers
        if err := m.stopCtx.Err(); err != nil {
            // close watchers and shutdown
            m.Shutdown()
            return
        }

        m.logger.Debug("waiting to renew identities", "num", len(reqs), "wait", wait)
        timer.Reset(wait)
        select {
        case <-timer.C:
            m.logger.Trace("getting new signed identities", "num", len(reqs))
        case <-m.stopCtx.Done():
            // close watchers and shutdown
            m.Shutdown()
            return
        }

        // Renew all tokens together since its cheap
        // FIXME this will have to be revisited once we support identity change modes
        tokens, err := m.signer.SignIdentities(m.minIndex, reqs)
        if err != nil {
            retry++
            wait = helper.Backoff(m.minWait, time.Hour, retry) + helper.RandomStagger(m.minWait)
            m.logger.Error("error renewing workload identities", "error", err, "next", wait)
            continue
        }

        if len(tokens) == 0 {
            retry++
            wait = helper.Backoff(m.minWait, time.Hour, retry) + helper.RandomStagger(m.minWait)
            m.logger.Error("error renewing workload identities", "error", "no tokens", "next", wait)
            continue
        }

        // Reset next expiration time
        minExp = time.Time{}

        for _, token := range tokens {
            id := cstructs.TaskIdentity{
                TaskName: token.TaskName,
                IdentityName: token.IdentityName,
            }

            // Set for getters
            m.lastTokenLock.Lock()
            m.lastToken[id] = token
            m.lastTokenLock.Unlock()

            // Send to watchers
            m.watchersLock.Lock()
            m.send(id, token)
            m.watchersLock.Unlock()

            // Set next expiration time
            if minExp.IsZero() || token.Expiration.Before(minExp) {
                minExp = token.Expiration
            }
        }

        // Success! Set next renewal and reset retries
        wait = helper.ExpiryToRenewTime(minExp, time.Now, m.minWait)
        retry = 0
    }
}

// send must be called while holding the m.watchersLock
func (m *WIDMgr) send(id cstructs.TaskIdentity, token *structs.SignedWorkloadIdentity) {
    w, ok := m.watchers[id]
    if !ok {
        // No watchers
        return
    }

    for _, c := range w {
        // Pop any unreceived tokens
        select {
        case <-c:
        default:
        }

        // Send new token, should never block since this is the only sender and
        // watchersLock is held
        c <- token
    }
}
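Putting the manager lifecycle together, a rough consumer-side sketch using only the API above; Run is normally called once by the allocrunner identity hook, and the taskrunner hook then calls Watch:

mgr := widmgr.NewWIDMgr(signer, alloc, logger)
if err := mgr.Run(); err != nil {
    // initial signing failed; the renewal goroutine was not started
}

id := cstructs.TaskIdentity{TaskName: "web", IdentityName: "consul"}
ch, stop := mgr.Watch(id)
defer stop()
for swid := range ch { // primed with the latest token; closed on Shutdown
    _ = swid.JWT // store or expose the renewed token
}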
@@ -29,14 +29,14 @@ func TestWIDMgr(t *testing.T) {
    })
    t.Cleanup(ta.Shutdown)

    mgr := widmgr.New(widmgr.Config{
    mgr := widmgr.NewSigner(widmgr.SignerConfig{
        NodeSecret: uuid.Generate(), // not checked when ACLs disabled
        Region: "global",
        RPC: ta,
    })

    _, err := mgr.SignIdentities(1, nil)
    must.ErrorContains(t, err, "no identities requested")
    must.ErrorContains(t, err, "no identities to sign")

    _, err = mgr.SignIdentities(1, []*structs.WorkloadIdentityRequest{
        {
@@ -11337,6 +11337,10 @@ func NewIdentityClaims(job *Job, alloc *Allocation, taskName string, wid *Worklo
    return nil
}

    if wid == nil {
        return nil
    }

    jwtnow := jwt.NewNumericDate(now.UTC())
    claims := &IdentityClaims{
        Namespace: alloc.Namespace,