Merge pull request #2318 from hashicorp/f-vault-debug

Server side Vault telemetry
This commit is contained in:
Alex Dadgar
2017-02-17 11:34:37 -08:00
committed by GitHub
6 changed files with 106 additions and 14 deletions

View File

@@ -249,7 +249,13 @@ func (c *vaultClient) DeriveToken(alloc *structs.Allocation, taskNames []string)
// Use the token supplied to interact with vault
c.client.SetToken("")
return c.tokenDeriver(alloc, taskNames, c.client)
tokens, err := c.tokenDeriver(alloc, taskNames, c.client)
if err != nil {
c.logger.Printf("[ERR] client.vault: failed to derive token for allocation %q and tasks %v: %v", alloc.ID, taskNames, err)
return nil, err
}
return tokens, nil
}
// GetConsulACL creates a vault API client and reads from vault a consul ACL

View File

@@ -1061,7 +1061,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
secret, err := n.srv.vault.CreateToken(ctx, alloc, task)
if err != nil {
wrapped := fmt.Errorf("failed to create token for task %q: %v", task, err)
wrapped := fmt.Errorf("failed to create token for task %q on alloc %q: %v", task, alloc.ID, err)
if rerr, ok := err.(*structs.RecoverableError); ok && rerr.Recoverable {
// If the error is recoverable, propogate it
return structs.NewRecoverableError(wrapped, true)
@@ -1117,10 +1117,10 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
// If there was an error revoke the created tokens
if createErr != nil {
n.srv.logger.Printf("[ERR] nomad.node: Vault token creation failed: %v", createErr)
n.srv.logger.Printf("[ERR] nomad.node: Vault token creation for alloc %q failed: %v", alloc.ID, createErr)
if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil {
n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation failed: %v", revokeErr)
n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation for alloc %q failed: %v", alloc.ID, revokeErr)
}
if rerr, ok := createErr.(*structs.RecoverableError); ok {
@@ -1136,7 +1136,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
req := structs.VaultAccessorsRequest{Accessors: accessors}
_, index, err := n.srv.raftApply(structs.VaultAccessorRegisterRequestType, &req)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors failed: %v", err)
n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors for alloc %q failed: %v", alloc.ID, err)
// Determine if we can recover from the error
retry := false

View File

@@ -286,6 +286,9 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger)
// Emit metrics for the blocked eval tracker.
go blockedEvals.EmitStats(time.Second, s.shutdownCh)
// Emit metrics for the Vault client.
go s.vault.EmitStats(time.Second, s.shutdownCh)
// Emit metrics
go s.heartbeatStats()

View File

@@ -13,6 +13,7 @@ import (
"gopkg.in/tomb.v2"
metrics "github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
@@ -135,6 +136,21 @@ type VaultClient interface {
// Running returns whether the Vault client is running
Running() bool
// Stats returns the Vault clients statistics
Stats() *VaultStats
// EmitStats emits that clients statistics at the given period until stopCh
// is called.
EmitStats(period time.Duration, stopCh chan struct{})
}
// VaultStats returns all the stats about Vault tokens created and managed by
// Nomad.
type VaultStats struct {
// TrackedForRevoke is the count of tokens that are being tracked to be
// revoked since they could not be immediately revoked.
TrackedForRevoke int
}
// PurgeVaultAccessor is called to remove VaultAccessors from the system. If
@@ -204,6 +220,10 @@ type vaultClient struct {
tomb *tomb.Tomb
logger *log.Logger
// stats stores the stats
stats *VaultStats
statsLock sync.RWMutex
// l is used to lock the configuration aspects of the client such that
// multiple callers can't cause conflicting config updates
l sync.Mutex
@@ -227,6 +247,7 @@ func NewVaultClient(c *config.VaultConfig, logger *log.Logger, purgeFn PurgeVaul
revoking: make(map[*structs.VaultAccessor]time.Time),
purgeFn: purgeFn,
tomb: &tomb.Tomb{},
stats: new(VaultStats),
}
if v.config.IsEnabled() {
@@ -821,7 +842,6 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta
if !v.Enabled() {
return nil, fmt.Errorf("Vault integration disabled")
}
if !v.Active() {
return nil, structs.NewRecoverableError(fmt.Errorf("Vault client not active"), true)
}
@@ -833,6 +853,9 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta
return nil, fmt.Errorf("Connection to Vault failed: %v", err)
}
// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "create_token"}, time.Now())
// Retrieve the Vault block for the task
policies := a.Job.VaultPolicies()
if policies == nil {
@@ -908,6 +931,9 @@ func (v *vaultClient) LookupToken(ctx context.Context, token string) (*vapi.Secr
return nil, fmt.Errorf("Connection to Vault failed: %v", err)
}
// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "lookup_token"}, time.Now())
// Ensure we are under our rate limit
if err := v.limiter.Wait(ctx); err != nil {
return nil, err
@@ -943,6 +969,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return fmt.Errorf("Vault client not active")
}
// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "revoke_tokens"}, time.Now())
// Check if we have established a connection with Vault. If not just add it
// to the queue
if established, err := v.ConnectionEstablished(); !established && err == nil {
@@ -952,22 +981,29 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
v.storeForRevocation(accessors)
}
// Track that we are abandoning these accessors.
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors)))
return nil
}
// Attempt to revoke immediately and if it fails, add it to the revoke queue
err := v.parallelRevoke(ctx, accessors)
if !committed {
if err != nil {
// If it is uncommitted, it is a best effort revoke as it will shortly
// TTL within the cubbyhole and has not been leaked to any outside
// system
return nil
}
if !committed {
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors)))
return nil
}
if err != nil {
v.logger.Printf("[WARN] vault: failed to revoke tokens. Will reattempt til TTL: %v", err)
v.storeForRevocation(accessors)
return nil
} else if !committed {
// Mark that it was revoked but there is nothing to purge so exit
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_revoked"}, float32(len(accessors)))
return nil
}
if err := v.purgeFn(accessors); err != nil {
@@ -976,6 +1012,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return nil
}
// Track that it was revoked successfully
metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(accessors)))
return nil
}
@@ -984,10 +1023,13 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
// time.
func (v *vaultClient) storeForRevocation(accessors []*structs.VaultAccessor) {
v.revLock.Lock()
v.statsLock.Lock()
now := time.Now()
for _, a := range accessors {
v.revoking[a] = now.Add(time.Duration(a.CreationTTL) * time.Second)
}
v.stats.TrackedForRevoke = len(v.revoking)
v.statsLock.Unlock()
v.revLock.Unlock()
}
@@ -1103,12 +1145,19 @@ func (v *vaultClient) revokeDaemon() {
continue
}
// Track that tokens were revoked successfully
metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(revoking)))
// Can delete from the tracked list now that we have purged
v.revLock.Lock()
v.statsLock.Lock()
for _, va := range revoking {
delete(v.revoking, va)
}
v.stats.TrackedForRevoke = len(v.revoking)
v.statsLock.Unlock()
v.revLock.Unlock()
}
}
}
@@ -1137,3 +1186,30 @@ func (v *vaultClient) setLimit(l rate.Limit) {
defer v.l.Unlock()
v.limiter = rate.NewLimiter(l, int(l))
}
// Stats is used to query the state of the blocked eval tracker.
func (v *vaultClient) Stats() *VaultStats {
// Allocate a new stats struct
stats := new(VaultStats)
v.statsLock.RLock()
defer v.statsLock.RUnlock()
// Copy all the stats
stats.TrackedForRevoke = v.stats.TrackedForRevoke
return stats
}
// EmitStats is used to export metrics about the blocked eval tracker while enabled
func (v *vaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {
for {
select {
case <-time.After(period):
stats := v.Stats()
metrics.SetGauge([]string{"nomad", "vault", "distributed_tokens_revoking"}, float32(stats.TrackedForRevoke))
case <-stopCh:
return
}
}
}

View File

@@ -993,6 +993,10 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
if len(client.revoking) != 2 {
t.Fatalf("didn't add to revoke loop")
}
if client.Stats().TrackedForRevoke != 2 {
t.Fatalf("didn't add to revoke loop")
}
}
func TestVaultClient_RevokeTokens_Root(t *testing.T) {

View File

@@ -2,6 +2,7 @@ package nomad
import (
"context"
"time"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
@@ -134,7 +135,9 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs
return nil
}
func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stats() *VaultStats { return new(VaultStats) }
func (v *TestVaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {}