fingerprint: initial fingerprint of Vault/Consul should be periodic (#25102)
In #24526 we updated the Consul and Vault fingerprints so that they are no longer periodic. This fixed a problem that cluster admins reported where rolling updates of Vault or Consul would cause a thundering herd of fingerprint updates across the whole cluster. But if Consul/Vault is not available during the initial fingerprint, it will never get fingerprinted again. This is challenging for cluster updates and black starts because the implicit service startup ordering may require reloads.

Instead, have the fingerprinter run periodically but mark that it has made its first successful fingerprint of all Consul/Vault clusters. At that point, we can skip further periodic updates. The `Reload` method will reset the mark and allow the subsequent fingerprint to run normally.

Fixes: https://github.com/hashicorp/nomad/issues/25097
Ref: https://github.com/hashicorp/nomad/pull/24526
Ref: https://github.com/hashicorp/nomad/issues/24049
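Before the diff itself, here is a minimal standalone Go sketch of the gating pattern described above. The names here (`gatedFingerprinter`, `probe`, `response`) are illustrative stand-ins, not the actual Nomad types in the patch: the fingerprinter keeps its periodic schedule, caches the first response that covers every configured cluster, replays that cache on later runs, and `Reload` clears the cache so the next fingerprint runs normally.

```go
package main

import (
	"fmt"
	"sync"
)

// response stands in for Nomad's FingerprintResponse in this sketch.
type response struct {
	attributes map[string]string
	detected   bool
}

// gatedFingerprinter is a hypothetical stand-in for the Consul/Vault
// fingerprinters in the diff below; probe is a placeholder for querying one
// cluster's agent.
type gatedFingerprinter struct {
	mu      sync.RWMutex
	initial *response // cached once every cluster has fingerprinted successfully
	probe   func(cluster string) (map[string]string, bool)
}

// Fingerprint replays the cached response if one exists; otherwise it probes
// each cluster and caches the result only once all probes have succeeded, so
// an unavailable cluster keeps being retried on later periodic runs.
func (g *gatedFingerprinter) Fingerprint(clusters []string, resp *response) {
	g.mu.RLock()
	cached := g.initial
	g.mu.RUnlock()
	if cached != nil {
		*resp = *cached // shallow copy, mirroring the patch
		return
	}

	resp.attributes = map[string]string{}
	succeeded := 0
	for _, c := range clusters {
		attrs, reachable := g.probe(c)
		if !reachable {
			continue // retry on the next periodic fingerprint
		}
		for k, v := range attrs {
			resp.attributes[k] = v
		}
		resp.detected = true
		succeeded++
	}
	if succeeded == len(clusters) {
		g.mu.Lock()
		g.initial = resp
		g.mu.Unlock()
	}
}

// Reload clears the gate so the next Fingerprint call runs normally again.
func (g *gatedFingerprinter) Reload() {
	g.mu.Lock()
	g.initial = nil
	g.mu.Unlock()
}

func main() {
	g := &gatedFingerprinter{
		// Dummy probe: pretend the cluster is reachable and returns one attribute.
		probe: func(cluster string) (map[string]string, bool) {
			return map[string]string{"consul.dc": "dc1"}, true
		},
	}
	var r response
	g.Fingerprint([]string{"default"}, &r) // first run: probes and caches
	g.Fingerprint([]string{"default"}, &r) // later runs: replayed from cache
	fmt.Println(r.detected, r.attributes)
}
```

The actual patch below applies the same idea per fingerprinter: each cluster tracks a `fingerprintedOnce` flag, the cached response is guarded by an RWMutex so `Reload` does not block behind an in-flight fingerprint, and `Periodic` is switched back on with a 15-second interval.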
.changelog/25102.txt | 3 (new file)
@@ -0,0 +1,3 @@
+```release-note:bug
+fingerprint: Fixed a bug where Consul/Vault would never be fingerprinted if not available on agent start
+```
@@ -4,9 +4,8 @@ Documentation=https://nomadproject.io/docs/
 Wants=network-online.target
 After=network-online.target

-# When using Nomad with Consul it is not necessary to start Consul first. These
-# lines start Consul before Nomad as an optimization to avoid Nomad logging
-# that Consul is unavailable at startup.
+# When using Nomad with Consul you should start Consul first, so that running
+# allocations using Consul are restored correctly during startup.
 #Wants=consul.service
 #After=consul.service

@@ -8,6 +8,7 @@ import (
 	"net/netip"
 	"strconv"
 	"strings"
+	"sync"
 	"time"

 	consulapi "github.com/hashicorp/consul/api"
@@ -35,6 +36,13 @@ type ConsulFingerprint struct {
 	// clusters maintains the latest fingerprinted state for each cluster
 	// defined in nomad consul client configuration(s).
 	clusters map[string]*consulState
+
+	// Once initial fingerprints are complete, we no-op all periodic
+	// fingerprints to prevent Consul availability issues causing a thundering
+	// herd of node updates. This behavior resets if we reload the
+	// configuration.
+	initialResponse *FingerprintResponse
+	initialResponseLock sync.RWMutex
 }

 type consulState struct {
@@ -43,6 +51,10 @@ type consulState struct {
 	// readers associates a function used to parse the value associated
 	// with the given key from a consul api response
 	readers map[string]valueReader
+
+	// tracks that we've successfully fingerprinted this cluster at least once
+	// since the last Fingerprint call
+	fingerprintedOnce bool
 }

 // valueReader is used to parse out one attribute from consulInfo. Returns
@@ -58,6 +70,10 @@ func NewConsulFingerprint(logger hclog.Logger) Fingerprint {
 }

 func (f *ConsulFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
+	if f.readInitialResponse(resp) {
+		return nil
+	}
+
 	var mErr *multierror.Error
 	consulConfigs := req.Config.GetConsulConfigs(f.logger)
 	for _, cfg := range consulConfigs {
@@ -67,9 +83,43 @@ func (f *ConsulFingerprint) Fingerprint(req *FingerprintRequest, resp *Fingerpri
 		}
 	}

+	fingerprintCount := 0
+	for _, state := range f.clusters {
+		if state.fingerprintedOnce {
+			fingerprintCount++
+		}
+	}
+	if fingerprintCount == len(consulConfigs) {
+		f.setInitialResponse(resp)
+	}
+
 	return mErr.ErrorOrNil()
 }

+// readInitialResponse checks for a previously seen response. It returns true
+// and shallow-copies the response into the argument if one is available. We
+// only want to hold the lock open during the read and not the Fingerprint so
+// that we don't block a Reload call while waiting for Consul requests to
+// complete. If the Reload clears the initialResponse after we take the lock
+// again in setInitialResponse (ex. 2 reloads quickly in a row), the worst that
+// happens is we do an extra fingerprint when the Reload caller calls
+// Fingerprint
+func (f *ConsulFingerprint) readInitialResponse(resp *FingerprintResponse) bool {
+	f.initialResponseLock.RLock()
+	defer f.initialResponseLock.RUnlock()
+	if f.initialResponse != nil {
+		*resp = *f.initialResponse
+		return true
+	}
+	return false
+}
+
+func (f *ConsulFingerprint) setInitialResponse(resp *FingerprintResponse) {
+	f.initialResponseLock.Lock()
+	defer f.initialResponseLock.Unlock()
+	f.initialResponse = resp
+}
+
 func (f *ConsulFingerprint) fingerprintImpl(cfg *config.ConsulConfig, resp *FingerprintResponse) error {
 	logger := f.logger.With("cluster", cfg.Name)

@@ -103,18 +153,23 @@ func (f *ConsulFingerprint) fingerprintImpl(cfg *config.ConsulConfig, resp *Fing
 	// create link for consul
 	f.link(resp)

+	state.fingerprintedOnce = true
 	resp.Detected = true
 	return nil
 }

 func (f *ConsulFingerprint) Periodic() (bool, time.Duration) {
-	return false, 0
+	return true, 15 * time.Second
 }

-// Reload satisfies ReloadableFingerprint.
-func (f *ConsulFingerprint) Reload() {}
+// Reload satisfies ReloadableFingerprint and resets the gate on periodic
+// fingerprinting.
+func (f *ConsulFingerprint) Reload() {
+	f.setInitialResponse(nil)
+}

 func (cfs *consulState) initialize(cfg *config.ConsulConfig, logger hclog.Logger) error {
+	cfs.fingerprintedOnce = false
 	if cfs.client != nil {
 		return nil // already initialized!
 	}
@@ -637,20 +637,28 @@ func TestConsulFingerprint_Fingerprint_oss(t *testing.T) {
 	node.Attributes["connect.grpc"] = "foo"
 	node.Attributes["unique.consul.name"] = "foo"

-	// execute second query with error
+	// execute second query with error but without reload
 	err2 := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp2)
 	must.NoError(t, err2) // does not return error
-	must.Nil(t, resp2.Attributes) // attributes unset so they don't change
+	must.Eq(t, resp.Attributes, resp2.Attributes) // attributes unset don't change
 	must.True(t, resp.Detected) // never downgrade

-	// consul no longer available; an agent restart is required to clear an
-	// existing fingerprint
+	// Consul no longer available but an agent reload is required to clear a
+	// detected cluster
 	must.NotNil(t, cf.clusters[structs.ConsulDefaultCluster])

-	// execute third query no error
+	// Reload to reset
+	cf.Reload()
 	var resp3 FingerprintResponse
 	err3 := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp3)
-	must.NoError(t, err3)
+	must.NoError(t, err3) // does not return error
+	must.Nil(t, resp3.Attributes) // attributes reset
+	must.True(t, resp3.Detected) // detection is never reset
+
+	// execute 4th query no error
+	var resp4 FingerprintResponse
+	err4 := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp4)
+	must.NoError(t, err4)
 	must.Eq(t, map[string]string{
 		"consul.datacenter": "dc1",
 		"consul.revision": "3c1c22679",
@@ -663,7 +671,7 @@ func TestConsulFingerprint_Fingerprint_oss(t *testing.T) {
 		"consul.dns.port": "8600",
 		"consul.ft.namespaces": "false",
 		"unique.consul.name": "HAL9000",
-	}, resp3.Attributes)
+	}, resp4.Attributes)

 	// consul now available again
 	must.NotNil(t, cf.clusters[structs.ConsulDefaultCluster])
@@ -721,20 +729,28 @@ func TestConsulFingerprint_Fingerprint_ent(t *testing.T) {
 	node.Attributes["connect.grpc"] = "foo"
 	node.Attributes["unique.consul.name"] = "foo"

-	// execute second query with error
+	// execute second query with error but without reload
 	err2 := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp2)
 	must.NoError(t, err2) // does not return error
-	must.Nil(t, resp2.Attributes) // attributes unset so they don't change
+	must.Eq(t, resp.Attributes, resp2.Attributes) // attributes unset don't change
 	must.True(t, resp.Detected) // never downgrade

-	// consul no longer available; an agent restart is required to clear
-	// a detected cluster
+	// Consul no longer available but an agent reload is required to clear a
+	// detected cluster
 	must.NotNil(t, cf.clusters[structs.ConsulDefaultCluster])

-	// execute third query no error
+	// Reload to reset
+	cf.Reload()
 	var resp3 FingerprintResponse
 	err3 := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp3)
-	must.NoError(t, err3)
+	must.NoError(t, err3) // does not return error
+	must.Nil(t, resp3.Attributes) // attributes reset
+	must.True(t, resp3.Detected) // detection is never reset
+
+	// execute 4th query no error
+	var resp4 FingerprintResponse
+	err4 := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp4)
+	must.NoError(t, err4)
 	must.Eq(t, map[string]string{
 		"consul.datacenter": "dc1",
 		"consul.revision": "22ce6c6ad",
@@ -749,9 +765,9 @@ func TestConsulFingerprint_Fingerprint_ent(t *testing.T) {
 		"consul.dns.port": "8600",
 		"consul.partition": "default",
 		"unique.consul.name": "HAL9000",
-	}, resp3.Attributes)
+	}, resp4.Attributes)

 	// consul now available again
 	must.NotNil(t, cf.clusters[structs.ConsulDefaultCluster])
-	must.True(t, resp.Detected)
+	must.True(t, resp4.Detected)
 }
@@ -124,8 +124,11 @@ type Fingerprint interface {
 	Periodic() (bool, time.Duration)
 }

-// ReloadableFingerprint can be implemented if the fingerprinter needs to be run during client reload.
-// If implemented, the client will call Reload during client reload then immediately Fingerprint
+// ReloadableFingerprint can be implemented if the fingerprinter needs to be run
+// during client reload. If implemented, the client will call Reload during
+// client reload then immediately Fingerprint. The Reload call is not protected
+// by the same mutex that Fingerprint is, so implementations must ensure they
+// are safe to call concurrently with a Fingerprint
 type ReloadableFingerprint interface {
 	Fingerprint
 	Reload()
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 	"time"

 	log "github.com/hashicorp/go-hclog"
@@ -21,11 +22,19 @@ import (
 type VaultFingerprint struct {
 	logger log.Logger
 	states map[string]*vaultFingerprintState
+
+	// Once initial fingerprints are complete, we no-op all periodic
+	// fingerprints to prevent Vault availability issues causing a thundering
+	// herd of node updates. This behavior resets if we reload the
+	// configuration.
+	initialResponse *FingerprintResponse
+	initialResponseLock sync.RWMutex
 }

 type vaultFingerprintState struct {
 	client *vapi.Client
 	isAvailable bool
+	fingerprintedOnce bool
 }

 // NewVaultFingerprint is used to create a Vault fingerprint
@@ -37,6 +46,9 @@ func NewVaultFingerprint(logger log.Logger) Fingerprint {
 }

 func (f *VaultFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
+	if f.readInitialResponse(resp) {
+		return nil
+	}
 	var mErr *multierror.Error
 	vaultConfigs := req.Config.GetVaultConfigs(f.logger)

@@ -47,9 +59,43 @@ func (f *VaultFingerprint) Fingerprint(req *FingerprintRequest, resp *Fingerprin
 		}
 	}

+	fingerprintCount := 0
+	for _, state := range f.states {
+		if state.fingerprintedOnce {
+			fingerprintCount++
+		}
+	}
+	if fingerprintCount == len(vaultConfigs) {
+		f.setInitialResponse(resp)
+	}
+
 	return mErr.ErrorOrNil()
 }

+// readInitialResponse checks for a previously seen response. It returns true
+// and shallow-copies the response into the argument if one is available. We
+// only want to hold the lock open during the read and not the Fingerprint so
+// that we don't block a Reload call while waiting for Vault requests to
+// complete. If the Reload clears the initialResponse after we take the lock
+// again in setInitialResponse (ex. 2 reloads quickly in a row), the worst that
+// happens is we do an extra fingerprint when the Reload caller calls
+// Fingerprint
+func (f *VaultFingerprint) readInitialResponse(resp *FingerprintResponse) bool {
+	f.initialResponseLock.RLock()
+	defer f.initialResponseLock.RUnlock()
+	if f.initialResponse != nil {
+		*resp = *f.initialResponse
+		return true
+	}
+	return false
+}
+
+func (f *VaultFingerprint) setInitialResponse(resp *FingerprintResponse) {
+	f.initialResponseLock.Lock()
+	defer f.initialResponseLock.Unlock()
+	f.initialResponse = resp
+}
+
 // fingerprintImpl fingerprints for a single Vault cluster
 func (f *VaultFingerprint) fingerprintImpl(cfg *config.VaultConfig, resp *FingerprintResponse) error {
 	logger := f.logger.With("cluster", cfg.Name)
@@ -103,14 +149,17 @@ func (f *VaultFingerprint) fingerprintImpl(cfg *config.VaultConfig, resp *Finger
 	}

 	state.isAvailable = true
+	state.fingerprintedOnce = true
 	resp.Detected = true

 	return nil
 }

 func (f *VaultFingerprint) Periodic() (bool, time.Duration) {
-	return false, 0
+	return true, 15 * time.Second
 }

-// Reload satisfies ReloadableFingerprint.
-func (f *VaultFingerprint) Reload() {}
+// Reload satisfies ReloadableFingerprint and resets the gate on periodic fingerprinting.
+func (f *VaultFingerprint) Reload() {
+	f.setInitialResponse(nil)
+}
@@ -25,10 +25,6 @@ func TestVaultFingerprint(t *testing.T) {
 		Attributes: make(map[string]string),
 	}

-	p, period := fp.Periodic()
-	must.False(t, p)
-	must.Zero(t, period)
-
 	conf := config.DefaultConfig()
 	conf.VaultConfigs[structs.VaultDefaultCluster] = tv.Config

@@ -46,9 +42,17 @@ func TestVaultFingerprint(t *testing.T) {
 	// Stop Vault to simulate it being unavailable
 	tv.Stop()

-	// Not detected this time
+	// Fingerprint should not change without a reload
 	var response2 FingerprintResponse
 	err = fp.Fingerprint(request, &response2)
 	must.NoError(t, err)
-	must.False(t, response2.Detected)
+	must.Eq(t, response1, response2)
+
+	// Fingerprint should update after a reload
+	reloadable := fp.(ReloadableFingerprint)
+	reloadable.Reload()
+	var response3 FingerprintResponse
+	err = fp.Fingerprint(request, &response3)
+	must.NoError(t, err)
+	must.False(t, response3.Detected)
 }