encrypter: Track initial tasks for is ready calculation. (#25803)

From an operator's point of view, the server startup could appear to
"hang" if a key loaded from the FSM at startup could not be decrypted
or replicated.

To prevent this from happening, the server startup function now uses a
timeout while waiting for the encrypter to become ready. If the timeout
is reached, the error is returned to the caller, which fails the CLI
command. The error also bubbles up to the logs, providing additional
operator feedback.
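
As a rough sketch of that flow (a condensed fragment of the `NewServer`
change shown in the diff further down, using the identifiers from that hunk):

```go
// Bound the startup work with the configured timeout so a stuck keyring
// decryption surfaces as an error instead of an indefinite hang.
startupCtx, cancel := context.WithTimeout(s.shutdownCtx, s.config.StartTimeout)
defer cancel()

if err := s.encrypter.IsReady(startupCtx); err != nil {
	_ = s.Shutdown()
	return nil, fmt.Errorf("failed to wait for keyring decryption to complete: %v", err)
}
```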

Before the encrypter is classed as ready, the server only cares about
keys loaded from the FSM snapshot and trailing logs. So that the
encrypter readiness function is not blocked by keys added outside of the
initial Raft load, we take a snapshot of the in-flight decryption tasks
as we enter the blocking call and use these as our barrier.
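
To make the barrier idea concrete, here is a small self-contained sketch of
the pattern (hypothetical, simplified names and types; the real implementation
is in the encrypter hunk below and polls with Nomad's backoff helper rather
than a fixed interval):

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// encrypter is a toy stand-in: decryptTasks tracks in-flight key
// decryption work keyed by key ID.
type encrypter struct {
	mu           sync.RWMutex
	decryptTasks map[string]struct{}
}

// isReady snapshots the task IDs present when it is first called and only
// waits for those; tasks added later (for example, new keys upserted after
// the initial Raft load) do not block readiness.
func (e *encrypter) isReady(ctx context.Context) error {
	e.mu.RLock()
	base := make([]string, 0, len(e.decryptTasks))
	for id := range e.decryptTasks {
		base = append(base, id)
	}
	e.mu.RUnlock()

	for {
		var pending []string
		e.mu.RLock()
		for _, id := range base {
			if _, ok := e.decryptTasks[id]; ok {
				pending = append(pending, id)
			}
		}
		e.mu.RUnlock()

		if len(pending) == 0 {
			return nil
		}

		select {
		case <-ctx.Done():
			return fmt.Errorf("keyring is not ready - waiting for keys %s",
				strings.Join(pending, ", "))
		case <-time.After(100 * time.Millisecond): // fixed interval instead of backoff
		}
	}
}

func main() {
	e := &encrypter{decryptTasks: map[string]struct{}{"key-1": {}}}

	// Simulate the initial decrypt task finishing shortly after startup.
	go func() {
		time.Sleep(300 * time.Millisecond)
		e.mu.Lock()
		delete(e.decryptTasks, "key-1")
		e.mu.Unlock()
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	fmt.Println("ready:", e.isReady(ctx))
}
```
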
James Rasell
2025-05-07 15:38:16 +01:00
committed by GitHub
parent 3690a0118e
commit 0b265d2417
13 changed files with 218 additions and 11 deletions

.changelog/25803.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
server: Added a new server configuration option named `start_timeout` with a default value of `30s`. This duration is used to monitor the server setup and startup processes, such as keyring decryption, which must complete before the server is considered healthy. If these processes do not complete before the timeout is reached, the server process will exit.
```

View File

@@ -637,6 +637,16 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
conf.KEKProviderConfigs = agentConfig.KEKProviders
if startTimeout := agentConfig.Server.StartTimeout; startTimeout != "" {
dur, err := time.ParseDuration(startTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse start_timeout: %v", err)
} else if dur <= time.Duration(0) {
return nil, fmt.Errorf("start_timeout should be greater than 0s")
}
conf.StartTimeout = dur
}
// Ensure the passed number of scheduler is between the bounds of zero and
// the number of CPU cores on the machine. The runtime CPU count object is
// populated at process start time, so there is no overhead in calling the

View File

@@ -738,8 +738,14 @@ type ServerConfig struct {
// OIDCIssuer if set enables OIDC Discovery and uses this value as the
// issuer. Third parties such as AWS IAM OIDC Provider expect the issuer to
// be a publicly accessible HTTPS URL signed by a trusted well-known CA.
OIDCIssuer string `hcl:"oidc_issuer"`
// StartTimeout is a time duration such as "30s" or "1h". It is provided to
// the server so that it can time out setup and startup processes that are
// expected to complete before the server is considered healthy. Without
// this, the server can hang indefinitely waiting for these.
StartTimeout string `hcl:"start_timeout"`
}
func (s *ServerConfig) Copy() *ServerConfig {
@@ -2421,6 +2427,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.OIDCIssuer != "" {
result.OIDCIssuer = b.OIDCIssuer
}
if b.StartTimeout != "" {
result.StartTimeout = b.StartTimeout
}
// Add the schedulers
result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...)

View File

@@ -158,6 +158,7 @@ var basicConfig = &Config{
LicensePath: "/tmp/nomad.hclic",
JobDefaultPriority: pointer.Of(100),
JobMaxPriority: pointer.Of(200),
StartTimeout: "1m",
},
ACL: &ACLConfig{
Enabled: true,

View File

@@ -160,7 +160,8 @@ func TestConfig_Merge(t *testing.T) {
NodeThreshold: 100,
NodeWindow: 11 * time.Minute,
},
OIDCIssuer: "https://oidc.test.nomadproject.io",
StartTimeout: "45s",
},
ACL: &ACLConfig{
Enabled: true,
@@ -382,6 +383,7 @@ func TestConfig_Merge(t *testing.T) {
JobMaxPriority: pointer.Of(200),
JobDefaultPriority: pointer.Of(100),
OIDCIssuer: "https://oidc.test.nomadproject.io",
StartTimeout: "1m",
},
ACL: &ACLConfig{
Enabled: true,

View File

@@ -143,6 +143,7 @@ server {
event_buffer_size = 200
job_default_priority = 100
job_max_priority = 200
start_timeout = "1m"
plan_rejection_tracker {
enabled = true

View File

@@ -363,7 +363,8 @@
"upgrade_version": "0.8.0",
"license_path": "/tmp/nomad.hclic",
"job_default_priority": 100,
"job_max_priority": 200
"job_max_priority": 200,
"start_timeout": "1m"
}
],
"syslog_facility": "LOCAL1",

View File

@@ -442,6 +442,12 @@ type Config struct {
// KEKProviders are used to wrap the Nomad keyring
KEKProviderConfigs []*structs.KEKProviderConfig
// StartTimeout is provided to the server so that it can time out setup and
// startup processes that are expected to complete before the server is
// considered healthy. Without this, the server can hang indefinitely
// waiting for these.
StartTimeout time.Duration
}
func (c *Config) Copy() *Config {
@@ -650,6 +656,7 @@ func DefaultConfig() *Config {
JobDefaultPriority: structs.JobDefaultPriority,
JobMaxPriority: structs.JobDefaultMaxPriority,
JobTrackedVersions: structs.JobDefaultTrackedVersions,
StartTimeout: 30 * time.Second,
}
// Enable all known schedulers by default

View File

@@ -203,18 +203,50 @@ func (e *Encrypter) loadKeystore() error {
})
}
// IsReady blocks until all in-flight decrypt tasks are complete, or the context
// expires.
func (e *Encrypter) IsReady(ctx context.Context) error {
// Generate a list of the existing decryption tasks. These tasks are the
// ones we must wait to finish. This function is called from the server when
// it is being set up and started. Raft would have already loaded the keys
// from the snapshot and trailing logs, so the encrypter is populated with
// everything we immediately care about.
e.decryptTasksLock.RLock()
basePendingTasks := make([]string, 0, len(e.decryptTasks))
for id := range e.decryptTasks {
basePendingTasks = append(basePendingTasks, id)
}
e.decryptTasksLock.RUnlock()
err := helper.WithBackoffFunc(ctx, time.Millisecond*100, time.Second, func() error {
var currentPendingTasks []string
e.decryptTasksLock.RLock()
defer e.decryptTasksLock.RUnlock()
for _, id := range basePendingTasks {
if _, ok := e.decryptTasks[id]; ok {
currentPendingTasks = append(currentPendingTasks, id)
}
}
// If we have decryption tasks which are still running that we care
// about, log about this as well as return an error. If key decryption
// progresses over time, an operator will be able to identify any
// long-running tasks. If the timeout is reached, the final error is
// sent to the caller which identifies the tasks that are taking too
// long.
if l := len(currentPendingTasks); l > 0 {
e.log.Debug("waiting for keyring to be ready",
"num_tasks", l, "key_ids", currentPendingTasks)
return fmt.Errorf("keyring is not ready - waiting for keys %s",
strings.Join(currentPendingTasks, ", "))
}
return nil
})

View File

@@ -838,6 +838,111 @@ func TestEncrypter_TransitConfigFallback(t *testing.T) {
must.Eq(t, expect, providers[2].Config, must.Sprint("expected fallback to env"))
}
func TestEncrypter_IsReady_noTasks(t *testing.T) {
ci.Parallel(t)
srv := &Server{
logger: testlog.HCLogger(t),
config: &Config{},
}
encrypter, err := NewEncrypter(srv, t.TempDir())
must.NoError(t, err)
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
t.Cleanup(timeoutCancel)
must.NoError(t, encrypter.IsReady(timeoutCtx))
}
func TestEncrypter_IsReady_eventuallyReady(t *testing.T) {
ci.Parallel(t)
srv := &Server{
logger: testlog.HCLogger(t),
config: &Config{},
}
encrypter, err := NewEncrypter(srv, t.TempDir())
must.NoError(t, err)
// Add an initial decryption task to the encrypter. This simulates a key
// restored from the Raft state (snapshot or trailing logs) as the server is
// starting.
encrypter.decryptTasks["id1"] = struct{}{}
// Generate a timeout value that will be used to create the context passed
// to the encrypter. Changing this value should not impact the test except
// for its run length as other trigger values are calculated using this.
timeout := 2 * time.Second
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(timeoutCancel)
// Launch a goroutine to monitor the readiness of the encrypter. Any
// response is sent on the channel, so we can interrogate it.
respCh := make(chan error)
go func() {
respCh <- encrypter.IsReady(timeoutCtx)
}()
// Create a timer at 1/3 the value of the timeout. When this triggers, we
// add a new decryption task to the encrypter. This simulates Nomad
// upserting a new key into state which was not part of the original
// snapshot or trailing logs and therefore should not block the readiness
// check.
taskAddTimer, stop := helper.NewSafeTimer(timeout / 3)
t.Cleanup(stop)
// Create a timer at half the value of the timeout. When this triggers, we
// will remove the task from the encrypter simulating it finishing and the
// encrypter becoming ready.
taskDeleteTimer, stop := helper.NewSafeTimer(timeout / 2)
t.Cleanup(stop)
// Loop over the select so all three events are handled in turn: the timers
// fire first and mutate the task map, and the readiness response on respCh
// ends the test.
for {
select {
case <-taskAddTimer.C:
encrypter.decryptTasksLock.Lock()
encrypter.decryptTasks["id2"] = struct{}{}
encrypter.decryptTasksLock.Unlock()
case <-taskDeleteTimer.C:
encrypter.decryptTasksLock.Lock()
delete(encrypter.decryptTasks, "id1")
encrypter.decryptTasksLock.Unlock()
case err := <-respCh:
must.NoError(t, err)
encrypter.decryptTasksLock.RLock()
must.MapLen(t, 1, encrypter.decryptTasks)
must.MapContainsKey(t, encrypter.decryptTasks, "id2")
encrypter.decryptTasksLock.RUnlock()
return
}
}
}
func TestEncrypter_IsReady_timeout(t *testing.T) {
ci.Parallel(t)
srv := &Server{
logger: testlog.HCLogger(t),
config: &Config{},
}
encrypter, err := NewEncrypter(srv, t.TempDir())
must.NoError(t, err)
// Add some tasks to the encrypter that we will never remove.
encrypter.decryptTasks["id1"] = struct{}{}
encrypter.decryptTasks["id2"] = struct{}{}
// Generate a timeout context that allows the backoff to trigger a few times
// before being canceled.
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(timeoutCancel)
err = encrypter.IsReady(timeoutCtx)
must.ErrorContains(t, err, "keys id1, id2")
}
func TestEncrypter_AddWrappedKey_zeroDecryptTaskError(t *testing.T) {
ci.Parallel(t)

View File

@@ -368,6 +368,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()
// Generate a timeout context for the server process which we wait for and
// can time out on.
startupTimeout, startupCancel := context.WithTimeout(s.shutdownCtx, s.config.StartTimeout)
defer startupCancel()
// Create an eval broker
evalBroker, err := NewEvalBroker(
s.shutdownCtx,
@@ -549,8 +554,16 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
// exist before it can start.
s.keyringReplicator = NewKeyringReplicator(s, encrypter)
// Wait for the keyring to be ready. This is a blocking call and will
// time out if the keyring takes too long to decrypt its initial set of
// keys.
//
// In the event of a timeout, we shut down the server and return an error to
// the caller which will include what keys were not decrypted.
if err := s.encrypter.IsReady(startupTimeout); err != nil {
_ = s.Shutdown()
return nil, fmt.Errorf("failed to wait for keyring decryption to complete: %v", err)
}
// Done
return s, nil

View File

@@ -258,6 +258,12 @@ server {
fields may directly specify the server address or use go-discover syntax for
auto-discovery. Refer to the [server_join documentation][server-join] for more detail.
- `start_timeout` `(string: "30s")` - A timeout applied to the server setup and
startup processes. These processes (such as keyring decryption) are expected to
complete before the server is considered healthy; if the timeout is reached
before they complete, the server will exit. Without this, the server can hang
indefinitely waiting for them.
- `upgrade_version` `(string: "")` - A custom version of the format X.Y.Z to use
in place of the Nomad version when custom upgrades are enabled in Autopilot.
For more information, refer to the [Autopilot Guide](/nomad/tutorials/manage-clusters/autopilot).

View File

@@ -28,6 +28,13 @@ could lead to agents running but unable to communicate. Any other errors when
parsing the new configuration are logged and the reload is aborted, consistent
with the current behavior.
#### Added Server `start_timeout` Configuration Option
Nomad 1.10.1 introduces a new server configuration option named `start_timeout`
with a default value of `30s`. This duration is used to monitor the server setup
and startup processes, such as keyring decryption, which must complete before
the server is considered healthy. If these processes do not complete before the
timeout is reached, the server process will exit and any errors are logged to
the console.
## Nomad 1.10.0
@@ -47,6 +54,16 @@ with the current behavior.
@include 'release-notes/v1-10/breaking-vault-consul-token.mdx'
## Nomad 1.9.9
#### Added Server `start_timeout` Configuration Option
Nomad 1.9.9 introduces a new server configuration option named `start_timeout`
with a default value of `30s`. This duration is used to monitor the server setup
and startup processes, such as keyring decryption, which must complete before
the server is considered healthy. If these processes do not complete before the
timeout is reached, the server process will exit and any errors are logged to
the console.
## Nomad 1.9.5
#### CNI plugins