provide allocrunner hooks with prebuilt taskenv and fix mutation bugs (#25373)

Some of our allocrunner hooks require a task environment for interpolating values based on the node or allocation. But several of the hooks accept an already-built environment or builder and then keep it in memory for the life of the allocation. Both the built environment and the builder retain a copy of all the node attributes and allocation metadata, which balloons memory usage until the allocation is GC'd.

While we'd like to look into ways to avoid keeping the allocrunner around entirely (see #25372), for now we can significantly reduce memory usage by creating the task environment on-demand when calling allocrunner methods, rather than persisting it in the allocrunner hooks.
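
Here's a minimal sketch of the on-demand pattern, condensed from the allocrunner changes in the diff below (hook iteration, stats emission, and error handling are simplified):

```go
// Sketch only: build the allocation-level TaskEnv at the point of use and
// hand it to each prerun hook, instead of storing a builder (and its copy of
// node attributes and alloc metadata) on every hook for the alloc's lifetime.
func (ar *allocRunner) prerun() error {
	allocEnv := taskenv.NewBuilder(
		ar.clientConfig.GetNode(),
		ar.Alloc(),
		nil, // no task: this is the group-level environment
		ar.clientConfig.Region,
	).SetAllocDir(ar.allocDir.AllocDirPath()).Build()

	for _, hook := range ar.runnerHooks {
		pre, ok := hook.(interfaces.RunnerPrerunHook)
		if !ok {
			continue
		}
		if err := pre.Prerun(allocEnv); err != nil {
			return fmt.Errorf("pre-run hook %q failed: %v", pre.Name(), err)
		}
	}
	return nil
}
```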

In doing so, we uncovered two other bugs:
* The WID manager, the group service hook, and the checks hook have to interpolate services for specific tasks. They mutated a shared taskenv builder to do so, but every mutation writes to the same underlying environment map. When a group has multiple tasks, one task could set an environment variable that would then be interpolated into the service definition of another task that didn't set that variable. Only service definition interpolation is affected; this does not leak env vars across running tasks, because each taskrunner has its own builder.

  To fix this, we move the `UpdateTask` method off the builder and onto the taskenv as a new `WithTask` method. This makes a shallow copy of the taskenv with a deep clone of the environment map used for interpolation, and then overlays the task's metadata and environment variables on that copy (see the sketch after this list).

* The checks hook interpolated Nomad native service checks only in `Prerun` and not in `Update`. This could cause checks to be unexpectedly deregistered and re-registered during in-place updates. To fix this, we now interpolate in the `Update` method as well.
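
Here's a condensed sketch of the per-task interpolation flow enabled by `WithTask` (the `interpolateTaskServices` helper is illustrative only and not part of this change; `WithTask` and `InterpolateServices` are the APIs used by the hooks in the diff below):

```go
import (
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/structs"
)

// interpolateTaskServices is a sketch: it interpolates each task's services
// against a per-task copy of the allocation environment. WithTask makes a
// shallow copy of the TaskEnv with a cloned EnvMap, so one task's env vars
// and meta can no longer bleed into the interpolated service definitions of
// its sibling tasks.
func interpolateTaskServices(allocEnv *taskenv.TaskEnv, alloc *structs.Allocation) map[string][]*structs.Service {
	out := make(map[string][]*structs.Service)
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	for _, task := range tg.Tasks {
		taskEnv := allocEnv.WithTask(alloc, task) // per-task clone; allocEnv itself is untouched
		out[task.Name] = taskenv.InterpolateServices(taskEnv, task.Services)
	}
	return out
}
```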

I also bumped into an incorrectly implemented interface in the CSI hook. I've pulled that fix, along with some better guardrails, out into https://github.com/hashicorp/nomad/pull/25472.

Fixes: https://github.com/hashicorp/nomad/issues/25269
Fixes: https://hashicorp.atlassian.net/browse/NET-12310
Ref: https://github.com/hashicorp/nomad/issues/25372
Commit e168548341 (parent ecf3d88e81), authored by Tim Gross on 2025-03-24 12:05:04 -04:00 and committed via GitHub.
38 changed files with 278 additions and 279 deletions

.changelog/25373.txt (new file)

@@ -0,0 +1,11 @@
```release-note:improvement
client: Improve memory usage by dropping references to task environment
```
```release-note:bug
services: Fixed a bug where Nomad native services would not be correctly interpolated during in-place updates
```
```release-note:bug
services: Fixed a bug where task-level services, checks, and identities could interpolate jobspec values from other tasks in the same group
```


@@ -119,7 +119,7 @@ func NewTracker(
logger hclog.Logger,
alloc *structs.Allocation,
allocUpdates *cstructs.AllocListener,
taskEnvBuilder *taskenv.Builder,
allocEnv *taskenv.TaskEnv,
consulClient serviceregistration.Handler,
checkStore checkstore.Shim,
minHealthyTime time.Duration,
@@ -145,7 +145,7 @@ func NewTracker(
// first because taskEnvBuilder is mutated in every loop and we can't undo
// a call to UpdateTask().
t.taskEnvs = make(map[string]*taskenv.TaskEnv, len(t.tg.Tasks)+1)
t.taskEnvs[""] = taskEnvBuilder.Build()
t.taskEnvs[""] = allocEnv
t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
for _, task := range t.tg.Tasks {
@@ -155,7 +155,7 @@ func NewTracker(
t.lifecycleTasks[task.Name] = task.Lifecycle.Hook
}
t.taskEnvs[task.Name] = taskEnvBuilder.UpdateTask(alloc, task).Build()
t.taskEnvs[task.Name] = allocEnv.WithTask(alloc, task)
c, n := countChecks(task.Services)
t.consulCheckCount += c


@@ -180,9 +180,9 @@ func TestTracker_ConsulChecks_Interpolation(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -253,9 +253,9 @@ func TestTracker_ConsulChecks_Healthy(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -306,9 +306,9 @@ func TestTracker_NomadChecks_Healthy(t *testing.T) {
consul := regmock.NewServiceRegistrationHandler(logger)
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -375,9 +375,9 @@ func TestTracker_NomadChecks_Unhealthy(t *testing.T) {
consul := regmock.NewServiceRegistrationHandler(logger)
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -436,9 +436,9 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -479,9 +479,9 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -560,9 +560,9 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -641,9 +641,9 @@ func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
minHealthyTime := 2 * time.Second
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
tracker.checkLookupInterval = checkInterval
assertChecksHealth := func(exp bool) {
@@ -732,9 +732,9 @@ func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
consul := regmock.NewServiceRegistrationHandler(logger)
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
assertChecksHealth := func(exp bool) {
@@ -785,8 +785,8 @@ func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
tracker := NewTracker(ctx, logger, alloc, nil, taskEnvBuilder, nil, nil, time.Millisecond, true)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, nil, env, nil, nil, time.Millisecond, true)
assertNoHealth := func() {
require.NoError(t, tracker.ctx.Err())
@@ -895,9 +895,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -1042,9 +1042,9 @@ func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, time.Millisecond, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()
@@ -1162,9 +1162,9 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {
consul := regmock.NewServiceRegistrationHandler(logger)
minHealthyTime := 1 * time.Millisecond
taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnvBuilder, consul, checks, minHealthyTime, true)
tracker := NewTracker(ctx, logger, alloc, b.Listen(), env, consul, checks, minHealthyTime, true)
tracker.checkLookupInterval = 10 * time.Millisecond
tracker.Start()


@@ -297,17 +297,15 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e
ar.shutdownDelayCtx = shutdownDelayCtx
ar.shutdownDelayCancelFn = shutdownDelayCancel
// Create a *taskenv.Builder for the allocation so the WID manager can
// interpolate services with the allocation and tasks as needed
envBuilder := taskenv.NewBuilder(
allocEnv := taskenv.NewBuilder(
config.ClientConfig.Node,
ar.Alloc(),
nil,
config.ClientConfig.Region,
).SetAllocDir(ar.allocDir.AllocDirPath())
).SetAllocDir(ar.allocDir.AllocDirPath()).Build()
// initialize the workload identity manager
widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.stateDB, ar.logger, envBuilder)
widmgr := widmgr.NewWIDMgr(ar.widsigner, alloc, ar.stateDB, ar.logger, allocEnv)
ar.widmgr = widmgr
// Initialize the runners hooks.


@@ -103,21 +103,6 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
return fmt.Errorf("failed to initialize network configurator: %v", err)
}
// Create a new taskenv.Builder which is used by hooks that mutate them to
// build new taskenv.TaskEnv.
newEnvBuilder := func() *taskenv.Builder {
return taskenv.NewBuilder(
config.Node,
ar.Alloc(),
nil,
config.Region,
).SetAllocDir(ar.allocDir.AllocDirPath())
}
// Create a *taskenv.TaskEnv which is used for read only purposes by the
// newNetworkHook and newChecksHook.
builtTaskEnv := newEnvBuilder().Build()
// Create the alloc directory hook. This is run first to ensure the
// directory path exists for other hooks.
alloc := ar.Alloc()
@@ -132,21 +117,19 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
consulConfigs: ar.clientConfig.GetConsulConfigs(hookLogger),
consulClientConstructor: consul.NewConsulClientFactory(config),
hookResources: ar.hookResources,
envBuilder: newEnvBuilder,
logger: hookLogger,
}),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newCPUPartsHook(hookLogger, ar.partitions, alloc),
newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulServicesHandler, ar.checkStore),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulServicesHandler, ar.checkStore),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
providerNamespace: alloc.ServiceProviderNamespace(),
serviceRegWrapper: ar.serviceRegWrapper,
hookResources: ar.hookResources,
restarter: ar,
taskEnvBuilder: newEnvBuilder(),
networkStatus: ar,
logger: hookLogger,
shutdownDelayCtx: ar.shutdownDelayCtx,
@@ -156,7 +139,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir,
config.GetConsulConfigs(ar.logger)),
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
newChecksHook(hookLogger, alloc, ar.checkStore, ar, builtTaskEnv),
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
}
if config.ExtraAllocHooks != nil {
ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...)
@@ -176,6 +159,13 @@ func (ar *allocRunner) prerun() error {
}()
}
allocEnv := taskenv.NewBuilder(
ar.clientConfig.GetNode(),
ar.Alloc(),
nil,
ar.clientConfig.Region,
).SetAllocDir(ar.allocDir.AllocDirPath()).Build()
for _, hook := range ar.runnerHooks {
pre, ok := hook.(interfaces.RunnerPrerunHook)
if !ok {
@@ -197,7 +187,7 @@ func (ar *allocRunner) prerun() error {
hookExecutionStart = time.Now()
}
err := pre.Prerun()
err := pre.Prerun(allocEnv)
ar.hookStatsHandler.Emit(hookExecutionStart, name, "prerun", err)
if err != nil {
return fmt.Errorf("pre-run hook %q failed: %v", name, err)
@@ -224,8 +214,16 @@ func (ar *allocRunner) update(update *structs.Allocation) error {
}()
}
allocEnv := taskenv.NewBuilder(
ar.clientConfig.GetNode(),
ar.Alloc(),
nil,
ar.clientConfig.Region,
).SetAllocDir(ar.allocDir.AllocDirPath()).Build()
req := &interfaces.RunnerUpdateRequest{
Alloc: update,
Alloc: update,
AllocEnv: allocEnv,
}
var merr multierror.Error


@@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/ci"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@@ -303,6 +304,6 @@ type allocFailingPrestartHook struct{}
func (*allocFailingPrestartHook) Name() string { return "failing_prestart" }
func (*allocFailingPrestartHook) Prerun() error {
func (*allocFailingPrestartHook) Prerun(_ *taskenv.TaskEnv) error {
return fmt.Errorf("failing prestart hooks")
}


@@ -7,6 +7,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
)
// allocDirHook creates and destroys the root directory and shared directories
@@ -34,7 +35,7 @@ func (h *allocDirHook) Name() string {
return "alloc_dir"
}
func (h *allocDirHook) Prerun() error {
func (h *allocDirHook) Prerun(_ *taskenv.TaskEnv) error {
return h.allocDir.Build()
}


@@ -83,7 +83,6 @@ type checksHook struct {
shim checkstore.Shim
checker checks.Checker
allocID string
taskEnv *taskenv.TaskEnv
// fields that get re-initialized on allocation update
lock sync.RWMutex
@@ -98,7 +97,6 @@ func newChecksHook(
alloc *structs.Allocation,
shim checkstore.Shim,
network structs.NetworkStatus,
taskEnv *taskenv.TaskEnv,
) *checksHook {
h := &checksHook{
logger: logger.Named(checksHookName),
@@ -107,7 +105,6 @@ func newChecksHook(
shim: shim,
network: network,
checker: checks.New(logger),
taskEnv: taskEnv,
}
h.initialize(alloc)
return h
@@ -210,7 +207,7 @@ func (h *checksHook) Name() string {
return checksHookName
}
func (h *checksHook) Prerun() error {
func (h *checksHook) Prerun(allocEnv *taskenv.TaskEnv) error {
h.lock.Lock()
defer h.lock.Unlock()
@@ -219,7 +216,8 @@ func (h *checksHook) Prerun() error {
return nil
}
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, group.NomadServices())
interpolatedServices := taskenv.InterpolateServices(
allocEnv, group.NomadServices())
// create and start observers of nomad service checks in alloc
h.observe(h.alloc, interpolatedServices)
@@ -237,11 +235,12 @@ func (h *checksHook) Update(request *interfaces.RunnerUpdateRequest) error {
}
// get all group and task level services using nomad provider
services := group.NomadServices()
interpolatedServices := taskenv.InterpolateServices(
request.AllocEnv, group.NomadServices())
// create a set of the updated set of checks
next := make([]structs.CheckID, 0, len(h.observers))
for _, service := range services {
for _, service := range interpolatedServices {
for _, check := range service.Checks {
next = append(next, structs.NomadCheckID(
request.Alloc.ID,
@@ -267,7 +266,7 @@ func (h *checksHook) Update(request *interfaces.RunnerUpdateRequest) error {
h.alloc = request.Alloc
// ensure we are observing new checks (idempotent)
h.observe(request.Alloc, services)
h.observe(request.Alloc, interpolatedServices)
return nil
}


@@ -164,15 +164,15 @@ func TestCheckHook_Checks_ResultsSet(t *testing.T) {
alloc := allocWithNomadChecks(addr, port, tc.onGroup)
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
h := newChecksHook(logger, alloc, checkStore, network, envBuilder.Build())
h := newChecksHook(logger, alloc, checkStore, network)
// initialize is called; observers are created but not started yet
must.MapEmpty(t, h.observers)
// calling pre-run starts the observers
err := h.Prerun()
err := h.Prerun(env)
must.NoError(t, err)
testutil.WaitForResultUntil(
@@ -231,12 +231,12 @@ func TestCheckHook_Checks_UpdateSet(t *testing.T) {
alloc := allocWithNomadChecks(addr, port, true)
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
h := newChecksHook(logger, alloc, shim, network, envBuilder.Build())
h := newChecksHook(logger, alloc, shim, network)
// calling pre-run starts the observers
err := h.Prerun()
err := h.Prerun(env)
must.NoError(t, err)
// initial set of checks
@@ -269,8 +269,12 @@ func TestCheckHook_Checks_UpdateSet(t *testing.T) {
},
)
updatedAlloc := allocWithDifferentNomadChecks(alloc.ID, addr, port)
updatedEnv := taskenv.NewBuilder(mock.Node(), updatedAlloc, nil, alloc.Job.Region).Build()
request := &interfaces.RunnerUpdateRequest{
Alloc: allocWithDifferentNomadChecks(alloc.ID, addr, port),
Alloc: updatedAlloc,
AllocEnv: updatedEnv,
}
err = h.Update(request)


@@ -21,6 +21,7 @@ import (
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
@@ -130,7 +131,7 @@ func (h *consulGRPCSocketHook) shouldRun() bool {
return false
}
func (h *consulGRPCSocketHook) Prerun() error {
func (h *consulGRPCSocketHook) Prerun(_ *taskenv.TaskEnv) error {
h.mu.Lock()
defer h.mu.Unlock()


@@ -50,7 +50,7 @@ func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) {
// Start the unix socket proxy
h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
require.NoError(t, h.Prerun())
require.NoError(t, h.Prerun(nil))
gRPCSock := filepath.Join(allocDir.AllocDir, allocdir.AllocGRPCSocket)
envoyConn, err := net.Dial("unix", gRPCSock)
@@ -125,7 +125,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) {
// An alloc without a Connect proxy sidecar should not return
// an error.
h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(nil))
// Postrun should be a noop
must.NoError(t, h.Postrun())
@@ -135,7 +135,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) {
// An alloc *with* a Connect proxy sidecar *should* return an error
// when Consul is not configured.
h := newConsulGRPCSocketHook(logger, connectAlloc, allocDir, consulConfigs, map[string]string{})
must.ErrorContains(t, h.Prerun(), `consul address for cluster "" must be set on nomad client`)
must.ErrorContains(t, h.Prerun(nil), `consul address for cluster "" must be set on nomad client`)
// Postrun should be a noop
must.NoError(t, h.Postrun())
@@ -145,7 +145,7 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) {
// Updating an alloc without a sidecar to have a sidecar should
// error when the sidecar is added.
h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(nil))
req := &interfaces.RunnerUpdateRequest{
Alloc: connectAlloc,


@@ -27,7 +27,6 @@ type consulHook struct {
consulConfigs map[string]*structsc.ConsulConfig
consulClientConstructor consul.ConsulClientFunc
hookResources *cstructs.AllocHookResources
envBuilder *taskenv.Builder
logger log.Logger
shutdownCtx context.Context
@@ -48,9 +47,6 @@ type consulHookConfig struct {
// hookResources is used for storing and retrieving Consul tokens
hookResources *cstructs.AllocHookResources
// envBuilder is used to interpolate services
envBuilder func() *taskenv.Builder
logger log.Logger
}
@@ -63,7 +59,6 @@ func newConsulHook(cfg consulHookConfig) *consulHook {
consulConfigs: cfg.consulConfigs,
consulClientConstructor: cfg.consulClientConstructor,
hookResources: cfg.hookResources,
envBuilder: cfg.envBuilder(),
shutdownCtx: shutdownCtx,
shutdownCancelFn: shutdownCancelFn,
}
@@ -83,7 +78,7 @@ func (*consulHook) Name() string {
return "consul"
}
func (h *consulHook) Prerun() error {
func (h *consulHook) Prerun(allocEnv *taskenv.TaskEnv) error {
job := h.alloc.Job
if job == nil {
@@ -102,12 +97,12 @@ func (h *consulHook) Prerun() error {
}
var mErr *multierror.Error
if err := h.prepareConsulTokensForServices(tg.Services, tg, tokens, h.envBuilder.Build()); err != nil {
if err := h.prepareConsulTokensForServices(tg.Services, tg, tokens, allocEnv); err != nil {
mErr = multierror.Append(mErr, err)
}
for _, task := range tg.Tasks {
h.envBuilder.UpdateTask(h.alloc, task)
if err := h.prepareConsulTokensForServices(task.Services, tg, tokens, h.envBuilder.Build()); err != nil {
taskEnv := allocEnv.WithTask(h.alloc, task)
if err := h.prepareConsulTokensForServices(task.Services, tg, tokens, taskEnv); err != nil {
mErr = multierror.Append(mErr, err)
}
if err := h.prepareConsulTokensForTask(task, tg, tokens); err != nil {


@@ -58,9 +58,9 @@ func consulHookTestHarness(t *testing.T) *consulHook {
db := cstate.NewMemDB(logger)
// the WIDMgr env builder never has the task available
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder)
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, env)
mockWIDMgr.SignForTesting()
consulConfigs := map[string]*structsc.ConsulConfig{
@@ -68,9 +68,6 @@ func consulHookTestHarness(t *testing.T) *consulHook {
}
hookResources := cstructs.NewAllocHookResources()
envBuilderFn := func() *taskenv.Builder {
return taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
}
consulHookCfg := consulHookConfig{
alloc: alloc,
@@ -79,7 +76,6 @@ func consulHookTestHarness(t *testing.T) *consulHook {
consulConfigs: consulConfigs,
consulClientConstructor: consul.NewMockConsulClient,
hookResources: hookResources,
envBuilder: envBuilderFn,
logger: logger,
}
return newConsulHook(consulHookCfg)
@@ -184,8 +180,8 @@ func Test_consulHook_prepareConsulTokensForServices(t *testing.T) {
hook := consulHookTestHarness(t)
task := hook.alloc.LookupTask("web")
services := task.Services
hook.envBuilder.UpdateTask(hook.alloc, task)
env := hook.envBuilder.Build()
env := taskenv.NewBuilder(mock.Node(), hook.alloc, task, "global").
Build().WithTask(hook.alloc, task)
hashedJWT := make(map[string]string)
for _, s := range services {


@@ -18,6 +18,7 @@ import (
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
@@ -105,7 +106,7 @@ func (h *consulHTTPSockHook) shouldRun() bool {
return false
}
func (h *consulHTTPSockHook) Prerun() error {
func (h *consulHTTPSockHook) Prerun(_ *taskenv.TaskEnv) error {
h.lock.Lock()
defer h.lock.Unlock()


@@ -39,7 +39,7 @@ func TestConsulSocketHook_PrerunPostrun_Ok(t *testing.T) {
// start unix socket proxy
h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs)
require.NoError(t, h.Prerun())
require.NoError(t, h.Prerun(nil))
httpSocket := filepath.Join(allocDir.AllocDir, allocdir.AllocHTTPSocket)
taskCon, err := net.Dial("unix", httpSocket)
@@ -112,7 +112,7 @@ func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) {
{
// an alloc without a connect native task should not return an error
h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs)
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(nil))
// postrun should be a noop
must.NoError(t, h.Postrun())
@@ -122,7 +122,7 @@ func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) {
// an alloc with a native task should return an error when consul is not
// configured
h := newConsulHTTPSocketHook(logger, connectNativeAlloc, allocDir, consulConfigs)
must.ErrorContains(t, h.Prerun(), "consul address must be set on nomad client")
must.ErrorContains(t, h.Prerun(nil), "consul address must be set on nomad client")
// Postrun should be a noop
must.NoError(t, h.Postrun())


@@ -9,6 +9,7 @@ import (
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -53,7 +54,7 @@ var (
_ interfaces.RunnerPostrunHook = (*cpuPartsHook)(nil)
)
func (h *cpuPartsHook) Prerun() error {
func (h *cpuPartsHook) Prerun(_ *taskenv.TaskEnv) error {
return h.partitions.Reserve(h.reservations)
}


@@ -18,6 +18,7 @@ import (
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
@@ -89,7 +90,7 @@ func (c *csiHook) Name() string {
return "csi_hook"
}
func (c *csiHook) Prerun() error {
func (c *csiHook) Prerun(_ *taskenv.TaskEnv) error {
if !c.shouldRun() {
return nil
}


@@ -361,7 +361,7 @@ func TestCSIHook(t *testing.T) {
vm.NextUnmountVolumeErr = errors.New("bad first attempt")
}
err := hook.Prerun()
err := hook.Prerun(nil)
if tc.expectedClaimErr != nil {
must.EqError(t, err, tc.expectedClaimErr.Error())
} else {
@@ -474,11 +474,11 @@ func TestCSIHook_Prerun_Validation(t *testing.T) {
must.NotNil(t, hook)
if tc.expectedErr != "" {
must.EqError(t, hook.Prerun(), tc.expectedErr)
must.EqError(t, hook.Prerun(nil), tc.expectedErr)
mounts := ar.res.GetCSIMounts()
must.Nil(t, mounts)
} else {
must.NoError(t, hook.Prerun())
must.NoError(t, hook.Prerun(nil))
mounts := ar.res.GetCSIMounts()
must.NotNil(t, mounts)
must.NoError(t, hook.Postrun())


@@ -16,6 +16,7 @@ import (
"github.com/hashicorp/hcl/v2/hclsimple"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
)
var ErrFailHookError = errors.New("failed successfully")
@@ -67,7 +68,7 @@ var (
_ interfaces.ShutdownHook = (*FailHook)(nil)
)
func (h *FailHook) Prerun() error {
func (h *FailHook) Prerun(_ *taskenv.TaskEnv) error {
if h.Fail.Prerun {
return fmt.Errorf("prerun %w", ErrFailHookError)
}


@@ -49,12 +49,11 @@ type groupServiceHook struct {
logger hclog.Logger
// The following fields may be updated
canary bool
services []*structs.Service
networks structs.Networks
ports structs.AllocatedPorts
taskEnvBuilder *taskenv.Builder
delay time.Duration
canary bool
services []*structs.Service
networks structs.Networks
ports structs.AllocatedPorts
delay time.Duration
// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
@@ -64,7 +63,6 @@ type groupServiceHook struct {
type groupServiceHookConfig struct {
alloc *structs.Allocation
restarter serviceregistration.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
networkStatus structs.NetworkStatus
shutdownDelayCtx context.Context
logger hclog.Logger
@@ -95,7 +93,6 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
namespace: cfg.alloc.Namespace,
restarter: cfg.restarter,
providerNamespace: cfg.providerNamespace,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
networkStatus: cfg.networkStatus,
logger: cfg.logger.Named(groupServiceHookName),
@@ -131,7 +128,7 @@ func (*groupServiceHook) Name() string {
return groupServiceHookName
}
func (h *groupServiceHook) Prerun() error {
func (h *groupServiceHook) Prerun(allocEnv *taskenv.TaskEnv) error {
h.mu.Lock()
defer func() {
// Mark prerun as true to unblock Updates
@@ -140,15 +137,21 @@ func (h *groupServiceHook) Prerun() error {
h.deregistered = false
h.mu.Unlock()
}()
return h.preRunLocked()
return h.preRunLocked(allocEnv)
}
// caller must hold h.mu
func (h *groupServiceHook) preRunLocked() error {
func (h *groupServiceHook) preRunLocked(env *taskenv.TaskEnv) error {
if len(h.services) == 0 {
return nil
}
// TODO(tgross): this will be nil in PreTaskKill method because we have some
// false sharing of this method
if env != nil {
h.services = taskenv.InterpolateServices(env, h.services)
}
services := h.getWorkloadServicesLocked()
return h.serviceRegWrapper.RegisterWorkload(services)
}
@@ -181,10 +184,10 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
// Update group service hook fields
h.networks = networks
h.services = tg.Services
h.services = taskenv.InterpolateServices(req.AllocEnv, tg.Services)
h.canary = canary
h.delay = shutdown
h.taskEnvBuilder.UpdateTask(req.Alloc, nil)
// An update may change the service provider, therefore we need to account
// for how namespaces work across providers also.
@@ -213,7 +216,7 @@ func (h *groupServiceHook) PreTaskRestart() error {
}()
h.preKillLocked()
return h.preRunLocked()
return h.preRunLocked(nil)
}
func (h *groupServiceHook) PreKill() {
@@ -278,9 +281,6 @@ func (h *groupServiceHook) deregisterLocked() {
//
// caller must hold h.lock
func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
allocTokens := h.hookResources.GetConsulTokens()
tokens := map[string]string{}
@@ -308,7 +308,7 @@ func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.Work
AllocInfo: info,
ProviderNamespace: h.providerNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
Services: h.services,
Networks: h.networks,
NetworkStatus: netStatus,
Ports: h.ports,


@@ -36,6 +36,7 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
logger := testlog.HCLogger(t)
consulMockClient := regMock.NewServiceRegistrationHandler(logger)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
regWrapper := wrapper.NewHandlerWrapper(
logger,
@@ -46,13 +47,12 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(env))
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env}
must.NoError(t, h.Update(req))
must.NoError(t, h.Postrun())
@@ -77,6 +77,7 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
logger := testlog.HCLogger(t)
consulMockClient := regMock.NewServiceRegistrationHandler(logger)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
regWrapper := wrapper.NewHandlerWrapper(
logger,
@@ -88,15 +89,14 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(env))
// Incease shutdown Delay
alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second)
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env}
must.NoError(t, h.Update(req))
// Assert that update updated the delay value
@@ -120,6 +120,7 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
alloc.Job.Canonicalize()
logger := testlog.HCLogger(t)
consulMockClient := regMock.NewServiceRegistrationHandler(logger)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
regWrapper := wrapper.NewHandlerWrapper(
logger,
@@ -130,13 +131,12 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(env))
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env}
must.NoError(t, h.Update(req))
must.NoError(t, h.Postrun())
@@ -169,6 +169,7 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
logger := testlog.HCLogger(t)
consulMockClient := regMock.NewServiceRegistrationHandler(logger)
nomadMockClient := regMock.NewServiceRegistrationHandler(logger)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
regWrapper := wrapper.NewHandlerWrapper(logger, consulMockClient, nomadMockClient)
@@ -176,14 +177,13 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(env))
// Trigger our hook requests.
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env}
must.NoError(t, h.Update(req))
must.NoError(t, h.Postrun())
must.NoError(t, h.PreTaskRestart())
@@ -221,6 +221,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
logger := testlog.HCLogger(t)
consulMockClient := regMock.NewServiceRegistrationHandler(logger)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
regWrapper := wrapper.NewHandlerWrapper(
logger,
@@ -231,13 +232,12 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
must.NoError(t, h.Prerun())
must.NoError(t, h.Prerun(env))
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
req := &interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env}
must.NoError(t, h.Update(req))
must.NoError(t, h.Postrun())
@@ -280,7 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
@@ -324,7 +323,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) {
serviceRegWrapper: regWrapper,
shutdownDelayCtx: shutDownCtx,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
@@ -373,7 +371,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})
@@ -426,7 +423,6 @@ func TestGroupServiceHook_PreKill(t *testing.T) {
alloc: alloc,
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
hookResources: cstructs.NewAllocHookResources(),
})


@@ -68,13 +68,6 @@ type allocHealthWatcherHook struct {
// alloc set by new func or Update. Must hold hookLock to access.
alloc *structs.Allocation
// taskEnvBuilder is the current builder used to build task environments
// for the group and each of its tasks. Must hold hookLock to modify.
taskEnvBuilder *taskenv.Builder
// taskEnvBuilderFactory creates a new *taskenv.Builder instance.
taskEnvBuilderFactory func() *taskenv.Builder
// isDeploy is true if monitoring a deployment. Set in init(). Must
// hold hookLock to access.
isDeploy bool
@@ -85,7 +78,6 @@ type allocHealthWatcherHook struct {
func newAllocHealthWatcherHook(
logger hclog.Logger,
alloc *structs.Allocation,
taskEnvBuilderFactory func() *taskenv.Builder,
hs healthSetter,
listener *cstructs.AllocListener,
consul serviceregistration.Handler,
@@ -103,15 +95,13 @@ func newAllocHealthWatcherHook(
close(closedDone)
h := &allocHealthWatcherHook{
alloc: alloc,
taskEnvBuilderFactory: taskEnvBuilderFactory,
taskEnvBuilder: taskEnvBuilderFactory(),
cancelFn: func() {}, // initialize to prevent nil func panics
watchDone: closedDone,
consul: consul,
checkStore: checkStore,
healthSetter: hs,
listener: listener,
alloc: alloc,
cancelFn: func() {}, // initialize to prevent nil func panics
watchDone: closedDone,
consul: consul,
checkStore: checkStore,
healthSetter: hs,
listener: listener,
}
h.logger = logger.Named(h.Name())
@@ -134,7 +124,7 @@ func (h *allocHealthWatcherHook) Name() string {
// Prerun or Update. Caller must set/update alloc and logger fields.
//
// Not threadsafe so the caller should lock since Updates occur concurrently.
func (h *allocHealthWatcherHook) init() error {
func (h *allocHealthWatcherHook) init(allocEnv *taskenv.TaskEnv) error {
// No need to watch health as it's already set
if h.healthSetter.HasHealth() {
h.logger.Trace("not watching; already has health set")
@@ -166,7 +156,7 @@ func (h *allocHealthWatcherHook) init() error {
h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime)
// Create a new tracker, start it, and watch for health results.
tracker := allochealth.NewTracker(
ctx, h.logger, h.alloc, h.listener, h.taskEnvBuilder, h.consul, h.checkStore, minHealthyTime, useChecks,
ctx, h.logger, h.alloc, h.listener, allocEnv, h.consul, h.checkStore, minHealthyTime, useChecks,
)
tracker.Start()
@@ -176,7 +166,7 @@ func (h *allocHealthWatcherHook) init() error {
return nil
}
func (h *allocHealthWatcherHook) Prerun() error {
func (h *allocHealthWatcherHook) Prerun(allocEnv *taskenv.TaskEnv) error {
h.hookLock.Lock()
defer h.hookLock.Unlock()
@@ -186,7 +176,7 @@ func (h *allocHealthWatcherHook) Prerun() error {
}
h.ranOnce = true
return h.init()
return h.init(allocEnv)
}
func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) error {
@@ -210,10 +200,7 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err
// Update alloc
h.alloc = req.Alloc
// Create a new taskEnvBuilder with the updated alloc and a nil task
h.taskEnvBuilder = h.taskEnvBuilderFactory().UpdateTask(req.Alloc, nil)
return h.init()
return h.init(req.AllocEnv)
}
func (h *allocHealthWatcherHook) Postrun() error {


@@ -96,7 +96,7 @@ func TestHealthHook_PrerunPostrun(t *testing.T) {
checks := new(mock.CheckShim)
alloc := mock.Alloc()
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks)
// Assert we implemented the right interfaces
prerunh, ok := h.(interfaces.RunnerPrerunHook)
@@ -107,7 +107,8 @@ func TestHealthHook_PrerunPostrun(t *testing.T) {
require.True(ok)
// Prerun
require.NoError(prerunh.Prerun())
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
require.NoError(prerunh.Prerun(env))
// Assert isDeploy is false (other tests peek at isDeploy to determine
// if an Update applied)
@@ -135,10 +136,11 @@ func TestHealthHook_PrerunUpdatePostrun(t *testing.T) {
hs := &mockHealthSetter{}
checks := new(mock.CheckShim)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
// Prerun
require.NoError(h.Prerun())
require.NoError(h.Prerun(env))
// Update multiple times in a goroutine to mimic Client behavior
// (Updates are concurrent with alloc runner but are applied serially).
@@ -175,7 +177,8 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
hs := &mockHealthSetter{}
checks := new(mock.CheckShim)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
// Set a DeploymentID to cause ClearHealth to be called
alloc.DeploymentID = uuid.Generate()
@@ -184,7 +187,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
// concurrent with alloc runner).
errs := make(chan error, 1)
go func(alloc *structs.Allocation) {
errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc})
errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc, AllocEnv: env})
close(errs)
}(alloc.Copy())
@@ -193,7 +196,7 @@ func TestHealthHook_UpdatePrerunPostrun(t *testing.T) {
}
// Prerun should be a noop
require.NoError(h.Prerun())
require.NoError(h.Prerun(env))
// Assert that the Update took affect by isDeploy being true
h.hookLock.Lock()
@@ -218,7 +221,7 @@ func TestHealthHook_Postrun(t *testing.T) {
alloc := mock.Alloc()
checks := new(mock.CheckShim)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
// Postrun
require.NoError(h.Postrun())
@@ -285,10 +288,11 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
hs := newMockHealthSetter()
checks := new(mock.CheckShim)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
// Prerun
require.NoError(h.Prerun())
require.NoError(h.Prerun(env))
// Wait for health to be set (healthy)
select {
@@ -374,10 +378,11 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
hs := newMockHealthSetter()
checks := new(mock.CheckShim)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), taskEnvBuilderFactory(alloc), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul, checks).(*allocHealthWatcherHook)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
// Prerun
require.NoError(h.Prerun())
require.NoError(h.Prerun(env))
// Wait to ensure we don't get a healthy status
select {
@@ -396,7 +401,7 @@ func TestHealthHook_SystemNoop(t *testing.T) {
ci.Parallel(t)
alloc := mock.SystemAlloc()
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil)
// Assert that it's the noop impl
_, ok := h.(noopAllocHealthWatcherHook)
@@ -418,15 +423,9 @@ func TestHealthHook_BatchNoop(t *testing.T) {
ci.Parallel(t)
alloc := mock.BatchAlloc()
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), taskEnvBuilderFactory(alloc), nil, nil, nil, nil)
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil)
// Assert that it's the noop impl
_, ok := h.(noopAllocHealthWatcherHook)
require.True(t, ok)
}
func taskEnvBuilderFactory(alloc *structs.Allocation) func() *taskenv.Builder {
return func() *taskenv.Builder {
return taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
}
}


@@ -6,6 +6,7 @@ package allocrunner
import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/client/widmgr"
)
@@ -34,7 +35,7 @@ func (*identityHook) Name() string {
return "identity"
}
func (h *identityHook) Prerun() error {
func (h *identityHook) Prerun(_ *taskenv.TaskEnv) error {
// run the renewal
if err := h.widmgr.Run(); err != nil {
return err


@@ -40,13 +40,11 @@ func TestIdentityHook_Prerun(t *testing.T) {
logger := testlog.HCLogger(t)
db := cstate.NewMemDB(logger)
// the WIDMgr env builder never has the task available
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
// setup mock signer and WIDMgr
mockSigner := widmgr.NewMockWIDSigner(task.Identities)
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder)
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, env)
allocrunner.widmgr = mockWIDMgr
allocrunner.widsigner = mockSigner
@@ -65,7 +63,7 @@ func TestIdentityHook_Prerun(t *testing.T) {
start := time.Now()
hook := newIdentityHook(logger, mockWIDMgr)
must.Eq(t, hook.Name(), "identity")
must.NoError(t, hook.Prerun())
must.NoError(t, hook.Prerun(env))
time.Sleep(time.Second) // give goroutines a moment to run
sid, err := hook.widmgr.Get(structs.WIHandle{


@@ -4,6 +4,7 @@
package interfaces
import (
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -16,7 +17,7 @@ type RunnerHook interface {
// non-terminal allocations. Terminal allocations do *not* call prerun.
type RunnerPrerunHook interface {
RunnerHook
Prerun() error
Prerun(*taskenv.TaskEnv) error
}
// A RunnerPreKillHook is executed inside of KillTasks before
@@ -55,7 +56,8 @@ type RunnerUpdateHook interface {
}
type RunnerUpdateRequest struct {
Alloc *structs.Allocation
Alloc *structs.Allocation
AllocEnv *taskenv.TaskEnv
}
// A RunnerTaskRestartHook is executed just before the allocation runner is


@@ -11,6 +11,7 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
)
// diskMigrationHook migrates ephemeral disk volumes. Depends on alloc dir
@@ -41,7 +42,7 @@ func (h *diskMigrationHook) Name() string {
return "migrate_disk"
}
func (h *diskMigrationHook) Prerun() error {
func (h *diskMigrationHook) Prerun(_ *taskenv.TaskEnv) error {
ctx := context.TODO()
// Wait for a previous alloc - if any - to terminate


@@ -79,9 +79,6 @@ type networkHook struct {
// the alloc network has been created
networkConfigurator NetworkConfigurator
// taskEnv is used to perform interpolation within the network blocks.
taskEnv *taskenv.TaskEnv
logger hclog.Logger
}
@@ -91,7 +88,6 @@ func newNetworkHook(logger hclog.Logger,
netManager drivers.DriverNetworkManager,
netConfigurator NetworkConfigurator,
networkStatusSetter networkStatusSetter,
taskEnv *taskenv.TaskEnv,
) *networkHook {
return &networkHook{
isolationSetter: ns,
@@ -99,7 +95,6 @@ func newNetworkHook(logger hclog.Logger,
alloc: alloc,
manager: netManager,
networkConfigurator: netConfigurator,
taskEnv: taskEnv,
logger: logger,
}
}
@@ -114,7 +109,7 @@ func (h *networkHook) Name() string {
return "network"
}
func (h *networkHook) Prerun() error {
func (h *networkHook) Prerun(allocEnv *taskenv.TaskEnv) error {
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
if len(tg.Networks) == 0 || tg.Networks[0].Mode == "host" || tg.Networks[0].Mode == "" {
return nil
@@ -126,7 +121,7 @@ func (h *networkHook) Prerun() error {
}
// Perform our networks block interpolation.
interpolatedNetworks := taskenv.InterpolateNetworks(h.taskEnv, tg.Networks)
interpolatedNetworks := taskenv.InterpolateNetworks(allocEnv, tg.Networks)
// Interpolated values need to be validated. It is also possible a user
// supplied hostname avoids the validation on job registrations because it


@@ -82,10 +82,11 @@ func TestNetworkHook_Prerun_Postrun_group(t *testing.T) {
expectedStatus: nil,
}
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
logger := testlog.HCLogger(t)
hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter, envBuilder.Build())
must.NoError(t, hook.Prerun())
hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
must.NoError(t, hook.Prerun(env))
must.True(t, setter.called)
must.False(t, destroyCalled)
must.NoError(t, hook.Postrun())
@@ -125,10 +126,11 @@ func TestNetworkHook_Prerun_Postrun_host(t *testing.T) {
expectedStatus: nil,
}
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
logger := testlog.HCLogger(t)
hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter, envBuilder.Build())
must.NoError(t, hook.Prerun())
hook := newNetworkHook(logger, setter, alloc, nm, &hostNetworkConfigurator{}, statusSetter)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
must.NoError(t, hook.Prerun(env))
must.False(t, setter.called)
must.False(t, destroyCalled)
must.NoError(t, hook.Postrun())
@@ -184,8 +186,7 @@ func TestNetworkHook_Prerun_Postrun_ExistingNetNS(t *testing.T) {
cni: fakePlugin,
nsOpts: &nsOpts{},
}
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region)
env := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
testCases := []struct {
name string
@@ -251,9 +252,9 @@ func TestNetworkHook_Prerun_Postrun_ExistingNetNS(t *testing.T) {
fakePlugin.checkErrors = tc.checkErrs
configurator.nodeAttrs["plugins.cni.version.bridge"] = tc.cniVersion
hook := newNetworkHook(testlog.HCLogger(t), isolationSetter,
alloc, nm, configurator, statusSetter, envBuilder.Build())
alloc, nm, configurator, statusSetter)
err := hook.Prerun()
err := hook.Prerun(env)
if tc.expectPrerunError == "" {
must.NoError(t, err)
} else {


@@ -92,9 +92,9 @@ func TestIdentityHook_RenewAll(t *testing.T) {
logger := testlog.HCLogger(t)
db := cstate.NewMemDB(logger)
mockSigner := widmgr.NewMockWIDSigner(task.Identities)
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder)
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, allocEnv)
mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s
mockLifecycle := trtesting.NewMockTaskHooks()
@@ -212,8 +212,8 @@ func TestIdentityHook_RenewOne(t *testing.T) {
logger := testlog.HCLogger(t)
db := cstate.NewMemDB(logger)
mockSigner := widmgr.NewMockWIDSigner(task.Identities)
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, envBuilder)
allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
mockWIDMgr := widmgr.NewWIDMgr(mockSigner, alloc, db, logger, allocEnv)
mockWIDMgr.SetMinWait(time.Second) // fast renewals, because the default is 10s
h := &identityHook{


@@ -148,9 +148,9 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
if vault != nil {
vaultFunc = func(_ string) (vaultclient.VaultClient, error) { return vault, nil }
}
// the envBuilder for the WIDMgr never has access to the task, so don't
// include it here
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
// the env for the WIDMgr never has access to the task, so don't include it
// here
allocEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
conf := &Config{
Alloc: alloc,
@@ -171,7 +171,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
ServiceRegWrapper: wrapperMock,
Getter: getter.TestSandbox(t),
Wranglers: proclib.MockWranglers(t),
WIDMgr: widmgr.NewWIDMgr(widsigner, alloc, db, logger, envBuilder),
WIDMgr: widmgr.NewWIDMgr(widsigner, alloc, db, logger, allocEnv),
AllocHookResources: cstructs.NewAllocHookResources(),
}


@@ -99,8 +99,8 @@ func setupTestVaultHook(t *testing.T, config *vaultHookConfig) *vaultHook {
if config.widmgr == nil {
db := cstate.NewMemDB(config.logger)
signer := widmgr.NewMockWIDSigner(config.task.Identities)
envBuilder := taskenv.NewBuilder(mock.Node(), config.alloc, nil, "global")
config.widmgr = widmgr.NewWIDMgr(signer, config.alloc, db, config.logger, envBuilder)
allocEnv := taskenv.NewBuilder(mock.Node(), config.alloc, nil, "global").Build()
config.widmgr = widmgr.NewWIDMgr(signer, config.alloc, db, config.logger, allocEnv)
err := config.widmgr.Run()
must.NoError(t, err)
}


@@ -9,6 +9,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
)
// upstreamAllocsHook waits for a PrevAllocWatcher to exit before allowing
@@ -33,7 +34,7 @@ func (h *upstreamAllocsHook) Name() string {
return "await_previous_allocations"
}
func (h *upstreamAllocsHook) Prerun() error {
func (h *upstreamAllocsHook) Prerun(_ *taskenv.TaskEnv) error {
// Wait for a previous alloc - if any - to terminate
return h.allocWatcher.Wait(context.Background())
}
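
For illustration only, here is a minimal sketch (not part of this change set) of how a hook consumes the prebuilt environment under the new signature. The hook name and field are hypothetical; only the Prerun signature and the ReplaceEnv call mirror what the diffs in this change show.

```go
package allocrunner

import "github.com/hashicorp/nomad/client/taskenv"

// exampleHook is a hypothetical hook used only to illustrate the pattern: the
// prebuilt TaskEnv is passed in per call and never stored on the struct, so
// the node attributes and alloc metadata it carries can be released once the
// hook returns.
type exampleHook struct {
	address string
}

func (*exampleHook) Name() string { return "example" }

func (h *exampleHook) Prerun(env *taskenv.TaskEnv) error {
	// interpolate on demand instead of retaining a Builder for the lifetime
	// of the allocrunner
	h.address = env.ReplaceEnv("${NOMAD_IP_http}:${NOMAD_PORT_http}")
	return nil
}
```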

View File

@@ -5,6 +5,7 @@ package taskenv
import (
"fmt"
"maps"
"net"
"os"
"path/filepath"
@@ -248,6 +249,32 @@ func (t *TaskEnv) All() map[string]string {
return m
}
// WithTask returns a shallow copy of the TaskEnv, with the EnvMap deep-cloned
// and then overwritten with the provided task's meta and environment. This is
// only for use in allocrunner hooks that need to interpolate per-task services
// or identities, as it doesn't re-populate the rest of the environment.
func (t *TaskEnv) WithTask(alloc *structs.Allocation, task *structs.Task) *TaskEnv {
if t == nil {
return t
}
newT := new(TaskEnv)
*newT = *t
newT.envList = []string{}
newT.EnvMap = maps.Clone(t.EnvMap)
combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)
for k, v := range combined {
newT.EnvMap[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
newT.EnvMap[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
}
for k, v := range task.Env {
newT.EnvMap[k] = v
}
newT.EnvMap[TaskName] = task.Name
return newT
}
// AllValues is a map of the task's environment variables and the node's
// attributes with cty.Value (String) values. Errors including keys are
// returned in a map by key name.
@@ -658,11 +685,12 @@ func (b *Builder) buildEnv(allocDir, localDir, secretsDir string,
// Build must be called after all the task environment values have been set.
func (b *Builder) Build() *TaskEnv {
nodeAttrs := make(map[string]string)
b.mu.RLock()
defer b.mu.RUnlock()
nodeAttrs := make(map[string]string)
if b.region != "" {
// Copy region over to node attrs
nodeAttrs[nodeRegionKey] = b.region
@@ -678,13 +706,6 @@ func (b *Builder) Build() *TaskEnv {
return NewTaskEnv(envMap, envMapClient, deviceEnvs, nodeAttrs, b.clientTaskRoot, b.clientSharedAllocDir)
}
// UpdateTask updates the environment based on a new alloc and task.
func (b *Builder) UpdateTask(alloc *structs.Allocation, task *structs.Task) *Builder {
b.mu.Lock()
defer b.mu.Unlock()
return b.setTask(task).setAlloc(alloc)
}
// SetHookEnv sets environment variables from a hook. Variables are
// Last-Write-Wins, so if a hook writes a variable that's also written by a
// later hook, the later hooks value always gets used.
@@ -851,6 +872,9 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
// setNode is called from NewBuilder to populate node attributes.
func (b *Builder) setNode(n *structs.Node) *Builder {
if n == nil {
return b
}
b.nodeAttrs = make(map[string]string, 4+len(n.Attributes)+len(n.Meta))
b.nodeAttrs[nodeIdKey] = n.ID
b.nodeAttrs[nodeNameKey] = n.Name
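
Below is a hypothetical sketch of the intended WithTask usage in a group-level hook. Only WithTask, ReplaceEnv, and LookupTaskGroup come from this change set and the existing taskenv API; the helper itself is illustrative.

```go
package allocrunner

import (
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/structs"
)

// interpolateTaskValues is a hypothetical helper showing the intended WithTask
// usage: each task interpolates against its own copy of the alloc environment,
// so one task's env vars and meta can no longer bleed into values interpolated
// for another task in the same group.
func interpolateTaskValues(allocEnv *taskenv.TaskEnv, alloc *structs.Allocation) map[string]string {
	out := map[string]string{}
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	for _, task := range tg.Tasks {
		// WithTask copies allocEnv rather than mutating it, unlike the old
		// Builder.UpdateTask, so each iteration starts from a clean slate
		taskEnv := allocEnv.WithTask(alloc, task)
		out[task.Name] = taskEnv.ReplaceEnv("${NOMAD_TASK_NAME}-${NOMAD_META_owner}")
	}
	return out
}
```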

View File

@@ -19,6 +19,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/shoenig/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -707,54 +708,41 @@ func TestEnvironment_DashesInTaskName(t *testing.T) {
}
}
// TestEnvironment_UpdateTask asserts env vars and task meta are updated when a
// task is updated.
func TestEnvironment_UpdateTask(t *testing.T) {
// TestEnvironment_WithTask asserts env vars are updated in an
// alloc environment when a task is updated.
func TestEnvironment_WithTask(t *testing.T) {
ci.Parallel(t)
a := mock.Alloc()
a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta": "tgmetaval"}
task := a.Job.TaskGroups[0].Tasks[0]
task.Name = "orig"
task.Env = map[string]string{"env": "envval"}
task.Meta = map[string]string{"taskmeta": "taskmetaval"}
builder := NewBuilder(mock.Node(), a, task, "global")
builder := NewBuilder(mock.Node(), a, nil, "global")
origMap := builder.Build().Map()
if origMap["NOMAD_TASK_NAME"] != "orig" {
t.Errorf("Expected NOMAD_TASK_NAME=orig but found %q", origMap["NOMAD_TASK_NAME"])
}
if origMap["NOMAD_META_taskmeta"] != "taskmetaval" {
t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", origMap["NOMAD_META_taskmeta"])
}
if origMap["env"] != "envval" {
t.Errorf("Expected env=envva but found %q", origMap["env"])
}
if origMap["NOMAD_META_tgmeta"] != "tgmetaval" {
t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", origMap["NOMAD_META_tgmeta"])
}
test.Eq(t, "web", origMap["NOMAD_GROUP_NAME"])
test.Eq(t, "armon", origMap["NOMAD_META_owner"])
test.Eq(t, "", origMap["NOMAD_META_taskmeta"])
a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta2": "tgmetaval2"}
task.Name = "new"
task := a.Job.TaskGroups[0].Tasks[0]
task.Name = "task1"
task.Env = map[string]string{"env": "envval"}
task.Meta = map[string]string{"taskmeta": "taskmetaval"}
newMap1 := builder.Build().WithTask(a, task).Map()
test.Eq(t, "task1", newMap1["NOMAD_TASK_NAME"])
test.Eq(t, "taskmetaval", newMap1["NOMAD_META_taskmeta"])
test.Eq(t, "envval", newMap1["env"])
task.Name = "task2"
task.Env = map[string]string{"env2": "envval2"}
task.Meta = map[string]string{"taskmeta2": "taskmetaval2"}
newMap := builder.UpdateTask(a, task).Build().Map()
if newMap["NOMAD_TASK_NAME"] != "new" {
t.Errorf("Expected NOMAD_TASK_NAME=new but found %q", newMap["NOMAD_TASK_NAME"])
}
if newMap["NOMAD_META_taskmeta2"] != "taskmetaval2" {
t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", newMap["NOMAD_META_taskmeta2"])
}
if newMap["env2"] != "envval2" {
t.Errorf("Expected env=envva but found %q", newMap["env2"])
}
if newMap["NOMAD_META_tgmeta2"] != "tgmetaval2" {
t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", newMap["NOMAD_META_tgmeta2"])
}
if v, ok := newMap["NOMAD_META_taskmeta"]; ok {
t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v)
}
// the original alloc env must not have been mutated by the first WithTask call
newMap2 := builder.Build().WithTask(a, task).Map()
test.Eq(t, "task2", newMap2["NOMAD_TASK_NAME"])
test.Eq(t, "taskmetaval2", newMap2["NOMAD_META_taskmeta2"])
test.Eq(t, "envval2", newMap2["env2"])
test.Eq(t, "", newMap2["NOMAD_META_taskmeta"])
test.Eq(t, "", newMap2["env"])
}
// TestEnvironment_InterpolateEmptyOptionalMeta asserts that in a parameterized

View File

@@ -54,12 +54,10 @@ type WIDMgr struct {
logger hclog.Logger
}
func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB, logger hclog.Logger, envBuilder *taskenv.Builder) *WIDMgr {
func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB, logger hclog.Logger, allocEnv *taskenv.TaskEnv) *WIDMgr {
widspecs := map[structs.WIHandle]*structs.WorkloadIdentity{}
tg := a.Job.LookupTaskGroup(a.TaskGroup)
allocEnv := envBuilder.Build()
for _, service := range tg.Services {
if service.Identity != nil {
handle := *service.IdentityHandle(allocEnv.ReplaceEnv)
@@ -74,7 +72,7 @@ func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, db cstate.StateDB,
}
// derive the task-specific environment for this task
taskEnv := envBuilder.UpdateTask(a, task).Build()
taskEnv := allocEnv.WithTask(a, task)
for _, service := range task.Services {
if service.Identity != nil {
handle := *service.IdentityHandle(taskEnv.ReplaceEnv)

View File

@@ -30,10 +30,10 @@ func TestWIDMgr_Restore(t *testing.T) {
}
alloc.Job.TaskGroups[0].Tasks[0].Services[0].Identity = widSpecs[0]
alloc.Job.TaskGroups[0].Tasks[0].Identities = widSpecs[1:]
envBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, "global")
env := taskenv.NewBuilder(mock.Node(), alloc, nil, "global").Build()
signer := NewMockWIDSigner(widSpecs)
mgr := NewWIDMgr(signer, alloc, db, logger, envBuilder)
mgr := NewWIDMgr(signer, alloc, db, logger, env)
// restore, but we haven't previously saved to the db
hasExpired, err := mgr.restoreStoredIdentities()
@@ -54,7 +54,7 @@ func TestWIDMgr_Restore(t *testing.T) {
widSpecs[2].TTL = time.Second
signer.setWIDs(widSpecs)
wiHandle := service.IdentityHandle(envBuilder.Build().ReplaceEnv)
wiHandle := service.IdentityHandle(env.ReplaceEnv)
mgr.widSpecs[*wiHandle].TTL = time.Second
// force a re-sign to re-populate the lastToken and save to the db

View File

@@ -273,10 +273,10 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
// This should only be used to interpolate connect service names which are
// used in sidecar or gateway task names. Note that the service name might
// also be interpolated with job specifics during service canonicalization.
env := taskenv.NewEmptyBuilder().UpdateTask(&structs.Allocation{
env := taskenv.NewBuilder(nil, &structs.Allocation{
Job: job,
TaskGroup: g.Name,
}, nil).Build()
}, nil, job.Region).Build()
for _, service := range g.Services {
switch {