From e1685483413e66e720d307fdd353ff248635c633 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Mon, 24 Mar 2025 12:05:04 -0400
Subject: [PATCH] provide allocrunner hooks with prebuilt taskenv and fix mutation bugs (#25373)

Some of our allocrunner hooks require a task environment for interpolating values based on the node or allocation. But several of the hooks accept an already-built environment or builder and then keep it in memory. Both approaches retain a copy of all the node attributes and allocation metadata, which balloons memory usage until the allocation is GC'd. While we'd like to look into ways to avoid keeping the allocrunner around entirely (see #25372), for now we can significantly reduce memory usage by creating the task environment on demand when calling allocrunner methods, rather than persisting it in the allocrunner hooks.

In doing so, we uncovered two other bugs:

* The WID manager, the group service hook, and the checks hook have to interpolate services for specific tasks. They mutated a taskenv builder to do so, but each mutation of the builder writes to the same underlying environment map. When a group has multiple tasks, one task could set an environment variable that would then be interpolated into the service definition of another task that did not set the same variable. Only service definition interpolation is impacted; this does not leak env vars across running tasks, as each taskrunner has its own builder. To fix this, we move the `UpdateTask` method off the builder and onto the taskenv as the `WithTask` method. This makes a shallow copy of the taskenv with a deep clone of the environment map used for interpolation, and then overwrites the environment from the task.

* The checks hook interpolated Nomad native service checks only on `Prerun` and not on `Update`. This could cause unexpected deregistration and re-registration of checks during in-place updates. To fix this, we make sure we interpolate in the `Update` method as well.

I also bumped into an incorrectly implemented interface in the CSI hook. I've pulled that fix and some better guardrails out to https://github.com/hashicorp/nomad/pull/25472.
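To make the first bug concrete, here's a toy illustration (invented names, not code from this patch) of how reusing one mutable environment map across tasks lets an earlier task's variable leak into a later task's service interpolation:

```go
package main

import "fmt"

func main() {
	// Hypothetical group: task "a" sets FOO, task "b" does not, and "b" has
	// a service definition that references "${FOO}".
	tasks := []struct {
		name string
		env  map[string]string
	}{
		{"a", map[string]string{"FOO": "from-a"}},
		{"b", nil},
	}

	// Buggy pattern: one shared map is mutated in task order, so values set
	// while interpolating "a" are still present when we interpolate "b".
	shared := map[string]string{}
	for _, task := range tasks {
		for k, v := range task.env {
			shared[k] = v
		}
		fmt.Printf("%s interpolates FOO as %q\n", task.name, shared["FOO"])
		// prints: a interpolates FOO as "from-a"
		//         b interpolates FOO as "from-a"  <-- the leak
	}
}
```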
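The fix gives each task its own copy to interpolate against. Here's a minimal sketch of the copy-on-write shape of `WithTask`, using simplified stand-in types and signature (the real method lives in the `client/taskenv/env.go` hunk of this patch and takes the allocation and task, carrying full node and allocation metadata):

```go
package taskenv

// TaskEnv is a stand-in for the real taskenv.TaskEnv; only the
// interpolation map matters for this sketch.
type TaskEnv struct {
	EnvMap map[string]string
}

// WithTask returns a copy of the alloc-level environment with the task's
// variables layered on top. The receiver is never mutated: the TaskEnv is
// shallow-copied and the interpolation map is deep-cloned, so writes for
// one task cannot bleed into a sibling task's interpolation.
func (t *TaskEnv) WithTask(taskVars map[string]string) *TaskEnv {
	clone := &TaskEnv{EnvMap: make(map[string]string, len(t.EnvMap)+len(taskVars))}
	for k, v := range t.EnvMap {
		clone.EnvMap[k] = v // deep clone of the shared interpolation map
	}
	for k, v := range taskVars {
		clone.EnvMap[k] = v // task values win, but only in this copy
	}
	return clone
}
```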
Fixes: https://github.com/hashicorp/nomad/issues/25269 Fixes: https://hashicorp.atlassian.net/browse/NET-12310 Ref: https://github.com/hashicorp/nomad/issues/25372 --- .changelog/25373.txt | 11 ++++ client/allochealth/tracker.go | 6 +- client/allochealth/tracker_test.go | 52 +++++++-------- client/allocrunner/alloc_runner.go | 8 +-- client/allocrunner/alloc_runner_hooks.go | 42 ++++++------ client/allocrunner/alloc_runner_unix_test.go | 3 +- client/allocrunner/allocdir_hook.go | 3 +- client/allocrunner/checks_hook.go | 15 ++--- client/allocrunner/checks_hook_test.go | 18 ++++-- client/allocrunner/consul_grpc_sock_hook.go | 3 +- .../allocrunner/consul_grpc_sock_hook_test.go | 8 +-- client/allocrunner/consul_hook.go | 13 ++-- client/allocrunner/consul_hook_test.go | 12 ++-- client/allocrunner/consul_http_sock_hook.go | 3 +- .../allocrunner/consul_http_sock_hook_test.go | 6 +- client/allocrunner/cpuparts_hook.go | 3 +- client/allocrunner/csi_hook.go | 3 +- client/allocrunner/csi_hook_test.go | 6 +- client/allocrunner/fail_hook.go | 3 +- client/allocrunner/group_service_hook.go | 36 +++++------ client/allocrunner/group_service_hook_test.go | 34 +++++----- client/allocrunner/health_hook.go | 37 ++++------- client/allocrunner/health_hook_test.go | 39 ++++++----- client/allocrunner/identity_hook.go | 3 +- client/allocrunner/identity_hook_test.go | 8 +-- .../interfaces/runner_lifecycle.go | 6 +- client/allocrunner/migrate_hook.go | 3 +- client/allocrunner/network_hook.go | 9 +-- client/allocrunner/network_hook_test.go | 21 +++--- .../taskrunner/identity_hook_test.go | 8 +-- .../taskrunner/task_runner_test.go | 8 +-- .../allocrunner/taskrunner/vault_hook_test.go | 4 +- client/allocrunner/upstream_allocs_hook.go | 3 +- client/taskenv/env.go | 40 +++++++++--- client/taskenv/env_test.go | 64 ++++++++----------- client/widmgr/widmgr.go | 6 +- client/widmgr/widmgr_test.go | 6 +- nomad/job_endpoint_hook_connect.go | 4 +- 38 files changed, 278 insertions(+), 279 deletions(-) create mode 100644 .changelog/25373.txt diff --git a/.changelog/25373.txt b/.changelog/25373.txt new file mode 100644 index 000000000..1b908528a --- /dev/null +++ b/.changelog/25373.txt @@ -0,0 +1,11 @@ +```release-note:improvement +client: Improve memory usage by dropping references to task environment +``` + +```release-note:bug +services: Fixed a bug where Nomad native services would not be correctly interpolated during in-place updates +``` + +```release-note:bug +services: Fixed a bug where task-level services, checks, and identities could interpolate jobspec values from other tasks in the same group +``` diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go index 3273a1562..a6cb88045 100644 --- a/client/allochealth/tracker.go +++ b/client/allochealth/tracker.go @@ -119,7 +119,7 @@ func NewTracker( logger hclog.Logger, alloc *structs.Allocation, allocUpdates *cstructs.AllocListener, - taskEnvBuilder *taskenv.Builder, + allocEnv *taskenv.TaskEnv, consulClient serviceregistration.Handler, checkStore checkstore.Shim, minHealthyTime time.Duration, @@ -145,7 +145,7 @@ func NewTracker( // first because taskEnvBuilder is mutated in every loop and we can't undo // a call to UpdateTask(). 
t.taskEnvs = make(map[string]*taskenv.TaskEnv, len(t.tg.Tasks)+1) - t.taskEnvs[""] = taskEnvBuilder.Build() + t.taskEnvs[""] = allocEnv t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks)) for _, task := range t.tg.Tasks { @@ -155,7 +155,7 @@ func NewTracker( t.lifecycleTasks[task.Name] = task.Lifecycle.Hook } - t.taskEnvs[task.Name] = taskEnvBuilder.UpdateTask(alloc, task).Build() + t.taskEnvs[task.Name] = allocEnv.WithTask(alloc, task) c, n := countChecks(task.Services) t.consulCheckCount += c diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go index bafce2d5b..789291836 100644 --- a/client/allochealth/tracker_test.go +++ b/client/allochealth/tracker_test.go @@ -180,9 +180,9 @@ func TestTracker_ConsulChecks_Interpolation(t *testing.T) { checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -253,9 +253,9 @@ func TestTracker_ConsulChecks_Healthy(t *testing.T) { checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -306,9 +306,9 @@ func TestTracker_NomadChecks_Healthy(t *testing.T) { consul := regmock.NewServiceRegistrationHandler(logger) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -375,9 +375,9 @@ func TestTracker_NomadChecks_Unhealthy(t *testing.T) { consul := regmock.NewServiceRegistrationHandler(logger) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -436,9 +436,9 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) { checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), 
taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -479,9 +479,9 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) { checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -560,9 +560,9 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) { checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -641,9 +641,9 @@ func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) { checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond minHealthyTime := 2 * time.Second - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true) tracker.checkLookupInterval = checkInterval assertChecksHealth := func(exp bool) { @@ -732,9 +732,9 @@ func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) { consul := regmock.NewServiceRegistrationHandler(logger) checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval assertChecksHealth := func(exp bool) { @@ -785,8 +785,8 @@ func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) - tracker := NewTracker(ctx, logger, alloc, nil, taskEnvBuilder, nil, nil, time.Millisecond, true) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() + tracker := NewTracker(ctx, logger, alloc, nil, env, nil, nil, time.Millisecond, true) assertNoHealth := func() { require.NoError(t, tracker.ctx.Err()) @@ -895,9 +895,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) { 
checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -1042,9 +1042,9 @@ func TestTracker_ConsulChecks_OnUpdate(t *testing.T) { checks := checkstore.NewStore(logger, state.NewMemDB(logger)) checkInterval := 10 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true) tracker.checkLookupInterval = checkInterval tracker.Start() @@ -1162,9 +1162,9 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) { consul := regmock.NewServiceRegistrationHandler(logger) minHealthyTime := 1 * time.Millisecond - taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true) + tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true) tracker.checkLookupInterval = 10 * time.Millisecond tracker.Start() diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 9809d9b30..478cff439 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -297,17 +297,15 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e ar.shutdownDelayCtx = shutdownDelayCtx ar.shutdownDelayCancelFn = shutdownDelayCancel - // Create a *taskenv.Builder for the allocation so the WID manager can - // interpolate services with the allocation and tasks as needed - envBuilder := taskenv.NewBuilder( + allocEnv := taskenv.NewBuilder( config.ClientConfig.Node, ar.Alloc(), nil, config.ClientConfig.Region, - ).SetAllocDir(ar.allocDir.AllocDirPath()) + ).SetAllocDir(ar.allocDir.AllocDirPath()).Build() // initialize the workload identity manager - widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.stateDB, ar.logger, envBuilder) + widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.stateDB, ar.logger, allocEnv) ar.widmgr = widmgr // Initialize the runners hooks. diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index be8e082d2..8f7493e21 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -103,21 +103,6 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { return fmt.Errorf("failed to initialize network configurator: %v", err) } - // Create a new taskenv.Builder which is used by hooks that mutate them to - // build new taskenv.TaskEnv. 
- newEnvBuilder := func() *taskenv.Builder { - return taskenv.NewBuilder( - config.Node, - ar.Alloc(), - nil, - config.Region, - ).SetAllocDir(ar.allocDir.AllocDirPath()) - } - - // Create a *taskenv.TaskEnv which is used for read only purposes by the - // newNetworkHook and newChecksHook. - builtTaskEnv := newEnvBuilder().Build() - // Create the alloc directory hook. This is run first to ensure the // directory path exists for other hooks. alloc := ar.Alloc() @@ -132,21 +117,19 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { consulConfigs: ar.clientConfig.GetConsulConfigs(hookLogger), consulClientConstructor: consul.NewConsulClientFactory(config), hookResources: ar.hookResources, - envBuilder: newEnvBuilder, logger: hookLogger, }), newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher), newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir), newCPUPartsHook(hookLogger, ar.partitions, alloc), - newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulServicesHandler, ar.checkStore), - newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv), + newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulServicesHandler, ar.checkStore), + newNetworkHook(hookLogger, ns, alloc, nm, nc, ar), newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, providerNamespace: alloc.ServiceProviderNamespace(), serviceRegWrapper: ar.serviceRegWrapper, hookResources: ar.hookResources, restarter: ar, - taskEnvBuilder: newEnvBuilder(), networkStatus: ar, logger: hookLogger, shutdownDelayCtx: ar.shutdownDelayCtx, @@ -156,7 +139,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.GetConsulConfigs(ar.logger)), newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID), - newChecksHook(hookLogger, alloc, ar.checkStore, ar, builtTaskEnv), + newChecksHook(hookLogger, alloc, ar.checkStore, ar), } if config.ExtraAllocHooks != nil { ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...) 
@@ -176,6 +159,13 @@ func (ar *allocRunner) prerun() error { }() } + allocEnv := taskenv.NewBuilder( + ar.clientConfig.GetNode(), + ar.Alloc(), + nil, + ar.clientConfig.Region, + ).SetAllocDir(ar.allocDir.AllocDirPath()).Build() + for _, hook := range ar.runnerHooks { pre, ok := hook.(interfaces.RunnerPrerunHook) if !ok { @@ -197,7 +187,7 @@ func (ar *allocRunner) prerun() error { hookExecutionStart = time.Now() } - err := pre.Prerun() + err := pre.Prerun(allocEnv) ar.hookStatsHandler.Emit(hookExecutionStart, name, "prerun", err) if err != nil { return fmt.Errorf("pre-run hook %q failed: %v", name, err) @@ -224,8 +214,16 @@ func (ar *allocRunner) update(update *structs.Allocation) error { }() } + allocEnv := taskenv.NewBuilder( + ar.clientConfig.GetNode(), + ar.Alloc(), + nil, + ar.clientConfig.Region, + ).SetAllocDir(ar.allocDir.AllocDirPath()).Build() + req := &interfaces.RunnerUpdateRequest{ - Alloc: update, + Alloc: update, + AllocEnv: allocEnv, } var merr multierror.Error diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index bc69bcd8e..16cbf0f58 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/ci" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -303,6 +304,6 @@ type allocFailingPrestartHook struct{} func (*allocFailingPrestartHook) Name() string { return "failing_prestart" } -func (*allocFailingPrestartHook) Prerun() error { +func (*allocFailingPrestartHook) Prerun(_ *taskenv.TaskEnv) error { return fmt.Errorf("failing prestart hooks") } diff --git a/client/allocrunner/allocdir_hook.go b/client/allocrunner/allocdir_hook.go index e49b74ac4..7d1fa996b 100644 --- a/client/allocrunner/allocdir_hook.go +++ b/client/allocrunner/allocdir_hook.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" ) // allocDirHook creates and destroys the root directory and shared directories @@ -34,7 +35,7 @@ func (h *allocDirHook) Name() string { return "alloc_dir" } -func (h *allocDirHook) Prerun() error { +func (h *allocDirHook) Prerun(_ *taskenv.TaskEnv) error { return h.allocDir.Build() } diff --git a/client/allocrunner/checks_hook.go b/client/allocrunner/checks_hook.go index bb6cea16f..51a5a347c 100644 --- a/client/allocrunner/checks_hook.go +++ b/client/allocrunner/checks_hook.go @@ -83,7 +83,6 @@ type checksHook struct { shim checkstore.Shim checker checks.Checker allocID string - taskEnv *taskenv.TaskEnv // fields that get re-initialized on allocation update lock sync.RWMutex @@ -98,7 +97,6 @@ func newChecksHook( alloc *structs.Allocation, shim checkstore.Shim, network structs.NetworkStatus, - taskEnv *taskenv.TaskEnv, ) *checksHook { h := &checksHook{ logger: logger.Named(checksHookName), @@ -107,7 +105,6 @@ func newChecksHook( shim: shim, network: network, checker: checks.New(logger), - taskEnv: taskEnv, } h.initialize(alloc) return h @@ -210,7 +207,7 @@ func (h *checksHook) Name() string { return checksHookName } -func (h *checksHook) Prerun() error { +func (h *checksHook) Prerun(allocEnv *taskenv.TaskEnv) error { h.lock.Lock() defer 
h.lock.Unlock() @@ -219,7 +216,8 @@ func (h *checksHook) Prerun() error { return nil } - interpolatedServices := taskenv.InterpolateServices(h.taskEnv, group.NomadServices()) + interpolatedServices := taskenv.InterpolateServices( + allocEnv, group.NomadServices()) // create and start observers of nomad service checks in alloc h.observe(h.alloc, interpolatedServices) @@ -237,11 +235,12 @@ func (h *checksHook) Update(request *interfaces.RunnerUpdateRequest) error { } // get all group and task level services using nomad provider - services := group.NomadServices() + interpolatedServices := taskenv.InterpolateServices( + request.AllocEnv, group.NomadServices()) // create a set of the updated set of checks next := make([]structs.CheckID, 0, len(h.observers)) - for _, service := range services { + for _, service := range interpolatedServices { for _, check := range service.Checks { next = append(next, structs.NomadCheckID( request.Alloc.ID, @@ -267,7 +266,7 @@ func (h *checksHook) Update(request *interfaces.RunnerUpdateRequest) error { h.alloc = request.Alloc // ensure we are observing new checks (idempotent) - h.observe(request.Alloc, services) + h.observe(request.Alloc, interpolatedServices) return nil } diff --git a/client/allocrunner/checks_hook_test.go b/client/allocrunner/checks_hook_test.go index bb7355f67..bfaa12ea8 100644 --- a/client/allocrunner/checks_hook_test.go +++ b/client/allocrunner/checks_hook_test.go @@ -164,15 +164,15 @@ func TestCheckHook_Checks_ResultsSet(t *testing.T) { alloc := allocWithNomadChecks(addr, port, tc.onGroup) - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - h := newChecksHook(logger, alloc, checkStore, network, envBuilder.Build()) + h := newChecksHook(logger, alloc, checkStore, network) // initialize is called; observers are created but not started yet must.MapEmpty(t, h.observers) // calling pre-run starts the observers - err := h.Prerun() + err := h.Prerun(env) must.NoError(t, err) testutil.WaitForResultUntil( @@ -231,12 +231,12 @@ func TestCheckHook_Checks_UpdateSet(t *testing.T) { alloc := allocWithNomadChecks(addr, port, true) - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() - h := newChecksHook(logger, alloc, shim, network, envBuilder.Build()) + h := newChecksHook(logger, alloc, shim, network) // calling pre-run starts the observers - err := h.Prerun() + err := h.Prerun(env) must.NoError(t, err) // initial set of checks @@ -269,8 +269,12 @@ func TestCheckHook_Checks_UpdateSet(t *testing.T) { }, ) + updatedAlloc := allocWithDifferentNomadChecks(alloc.ID, addr, port) + updatedEnv := taskenv.NewBuilder(mock.Node(), updatedAlloc, nil, alloc.Job.Region).Build() + request := &interfaces.RunnerUpdateRequest{ - Alloc: allocWithDifferentNomadChecks(alloc.ID, addr, port), + Alloc: updatedAlloc, + AllocEnv: updatedEnv, } err = h.Update(request) diff --git a/client/allocrunner/consul_grpc_sock_hook.go b/client/allocrunner/consul_grpc_sock_hook.go index 50dc3f1bd..0cc53e981 100644 --- a/client/allocrunner/consul_grpc_sock_hook.go +++ b/client/allocrunner/consul_grpc_sock_hook.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/go-set/v3" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/structs" 
"github.com/hashicorp/nomad/nomad/structs/config" ) @@ -130,7 +131,7 @@ func (h *consulGRPCSocketHook) shouldRun() bool { return false } -func (h *consulGRPCSocketHook) Prerun() error { +func (h *consulGRPCSocketHook) Prerun(_ *taskenv.TaskEnv) error { h.mu.Lock() defer h.mu.Unlock() diff --git a/client/allocrunner/consul_grpc_sock_hook_test.go b/client/allocrunner/consul_grpc_sock_hook_test.go index 8ed87f85e..91c82aa87 100644 --- a/client/allocrunner/consul_grpc_sock_hook_test.go +++ b/client/allocrunner/consul_grpc_sock_hook_test.go @@ -50,7 +50,7 @@ func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) { // Start the unix socket proxy h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{}) - require.NoError(t, h.Prerun()) + require.NoError(t, h.Prerun(nil)) gRPCSock := filepath.Join(allocDir.AllocDir, allocdir.AllocGRPCSocket) envoyConn, err := net.Dial("unix", gRPCSock) @@ -125,7 +125,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { // An alloc without a Connect proxy sidecar should not return // an error. h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{}) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(nil)) // Postrun should be a noop must.NoError(t, h.Postrun()) @@ -135,7 +135,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { // An alloc *with* a Connect proxy sidecar *should* return an error // when Consul is not configured. h := newConsulGRPCSocketHook(logger, connectAlloc, allocDir, consulConfigs, map[string]string{}) - must.ErrorContains(t, h.Prerun(), `consul address for cluster "" must be set on nomad client`) + must.ErrorContains(t, h.Prerun(nil), `consul address for cluster "" must be set on nomad client`) // Postrun should be a noop must.NoError(t, h.Postrun()) @@ -145,7 +145,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { // Updating an alloc without a sidecar to have a sidecar should // error when the sidecar is added. 
h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{}) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(nil)) req := &interfaces.RunnerUpdateRequest{ Alloc: connectAlloc, diff --git a/client/allocrunner/consul_hook.go b/client/allocrunner/consul_hook.go index f47b9f992..fa282b749 100644 --- a/client/allocrunner/consul_hook.go +++ b/client/allocrunner/consul_hook.go @@ -27,7 +27,6 @@ type consulHook struct { consulConfigs map[string]*structsc.ConsulConfig consulClientConstructor consul.ConsulClientFunc hookResources *cstructs.AllocHookResources - envBuilder *taskenv.Builder logger log.Logger shutdownCtx context.Context @@ -48,9 +47,6 @@ type consulHookConfig struct { // hookResources is used for storing and retrieving Consul tokens hookResources *cstructs.AllocHookResources - // envBuilder is used to interpolate services - envBuilder func() *taskenv.Builder - logger log.Logger } @@ -63,7 +59,6 @@ func newConsulHook(cfg consulHookConfig) *consulHook { consulConfigs: cfg.consulConfigs, consulClientConstructor: cfg.consulClientConstructor, hookResources: cfg.hookResources, - envBuilder: cfg.envBuilder(), shutdownCtx: shutdownCtx, shutdownCancelFn: shutdownCancelFn, } @@ -83,7 +78,7 @@ func (*consulHook) Name() string { return "consul" } -func (h *consulHook) Prerun() error { +func (h *consulHook) Prerun(allocEnv *taskenv.TaskEnv) error { job := h.alloc.Job if job == nil { @@ -102,12 +97,12 @@ func (h *consulHook) Prerun() error { } var mErr *multierror.Error - if err := h.prepareConsulTokensForServices(tg.Services, tg, tokens, h.envBuilder.Build()); err != nil { + if err := h.prepareConsulTokensForServices(tg.Services, tg, tokens, allocEnv); err != nil { mErr = multierror.Append(mErr, err) } for _, task := range tg.Tasks { - h.envBuilder.UpdateTask(h.alloc, task) - if err := h.prepareConsulTokensForServices(task.Services, tg, tokens, h.envBuilder.Build()); err != nil { + taskEnv := allocEnv.WithTask(h.alloc, task) + if err := h.prepareConsulTokensForServices(task.Services, tg, tokens, taskEnv); err != nil { mErr = multierror.Append(mErr, err) } if err := h.prepareConsulTokensForTask(task, tg, tokens); err != nil { diff --git a/client/allocrunner/consul_hook_test.go b/client/allocrunner/consul_hook_test.go index c24b915e4..38df31766 100644 --- a/client/allocrunner/consul_hook_test.go +++ b/client/allocrunner/consul_hook_test.go @@ -58,9 +58,9 @@ func consulHookTestHarness(t *testing.T) *consulHook { db := cstate.NewMemDB(logger) // the WIDMgr env builder never has the task available - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") + env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build() - mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder) + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, env) mockWIDMgr.SignForTesting() consulConfigs := map[string]*structsc.ConsulConfig{ @@ -68,9 +68,6 @@ func consulHookTestHarness(t *testing.T) *consulHook { } hookResources := cstructs.NewAllocHookResources() - envBuilderFn := func() *taskenv.Builder { - return taskenv.NewBuilder(mock.Node(), alloc, nil, "global") - } consulHookCfg := consulHookConfig{ alloc: alloc, @@ -79,7 +76,6 @@ func consulHookTestHarness(t *testing.T) *consulHook { consulConfigs: consulConfigs, consulClientConstructor: consul.NewMockConsulClient, hookResources: hookResources, - envBuilder: envBuilderFn, logger: logger, } return newConsulHook(consulHookCfg) @@ -184,8 +180,8 @@ func 
Test_consulHook_prepareConsulTokensForServices(t *testing.T) { hook := consulHookTestHarness(t) task := hook.alloc.LookupTask("web") services := task.Services - hook.envBuilder.UpdateTask(hook.alloc, task) - env := hook.envBuilder.Build() + env := taskenv.NewBuilder(mock.Node(), hook.alloc, task, "global"). + Build().WithTask(hook.alloc, task) hashedJWT := make(map[string]string) for _, s := range services { diff --git a/client/allocrunner/consul_http_sock_hook.go b/client/allocrunner/consul_http_sock_hook.go index 86278b768..09f7f8757 100644 --- a/client/allocrunner/consul_http_sock_hook.go +++ b/client/allocrunner/consul_http_sock_hook.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/go-set/v3" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" ) @@ -105,7 +106,7 @@ func (h *consulHTTPSockHook) shouldRun() bool { return false } -func (h *consulHTTPSockHook) Prerun() error { +func (h *consulHTTPSockHook) Prerun(_ *taskenv.TaskEnv) error { h.lock.Lock() defer h.lock.Unlock() diff --git a/client/allocrunner/consul_http_sock_hook_test.go b/client/allocrunner/consul_http_sock_hook_test.go index a6a31ec2c..d48ec40c2 100644 --- a/client/allocrunner/consul_http_sock_hook_test.go +++ b/client/allocrunner/consul_http_sock_hook_test.go @@ -39,7 +39,7 @@ func TestConsulSocketHook_PrerunPostrun_Ok(t *testing.T) { // start unix socket proxy h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs) - require.NoError(t, h.Prerun()) + require.NoError(t, h.Prerun(nil)) httpSocket := filepath.Join(allocDir.AllocDir, allocdir.AllocHTTPSocket) taskCon, err := net.Dial("unix", httpSocket) @@ -112,7 +112,7 @@ func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) { { // an alloc without a connect native task should not return an error h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(nil)) // postrun should be a noop must.NoError(t, h.Postrun()) @@ -122,7 +122,7 @@ func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) { // an alloc with a native task should return an error when consul is not // configured h := newConsulHTTPSocketHook(logger, connectNativeAlloc, allocDir, consulConfigs) - must.ErrorContains(t, h.Prerun(), "consul address must be set on nomad client") + must.ErrorContains(t, h.Prerun(nil), "consul address must be set on nomad client") // Postrun should be a noop must.NoError(t, h.Postrun()) diff --git a/client/allocrunner/cpuparts_hook.go b/client/allocrunner/cpuparts_hook.go index b9e545b03..5df9bdb60 100644 --- a/client/allocrunner/cpuparts_hook.go +++ b/client/allocrunner/cpuparts_hook.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib/hw" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/structs" ) @@ -53,7 +54,7 @@ var ( _ interfaces.RunnerPostrunHook = (*cpuPartsHook)(nil) ) -func (h *cpuPartsHook) Prerun() error { +func (h *cpuPartsHook) Prerun(_ *taskenv.TaskEnv) error { return h.partitions.Reserve(h.reservations) } diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 1f0fe1810..7afcb64b8 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -18,6 +18,7 @@ import ( 
"github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" @@ -89,7 +90,7 @@ func (c *csiHook) Name() string { return "csi_hook" } -func (c *csiHook) Prerun() error { +func (c *csiHook) Prerun(_ *taskenv.TaskEnv) error { if !c.shouldRun() { return nil } diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 68f0d306f..a0e1bfa8b 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -361,7 +361,7 @@ func TestCSIHook(t *testing.T) { vm.NextUnmountVolumeErr = errors.New("bad first attempt") } - err := hook.Prerun() + err := hook.Prerun(nil) if tc.expectedClaimErr != nil { must.EqError(t, err, tc.expectedClaimErr.Error()) } else { @@ -474,11 +474,11 @@ func TestCSIHook_Prerun_Validation(t *testing.T) { must.NotNil(t, hook) if tc.expectedErr != "" { - must.EqError(t, hook.Prerun(), tc.expectedErr) + must.EqError(t, hook.Prerun(nil), tc.expectedErr) mounts := ar.res.GetCSIMounts() must.Nil(t, mounts) } else { - must.NoError(t, hook.Prerun()) + must.NoError(t, hook.Prerun(nil)) mounts := ar.res.GetCSIMounts() must.NotNil(t, mounts) must.NoError(t, hook.Postrun()) diff --git a/client/allocrunner/fail_hook.go b/client/allocrunner/fail_hook.go index 0c03d6184..e53f148cf 100644 --- a/client/allocrunner/fail_hook.go +++ b/client/allocrunner/fail_hook.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/hcl/v2/hclsimple" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" ) var ErrFailHookError = errors.New("failed successfully") @@ -67,7 +68,7 @@ var ( _ interfaces.ShutdownHook = (*FailHook)(nil) ) -func (h *FailHook) Prerun() error { +func (h *FailHook) Prerun(_ *taskenv.TaskEnv) error { if h.Fail.Prerun { return fmt.Errorf("prerun %w", ErrFailHookError) } diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go index 289d7d361..3ac957884 100644 --- a/client/allocrunner/group_service_hook.go +++ b/client/allocrunner/group_service_hook.go @@ -49,12 +49,11 @@ type groupServiceHook struct { logger hclog.Logger // The following fields may be updated - canary bool - services []*structs.Service - networks structs.Networks - ports structs.AllocatedPorts - taskEnvBuilder *taskenv.Builder - delay time.Duration + canary bool + services []*structs.Service + networks structs.Networks + ports structs.AllocatedPorts + delay time.Duration // Since Update() may be called concurrently with any other hook all // hook methods must be fully serialized @@ -64,7 +63,6 @@ type groupServiceHook struct { type groupServiceHookConfig struct { alloc *structs.Allocation restarter serviceregistration.WorkloadRestarter - taskEnvBuilder *taskenv.Builder networkStatus structs.NetworkStatus shutdownDelayCtx context.Context logger hclog.Logger @@ -95,7 +93,6 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { namespace: cfg.alloc.Namespace, restarter: cfg.restarter, providerNamespace: cfg.providerNamespace, - taskEnvBuilder: cfg.taskEnvBuilder, delay: shutdownDelay, networkStatus: cfg.networkStatus, logger: cfg.logger.Named(groupServiceHookName), @@ -131,7 +128,7 @@ func (*groupServiceHook) Name() string { return groupServiceHookName } -func (h *groupServiceHook) Prerun() 
error { +func (h *groupServiceHook) Prerun(allocEnv *taskenv.TaskEnv) error { h.mu.Lock() defer func() { // Mark prerun as true to unblock Updates @@ -140,15 +137,21 @@ func (h *groupServiceHook) Prerun() error { h.deregistered = false h.mu.Unlock() }() - return h.preRunLocked() + + return h.preRunLocked(allocEnv) } // caller must hold h.mu -func (h *groupServiceHook) preRunLocked() error { +func (h *groupServiceHook) preRunLocked(env *taskenv.TaskEnv) error { if len(h.services) == 0 { return nil } + // TODO(tgross): env will be nil when called from PreTaskRestart because + // we have some false sharing of this method + if env != nil { + h.services = taskenv.InterpolateServices(env, h.services) + } services := h.getWorkloadServicesLocked() return h.serviceRegWrapper.RegisterWorkload(services) } @@ -181,10 +184,10 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { // Update group service hook fields h.networks = networks - h.services = tg.Services + h.services = taskenv.InterpolateServices(req.AllocEnv, tg.Services) + h.canary = canary h.delay = shutdown - h.taskEnvBuilder.UpdateTask(req.Alloc, nil) // An update may change the service provider, therefore we need to account // for how namespaces work across providers also. @@ -213,7 +216,7 @@ func (h *groupServiceHook) PreTaskRestart() error { }() h.preKillLocked() - return h.preRunLocked() + return h.preRunLocked(nil) } func (h *groupServiceHook) PreKill() { @@ -278,9 +281,6 @@ func (h *groupServiceHook) deregisterLocked() { // // caller must hold h.lock func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices { - // Interpolate with the task's environment - interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services) - allocTokens := h.hookResources.GetConsulTokens() tokens := map[string]string{} @@ -308,7 +308,7 @@ func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.Work AllocInfo: info, ProviderNamespace: h.providerNamespace, Restarter: h.restarter, - Services: interpolatedServices, + Services: h.services, Networks: h.networks, NetworkStatus: netStatus, Ports: h.ports, diff --git a/client/allocrunner/group_service_hook_test.go b/client/allocrunner/group_service_hook_test.go index 90866fde6..7b53d26ea 100644 --- a/client/allocrunner/group_service_hook_test.go +++ b/client/allocrunner/group_service_hook_test.go @@ -36,6 +36,7 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper( logger, @@ -46,13 +47,12 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) @@ -77,6 +77,7 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper :=
wrapper.NewHandlerWrapper( logger, @@ -88,15 +89,14 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) // Increase shutdown delay alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) // Assert that update updated the delay value @@ -120,6 +120,7 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { alloc.Job.Canonicalize() logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper( logger, @@ -130,13 +131,12 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) @@ -169,6 +169,7 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) nomadMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper(logger, consulMockClient, nomadMockClient) @@ -176,14 +177,13 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) // Trigger our hook requests.
- req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) must.NoError(t, h.PreTaskRestart()) @@ -221,6 +221,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper( logger, @@ -231,13 +232,12 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) @@ -280,7 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) @@ -324,7 +323,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) { serviceRegWrapper: regWrapper, shutdownDelayCtx: shutDownCtx, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) @@ -373,7 +371,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) @@ -426,7 +423,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go index 9ac3653f5..0944aa3f1 100644 --- a/client/allocrunner/health_hook.go +++ b/client/allocrunner/health_hook.go @@ -68,13 +68,6 @@ type allocHealthWatcherHook struct { // alloc set by new func or Update. Must hold hookLock to access. alloc *structs.Allocation - // taskEnvBuilder is the current builder used to build task environments - // for the group and each of its tasks. Must hold hookLock to modify. - taskEnvBuilder *taskenv.Builder - - // taskEnvBuilderFactory creates a new *taskenv.Builder instance. - taskEnvBuilderFactory func() *taskenv.Builder - // isDeploy is true if monitoring a deployment. Set in init(). Must // hold hookLock to access. 
isDeploy bool @@ -85,7 +78,6 @@ type allocHealthWatcherHook struct { func newAllocHealthWatcherHook( logger hclog.Logger, alloc *structs.Allocation, - taskEnvBuilderFactory func() *taskenv.Builder, hs healthSetter, listener *cstructs.AllocListener, consul serviceregistration.Handler, @@ -103,15 +95,13 @@ func newAllocHealthWatcherHook( close(closedDone) h := &allocHealthWatcherHook{ - alloc: alloc, - taskEnvBuilderFactory: taskEnvBuilderFactory, - taskEnvBuilder: taskEnvBuilderFactory(), - cancelFn: func() {}, // initialize to prevent nil func panics - watchDone: closedDone, - consul: consul, - checkStore: checkStore, - healthSetter: hs, - listener: listener, + alloc: alloc, + cancelFn: func() {}, // initialize to prevent nil func panics + watchDone: closedDone, + consul: consul, + checkStore: checkStore, + healthSetter: hs, + listener: listener, } h.logger = logger.Named(h.Name()) @@ -134,7 +124,7 @@ func (h *allocHealthWatcherHook) Name() string { // Prerun or Update. Caller must set/update alloc and logger fields. // // Not threadsafe so the caller should lock since Updates occur concurrently. -func (h *allocHealthWatcherHook) init() error { +func (h *allocHealthWatcherHook) init(allocEnv *taskenv.TaskEnv) error { // No need to watch health as it's already set if h.healthSetter.HasHealth() { h.logger.Trace("not watching; already has health set") @@ -166,7 +156,7 @@ func (h *allocHealthWatcherHook) init() error { h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime) // Create a new tracker, start it, and watch for health results. tracker := allochealth.NewTracker( - ctx, h.logger, h.alloc, h.listener, h.taskEnvBuilder, h.consul, h.checkStore, minHealthyTime, useChecks, + ctx, h.logger, h.alloc, h.listener, allocEnv, h.consul, h.checkStore, minHealthyTime, useChecks, ) tracker.Start() @@ -176,7 +166,7 @@ func (h *allocHealthWatcherHook) init() error { return nil } -func (h *allocHealthWatcherHook) Prerun() error { +func (h *allocHealthWatcherHook) Prerun(allocEnv *taskenv.TaskEnv) error { h.hookLock.Lock() defer h.hookLock.Unlock() @@ -186,7 +176,7 @@ func (h *allocHealthWatcherHook) Prerun() error { } h.ranOnce = true - return h.init() + return h.init(allocEnv) } func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) error { @@ -210,10 +200,7 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err // Update alloc h.alloc = req.Alloc - // Create a new taskEnvBuilder with the updated alloc and a nil task - h.taskEnvBuilder = h.taskEnvBuilderFactory().UpdateTask(req.Alloc, nil) - - return h.init() + return h.init(req.AllocEnv) } func (h *allocHealthWatcherHook) Postrun() error { diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go index 7b1bb09b4..ce1ae8c9f 100644 --- a/client/allocrunner/health_hook_test.go +++ b/client/allocrunner/health_hook_test.go @@ -96,7 +96,7 @@ func TestHealthHook_PrerunPostrun(t *testing.T) { checks := new(mock.CheckShim) alloc := mock.Alloc() - h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks) + h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks) // Assert we implemented the right interfaces prerunh, ok := h.(interfaces.RunnerPrerunHook) @@ -107,7 +107,8 @@ func TestHealthHook_PrerunPostrun(t *testing.T) { require.True(ok) // Prerun - require.NoError(prerunh.Prerun()) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, 
alloc.Job.Region).Build() + require.NoError(prerunh.Prerun(env)) // Assert isDeploy is false (other tests peek at isDeploy to determine // if an Update applied) @@ -135,10 +136,11 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) { hs := &mockHealthSetter{} checks := new(mock.CheckShim) - h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() // Prerun - require.NoError(h.Prerun()) + require.NoError(h.Prerun(env)) // Update multiple times in a goroutine to mimic Client behavior // (Updates are concurrent with alloc runner but are applied serially). @@ -175,7 +177,8 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) { hs := &mockHealthSetter{} checks := new(mock.CheckShim) - h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() // Set a DeploymentID to cause ClearHealth to be called alloc.DeploymentID = uuid.Generate() @@ -184,7 +187,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) { // concurrent with alloc runner). errs := make(chan error, 1) go func(alloc *structs.Allocation) { - errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc}) + errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env}) close(errs) }(alloc.Copy()) @@ -193,7 +196,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) { } // Prerun should be a noop - require.NoError(h.Prerun()) + require.NoError(h.Prerun(env)) // Assert that the Update took affect by isDeploy being true h.hookLock.Lock() @@ -218,7 +221,7 @@ func TestHealthHook_Postrun(t *testing.T) { alloc := mock.Alloc() checks := new(mock.CheckShim) - h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) // Postrun require.NoError(h.Postrun()) @@ -285,10 +288,11 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) { hs := newMockHealthSetter() checks := new(mock.CheckShim) - h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() // Prerun - require.NoError(h.Prerun()) + require.NoError(h.Prerun(env)) // Wait for health to be set (healthy) select { @@ -374,10 +378,11 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) { hs := newMockHealthSetter() checks := new(mock.CheckShim) - h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() // Prerun - require.NoError(h.Prerun()) + require.NoError(h.Prerun(env)) // Wait to 
ensure we don't get a healthy status select { @@ -396,7 +401,7 @@ func TestHealthHook_SystemNoop(t *testing.T) { ci.Parallel(t) alloc := mock.SystemAlloc() - h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil) + h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil) // Assert that it's the noop impl _, ok := h.(noopAllocHealthWatcherHook) @@ -418,15 +423,9 @@ func TestHealthHook_BatchNoop(t *testing.T) { ci.Parallel(t) alloc := mock.BatchAlloc() - h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil) + h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil) // Assert that it's the noop impl _, ok := h.(noopAllocHealthWatcherHook) require.True(t, ok) } - -func taskEnvBuilderFactory(alloc *structs.Allocation) func() *taskenv.Builder { - return func() *taskenv.Builder { - return taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) - } -} diff --git a/client/allocrunner/identity_hook.go b/client/allocrunner/identity_hook.go index ea34b8ee8..78a3528d0 100644 --- a/client/allocrunner/identity_hook.go +++ b/client/allocrunner/identity_hook.go @@ -6,6 +6,7 @@ package allocrunner import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/widmgr" ) @@ -34,7 +35,7 @@ func (*identityHook) Name() string { return "identity" } -func (h *identityHook) Prerun() error { +func (h *identityHook) Prerun(_ *taskenv.TaskEnv) error { // run the renewal if err := h.widmgr.Run(); err != nil { return err diff --git a/client/allocrunner/identity_hook_test.go b/client/allocrunner/identity_hook_test.go index 0d49087e5..8825c0133 100644 --- a/client/allocrunner/identity_hook_test.go +++ b/client/allocrunner/identity_hook_test.go @@ -40,13 +40,11 @@ func TestIdentityHook_Prerun(t *testing.T) { logger := testlog.HCLogger(t) db := cstate.NewMemDB(logger) - - // the WIDMgr env builder never has the task available - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") + env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build() // setup mock signer and WIDMgr mockSigner := widmgr.NewMockWIDSigner(task.Identities) - mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder) + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, env) allocrunner.widmgr = mockWIDMgr allocrunner.widsigner = mockSigner @@ -65,7 +63,7 @@ func TestIdentityHook_Prerun(t *testing.T) { start := time.Now() hook := newIdentityHook(logger, mockWIDMgr) must.Eq(t, hook.Name(), "identity") - must.NoError(t, hook.Prerun()) + must.NoError(t, hook.Prerun(env)) time.Sleep(time.Second) // give goroutines a moment to run sid, err := hook.widmgr.Get(structs.WIHandle{ diff --git a/client/allocrunner/interfaces/runner_lifecycle.go b/client/allocrunner/interfaces/runner_lifecycle.go index e0ad1c64c..d9a88377f 100644 --- a/client/allocrunner/interfaces/runner_lifecycle.go +++ b/client/allocrunner/interfaces/runner_lifecycle.go @@ -4,6 +4,7 @@ package interfaces import ( + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/nomad/structs" ) @@ -16,7 +17,7 @@ type RunnerHook interface { // non-terminal allocations. Terminal allocations do *not* call prerun. 
type RunnerPrerunHook interface { RunnerHook - Prerun() error + Prerun(*taskenv.TaskEnv) error } // A RunnerPreKillHook is executed inside of KillTasks before @@ -55,7 +56,8 @@ type RunnerUpdateHook interface { } type RunnerUpdateRequest struct { - Alloc *structs.Allocation + Alloc *structs.Allocation + AllocEnv *taskenv.TaskEnv } // A RunnerTaskRestartHook is executed just before the allocation runner is diff --git a/client/allocrunner/migrate_hook.go b/client/allocrunner/migrate_hook.go index 113b9c9da..c958b9276 100644 --- a/client/allocrunner/migrate_hook.go +++ b/client/allocrunner/migrate_hook.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/taskenv" ) // diskMigrationHook migrates ephemeral disk volumes. Depends on alloc dir @@ -41,7 +42,7 @@ func (h *diskMigrationHook) Name() string { return "migrate_disk" } -func (h *diskMigrationHook) Prerun() error { +func (h *diskMigrationHook) Prerun(_ *taskenv.TaskEnv) error { ctx := context.TODO() // Wait for a previous alloc - if any - to terminate diff --git a/client/allocrunner/network_hook.go b/client/allocrunner/network_hook.go index 13384dd9e..472cc5ad4 100644 --- a/client/allocrunner/network_hook.go +++ b/client/allocrunner/network_hook.go @@ -79,9 +79,6 @@ type networkHook struct { // the alloc network has been created networkConfigurator NetworkConfigurator - // taskEnv is used to perform interpolation within the network blocks. - taskEnv *taskenv.TaskEnv - logger hclog.Logger } @@ -91,7 +88,6 @@ func newNetworkHook(logger hclog.Logger, netManager drivers.DriverNetworkManager, netConfigurator NetworkConfigurator, networkStatusSetter networkStatusSetter, - taskEnv *taskenv.TaskEnv, ) *networkHook { return &networkHook{ isolationSetter: ns, @@ -99,7 +95,6 @@ func newNetworkHook(logger hclog.Logger, alloc: alloc, manager: netManager, networkConfigurator: netConfigurator, - taskEnv: taskEnv, logger: logger, } } @@ -114,7 +109,7 @@ func (h *networkHook) Name() string { return "network" } -func (h *networkHook) Prerun() error { +func (h *networkHook) Prerun(allocEnv *taskenv.TaskEnv) error { tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup) if len(tg.Networks) == 0 || tg.Networks[0].Mode == "host" || tg.Networks[0].Mode == "" { return nil @@ -126,7 +121,7 @@ func (h *networkHook) Prerun() error { } // Perform our networks block interpolation. - interpolatedNetworks := taskenv.InterpolateNetworks(h.taskEnv, tg.Networks) + interpolatedNetworks := taskenv.InterpolateNetworks(allocEnv, tg.Networks) // Interpolated values need to be validated. 
It is also possible a user // supplied hostname avoids the validation on job registrations because it diff --git a/client/allocrunner/network_hook_test.go b/client/allocrunner/network_hook_test.go index a8ec11fa7..bb1bd2dcd 100644 --- a/client/allocrunner/network_hook_test.go +++ b/client/allocrunner/network_hook_test.go @@ -82,10 +82,11 @@ func TestNetworkHook_Prerun_Postrun_group(t *testing.T) { expectedStatus: nil, } - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) logger := testlog.HCLogger(t) - hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter, envBuilder.Build()) - must.NoError(t, hook.Prerun()) + hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() + + must.NoError(t, hook.Prerun(env)) must.True(t, setter.called) must.False(t, destroyCalled) must.NoError(t, hook.Postrun()) @@ -125,10 +126,11 @@ func TestNetworkHook_Prerun_Postrun_host(t *testing.T) { expectedStatus: nil, } - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) logger := testlog.HCLogger(t) - hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter, envBuilder.Build()) - must.NoError(t, hook.Prerun()) + hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() + + must.NoError(t, hook.Prerun(env)) must.False(t, setter.called) must.False(t, destroyCalled) must.NoError(t, hook.Postrun()) @@ -184,8 +186,7 @@ func TestNetworkHook_Prerun_Postrun_ExistingNetNS(t *testing.T) { cni: fakePlugin, nsOpts: &nsOpts{}, } - - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() testCases := []struct { name string @@ -251,9 +252,9 @@ func TestNetworkHook_Prerun_Postrun_ExistingNetNS(t *testing.T) { fakePlugin.checkErrors = tc.checkErrs configurator.nodeAttrs["plugins.cni.version.bridge"] = tc.cniVersion hook := newNetworkHook(testlog.HCLogger(t), isolationSetter, - alloc, nm, configurator, statusSetter, envBuilder.Build()) + alloc, nm, configurator, statusSetter) - err := hook.Prerun() + err := hook.Prerun(env) if tc.expectPrerunError == "" { must.NoError(t, err) } else { diff --git a/client/allocrunner/taskrunner/identity_hook_test.go b/client/allocrunner/taskrunner/identity_hook_test.go index d3d7c9343..600bcdcfa 100644 --- a/client/allocrunner/taskrunner/identity_hook_test.go +++ b/client/allocrunner/taskrunner/identity_hook_test.go @@ -92,9 +92,9 @@ func TestIdentityHook_RenewAll(t *testing.T) { logger := testlog.HCLogger(t) db := cstate.NewMemDB(logger) mockSigner := widmgr.NewMockWIDSigner(task.Identities) - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") + allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build() - mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder) + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, allocEnv) mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s mockLifecycle := trtesting.NewMockTaskHooks() @@ -212,8 +212,8 @@ func TestIdentityHook_RenewOne(t *testing.T) { logger := testlog.HCLogger(t) db := cstate.NewMemDB(logger) mockSigner := widmgr.NewMockWIDSigner(task.Identities) - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") - 
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder) + allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build() + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, allocEnv) mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s h := &identityHook{ diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index c922e1db4..89c1dd914 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -148,9 +148,9 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri if vault != nil { vaultFunc = func(_ string) (vaultclient.VaultClient, error) { return vault, nil } } - // the envBuilder for the WIDMgr never has access to the task, so don't - // include it here - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") + // the env for the WIDMgr never has access to the task, so don't include it + // here + allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build() conf := &Config{ Alloc: alloc, @@ -171,7 +171,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri ServiceRegWrapper: wrapperMock, Getter: getter.TestSandbox(t), Wranglers: proclib.MockWranglers(t), - WIDMgr: widmgr.NewWIDMgr(widsigner, alloc, db, logger, envBuilder), + WIDMgr: widmgr.NewWIDMgr(widsigner, alloc, db, logger, allocEnv), AllocHookResources: cstructs.NewAllocHookResources(), } diff --git a/client/allocrunner/taskrunner/vault_hook_test.go b/client/allocrunner/taskrunner/vault_hook_test.go index 1cf243c7a..7e4be5e40 100644 --- a/client/allocrunner/taskrunner/vault_hook_test.go +++ b/client/allocrunner/taskrunner/vault_hook_test.go @@ -99,8 +99,8 @@ func setupTestVaultHook(t *testing.T, config *vaultHookConfig) *vaultHook { if config.widmgr == nil { db := cstate.NewMemDB(config.logger) signer := widmgr.NewMockWIDSigner(config.task.Identities) - envBuilder := taskenv.NewBuilder(mock.Node(), config.alloc, nil, "global") - config.widmgr = widmgr.NewWIDMgr(signer, config.alloc, db, config.logger, envBuilder) + allocEnv := taskenv.NewBuilder(mock.Node(), config.alloc, nil, "global").Build() + config.widmgr = widmgr.NewWIDMgr(signer, config.alloc, db, config.logger, allocEnv) err := config.widmgr.Run() must.NoError(t, err) } diff --git a/client/allocrunner/upstream_allocs_hook.go b/client/allocrunner/upstream_allocs_hook.go index d990ed82b..a3b12cea5 100644 --- a/client/allocrunner/upstream_allocs_hook.go +++ b/client/allocrunner/upstream_allocs_hook.go @@ -9,6 +9,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/taskenv" ) // upstreamAllocsHook waits for a PrevAllocWatcher to exit before allowing @@ -33,7 +34,7 @@ func (h *upstreamAllocsHook) Name() string { return "await_previous_allocations" } -func (h *upstreamAllocsHook) Prerun() error { +func (h *upstreamAllocsHook) Prerun(_ *taskenv.TaskEnv) error { // Wait for a previous alloc - if any - to terminate return h.allocWatcher.Wait(context.Background()) } diff --git a/client/taskenv/env.go b/client/taskenv/env.go index 02f78de38..9a8ab9e97 100644 --- a/client/taskenv/env.go +++ b/client/taskenv/env.go @@ -5,6 +5,7 @@ package taskenv import ( "fmt" + "maps" "net" "os" "path/filepath" @@ -248,6 +249,32 @@ func (t *TaskEnv) All() map[string]string { return m } +// 
WithTask returns a shallow copy of the TaskEnv, with the EnvMap deep-cloned +// and overwritten by the provided task. This is only for use in allocrunner +// hooks that may need to interpolate per-task services or identities, as it +// doesn't re-populate the rest of the environment. +func (t *TaskEnv) WithTask(alloc *structs.Allocation, task *structs.Task) *TaskEnv { + if t == nil { + return t + } + newT := new(TaskEnv) + *newT = *t + newT.envList = []string{} + newT.EnvMap = maps.Clone(t.EnvMap) + + combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name) + for k, v := range combined { + newT.EnvMap[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v + newT.EnvMap[fmt.Sprintf("%s%s", MetaPrefix, k)] = v + } + + for k, v := range task.Env { + newT.EnvMap[k] = v + } + newT.EnvMap[TaskName] = task.Name + return newT +} + // AllValues is a map of the task's environment variables and the node's // attributes with cty.Value (String) values. Errors including keys are // returned in a map by key name. @@ -658,11 +685,12 @@ func (b *Builder) buildEnv(allocDir, localDir, secretsDir string, // Build must be called after all the tasks' environment values have been set. func (b *Builder) Build() *TaskEnv { - nodeAttrs := make(map[string]string) b.mu.RLock() defer b.mu.RUnlock() + nodeAttrs := make(map[string]string) + if b.region != "" { // Copy region over to node attrs nodeAttrs[nodeRegionKey] = b.region @@ -678,13 +706,6 @@ func (b *Builder) Build() *TaskEnv { return NewTaskEnv(envMap, envMapClient, deviceEnvs, nodeAttrs, b.clientTaskRoot, b.clientSharedAllocDir) } -// UpdateTask updates the environment based on a new alloc and task. -func (b *Builder) UpdateTask(alloc *structs.Allocation, task *structs.Task) *Builder { - b.mu.Lock() - defer b.mu.Unlock() - return b.setTask(task).setAlloc(alloc) -} - // SetHookEnv sets environment variables from a hook. Variables are // Last-Write-Wins, so if a hook writes a variable that's also written by a // later hook, the later hook's value always gets used. @@ -851,6 +872,9 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { // setNode is called from NewBuilder to populate node attributes. func (b *Builder) setNode(n *structs.Node) *Builder { + if n == nil { + return b + } b.nodeAttrs = make(map[string]string, 4+len(n.Attributes)+len(n.Meta)) b.nodeAttrs[nodeIdKey] = n.ID b.nodeAttrs[nodeNameKey] = n.Name diff --git a/client/taskenv/env_test.go b/client/taskenv/env_test.go index 8386962ec..8df01fb72 100644 --- a/client/taskenv/env_test.go +++ b/client/taskenv/env_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" + "github.com/shoenig/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -707,54 +708,41 @@ func TestEnvironment_DashesInTaskName(t *testing.T) { } } -// TestEnvironment_UpdateTask asserts env vars and task meta are updated when a -// task is updated. -func TestEnvironment_UpdateTask(t *testing.T) { +// TestEnvironment_WithTask asserts env vars are updated in an +// alloc environment when a task is updated.
+func TestEnvironment_WithTask(t *testing.T) { ci.Parallel(t) a := mock.Alloc() a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta": "tgmetaval"} - task := a.Job.TaskGroups[0].Tasks[0] - task.Name = "orig" - task.Env = map[string]string{"env": "envval"} - task.Meta = map[string]string{"taskmeta": "taskmetaval"} - builder := NewBuilder(mock.Node(), a, task, "global") + builder := NewBuilder(mock.Node(), a, nil, "global") origMap := builder.Build().Map() - if origMap["NOMAD_TASK_NAME"] != "orig" { - t.Errorf("Expected NOMAD_TASK_NAME=orig but found %q", origMap["NOMAD_TASK_NAME"]) - } - if origMap["NOMAD_META_taskmeta"] != "taskmetaval" { - t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", origMap["NOMAD_META_taskmeta"]) - } - if origMap["env"] != "envval" { - t.Errorf("Expected env=envva but found %q", origMap["env"]) - } - if origMap["NOMAD_META_tgmeta"] != "tgmetaval" { - t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", origMap["NOMAD_META_tgmeta"]) - } + test.Eq(t, "web", origMap["NOMAD_GROUP_NAME"]) + test.Eq(t, "armon", origMap["NOMAD_META_owner"]) + test.Eq(t, "", origMap["NOMAD_META_taskmeta"]) - a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta2": "tgmetaval2"} - task.Name = "new" + task := a.Job.TaskGroups[0].Tasks[0] + task.Name = "task1" + task.Env = map[string]string{"env": "envval"} + task.Meta = map[string]string{"taskmeta": "taskmetaval"} + + newMap1 := builder.Build().WithTask(a, task).Map() + test.Eq(t, "task1", newMap1["NOMAD_TASK_NAME"]) + test.Eq(t, "taskmetaval", newMap1["NOMAD_META_taskmeta"]) + test.Eq(t, "envval", newMap1["env"]) + + task.Name = "task2" task.Env = map[string]string{"env2": "envval2"} task.Meta = map[string]string{"taskmeta2": "taskmetaval2"} - newMap := builder.UpdateTask(a, task).Build().Map() - if newMap["NOMAD_TASK_NAME"] != "new" { - t.Errorf("Expected NOMAD_TASK_NAME=new but found %q", newMap["NOMAD_TASK_NAME"]) - } - if newMap["NOMAD_META_taskmeta2"] != "taskmetaval2" { - t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", newMap["NOMAD_META_taskmeta2"]) - } - if newMap["env2"] != "envval2" { - t.Errorf("Expected env=envva but found %q", newMap["env2"]) - } - if newMap["NOMAD_META_tgmeta2"] != "tgmetaval2" { - t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", newMap["NOMAD_META_tgmeta2"]) - } - if v, ok := newMap["NOMAD_META_taskmeta"]; ok { - t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v) - } + // original env should not have been mutated + newMap2 := builder.Build().WithTask(a, task).Map() + test.Eq(t, "task2", newMap2["NOMAD_TASK_NAME"]) + test.Eq(t, "taskmetaval2", newMap2["NOMAD_META_taskmeta2"]) + test.Eq(t, "envval2", newMap2["env2"]) + test.Eq(t, "", newMap2["NOMAD_META_taskmeta"]) + test.Eq(t, "", newMap2["env"]) } // TestEnvironment_InterpolateEmptyOptionalMeta asserts that in a parameterized diff --git a/client/widmgr/widmgr.go b/client/widmgr/widmgr.go index c0a043d6f..888befe96 100644 --- a/client/widmgr/widmgr.go +++ b/client/widmgr/widmgr.go @@ -54,12 +54,10 @@ type WIDMgr struct { logger hclog.Logger } -func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB, logger hclog.Logger, envBuilder *taskenv.Builder) *WIDMgr { +func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB, logger hclog.Logger, allocEnv *taskenv.TaskEnv) *WIDMgr { widspecs := map[structs.WIHandle]*structs.WorkloadIdentity{} tg := a.Job.LookupTaskGroup(a.TaskGroup) - allocEnv := envBuilder.Build() - for _, service := range tg.Services 
{ if service.Identity != nil { handle := *service.IdentityHandle(allocEnv.ReplaceEnv) @@ -74,7 +72,7 @@ func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB, } // derive the env for this task - taskEnv := envBuilder.UpdateTask(a, task).Build() + taskEnv := allocEnv.WithTask(a, task) for _, service := range task.Services { if service.Identity != nil { handle := *service.IdentityHandle(taskEnv.ReplaceEnv) diff --git a/client/widmgr/widmgr_test.go b/client/widmgr/widmgr_test.go index 8133840a6..9b9c9777f 100644 --- a/client/widmgr/widmgr_test.go +++ b/client/widmgr/widmgr_test.go @@ -30,10 +30,10 @@ func TestWIDMgr_Restore(t *testing.T) { } alloc.Job.TaskGroups[0].Tasks[0].Services[0].Identity = widSpecs[0] alloc.Job.TaskGroups[0].Tasks[0].Identities = widSpecs[1:] - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") + env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build() signer := NewMockWIDSigner(widSpecs) - mgr := NewWIDMgr(signer, alloc, db, logger, envBuilder) + mgr := NewWIDMgr(signer, alloc, db, logger, env) // restore, but we haven't previously saved to the db hasExpired, err := mgr.restoreStoredIdentities() @@ -54,7 +54,7 @@ func TestWIDMgr_Restore(t *testing.T) { widSpecs[2].TTL = time.Second signer.setWIDs(widSpecs) - wiHandle := service.IdentityHandle(envBuilder.Build().ReplaceEnv) + wiHandle := service.IdentityHandle(env.ReplaceEnv) mgr.widSpecs[*wiHandle].TTL = time.Second // force a re-sign to re-populate the lastToken and save to the db diff --git a/nomad/job_endpoint_hook_connect.go b/nomad/job_endpoint_hook_connect.go index 1985ac9c0..ab8e54bab 100644 --- a/nomad/job_endpoint_hook_connect.go +++ b/nomad/job_endpoint_hook_connect.go @@ -273,10 +273,10 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { // This should only be used to interpolate connect service names which are // used in sidecar or gateway task names. Note that the service name might // also be interpolated with job specifics during service canonicalization. - env := taskenv.NewEmptyBuilder().UpdateTask(&structs.Allocation{ + env := taskenv.NewBuilder(nil, &structs.Allocation{ Job: job, TaskGroup: g.Name, - }, nil).Build() + }, nil, job.Region).Build() for _, service := range g.Services { switch {