mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
services: get Consul token from hook resources (#18600)
When Workload Identity is being used with Consul, the `consul_hook` will add Consul tokens to the alloc hook resources. Update the `group_service_hook` and `service_hook` to use those tokens when available for registering and deregistering Consul workloads.
This commit is contained in:
@@ -129,6 +129,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
||||
alloc: alloc,
|
||||
providerNamespace: alloc.ServiceProviderNamespace(),
|
||||
serviceRegWrapper: ar.serviceRegWrapper,
|
||||
hookResources: ar.hookResources,
|
||||
restarter: ar,
|
||||
taskEnvBuilder: newEnvBuilder(),
|
||||
networkStatus: ar,
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -42,6 +43,8 @@ type groupServiceHook struct {
|
||||
// and check registration and deregistration.
|
||||
serviceRegWrapper *wrapper.HandlerWrapper
|
||||
|
||||
hookResources *cstructs.AllocHookResources
|
||||
|
||||
logger hclog.Logger
|
||||
|
||||
// The following fields may be updated
|
||||
@@ -72,6 +75,8 @@ type groupServiceHookConfig struct {
|
||||
// serviceRegWrapper is the handler wrapper that is used to perform service
|
||||
// and check registration and deregistration.
|
||||
serviceRegWrapper *wrapper.HandlerWrapper
|
||||
|
||||
hookResources *cstructs.AllocHookResources
|
||||
}
|
||||
|
||||
func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
|
||||
@@ -95,6 +100,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
|
||||
logger: cfg.logger.Named(groupServiceHookName),
|
||||
serviceRegWrapper: cfg.serviceRegWrapper,
|
||||
services: tg.Services,
|
||||
hookResources: cfg.hookResources,
|
||||
shutdownDelayCtx: cfg.shutdownDelayCtx,
|
||||
}
|
||||
|
||||
@@ -257,6 +263,15 @@ func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.Work
|
||||
// Interpolate with the task's environment
|
||||
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
|
||||
|
||||
allocTokens := h.hookResources.GetConsulTokens()
|
||||
|
||||
tokens := map[string]string{}
|
||||
for _, service := range h.services {
|
||||
if token, ok := allocTokens[service.Cluster][service.MakeUniqueIdentityName()]; ok {
|
||||
tokens[service.Name] = token
|
||||
}
|
||||
}
|
||||
|
||||
var netStatus *structs.AllocNetworkStatus
|
||||
if h.networkStatus != nil {
|
||||
netStatus = h.networkStatus.NetworkStatus()
|
||||
@@ -279,5 +294,6 @@ func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.Work
|
||||
NetworkStatus: netStatus,
|
||||
Ports: h.ports,
|
||||
Canary: h.canary,
|
||||
Tokens: tokens,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/pointer"
|
||||
@@ -52,6 +53,7 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
must.NoError(t, h.Prerun())
|
||||
|
||||
@@ -93,6 +95,7 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
must.NoError(t, h.Prerun())
|
||||
|
||||
@@ -134,6 +137,7 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
must.NoError(t, h.Prerun())
|
||||
|
||||
@@ -179,6 +183,7 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
must.NoError(t, h.Prerun())
|
||||
|
||||
@@ -233,6 +238,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
must.NoError(t, h.Prerun())
|
||||
|
||||
@@ -281,6 +287,7 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
|
||||
services := h.getWorkloadServicesLocked()
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/serviceregistration"
|
||||
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
@@ -260,6 +261,7 @@ func TestScript_TaskEnvInterpolation(t *testing.T) {
|
||||
task: task,
|
||||
serviceRegWrapper: regWrap,
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
// emulate prestart having been fired
|
||||
svcHook.taskEnv = env
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
@@ -43,6 +44,8 @@ type serviceHookConfig struct {
|
||||
// Restarter is a subset of the TaskLifecycle interface
|
||||
restarter serviceregistration.WorkloadRestarter
|
||||
|
||||
hookResources *cstructs.AllocHookResources
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
@@ -80,6 +83,8 @@ type serviceHook struct {
|
||||
// we do not call this multiple times for a single task when not needed.
|
||||
deregistered bool
|
||||
|
||||
hookResources *cstructs.AllocHookResources
|
||||
|
||||
// Since Update() may be called concurrently with any other hook all
|
||||
// hook methods must be fully serialized
|
||||
mu sync.Mutex
|
||||
@@ -96,6 +101,7 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
|
||||
serviceRegWrapper: c.serviceRegWrapper,
|
||||
services: c.task.Services,
|
||||
restarter: c.restarter,
|
||||
hookResources: c.hookResources,
|
||||
ports: c.alloc.AllocatedResources.Shared.Ports,
|
||||
}
|
||||
|
||||
@@ -224,6 +230,15 @@ func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadService
|
||||
// Interpolate with the task's environment
|
||||
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)
|
||||
|
||||
allocTokens := h.hookResources.GetConsulTokens()
|
||||
|
||||
tokens := map[string]string{}
|
||||
for _, service := range h.services {
|
||||
if token, ok := allocTokens[service.Cluster][service.MakeUniqueIdentityName()]; ok {
|
||||
tokens[service.Name] = token
|
||||
}
|
||||
}
|
||||
|
||||
info := structs.AllocInfo{
|
||||
AllocID: h.allocID,
|
||||
JobID: h.jobID,
|
||||
@@ -243,5 +258,6 @@ func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadService
|
||||
Networks: h.networks,
|
||||
Canary: h.canary,
|
||||
Ports: h.ports,
|
||||
Tokens: tokens,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
@@ -43,6 +44,7 @@ func TestUpdate_beforePoststart(t *testing.T) {
|
||||
task: alloc.LookupTask("web"),
|
||||
serviceRegWrapper: regWrap,
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{
|
||||
Alloc: alloc,
|
||||
@@ -108,6 +110,7 @@ func Test_serviceHook_multipleDeRegisterCall(t *testing.T) {
|
||||
task: alloc.LookupTask("web"),
|
||||
serviceRegWrapper: regWrap,
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
|
||||
// Interpolating workload services performs a check on the task env, if it
|
||||
@@ -184,6 +187,7 @@ func Test_serviceHook_Nomad(t *testing.T) {
|
||||
serviceRegWrapper: regWrapper,
|
||||
restarter: agentconsul.NoopRestarter(),
|
||||
logger: logger,
|
||||
hookResources: cstructs.NewAllocHookResources(),
|
||||
})
|
||||
|
||||
// Create a taskEnv builder to use in requests, otherwise interpolation of
|
||||
|
||||
@@ -132,6 +132,7 @@ func (tr *TaskRunner) initHooks() {
|
||||
providerNamespace: serviceProviderNamespace,
|
||||
serviceRegWrapper: tr.serviceRegWrapper,
|
||||
restarter: tr,
|
||||
hookResources: tr.allocHookResources,
|
||||
logger: hookLogger,
|
||||
}))
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
|
||||
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
@@ -173,6 +174,7 @@ func TestConsul_Integration(t *testing.T) {
|
||||
StartConditionMetCh: closedCh,
|
||||
ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)),
|
||||
Wranglers: proclib.MockWranglers(t),
|
||||
AllocHookResources: cstructs.NewAllocHookResources(),
|
||||
}
|
||||
|
||||
tr, err := taskrunner.NewTaskRunner(config)
|
||||
|
||||
Reference in New Issue
Block a user