mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
fingerprint: add support for fingerprinting multiple Vault clusters (#18253)
Add the fingerprinting support we'll need in order to accept multiple Vault clusters in upcoming Nomad Enterprise features. The fingerprinter will create a map of Vault clients keyed by cluster name. In Nomad CE, all but the default cluster will be ignored and there will be no visible behavior change.
This commit is contained in:
@@ -10,8 +10,10 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/helper/useragent"
|
||||
"github.com/hashicorp/nomad/nomad/structs/config"
|
||||
vapi "github.com/hashicorp/vault/api"
|
||||
)
|
||||
|
||||
@@ -20,73 +22,121 @@ const (
|
||||
vaultUnavailable = "unavailable"
|
||||
)
|
||||
|
||||
var vaultBaseFingerprintInterval = 15 * time.Second
|
||||
|
||||
// VaultFingerprint is used to fingerprint Vault
|
||||
type VaultFingerprint struct {
|
||||
logger log.Logger
|
||||
client *vapi.Client
|
||||
lastState string
|
||||
logger log.Logger
|
||||
states map[string]*fingerprintState
|
||||
}
|
||||
|
||||
// fingerprintState holds the fingerprinter's bookkeeping for a single Vault
// cluster; VaultFingerprint.states keeps one entry per cluster name.
type fingerprintState struct {
|
||||
client *vapi.Client // Vault API client; created on the first check and reused afterwards
|
||||
isAvailable bool // whether the most recent seal-status request succeeded
|
||||
nextCheck time.Time // the cluster is not checked again before this time; the zero value means check on the next run
|
||||
}
|
||||
|
||||
// NewVaultFingerprint is used to create a Vault fingerprint
|
||||
func NewVaultFingerprint(logger log.Logger) Fingerprint {
|
||||
return &VaultFingerprint{logger: logger.Named("vault"), lastState: vaultUnavailable}
|
||||
return &VaultFingerprint{
|
||||
logger: logger.Named("vault"),
|
||||
states: map[string]*fingerprintState{},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *VaultFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
|
||||
config := req.Config
|
||||
var mErr *multierror.Error
|
||||
for _, cfg := range f.vaultConfigs(req) {
|
||||
err := f.fingerprintImpl(cfg, resp)
|
||||
if err != nil {
|
||||
mErr = multierror.Append(mErr, err)
|
||||
}
|
||||
}
|
||||
|
||||
if config.VaultConfig == nil || !config.VaultConfig.IsEnabled() {
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// fingerprintImpl fingerprints a single Vault cluster
|
||||
func (f *VaultFingerprint) fingerprintImpl(cfg *config.VaultConfig, resp *FingerprintResponse) error {
|
||||
|
||||
logger := f.logger.With("cluster", cfg.Name)
|
||||
|
||||
state, ok := f.states[cfg.Name]
|
||||
if !ok {
|
||||
state = &fingerprintState{}
|
||||
f.states[cfg.Name] = state
|
||||
}
|
||||
if state.nextCheck.After(time.Now()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only create the client once to avoid creating too many connections to Vault
|
||||
if f.client == nil {
|
||||
vaultConfig, err := config.VaultConfig.ApiConfig()
|
||||
if state.client == nil {
|
||||
vaultConfig, err := cfg.ApiConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to initialize the Vault client config: %v", err)
|
||||
return fmt.Errorf("Failed to initialize the Vault client config for %s: %v", cfg.Name, err)
|
||||
}
|
||||
f.client, err = vapi.NewClient(vaultConfig)
|
||||
state.client, err = vapi.NewClient(vaultConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to initialize Vault client: %s", err)
|
||||
return fmt.Errorf("Failed to initialize Vault client for %s: %s", cfg.Name, err)
|
||||
}
|
||||
useragent.SetHeaders(f.client)
|
||||
useragent.SetHeaders(state.client)
|
||||
}
|
||||
|
||||
// Connect to vault and parse its information
|
||||
status, err := f.client.Sys().SealStatus()
|
||||
status, err := state.client.Sys().SealStatus()
|
||||
if err != nil {
|
||||
|
||||
// Print a message indicating that Vault is not available anymore
|
||||
if f.lastState == vaultAvailable {
|
||||
f.logger.Info("Vault is unavailable")
|
||||
if state.isAvailable {
|
||||
logger.Info("Vault is unavailable")
|
||||
}
|
||||
f.lastState = vaultUnavailable
|
||||
state.isAvailable = false
|
||||
state.nextCheck = time.Time{} // always check on next interval
|
||||
return nil
|
||||
}
|
||||
|
||||
resp.AddAttribute("vault.accessible", strconv.FormatBool(true))
|
||||
// We strip the "Vault " prefix because before 0.6.2 the version looks like:
|
||||
// status.Version = "Vault v0.6.1"
|
||||
resp.AddAttribute("vault.version", strings.TrimPrefix(status.Version, "Vault "))
|
||||
resp.AddAttribute("vault.cluster_id", status.ClusterID)
|
||||
resp.AddAttribute("vault.cluster_name", status.ClusterName)
|
||||
if cfg.Name == "default" {
|
||||
resp.AddAttribute("vault.accessible", strconv.FormatBool(true))
|
||||
resp.AddAttribute("vault.version", strings.TrimPrefix(status.Version, "Vault "))
|
||||
resp.AddAttribute("vault.cluster_id", status.ClusterID)
|
||||
resp.AddAttribute("vault.cluster_name", status.ClusterName)
|
||||
} else {
|
||||
resp.AddAttribute(fmt.Sprintf("vault.%s.accessible", cfg.Name), strconv.FormatBool(true))
|
||||
resp.AddAttribute(fmt.Sprintf("vault.%s.version", cfg.Name), strings.TrimPrefix(status.Version, "Vault "))
|
||||
resp.AddAttribute(fmt.Sprintf("vault.%s.cluster_id", cfg.Name), status.ClusterID)
|
||||
resp.AddAttribute(fmt.Sprintf("vault.%s.cluster_name", cfg.Name), status.ClusterName)
|
||||
}
|
||||
|
||||
// If Vault was previously unavailable, print a message to indicate that Vault
|
||||
// is available now
|
||||
if f.lastState == vaultUnavailable {
|
||||
f.logger.Info("Vault is available")
|
||||
if !state.isAvailable {
|
||||
logger.Info("Vault is available")
|
||||
}
|
||||
f.lastState = vaultAvailable
|
||||
|
||||
// Widen the minimum window to the next check so that if one out of a set of
|
||||
// Vaults is unhealthy we don't greatly increase requests to the healthy
|
||||
// ones. This is just under the 30s minimum interval Periodic returns when all
|
||||
// Vaults are healthy, so we don't desync from that larger window
|
||||
state.nextCheck = time.Now().Add(29 * time.Second)
|
||||
state.isAvailable = true
|
||||
|
||||
resp.Detected = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *VaultFingerprint) Periodic() (bool, time.Duration) {
|
||||
if f.lastState == vaultAvailable {
|
||||
// Fingerprint infrequently once Vault is initially discovered with wide
|
||||
// jitter to avoid thundering herds of fingerprints against central Vault
|
||||
// servers.
|
||||
return true, (30 * time.Second) + helper.RandomStagger(90*time.Second)
|
||||
if len(f.states) == 0 {
|
||||
return true, vaultBaseFingerprintInterval
|
||||
}
|
||||
return true, 15 * time.Second
|
||||
for _, state := range f.states {
|
||||
if !state.isAvailable {
|
||||
return true, vaultBaseFingerprintInterval
|
||||
}
|
||||
}
|
||||
|
||||
// Once all Vaults are initially discovered and healthy we fingerprint with
|
||||
// a wide jitter to avoid thundering herds of fingerprints against central
|
||||
// Vault servers.
|
||||
return true, (30 * time.Second) + helper.RandomStagger(90*time.Second)
|
||||
}
|
||||
|
||||
19
client/fingerprint/vault_ce.go
Normal file
19
client/fingerprint/vault_ce.go
Normal file
@@ -0,0 +1,19 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
//go:build !ent
|
||||
|
||||
package fingerprint
|
||||
|
||||
import "github.com/hashicorp/nomad/nomad/structs/config"
|
||||
|
||||
// vaultConfigs returns the set of Vault configurations the fingerprinter needs
|
||||
// to check, keyed by cluster name. In Nomad CE we only check the default Vault.
// It returns nil when the agent has no Vault configuration or Vault is not
// enabled.
|
||||
func (f *VaultFingerprint) vaultConfigs(req *FingerprintRequest) map[string]*config.VaultConfig {
|
||||
agentCfg := req.Config
|
||||
// Nothing to fingerprint without an enabled Vault block.
if agentCfg.VaultConfig == nil || !agentCfg.VaultConfig.IsEnabled() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// The single agent-level Vault block is exposed under the "default" key.
return map[string]*config.VaultConfig{"default": agentCfg.VaultConfig}
|
||||
}
|
||||
@@ -64,6 +64,11 @@ func TestVaultFingerprint(t *testing.T) {
|
||||
// Stop Vault to simulate it being unavailable
|
||||
tv.Stop()
|
||||
|
||||
// Reset the nextCheck time for testing purposes, or we won't pick up the
|
||||
// change until the next period, up to 2min from now
|
||||
vfp := fp.(*VaultFingerprint)
|
||||
vfp.states["default"].nextCheck = time.Now()
|
||||
|
||||
err = fp.Fingerprint(request, &response)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to fingerprint: %s", err)
|
||||
|
||||
@@ -72,6 +72,7 @@ func NewTestVaultFromPath(t testing.T, binary string) *TestVault {
|
||||
RootToken: token,
|
||||
Client: client,
|
||||
Config: &config.VaultConfig{
|
||||
Name: "default",
|
||||
Enabled: &enable,
|
||||
Token: token,
|
||||
Addr: http,
|
||||
|
||||
Reference in New Issue
Block a user