Files
nomad/client/allocrunner/alloc_runner_hooks.go
Tim Gross df67e74615 Consul: add preflight checks for Envoy bootstrap (#23381)
Nomad creates Consul ACL tokens and service registrations to support Consul
service mesh workloads, before bootstrapping the Envoy proxy. Nomad always talks
to the local Consul agent and never directly to the Consul servers. But the
local Consul agent talks to the Consul servers in stale consistency mode to
reduce load on the servers. This can result in the Nomad client making the Envoy
bootstrap request with a tokens or services that have not yet replicated to the
follower that the local client is connected to. This request gets a 404 on the
ACL token and that negative entry gets cached, preventing any retries from
succeeding.

To workaround this, we'll use a method described by our friends over on
`consul-k8s` where after creating the objects in Consul we try to read them from
the local agent in stale consistency mode (which prevents a failed read from
being cached). This cannot completely eliminate this source of error because
it's possible that Consul cluster replication is unhealthy at the time we need
it, but this should make Envoy bootstrap significantly more robust.

This changset adds preflight checks for the objects we create in Consul:
* We add a preflight check for ACL tokens after we login via via Workload
  Identity and in the function we use to derive tokens in the legacy
  workflow. We do this check early because we also want to use this token for
  registering group services in the allocrunner hooks.
* We add a preflight check for services right before we bootstrap Envoy in the
  taskrunner hook, so that we have time for our service client to batch updates
  to the local Consul agent in addition to the local agent sync.

We've added the timeouts to be configurable via node metadata rather than the
usual static configuration because for most cases, users should not need to
touch or even know these values are configurable; the configuration is mostly
available for testing.


Fixes: https://github.com/hashicorp/nomad/issues/9307
Fixes: https://github.com/hashicorp/nomad/issues/10451
Fixes: https://github.com/hashicorp/nomad/issues/20516

Ref: https://github.com/hashicorp/consul-k8s/pull/887
Ref: https://hashicorp.atlassian.net/browse/NET-10051
Ref: https://hashicorp.atlassian.net/browse/NET-9273
Follow-up: https://hashicorp.atlassian.net/browse/NET-10138
2024-06-27 10:15:37 -04:00

396 lines
11 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package allocrunner
import (
"fmt"
"time"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
)
// allocHealthSetter is a shim to allow the alloc health watcher hook to set
// and clear the alloc health without full access to the alloc runner state
type allocHealthSetter struct {
ar *allocRunner
}
// HasHealth returns true if a deployment status is already set.
func (a *allocHealthSetter) HasHealth() bool {
a.ar.stateLock.Lock()
defer a.ar.stateLock.Unlock()
return a.ar.state.DeploymentStatus.HasHealth()
}
// ClearHealth allows the health watcher hook to clear the alloc's deployment
// health if the deployment id changes. It does not update the server as the
// status is only cleared when already receiving an update from the server.
//
// Only for use by health hook.
func (a *allocHealthSetter) ClearHealth() {
a.ar.stateLock.Lock()
a.ar.state.ClearDeploymentStatus()
a.ar.persistDeploymentStatus(nil)
a.ar.stateLock.Unlock()
}
// SetHealth allows the health watcher hook to set the alloc's
// deployment/migration health and emit task events.
//
// Only for use by health hook.
func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents map[string]*structs.TaskEvent) {
// Updating alloc deployment state is tricky because it may be nil, but
// if it's not then we need to maintain the values of Canary and
// ModifyIndex as they're only mutated by the server.
a.ar.stateLock.Lock()
a.ar.state.SetDeploymentStatus(time.Now(), healthy)
a.ar.persistDeploymentStatus(a.ar.state.DeploymentStatus)
terminalDesiredState := a.ar.Alloc().ServerTerminalStatus()
a.ar.stateLock.Unlock()
// If deployment is unhealthy emit task events explaining why
if !healthy && isDeploy && !terminalDesiredState {
for task, event := range trackerTaskEvents {
if tr, ok := a.ar.tasks[task]; ok {
// Append but don't emit event since the server
// will be updated below
tr.AppendEvent(event)
}
}
}
// Gather the state of the other tasks
states := make(map[string]*structs.TaskState, len(a.ar.tasks))
for name, tr := range a.ar.tasks {
states[name] = tr.TaskState()
}
// Build the client allocation
calloc := a.ar.clientAlloc(states)
// Update the server
a.ar.stateUpdater.AllocStateUpdated(calloc)
// Broadcast client alloc to listeners
a.ar.allocBroadcaster.Send(calloc)
}
// initRunnerHooks initializes the runners hooks.
func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
hookLogger := ar.logger.Named("runner_hook")
// create health setting shim
hs := &allocHealthSetter{ar}
// create network isolation setting shim
ns := &allocNetworkIsolationSetter{ar: ar}
// build the network manager
nm, err := newNetworkManager(ar.Alloc(), ar.driverManager)
if err != nil {
return fmt.Errorf("failed to configure network manager: %v", err)
}
// create network configurator
nc, err := newNetworkConfigurator(hookLogger, ar.Alloc(), config)
if err != nil {
return fmt.Errorf("failed to initialize network configurator: %v", err)
}
// Create a new taskenv.Builder which is used by hooks that mutate them to
// build new taskenv.TaskEnv.
newEnvBuilder := func() *taskenv.Builder {
return taskenv.NewBuilder(
config.Node,
ar.Alloc(),
nil,
config.Region,
).SetAllocDir(ar.allocDir.AllocDirPath())
}
// Create a *taskenv.TaskEnv which is used for read only purposes by the
// newNetworkHook and newChecksHook.
builtTaskEnv := newEnvBuilder().Build()
// Create the alloc directory hook. This is run first to ensure the
// directory path exists for other hooks.
alloc := ar.Alloc()
ar.runnerHooks = []interfaces.RunnerHook{
newIdentityHook(hookLogger, ar.widmgr),
newAllocDirHook(hookLogger, ar.allocDir),
newConsulHook(consulHookConfig{
alloc: ar.alloc,
allocdir: ar.allocDir,
widmgr: ar.widmgr,
consulConfigs: ar.clientConfig.GetConsulConfigs(hookLogger),
consulClientConstructor: consul.NewConsulClientFactory(config),
hookResources: ar.hookResources,
envBuilder: newEnvBuilder,
logger: hookLogger,
}),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newCPUPartsHook(hookLogger, ar.partitions, alloc),
newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulServicesHandler, ar.checkStore),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
providerNamespace: alloc.ServiceProviderNamespace(),
serviceRegWrapper: ar.serviceRegWrapper,
hookResources: ar.hookResources,
restarter: ar,
taskEnvBuilder: newEnvBuilder(),
networkStatus: ar,
logger: hookLogger,
shutdownDelayCtx: ar.shutdownDelayCtx,
}),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir,
config.GetConsulConfigs(ar.logger), config.Node.Attributes),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir,
config.GetConsulConfigs(ar.logger)),
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
newChecksHook(hookLogger, alloc, ar.checkStore, ar, builtTaskEnv),
}
if config.ExtraAllocHooks != nil {
ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...)
}
return nil
}
// prerun is used to run the runners prerun hooks.
func (ar *allocRunner) prerun() error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running pre-run hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished pre-run hooks", "end", end, "duration", end.Sub(start))
}()
}
for _, hook := range ar.runnerHooks {
pre, ok := hook.(interfaces.RunnerPrerunHook)
if !ok {
continue
}
name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running pre-run hook", "name", name, "start", start)
}
if err := pre.Prerun(); err != nil {
return fmt.Errorf("pre-run hook %q failed: %v", name, err)
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished pre-run hook", "name", name, "end", end, "duration", end.Sub(start))
}
}
return nil
}
// update runs the alloc runner update hooks. Update hooks are run
// asynchronously with all other alloc runner operations.
func (ar *allocRunner) update(update *structs.Allocation) error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running update hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished update hooks", "end", end, "duration", end.Sub(start))
}()
}
req := &interfaces.RunnerUpdateRequest{
Alloc: update,
}
var merr multierror.Error
for _, hook := range ar.runnerHooks {
h, ok := hook.(interfaces.RunnerUpdateHook)
if !ok {
continue
}
name := h.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running update hook", "name", name, "start", start)
}
if err := h.Update(req); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("update hook %q failed: %v", name, err))
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished update hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return merr.ErrorOrNil()
}
// postrun is used to run the runners postrun hooks.
func (ar *allocRunner) postrun() error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running post-run hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished post-run hooks", "end", end, "duration", end.Sub(start))
}()
}
for _, hook := range ar.runnerHooks {
post, ok := hook.(interfaces.RunnerPostrunHook)
if !ok {
continue
}
name := post.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running post-run hook", "name", name, "start", start)
}
if err := post.Postrun(); err != nil {
return fmt.Errorf("hook %q failed: %v", name, err)
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished post-run hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return nil
}
// destroy is used to run the runners destroy hooks. All hooks are run and
// errors are returned as a multierror.
func (ar *allocRunner) destroy() error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running destroy hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished destroy hooks", "end", end, "duration", end.Sub(start))
}()
}
var merr multierror.Error
for _, hook := range ar.runnerHooks {
h, ok := hook.(interfaces.RunnerDestroyHook)
if !ok {
continue
}
name := h.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running destroy hook", "name", name, "start", start)
}
if err := h.Destroy(); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("destroy hook %q failed: %v", name, err))
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished destroy hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return merr.ErrorOrNil()
}
func (ar *allocRunner) preKillHooks() {
for _, hook := range ar.runnerHooks {
pre, ok := hook.(interfaces.RunnerPreKillHook)
if !ok {
continue
}
name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running alloc pre shutdown hook", "name", name, "start", start)
}
pre.PreKill()
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished alloc pre shutdown hook", "name", name, "end", end, "duration", end.Sub(start))
}
}
}
// shutdownHooks calls graceful shutdown hooks for when the agent is exiting.
func (ar *allocRunner) shutdownHooks() {
for _, hook := range ar.runnerHooks {
sh, ok := hook.(interfaces.ShutdownHook)
if !ok {
continue
}
name := sh.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running shutdown hook", "name", name, "start", start)
}
sh.Shutdown()
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished shutdown hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
}
func (ar *allocRunner) taskRestartHooks() {
for _, hook := range ar.runnerHooks {
re, ok := hook.(interfaces.RunnerTaskRestartHook)
if !ok {
continue
}
name := re.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running alloc task restart hook",
"name", name, "start", start)
}
re.PreTaskRestart()
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished alloc task restart hook",
"name", name, "end", end, "duration", end.Sub(start))
}
}
}