diff --git a/.changelog/25373.txt b/.changelog/25373.txt
new file mode 100644
index 000000000..1b908528a
--- /dev/null
+++ b/.changelog/25373.txt
@@ -0,0 +1,11 @@
+```release-note:improvement
+client: Improve memory usage by dropping references to task environment
+```
+
+```release-note:bug
+services: Fixed a bug where Nomad native services would not be correctly interpolated during in-place updates
+```
+
+```release-note:bug
+services: Fixed a bug where task-level services, checks, and identities could interpolate jobspec values from other tasks in the same group
+```
diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go
index 3273a1562..a6cb88045 100644
--- a/client/allochealth/tracker.go
+++ b/client/allochealth/tracker.go
@@ -119,7 +119,7 @@ func NewTracker(
 	logger hclog.Logger,
 	alloc *structs.Allocation,
 	allocUpdates *cstructs.AllocListener,
-	taskEnvBuilder *taskenv.Builder,
+	allocEnv *taskenv.TaskEnv,
 	consulClient serviceregistration.Handler,
 	checkStore checkstore.Shim,
 	minHealthyTime time.Duration,
@@ -145,7 +145,7 @@ func NewTracker(
 	// first because taskEnvBuilder is mutated in every loop and we can't undo
 	// a call to UpdateTask().
 	t.taskEnvs = make(map[string]*taskenv.TaskEnv, len(t.tg.Tasks)+1)
-	t.taskEnvs[""] = taskEnvBuilder.Build()
+	t.taskEnvs[""] = allocEnv
 
 	t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
 	for _, task := range t.tg.Tasks {
@@ -155,7 +155,7 @@ func NewTracker(
 			t.lifecycleTasks[task.Name] = task.Lifecycle.Hook
 		}
 
-		t.taskEnvs[task.Name] = taskEnvBuilder.UpdateTask(alloc, task).Build()
+		t.taskEnvs[task.Name] = allocEnv.WithTask(alloc, task)
 
 		c, n := countChecks(task.Services)
 		t.consulCheckCount += c
diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go
index bafce2d5b..789291836 100644
--- a/client/allochealth/tracker_test.go
+++ b/client/allochealth/tracker_test.go
@@ -180,9 +180,9 @@ func TestTracker_ConsulChecks_Interpolation(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -253,9 +253,9 @@ func TestTracker_ConsulChecks_Healthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -306,9 +306,9 @@ func TestTracker_NomadChecks_Healthy(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -375,9 +375,9 @@ func TestTracker_NomadChecks_Unhealthy(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -436,9 +436,9 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -479,9 +479,9 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -560,9 +560,9 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -641,9 +641,9 @@ func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 	minHealthyTime := 2 * time.Second
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
 	tracker.checkLookupInterval = checkInterval
 
 	assertChecksHealth := func(exp bool) {
@@ -732,9 +732,9 @@ func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 
 	assertChecksHealth := func(exp bool) {
@@ -785,8 +785,8 @@ func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
 	ctx, cancelFn := context.WithCancel(context.Background())
 	defer cancelFn()
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
-	tracker := NewTracker(ctx, logger, alloc, nil, taskEnvBuilder, nil, nil, time.Millisecond, true)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
+	tracker := NewTracker(ctx, logger, alloc, nil, env, nil, nil, time.Millisecond, true)
 
 	assertNoHealth := func() {
 		require.NoError(t, tracker.ctx.Err())
@@ -895,9 +895,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -1042,9 +1042,9 @@ func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {
 	checks := checkstore.NewStore(logger, state.NewMemDB(logger))
 	checkInterval := 10 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
 	tracker.checkLookupInterval = checkInterval
 	tracker.Start()
@@ -1162,9 +1162,9 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
 	consul := regmock.NewServiceRegistrationHandler(logger)
 	minHealthyTime := 1 * time.Millisecond
 
-	taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
 	tracker.checkLookupInterval = 10 * time.Millisecond
 	tracker.Start()
diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 9809d9b30..478cff439 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -297,17 +297,15 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e
 	ar.shutdownDelayCtx = shutdownDelayCtx
 	ar.shutdownDelayCancelFn = shutdownDelayCancel
 
-	// Create a *taskenv.Builder for the allocation so the WID manager can
-	// interpolate services with the allocation and tasks as needed
-	envBuilder := taskenv.NewBuilder(
+	allocEnv := taskenv.NewBuilder(
 		config.ClientConfig.Node,
 		ar.Alloc(),
 		nil,
 		config.ClientConfig.Region,
-	).SetAllocDir(ar.allocDir.AllocDirPath())
+	).SetAllocDir(ar.allocDir.AllocDirPath()).Build()
 
 	// initialize the workload identity manager
-	widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.stateDB, ar.logger, envBuilder)
+	widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.stateDB, ar.logger, allocEnv)
 	ar.widmgr = widmgr
 
 	// Initialize the runners hooks.
diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go
index be8e082d2..8f7493e21 100644
--- a/client/allocrunner/alloc_runner_hooks.go
+++ b/client/allocrunner/alloc_runner_hooks.go
@@ -103,21 +103,6 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
 		return fmt.Errorf("failed to initialize network configurator: %v", err)
 	}
 
-	// Create a new taskenv.Builder which is used by hooks that mutate them to
-	// build new taskenv.TaskEnv.
-	newEnvBuilder := func() *taskenv.Builder {
-		return taskenv.NewBuilder(
-			config.Node,
-			ar.Alloc(),
-			nil,
-			config.Region,
-		).SetAllocDir(ar.allocDir.AllocDirPath())
-	}
-
-	// Create a *taskenv.TaskEnv which is used for read only purposes by the
-	// newNetworkHook and newChecksHook.
-	builtTaskEnv := newEnvBuilder().Build()
-
 	// Create the alloc directory hook. This is run first to ensure the
 	// directory path exists for other hooks.
 	alloc := ar.Alloc()
@@ -132,21 +117,19 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
 			consulConfigs:           ar.clientConfig.GetConsulConfigs(hookLogger),
 			consulClientConstructor: consul.NewConsulClientFactory(config),
 			hookResources:           ar.hookResources,
-			envBuilder:              newEnvBuilder,
 			logger:                  hookLogger,
 		}),
 		newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
 		newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
 		newCPUPartsHook(hookLogger, ar.partitions, alloc),
-		newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulServicesHandler, ar.checkStore),
-		newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
+		newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulServicesHandler, ar.checkStore),
+		newNetworkHook(hookLogger, ns, alloc, nm, nc, ar),
 		newGroupServiceHook(groupServiceHookConfig{
 			alloc:             alloc,
 			providerNamespace: alloc.ServiceProviderNamespace(),
 			serviceRegWrapper: ar.serviceRegWrapper,
 			hookResources:     ar.hookResources,
 			restarter:         ar,
-			taskEnvBuilder:    newEnvBuilder(),
 			networkStatus:     ar,
 			logger:            hookLogger,
 			shutdownDelayCtx:  ar.shutdownDelayCtx,
@@ -156,7 +139,7 @@
 		newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.GetConsulConfigs(ar.logger)),
 		newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
-		newChecksHook(hookLogger, alloc, ar.checkStore, ar, builtTaskEnv),
+		newChecksHook(hookLogger, alloc, ar.checkStore, ar),
 	}
 	if config.ExtraAllocHooks != nil {
 		ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...)
@@ -176,6 +159,13 @@ func (ar *allocRunner) prerun() error {
 		}()
 	}
 
+	allocEnv := taskenv.NewBuilder(
+		ar.clientConfig.GetNode(),
+		ar.Alloc(),
+		nil,
+		ar.clientConfig.Region,
+	).SetAllocDir(ar.allocDir.AllocDirPath()).Build()
+
 	for _, hook := range ar.runnerHooks {
 		pre, ok := hook.(interfaces.RunnerPrerunHook)
 		if !ok {
@@ -197,7 +187,7 @@ func (ar *allocRunner) prerun() error {
 			hookExecutionStart = time.Now()
 		}
 
-		err := pre.Prerun()
+		err := pre.Prerun(allocEnv)
 		ar.hookStatsHandler.Emit(hookExecutionStart, name, "prerun", err)
 		if err != nil {
 			return fmt.Errorf("pre-run hook %q failed: %v", name, err)
@@ -224,8 +214,16 @@ func (ar *allocRunner) update(update *structs.Allocation) error {
 		}()
 	}
 
+	allocEnv := taskenv.NewBuilder(
+		ar.clientConfig.GetNode(),
+		ar.Alloc(),
+		nil,
+		ar.clientConfig.Region,
+	).SetAllocDir(ar.allocDir.AllocDirPath()).Build()
+
 	req := &interfaces.RunnerUpdateRequest{
-		Alloc: update,
+		Alloc:    update,
+		AllocEnv: allocEnv,
 	}
 
 	var merr multierror.Error
diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go
index bc69bcd8e..16cbf0f58 100644
--- a/client/allocrunner/alloc_runner_unix_test.go
+++ b/client/allocrunner/alloc_runner_unix_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/hashicorp/nomad/ci"
 	regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
 	"github.com/hashicorp/nomad/client/state"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
@@ -303,6 +304,6 @@ type allocFailingPrestartHook struct{}
 
 func (*allocFailingPrestartHook) Name() string { return "failing_prestart" }
 
-func (*allocFailingPrestartHook) Prerun() error {
+func (*allocFailingPrestartHook) Prerun(_ *taskenv.TaskEnv) error {
 	return fmt.Errorf("failing prestart hooks")
 }
diff --git a/client/allocrunner/allocdir_hook.go b/client/allocrunner/allocdir_hook.go
index e49b74ac4..7d1fa996b 100644
--- a/client/allocrunner/allocdir_hook.go
+++ b/client/allocrunner/allocdir_hook.go
@@ -7,6 +7,7 @@ import (
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/client/taskenv"
 )
 
 // allocDirHook creates and destroys the root directory and shared directories
@@ -34,7 +35,7 @@ func (h *allocDirHook) Name() string {
 	return "alloc_dir"
 }
 
-func (h *allocDirHook) Prerun() error {
+func (h *allocDirHook) Prerun(_ *taskenv.TaskEnv) error {
 	return h.allocDir.Build()
 }
diff --git a/client/allocrunner/checks_hook.go b/client/allocrunner/checks_hook.go
index bb6cea16f..51a5a347c 100644
--- a/client/allocrunner/checks_hook.go
+++ b/client/allocrunner/checks_hook.go
@@ -83,7 +83,6 @@ type checksHook struct {
 	shim    checkstore.Shim
 	checker checks.Checker
 	allocID string
-	taskEnv *taskenv.TaskEnv
 
 	// fields that get re-initialized on allocation update
 	lock sync.RWMutex
@@ -98,7 +97,6 @@ func newChecksHook(
 	alloc *structs.Allocation,
 	shim checkstore.Shim,
 	network structs.NetworkStatus,
-	taskEnv *taskenv.TaskEnv,
 ) *checksHook {
 	h := &checksHook{
 		logger: logger.Named(checksHookName),
@@ -107,7 +105,6 @@
 		shim:    shim,
 		network: network,
 		checker: checks.New(logger),
-		taskEnv: taskEnv,
 	}
 	h.initialize(alloc)
 	return h
@@ -210,7 +207,7 @@ func (h *checksHook) Name() string {
 	return checksHookName
 }
 
-func (h *checksHook) Prerun() error {
+func (h *checksHook) Prerun(allocEnv *taskenv.TaskEnv) error {
 	h.lock.Lock()
 	defer h.lock.Unlock()
@@ -219,7 +216,8 @@ func (h *checksHook) Prerun() error {
 		return nil
 	}
 
-	interpolatedServices := taskenv.InterpolateServices(h.taskEnv, group.NomadServices())
+	interpolatedServices := taskenv.InterpolateServices(
+		allocEnv, group.NomadServices())
 
 	// create and start observers of nomad service checks in alloc
 	h.observe(h.alloc, interpolatedServices)
@@ -237,11 +235,12 @@ func (h *checksHook) Update(request *interfaces.RunnerUpdateRequest) error {
 	}
 
 	// get all group and task level services using nomad provider
-	services := group.NomadServices()
+	interpolatedServices := taskenv.InterpolateServices(
+		request.AllocEnv, group.NomadServices())
 
 	// create a set of the updated set of checks
 	next := make([]structs.CheckID, 0, len(h.observers))
-	for _, service := range services {
+	for _, service := range interpolatedServices {
 		for _, check := range service.Checks {
 			next = append(next, structs.NomadCheckID(
 				request.Alloc.ID,
@@ -267,7 +266,7 @@
 	h.alloc = request.Alloc
 
 	// ensure we are observing new checks (idempotent)
-	h.observe(request.Alloc, services)
+	h.observe(request.Alloc, interpolatedServices)
 
 	return nil
 }
diff --git a/client/allocrunner/checks_hook_test.go b/client/allocrunner/checks_hook_test.go
index bb7355f67..bfaa12ea8 100644
--- a/client/allocrunner/checks_hook_test.go
+++ b/client/allocrunner/checks_hook_test.go
@@ -164,15 +164,15 @@ func TestCheckHook_Checks_ResultsSet(t *testing.T) {
 	alloc := allocWithNomadChecks(addr, port, tc.onGroup)
 
-	envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	h := newChecksHook(logger, alloc, checkStore, network, envBuilder.Build())
+	h := newChecksHook(logger, alloc, checkStore, network)
 
 	// initialize is called; observers are created but not started yet
 	must.MapEmpty(t, h.observers)
 
 	// calling pre-run starts the observers
-	err := h.Prerun()
+	err := h.Prerun(env)
 	must.NoError(t, err)
 
 	testutil.WaitForResultUntil(
@@ -231,12 +231,12 @@ func TestCheckHook_Checks_UpdateSet(t *testing.T) {
 	alloc := allocWithNomadChecks(addr, port, true)
 
-	envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
-	h := newChecksHook(logger, alloc, shim, network, envBuilder.Build())
+	h := newChecksHook(logger, alloc, shim, network)
 
 	// calling pre-run starts the observers
-	err := h.Prerun()
+	err := h.Prerun(env)
 	must.NoError(t, err)
 
 	// initial set of checks
@@ -269,8 +269,12 @@
 		},
 	)
 
+	updatedAlloc := allocWithDifferentNomadChecks(alloc.ID, addr, port)
+	updatedEnv := taskenv.NewBuilder(mock.Node(), updatedAlloc, nil, alloc.Job.Region).Build()
+
 	request := &interfaces.RunnerUpdateRequest{
-		Alloc: allocWithDifferentNomadChecks(alloc.ID, addr, port),
+		Alloc:    updatedAlloc,
+		AllocEnv: updatedEnv,
 	}
 
 	err = h.Update(request)
diff --git a/client/allocrunner/consul_grpc_sock_hook.go b/client/allocrunner/consul_grpc_sock_hook.go
index 50dc3f1bd..0cc53e981 100644
--- a/client/allocrunner/consul_grpc_sock_hook.go
+++ b/client/allocrunner/consul_grpc_sock_hook.go
@@ -21,6 +21,7 @@ import (
 	"github.com/hashicorp/go-set/v3"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config" ) @@ -130,7 +131,7 @@ func (h *consulGRPCSocketHook) shouldRun() bool { return false } -func (h *consulGRPCSocketHook) Prerun() error { +func (h *consulGRPCSocketHook) Prerun(_ *taskenv.TaskEnv) error { h.mu.Lock() defer h.mu.Unlock() diff --git a/client/allocrunner/consul_grpc_sock_hook_test.go b/client/allocrunner/consul_grpc_sock_hook_test.go index 8ed87f85e..91c82aa87 100644 --- a/client/allocrunner/consul_grpc_sock_hook_test.go +++ b/client/allocrunner/consul_grpc_sock_hook_test.go @@ -50,7 +50,7 @@ func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) { // Start the unix socket proxy h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{}) - require.NoError(t, h.Prerun()) + require.NoError(t, h.Prerun(nil)) gRPCSock := filepath.Join(allocDir.AllocDir, allocdir.AllocGRPCSocket) envoyConn, err := net.Dial("unix", gRPCSock) @@ -125,7 +125,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { // An alloc without a Connect proxy sidecar should not return // an error. h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{}) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(nil)) // Postrun should be a noop must.NoError(t, h.Postrun()) @@ -135,7 +135,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { // An alloc *with* a Connect proxy sidecar *should* return an error // when Consul is not configured. h := newConsulGRPCSocketHook(logger, connectAlloc, allocDir, consulConfigs, map[string]string{}) - must.ErrorContains(t, h.Prerun(), `consul address for cluster "" must be set on nomad client`) + must.ErrorContains(t, h.Prerun(nil), `consul address for cluster "" must be set on nomad client`) // Postrun should be a noop must.NoError(t, h.Postrun()) @@ -145,7 +145,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { // Updating an alloc without a sidecar to have a sidecar should // error when the sidecar is added. 
 		h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
-		must.NoError(t, h.Prerun())
+		must.NoError(t, h.Prerun(nil))
 
 		req := &interfaces.RunnerUpdateRequest{
 			Alloc: connectAlloc,
diff --git a/client/allocrunner/consul_hook.go b/client/allocrunner/consul_hook.go
index f47b9f992..fa282b749 100644
--- a/client/allocrunner/consul_hook.go
+++ b/client/allocrunner/consul_hook.go
@@ -27,7 +27,6 @@ type consulHook struct {
 	consulConfigs           map[string]*structsc.ConsulConfig
 	consulClientConstructor consul.ConsulClientFunc
 	hookResources           *cstructs.AllocHookResources
-	envBuilder              *taskenv.Builder
 	logger                  log.Logger
 
 	shutdownCtx context.Context
@@ -48,9 +47,6 @@ type consulHookConfig struct {
 	// hookResources is used for storing and retrieving Consul tokens
 	hookResources *cstructs.AllocHookResources
 
-	// envBuilder is used to interpolate services
-	envBuilder func() *taskenv.Builder
-
 	logger log.Logger
 }
@@ -63,7 +59,6 @@
 		consulConfigs:           cfg.consulConfigs,
 		consulClientConstructor: cfg.consulClientConstructor,
 		hookResources:           cfg.hookResources,
-		envBuilder:              cfg.envBuilder(),
 		shutdownCtx:             shutdownCtx,
 		shutdownCancelFn:        shutdownCancelFn,
 	}
@@ -83,7 +78,7 @@ func (*consulHook) Name() string {
 	return "consul"
 }
 
-func (h *consulHook) Prerun() error {
+func (h *consulHook) Prerun(allocEnv *taskenv.TaskEnv) error {
 	job := h.alloc.Job
 	if job == nil {
@@ -102,12 +97,12 @@
 	}
 
 	var mErr *multierror.Error
-	if err := h.prepareConsulTokensForServices(tg.Services, tg, tokens, h.envBuilder.Build()); err != nil {
+	if err := h.prepareConsulTokensForServices(tg.Services, tg, tokens, allocEnv); err != nil {
 		mErr = multierror.Append(mErr, err)
 	}
 	for _, task := range tg.Tasks {
-		h.envBuilder.UpdateTask(h.alloc, task)
-		if err := h.prepareConsulTokensForServices(task.Services, tg, tokens, h.envBuilder.Build()); err != nil {
+		taskEnv := allocEnv.WithTask(h.alloc, task)
+		if err := h.prepareConsulTokensForServices(task.Services, tg, tokens, taskEnv); err != nil {
 			mErr = multierror.Append(mErr, err)
 		}
 		if err := h.prepareConsulTokensForTask(task, tg, tokens); err != nil {
diff --git a/client/allocrunner/consul_hook_test.go b/client/allocrunner/consul_hook_test.go
index c24b915e4..38df31766 100644
--- a/client/allocrunner/consul_hook_test.go
+++ b/client/allocrunner/consul_hook_test.go
@@ -58,9 +58,9 @@ func consulHookTestHarness(t *testing.T) *consulHook {
 	db := cstate.NewMemDB(logger)
 
 	// the WIDMgr env builder never has the task available
-	envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
 
-	mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder)
+	mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, env)
 	mockWIDMgr.SignForTesting()
 
 	consulConfigs := map[string]*structsc.ConsulConfig{
@@ -68,9 +68,6 @@
 	}
 
 	hookResources := cstructs.NewAllocHookResources()
-	envBuilderFn := func() *taskenv.Builder {
-		return taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
-	}
 
 	consulHookCfg := consulHookConfig{
 		alloc: alloc,
@@ -79,7 +76,6 @@
 		consulConfigs:           consulConfigs,
 		consulClientConstructor: consul.NewMockConsulClient,
 		hookResources:           hookResources,
-		envBuilder:              envBuilderFn,
 		logger:                  logger,
 	}
 	return newConsulHook(consulHookCfg)
@@ -184,8 +180,8 @@ func Test_consulHook_prepareConsulTokensForServices(t *testing.T) {
 	hook := consulHookTestHarness(t)
 	task := hook.alloc.LookupTask("web")
 	services := task.Services
-	hook.envBuilder.UpdateTask(hook.alloc, task)
-	env := hook.envBuilder.Build()
+	env := taskenv.NewBuilder(mock.Node(), hook.alloc, task, "global").
+		Build().WithTask(hook.alloc, task)
 
 	hashedJWT := make(map[string]string)
 	for _, s := range services {
diff --git a/client/allocrunner/consul_http_sock_hook.go b/client/allocrunner/consul_http_sock_hook.go
index 86278b768..09f7f8757 100644
--- a/client/allocrunner/consul_http_sock_hook.go
+++ b/client/allocrunner/consul_http_sock_hook.go
@@ -18,6 +18,7 @@ import (
 	"github.com/hashicorp/go-set/v3"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
 )
@@ -105,7 +106,7 @@ func (h *consulHTTPSockHook) shouldRun() bool {
 	return false
 }
 
-func (h *consulHTTPSockHook) Prerun() error {
+func (h *consulHTTPSockHook) Prerun(_ *taskenv.TaskEnv) error {
 	h.lock.Lock()
 	defer h.lock.Unlock()
diff --git a/client/allocrunner/consul_http_sock_hook_test.go b/client/allocrunner/consul_http_sock_hook_test.go
index a6a31ec2c..d48ec40c2 100644
--- a/client/allocrunner/consul_http_sock_hook_test.go
+++ b/client/allocrunner/consul_http_sock_hook_test.go
@@ -39,7 +39,7 @@ func TestConsulSocketHook_PrerunPostrun_Ok(t *testing.T) {
 	// start unix socket proxy
 	h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs)
-	require.NoError(t, h.Prerun())
+	require.NoError(t, h.Prerun(nil))
 
 	httpSocket := filepath.Join(allocDir.AllocDir, allocdir.AllocHTTPSocket)
 	taskCon, err := net.Dial("unix", httpSocket)
@@ -112,7 +112,7 @@ func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) {
 	{
 		// an alloc without a connect native task should not return an error
 		h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs)
-		must.NoError(t, h.Prerun())
+		must.NoError(t, h.Prerun(nil))
 
 		// postrun should be a noop
 		must.NoError(t, h.Postrun())
@@ -122,7 +122,7 @@
 		// an alloc with a native task should return an error when consul is not
 		// configured
 		h := newConsulHTTPSocketHook(logger, connectNativeAlloc, allocDir, consulConfigs)
-		must.ErrorContains(t, h.Prerun(), "consul address must be set on nomad client")
+		must.ErrorContains(t, h.Prerun(nil), "consul address must be set on nomad client")
 
 		// Postrun should be a noop
 		must.NoError(t, h.Postrun())
diff --git a/client/allocrunner/cpuparts_hook.go b/client/allocrunner/cpuparts_hook.go
index b9e545b03..5df9bdb60 100644
--- a/client/allocrunner/cpuparts_hook.go
+++ b/client/allocrunner/cpuparts_hook.go
@@ -9,6 +9,7 @@ import (
 	"github.com/hashicorp/nomad/client/lib/cgroupslib"
 	"github.com/hashicorp/nomad/client/lib/idset"
 	"github.com/hashicorp/nomad/client/lib/numalib/hw"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -53,7 +54,7 @@ var (
 	_ interfaces.RunnerPostrunHook = (*cpuPartsHook)(nil)
 )
 
-func (h *cpuPartsHook) Prerun() error {
+func (h *cpuPartsHook) Prerun(_ *taskenv.TaskEnv) error {
 	return h.partitions.Reserve(h.reservations)
 }
diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index 1f0fe1810..7afcb64b8 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -18,6 +18,7 @@ import (
"github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" @@ -89,7 +90,7 @@ func (c *csiHook) Name() string { return "csi_hook" } -func (c *csiHook) Prerun() error { +func (c *csiHook) Prerun(_ *taskenv.TaskEnv) error { if !c.shouldRun() { return nil } diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 68f0d306f..a0e1bfa8b 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -361,7 +361,7 @@ func TestCSIHook(t *testing.T) { vm.NextUnmountVolumeErr = errors.New("bad first attempt") } - err := hook.Prerun() + err := hook.Prerun(nil) if tc.expectedClaimErr != nil { must.EqError(t, err, tc.expectedClaimErr.Error()) } else { @@ -474,11 +474,11 @@ func TestCSIHook_Prerun_Validation(t *testing.T) { must.NotNil(t, hook) if tc.expectedErr != "" { - must.EqError(t, hook.Prerun(), tc.expectedErr) + must.EqError(t, hook.Prerun(nil), tc.expectedErr) mounts := ar.res.GetCSIMounts() must.Nil(t, mounts) } else { - must.NoError(t, hook.Prerun()) + must.NoError(t, hook.Prerun(nil)) mounts := ar.res.GetCSIMounts() must.NotNil(t, mounts) must.NoError(t, hook.Postrun()) diff --git a/client/allocrunner/fail_hook.go b/client/allocrunner/fail_hook.go index 0c03d6184..e53f148cf 100644 --- a/client/allocrunner/fail_hook.go +++ b/client/allocrunner/fail_hook.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/hcl/v2/hclsimple" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" ) var ErrFailHookError = errors.New("failed successfully") @@ -67,7 +68,7 @@ var ( _ interfaces.ShutdownHook = (*FailHook)(nil) ) -func (h *FailHook) Prerun() error { +func (h *FailHook) Prerun(_ *taskenv.TaskEnv) error { if h.Fail.Prerun { return fmt.Errorf("prerun %w", ErrFailHookError) } diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go index 289d7d361..3ac957884 100644 --- a/client/allocrunner/group_service_hook.go +++ b/client/allocrunner/group_service_hook.go @@ -49,12 +49,11 @@ type groupServiceHook struct { logger hclog.Logger // The following fields may be updated - canary bool - services []*structs.Service - networks structs.Networks - ports structs.AllocatedPorts - taskEnvBuilder *taskenv.Builder - delay time.Duration + canary bool + services []*structs.Service + networks structs.Networks + ports structs.AllocatedPorts + delay time.Duration // Since Update() may be called concurrently with any other hook all // hook methods must be fully serialized @@ -64,7 +63,6 @@ type groupServiceHook struct { type groupServiceHookConfig struct { alloc *structs.Allocation restarter serviceregistration.WorkloadRestarter - taskEnvBuilder *taskenv.Builder networkStatus structs.NetworkStatus shutdownDelayCtx context.Context logger hclog.Logger @@ -95,7 +93,6 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { namespace: cfg.alloc.Namespace, restarter: cfg.restarter, providerNamespace: cfg.providerNamespace, - taskEnvBuilder: cfg.taskEnvBuilder, delay: shutdownDelay, networkStatus: cfg.networkStatus, logger: cfg.logger.Named(groupServiceHookName), @@ -131,7 +128,7 @@ func (*groupServiceHook) Name() string { return groupServiceHookName } -func (h *groupServiceHook) Prerun() 
error { +func (h *groupServiceHook) Prerun(allocEnv *taskenv.TaskEnv) error { h.mu.Lock() defer func() { // Mark prerun as true to unblock Updates @@ -140,15 +137,21 @@ func (h *groupServiceHook) Prerun() error { h.deregistered = false h.mu.Unlock() }() - return h.preRunLocked() + + return h.preRunLocked(allocEnv) } // caller must hold h.mu -func (h *groupServiceHook) preRunLocked() error { +func (h *groupServiceHook) preRunLocked(env *taskenv.TaskEnv) error { if len(h.services) == 0 { return nil } + // TODO(tgross): this will be nil in PreTaskKill method because we have some + // false sharing of this method + if env != nil { + h.services = taskenv.InterpolateServices(env, h.services) + } services := h.getWorkloadServicesLocked() return h.serviceRegWrapper.RegisterWorkload(services) } @@ -181,10 +184,10 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { // Update group service hook fields h.networks = networks - h.services = tg.Services + h.services = taskenv.InterpolateServices(req.AllocEnv, tg.Services) + h.canary = canary h.delay = shutdown - h.taskEnvBuilder.UpdateTask(req.Alloc, nil) // An update may change the service provider, therefore we need to account // for how namespaces work across providers also. @@ -213,7 +216,7 @@ func (h *groupServiceHook) PreTaskRestart() error { }() h.preKillLocked() - return h.preRunLocked() + return h.preRunLocked(nil) } func (h *groupServiceHook) PreKill() { @@ -278,9 +281,6 @@ func (h *groupServiceHook) deregisterLocked() { // // caller must hold h.lock func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices { - // Interpolate with the task's environment - interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services) - allocTokens := h.hookResources.GetConsulTokens() tokens := map[string]string{} @@ -308,7 +308,7 @@ func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.Work AllocInfo: info, ProviderNamespace: h.providerNamespace, Restarter: h.restarter, - Services: interpolatedServices, + Services: h.services, Networks: h.networks, NetworkStatus: netStatus, Ports: h.ports, diff --git a/client/allocrunner/group_service_hook_test.go b/client/allocrunner/group_service_hook_test.go index 90866fde6..7b53d26ea 100644 --- a/client/allocrunner/group_service_hook_test.go +++ b/client/allocrunner/group_service_hook_test.go @@ -36,6 +36,7 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper( logger, @@ -46,13 +47,12 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) @@ -77,6 +77,7 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := 
wrapper.NewHandlerWrapper( logger, @@ -88,15 +89,14 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) // Incease shutdown Delay alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) // Assert that update updated the delay value @@ -120,6 +120,7 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { alloc.Job.Canonicalize() logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper( logger, @@ -130,13 +131,12 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) @@ -169,6 +169,7 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) nomadMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper(logger, consulMockClient, nomadMockClient) @@ -176,14 +177,13 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) // Trigger our hook requests. 
- req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) must.NoError(t, h.PreTaskRestart()) @@ -221,6 +221,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { logger := testlog.HCLogger(t) consulMockClient := regMock.NewServiceRegistrationHandler(logger) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() regWrapper := wrapper.NewHandlerWrapper( logger, @@ -231,13 +232,12 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) - must.NoError(t, h.Prerun()) + must.NoError(t, h.Prerun(env)) - req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env} must.NoError(t, h.Update(req)) must.NoError(t, h.Postrun()) @@ -280,7 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) @@ -324,7 +323,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) { serviceRegWrapper: regWrapper, shutdownDelayCtx: shutDownCtx, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) @@ -373,7 +371,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) @@ -426,7 +423,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) { alloc: alloc, serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), logger: logger, hookResources: cstructs.NewAllocHookResources(), }) diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go index 9ac3653f5..0944aa3f1 100644 --- a/client/allocrunner/health_hook.go +++ b/client/allocrunner/health_hook.go @@ -68,13 +68,6 @@ type allocHealthWatcherHook struct { // alloc set by new func or Update. Must hold hookLock to access. alloc *structs.Allocation - // taskEnvBuilder is the current builder used to build task environments - // for the group and each of its tasks. Must hold hookLock to modify. - taskEnvBuilder *taskenv.Builder - - // taskEnvBuilderFactory creates a new *taskenv.Builder instance. - taskEnvBuilderFactory func() *taskenv.Builder - // isDeploy is true if monitoring a deployment. Set in init(). Must // hold hookLock to access. 
 	isDeploy bool
@@ -85,7 +78,6 @@ func newAllocHealthWatcherHook(
 	logger hclog.Logger,
 	alloc *structs.Allocation,
-	taskEnvBuilderFactory func() *taskenv.Builder,
 	hs healthSetter,
 	listener *cstructs.AllocListener,
 	consul serviceregistration.Handler,
@@ -103,15 +95,13 @@
 	close(closedDone)
 
 	h := &allocHealthWatcherHook{
-		alloc:                 alloc,
-		taskEnvBuilderFactory: taskEnvBuilderFactory,
-		taskEnvBuilder:        taskEnvBuilderFactory(),
-		cancelFn:              func() {}, // initialize to prevent nil func panics
-		watchDone:             closedDone,
-		consul:                consul,
-		checkStore:            checkStore,
-		healthSetter:          hs,
-		listener:              listener,
+		alloc:        alloc,
+		cancelFn:     func() {}, // initialize to prevent nil func panics
+		watchDone:    closedDone,
+		consul:       consul,
+		checkStore:   checkStore,
+		healthSetter: hs,
+		listener:     listener,
 	}
 
 	h.logger = logger.Named(h.Name())
@@ -134,7 +124,7 @@ func (h *allocHealthWatcherHook) Name() string {
 // Prerun or Update. Caller must set/update alloc and logger fields.
 //
 // Not threadsafe so the caller should lock since Updates occur concurrently.
-func (h *allocHealthWatcherHook) init() error {
+func (h *allocHealthWatcherHook) init(allocEnv *taskenv.TaskEnv) error {
 	// No need to watch health as it's already set
 	if h.healthSetter.HasHealth() {
 		h.logger.Trace("not watching; already has health set")
@@ -166,7 +156,7 @@
 	h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime)
 	// Create a new tracker, start it, and watch for health results.
 	tracker := allochealth.NewTracker(
-		ctx, h.logger, h.alloc, h.listener, h.taskEnvBuilder, h.consul, h.checkStore, minHealthyTime, useChecks,
+		ctx, h.logger, h.alloc, h.listener, allocEnv, h.consul, h.checkStore, minHealthyTime, useChecks,
 	)
 	tracker.Start()
@@ -176,7 +166,7 @@
 	return nil
 }
 
-func (h *allocHealthWatcherHook) Prerun() error {
+func (h *allocHealthWatcherHook) Prerun(allocEnv *taskenv.TaskEnv) error {
 	h.hookLock.Lock()
 	defer h.hookLock.Unlock()
@@ -186,7 +176,7 @@
 	}
 	h.ranOnce = true
 
-	return h.init()
+	return h.init(allocEnv)
 }
 
 func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) error {
@@ -210,10 +200,7 @@
 	// Update alloc
 	h.alloc = req.Alloc
 
-	// Create a new taskEnvBuilder with the updated alloc and a nil task
-	h.taskEnvBuilder = h.taskEnvBuilderFactory().UpdateTask(req.Alloc, nil)
-
-	return h.init()
+	return h.init(req.AllocEnv)
 }
 
 func (h *allocHealthWatcherHook) Postrun() error {
diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go
index 7b1bb09b4..ce1ae8c9f 100644
--- a/client/allocrunner/health_hook_test.go
+++ b/client/allocrunner/health_hook_test.go
@@ -96,7 +96,7 @@ func TestHealthHook_PrerunPostrun(t *testing.T) {
 	checks := new(mock.CheckShim)
 	alloc := mock.Alloc()
 
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks)
 
 	// Assert we implemented the right interfaces
 	prerunh, ok := h.(interfaces.RunnerPrerunHook)
@@ -107,7 +107,8 @@
 	require.True(ok)
 
 	// Prerun
-	require.NoError(prerunh.Prerun())
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
+	require.NoError(prerunh.Prerun(env))
 
 	// Assert isDeploy is false (other tests peek at isDeploy to determine
 	// if an Update applied)
@@ -135,10 +136,11 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) {
 	hs := &mockHealthSetter{}
 	checks := new(mock.CheckShim)
 
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
 	// Prerun
-	require.NoError(h.Prerun())
+	require.NoError(h.Prerun(env))
 
 	// Update multiple times in a goroutine to mimic Client behavior
 	// (Updates are concurrent with alloc runner but are applied serially).
@@ -175,7 +177,8 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
 	hs := &mockHealthSetter{}
 	checks := new(mock.CheckShim)
 
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
 	// Set a DeploymentID to cause ClearHealth to be called
 	alloc.DeploymentID = uuid.Generate()
@@ -184,7 +187,7 @@
 	// concurrent with alloc runner).
 	errs := make(chan error, 1)
 	go func(alloc *structs.Allocation) {
-		errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc})
+		errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env})
 		close(errs)
 	}(alloc.Copy())
@@ -193,7 +196,7 @@
 	}
 
 	// Prerun should be a noop
-	require.NoError(h.Prerun())
+	require.NoError(h.Prerun(env))
 
 	// Assert that the Update took effect by isDeploy being true
 	h.hookLock.Lock()
@@ -218,7 +221,7 @@ func TestHealthHook_Postrun(t *testing.T) {
 	alloc := mock.Alloc()
 	checks := new(mock.CheckShim)
 
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
 
 	// Postrun
 	require.NoError(h.Postrun())
@@ -285,10 +288,11 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
 	hs := newMockHealthSetter()
 	checks := new(mock.CheckShim)
 
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
 	// Prerun
-	require.NoError(h.Prerun())
+	require.NoError(h.Prerun(env))
 
 	// Wait for health to be set (healthy)
 	select {
@@ -374,10 +378,11 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
 	hs := newMockHealthSetter()
 	checks := new(mock.CheckShim)
 
-	h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
 
 	// Prerun
-	require.NoError(h.Prerun())
+	require.NoError(h.Prerun(env))
 
 	// Wait to ensure we don't get a healthy status
 	select {
@@ -396,7 +401,7 @@ func TestHealthHook_SystemNoop(t *testing.T) {
 	ci.Parallel(t)
 
 	alloc := mock.SystemAlloc()
-	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil)
 
 	// Assert that it's the noop impl
 	_, ok := h.(noopAllocHealthWatcherHook)
@@ -418,15 +423,9 @@ func TestHealthHook_BatchNoop(t *testing.T) {
 	ci.Parallel(t)
 
 	alloc := mock.BatchAlloc()
-	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil)
 
 	// Assert that it's the noop impl
 	_, ok := h.(noopAllocHealthWatcherHook)
 	require.True(t, ok)
 }
-
-func taskEnvBuilderFactory(alloc *structs.Allocation) func() *taskenv.Builder {
-	return func() *taskenv.Builder {
-		return taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
-	}
-}
diff --git a/client/allocrunner/identity_hook.go b/client/allocrunner/identity_hook.go
index ea34b8ee8..78a3528d0 100644
--- a/client/allocrunner/identity_hook.go
+++ b/client/allocrunner/identity_hook.go
@@ -6,6 +6,7 @@ package allocrunner
 import (
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/client/widmgr"
 )
@@ -34,7 +35,7 @@ func (*identityHook) Name() string {
 	return "identity"
 }
 
-func (h *identityHook) Prerun() error {
+func (h *identityHook) Prerun(_ *taskenv.TaskEnv) error {
 	// run the renewal
 	if err := h.widmgr.Run(); err != nil {
 		return err
diff --git a/client/allocrunner/identity_hook_test.go b/client/allocrunner/identity_hook_test.go
index 0d49087e5..8825c0133 100644
--- a/client/allocrunner/identity_hook_test.go
+++ b/client/allocrunner/identity_hook_test.go
@@ -40,13 +40,11 @@ func TestIdentityHook_Prerun(t *testing.T) {
 	logger := testlog.HCLogger(t)
 	db := cstate.NewMemDB(logger)
-
-	// the WIDMgr env builder never has the task available
-	envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
 
 	// setup mock signer and WIDMgr
 	mockSigner := widmgr.NewMockWIDSigner(task.Identities)
-	mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder)
+	mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, env)
 	allocrunner.widmgr = mockWIDMgr
 	allocrunner.widsigner = mockSigner
@@ -65,7 +63,7 @@
 	start := time.Now()
 	hook := newIdentityHook(logger, mockWIDMgr)
 	must.Eq(t, hook.Name(), "identity")
-	must.NoError(t, hook.Prerun())
+	must.NoError(t, hook.Prerun(env))
 	time.Sleep(time.Second) // give goroutines a moment to run
 
 	sid, err := hook.widmgr.Get(structs.WIHandle{
diff --git a/client/allocrunner/interfaces/runner_lifecycle.go b/client/allocrunner/interfaces/runner_lifecycle.go
index e0ad1c64c..d9a88377f 100644
--- a/client/allocrunner/interfaces/runner_lifecycle.go
+++ b/client/allocrunner/interfaces/runner_lifecycle.go
@@ -4,6 +4,7 @@
 package interfaces
 
 import (
+	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -16,7 +17,7 @@
 // non-terminal allocations. Terminal allocations do *not* call prerun.
type RunnerPrerunHook interface { RunnerHook - Prerun() error + Prerun(*taskenv.TaskEnv) error } // A RunnerPreKillHook is executed inside of KillTasks before @@ -55,7 +56,8 @@ type RunnerUpdateHook interface { } type RunnerUpdateRequest struct { - Alloc *structs.Allocation + Alloc *structs.Allocation + AllocEnv *taskenv.TaskEnv } // A RunnerTaskRestartHook is executed just before the allocation runner is diff --git a/client/allocrunner/migrate_hook.go b/client/allocrunner/migrate_hook.go index 113b9c9da..c958b9276 100644 --- a/client/allocrunner/migrate_hook.go +++ b/client/allocrunner/migrate_hook.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/taskenv" ) // diskMigrationHook migrates ephemeral disk volumes. Depends on alloc dir @@ -41,7 +42,7 @@ func (h *diskMigrationHook) Name() string { return "migrate_disk" } -func (h *diskMigrationHook) Prerun() error { +func (h *diskMigrationHook) Prerun(_ *taskenv.TaskEnv) error { ctx := context.TODO() // Wait for a previous alloc - if any - to terminate diff --git a/client/allocrunner/network_hook.go b/client/allocrunner/network_hook.go index 13384dd9e..472cc5ad4 100644 --- a/client/allocrunner/network_hook.go +++ b/client/allocrunner/network_hook.go @@ -79,9 +79,6 @@ type networkHook struct { // the alloc network has been created networkConfigurator NetworkConfigurator - // taskEnv is used to perform interpolation within the network blocks. - taskEnv *taskenv.TaskEnv - logger hclog.Logger } @@ -91,7 +88,6 @@ func newNetworkHook(logger hclog.Logger, netManager drivers.DriverNetworkManager, netConfigurator NetworkConfigurator, networkStatusSetter networkStatusSetter, - taskEnv *taskenv.TaskEnv, ) *networkHook { return &networkHook{ isolationSetter: ns, @@ -99,7 +95,6 @@ func newNetworkHook(logger hclog.Logger, alloc: alloc, manager: netManager, networkConfigurator: netConfigurator, - taskEnv: taskEnv, logger: logger, } } @@ -114,7 +109,7 @@ func (h *networkHook) Name() string { return "network" } -func (h *networkHook) Prerun() error { +func (h *networkHook) Prerun(allocEnv *taskenv.TaskEnv) error { tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup) if len(tg.Networks) == 0 || tg.Networks[0].Mode == "host" || tg.Networks[0].Mode == "" { return nil @@ -126,7 +121,7 @@ func (h *networkHook) Prerun() error { } // Perform our networks block interpolation. - interpolatedNetworks := taskenv.InterpolateNetworks(h.taskEnv, tg.Networks) + interpolatedNetworks := taskenv.InterpolateNetworks(allocEnv, tg.Networks) // Interpolated values need to be validated. 
It is also possible a user // supplied hostname avoids the validation on job registrations because it diff --git a/client/allocrunner/network_hook_test.go b/client/allocrunner/network_hook_test.go index a8ec11fa7..bb1bd2dcd 100644 --- a/client/allocrunner/network_hook_test.go +++ b/client/allocrunner/network_hook_test.go @@ -82,10 +82,11 @@ func TestNetworkHook_Prerun_Postrun_group(t *testing.T) { expectedStatus: nil, } - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) logger := testlog.HCLogger(t) - hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter, envBuilder.Build()) - must.NoError(t, hook.Prerun()) + hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() + + must.NoError(t, hook.Prerun(env)) must.True(t, setter.called) must.False(t, destroyCalled) must.NoError(t, hook.Postrun()) @@ -125,10 +126,11 @@ func TestNetworkHook_Prerun_Postrun_host(t *testing.T) { expectedStatus: nil, } - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) logger := testlog.HCLogger(t) - hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter, envBuilder.Build()) - must.NoError(t, hook.Prerun()) + hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() + + must.NoError(t, hook.Prerun(env)) must.False(t, setter.called) must.False(t, destroyCalled) must.NoError(t, hook.Postrun()) @@ -184,8 +186,7 @@ func TestNetworkHook_Prerun_Postrun_ExistingNetNS(t *testing.T) { cni: fakePlugin, nsOpts: &nsOpts{}, } - - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build() testCases := []struct { name string @@ -251,9 +252,9 @@ func TestNetworkHook_Prerun_Postrun_ExistingNetNS(t *testing.T) { fakePlugin.checkErrors = tc.checkErrs configurator.nodeAttrs["plugins.cni.version.bridge"] = tc.cniVersion hook := newNetworkHook(testlog.HCLogger(t), isolationSetter, - alloc, nm, configurator, statusSetter, envBuilder.Build()) + alloc, nm, configurator, statusSetter) - err := hook.Prerun() + err := hook.Prerun(env) if tc.expectPrerunError == "" { must.NoError(t, err) } else { diff --git a/client/allocrunner/taskrunner/identity_hook_test.go b/client/allocrunner/taskrunner/identity_hook_test.go index d3d7c9343..600bcdcfa 100644 --- a/client/allocrunner/taskrunner/identity_hook_test.go +++ b/client/allocrunner/taskrunner/identity_hook_test.go @@ -92,9 +92,9 @@ func TestIdentityHook_RenewAll(t *testing.T) { logger := testlog.HCLogger(t) db := cstate.NewMemDB(logger) mockSigner := widmgr.NewMockWIDSigner(task.Identities) - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") + allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build() - mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder) + mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, allocEnv) mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s mockLifecycle := trtesting.NewMockTaskHooks() @@ -212,8 +212,8 @@ func TestIdentityHook_RenewOne(t *testing.T) { logger := testlog.HCLogger(t) db := cstate.NewMemDB(logger) mockSigner := widmgr.NewMockWIDSigner(task.Identities) - envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global") - 
-	mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder)
+	allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
+	mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, allocEnv)
 	mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s

 	h := &identityHook{
diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index c922e1db4..89c1dd914 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -148,9 +148,9 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
 	if vault != nil {
 		vaultFunc = func(_ string) (vaultclient.VaultClient, error) { return vault, nil }
 	}
-	// the envBuilder for the WIDMgr never has access to the task, so don't
-	// include it here
-	envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
+	// the env for the WIDMgr never has access to the task, so don't include it
+	// here
+	allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()

 	conf := &Config{
 		Alloc: alloc,
@@ -171,7 +171,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
 		ServiceRegWrapper:  wrapperMock,
 		Getter:             getter.TestSandbox(t),
 		Wranglers:          proclib.MockWranglers(t),
-		WIDMgr:             widmgr.NewWIDMgr(widsigner, alloc, db, logger, envBuilder),
+		WIDMgr:             widmgr.NewWIDMgr(widsigner, alloc, db, logger, allocEnv),
 		AllocHookResources: cstructs.NewAllocHookResources(),
 	}
diff --git a/client/allocrunner/taskrunner/vault_hook_test.go b/client/allocrunner/taskrunner/vault_hook_test.go
index 1cf243c7a..7e4be5e40 100644
--- a/client/allocrunner/taskrunner/vault_hook_test.go
+++ b/client/allocrunner/taskrunner/vault_hook_test.go
@@ -99,8 +99,8 @@ func setupTestVaultHook(t *testing.T, config *vaultHookConfig) *vaultHook {
 	if config.widmgr == nil {
 		db := cstate.NewMemDB(config.logger)
 		signer := widmgr.NewMockWIDSigner(config.task.Identities)
-		envBuilder := taskenv.NewBuilder(mock.Node(), config.alloc, nil, "global")
-		config.widmgr = widmgr.NewWIDMgr(signer, config.alloc, db, config.logger, envBuilder)
+		allocEnv := taskenv.NewBuilder(mock.Node(), config.alloc, nil, "global").Build()
+		config.widmgr = widmgr.NewWIDMgr(signer, config.alloc, db, config.logger, allocEnv)
 		err := config.widmgr.Run()
 		must.NoError(t, err)
 	}
diff --git a/client/allocrunner/upstream_allocs_hook.go b/client/allocrunner/upstream_allocs_hook.go
index d990ed82b..a3b12cea5 100644
--- a/client/allocrunner/upstream_allocs_hook.go
+++ b/client/allocrunner/upstream_allocs_hook.go
@@ -9,6 +9,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/client/taskenv"
 )

 // upstreamAllocsHook waits for a PrevAllocWatcher to exit before allowing
@@ -33,7 +34,7 @@ func (h *upstreamAllocsHook) Name() string {
 	return "await_previous_allocations"
 }

-func (h *upstreamAllocsHook) Prerun() error {
+func (h *upstreamAllocsHook) Prerun(_ *taskenv.TaskEnv) error {
 	// Wait for a previous alloc - if any - to terminate
 	return h.allocWatcher.Wait(context.Background())
 }
diff --git a/client/taskenv/env.go b/client/taskenv/env.go
index 02f78de38..9a8ab9e97 100644
--- a/client/taskenv/env.go
+++ b/client/taskenv/env.go
@@ -5,6 +5,7 @@ package taskenv

 import (
 	"fmt"
+	"maps"
 	"net"
 	"os"
 	"path/filepath"
@@ -248,6 +249,32 @@ func (t *TaskEnv) All() map[string]string {
 	return m
 }

+// WithTask returns a shallow copy of the TaskEnv, with the EnvMap cloned and
+// overlaid with the provided task's meta and environment variables. This is
+// only for use in allocrunner hooks, which may need to interpolate per-task
+// services or identities; it does not re-populate the rest of the environment.
+func (t *TaskEnv) WithTask(alloc *structs.Allocation, task *structs.Task) *TaskEnv {
+	if t == nil {
+		return t
+	}
+	newT := new(TaskEnv)
+	*newT = *t
+	newT.envList = []string{}
+	newT.EnvMap = maps.Clone(t.EnvMap)
+
+	combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)
+	for k, v := range combined {
+		newT.EnvMap[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
+		newT.EnvMap[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
+	}
+
+	for k, v := range task.Env {
+		newT.EnvMap[k] = v
+	}
+	newT.EnvMap[TaskName] = task.Name
+	return newT
+}
+
 // AllValues is a map of the task's environment variables and the node's
 // attributes with cty.Value (String) values. Errors including keys are
 // returned in a map by key name.
@@ -658,11 +685,12 @@ func (b *Builder) buildEnv(allocDir, localDir, secretsDir string,

 // Build must be called after all the tasks environment values have been set.
 func (b *Builder) Build() *TaskEnv {
-	nodeAttrs := make(map[string]string)
 	b.mu.RLock()
 	defer b.mu.RUnlock()

+	nodeAttrs := make(map[string]string)
+
 	if b.region != "" {
 		// Copy region over to node attrs
 		nodeAttrs[nodeRegionKey] = b.region
@@ -678,13 +706,6 @@ func (b *Builder) Build() *TaskEnv {
 	return NewTaskEnv(envMap, envMapClient, deviceEnvs, nodeAttrs, b.clientTaskRoot, b.clientSharedAllocDir)
 }

-// UpdateTask updates the environment based on a new alloc and task.
-func (b *Builder) UpdateTask(alloc *structs.Allocation, task *structs.Task) *Builder {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	return b.setTask(task).setAlloc(alloc)
-}
-
 // SetHookEnv sets environment variables from a hook. Variables are
 // Last-Write-Wins, so if a hook writes a variable that's also written by a
 // later hook, the later hooks value always gets used.
@@ -851,6 +872,9 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {

 // setNode is called from NewBuilder to populate node attributes.
 func (b *Builder) setNode(n *structs.Node) *Builder {
+	if n == nil {
+		return b
+	}
 	b.nodeAttrs = make(map[string]string, 4+len(n.Attributes)+len(n.Meta))
 	b.nodeAttrs[nodeIdKey] = n.ID
 	b.nodeAttrs[nodeNameKey] = n.Name
diff --git a/client/taskenv/env_test.go b/client/taskenv/env_test.go
index 8386962ec..8df01fb72 100644
--- a/client/taskenv/env_test.go
+++ b/client/taskenv/env_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/drivers"
+	"github.com/shoenig/test"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -707,54 +708,41 @@ func TestEnvironment_DashesInTaskName(t *testing.T) {
 	}
 }

-// TestEnvironment_UpdateTask asserts env vars and task meta are updated when a
-// task is updated.
-func TestEnvironment_UpdateTask(t *testing.T) {
+// TestEnvironment_WithTask asserts env vars are updated in an
+// alloc environment when a task is updated.
+func TestEnvironment_WithTask(t *testing.T) {
 	ci.Parallel(t)
 	a := mock.Alloc()
 	a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta": "tgmetaval"}
-	task := a.Job.TaskGroups[0].Tasks[0]
-	task.Name = "orig"
-	task.Env = map[string]string{"env": "envval"}
-	task.Meta = map[string]string{"taskmeta": "taskmetaval"}
-	builder := NewBuilder(mock.Node(), a, task, "global")
+	builder := NewBuilder(mock.Node(), a, nil, "global")

 	origMap := builder.Build().Map()
-	if origMap["NOMAD_TASK_NAME"] != "orig" {
-		t.Errorf("Expected NOMAD_TASK_NAME=orig but found %q", origMap["NOMAD_TASK_NAME"])
-	}
-	if origMap["NOMAD_META_taskmeta"] != "taskmetaval" {
-		t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", origMap["NOMAD_META_taskmeta"])
-	}
-	if origMap["env"] != "envval" {
-		t.Errorf("Expected env=envva but found %q", origMap["env"])
-	}
-	if origMap["NOMAD_META_tgmeta"] != "tgmetaval" {
-		t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", origMap["NOMAD_META_tgmeta"])
-	}
+	test.Eq(t, "web", origMap["NOMAD_GROUP_NAME"])
+	test.Eq(t, "armon", origMap["NOMAD_META_owner"])
+	test.Eq(t, "", origMap["NOMAD_META_taskmeta"])

-	a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta2": "tgmetaval2"}
-	task.Name = "new"
+	task := a.Job.TaskGroups[0].Tasks[0]
+	task.Name = "task1"
+	task.Env = map[string]string{"env": "envval"}
+	task.Meta = map[string]string{"taskmeta": "taskmetaval"}
+
+	newMap1 := builder.Build().WithTask(a, task).Map()
+	test.Eq(t, "task1", newMap1["NOMAD_TASK_NAME"])
+	test.Eq(t, "taskmetaval", newMap1["NOMAD_META_taskmeta"])
+	test.Eq(t, "envval", newMap1["env"])
+
+	task.Name = "task2"
 	task.Env = map[string]string{"env2": "envval2"}
 	task.Meta = map[string]string{"taskmeta2": "taskmetaval2"}

-	newMap := builder.UpdateTask(a, task).Build().Map()
-	if newMap["NOMAD_TASK_NAME"] != "new" {
-		t.Errorf("Expected NOMAD_TASK_NAME=new but found %q", newMap["NOMAD_TASK_NAME"])
-	}
-	if newMap["NOMAD_META_taskmeta2"] != "taskmetaval2" {
-		t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", newMap["NOMAD_META_taskmeta2"])
-	}
-	if newMap["env2"] != "envval2" {
-		t.Errorf("Expected env=envva but found %q", newMap["env2"])
-	}
-	if newMap["NOMAD_META_tgmeta2"] != "tgmetaval2" {
-		t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", newMap["NOMAD_META_tgmeta2"])
-	}
-	if v, ok := newMap["NOMAD_META_taskmeta"]; ok {
-		t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v)
-	}
+	// the original alloc env must not have been mutated by the first WithTask
+	newMap2 := builder.Build().WithTask(a, task).Map()
+	test.Eq(t, "task2", newMap2["NOMAD_TASK_NAME"])
+	test.Eq(t, "taskmetaval2", newMap2["NOMAD_META_taskmeta2"])
+	test.Eq(t, "envval2", newMap2["env2"])
+	test.Eq(t, "", newMap2["NOMAD_META_taskmeta"])
+	test.Eq(t, "", newMap2["env"])
 }

 // TestEnvironment_InterpolateEmptyOptionalMeta asserts that in a parameterized
diff --git a/client/widmgr/widmgr.go b/client/widmgr/widmgr.go
index c0a043d6f..888befe96 100644
--- a/client/widmgr/widmgr.go
+++ b/client/widmgr/widmgr.go
@@ -54,12 +54,10 @@ type WIDMgr struct {
 	logger hclog.Logger
 }

-func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB, logger hclog.Logger, envBuilder *taskenv.Builder) *WIDMgr {
+func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB, logger hclog.Logger, allocEnv *taskenv.TaskEnv) *WIDMgr {
 	widspecs := map[structs.WIHandle]*structs.WorkloadIdentity{}
 	tg := a.Job.LookupTaskGroup(a.TaskGroup)

-	allocEnv := envBuilder.Build()
-
 	for _, service := range tg.Services {
 		if service.Identity != nil {
 			handle := *service.IdentityHandle(allocEnv.ReplaceEnv)
@@ -74,7 +72,7 @@ func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB,
 		}

-		// update the builder for this task
-		taskEnv := envBuilder.UpdateTask(a, task).Build()
+		// derive this task's env from the alloc-scoped env
+		taskEnv := allocEnv.WithTask(a, task)
 		for _, service := range task.Services {
 			if service.Identity != nil {
 				handle := *service.IdentityHandle(taskEnv.ReplaceEnv)
diff --git a/client/widmgr/widmgr_test.go b/client/widmgr/widmgr_test.go
index 8133840a6..9b9c9777f 100644
--- a/client/widmgr/widmgr_test.go
+++ b/client/widmgr/widmgr_test.go
@@ -30,10 +30,10 @@ func TestWIDMgr_Restore(t *testing.T) {
 	}
 	alloc.Job.TaskGroups[0].Tasks[0].Services[0].Identity = widSpecs[0]
 	alloc.Job.TaskGroups[0].Tasks[0].Identities = widSpecs[1:]
-	envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
+	env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
 	signer := NewMockWIDSigner(widSpecs)
-	mgr := NewWIDMgr(signer, alloc, db, logger, envBuilder)
+	mgr := NewWIDMgr(signer, alloc, db, logger, env)

 	// restore, but we haven't previously saved to the db
 	hasExpired, err := mgr.restoreStoredIdentities()
@@ -54,7 +54,7 @@ func TestWIDMgr_Restore(t *testing.T) {
 	widSpecs[2].TTL = time.Second
 	signer.setWIDs(widSpecs)

-	wiHandle := service.IdentityHandle(envBuilder.Build().ReplaceEnv)
+	wiHandle := service.IdentityHandle(env.ReplaceEnv)
 	mgr.widSpecs[*wiHandle].TTL = time.Second

 	// force a re-sign to re-populate the lastToken and save to the db
diff --git a/nomad/job_endpoint_hook_connect.go b/nomad/job_endpoint_hook_connect.go
index 1985ac9c0..ab8e54bab 100644
--- a/nomad/job_endpoint_hook_connect.go
+++ b/nomad/job_endpoint_hook_connect.go
@@ -273,10 +273,10 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
 	// This should only be used to interpolate connect service names which are
 	// used in sidecar or gateway task names. Note that the service name might
 	// also be interpolated with job specifics during service canonicalization.
-	env := taskenv.NewEmptyBuilder().UpdateTask(&structs.Allocation{
+	env := taskenv.NewBuilder(nil, &structs.Allocation{
 		Job:       job,
 		TaskGroup: g.Name,
-	}, nil).Build()
+	}, nil, job.Region).Build()

 	for _, service := range g.Services {
 		switch {
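For reviewers who want to see the new interpolation flow end to end, here is a minimal, illustrative sketch. It is not code from this diff: the `main` wrapper and the printed template string are invented for demonstration, while `taskenv.NewBuilder`, `Build`, `WithTask`, and `ReplaceEnv` are the same calls exercised by the tests above.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/mock"
)

func main() {
	alloc := mock.Alloc()

	// Build the alloc-scoped env once, as the allocrunner now does before
	// invoking its Prerun hooks; passing a nil task keeps task-specific
	// values out of this shared environment.
	allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	// Derive a per-task view. WithTask clones the underlying env map, so
	// one task's meta and env vars can no longer leak into a sibling
	// task's service, check, or identity interpolation.
	task := alloc.Job.TaskGroups[0].Tasks[0]
	taskEnv := allocEnv.WithTask(alloc, task)

	// ReplaceEnv is the same interpolation entry point the identity and
	// service handles use in widmgr above.
	fmt.Println(taskEnv.ReplaceEnv("service-${NOMAD_TASK_NAME}"))
}
```

The design point: because `WithTask` returns a fresh copy instead of mutating a shared `Builder` (the removed `UpdateTask` path), callers can derive per-task environments in any order without affecting each other.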