diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go
index d0ed29c43..9769a6cb0 100644
--- a/client/allocrunner/alloc_runner_hooks.go
+++ b/client/allocrunner/alloc_runner_hooks.go
@@ -117,6 +117,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
 	// Create the alloc directory hook. This is run first to ensure the
 	// directory path exists for other hooks.
 	alloc := ar.Alloc()
+
 	ar.runnerHooks = []interfaces.RunnerHook{
 		newIdentityHook(hookLogger, ar.widmgr),
 		newAllocDirHook(hookLogger, ar.allocDir),
@@ -145,8 +146,10 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
 			logger:           hookLogger,
 			shutdownDelayCtx: ar.shutdownDelayCtx,
 		}),
-		newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig, config.Node.Attributes),
-		newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
+		newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir,
+			config.GetConsulConfigs(ar.logger), config.Node.Attributes),
+		newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir,
+			config.GetConsulConfigs(ar.logger)),
 		newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
 		newChecksHook(hookLogger, alloc, ar.checkStore, ar, builtTaskEnv),
 	}
diff --git a/client/allocrunner/consul_grpc_sock_hook.go b/client/allocrunner/consul_grpc_sock_hook.go
index 7ff085141..2ee814bf6 100644
--- a/client/allocrunner/consul_grpc_sock_hook.go
+++ b/client/allocrunner/consul_grpc_sock_hook.go
@@ -16,6 +16,8 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-hclog"
+	multierror "github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/go-set/v2"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -48,26 +50,46 @@ type consulGRPCSocketHook struct {
 
 	// mu synchronizes proxy and alloc which may be mutated and read concurrently
 	// via Prerun, Update, Postrun.
-	mu    sync.Mutex
-	alloc *structs.Allocation
-	proxy *grpcSocketProxy
+	mu      sync.Mutex
+	alloc   *structs.Allocation
+	proxies map[string]*grpcSocketProxy
 }
 
 func newConsulGRPCSocketHook(
 	logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir,
-	config *config.ConsulConfig, nodeAttrs map[string]string) *consulGRPCSocketHook {
+	configs map[string]*config.ConsulConfig, nodeAttrs map[string]string) *consulGRPCSocketHook {
 
-	// Attempt to find the gRPC port via the node attributes, otherwise use the
-	// default fallback.
-	consulGRPCPort, ok := nodeAttrs["consul.grpc"]
-	if !ok {
-		consulGRPCPort = consulGRPCFallbackPort
+	// Get the deduplicated set of Consul clusters that are needed by this
+	// alloc. For Nomad CE, this will always be just the default cluster.
+	clusterNames := set.New[string](1)
+	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+	for _, s := range tg.Services {
+		clusterNames.Insert(s.GetConsulClusterName(tg))
 	}
+	proxies := map[string]*grpcSocketProxy{}
+
+	clusterNames.ForEach(func(clusterName string) bool {
+
+		// Attempt to find the gRPC port via the node attributes, otherwise use
+		// the default fallback.
+		attrName := "consul.grpc"
+		if clusterName != structs.ConsulDefaultCluster {
+			attrName = "consul." + clusterName + ".grpc"
+		}
+		consulGRPCPort, ok := nodeAttrs[attrName]
+		if !ok {
+			consulGRPCPort = consulGRPCFallbackPort
+		}
+
+		proxies[clusterName] = newGRPCSocketProxy(logger, allocDir,
+			configs[clusterName], consulGRPCPort)
+		return true
+	})
 
 	return &consulGRPCSocketHook{
-		alloc:  alloc,
-		proxy:  newGRPCSocketProxy(logger, allocDir, config, consulGRPCPort),
-		logger: logger.Named(consulGRPCSockHookName),
+		alloc:   alloc,
+		proxies: proxies,
+		logger:  logger.Named(consulGRPCSockHookName),
 	}
 }
@@ -102,7 +124,13 @@ func (h *consulGRPCSocketHook) Prerun() error {
 		return nil
 	}
 
-	return h.proxy.run(h.alloc)
+	var mErr *multierror.Error
+	for _, proxy := range h.proxies {
+		if err := proxy.run(h.alloc); err != nil {
+			mErr = multierror.Append(mErr, err)
+		}
+	}
+	return mErr.ErrorOrNil()
 }
 
 // Update creates a gRPC socket file and proxy if there are any Connect
@@ -116,19 +144,31 @@ func (h *consulGRPCSocketHook) Update(req *interfaces.RunnerUpdateRequest) error
 	if !h.shouldRun() {
 		return nil
 	}
+	if len(h.proxies) == 0 {
+		return fmt.Errorf("cannot update alloc to Connect in-place")
+	}
 
-	return h.proxy.run(h.alloc)
+	var mErr *multierror.Error
+	for _, proxy := range h.proxies {
+		if err := proxy.run(h.alloc); err != nil {
+			mErr = multierror.Append(mErr, err)
+		}
+	}
+	return mErr.ErrorOrNil()
 }
 
 func (h *consulGRPCSocketHook) Postrun() error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
-	if err := h.proxy.stop(); err != nil {
-		// Only log failures to stop proxies. Worst case scenario is a
-		// small goroutine leak.
-		h.logger.Debug("error stopping Consul proxy", "error", err)
+	for _, proxy := range h.proxies {
+		if err := proxy.stop(); err != nil {
+			// Only log failures to stop proxies. Worst case scenario is a small
+			// goroutine leak.
+			h.logger.Warn("error stopping Consul proxy", "error", err)
+		}
 	}
+
 	return nil
 }
@@ -186,7 +226,8 @@ func (p *grpcSocketProxy) run(alloc *structs.Allocation) error {
 
 	// make sure either grpc or http consul address has been configured
 	if p.config.GRPCAddr == "" && p.config.Addr == "" {
-		return errors.New("consul address must be set on nomad client")
+		return fmt.Errorf("consul address for cluster %q must be set on nomad client",
+			p.config.Name)
 	}
 
 	destAddr := p.config.GRPCAddr
@@ -195,13 +236,17 @@ func (p *grpcSocketProxy) run(alloc *structs.Allocation) error {
 		// default of 8502.
 		host, _, err := net.SplitHostPort(p.config.Addr)
 		if err != nil {
-			return fmt.Errorf("error parsing Consul address %q: %v",
-				p.config.Addr, err)
+			return fmt.Errorf("error parsing Consul address %q: %v", p.config.Addr, err)
 		}
 		destAddr = net.JoinHostPort(host, p.consulGRPCFallbackPort)
 	}
 
-	hostGRPCSocketPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocGRPCSocket)
+	socketFile := allocdir.AllocGRPCSocket
+	if p.config.Name != structs.ConsulDefaultCluster && p.config.Name != "" {
+		socketFile = filepath.Join(allocdir.SharedAllocName, allocdir.TmpDirName,
+			"consul_"+p.config.Name+"_grpc.sock")
+	}
+	hostGRPCSocketPath := filepath.Join(p.allocDir.AllocDir, socketFile)
 
 	// if the socket already exists we'll try to remove it, but if not then any
 	// other errors will bubble up to the caller here or when we try to listen
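Both socket proxies key their Unix socket path off the cluster name: the default (or unnamed) cluster keeps the legacy `allocdir` socket constant, while any other cluster gets a cluster-qualified file under the shared alloc tmp directory. A minimal standalone sketch of that selection, with stand-in values for the `allocdir` constants and `structs.ConsulDefaultCluster` ("infra" is a hypothetical non-default cluster name):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// Stand-ins for structs.ConsulDefaultCluster and the client/allocdir
// constants referenced by grpcSocketProxy.run above.
const (
	defaultCluster  = "default"
	sharedAllocName = "alloc"
	tmpDirName      = "tmp"
	allocGRPCSocket = "alloc/tmp/consul_grpc.sock"
)

// grpcSocketFile mirrors the selection logic: the default or unnamed cluster
// keeps the legacy path, every other cluster gets its own socket file so
// per-cluster proxies never collide.
func grpcSocketFile(clusterName string) string {
	if clusterName == defaultCluster || clusterName == "" {
		return allocGRPCSocket
	}
	return filepath.Join(sharedAllocName, tmpDirName,
		"consul_"+clusterName+"_grpc.sock")
}

func main() {
	fmt.Println(grpcSocketFile("default")) // alloc/tmp/consul_grpc.sock
	fmt.Println(grpcSocketFile("infra"))   // alloc/tmp/consul_infra_grpc.sock
}
```

The HTTP hook below applies the same rule with a `consul_<cluster>_http.sock` filename.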
diff --git a/client/allocrunner/consul_grpc_sock_hook_test.go b/client/allocrunner/consul_grpc_sock_hook_test.go
index b08d4575c..8ed87f85e 100644
--- a/client/allocrunner/consul_grpc_sock_hook_test.go
+++ b/client/allocrunner/consul_grpc_sock_hook_test.go
@@ -17,7 +17,9 @@ import (
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -33,9 +35,11 @@ func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) {
 	fakeConsul, err := net.Listen("tcp", "127.0.0.1:0")
 	require.NoError(t, err)
 	defer fakeConsul.Close()
-	consulConfig := &config.ConsulConfig{
-		GRPCAddr: fakeConsul.Addr().String(),
-	}
+
+	consulConfigs := map[string]*config.ConsulConfig{
+		structs.ConsulDefaultCluster: {
+			GRPCAddr: fakeConsul.Addr().String(),
+		}}
 
 	alloc := mock.ConnectAlloc()
@@ -45,7 +49,7 @@ func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) {
 	defer cleanup()
 
 	// Start the unix socket proxy
-	h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig, map[string]string{})
+	h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
 	require.NoError(t, h.Prerun())
 
 	gRPCSock := filepath.Join(allocDir.AllocDir, allocdir.AllocGRPCSocket)
@@ -108,7 +112,8 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) {
 	logger := testlog.HCLogger(t)
 
 	// A config without an Addr or GRPCAddr is invalid.
-	consulConfig := &config.ConsulConfig{}
+	consulConfigs := map[string]*config.ConsulConfig{
+		structs.ConsulDefaultCluster: {}}
 
 	alloc := mock.Alloc()
 	connectAlloc := mock.ConnectAlloc()
@@ -119,36 +124,36 @@ func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) {
 	{
 		// An alloc without a Connect proxy sidecar should not return
 		// an error.
-		h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig, map[string]string{})
-		require.NoError(t, h.Prerun())
+		h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
+		must.NoError(t, h.Prerun())
 
 		// Postrun should be a noop
-		require.NoError(t, h.Postrun())
+		must.NoError(t, h.Postrun())
 	}
 
 	{
 		// An alloc *with* a Connect proxy sidecar *should* return an error
 		// when Consul is not configured.
-		h := newConsulGRPCSocketHook(logger, connectAlloc, allocDir, consulConfig, map[string]string{})
-		require.EqualError(t, h.Prerun(), "consul address must be set on nomad client")
+		h := newConsulGRPCSocketHook(logger, connectAlloc, allocDir, consulConfigs, map[string]string{})
+		must.ErrorContains(t, h.Prerun(), `consul address for cluster "" must be set on nomad client`)
 
 		// Postrun should be a noop
-		require.NoError(t, h.Postrun())
+		must.NoError(t, h.Postrun())
 	}
 
 	{
 		// Updating an alloc without a sidecar to have a sidecar should
 		// error when the sidecar is added.
-		h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig, map[string]string{})
-		require.NoError(t, h.Prerun())
+		h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
+		must.NoError(t, h.Prerun())
 
 		req := &interfaces.RunnerUpdateRequest{
 			Alloc: connectAlloc,
 		}
-		require.EqualError(t, h.Update(req), "consul address must be set on nomad client")
+		must.EqError(t, h.Update(req), "cannot update alloc to Connect in-place")
 
 		// Postrun should be a noop
-		require.NoError(t, h.Postrun())
+		must.NoError(t, h.Postrun())
 	}
 }
diff --git a/client/allocrunner/consul_http_sock_hook.go b/client/allocrunner/consul_http_sock_hook.go
index 7c725c8d0..eaf535984 100644
--- a/client/allocrunner/consul_http_sock_hook.go
+++ b/client/allocrunner/consul_http_sock_hook.go
@@ -14,6 +14,8 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-hclog"
+	multierror "github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/go-set/v2"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -36,16 +38,31 @@ type consulHTTPSockHook struct {
 
 	// lock synchronizes proxy and alloc which may be mutated and read concurrently
 	// via Prerun, Update, and Postrun.
-	lock  sync.Mutex
-	alloc *structs.Allocation
-	proxy *httpSocketProxy
+	lock    sync.Mutex
+	alloc   *structs.Allocation
+	proxies map[string]*httpSocketProxy
 }
 
-func newConsulHTTPSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulHTTPSockHook {
+func newConsulHTTPSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, configs map[string]*config.ConsulConfig) *consulHTTPSockHook {
+
+	// Get the deduplicated set of Consul clusters that are needed by this
+	// alloc. For Nomad CE, this will always be just the default cluster.
+	clusterNames := set.New[string](1)
+	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+	for _, s := range tg.Services {
+		clusterNames.Insert(s.GetConsulClusterName(tg))
+	}
+	proxies := map[string]*httpSocketProxy{}
+
+	clusterNames.ForEach(func(clusterName string) bool {
+		proxies[clusterName] = newHTTPSocketProxy(logger, allocDir, configs[clusterName])
+		return true
+	})
+
 	return &consulHTTPSockHook{
-		alloc:  alloc,
-		proxy:  newHTTPSocketProxy(logger, allocDir, config),
-		logger: logger.Named(consulHTTPSocketHookName),
+		alloc:   alloc,
+		proxies: proxies,
+		logger:  logger.Named(consulHTTPSocketHookName),
 	}
 }
@@ -81,7 +98,13 @@ func (h *consulHTTPSockHook) Prerun() error {
 		return nil
 	}
 
-	return h.proxy.run(h.alloc)
+	var mErr *multierror.Error
+	for _, proxy := range h.proxies {
+		if err := proxy.run(h.alloc); err != nil {
+			mErr = multierror.Append(mErr, err)
+		}
+	}
+	return mErr.ErrorOrNil()
 }
 
 func (h *consulHTTPSockHook) Update(req *interfaces.RunnerUpdateRequest) error {
@@ -93,17 +116,29 @@ func (h *consulHTTPSockHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	if !h.shouldRun() {
 		return nil
 	}
+	if len(h.proxies) == 0 {
+		return fmt.Errorf("cannot update alloc to Connect in-place")
+	}
 
-	return h.proxy.run(h.alloc)
+	var mErr *multierror.Error
+	for _, proxy := range h.proxies {
+		if err := proxy.run(h.alloc); err != nil {
+			mErr = multierror.Append(mErr, err)
+		}
+	}
+	return mErr.ErrorOrNil()
 }
 
 func (h *consulHTTPSockHook) Postrun() error {
 	h.lock.Lock()
 	defer h.lock.Unlock()
 
-	if err := h.proxy.stop(); err != nil {
-		// Only log a failure to stop, worst case is the proxy leaks a goroutine.
-		h.logger.Warn("error stopping Consul HTTP proxy", "error", err)
+	for _, proxy := range h.proxies {
+		if err := proxy.stop(); err != nil {
+			// Only log failures to stop proxies. Worst case scenario is a small
+			// goroutine leak.
+			h.logger.Warn("error stopping Consul HTTP proxy", "error", err)
+		}
 	}
 
 	return nil
@@ -158,7 +193,12 @@ func (p *httpSocketProxy) run(alloc *structs.Allocation) error {
 		return errors.New("consul address must be set on nomad client")
 	}
 
-	hostHTTPSockPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocHTTPSocket)
+	socketFile := allocdir.AllocHTTPSocket
+	if p.config.Name != structs.ConsulDefaultCluster && p.config.Name != "" {
+		socketFile = filepath.Join(allocdir.SharedAllocName, allocdir.TmpDirName,
+			"consul_"+p.config.Name+"_http.sock")
+	}
+	hostHTTPSockPath := filepath.Join(p.allocDir.AllocDir, socketFile)
 	if err := maybeRemoveOldSocket(hostHTTPSockPath); err != nil {
 		return err
 	}
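Both hooks now share the same shape: deduplicate the Consul clusters referenced by the task group's services, build one socket proxy per cluster, and fan Prerun/Update out across the map while collecting every failure. A self-contained sketch of that pattern using the same go-set and go-multierror libraries the hooks import (the proxy type and cluster names are stand-ins):

```go
package main

import (
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
	"github.com/hashicorp/go-set/v2"
)

// socketProxy stands in for grpcSocketProxy/httpSocketProxy.
type socketProxy struct{ cluster string }

func (p *socketProxy) run() error {
	if p.cluster == "unconfigured" {
		return fmt.Errorf("consul address for cluster %q must be set", p.cluster)
	}
	return nil
}

func main() {
	// Deduplicate: several services may name the same cluster, but each
	// cluster needs exactly one proxy.
	clusterNames := set.New[string](1)
	for _, svcCluster := range []string{"default", "infra", "default", "unconfigured"} {
		clusterNames.Insert(svcCluster)
	}

	proxies := map[string]*socketProxy{}
	clusterNames.ForEach(func(name string) bool {
		proxies[name] = &socketProxy{cluster: name}
		return true // keep iterating
	})

	// Fan out and aggregate failures instead of stopping at the first one.
	var mErr *multierror.Error
	for _, proxy := range proxies {
		if err := proxy.run(); err != nil {
			mErr = multierror.Append(mErr, err)
		}
	}
	fmt.Println(mErr.ErrorOrNil())
}
```

`Append` accepts a nil `*multierror.Error`, so the happy path allocates nothing, and `ErrorOrNil` collapses an empty aggregate back to a plain nil error.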
diff --git a/client/allocrunner/consul_http_sock_hook_test.go b/client/allocrunner/consul_http_sock_hook_test.go
index 022da2018..a6a31ec2c 100644
--- a/client/allocrunner/consul_http_sock_hook_test.go
+++ b/client/allocrunner/consul_http_sock_hook_test.go
@@ -13,7 +13,9 @@ import (
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 )
@@ -24,8 +26,8 @@ func TestConsulSocketHook_PrerunPostrun_Ok(t *testing.T) {
 	require.NoError(t, err)
 	defer fakeConsul.Close()
 
-	consulConfig := &config.ConsulConfig{
-		Addr: fakeConsul.Addr().String(),
+	consulConfigs := map[string]*config.ConsulConfig{
+		structs.ConsulDefaultCluster: {Addr: fakeConsul.Addr().String()},
 	}
 
 	alloc := mock.ConnectNativeAlloc("bridge")
@@ -36,7 +38,7 @@ func TestConsulSocketHook_PrerunPostrun_Ok(t *testing.T) {
 	defer cleanupDir()
 
 	// start unix socket proxy
-	h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfig)
+	h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs)
 	require.NoError(t, h.Prerun())
 
 	httpSocket := filepath.Join(allocDir.AllocDir, allocdir.AllocHTTPSocket)
@@ -97,7 +99,9 @@ func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) {
 
 	logger := testlog.HCLogger(t)
 
-	consulConfig := new(config.ConsulConfig)
+	consulConfigs := map[string]*config.ConsulConfig{
+		structs.ConsulDefaultCluster: new(config.ConsulConfig),
+	}
 
 	alloc := mock.Alloc()
 	connectNativeAlloc := mock.ConnectNativeAlloc("bridge")
@@ -107,20 +111,20 @@ func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) {
 
 	{
 		// an alloc without a connect native task should not return an error
-		h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfig)
-		require.NoError(t, h.Prerun())
+		h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfigs)
+		must.NoError(t, h.Prerun())
 
 		// postrun should be a noop
-		require.NoError(t, h.Postrun())
+		must.NoError(t, h.Postrun())
 	}
 
 	{
 		// an alloc with a native task should return an error when consul is not
 		// configured
-		h := newConsulHTTPSocketHook(logger, connectNativeAlloc, allocDir, consulConfig)
-		require.EqualError(t, h.Prerun(), "consul address must be set on nomad client")
+		h := newConsulHTTPSocketHook(logger, connectNativeAlloc, allocDir, consulConfigs)
+		must.ErrorContains(t, h.Prerun(), "consul address must be set on nomad client")
 
 		// Postrun should be a noop
-		require.NoError(t, h.Postrun())
+		must.NoError(t, h.Postrun())
 	}
 }
diff --git a/client/allocrunner/taskrunner/envoy_version_hook.go b/client/allocrunner/taskrunner/envoy_version_hook.go
index b34848b34..6c342614b 100644
--- a/client/allocrunner/taskrunner/envoy_version_hook.go
+++ b/client/allocrunner/taskrunner/envoy_version_hook.go
@@ -82,7 +82,7 @@ func (h *envoyVersionHook) Prestart(_ context.Context, request *ifs.TaskPrestart
 	// We either need to acquire Consul's preferred Envoy version or fallback
 	// to the legacy default. Query Consul and use the (possibly empty) result.
 	//
-	// TODO: how do we select the right cluster here if we have multiple
+	// TODO(tgross): how do we select the right cluster here if we have multiple
 	// services which could have their own cluster field value?
 	proxies, err := h.proxiesClientFunc(structs.ConsulDefaultCluster).Proxies()
 	if err != nil {
diff --git a/client/allocrunner/taskrunner/sids_hook_test.go b/client/allocrunner/taskrunner/sids_hook_test.go
index e617a313d..82890f5da 100644
--- a/client/allocrunner/taskrunner/sids_hook_test.go
+++ b/client/allocrunner/taskrunner/sids_hook_test.go
@@ -284,7 +284,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) {
 
 	// set a consul token for the nomad client, which is what triggers the
 	// SIDS hook to be applied
-	trConfig.ClientConfig.ConsulConfig.Token = uuid.Generate()
+	trConfig.ClientConfig.GetDefaultConsul().Token = uuid.Generate()
 
 	// derive token works just fine
 	deriveFn := func(*structs.Allocation, []string) (map[string]string, error) {
diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go
index 94879402d..925d1cc5b 100644
--- a/client/allocrunner/taskrunner/task_runner_getters.go
+++ b/client/allocrunner/taskrunner/task_runner_getters.go
@@ -72,7 +72,7 @@ func (tr *TaskRunner) setVaultToken(token string) {
 
 	// Update the task's environment
 	taskNamespace := tr.task.Vault.Namespace
-	ns := tr.clientConfig.VaultConfig.Namespace
+	ns := tr.clientConfig.GetVaultConfigs(tr.logger)[tr.task.GetVaultClusterName()].Namespace
 	if taskNamespace != "" {
 		ns = taskNamespace
 	}
diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go
index 6261dc9b5..806f429cf 100644
--- a/client/allocrunner/taskrunner/task_runner_hooks.go
+++ b/client/allocrunner/taskrunner/task_runner_hooks.go
@@ -142,9 +142,11 @@ func (tr *TaskRunner) initHooks() {
 	// If this is a Connect sidecar proxy (or a Connect Native) service,
 	// add the sidsHook for requesting a Service Identity token (if ACLs).
 	if task.UsesConnect() {
+		tg := tr.Alloc().Job.LookupTaskGroup(tr.Alloc().TaskGroup)
+
 		// Enable the Service Identity hook only if the Nomad client is configured
 		// with a consul token, indicating that Consul ACLs are enabled
-		if tr.clientConfig.ConsulConfig.Token != "" {
+		if tr.clientConfig.GetConsulConfigs(tr.logger)[task.GetConsulClusterName(tg)].Token != "" {
 			tr.runnerHooks = append(tr.runnerHooks, newSIDSHook(sidsHookConfig{
 				alloc: tr.Alloc(),
 				task:  tr.Task(),
@@ -158,11 +160,15 @@ func (tr *TaskRunner) initHooks() {
 		if task.UsesConnectSidecar() {
 			tr.runnerHooks = append(tr.runnerHooks,
 				newEnvoyVersionHook(newEnvoyVersionHookConfig(alloc, tr.consulProxiesClientFunc, hookLogger)),
-				newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, tr.clientConfig.ConsulConfig, consulNamespace, hookLogger)),
+				newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc,
+					tr.clientConfig.ConsulConfigs[task.GetConsulClusterName(tg)],
+					consulNamespace,
+					hookLogger)),
 			)
 		} else if task.Kind.IsConnectNative() {
 			tr.runnerHooks = append(tr.runnerHooks, newConnectNativeHook(
-				newConnectNativeHookConfig(alloc, tr.clientConfig.ConsulConfig, hookLogger),
+				newConnectNativeHookConfig(alloc,
+					tr.clientConfig.ConsulConfigs[task.GetConsulClusterName(tg)], hookLogger),
 			))
 		}
 	}
diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index 290bde161..2bd0741ba 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -122,7 +122,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
 	db := cstate.NewMemDB(logger)
 
 	if thisTask.Vault != nil {
-		clientConf.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(true)
+		clientConf.GetDefaultVault().Enabled = pointer.Of(true)
 	}
 
 	var vaultFunc vaultclient.VaultClientFunc
@@ -1433,7 +1433,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) {
 
 	// set a consul token on the Nomad client's consul config, because that is
 	// what gates the action of requesting SI token(s)
-	trConfig.ClientConfig.ConsulConfig.Token = uuid.Generate()
+	trConfig.ClientConfig.GetDefaultConsul().Token = uuid.Generate()
 
 	// control when we get a Consul SI token
 	token := uuid.Generate()
@@ -1497,7 +1497,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) {
 
 	// set a consul token on the Nomad client's consul config, because that is
 	// what gates the action of requesting SI token(s)
-	trConfig.ClientConfig.ConsulConfig.Token = uuid.Generate()
+	trConfig.ClientConfig.GetDefaultConsul().Token = uuid.Generate()
 
 	// control when we get a Consul SI token (recoverable failure on first call)
 	token := uuid.Generate()
@@ -1557,7 +1557,7 @@ func TestTaskRunner_DeriveSIToken_Unrecoverable(t *testing.T) {
 
 	// set a consul token on the Nomad client's consul config, because that is
 	// what gates the action of requesting SI token(s)
-	trConfig.ClientConfig.ConsulConfig.Token = uuid.Generate()
+	trConfig.ClientConfig.GetDefaultConsul().Token = uuid.Generate()
 
 	// SI token derivation suffers a non-retryable error
 	siClient := trConfig.ConsulSI.(*consulapi.MockServiceIdentitiesClient)
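The template changes below resolve a per-task Consul configuration by indexing the client's cluster map with the task's cluster name. Indexing a Go map with a missing key returns the zero value (nil here) rather than failing, which is why the new `TaskTemplateManagerConfig.ConsulConfig` field is documented as possibly nil and `newRunnerConfig` guards on it. A hypothetical, self-contained sketch with a stand-in config type:

```go
package main

import "fmt"

// consulConfig stands in for structs/config.ConsulConfig.
type consulConfig struct{ Addr string }

func main() {
	configs := map[string]*consulConfig{
		"default": {Addr: "127.0.0.1:8500"},
	}

	// "infra" is a hypothetical cluster with no client configuration; the
	// lookup returns nil, so callers must nil-check before dereferencing.
	for _, cluster := range []string{"default", "infra"} {
		if cfg := configs[cluster]; cfg != nil {
			fmt.Printf("cluster %q -> Consul at %s\n", cluster, cfg.Addr)
		} else {
			fmt.Printf("cluster %q -> no Consul configuration\n", cluster)
		}
	}
}
```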
diff --git a/client/allocrunner/taskrunner/template/template.go b/client/allocrunner/taskrunner/template/template.go
index c2a6543c0..a5e038fc6 100644
--- a/client/allocrunner/taskrunner/template/template.go
+++ b/client/allocrunner/taskrunner/template/template.go
@@ -100,6 +100,10 @@ type TaskTemplateManagerConfig struct {
 	// workload identity
 	ConsulToken string
 
+	// ConsulConfig is the Consul configuration to use for this template. It may
+	// be nil if Nomad has no Consul configuration.
+	ConsulConfig *structsc.ConsulConfig
+
 	// VaultToken is the Vault token for the task.
 	VaultToken string
@@ -819,8 +823,8 @@ func newRunnerConfig(config *TaskTemplateManagerConfig,
 	}
 
 	// Set up the Consul config
-	if cc.ConsulConfig != nil {
-		conf.Consul.Address = &cc.ConsulConfig.Addr
+	if config.ConsulConfig != nil {
+		conf.Consul.Address = &config.ConsulConfig.Addr
 
 		// if we're using WI, use the token from consul_hook
 		// NOTE: from Nomad 1.9 on, WI will be the only supported way of
@@ -828,28 +832,28 @@
 		if config.ConsulToken != "" {
 			conf.Consul.Token = &config.ConsulToken
 		} else {
-			conf.Consul.Token = &cc.ConsulConfig.Token
+			conf.Consul.Token = &config.ConsulConfig.Token
 		}
 
 		// Get the Consul namespace from agent config. This is the lower level
 		// of precedence (beyond default).
-		if cc.ConsulConfig.Namespace != "" {
-			conf.Consul.Namespace = &cc.ConsulConfig.Namespace
+		if config.ConsulConfig.Namespace != "" {
+			conf.Consul.Namespace = &config.ConsulConfig.Namespace
 		}
 
-		if cc.ConsulConfig.EnableSSL != nil && *cc.ConsulConfig.EnableSSL {
-			verify := cc.ConsulConfig.VerifySSL != nil && *cc.ConsulConfig.VerifySSL
+		if config.ConsulConfig.EnableSSL != nil && *config.ConsulConfig.EnableSSL {
+			verify := config.ConsulConfig.VerifySSL != nil && *config.ConsulConfig.VerifySSL
 			conf.Consul.SSL = &ctconf.SSLConfig{
 				Enabled: pointer.Of(true),
 				Verify:  &verify,
-				Cert:    &cc.ConsulConfig.CertFile,
-				Key:     &cc.ConsulConfig.KeyFile,
-				CaCert:  &cc.ConsulConfig.CAFile,
+				Cert:    &config.ConsulConfig.CertFile,
+				Key:     &config.ConsulConfig.KeyFile,
+				CaCert:  &config.ConsulConfig.CAFile,
 			}
 		}
 
-		if cc.ConsulConfig.Auth != "" {
-			parts := strings.SplitN(cc.ConsulConfig.Auth, ":", 2)
+		if config.ConsulConfig.Auth != "" {
+			parts := strings.SplitN(config.ConsulConfig.Auth, ":", 2)
 			if len(parts) != 2 {
 				return nil, fmt.Errorf("Failed to parse Consul Auth config")
 			}
diff --git a/client/allocrunner/taskrunner/template/template_test.go b/client/allocrunner/taskrunner/template/template_test.go
index dcb1f378b..d617898e4 100644
--- a/client/allocrunner/taskrunner/template/template_test.go
+++ b/client/allocrunner/taskrunner/template/template_test.go
@@ -114,9 +114,10 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b
 		if err != nil {
 			t.Fatalf("error starting test Consul server: %v", err)
 		}
-		harness.config.ConsulConfig = &sconfig.ConsulConfig{
-			Addr: harness.consul.HTTPAddr,
-		}
+		harness.config.ConsulConfigs = map[string]*sconfig.ConsulConfig{
+			structs.ConsulDefaultCluster: {
+				Addr: harness.consul.HTTPAddr,
+			}}
 	}
 
 	if vault {
@@ -124,7 +125,6 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b
 		harness.config.VaultConfigs = map[string]*sconfig.VaultConfig{
 			structs.VaultDefaultCluster: harness.vault.Config,
 		}
-		harness.config.VaultConfig = harness.config.VaultConfigs[structs.VaultDefaultCluster]
 		harness.vaultToken = harness.vault.RootToken
 	}
@@ -145,8 +145,9 @@ func (h *testHarness) startWithErr() error {
 		Events:               h.mockHooks,
 		Templates:            h.templates,
 		ClientConfig:         h.config,
+		ConsulConfig:         h.config.GetDefaultConsul(),
 		VaultToken:           h.vaultToken,
-		VaultConfig:          h.config.VaultConfigs[structs.VaultDefaultCluster],
+		VaultConfig:          h.config.GetDefaultVault(),
 		TaskDir:              h.taskDir,
 		EnvBuilder:           h.envBuilder,
 		MaxTemplateEventRate: h.emitRate,
@@ -1716,20 +1717,19 @@ func TestTaskTemplateManager_Config_ServerName(t *testing.T) {
 			TLSServerName: "notlocalhost",
 		},
 	}
-	c.VaultConfig = c.VaultConfigs[structs.VaultDefaultCluster]
 
 	config := &TaskTemplateManagerConfig{
 		ClientConfig: c,
 		VaultToken:   "token",
-		VaultConfig:  c.VaultConfigs[structs.VaultDefaultCluster],
+		VaultConfig:  c.GetDefaultVault(),
 	}
 
 	ctconf, err := newRunnerConfig(config, nil)
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	if *ctconf.Vault.SSL.ServerName != c.VaultConfig.TLSServerName {
-		t.Fatalf("expected %q but found %q", c.VaultConfig.TLSServerName, *ctconf.Vault.SSL.ServerName)
+	if *ctconf.Vault.SSL.ServerName != c.GetDefaultVault().TLSServerName {
+		t.Fatalf("expected %q but found %q", c.GetDefaultVault().TLSServerName, *ctconf.Vault.SSL.ServerName)
 	}
 }
@@ -1750,13 +1750,12 @@ func TestTaskTemplateManager_Config_VaultNamespace(t *testing.T) {
 			Namespace:     testNS,
 		},
 	}
-	c.VaultConfig = c.VaultConfigs[structs.VaultDefaultCluster]
 
 	alloc := mock.Alloc()
 	config := &TaskTemplateManagerConfig{
 		ClientConfig: c,
 		VaultToken:   "token",
-		VaultConfig:  c.VaultConfigs[structs.VaultDefaultCluster],
+		VaultConfig:  c.GetDefaultVault(),
 		EnvBuilder:   taskenv.NewBuilder(c.Node, alloc, alloc.Job.TaskGroups[0].Tasks[0], c.Region),
 	}
@@ -1785,7 +1784,6 @@ func TestTaskTemplateManager_Config_VaultNamespace_TaskOverride(t *testing.T) {
 			Namespace:     testNS,
 		},
 	}
-	c.VaultConfig = c.VaultConfigs[structs.VaultDefaultCluster]
 
 	alloc := mock.Alloc()
 	overriddenNS := "new-namespace"
@@ -1794,7 +1792,7 @@ func TestTaskTemplateManager_Config_VaultNamespace_TaskOverride(t *testing.T) {
 	config := &TaskTemplateManagerConfig{
 		ClientConfig:   c,
 		VaultToken:     "token",
-		VaultConfig:    c.VaultConfigs[structs.VaultDefaultCluster],
+		VaultConfig:    c.GetDefaultVault(),
 		VaultNamespace: overriddenNS,
 		EnvBuilder:     taskenv.NewBuilder(c.Node, alloc, alloc.Job.TaskGroups[0].Tasks[0], c.Region),
 	}
@@ -2173,11 +2171,10 @@ func TestTaskTemplateManager_ClientTemplateConfig_Set(t *testing.T) {
 			Namespace:     testNS,
 		},
 	}
-	clientConfig.VaultConfig = clientConfig.VaultConfigs[structs.VaultDefaultCluster]
-
-	clientConfig.ConsulConfig = &sconfig.ConsulConfig{
-		Namespace: testNS,
-	}
+	clientConfig.ConsulConfigs = map[string]*sconfig.ConsulConfig{
+		structs.ConsulDefaultCluster: {
+			Namespace: testNS,
+		}}
 
 	// helper to reduce boilerplate
 	waitConfig := &config.WaitConfig{
@@ -2228,8 +2225,9 @@ func TestTaskTemplateManager_ClientTemplateConfig_Set(t *testing.T) {
 		},
 		&TaskTemplateManagerConfig{
 			ClientConfig: clientConfig,
+			ConsulConfig: clientConfig.GetDefaultConsul(),
 			VaultToken:   "token",
-			VaultConfig:  clientConfig.VaultConfigs[structs.VaultDefaultCluster],
+			VaultConfig:  clientConfig.GetDefaultVault(),
 			EnvBuilder:   taskenv.NewBuilder(clientConfig.Node, alloc, alloc.Job.TaskGroups[0].Tasks[0], clientConfig.Region),
 		},
 		&config.Config{
@@ -2262,8 +2260,9 @@ func TestTaskTemplateManager_ClientTemplateConfig_Set(t *testing.T) {
 		},
 		&TaskTemplateManagerConfig{
 			ClientConfig: clientConfig,
+			ConsulConfig: clientConfig.GetDefaultConsul(),
 			VaultToken:   "token",
-			VaultConfig:  clientConfig.VaultConfigs[structs.VaultDefaultCluster],
+			VaultConfig:  clientConfig.GetDefaultVault(),
 			EnvBuilder:   taskenv.NewBuilder(clientConfig.Node, allocWithOverride, allocWithOverride.Job.TaskGroups[0].Tasks[0], clientConfig.Region),
 		},
 		&config.Config{
@@ -2300,8 +2299,9 @@ func TestTaskTemplateManager_ClientTemplateConfig_Set(t *testing.T) {
 		},
 		&TaskTemplateManagerConfig{
 			ClientConfig: clientConfig,
+			ConsulConfig: clientConfig.GetDefaultConsul(),
 			VaultToken:   "token",
-			VaultConfig:  clientConfig.VaultConfigs[structs.VaultDefaultCluster],
+			VaultConfig:  clientConfig.GetDefaultVault(),
 			EnvBuilder:   taskenv.NewBuilder(clientConfig.Node, allocWithOverride, allocWithOverride.Job.TaskGroups[0].Tasks[0], clientConfig.Region),
 			Templates: []*structs.Template{
 				{
diff --git a/client/allocrunner/taskrunner/template_hook.go b/client/allocrunner/taskrunner/template_hook.go
index f8f7506f8..b993da44d 100644
--- a/client/allocrunner/taskrunner/template_hook.go
+++ b/client/allocrunner/taskrunner/template_hook.go
@@ -206,6 +206,10 @@ func (h *templateHook) newManager() (unblock chan struct{}, err error) {
 		}
 	}
 
+	tg := h.config.alloc.Job.LookupTaskGroup(h.config.alloc.TaskGroup)
+	consulCluster := h.task.GetConsulClusterName(tg)
+	consulConfig := h.config.clientConfig.GetConsulConfigs(h.logger)[consulCluster]
+
 	m, err := template.NewTaskTemplateManager(&template.TaskTemplateManagerConfig{
 		UnblockCh: unblock,
 		Lifecycle: h.config.lifecycle,
@@ -214,6 +218,7 @@ func (h *templateHook) newManager() (unblock chan struct{}, err error) {
 		ClientConfig:    h.config.clientConfig,
 		ConsulNamespace: h.config.consulNamespace,
 		ConsulToken:     h.consulToken,
+		ConsulConfig:    consulConfig,
 		VaultToken:      h.vaultToken,
 		VaultConfig:     vaultConfig,
 		VaultNamespace:  h.vaultNamespace,
diff --git a/client/client.go b/client/client.go
index d3481a06c..cb2ffe4db 100644
--- a/client/client.go
+++ b/client/client.go
@@ -568,7 +568,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
 	}
 
 	// Setup Consul discovery if enabled
-	if cfg.ConsulConfig.ClientAutoJoin != nil && *cfg.ConsulConfig.ClientAutoJoin {
+	if cfg.GetDefaultConsul().ClientAutoJoin != nil && *cfg.GetDefaultConsul().ClientAutoJoin {
 		c.shutdownGroup.Go(c.consulDiscovery)
 		if c.servers.NumServers() == 0 {
 			// No configured servers; trigger discovery manually
@@ -3014,7 +3014,7 @@ func taskIsPresent(taskName string, tasks []*structs.Task) bool {
 
 // triggerDiscovery causes a Consul discovery to begin (if one hasn't already)
 func (c *Client) triggerDiscovery() {
 	config := c.GetConfig()
-	if config.ConsulConfig.ClientAutoJoin != nil && *config.ConsulConfig.ClientAutoJoin {
+	if cc := config.GetDefaultConsul(); cc != nil && cc.ClientAutoJoin != nil && *cc.ClientAutoJoin {
 		select {
 		case c.triggerDiscoveryCh <- struct{}{}:
 			// Discovery goroutine was released to execute
@@ -3059,7 +3059,7 @@ func (c *Client) consulDiscoveryImpl() error {
 		dcs = dcs[0:min(len(dcs), datacenterQueryLimit)]
 	}
 
-	serviceName := c.GetConfig().ConsulConfig.ServerServiceName
+	serviceName := c.GetConfig().GetDefaultConsul().ServerServiceName
 	var mErr multierror.Error
 	var nomadServers servers.Servers
 	consulLogger.Debug("bootstrap contacting Consul DCs", "consul_dcs", dcs)
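`GetDefaultConsul` returns nil when no default cluster is configured, and `ClientAutoJoin` is itself a `*bool`, so both need a nil check before the dereference in the auto-join guards above. A runnable sketch of the guard with stand-in types:

```go
package main

import "fmt"

type consulConfig struct{ ClientAutoJoin *bool }

type clientConfig struct{ consuls map[string]*consulConfig }

// GetDefaultConsul mirrors the accessor added in client/config/config.go:
// a missing "default" key simply returns nil.
func (c *clientConfig) GetDefaultConsul() *consulConfig {
	return c.consuls["default"]
}

func main() {
	enabled := true
	cases := []*clientConfig{
		{consuls: map[string]*consulConfig{}},                                      // no default cluster
		{consuls: map[string]*consulConfig{"default": {}}},                         // auto-join unset
		{consuls: map[string]*consulConfig{"default": {ClientAutoJoin: &enabled}}}, // enabled
	}

	for i, cfg := range cases {
		if cc := cfg.GetDefaultConsul(); cc != nil && cc.ClientAutoJoin != nil && *cc.ClientAutoJoin {
			fmt.Printf("config %d: would start Consul auto-join discovery\n", i)
		} else {
			fmt.Printf("config %d: auto-join disabled or Consul not configured\n", i)
		}
	}
}
```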
diff --git a/client/config/config.go b/client/config/config.go
index 7fcbe10a9..151e5db7e 100644
--- a/client/config/config.go
+++ b/client/config/config.go
@@ -170,19 +170,11 @@ type Config struct {
 	// Version is the version of the Nomad client
 	Version *version.VersionInfo
 
-	// ConsulConfig is this Agent's default Consul configuration
-	ConsulConfig *structsc.ConsulConfig
-
 	// ConsulConfigs is a map of Consul configurations, here to support features
 	// in Nomad Enterprise. The default Consul config pointer above will be
 	// found in this map under the name "default"
 	ConsulConfigs map[string]*structsc.ConsulConfig
 
-	// VaultConfig is this Agent's default Vault configuration
-	//
-	// Deprecated: use GetVaultConfigs() instead.
-	VaultConfig *structsc.VaultConfig
-
 	// VaultConfigs is a map of Vault configurations, here to support features
 	// in Nomad Enterprise. The default Vault config pointer above will be found
 	// in this map under the name "default"
@@ -760,9 +752,7 @@ func (c *Config) Copy() *Config {
 	nc.Servers = slices.Clone(nc.Servers)
 	nc.Options = maps.Clone(nc.Options)
 	nc.HostVolumes = structs.CopyMapStringClientHostVolumeConfig(nc.HostVolumes)
-	nc.ConsulConfig = c.ConsulConfig.Copy()
 	nc.ConsulConfigs = helper.DeepCopyMap(c.ConsulConfigs)
-	nc.VaultConfig = c.VaultConfig.Copy()
 	nc.VaultConfigs = helper.DeepCopyMap(c.VaultConfigs)
 	nc.TemplateConfig = c.TemplateConfig.Copy()
 	nc.ReservableCores = slices.Clone(c.ReservableCores)
@@ -773,9 +763,11 @@ func (c *Config) Copy() *Config {
 
 // DefaultConfig returns the default configuration
 func DefaultConfig() *Config {
 	cfg := &Config{
-		Version:      version.GetVersion(),
-		VaultConfig:  structsc.DefaultVaultConfig(),
-		ConsulConfig: structsc.DefaultConsulConfig(),
+		Version: version.GetVersion(),
+		VaultConfigs: map[string]*structsc.VaultConfig{
+			structs.VaultDefaultCluster: structsc.DefaultVaultConfig()},
+		ConsulConfigs: map[string]*structsc.ConsulConfig{
+			structs.ConsulDefaultCluster: structsc.DefaultConsulConfig()},
 		Region:                  "global",
 		StatsCollectionInterval: 1 * time.Second,
 		TLSConfig:               &structsc.TLSConfig{},
@@ -815,11 +807,6 @@ func DefaultConfig() *Config {
 		MinDynamicPort: structs.DefaultMaxDynamicPort,
 	}
 
-	cfg.ConsulConfigs = map[string]*structsc.ConsulConfig{
-		structs.ConsulDefaultCluster: cfg.ConsulConfig}
-	cfg.VaultConfigs = map[string]*structsc.VaultConfig{
-		structs.VaultDefaultCluster: cfg.VaultConfig}
-
 	return cfg
 }
@@ -960,3 +947,13 @@ func (c *Config) NomadPluginConfig(topology *numalib.Topology) *base.AgentConfig
 		},
 	}
 }
+
+// GetDefaultConsul returns the configuration for the default Consul cluster.
+func (c *Config) GetDefaultConsul() *structsc.ConsulConfig {
+	return c.ConsulConfigs[structs.ConsulDefaultCluster]
+}
+
+// GetDefaultVault returns the configuration for the default Vault cluster.
+func (c *Config) GetDefaultVault() *structsc.VaultConfig {
+	return c.VaultConfigs[structs.VaultDefaultCluster]
+}
diff --git a/client/config/config_ce.go b/client/config/config_ce.go
index 3fd4593c4..ed1c86008 100644
--- a/client/config/config_ce.go
+++ b/client/config/config_ce.go
@@ -7,29 +7,26 @@ package config
 
 import (
 	"github.com/hashicorp/go-hclog"
-	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/nomad/structs/config"
 	structsc "github.com/hashicorp/nomad/nomad/structs/config"
 )
 
 // GetVaultConfigs returns the set of Vault configurations available for this
 // client. In Nomad CE we only use the default Vault.
 func (c *Config) GetVaultConfigs(logger hclog.Logger) map[string]*structsc.VaultConfig {
-	if c.VaultConfig == nil || !c.VaultConfig.IsEnabled() {
+	if c.VaultConfigs["default"] == nil || !c.VaultConfigs["default"].IsEnabled() {
 		return nil
 	}
 
 	if len(c.VaultConfigs) > 1 {
 		logger.Warn("multiple Vault configurations are only supported in Nomad Enterprise")
 	}
-
-	return map[string]*structsc.VaultConfig{structs.VaultDefaultCluster: c.VaultConfig}
+	return c.VaultConfigs
 }
 
 // GetConsulConfigs returns the set of Consul configurations the fingerprint needs
 // to check. In Nomad CE we only check the default Consul.
 func (c *Config) GetConsulConfigs(logger hclog.Logger) map[string]*structsc.ConsulConfig {
-	if c.ConsulConfig == nil {
+	if c.ConsulConfigs["default"] == nil {
 		return nil
 	}
 
@@ -37,5 +34,5 @@ func (c *Config) GetConsulConfigs(logger hclog.Logger) map[string]*structsc.Cons
 	if len(c.ConsulConfigs) > 1 {
 		logger.Warn("multiple Consul configurations are only supported in Nomad Enterprise")
 	}
 
-	return map[string]*config.ConsulConfig{structs.ConsulDefaultCluster: c.ConsulConfig}
+	return c.ConsulConfigs
 }
diff --git a/client/config/testing.go b/client/config/testing.go
index 156b5b54b..bff1e2e61 100644
--- a/client/config/testing.go
+++ b/client/config/testing.go
@@ -14,7 +14,6 @@ import (
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
-	"github.com/hashicorp/nomad/nomad/structs"
 	testing "github.com/mitchellh/go-testing-interface"
 )
@@ -69,8 +68,7 @@ func TestClientConfig(t testing.T) (*Config, func()) {
 	// Helps make sure we are respecting configured parent
 	conf.CgroupParent = "testing.slice"
 
-	conf.VaultConfig.Enabled = pointer.Of(false)
-	conf.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(false)
+	conf.GetDefaultVault().Enabled = pointer.Of(false)
 	conf.DevMode = true
 
 	// Loosen GC threshold
diff --git a/client/fingerprint/consul_test.go b/client/fingerprint/consul_test.go
index 76bd197b9..3bc7db5fe 100644
--- a/client/fingerprint/consul_test.go
+++ b/client/fingerprint/consul_test.go
@@ -37,7 +37,7 @@ func fakeConsul(payload string) (*httptest.Server, *config.Config) {
 	}))
 
 	cfg := config.DefaultConfig()
-	cfg.ConsulConfig.Addr = strings.TrimPrefix(ts.URL, `http://`)
+	cfg.GetDefaultConsul().Addr = strings.TrimPrefix(ts.URL, `http://`)
 	return ts, cfg
 }
diff --git a/client/fingerprint/vault_test.go b/client/fingerprint/vault_test.go
index 8b033c015..15b573efc 100644
--- a/client/fingerprint/vault_test.go
+++ b/client/fingerprint/vault_test.go
@@ -34,7 +34,7 @@ func TestVaultFingerprint(t *testing.T) {
 	}
 
 	conf := config.DefaultConfig()
-	conf.VaultConfig = tv.Config
+	conf.VaultConfigs[structs.VaultDefaultCluster] = tv.Config
 
 	request := &FingerprintRequest{Config: conf, Node: node}
 	var response FingerprintResponse
diff --git a/client/gc_test.go b/client/gc_test.go
index 2d981796a..fa7ebdffb 100644
--- a/client/gc_test.go
+++ b/client/gc_test.go
@@ -378,7 +378,7 @@ func TestAllocGarbageCollector_MakeRoomFor_MaxAllocs(t *testing.T) {
 		c.GCInterval = time.Hour
 		c.RPCHandler = server
 		c.Servers = []string{serverAddr}
-		c.ConsulConfig.ClientAutoJoin = new(bool)
+		c.GetDefaultConsul().ClientAutoJoin = new(bool)
 	})
 	defer cleanup()
 	waitTilNodeReady(client, t)
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 5516ee04a..e5dfbaeca 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -825,15 +825,9 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
 		return nil, fmt.Errorf("client_service_name must be set when auto_advertise is enabled")
 	}
 
-	conf.ConsulConfig = agentConfig.Consul
-	for _, consulConfig := range agentConfig.Consuls {
-		conf.ConsulConfigs[consulConfig.Name] = consulConfig
-	}
-
-	conf.VaultConfig = agentConfig.Vault
-	for _, vaultConfig := range agentConfig.Vaults {
-		conf.VaultConfigs[vaultConfig.Name] = vaultConfig
-	}
+	// Set the Vault and Consul configurations
+	conf.ConsulConfigs = agentConfig.Consuls
+	conf.VaultConfigs = agentConfig.Vaults
 
 	// Set up Telemetry configuration
 	conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
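The agent conversion above now assigns the `Consuls` and `Vaults` collections straight through to the client config, which assumes they already arrive as maps keyed by cluster name; the removed loop shows the old fold from name-carrying values. A stand-in sketch of that equivalence (types and values are illustrative, not Nomad's):

```go
package main

import "fmt"

// consulConfig stands in for structs/config.ConsulConfig; only Name matters here.
type consulConfig struct{ Name, Addr string }

func main() {
	// What the removed loop did: fold named blocks into a map keyed by Name.
	consuls := []*consulConfig{
		{Name: "default", Addr: "127.0.0.1:8500"},
		{Name: "infra", Addr: "10.0.0.5:8500"}, // hypothetical second cluster
	}
	byName := make(map[string]*consulConfig, len(consuls))
	for _, cc := range consuls {
		byName[cc.Name] = cc
	}

	// The direct assignment in convertClientConfig presumes agentConfig.Consuls
	// is already shaped like byName upstream.
	fmt.Println(byName["default"].Addr)
}
```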
diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go
index dfa6d200d..57c4657e6 100644
--- a/command/agent/consul/int_test.go
+++ b/command/agent/consul/int_test.go
@@ -66,9 +66,9 @@ func TestConsul_Integration(t *testing.T) {
 
 	conf := config.DefaultConfig()
 	conf.Node = mock.Node()
-	conf.ConsulConfig.Addr = testconsul.HTTPAddr
+	conf.GetDefaultConsul().Addr = testconsul.HTTPAddr
 	conf.APIListenerRegistrar = config.NoopAPIListenerRegistrar{}
-	consulConfig, err := conf.ConsulConfig.ApiConfig()
+	consulConfig, err := conf.GetDefaultConsul().ApiConfig()
 	if err != nil {
 		t.Fatalf("error generating consul config: %v", err)
 	}