mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 17:35:43 +03:00
Addressed review feedback
This commit is contained in:
@@ -1306,10 +1306,6 @@ func (c *Client) setupVaultClient() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.config.VaultConfig.Token == "" {
|
||||
return fmt.Errorf("vault token not set")
|
||||
}
|
||||
|
||||
var err error
|
||||
if c.vaultClient, err =
|
||||
vaultclient.NewVaultClient(c.config.VaultConfig, c.logger, c.deriveToken); err != nil {
|
||||
|
||||
@@ -22,32 +22,34 @@ type TokenDeriverFunc func(*structs.Allocation, []string, *vaultapi.Client) (map
|
||||
// The interface which nomad client uses to interact with vault and
|
||||
// periodically renews the tokens and secrets.
|
||||
type VaultClient interface {
|
||||
// Starts the renewal loop of tokens and secrets
|
||||
// Start initiates the renewal loop of tokens and secrets
|
||||
Start()
|
||||
|
||||
// Stops the renewal loop for tokens and secrets
|
||||
// Stop terminates the renewal loop for tokens and secrets
|
||||
Stop()
|
||||
|
||||
// Contacts the nomad server and fetches wrapped tokens for a set of
|
||||
// tasks. The wrapped tokens will be unwrapped using vault and
|
||||
// DeriveToken contacts the nomad server and fetches wrapped tokens for
|
||||
// a set of tasks. The wrapped tokens will be unwrapped using vault and
|
||||
// returned.
|
||||
DeriveToken(*structs.Allocation, []string) (map[string]string, error)
|
||||
|
||||
// Fetch the Consul ACL token required for the task
|
||||
// GetConsulACL fetches the Consul ACL token required for the task
|
||||
GetConsulACL(string, string) (*vaultapi.Secret, error)
|
||||
|
||||
// Renews a token with the given increment and adds it to the min-heap
|
||||
// for periodic renewal.
|
||||
// RenewToken renews a token with the given increment and adds it to
|
||||
// the min-heap for periodic renewal.
|
||||
RenewToken(string, int) <-chan error
|
||||
|
||||
// Removes the token from the min-heap, stopping its renewal.
|
||||
// StopRenewToken removes the token from the min-heap, stopping its
|
||||
// renewal.
|
||||
StopRenewToken(string) error
|
||||
|
||||
// Renews a vault secret's lease and add the lease identifier to the
|
||||
// min-heap for periodic renewal.
|
||||
// RenewLease renews a vault secret's lease and adds the lease
|
||||
// identifier to the min-heap for periodic renewal.
|
||||
RenewLease(string, int) <-chan error
|
||||
|
||||
// Removes a secret's lease ID from the min-heap, stopping its renewal.
|
||||
// StopRenewLease removes a secret's lease ID from the min-heap,
|
||||
// stopping its renewal.
|
||||
StopRenewLease(string) error
|
||||
}
|
||||
|
||||
@@ -70,19 +72,20 @@ type vaultClient struct {
|
||||
// tokenData is the data of the passed VaultClient token
|
||||
token *tokenData
|
||||
|
||||
// API client to interact with vault
|
||||
// client is the API client to interact with vault
|
||||
client *vaultapi.Client
|
||||
|
||||
// Channel to notify heap modifications to the renewal loop
|
||||
// updateCh is the channel to notify heap modifications to the renewal
|
||||
// loop
|
||||
updateCh chan struct{}
|
||||
|
||||
// Channel to trigger termination of renewal loop
|
||||
// stopCh is the channel to trigger termination of renewal loop
|
||||
stopCh chan struct{}
|
||||
|
||||
// Min-Heap to keep track of both tokens and leases
|
||||
// heap is the min-heap to keep track of both tokens and leases
|
||||
heap *vaultClientHeap
|
||||
|
||||
// Configuration to connect to vault
|
||||
// config is the configuration to connect to vault
|
||||
config *config.VaultConfig
|
||||
|
||||
lock sync.RWMutex
|
||||
@@ -100,19 +103,20 @@ type tokenData struct {
|
||||
Root bool
|
||||
}
|
||||
|
||||
// Request object for renewals. This can be used for both token renewals and
|
||||
// secret's lease renewals.
|
||||
// vaultClientRenewalRequest is a request object for renewal of both tokens and
|
||||
// secret's leases.
|
||||
type vaultClientRenewalRequest struct {
|
||||
// Channel into which any renewal error will be sent down to
|
||||
// errCh is the channel into which any renewal error will be sent to
|
||||
errCh chan error
|
||||
|
||||
// This can either be a token or a lease identifier
|
||||
// id is an identifier which represents either a token or a lease
|
||||
id string
|
||||
|
||||
// Duration for which the token or lease should be renewed for
|
||||
// increment is the duration for which the token or lease should be
|
||||
// renewed for
|
||||
increment int
|
||||
|
||||
// Indicates whether the 'id' field is a token or not
|
||||
// isToken indicates whether the 'id' field is a token or not
|
||||
isToken bool
|
||||
}
|
||||
|
||||
@@ -137,14 +141,14 @@ type vaultDataHeapImp []*vaultClientHeapEntry
|
||||
|
||||
// NewVaultClient returns a new vault client from the given config.
|
||||
func NewVaultClient(config *config.VaultConfig, logger *log.Logger, tokenDeriver TokenDeriverFunc) (*vaultClient, error) {
|
||||
if !config.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
return nil, fmt.Errorf("nil vault config")
|
||||
}
|
||||
|
||||
if !config.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if config.TaskTokenTTL == "" {
|
||||
return nil, fmt.Errorf("task_token_ttl not set")
|
||||
}
|
||||
@@ -290,6 +294,10 @@ func (c *vaultClient) GetConsulACL(token, path string) (*vaultapi.Secret, error)
|
||||
return nil, fmt.Errorf("missing consul ACL token vault path")
|
||||
}
|
||||
|
||||
if !c.ConnectionEstablished() {
|
||||
return nil, fmt.Errorf("connection with vault is not yet established")
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
@@ -308,17 +316,20 @@ func (c *vaultClient) GetConsulACL(token, path string) (*vaultapi.Secret, error)
|
||||
// loop. Any error returned during renewal will be written to a buffered
|
||||
// channel and the channel is returned instead of an actual error. This helps
|
||||
// the caller be notified of a renewal failure asynchronously for appropriate
|
||||
// actions to be taken.
|
||||
// actions to be taken. The caller of this function need not have to close the
|
||||
// error channel.
|
||||
func (c *vaultClient) RenewToken(token string, increment int) <-chan error {
|
||||
// Create a buffered error channel
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
if token == "" {
|
||||
errCh <- fmt.Errorf("missing token")
|
||||
close(errCh)
|
||||
return errCh
|
||||
}
|
||||
if increment < 1 {
|
||||
errCh <- fmt.Errorf("increment cannot be less than 1")
|
||||
close(errCh)
|
||||
return errCh
|
||||
}
|
||||
|
||||
@@ -345,7 +356,8 @@ func (c *vaultClient) RenewToken(token string, increment int) <-chan error {
|
||||
// the renewal loop. Any error returned during renewal will be written to a
|
||||
// buffered channel and the channel is returned instead of an actual error.
|
||||
// This helps the caller be notified of a renewal failure asynchronously for
|
||||
// appropriate actions to be taken.
|
||||
// appropriate actions to be taken. The caller of this function need not have
|
||||
// to close the error channel.
|
||||
func (c *vaultClient) RenewLease(leaseId string, increment int) <-chan error {
|
||||
c.logger.Printf("[DEBUG] client.vault: renewing lease %q", leaseId)
|
||||
// Create a buffered error channel
|
||||
@@ -353,17 +365,19 @@ func (c *vaultClient) RenewLease(leaseId string, increment int) <-chan error {
|
||||
|
||||
if leaseId == "" {
|
||||
errCh <- fmt.Errorf("missing lease ID")
|
||||
close(errCh)
|
||||
return errCh
|
||||
}
|
||||
|
||||
if increment < 1 {
|
||||
errCh <- fmt.Errorf("increment cannot be less than 1")
|
||||
close(errCh)
|
||||
return errCh
|
||||
}
|
||||
|
||||
// Create a renewal request using the supplied lease and duration
|
||||
renewalReq := &vaultClientRenewalRequest{
|
||||
errCh: make(chan error, 1),
|
||||
errCh: errCh,
|
||||
id: leaseId,
|
||||
increment: increment,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user