Mirror of https://github.com/kemko/nomad.git (synced 2026-01-04 17:35:43 +03:00)
Merge pull request #1828 from hashicorp/f-vault-options
Vault token renewal errors handled by client
@@ -173,12 +173,13 @@ type Template struct {
ChangeMode string
ChangeSignal string
Splay time.Duration
Once bool
}

type Vault struct {
Policies []string
Env bool
Policies []string
Env bool
ChangeMode string
ChangeSignal string
}

// NewTask creates and initializes a new Task.
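For orientation, a minimal standalone sketch of what the extended Vault block above lets a job author express. The struct fields mirror the hunk; the type is redeclared locally so the snippet compiles on its own, and the "signal"/"SIGUSR1" values are illustrative assumptions rather than values taken from this diff.

package main

import "fmt"

// Vault mirrors the fields added in the hunk above; in Nomad the real type
// lives in the package this hunk edits.
type Vault struct {
	Policies     []string
	Env          bool
	ChangeMode   string
	ChangeSignal string
}

func main() {
	// Ask for a token with the "default" policy, expose it to the task's
	// environment, and request a signal when the token is replaced.
	v := Vault{
		Policies:     []string{"default"},
		Env:          true,
		ChangeMode:   "signal",  // assumed string form of VaultChangeModeSignal
		ChangeSignal: "SIGUSR1", // hypothetical signal name
	}
	fmt.Printf("%+v\n", v)
}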
@@ -2,7 +2,6 @@ package client

import (
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
@@ -31,10 +30,6 @@ const (
// watchdogInterval is the interval at which resource constraints for the
// allocation are being checked and enforced.
watchdogInterval = 5 * time.Second

// vaultTokenFile is the name of the file holding the Vault token inside the
// task's secret directory
vaultTokenFile = "vault_token"
)

// AllocStateUpdater is used to update the status of an allocation
@@ -69,7 +64,6 @@ type AllocRunner struct {
updateCh chan *structs.Allocation

vaultClient vaultclient.VaultClient
vaultTokens map[string]vaultToken

otherAllocDir *allocdir.AllocDir

@@ -145,9 +139,6 @@ func (r *AllocRunner) RestoreState() error {
return e
}

// Recover the Vault tokens
vaultErr := r.recoverVaultTokens()

// Restore the task runners
var mErr multierror.Error
for name, state := range r.taskStates {
@@ -156,13 +147,9 @@ func (r *AllocRunner) RestoreState() error {

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
task)
task, r.vaultClient)
r.tasks[name] = tr

if vt, ok := r.vaultTokens[name]; ok {
tr.SetVaultToken(vt.token, vt.renewalCh)
}

// Skip tasks in terminal states.
if state.State == structs.TaskStateDead {
continue
@@ -177,20 +164,6 @@ func (r *AllocRunner) RestoreState() error {
}
}

// Since this is somewhat of an expected case we do not return an error but
// handle it gracefully.
if vaultErr != nil {
msg := fmt.Sprintf("failed to recover Vault tokens for allocation %q: %v", r.alloc.ID, vaultErr)
r.logger.Printf("[ERR] client: %s", msg)
r.setStatus(structs.AllocClientStatusFailed, msg)

// Destroy the task runners and set the error
r.destroyTaskRunners(structs.NewTaskEvent(structs.TaskVaultRenewalFailed).SetVaultRenewalError(vaultErr))

// Handle cleanup
go r.handleDestroy()
}

return mErr.ErrorOrNil()
}

@@ -376,13 +349,6 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
r.appendTaskEvent(taskState, event)

if state == structs.TaskStateDead {
// If the task has a Vault token, stop renewing it
if vt, ok := r.vaultTokens[taskName]; ok {
if err := r.vaultClient.StopRenewToken(vt.token); err != nil {
r.logger.Printf("[ERR] client: stopping token renewal for task %q failed: %v", taskName, err)
}
}

// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed() {
var destroyingTasks []string
@@ -467,15 +433,6 @@ func (r *AllocRunner) Run() {
return
}

// Request Vault tokens for the tasks that require them
err := r.deriveVaultTokens()
if err != nil {
msg := fmt.Sprintf("failed to derive Vault token for allocation %q: %v", r.alloc.ID, err)
r.logger.Printf("[ERR] client: %s", msg)
r.setStatus(structs.AllocClientStatusFailed, msg)
return
}

// Start the task runners
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
r.taskLock.Lock()
@@ -484,15 +441,10 @@ func (r *AllocRunner) Run() {
continue
}

tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(), task.Copy())
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(), task.Copy(), r.vaultClient)
r.tasks[task.Name] = tr
tr.MarkReceived()

// If the task has a vault token set it before running
if vt, ok := r.vaultTokens[task.Name]; ok {
tr.SetVaultToken(vt.token, vt.renewalCh)
}

go tr.Run()
}
r.taskLock.Unlock()
@@ -575,149 +527,6 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
r.syncStatus()
}

// vaultToken acts as a tuple of the token and renewal channel
type vaultToken struct {
token string
renewalCh <-chan error
}

// deriveVaultTokens derives the required vault tokens and returns a map of the
// tasks to their respective vault token and renewal channel. This must be
// called after the allocation directory is created as the vault tokens are
// written to disk.
func (r *AllocRunner) deriveVaultTokens() error {
required, err := r.tasksRequiringVaultTokens()
if err != nil {
return err
}

if len(required) == 0 {
return nil
}

if r.vaultTokens == nil {
r.vaultTokens = make(map[string]vaultToken, len(required))
}

// Get the tokens
tokens, err := r.vaultClient.DeriveToken(r.Alloc(), required)
if err != nil {
return fmt.Errorf("failed to derive Vault tokens: %v", err)
}

// Persist the tokens to the appropriate secret directories
adir := r.ctx.AllocDir
for task, token := range tokens {
// Has been recovered
if _, ok := r.vaultTokens[task]; ok {
continue
}

secretDir, err := adir.GetSecretDir(task)
if err != nil {
return fmt.Errorf("failed to determine task %s secret dir in alloc %q: %v", task, r.alloc.ID, err)
}

// Write the token to the file system
tokenPath := filepath.Join(secretDir, vaultTokenFile)
if err := ioutil.WriteFile(tokenPath, []byte(token), 0777); err != nil {
return fmt.Errorf("failed to save Vault tokens to secret dir for task %q in alloc %q: %v", task, r.alloc.ID, err)
}

// Start renewing the token
renewCh, err := r.vaultClient.RenewToken(token, 10)
if err != nil {
var mErr multierror.Error
errMsg := fmt.Errorf("failed to renew Vault token for task %q in alloc %q: %v", task, r.alloc.ID, err)
multierror.Append(&mErr, errMsg)

// Clean up any token that we have started renewing
for _, token := range r.vaultTokens {
if err := r.vaultClient.StopRenewToken(token.token); err != nil {
multierror.Append(&mErr, err)
}
}

return mErr.ErrorOrNil()
}
r.vaultTokens[task] = vaultToken{token: token, renewalCh: renewCh}
}

return nil
}

// tasksRequiringVaultTokens returns the set of tasks that require a Vault token
func (r *AllocRunner) tasksRequiringVaultTokens() ([]string, error) {
// Get the tasks
tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup)
if tg == nil {
return nil, fmt.Errorf("Failed to lookup task group in alloc")
}

// Retrieve any required Vault tokens
var required []string
for _, task := range tg.Tasks {
if task.Vault != nil && len(task.Vault.Policies) != 0 {
required = append(required, task.Name)
}
}

return required, nil
}

// recoverVaultTokens reads the Vault tokens for the tasks that have Vault
// tokens off disk. If there is an error, it is returned, otherwise token
// renewal is started.
func (r *AllocRunner) recoverVaultTokens() error {
required, err := r.tasksRequiringVaultTokens()
if err != nil {
return err
}

if len(required) == 0 {
return nil
}

// Read the tokens and start renewing them
adir := r.ctx.AllocDir
renewingTokens := make(map[string]vaultToken, len(required))
for _, task := range required {
secretDir, err := adir.GetSecretDir(task)
if err != nil {
return fmt.Errorf("failed to determine task %s secret dir in alloc %q: %v", task, r.alloc.ID, err)
}

// Read the token from the secret directory
tokenPath := filepath.Join(secretDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
return fmt.Errorf("failed to read token for task %q in alloc %q: %v", task, r.alloc.ID, err)
}

token := string(data)
renewCh, err := r.vaultClient.RenewToken(token, 10)
if err != nil {
var mErr multierror.Error
errMsg := fmt.Errorf("failed to renew Vault token for task %q in alloc %q: %v", task, r.alloc.ID, err)
multierror.Append(&mErr, errMsg)

// Clean up any token that we have started renewing
for _, token := range renewingTokens {
if err := r.vaultClient.StopRenewToken(token.token); err != nil {
multierror.Append(&mErr, err)
}
}

return mErr.ErrorOrNil()
}

renewingTokens[task] = vaultToken{token: token, renewalCh: renewCh}
}

r.vaultTokens = renewingTokens
return nil
}

// checkResources monitors and enforces alloc resource usage. It returns an
// appropriate task event describing why the allocation had to be killed.
func (r *AllocRunner) checkResources() (*structs.TaskEvent, string) {
@@ -620,248 +620,6 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
})
}

func TestAllocRunner_SimpleRun_VaultToken(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{"exit_code": "0"}
task.Vault = &structs.Vault{
Policies: []string{"default"},
}

upd, ar := testAllocRunnerFromAlloc(alloc, false)
go ar.Run()
defer ar.Destroy()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

tr, ok := ar.tasks[task.Name]
if !ok {
t.Fatalf("No task runner made")
}

// Check that the task runner was given the token
token := tr.vaultToken
if token == "" || tr.vaultRenewalCh == nil {
t.Fatalf("Vault token not set properly")
}

// Check that it was written to disk
secretDir, err := ar.ctx.AllocDir.GetSecretDir(task.Name)
if err != nil {
t.Fatalf("bad: %v", err)
}

tokenPath := filepath.Join(secretDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
t.Fatalf("token not written to disk: %v", err)
}

if string(data) != token {
t.Fatalf("Bad token written to disk")
}

// Check that we stopped renewing the token
mockVC := ar.vaultClient.(*vaultclient.MockVaultClient)
if len(mockVC.StoppedTokens) != 1 || mockVC.StoppedTokens[0] != token {
t.Fatalf("We didn't stop renewing the token")
}
}

func TestAllocRunner_SaveRestoreState_VaultTokens_Valid(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
task.Vault = &structs.Vault{
Policies: []string{"default"},
}

upd, ar := testAllocRunnerFromAlloc(alloc, false)
go ar.Run()

// Snapshot state
var token string
testutil.WaitForResult(func() (bool, error) {
if len(ar.tasks) != 1 {
return false, fmt.Errorf("Task not started")
}

tr, ok := ar.tasks[task.Name]
if !ok {
return false, fmt.Errorf("Incorrect task runner")
}

if tr.vaultToken == "" {
return false, fmt.Errorf("Bad token")
}

token = tr.vaultToken
return true, nil
}, func(err error) {
t.Fatalf("task never started: %v", err)
})

err := ar.SaveState()
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()

testutil.WaitForResult(func() (bool, error) {
if len(ar2.tasks) != 1 {
return false, fmt.Errorf("Incorrect number of tasks")
}

tr, ok := ar2.tasks[task.Name]
if !ok {
return false, fmt.Errorf("Incorrect task runner")
}

if tr.vaultToken != token {
return false, fmt.Errorf("Got token %q; want %q", tr.vaultToken, token)
}

if upd.Count == 0 {
return false, nil
}

last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusRunning, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

// Destroy and wait
ar2.Destroy()
start := time.Now()

testutil.WaitForResult(func() (bool, error) {
alloc := ar2.Alloc()
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("Bad client status; got %v; want %v", alloc.ClientStatus, structs.AllocClientStatusComplete)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > time.Duration(testutil.TestMultiplier()*5)*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState_VaultTokens_Invalid(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
task.Vault = &structs.Vault{
Policies: []string{"default"},
}

upd, ar := testAllocRunnerFromAlloc(alloc, false)
go ar.Run()

// Snapshot state
var token string
testutil.WaitForResult(func() (bool, error) {
if len(ar.tasks) != 1 {
return false, fmt.Errorf("Task not started")
}

tr, ok := ar.tasks[task.Name]
if !ok {
return false, fmt.Errorf("Incorrect task runner")
}

if tr.vaultToken == "" {
return false, fmt.Errorf("Bad token")
}

token = tr.vaultToken
return true, nil
}, func(err error) {
t.Fatalf("task never started: %v", err)
})

err := ar.SaveState()
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)

// Invalidate the token
mockVC := ar2.vaultClient.(*vaultclient.MockVaultClient)
renewErr := fmt.Errorf("Test disallowing renewal")
mockVC.SetRenewTokenError(token, renewErr)

// Restore and run
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}

last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusFailed, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

// Destroy and wait
ar2.Destroy()
start := time.Now()

testutil.WaitForResult(func() (bool, error) {
alloc := ar2.Alloc()
if alloc.ClientStatus != structs.AllocClientStatusFailed {
return false, fmt.Errorf("Bad client status; got %v; want %v", alloc.ClientStatus, structs.AllocClientStatusFailed)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > time.Duration(testutil.TestMultiplier()*5)*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_MoveAllocDir(t *testing.T) {
// Create an alloc runner
alloc := mock.Alloc()
@@ -141,6 +141,7 @@ func (tm *TaskTemplateManager) Stop() {

// run is the long lived loop that handles errors and templates being rendered
func (tm *TaskTemplateManager) run() {
// Runner is nil if there is no templates
if tm.runner == nil {
// Unblock the start if there is nothing to do
if !tm.allRendered {
@@ -189,6 +190,10 @@ func (tm *TaskTemplateManager) run() {

break WAIT
}

// TODO Thinking, I believe we could check every 30 seconds and if
// they are all would be rendered we should start anyways. That is
// the reattach mechanism when they have all been rendered
}

allRenderedTime = time.Now()
@@ -1021,7 +1021,6 @@ func (h *DockerHandle) Signal(s os.Signal) error {
ID: h.containerID,
Signal: dockerSignal,
}
h.logger.Printf("Sending: %v", dockerSignal)
return h.client.KillContainer(opts)

}
@@ -101,6 +101,19 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
r.lock.Lock()
defer r.lock.Unlock()

// Clear out the existing state
defer func() {
r.startErr = nil
r.waitRes = nil
r.restartTriggered = false
}()

// Hot path if a restart was triggered
if r.restartTriggered {
r.reason = ""
return structs.TaskRestarting, 0
}

// Hot path if no attempts are expected
if r.policy.Attempts == 0 {
r.reason = ReasonNoRestartsAllowed
@@ -121,25 +134,13 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
r.startTime = now
}

var state string
var dur time.Duration
if r.startErr != nil {
state, dur = r.handleStartError()
return r.handleStartError()
} else if r.waitRes != nil {
state, dur = r.handleWaitResult()
} else if r.restartTriggered {
state, dur = structs.TaskRestarting, 0
r.reason = ""
} else {
state, dur = "", 0
return r.handleWaitResult()
}

// Clear out the existing state
r.startErr = nil
r.waitRes = nil
r.restartTriggered = false

return state, dur
return "", 0
}

// handleStartError returns the new state and potential wait duration for
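The hunk above reworks GetState so that the per-call inputs (startErr, waitRes, restartTriggered) are cleared in a single defer and each branch returns immediately, instead of computing a state, clearing the fields, and returning at the bottom. A small standalone sketch of that clear-on-every-return pattern, with simplified fields rather than Nomad's actual RestartTracker:

package main

import "fmt"

type tracker struct {
	startErr         error
	restartTriggered bool
}

// state mimics the refactored GetState: the defer resets the inputs on every
// return path, so the early returns added in the hunk stay correct.
func (t *tracker) state() string {
	defer func() {
		t.startErr = nil
		t.restartTriggered = false
	}()

	if t.restartTriggered {
		return "restarting"
	}
	if t.startErr != nil {
		return "start-error"
	}
	return ""
}

func main() {
	t := &tracker{restartTriggered: true}
	fmt.Println(t.state()) // "restarting"
	fmt.Println(t.state()) // "" because the defer cleared the trigger
}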
@@ -4,6 +4,7 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
@@ -12,10 +13,12 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"

"github.com/hashicorp/nomad/client/driver/env"
@@ -35,6 +38,18 @@ const (
// killFailureLimit is how many times we will attempt to kill a task before
// giving up and potentially leaking resources.
killFailureLimit = 5

// vaultBackoffBaseline is the baseline time for exponential backoff when
// attempting to retrieve a Vault token
vaultBackoffBaseline = 5 * time.Second

// vaultBackoffLimit is the limit of the exponential backoff when attempting
// to retrieve a Vault token
vaultBackoffLimit = 3 * time.Minute

// vaultTokenFile is the name of the file holding the Vault token inside the
// task's secret directory
vaultTokenFile = "vault_token"
)

// TaskRunner is used to wrap a task within an allocation and provide the execution context.
@@ -53,8 +68,14 @@ type TaskRunner struct {
resourceUsage *cstructs.TaskResourceUsage
resourceUsageLock sync.RWMutex

task *structs.Task
taskEnv *env.TaskEnvironment
task *structs.Task
taskDir string

// taskEnv is the environment variables of the task
taskEnv *env.TaskEnvironment
taskEnvLock sync.Mutex

// updateCh is used to receive updated versions of the allocation
updateCh chan *structs.Allocation

handle driver.DriverHandle
@@ -64,10 +85,14 @@ type TaskRunner struct {
// downloaded
artifactsDownloaded bool

// vaultToken and vaultRenewalCh are optionally set if the task requires
// Vault tokens
vaultToken string
vaultRenewalCh <-chan error
// vaultFuture is the means to wait for and get a Vault token
vaultFuture *tokenFuture

// recoveredVaultToken is the token that was recovered through a restore
recoveredVaultToken string

// vaultClient is used to retrieve and renew any needed Vault token
vaultClient vaultclient.VaultClient

// templateManager is used to manage any consul-templates this task may have
templateManager *TaskTemplateManager
@@ -75,6 +100,9 @@ type TaskRunner struct {
// templatesRendered mark whether the templates have been rendered
templatesRendered bool

// startCh is used to trigger the start of the task
startCh chan struct{}

// unblockCh is used to unblock the starting of the task
unblockCh chan struct{}
unblocked bool
@@ -86,16 +114,13 @@ type TaskRunner struct {
// signalCh is used to send a signal to a task
signalCh chan SignalEvent

// killCh is used to kill a task
killCh chan *structs.TaskEvent
killed bool
killLock sync.Mutex

destroy bool
destroyCh chan struct{}
destroyLock sync.Mutex
destroyEvent *structs.TaskEvent
waitCh chan struct{}

// waitCh closing marks the run loop as having exited
waitCh chan struct{}

// serialize SaveState calls
persistLock sync.Mutex
@@ -128,7 +153,8 @@ type SignalEvent struct {
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
alloc *structs.Allocation, task *structs.Task) *TaskRunner {
alloc *structs.Allocation, task *structs.Task,
vaultClient vaultclient.VaultClient) *TaskRunner {

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
@@ -141,6 +167,13 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)

// Get the task directory
taskDir, ok := ctx.AllocDir.TaskDirs[task.Name]
if !ok {
logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", alloc.ID, task.Name)
return nil
}

tc := &TaskRunner{
config: config,
updater: updater,
@@ -149,25 +182,21 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
ctx: ctx,
alloc: alloc,
task: task,
taskDir: taskDir,
vaultClient: vaultClient,
vaultFuture: NewTokenFuture().Set(""),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
unblockCh: make(chan struct{}),
restartCh: make(chan *structs.TaskEvent),
signalCh: make(chan SignalEvent),
killCh: make(chan *structs.TaskEvent),
}

return tc
}

// SetVaultToken is used to set the Vault token and renewal channel for the task
// runner
func (r *TaskRunner) SetVaultToken(token string, renewalCh <-chan error) {
r.vaultToken = token
r.vaultRenewalCh = renewalCh
}

// MarkReceived marks the task as received.
func (r *TaskRunner) MarkReceived() {
r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
@@ -213,6 +242,27 @@ func (r *TaskRunner) RestoreState() error {
r.task.Name, r.alloc.ID, err)
}

if r.task.Vault != nil {
secretDir, err := r.ctx.AllocDir.GetSecretDir(r.task.Name)
if err != nil {
return fmt.Errorf("failed to determine task %s secret dir in alloc %q: %v", r.task.Name, r.alloc.ID, err)
}

// Read the token from the secret directory
tokenPath := filepath.Join(secretDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("failed to read token for task %q in alloc %q: %v", r.task.Name, r.alloc.ID, err)
}

// Token file doesn't exist
} else {
// Store the recovered token
r.recoveredVaultToken = string(data)
}
}

// Restore the driver
if snap.HandleID != "" {
driver, err := r.createDriver()
@@ -277,7 +327,10 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
// setTaskEnv sets the task environment. It returns an error if it could not be
// created.
func (r *TaskRunner) setTaskEnv() error {
taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task.Copy(), r.alloc, r.vaultToken)
r.taskEnvLock.Lock()
defer r.taskEnvLock.Unlock()

taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task.Copy(), r.alloc, r.vaultFuture.Get())
if err != nil {
return err
}
@@ -285,13 +338,21 @@ func (r *TaskRunner) setTaskEnv() error {
return nil
}

// getTaskEnv returns the task environment
func (r *TaskRunner) getTaskEnv() *env.TaskEnvironment {
r.taskEnvLock.Lock()
defer r.taskEnvLock.Unlock()
return r.taskEnv
}

// createDriver makes a driver for the task
func (r *TaskRunner) createDriver() (driver.Driver, error) {
if r.taskEnv == nil {
env := r.getTaskEnv()
if env == nil {
return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID)
}

driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, r.taskEnv)
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env)
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
@@ -313,14 +374,20 @@ func (r *TaskRunner) Run() {
return
}

if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
return
// If there is no Vault policy leave the static future created in
// NewTaskRunner
if r.task.Vault != nil {
// Start the go-routine to get a Vault token
r.vaultFuture.Clear()
go r.vaultManager(r.recoveredVaultToken)
}

// Start the run loop
r.run()

// Do any cleanup necessary
r.postrun()

return
}

@@ -356,25 +423,287 @@ func (r *TaskRunner) validateTask() error {
return mErr.ErrorOrNil()
}

// prestart handles life-cycle tasks that occur before the task has started.
func (r *TaskRunner) prestart(taskDir string) (success bool) {
// Build the template manager
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, r.templatesRendered,
r.config, r.vaultToken, taskDir, r.taskEnv)
// tokenFuture stores the Vault token and allows consumers to block till a valid
// token exists
type tokenFuture struct {
waiting []chan struct{}
token string
set bool
m sync.Mutex
}

// NewTokenFuture returns a new token future without any token set
func NewTokenFuture() *tokenFuture {
return &tokenFuture{}
}

// Wait returns a channel that can be waited on. When this channel unblocks, a
// valid token will be available via the Get method
func (f *tokenFuture) Wait() <-chan struct{} {
f.m.Lock()
defer f.m.Unlock()

c := make(chan struct{})
if f.set {
close(c)
return c
}

f.waiting = append(f.waiting, c)
return c
}

// Set sets the token value and unblocks any caller of Wait
func (f *tokenFuture) Set(token string) *tokenFuture {
f.m.Lock()
defer f.m.Unlock()

f.set = true
f.token = token
for _, w := range f.waiting {
close(w)
}
f.waiting = nil
return f
}

// Clear clears the set vault token.
func (f *tokenFuture) Clear() *tokenFuture {
f.m.Lock()
defer f.m.Unlock()

f.token = ""
f.set = false
return f
}

// Get returns the set Vault token
func (f *tokenFuture) Get() string {
f.m.Lock()
defer f.m.Unlock()
return f.token
}
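To make the contract of the future above concrete, a short hedged sketch of how a consumer blocks on it. This mirrors what vaultManager and the new prestart do later in the diff and reuses the tokenFuture type defined above; the shutdownCh parameter is a stand-in for the runner's waitCh and the helper itself is illustrative, not part of the commit.

// waitForToken blocks until Set has been called with a valid token or the
// runner is shutting down.
func waitForToken(f *tokenFuture, shutdownCh <-chan struct{}) (string, bool) {
	select {
	case <-f.Wait(): // closed once a valid token has been set
		return f.Get(), true
	case <-shutdownCh: // give up if the task runner is exiting
		return "", false
	}
}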
// vaultManager should be called in a go-routine and manages the derivation,
// renewal and handling of errors with the Vault token. The optional parameter
// allows setting the initial Vault token. This is useful when the Vault token
// is recovered off disk.
func (r *TaskRunner) vaultManager(token string) {
// updatedToken lets us store state between loops. If true, a new token
// has been retrieved and we need to apply the Vault change mode
var updatedToken bool

OUTER:
for {
// Check if we should exit
select {
case <-r.waitCh:
return
default:
}

// Clear the token
r.vaultFuture.Clear()

// Check if there already is a token which can be the case for
// restoring the TaskRunner
if token == "" {
// Get a token
var ok bool
token, ok = r.deriveVaultToken()
if !ok {
// We are shutting down
return
}

// Write the token to disk
if err := r.writeToken(token); err != nil {
e := fmt.Errorf("failed to write Vault token to disk")
r.logger.Printf("[ERR] client: %v for task %v on alloc %q: %v", e, r.task.Name, r.alloc.ID, err)
r.Kill("vault", e.Error())
return
}
}

// Start the renewal process
renewCh, err := r.vaultClient.RenewToken(token, 30)

// An error returned means the token is not being renewed
if err != nil {
r.logger.Printf("[ERR] client: failed to start renewal of Vault token for task %v on alloc %q: %v", r.task.Name, r.alloc.ID, err)
token = ""
goto OUTER
}

// The Vault token is valid now, so set it
r.vaultFuture.Set(token)

if updatedToken {
switch r.task.Vault.ChangeMode {
case structs.VaultChangeModeSignal:
s, err := signals.Parse(r.task.Vault.ChangeSignal)
if err != nil {
e := fmt.Errorf("failed to parse signal: %v", err)
r.logger.Printf("[ERR] client: %v", err)
r.Kill("vault", e.Error())
return
}

if err := r.Signal("vault", "new Vault token acquired", s); err != nil {
r.logger.Printf("[ERR] client: failed to send signal to task %v for alloc %q: %v", r.task.Name, r.alloc.ID, err)
r.Kill("vault", fmt.Sprintf("failed to send signal to task: %v", err))
return
}
case structs.VaultChangeModeRestart:
r.Restart("vault", "new Vault token acquired")
case structs.VaultChangeModeNoop:
fallthrough
default:
r.logger.Printf("[ERR] client: Invalid Vault change mode: %q", r.task.Vault.ChangeMode)
}

// We have handled it
updatedToken = false

// Call the handler
r.updatedTokenHandler()
}

// Start watching for renewal errors
select {
case err := <-renewCh:
// Clear the token
token = ""
r.logger.Printf("[ERR] client: failed to renew Vault token for task %v on alloc %q: %v", r.task.Name, r.alloc.ID, err)

// Check if we have to do anything
if r.task.Vault.ChangeMode != structs.VaultChangeModeNoop {
updatedToken = true
}
case <-r.waitCh:
return
}
}
}

// deriveVaultToken derives the Vault token using exponential backoffs. It
// returns the Vault token and whether the token is valid. If it is not valid we
// are shutting down
func (r *TaskRunner) deriveVaultToken() (string, bool) {
attempts := 0
for {
tokens, err := r.vaultClient.DeriveToken(r.alloc, []string{r.task.Name})
if err == nil {
return tokens[r.task.Name], true
}

// Handle the retry case
backoff := (1 << (2 * uint64(attempts))) * vaultBackoffBaseline
if backoff > vaultBackoffLimit {
backoff = vaultBackoffLimit
}
r.logger.Printf("[ERR] client: failed to derive Vault token for task %v on alloc %q: %v; retrying in %v", r.task.Name, r.alloc.ID, err, backoff)

attempts++

// Wait till retrying
select {
case <-r.waitCh:
return "", false
case <-time.After(backoff):
}
}
}
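As a worked example of the retry schedule this loop produces with the constants declared earlier in the diff (5s baseline, 3m cap), a small standalone program that reproduces the backoff computation:

package main

import (
	"fmt"
	"time"
)

const (
	vaultBackoffBaseline = 5 * time.Second
	vaultBackoffLimit    = 3 * time.Minute
)

func main() {
	// Same formula as deriveVaultToken: 5s, 20s, 1m20s, then capped at 3m.
	for attempts := uint64(0); attempts < 5; attempts++ {
		backoff := (1 << (2 * attempts)) * vaultBackoffBaseline
		if backoff > vaultBackoffLimit {
			backoff = vaultBackoffLimit
		}
		fmt.Printf("attempt %d: retry in %v\n", attempts, backoff)
	}
}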
// writeToken writes the given token to disk
func (r *TaskRunner) writeToken(token string) error {
// Write the token to disk
secretDir, err := r.ctx.AllocDir.GetSecretDir(r.task.Name)
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
return fmt.Errorf("failed to determine task %s secret dir in alloc %q: %v", r.task.Name, r.alloc.ID, err)
}

// Write the token to the file system
tokenPath := filepath.Join(secretDir, vaultTokenFile)
if err := ioutil.WriteFile(tokenPath, []byte(token), 0777); err != nil {
return fmt.Errorf("failed to save Vault tokens to secret dir for task %q in alloc %q: %v", r.task.Name, r.alloc.ID, err)
}

return nil
}

// updatedTokenHandler is called when a new Vault token is retrieved. Things
// that rely on the token should be updated here.
func (r *TaskRunner) updatedTokenHandler() {

// Update the tasks environment
if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
return
}

if r.templateManager != nil {
r.templateManager.Stop()

// Create a new templateManager
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, r.templatesRendered,
r.config, r.vaultFuture.Get(), r.taskDir, r.getTaskEnv())
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
return
}
}
}

// prestart handles life-cycle tasks that occur before the task has started.
func (r *TaskRunner) prestart(resultCh chan bool) {

if r.task.Vault != nil {
// Wait for the token
r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID)
tokenCh := r.vaultFuture.Wait()
select {
case <-tokenCh:
case <-r.waitCh:
resultCh <- false
return
}
r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID)
}

if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
resultCh <- false
return
}

// Build the template manager
if r.templateManager == nil {
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, r.templatesRendered,
r.config, r.vaultFuture.Get(), r.taskDir, r.getTaskEnv())
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
resultCh <- false
return
}
}

for {
// Download the task's artifacts
if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
for _, artifact := range r.task.Artifacts {
if err := getter.GetArtifact(r.taskEnv, artifact, taskDir); err != nil {
if err := getter.GetArtifact(r.getTaskEnv(), artifact, r.taskDir); err != nil {
r.setState(structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err))
r.restartTracker.SetStartError(dstructs.NewRecoverableError(err, true))
@@ -385,99 +714,106 @@ func (r *TaskRunner) prestart(taskDir string) (success bool) {
r.artifactsDownloaded = true
}

// We don't have to wait
if r.templatesRendered {
return true
// We don't have to wait for any template
if len(r.task.Templates) == 0 || r.templatesRendered {
// Send the start signal
select {
case r.startCh <- struct{}{}:
default:
}

resultCh <- true
return
}

// Block for consul-template
// TODO Hooks should register themselves as blocking and then we can
// perioidcally enumerate what we are still blocked on
select {
case <-r.unblockCh:
r.templatesRendered = true
return true
case event := <-r.killCh:
r.setState(structs.TaskStateDead, event)
r.logger.Printf("[ERR] client: task killed: %v", event)
return false
case update := <-r.updateCh:
if err := r.handleUpdate(update); err != nil {
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
}
case err := <-r.vaultRenewalCh:
if err == nil {
continue // Only handle once.

// Send the start signal
select {
case r.startCh <- struct{}{}:
default:
}

// This is a fatal error as the task is not valid if it requested a
// Vault token and the token has now expired.
r.logger.Printf("[WARN] client: vault token for task %q not renewed: %v", r.task.Name, err)
r.Destroy(structs.NewTaskEvent(structs.TaskVaultRenewalFailed).SetVaultRenewalError(err))

case <-r.destroyCh:
r.setState(structs.TaskStateDead, r.destroyEvent)
return false
resultCh <- true
return
case <-r.waitCh:
// The run loop has exited so exit too
resultCh <- false
return
}

RESTART:
restart := r.shouldRestart()
if !restart {
return false
resultCh <- false
return
}
}
}

// postrun is used to do any cleanup that is necessary after exiting the runloop
func (r *TaskRunner) postrun() {
// Stop the template manager
if r.templateManager != nil {
r.templateManager.Stop()
}
}

// run is the main run loop that handles starting the application, destroying
// it, restarts and signals.
func (r *TaskRunner) run() {
// Predeclare things so we can jump to the RESTART
var handleEmpty bool
var stopCollection chan struct{}

// Get the task directory
taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
if !ok {
err := fmt.Errorf("task directory couldn't be found")
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
return
}

// Do all prestart events first
if success := r.prestart(taskDir); !success {
return
}
var handleWaitCh chan *dstructs.WaitResult

for {
// Start the task if not yet started or it is being forced. This logic
// is necessary because in the case of a restore the handle already
// exists.
r.handleLock.Lock()
handleEmpty = r.handle == nil
r.handleLock.Unlock()
// Do the prestart activities
prestartResultCh := make(chan bool, 1)
go r.prestart(prestartResultCh)

if handleEmpty {
startErr := r.startTask()
r.restartTracker.SetStartError(startErr)
if startErr != nil {
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr))
goto RESTART
}

// Mark the task as started
r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
r.runningLock.Lock()
r.running = true
r.runningLock.Unlock()
}

if stopCollection == nil {
stopCollection = make(chan struct{})
go r.collectResourceUsageStats(stopCollection)
}

// Wait for updates
WAIT:
for {
select {
case waitRes := <-r.handle.WaitCh():
case success := <-prestartResultCh:
if !success {
return
}
case <-r.startCh:
// Start the task if not yet started or it is being forced. This logic
// is necessary because in the case of a restore the handle already
// exists.
r.handleLock.Lock()
handleEmpty := r.handle == nil
r.handleLock.Unlock()

if handleEmpty {
startErr := r.startTask()
r.restartTracker.SetStartError(startErr)
if startErr != nil {
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr))
goto RESTART
}

// Mark the task as started
r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
r.runningLock.Lock()
r.running = true
r.runningLock.Unlock()
}

if stopCollection == nil {
stopCollection = make(chan struct{})
go r.collectResourceUsageStats(stopCollection)
}

handleWaitCh = r.handle.WaitCh()

case waitRes := <-handleWaitCh:
if waitRes == nil {
panic("nil wait")
}
@@ -503,16 +839,6 @@ func (r *TaskRunner) run() {
if err := r.handleUpdate(update); err != nil {
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
}
case err := <-r.vaultRenewalCh:
if err == nil {
// Only handle once.
continue
}

// This is a fatal error as the task is not valid if it
// requested a Vault token and the token has now expired.
r.logger.Printf("[WARN] client: vault token for task %q not renewed: %v", r.task.Name, err)
r.Destroy(structs.NewTaskEvent(structs.TaskVaultRenewalFailed).SetVaultRenewalError(err))

case se := <-r.signalCh:
r.logger.Printf("[DEBUG] client: task being signalled with %v: %s", se.s, se.e.TaskSignalReason)
@@ -524,25 +850,38 @@ func (r *TaskRunner) run() {
case event := <-r.restartCh:
r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason)
r.setState(structs.TaskStateRunning, event)
r.killTask(event.RestartReason, stopCollection)
r.killTask(event.RestartReason)

close(stopCollection)

// Since the restart isn't from a failure, restart immediately
// and don't count against the restart policy
r.restartTracker.SetRestartTriggered()
break WAIT

case event := <-r.killCh:
r.logger.Printf("[ERR] client: task being killed: %s", event.KillReason)
r.killTask(event.KillReason, stopCollection)
return

case <-r.destroyCh:
// Store the task event that provides context on the task destroy.
if r.destroyEvent.Type != structs.TaskKilled {
r.setState(structs.TaskStateRunning, r.destroyEvent)
r.runningLock.Lock()
running := r.running
r.runningLock.Unlock()
if !running {
r.setState(structs.TaskStateDead, r.destroyEvent)
return
}

r.killTask("", stopCollection)
// Store the task event that provides context on the task
// destroy. The Killed event is set from the alloc_runner and
// doesn't add detail
reason := ""
if r.destroyEvent.Type != structs.TaskKilled {
if r.destroyEvent.Type == structs.TaskKilling {
reason = r.destroyEvent.KillReason
} else {
r.setState(structs.TaskStateRunning, r.destroyEvent)
}
}

r.killTask(reason)
close(stopCollection)
return
}
}
@@ -556,6 +895,7 @@ func (r *TaskRunner) run() {
// Clear the handle so a new driver will be created.
r.handleLock.Lock()
r.handle = nil
handleWaitCh = nil
stopCollection = nil
r.handleLock.Unlock()
}
@@ -607,9 +947,7 @@ func (r *TaskRunner) shouldRestart() bool {
}

// killTask kills the running task, storing the reason in the Killing TaskEvent.
// The associated stats collection channel is also closed once the task is
// successfully killed.
func (r *TaskRunner) killTask(reason string, statsCh chan struct{}) {
func (r *TaskRunner) killTask(reason string) {
r.runningLock.Lock()
running := r.running
r.runningLock.Unlock()
@@ -633,9 +971,6 @@ func (r *TaskRunner) killTask(reason string, statsCh chan struct{}) {
r.running = false
r.runningLock.Unlock()

// Stop collection of the task's resource usage
close(statsCh)

// Store that the task has been destroyed and any associated error.
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
}
@@ -842,23 +1177,13 @@ func (r *TaskRunner) Signal(source, reason string, s os.Signal) error {
}

// Kill will kill a task and store the error, no longer restarting the task
// TODO need to be able to fail the task
func (r *TaskRunner) Kill(source, reason string) {
r.killLock.Lock()
defer r.killLock.Unlock()
if r.killed {
return
}

reasonStr := fmt.Sprintf("%s: %s", source, reason)
event := structs.NewTaskEvent(structs.TaskKilling).SetKillReason(reasonStr)

r.logger.Printf("[DEBUG] client: killing task %v for alloc %q: %v", r.task.Name, r.alloc.ID, reasonStr)

select {
case r.killCh <- event:
close(r.killCh)
case <-r.waitCh:
}
r.Destroy(event)
}

// UnblockStart unblocks the starting of the task. It currently assumes only
@@ -871,6 +1196,7 @@ func (r *TaskRunner) UnblockStart(source string) {
}

r.logger.Printf("[DEBUG] client: unblocking task %v for alloc %q: %v", r.task.Name, r.alloc.ID, source)
r.unblocked = true
close(r.unblockCh)
}
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
@@ -59,8 +61,9 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas
|
||||
allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID), task.Resources.DiskMB)
|
||||
allocDir.Build([]*structs.Task{task})
|
||||
|
||||
vclient := vaultclient.NewMockVaultClient()
|
||||
ctx := driver.NewExecContext(allocDir, alloc.ID)
|
||||
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc, task)
|
||||
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc, task, vclient)
|
||||
if !restarts {
|
||||
tr.restartTracker = noRestartsTracker()
|
||||
}
|
||||
@@ -219,24 +222,65 @@ func TestTaskRunner_Update(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTaskRunner_SaveRestoreState(t *testing.T) {
|
||||
ctestutil.ExecCompatible(t)
|
||||
upd, tr := testTaskRunner(false)
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "5s",
|
||||
}
|
||||
|
||||
// Change command to ensure we run for a bit
|
||||
tr.task.Config["command"] = "/bin/sleep"
|
||||
tr.task.Config["args"] = []string{"10"}
|
||||
// Give it a Vault token
|
||||
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
||||
|
||||
upd, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
go tr.Run()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
|
||||
// Snapshot state
|
||||
time.Sleep(2 * time.Second)
|
||||
// Wait for the task to be running and then snapshot the state
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if l := len(upd.events); l != 2 {
|
||||
return false, fmt.Errorf("Expect two events; got %v", l)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
if err := tr.SaveState(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Read the token from the file system
|
||||
secretDir, err := tr.ctx.AllocDir.GetSecretDir(task.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to determine task %s secret dir: %v", err)
|
||||
}
|
||||
|
||||
tokenPath := filepath.Join(secretDir, vaultTokenFile)
|
||||
data, err := ioutil.ReadFile(tokenPath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read file: %v", err)
|
||||
}
|
||||
token := string(data)
|
||||
if len(token) == 0 {
|
||||
t.Fatalf("Token not written to disk")
|
||||
}
|
||||
|
||||
// Create a new task runner
|
||||
tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
|
||||
tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name})
|
||||
tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.vaultClient)
|
||||
tr2.restartTracker = noRestartsTracker()
|
||||
if err := tr2.RestoreState(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -244,11 +288,16 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
|
||||
defer tr2.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
|
||||
// Destroy and wait
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return tr2.handle != nil, fmt.Errorf("RestoreState() didn't open handle")
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
select {
|
||||
case <-tr2.WaitCh():
|
||||
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
// Check that we recovered the token
|
||||
if act := tr2.vaultFuture.Get(); act != token {
|
||||
t.Fatalf("Vault token not properly recovered")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_Download_List(t *testing.T) {
|
||||
@@ -411,68 +460,6 @@ func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_VaultTokenRenewal(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "10s",
|
||||
}
|
||||
task.Vault = &structs.Vault{
|
||||
Policies: []string{"default"},
|
||||
}
|
||||
|
||||
upd, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
renewalCh := make(chan error, 1)
|
||||
renewalErr := fmt.Errorf("test vault renewal error")
|
||||
tr.SetVaultToken(structs.GenerateUUID(), renewalCh)
|
||||
go tr.Run()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
renewalCh <- renewalErr
|
||||
close(renewalCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-tr.WaitCh():
|
||||
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
if len(upd.events) != 5 {
|
||||
t.Fatalf("should have 3 updates: %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.state != structs.TaskStateDead {
|
||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
if upd.events[2].Type != structs.TaskVaultRenewalFailed {
|
||||
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskVaultRenewalFailed)
|
||||
}
|
||||
|
||||
if upd.events[3].Type != structs.TaskKilling {
|
||||
t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilling)
|
||||
}
|
||||
|
||||
if upd.events[4].Type != structs.TaskKilled {
|
||||
t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_RestartTask(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -618,3 +605,463 @@ func TestTaskRunner_SignalFailure(t *testing.T) {
|
||||
t.Fatalf("Didn't receive error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_BlockForVault(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "1s",
|
||||
}
|
||||
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
||||
|
||||
upd, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
|
||||
// Control when we get a Vault token
|
||||
token := "1234"
|
||||
waitCh := make(chan struct{})
|
||||
handler := func(*structs.Allocation, []string) (map[string]string, error) {
|
||||
<-waitCh
|
||||
return map[string]string{task.Name: token}, nil
|
||||
}
|
||||
tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
|
||||
|
||||
go tr.Run()
|
||||
|
||||
select {
|
||||
case <-tr.WaitCh():
|
||||
t.Fatalf("premature exit")
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
|
||||
if len(upd.events) != 1 {
|
||||
t.Fatalf("should have 1 updates: %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.state != structs.TaskStatePending {
|
||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStatePending)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
// Unblock
|
||||
close(waitCh)
|
||||
|
||||
select {
|
||||
case <-tr.WaitCh():
|
||||
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
if len(upd.events) != 3 {
|
||||
t.Fatalf("should have 3 updates: %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.state != structs.TaskStateDead {
|
||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
if upd.events[2].Type != structs.TaskTerminated {
|
||||
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
|
||||
}
|
||||
|
||||
// Check that the token is on disk
|
||||
secretDir, err := tr.ctx.AllocDir.GetSecretDir(task.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to determine task %s secret dir: %v", err)
|
||||
}
|
||||
|
||||
// Read the token from the file system
|
||||
tokenPath := filepath.Join(secretDir, vaultTokenFile)
|
||||
data, err := ioutil.ReadFile(tokenPath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read file: %v", err)
|
||||
}
|
||||
|
||||
if act := string(data); act != token {
|
||||
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_DeriveToken_Retry(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "1s",
|
||||
}
|
||||
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
||||
|
||||
upd, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
|
||||
// Control when we get a Vault token
|
||||
token := "1234"
|
||||
count := 0
|
||||
handler := func(*structs.Allocation, []string) (map[string]string, error) {
|
||||
if count > 0 {
|
||||
return map[string]string{task.Name: token}, nil
|
||||
}
|
||||
|
||||
count++
|
||||
return nil, fmt.Errorf("Want a retry")
|
||||
}
|
||||
tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
|
||||
go tr.Run()
|
||||
|
||||
select {
|
||||
case <-tr.WaitCh():
|
||||
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
if len(upd.events) != 3 {
|
||||
t.Fatalf("should have 3 updates: %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.state != structs.TaskStateDead {
|
||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
if upd.events[2].Type != structs.TaskTerminated {
|
||||
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
|
||||
}
|
||||
|
||||
// Check that the token is on disk
|
||||
secretDir, err := tr.ctx.AllocDir.GetSecretDir(task.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to determine task %s secret dir: %v", err)
|
||||
}
|
||||
|
||||
// Read the token from the file system
|
||||
tokenPath := filepath.Join(secretDir, vaultTokenFile)
|
||||
data, err := ioutil.ReadFile(tokenPath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read file: %v", err)
|
||||
}
|
||||
|
||||
if act := string(data); act != token {
|
||||
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
|
||||
}
|
||||
}
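The retry test above only proves that a failed DeriveToken call is retried; the actual retry policy lives elsewhere in the task runner and is not shown in this hunk. The sketch below is an assumed illustration only: the helper name, stop channel, and backoff values are invented, and DeriveToken is assumed to share the signature of MockVaultClient.DeriveToken.

```go
package client

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/nomad/structs"
)

// deriveWithRetry is a hypothetical helper, not the runner's real code: it
// retries token derivation with a capped exponential backoff until it
// succeeds or the stop channel is closed.
func deriveWithRetry(vc vaultclient.VaultClient, alloc *structs.Allocation,
	taskName string, stopCh <-chan struct{}) (string, error) {

	backoff := time.Second
	for {
		tokens, err := vc.DeriveToken(alloc, []string{taskName})
		if err == nil {
			return tokens[taskName], nil
		}

		select {
		case <-stopCh:
			return "", fmt.Errorf("stopped before a Vault token could be derived")
		case <-time.After(backoff):
		}

		// Double the wait between attempts, capped at 30 seconds.
		if backoff *= 2; backoff > 30*time.Second {
			backoff = 30 * time.Second
		}
	}
}
```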
|
||||
|
||||
func TestTaskRunner_Template_Block(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "1s",
|
||||
}
|
||||
task.Templates = []*structs.Template{
|
||||
{
|
||||
EmbeddedTmpl: "{{key \"foo\"}}",
|
||||
DestPath: "local/test",
|
||||
ChangeMode: structs.TemplateChangeModeNoop,
|
||||
},
|
||||
}
|
||||
|
||||
upd, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
|
||||
go tr.Run()
|
||||
|
||||
select {
|
||||
case <-tr.WaitCh():
|
||||
t.Fatalf("premature exit")
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
|
||||
if len(upd.events) != 1 {
|
||||
t.Fatalf("should have 1 updates: %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.state != structs.TaskStatePending {
|
||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStatePending)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
// Unblock
|
||||
tr.UnblockStart("test")
|
||||
|
||||
select {
|
||||
case <-tr.WaitCh():
|
||||
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
if len(upd.events) != 3 {
|
||||
t.Fatalf("should have 3 updates: %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.state != structs.TaskStateDead {
|
||||
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
if upd.events[2].Type != structs.TaskTerminated {
|
||||
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_Template_NewVaultToken(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "1s",
|
||||
}
|
||||
task.Templates = []*structs.Template{
|
||||
{
|
||||
EmbeddedTmpl: "{{key \"foo\"}}",
|
||||
DestPath: "local/test",
|
||||
ChangeMode: structs.TemplateChangeModeNoop,
|
||||
},
|
||||
}
|
||||
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
||||
|
||||
_, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
go tr.Run()
|
||||
|
||||
// Wait for a Vault token
|
||||
var token string
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if token = tr.vaultFuture.Get(); token == "" {
|
||||
return false, fmt.Errorf("No Vault token")
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Error the token renewal
|
||||
vc := tr.vaultClient.(*vaultclient.MockVaultClient)
|
||||
renewalCh, ok := vc.RenewTokens[token]
|
||||
if !ok {
|
||||
t.Fatalf("no renewal channel")
|
||||
}
|
||||
|
||||
originalManager := tr.templateManager
|
||||
|
||||
renewalCh <- fmt.Errorf("Test killing")
|
||||
close(renewalCh)
|
||||
|
||||
// Wait for a new Vault token
|
||||
var token2 string
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if token2 = tr.vaultFuture.Get(); token2 == "" || token2 == token {
|
||||
return false, fmt.Errorf("No new Vault token")
|
||||
}
|
||||
|
||||
if originalManager == tr.templateManager {
|
||||
return false, fmt.Errorf("Template manager not updated")
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestTaskRunner_VaultManager_Restart(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "10s",
|
||||
}
|
||||
task.Vault = &structs.Vault{
|
||||
Policies: []string{"default"},
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
}
|
||||
|
||||
upd, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
go tr.Run()
|
||||
|
||||
// Wait for the task to start
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if l := len(upd.events); l != 2 {
|
||||
return false, fmt.Errorf("Expect two events; got %v", l)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Error the token renewal
|
||||
vc := tr.vaultClient.(*vaultclient.MockVaultClient)
|
||||
renewalCh, ok := vc.RenewTokens[tr.vaultFuture.Get()]
|
||||
if !ok {
|
||||
t.Fatalf("no renewal channel")
|
||||
}
|
||||
|
||||
renewalCh <- fmt.Errorf("Test killing")
|
||||
close(renewalCh)
|
||||
|
||||
// Ensure a restart
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if l := len(upd.events); l != 7 {
|
||||
return false, fmt.Errorf("Expect seven events; got %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
if upd.events[2].Type != structs.TaskRestartSignal {
|
||||
return false, fmt.Errorf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskRestartSignal)
|
||||
}
|
||||
|
||||
if upd.events[3].Type != structs.TaskKilling {
|
||||
return false, fmt.Errorf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilling)
|
||||
}
|
||||
|
||||
if upd.events[4].Type != structs.TaskKilled {
|
||||
return false, fmt.Errorf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
|
||||
}
|
||||
|
||||
if upd.events[5].Type != structs.TaskRestarting {
|
||||
return false, fmt.Errorf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskRestarting)
|
||||
}
|
||||
|
||||
if upd.events[6].Type != structs.TaskStarted {
|
||||
return false, fmt.Errorf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestTaskRunner_VaultManager_Signal(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "10s",
|
||||
}
|
||||
task.Vault = &structs.Vault{
|
||||
Policies: []string{"default"},
|
||||
ChangeMode: structs.VaultChangeModeSignal,
|
||||
ChangeSignal: "SIGUSR1",
|
||||
}
|
||||
|
||||
upd, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
go tr.Run()
|
||||
|
||||
// Wait for the task to start
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if l := len(upd.events); l != 2 {
|
||||
return false, fmt.Errorf("Expect two events; got %v", l)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Error the token renewal
|
||||
vc := tr.vaultClient.(*vaultclient.MockVaultClient)
|
||||
renewalCh, ok := vc.RenewTokens[tr.vaultFuture.Get()]
|
||||
if !ok {
|
||||
t.Fatalf("no renewal channel")
|
||||
}
|
||||
|
||||
renewalCh <- fmt.Errorf("Test killing")
|
||||
close(renewalCh)
|
||||
|
||||
// Ensure the task was signaled
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if l := len(upd.events); l != 3 {
|
||||
return false, fmt.Errorf("Expect three events; got %#v", upd.events)
|
||||
}
|
||||
|
||||
if upd.events[0].Type != structs.TaskReceived {
|
||||
return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
|
||||
}
|
||||
|
||||
if upd.events[1].Type != structs.TaskStarted {
|
||||
return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
|
||||
}
|
||||
|
||||
if upd.events[2].Type != structs.TaskSignaling {
|
||||
return false, fmt.Errorf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskSignaling)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
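Taken together, the two VaultManager tests pin down the expected behavior per change mode: restart mode kills and restarts the task, signal mode delivers the configured signal, and noop leaves the task alone. The following is a hedged sketch of that dispatch, not the runner's real handler; signalTask and restartTask are placeholder callbacks introduced only for illustration.

```go
package client

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// handleTokenChange is a hypothetical sketch of reacting to a replaced Vault
// token according to the task's configured change mode.
func handleTokenChange(v *structs.Vault, signalTask func(sig string) error,
	restartTask func(reason string) error) error {

	switch v.ChangeMode {
	case structs.VaultChangeModeSignal:
		// Deliver the configured signal (e.g. SIGUSR1) so the task can reload.
		return signalTask(v.ChangeSignal)
	case structs.VaultChangeModeRestart:
		// Kill and restart the task so it starts with the new token.
		return restartTask("vault token updated")
	case structs.VaultChangeModeNoop:
		// Leave the running task alone.
		return nil
	default:
		return fmt.Errorf("unknown vault change mode %q", v.ChangeMode)
	}
}
```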
|
||||
|
||||
@@ -350,19 +350,21 @@ func (c *vaultClient) renew(req *vaultClientRenewalRequest) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
if !c.config.IsEnabled() {
|
||||
return fmt.Errorf("vault client not enabled")
|
||||
}
|
||||
if !c.running {
|
||||
return fmt.Errorf("vault client is not running")
|
||||
}
|
||||
|
||||
if req == nil {
|
||||
return fmt.Errorf("nil renewal request")
|
||||
}
|
||||
if req.errCh == nil {
|
||||
return fmt.Errorf("renewal request error channel nil")
|
||||
}
|
||||
|
||||
if !c.config.IsEnabled() {
|
||||
close(req.errCh)
|
||||
return fmt.Errorf("vault client not enabled")
|
||||
}
|
||||
if !c.running {
|
||||
close(req.errCh)
|
||||
return fmt.Errorf("vault client is not running")
|
||||
}
|
||||
if req.id == "" {
|
||||
close(req.errCh)
|
||||
return fmt.Errorf("missing id in renewal request")
|
||||
|
||||
@@ -21,12 +21,21 @@ type MockVaultClient struct {
|
||||
// DeriveTokenErrors maps an allocation ID and tasks to an error when the
|
||||
// token is derived
|
||||
DeriveTokenErrors map[string]map[string]error
|
||||
|
||||
// DeriveTokenFn allows the caller to control the DeriveToken function. If
|
||||
// not set an error is returned if found in DeriveTokenErrors and otherwise
|
||||
// a token is generated and returned
|
||||
DeriveTokenFn func(a *structs.Allocation, tasks []string) (map[string]string, error)
|
||||
}
|
||||
|
||||
// NewMockVaultClient returns a MockVaultClient for testing
|
||||
func NewMockVaultClient() *MockVaultClient { return &MockVaultClient{} }
|
||||
|
||||
func (vc *MockVaultClient) DeriveToken(a *structs.Allocation, tasks []string) (map[string]string, error) {
|
||||
if vc.DeriveTokenFn != nil {
|
||||
return vc.DeriveTokenFn(a, tasks)
|
||||
}
|
||||
|
||||
tokens := make(map[string]string, len(tasks))
|
||||
for _, task := range tasks {
|
||||
if tasks, ok := vc.DeriveTokenErrors[a.ID]; ok {
|
||||
|
||||
@@ -209,8 +209,8 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
|
||||
|
||||
// If we have a vault block, then parse that
|
||||
if o := listVal.Filter("vault"); len(o.Items) > 0 {
|
||||
var jobVault structs.Vault
|
||||
if err := parseVault(&jobVault, o); err != nil {
|
||||
jobVault := structs.DefaultVaultBlock()
|
||||
if err := parseVault(jobVault, o); err != nil {
|
||||
return multierror.Prefix(err, "vault ->")
|
||||
}
|
||||
|
||||
@@ -218,7 +218,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
|
||||
for _, tg := range result.TaskGroups {
|
||||
for _, task := range tg.Tasks {
|
||||
if task.Vault == nil {
|
||||
task.Vault = &jobVault
|
||||
task.Vault = jobVault
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -335,15 +335,15 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
|
||||
|
||||
// If we have a vault block, then parse that
|
||||
if o := listVal.Filter("vault"); len(o.Items) > 0 {
|
||||
var tgVault structs.Vault
|
||||
if err := parseVault(&tgVault, o); err != nil {
|
||||
tgVault := structs.DefaultVaultBlock()
|
||||
if err := parseVault(tgVault, o); err != nil {
|
||||
return multierror.Prefix(err, fmt.Sprintf("'%s', vault ->", n))
|
||||
}
|
||||
|
||||
// Go through the tasks and if they don't have a Vault block, set it
|
||||
for _, task := range g.Tasks {
|
||||
if task.Vault == nil {
|
||||
task.Vault = &tgVault
|
||||
task.Vault = tgVault
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -717,12 +717,12 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
|
||||
|
||||
// If we have a vault block, then parse that
|
||||
if o := listVal.Filter("vault"); len(o.Items) > 0 {
|
||||
var v structs.Vault
|
||||
if err := parseVault(&v, o); err != nil {
|
||||
v := structs.DefaultVaultBlock()
|
||||
if err := parseVault(v, o); err != nil {
|
||||
return multierror.Prefix(err, fmt.Sprintf("'%s', vault ->", n))
|
||||
}
|
||||
|
||||
t.Vault = &v
|
||||
t.Vault = v
|
||||
}
|
||||
|
||||
*result = append(*result, &t)
|
||||
@@ -1177,6 +1177,8 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error {
|
||||
valid := []string{
|
||||
"policies",
|
||||
"env",
|
||||
"change_mode",
|
||||
"change_signal",
|
||||
}
|
||||
if err := checkHCLKeys(listVal, valid); err != nil {
|
||||
return multierror.Prefix(err, "vault ->")
|
||||
@@ -1187,11 +1189,6 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Default the env bool
|
||||
if _, ok := m["env"]; !ok {
|
||||
m["env"] = true
|
||||
}
|
||||
|
||||
if err := mapstructure.WeakDecode(m, result); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -159,8 +159,9 @@ func TestParse(t *testing.T) {
|
||||
},
|
||||
},
|
||||
Vault: &structs.Vault{
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
},
|
||||
Templates: []*structs.Template{
|
||||
{
|
||||
@@ -199,6 +200,12 @@ func TestParse(t *testing.T) {
|
||||
},
|
||||
},
|
||||
LogConfig: structs.DefaultLogConfig(),
|
||||
Vault: &structs.Vault{
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: false,
|
||||
ChangeMode: structs.VaultChangeModeSignal,
|
||||
ChangeSignal: "SIGUSR1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -475,16 +482,18 @@ func TestParse(t *testing.T) {
|
||||
Name: "redis",
|
||||
LogConfig: structs.DefaultLogConfig(),
|
||||
Vault: &structs.Vault{
|
||||
Policies: []string{"group"},
|
||||
Env: true,
|
||||
Policies: []string{"group"},
|
||||
Env: true,
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
},
|
||||
},
|
||||
&structs.Task{
|
||||
Name: "redis2",
|
||||
LogConfig: structs.DefaultLogConfig(),
|
||||
Vault: &structs.Vault{
|
||||
Policies: []string{"task"},
|
||||
Env: false,
|
||||
Policies: []string{"task"},
|
||||
Env: false,
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -498,8 +507,9 @@ func TestParse(t *testing.T) {
|
||||
Name: "redis",
|
||||
LogConfig: structs.DefaultLogConfig(),
|
||||
Vault: &structs.Vault{
|
||||
Policies: []string{"job"},
|
||||
Env: true,
|
||||
Policies: []string{"job"},
|
||||
Env: true,
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -526,6 +536,10 @@ func TestParse(t *testing.T) {
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(actual, tc.Result) {
|
||||
diff, err := actual.Diff(tc.Result, true)
|
||||
if err == nil {
|
||||
t.Logf("file %s diff:\n%#v\n", tc.File, diff)
|
||||
}
|
||||
t.Fatalf("file: %s\n\n%#v\n\n%#v", tc.File, actual, tc.Result)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,6 +165,13 @@ job "binstore-storagelocker" {
|
||||
attribute = "kernel.arch"
|
||||
value = "amd64"
|
||||
}
|
||||
|
||||
vault {
|
||||
policies = ["foo", "bar"]
|
||||
env = false
|
||||
change_mode = "signal"
|
||||
change_signal = "SIGUSR1"
|
||||
}
|
||||
}
|
||||
|
||||
constraint {
|
||||
|
||||
@@ -373,7 +373,10 @@ func TestJobEndpoint_Register_Vault_Disabled(t *testing.T) {
|
||||
|
||||
// Create the register request with a job asking for a vault policy
|
||||
job := mock.Job()
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{
|
||||
Policies: []string{"foo"},
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
}
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
@@ -405,7 +408,10 @@ func TestJobEndpoint_Register_Vault_AllowUnauthenticated(t *testing.T) {
|
||||
|
||||
// Create the register request with a job asking for a vault policy
|
||||
job := mock.Job()
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{
|
||||
Policies: []string{"foo"},
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
}
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
@@ -451,7 +457,10 @@ func TestJobEndpoint_Register_Vault_NoToken(t *testing.T) {
|
||||
// Create the register request with a job asking for a vault policy but
|
||||
// don't send a Vault token
|
||||
job := mock.Job()
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{
|
||||
Policies: []string{"foo"},
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
}
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
@@ -506,7 +515,10 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) {
|
||||
// send the bad Vault token
|
||||
job := mock.Job()
|
||||
job.VaultToken = badToken
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{policy}}
|
||||
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{
|
||||
Policies: []string{policy},
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
}
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
@@ -565,7 +577,10 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) {
|
||||
// send the root Vault token
|
||||
job2 := mock.Job()
|
||||
job2.VaultToken = rootToken
|
||||
job2.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{policy}}
|
||||
job2.TaskGroups[0].Tasks[0].Vault = &structs.Vault{
|
||||
Policies: []string{policy},
|
||||
ChangeMode: structs.VaultChangeModeRestart,
|
||||
}
|
||||
req = &structs.JobRegisterRequest{
|
||||
Job: job2,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
|
||||
@@ -3032,8 +3032,10 @@ func TestTaskDiff(t *testing.T) {
|
||||
Old: &Task{},
|
||||
New: &Task{
|
||||
Vault: &Vault{
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
ChangeMode: "signal",
|
||||
ChangeSignal: "SIGUSR1",
|
||||
},
|
||||
},
|
||||
Expected: &TaskDiff{
|
||||
@@ -3043,6 +3045,18 @@ func TestTaskDiff(t *testing.T) {
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Vault",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "ChangeMode",
|
||||
Old: "",
|
||||
New: "signal",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "ChangeSignal",
|
||||
Old: "",
|
||||
New: "SIGUSR1",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Env",
|
||||
@@ -3078,8 +3092,10 @@ func TestTaskDiff(t *testing.T) {
|
||||
// Vault deleted
|
||||
Old: &Task{
|
||||
Vault: &Vault{
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
ChangeMode: "signal",
|
||||
ChangeSignal: "SIGUSR1",
|
||||
},
|
||||
},
|
||||
New: &Task{},
|
||||
@@ -3090,6 +3106,18 @@ func TestTaskDiff(t *testing.T) {
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Vault",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "ChangeMode",
|
||||
Old: "signal",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "ChangeSignal",
|
||||
Old: "SIGUSR1",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Env",
|
||||
@@ -3125,14 +3153,18 @@ func TestTaskDiff(t *testing.T) {
|
||||
// Vault edited
|
||||
Old: &Task{
|
||||
Vault: &Vault{
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
ChangeMode: "signal",
|
||||
ChangeSignal: "SIGUSR1",
|
||||
},
|
||||
},
|
||||
New: &Task{
|
||||
Vault: &Vault{
|
||||
Policies: []string{"bar", "baz"},
|
||||
Env: false,
|
||||
Policies: []string{"bar", "baz"},
|
||||
Env: false,
|
||||
ChangeMode: "restart",
|
||||
ChangeSignal: "foo",
|
||||
},
|
||||
},
|
||||
Expected: &TaskDiff{
|
||||
@@ -3142,6 +3174,18 @@ func TestTaskDiff(t *testing.T) {
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Vault",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "ChangeMode",
|
||||
Old: "signal",
|
||||
New: "restart",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "ChangeSignal",
|
||||
Old: "SIGUSR1",
|
||||
New: "foo",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Env",
|
||||
@@ -3174,18 +3218,22 @@ func TestTaskDiff(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
// LogConfig edited with context
|
||||
// Vault edited with context
|
||||
Contextual: true,
|
||||
Old: &Task{
|
||||
Vault: &Vault{
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
Policies: []string{"foo", "bar"},
|
||||
Env: true,
|
||||
ChangeMode: "signal",
|
||||
ChangeSignal: "SIGUSR1",
|
||||
},
|
||||
},
|
||||
New: &Task{
|
||||
Vault: &Vault{
|
||||
Policies: []string{"bar", "baz"},
|
||||
Env: true,
|
||||
Policies: []string{"bar", "baz"},
|
||||
Env: true,
|
||||
ChangeMode: "signal",
|
||||
ChangeSignal: "SIGUSR1",
|
||||
},
|
||||
},
|
||||
Expected: &TaskDiff{
|
||||
@@ -3195,6 +3243,18 @@ func TestTaskDiff(t *testing.T) {
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Vault",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "ChangeMode",
|
||||
Old: "signal",
|
||||
New: "signal",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "ChangeSignal",
|
||||
Old: "SIGUSR1",
|
||||
New: "SIGUSR1",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "Env",
|
||||
|
||||
@@ -2818,6 +2818,17 @@ func (d *EphemeralDisk) Copy() *EphemeralDisk {
|
||||
return ld
|
||||
}
|
||||
|
||||
const (
// VaultChangeModeNoop takes no action when a new token is retrieved.
VaultChangeModeNoop = "noop"

// VaultChangeModeSignal signals the task when a new token is retrieved.
VaultChangeModeSignal = "signal"

// VaultChangeModeRestart restarts the task when a new token is retrieved.
VaultChangeModeRestart = "restart"
)

// Vault stores the set of permissions a task needs access to from Vault.
type Vault struct {
// Policies is the set of policies that the task needs access to
@@ -2826,6 +2837,21 @@ type Vault struct {
// Env marks whether the Vault Token should be exposed as an environment
// variable
Env bool

// ChangeMode is used to configure the task's behavior when the Vault
// token changes because the original token could not be renewed in time.
ChangeMode string `mapstructure:"change_mode"`

// ChangeSignal is the signal sent to the task when a new token is
// retrieved. This is only valid when using the signal change mode.
ChangeSignal string `mapstructure:"change_signal"`
}

func DefaultVaultBlock() *Vault {
return &Vault{
Env: true,
ChangeMode: VaultChangeModeRestart,
}
}

// Copy returns a copy of this Vault block.
@@ -2849,6 +2875,16 @@ func (v *Vault) Validate() error {
return fmt.Errorf("Policy list can not be empty")
}

switch v.ChangeMode {
case VaultChangeModeSignal:
if v.ChangeSignal == "" {
return fmt.Errorf("Signal must be specified when using change mode %q", VaultChangeModeSignal)
}
case VaultChangeModeNoop, VaultChangeModeRestart:
default:
return fmt.Errorf("Unknown change mode %q", v.ChangeMode)
}

return nil
}
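For reference, the new defaults and validation can be exercised directly from a caller's perspective; this small example uses only what the hunk above defines, and assumes the standard nomad/structs import path.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	v := structs.DefaultVaultBlock() // Env: true, ChangeMode: "restart"
	v.Policies = []string{"default"}

	// Switching to signal mode without naming a signal fails validation.
	v.ChangeMode = structs.VaultChangeModeSignal
	fmt.Println(v.Validate()) // Signal must be specified when using change mode "signal"

	v.ChangeSignal = "SIGUSR1"
	fmt.Println(v.Validate()) // <nil>
}
```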
|
||||
|
||||
|
||||
@@ -1307,3 +1307,21 @@ func TestAllocation_Terminated(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestVault_Validate(t *testing.T) {
|
||||
v := &Vault{
|
||||
Env: true,
|
||||
ChangeMode: VaultChangeModeNoop,
|
||||
}
|
||||
|
||||
if err := v.Validate(); err == nil || !strings.Contains(err.Error(), "Policy list") {
|
||||
t.Fatalf("Expected policy list empty error")
|
||||
}
|
||||
|
||||
v.Policies = []string{"foo"}
|
||||
v.ChangeMode = VaultChangeModeSignal
|
||||
|
||||
if err := v.Validate(); err == nil || !strings.Contains(err.Error(), "Signal must") {
|
||||
t.Fatalf("Expected signal empty error")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -367,6 +367,9 @@ func tasksUpdated(a, b *structs.TaskGroup) bool {
|
||||
if !reflect.DeepEqual(at.Vault, bt.Vault) {
|
||||
return true
|
||||
}
|
||||
if !reflect.DeepEqual(at.Templates, bt.Templates) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Inspect the network to see if the dynamic ports are different
|
||||
if len(at.Resources.Networks) != len(bt.Resources.Networks) {
|
||||
|
||||
92 vendor/github.com/hashicorp/consul-template/config/config.go (generated, vendored)
@@ -139,22 +139,24 @@ func (c *Config) Copy() *Config {
|
||||
|
||||
if c.Vault.SSL != nil {
|
||||
config.Vault.SSL = &SSLConfig{
|
||||
Enabled: c.Vault.SSL.Enabled,
|
||||
Verify: c.Vault.SSL.Verify,
|
||||
Cert: c.Vault.SSL.Cert,
|
||||
Key: c.Vault.SSL.Key,
|
||||
CaCert: c.Vault.SSL.CaCert,
|
||||
Enabled: c.Vault.SSL.Enabled,
|
||||
Verify: c.Vault.SSL.Verify,
|
||||
Cert: c.Vault.SSL.Cert,
|
||||
Key: c.Vault.SSL.Key,
|
||||
CaCert: c.Vault.SSL.CaCert,
|
||||
ServerName: c.Vault.SSL.ServerName,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if c.SSL != nil {
|
||||
config.SSL = &SSLConfig{
|
||||
Enabled: c.SSL.Enabled,
|
||||
Verify: c.SSL.Verify,
|
||||
Cert: c.SSL.Cert,
|
||||
Key: c.SSL.Key,
|
||||
CaCert: c.SSL.CaCert,
|
||||
Enabled: c.SSL.Enabled,
|
||||
Verify: c.SSL.Verify,
|
||||
Cert: c.SSL.Cert,
|
||||
Key: c.SSL.Key,
|
||||
CaCert: c.SSL.CaCert,
|
||||
ServerName: c.SSL.ServerName,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -284,6 +286,9 @@ func (c *Config) Merge(config *Config) {
|
||||
if config.WasSet("vault.ssl.enabled") {
|
||||
c.Vault.SSL.Enabled = config.Vault.SSL.Enabled
|
||||
}
|
||||
if config.WasSet("vault.ssl.server_name") {
|
||||
c.Vault.SSL.ServerName = config.Vault.SSL.ServerName
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -327,6 +332,9 @@ func (c *Config) Merge(config *Config) {
|
||||
if config.WasSet("ssl.enabled") {
|
||||
c.SSL.Enabled = config.SSL.Enabled
|
||||
}
|
||||
if config.WasSet("ssl.server_name") {
|
||||
c.SSL.ServerName = config.SSL.ServerName
|
||||
}
|
||||
}
|
||||
|
||||
if config.WasSet("syslog") {
|
||||
@@ -447,27 +455,20 @@ func (c *Config) Set(key string) {
|
||||
}
|
||||
}
|
||||
|
||||
// ParseConfig reads the configuration file at the given path and returns a new
|
||||
// Config struct with the data populated.
|
||||
func ParseConfig(path string) (*Config, error) {
|
||||
// Parse parses the given string contents as a config
|
||||
func Parse(s string) (*Config, error) {
|
||||
var errs *multierror.Error
|
||||
|
||||
// Read the contents of the file
|
||||
contents, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading config at %q: %s", path, err)
|
||||
}
|
||||
|
||||
// Parse the file (could be HCL or JSON)
|
||||
var shadow interface{}
|
||||
if err := hcl.Decode(&shadow, string(contents)); err != nil {
|
||||
return nil, fmt.Errorf("error decoding config at %q: %s", path, err)
|
||||
if err := hcl.Decode(&shadow, s); err != nil {
|
||||
return nil, fmt.Errorf("error decoding config: %s", err)
|
||||
}
|
||||
|
||||
// Convert to a map and flatten the keys we want to flatten
|
||||
parsed, ok := shadow.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error converting config at %q", path)
|
||||
return nil, fmt.Errorf("error converting config")
|
||||
}
|
||||
flattenKeys(parsed, []string{
|
||||
"auth",
|
||||
@@ -514,9 +515,6 @@ func ParseConfig(path string) (*Config, error) {
|
||||
return nil, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Store a reference to the path where this config was read from
|
||||
config.Path = path
|
||||
|
||||
// Explicitly check for the nil signal and set the value back to nil
|
||||
if config.ReloadSignal == signals.SIGNIL {
|
||||
config.ReloadSignal = nil
|
||||
@@ -573,9 +571,30 @@ func ParseConfig(path string) (*Config, error) {
|
||||
return config, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ConfigFromPath iterates and merges all configuration files in a given
|
||||
// Must returns a config object that must compile. If there are any errors, this
|
||||
// function will panic. This is most useful in testing or constants.
|
||||
func Must(s string) *Config {
|
||||
c, err := Parse(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// FromFile reads the configuration file at the given path and returns a new
|
||||
// Config struct with the data populated.
|
||||
func FromFile(path string) (*Config, error) {
|
||||
c, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading config at %q: %s", path, err)
|
||||
}
|
||||
|
||||
return Parse(string(c))
|
||||
}
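A short illustration of the reworked helpers follows, assuming the usual consul-template config layout; the HCL fragment and host name are example values, not taken from this repository.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/consul-template/config"
)

func main() {
	// Must panics on a parse error, which keeps fixtures like this terse.
	cfg := config.Must(`
vault {
  ssl {
    enabled     = true
    verify      = true
    server_name = "vault.service.consul"
  }
}
`)
	fmt.Println(cfg.Vault.SSL.ServerName) // vault.service.consul
}
```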
|
||||
|
||||
// FromPath iterates and merges all configuration files in a given
|
||||
// directory, returning the resulting config.
|
||||
func ConfigFromPath(path string) (*Config, error) {
|
||||
func FromPath(path string) (*Config, error) {
|
||||
// Ensure the given filepath exists
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("config: missing file/folder: %s", path)
|
||||
@@ -611,7 +630,7 @@ func ConfigFromPath(path string) (*Config, error) {
|
||||
}
|
||||
|
||||
// Parse and merge the config
|
||||
newConfig, err := ParseConfig(path)
|
||||
newConfig, err := FromFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -626,7 +645,7 @@ func ConfigFromPath(path string) (*Config, error) {
|
||||
|
||||
return config, nil
|
||||
} else if stat.Mode().IsRegular() {
|
||||
return ParseConfig(path)
|
||||
return FromFile(path)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("config: unknown filetype: %q", stat.Mode().String())
|
||||
@@ -710,6 +729,10 @@ func DefaultConfig() *Config {
|
||||
config.Vault.SSL.Verify = false
|
||||
}
|
||||
|
||||
if v := os.Getenv("VAULT_TLS_SERVER_NAME"); v != "" {
|
||||
config.Vault.SSL.ServerName = v
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
@@ -773,11 +796,12 @@ type DeduplicateConfig struct {
|
||||
|
||||
// SSLConfig is the configuration for SSL.
|
||||
type SSLConfig struct {
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
Verify bool `mapstructure:"verify"`
|
||||
Cert string `mapstructure:"cert"`
|
||||
Key string `mapstructure:"key"`
|
||||
CaCert string `mapstructure:"ca_cert"`
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
Verify bool `mapstructure:"verify"`
|
||||
Cert string `mapstructure:"cert"`
|
||||
Key string `mapstructure:"key"`
|
||||
CaCert string `mapstructure:"ca_cert"`
|
||||
ServerName string `mapstructure:"server_name"`
|
||||
}
|
||||
|
||||
// SyslogConfig is the configuration for syslog.
|
||||
|
||||
25 vendor/github.com/hashicorp/consul-template/config/config_testing.go (generated, vendored)
@@ -1,25 +0,0 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestConfig(contents string, t *testing.T) *Config {
|
||||
f, err := ioutil.TempFile(os.TempDir(), "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = f.Write([]byte(contents))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
config, err := ParseConfig(f.Name())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return config
|
||||
}
|
||||
27 vendor/github.com/hashicorp/consul-template/dependency/catalog_node.go (generated, vendored)
@@ -25,11 +25,12 @@ type NodeDetail struct {
|
||||
|
||||
// NodeService is a service on a single node.
|
||||
type NodeService struct {
|
||||
ID string
|
||||
Service string
|
||||
Tags ServiceTags
|
||||
Port int
|
||||
Address string
|
||||
ID string
|
||||
Service string
|
||||
Tags ServiceTags
|
||||
Port int
|
||||
Address string
|
||||
EnableTagOverride bool
|
||||
}
|
||||
|
||||
// CatalogNode represents a single node from the Consul catalog.
|
||||
@@ -108,19 +109,21 @@ func (d *CatalogNode) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}
|
||||
services := make(NodeServiceList, 0, len(n.Services))
|
||||
for _, v := range n.Services {
|
||||
services = append(services, &NodeService{
|
||||
ID: v.ID,
|
||||
Service: v.Service,
|
||||
Tags: ServiceTags(deepCopyAndSortTags(v.Tags)),
|
||||
Port: v.Port,
|
||||
Address: v.Address,
|
||||
ID: v.ID,
|
||||
Service: v.Service,
|
||||
Tags: ServiceTags(deepCopyAndSortTags(v.Tags)),
|
||||
Port: v.Port,
|
||||
Address: v.Address,
|
||||
EnableTagOverride: v.EnableTagOverride,
|
||||
})
|
||||
}
|
||||
sort.Stable(services)
|
||||
|
||||
node := &NodeDetail{
|
||||
Node: &Node{
|
||||
Node: n.Node.Node,
|
||||
Address: n.Node.Address,
|
||||
Node: n.Node.Node,
|
||||
Address: n.Node.Address,
|
||||
TaggedAddresses: n.Node.TaggedAddresses,
|
||||
},
|
||||
Services: services,
|
||||
}
|
||||
|
||||
10 vendor/github.com/hashicorp/consul-template/dependency/catalog_nodes.go (generated, vendored)
@@ -18,8 +18,9 @@ func init() {
|
||||
|
||||
// Node is a node entry in Consul
|
||||
type Node struct {
|
||||
Node string
|
||||
Address string
|
||||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
}
|
||||
|
||||
// CatalogNodes is the representation of all registered nodes in Consul.
|
||||
@@ -80,8 +81,9 @@ func (d *CatalogNodes) Fetch(clients *ClientSet, opts *QueryOptions) (interface{
|
||||
nodes := make([]*Node, 0, len(n))
|
||||
for _, node := range n {
|
||||
nodes = append(nodes, &Node{
|
||||
Node: node.Node,
|
||||
Address: node.Address,
|
||||
Node: node.Node,
|
||||
Address: node.Address,
|
||||
TaggedAddresses: node.TaggedAddresses,
|
||||
})
|
||||
}
|
||||
sort.Stable(NodeList(nodes))
|
||||
|
||||
12 vendor/github.com/hashicorp/consul-template/dependency/client_set.go (generated, vendored)
@@ -47,6 +47,7 @@ type CreateConsulClientInput struct {
|
||||
SSLCert string
|
||||
SSLKey string
|
||||
SSLCACert string
|
||||
ServerName string
|
||||
}
|
||||
|
||||
// CreateVaultClientInput is used as input to the CreateVaultClient function.
|
||||
@@ -59,6 +60,7 @@ type CreateVaultClientInput struct {
|
||||
SSLCert string
|
||||
SSLKey string
|
||||
SSLCACert string
|
||||
ServerName string
|
||||
}
|
||||
|
||||
// NewClientSet creates a new client set that is ready to accept clients.
|
||||
@@ -135,6 +137,11 @@ func (c *ClientSet) CreateConsulClient(i *CreateConsulClientInput) error {
|
||||
tlsConfig.BuildNameToCertificate()
|
||||
|
||||
// SSL verification
|
||||
if i.ServerName != "" {
|
||||
tlsConfig.ServerName = i.ServerName
|
||||
tlsConfig.InsecureSkipVerify = false
|
||||
log.Printf("[DEBUG] (clients) using explicit consul TLS server host name: %s", tlsConfig.ServerName)
|
||||
}
|
||||
if !i.SSLVerify {
|
||||
log.Printf("[WARN] (clients) disabling consul SSL verification")
|
||||
tlsConfig.InsecureSkipVerify = true
|
||||
@@ -213,6 +220,11 @@ func (c *ClientSet) CreateVaultClient(i *CreateVaultClientInput) error {
|
||||
tlsConfig.BuildNameToCertificate()
|
||||
|
||||
// SSL verification
|
||||
if i.ServerName != "" {
|
||||
tlsConfig.ServerName = i.ServerName
|
||||
tlsConfig.InsecureSkipVerify = false
|
||||
log.Printf("[DEBUG] (clients) using explicit vault TLS server host name: %s", tlsConfig.ServerName)
|
||||
}
|
||||
if !i.SSLVerify {
|
||||
log.Printf("[WARN] (clients) disabling vault SSL verification")
|
||||
tlsConfig.InsecureSkipVerify = true
|
||||
|
||||
2 vendor/github.com/hashicorp/consul-template/dependency/file.go (generated, vendored)
@@ -48,7 +48,7 @@ func (d *File) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *Resp
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("file: error watching: %s", err)
|
||||
return nil, nil, fmt.Errorf("file: error watching: %s", err)
|
||||
}
|
||||
|
||||
d.mutex.Lock()
|
||||
|
||||
88 vendor/github.com/hashicorp/consul-template/dependency/store_key.go (generated, vendored)
@@ -18,13 +18,21 @@ type StoreKey struct {
|
||||
Path string
|
||||
DataCenter string
|
||||
|
||||
defaultValue string
|
||||
defaultGiven bool
|
||||
defaultValue string
|
||||
defaultGiven bool
|
||||
existenceCheck bool
|
||||
|
||||
stopped bool
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// kvGetResponse is a wrapper around the Consul API response.
|
||||
type kvGetResponse struct {
|
||||
pair *api.KVPair
|
||||
meta *api.QueryMeta
|
||||
err error
|
||||
}
|
||||
|
||||
// Fetch queries the Consul API defined by the given client and returns string
|
||||
// of the value to Path.
|
||||
func (d *StoreKey) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
|
||||
@@ -49,45 +57,55 @@ func (d *StoreKey) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *
|
||||
return nil, nil, fmt.Errorf("store key: error getting client: %s", err)
|
||||
}
|
||||
|
||||
var pair *api.KVPair
|
||||
var qm *api.QueryMeta
|
||||
dataCh := make(chan struct{})
|
||||
dataCh := make(chan *kvGetResponse, 1)
|
||||
|
||||
go func() {
|
||||
log.Printf("[DEBUG] (%s) querying consul with %+v", d.Display(), consulOpts)
|
||||
pair, qm, err = consul.KV().Get(d.Path, consulOpts)
|
||||
close(dataCh)
|
||||
pair, meta, err := consul.KV().Get(d.Path, consulOpts)
|
||||
resp := &kvGetResponse{pair: pair, meta: meta, err: err}
|
||||
|
||||
select {
|
||||
case dataCh <- resp:
|
||||
case <-d.stopCh:
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-d.stopCh:
|
||||
return nil, nil, ErrStopped
|
||||
case <-dataCh:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("store key: error fetching: %s", err)
|
||||
}
|
||||
|
||||
rm := &ResponseMetadata{
|
||||
LastIndex: qm.LastIndex,
|
||||
LastContact: qm.LastContact,
|
||||
}
|
||||
|
||||
if pair == nil {
|
||||
if d.defaultGiven {
|
||||
log.Printf("[DEBUG] (%s) Consul returned no data (using default of %q)",
|
||||
d.Display(), d.defaultValue)
|
||||
return d.defaultValue, rm, nil
|
||||
case resp := <-dataCh:
|
||||
if resp.err != nil {
|
||||
return "", nil, fmt.Errorf("store key: error fetching: %s", resp.err)
|
||||
}
|
||||
|
||||
log.Printf("[WARN] (%s) Consul returned no data (does the path exist?)",
|
||||
d.Display())
|
||||
return "", rm, nil
|
||||
rm := &ResponseMetadata{
|
||||
LastIndex: resp.meta.LastIndex,
|
||||
LastContact: resp.meta.LastContact,
|
||||
}
|
||||
|
||||
if d.existenceCheck {
|
||||
return (resp.pair != nil), rm, nil
|
||||
}
|
||||
|
||||
if resp.pair == nil {
|
||||
if d.defaultGiven {
|
||||
log.Printf("[DEBUG] (%s) Consul returned no data (using default of %q)",
|
||||
d.Display(), d.defaultValue)
|
||||
return d.defaultValue, rm, nil
|
||||
}
|
||||
return nil, rm, nil
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] (%s) Consul returned %s", d.Display(), resp.pair.Value)
|
||||
|
||||
return string(resp.pair.Value), rm, nil
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] (%s) Consul returned %s", d.Display(), pair.Value)
|
||||
|
||||
return string(pair.Value), rm, nil
|
||||
// SetExistenceCheck sets this key as an existence check instead of a value
|
||||
// check.
|
||||
func (d *StoreKey) SetExistenceCheck(b bool) {
|
||||
d.existenceCheck = b
|
||||
}
|
||||
|
||||
// SetDefault is used to set the default value.
|
||||
@@ -103,17 +121,27 @@ func (d *StoreKey) CanShare() bool {
|
||||
|
||||
// HashCode returns a unique identifier.
|
||||
func (d *StoreKey) HashCode() string {
|
||||
if d.existenceCheck {
|
||||
return fmt.Sprintf("StoreKeyExists|%s", d.rawKey)
|
||||
}
|
||||
|
||||
if d.defaultGiven {
|
||||
return fmt.Sprintf("StoreKey|%s|%s", d.rawKey, d.defaultValue)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("StoreKey|%s", d.rawKey)
|
||||
}
|
||||
|
||||
// Display prints the human-friendly output.
|
||||
func (d *StoreKey) Display() string {
|
||||
if d.existenceCheck {
|
||||
return fmt.Sprintf(`"key_exists(%s)"`, d.rawKey)
|
||||
}
|
||||
|
||||
if d.defaultGiven {
|
||||
return fmt.Sprintf(`"key_or_default(%s, %q)"`, d.rawKey, d.defaultValue)
|
||||
}
|
||||
|
||||
return fmt.Sprintf(`"key(%s)"`, d.rawKey)
|
||||
}
|
||||
|
||||
|
||||
6 vendor/github.com/hashicorp/consul-template/manager/runner.go (generated, vendored)
@@ -873,6 +873,10 @@ func (r *Runner) execute(command string, timeout time.Duration) error {
|
||||
customEnv["VAULT_CACERT"] = r.config.Vault.SSL.CaCert
|
||||
}
|
||||
|
||||
if r.config.Vault.SSL.ServerName != "" {
|
||||
customEnv["VAULT_TLS_SERVER_NAME"] = r.config.Vault.SSL.ServerName
|
||||
}
|
||||
|
||||
currentEnv := os.Environ()
|
||||
cmdEnv := make([]string, len(currentEnv), len(currentEnv)+len(customEnv))
|
||||
copy(cmdEnv, currentEnv)
|
||||
@@ -1162,6 +1166,7 @@ func newClientSet(config *config.Config) (*dep.ClientSet, error) {
|
||||
SSLCert: config.SSL.Cert,
|
||||
SSLKey: config.SSL.Key,
|
||||
SSLCACert: config.SSL.CaCert,
|
||||
ServerName: config.SSL.ServerName,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("runner: %s", err)
|
||||
}
|
||||
@@ -1175,6 +1180,7 @@ func newClientSet(config *config.Config) (*dep.ClientSet, error) {
|
||||
SSLCert: config.Vault.SSL.Cert,
|
||||
SSLKey: config.Vault.SSL.Key,
|
||||
SSLCACert: config.Vault.SSL.CaCert,
|
||||
ServerName: config.Vault.SSL.ServerName,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("runner: %s", err)
|
||||
}
|
||||
|
||||
2 vendor/github.com/hashicorp/consul-template/template/template.go (generated, vendored)
@@ -125,6 +125,7 @@ func funcMap(brain *Brain, used, missing map[string]dep.Dependency) template.Fun
|
||||
"datacenters": datacentersFunc(brain, used, missing),
|
||||
"file": fileFunc(brain, used, missing),
|
||||
"key": keyFunc(brain, used, missing),
|
||||
"key_exists": keyExistsFunc(brain, used, missing),
|
||||
"key_or_default": keyWithDefaultFunc(brain, used, missing),
|
||||
"ls": lsFunc(brain, used, missing),
|
||||
"node": nodeFunc(brain, used, missing),
|
||||
@@ -160,6 +161,7 @@ func funcMap(brain *Brain, used, missing map[string]dep.Dependency) template.Fun
|
||||
"toJSON": toJSON,
|
||||
"toJSONPretty": toJSONPretty,
|
||||
"toTitle": toTitle,
|
||||
"toTOML": toTOML,
|
||||
"toUpper": toUpper,
|
||||
"toYAML": toYAML,
|
||||
"split": split,
|
||||
|
||||
26 vendor/github.com/hashicorp/consul-template/template/template_functions.go (generated, vendored)
@@ -102,6 +102,32 @@ func keyFunc(brain *Brain,
|
||||
}
|
||||
}
|
||||
|
||||
// keyExistsFunc returns true if a key exists, false otherwise.
func keyExistsFunc(brain *Brain,
used, missing map[string]dep.Dependency) func(string) (bool, error) {
return func(s string) (bool, error) {
if len(s) == 0 {
return false, nil
}

d, err := dep.ParseStoreKey(s)
if err != nil {
return false, err
}
d.SetExistenceCheck(true)

addDependency(used, d)

if value, ok := brain.Recall(d); ok {
return value.(bool), nil
}

addDependency(missing, d)

return false, nil
}
}
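The helper is wired into templates through the funcMap registration in the template.go hunk above. As an illustrative usage only (the KV paths below are invented), a template can now branch on whether a key exists rather than on its value:

```go
package main

import "fmt"

// tmpl is an illustrative consul-template source that uses the new key_exists
// helper alongside the existing key function; the KV paths are examples only.
const tmpl = `
{{ if key_exists "service/app/maintenance" }}
maintenance = true
{{ else }}
listen_addr = "{{ key "service/app/listen_addr" }}"
{{ end }}
`

func main() { fmt.Print(tmpl) }
```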
|
||||
|
||||
// keyWithDefaultFunc returns or accumulates key dependencies that have a
|
||||
// default value.
|
||||
func keyWithDefaultFunc(brain *Brain,
|
||||
|
||||
7 vendor/github.com/hashicorp/consul-template/watch/view.go (generated, vendored)
@@ -197,6 +197,13 @@ func (v *View) fetch(doneCh chan<- struct{}, errCh chan<- error) {
|
||||
v.dataLock.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
if data == nil {
|
||||
log.Printf("[DEBUG](view) %s data was not present", v.display())
|
||||
v.dataLock.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
v.data = data
|
||||
v.receivedData = true
|
||||
v.dataLock.Unlock()
|
||||
|
||||
38 vendor/vendor.json (vendored)
@@ -497,44 +497,44 @@
|
||||
{
|
||||
"checksumSHA1": "+JUQvWp1JUVeRT5weWL9hi6Fu4Y=",
|
||||
"path": "github.com/hashicorp/consul-template/child",
|
||||
"revision": "a8f654d612969519c9fde20bc8eb21418d763f73",
|
||||
"revisionTime": "2016-10-03T19:46:06Z"
|
||||
"revision": "19e08f28ce262c29d5c0ce75bbd94383e174f435",
|
||||
"revisionTime": "2016-10-11T19:01:37Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "tSuVPDoqSzoWmo2oEF5NGkIJHxQ=",
|
||||
"checksumSHA1": "4UjfRv2xqB5mtlpkqFU4Dt9X6eg=",
|
||||
"path": "github.com/hashicorp/consul-template/config",
|
||||
"revision": "a8f654d612969519c9fde20bc8eb21418d763f73",
|
||||
"revisionTime": "2016-10-03T19:46:06Z"
|
||||
"revision": "19e08f28ce262c29d5c0ce75bbd94383e174f435",
|
||||
"revisionTime": "2016-10-11T19:01:37Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "3xeTTZejqagwfwaYT8M3rq1Ixko=",
|
||||
"checksumSHA1": "DTkCivsayT2xN/23RxkduPhTts8=",
|
||||
"path": "github.com/hashicorp/consul-template/dependency",
|
||||
"revision": "a8f654d612969519c9fde20bc8eb21418d763f73",
|
||||
"revisionTime": "2016-10-03T19:46:06Z"
|
||||
"revision": "19e08f28ce262c29d5c0ce75bbd94383e174f435",
|
||||
"revisionTime": "2016-10-11T19:01:37Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "Z1QKRRJ/6/FjRIw1LJXTtOUxxX8=",
|
||||
"checksumSHA1": "G8tX+yjynwZYZOiUFvkrojdXGpg=",
|
||||
"path": "github.com/hashicorp/consul-template/manager",
|
||||
"revision": "a8f654d612969519c9fde20bc8eb21418d763f73",
|
||||
"revisionTime": "2016-10-03T19:46:06Z"
|
||||
"revision": "19e08f28ce262c29d5c0ce75bbd94383e174f435",
|
||||
"revisionTime": "2016-10-11T19:01:37Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "ByMIKPf7bXpyhhy80IjKLKYrjpo=",
|
||||
"path": "github.com/hashicorp/consul-template/signals",
|
||||
"revision": "a8f654d612969519c9fde20bc8eb21418d763f73",
|
||||
"revisionTime": "2016-10-03T19:46:06Z"
|
||||
"revision": "19e08f28ce262c29d5c0ce75bbd94383e174f435",
|
||||
"revisionTime": "2016-10-11T19:01:37Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "pvmWk53vtXCCL6p+c3XU2aQfkx4=",
|
||||
"checksumSHA1": "bkSJRnR2VyZA1KoyOF/eSkxVVFg=",
|
||||
"path": "github.com/hashicorp/consul-template/template",
|
||||
"revision": "a8f654d612969519c9fde20bc8eb21418d763f73",
|
||||
"revisionTime": "2016-10-03T19:46:06Z"
|
||||
"revision": "19e08f28ce262c29d5c0ce75bbd94383e174f435",
|
||||
"revisionTime": "2016-10-11T19:01:37Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "sR2e74n4zQEUDR6ssjvMr9lP7sI=",
|
||||
"checksumSHA1": "HfWf4Vf1fBJh5HgHLdjpF5vs0Lk=",
|
||||
"path": "github.com/hashicorp/consul-template/watch",
|
||||
"revision": "a8f654d612969519c9fde20bc8eb21418d763f73",
|
||||
"revisionTime": "2016-10-03T19:46:06Z"
|
||||
"revision": "19e08f28ce262c29d5c0ce75bbd94383e174f435",
|
||||
"revisionTime": "2016-10-11T19:01:37Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "kWbL0V4o8vJL75mzeQzhF6p5jiQ=",
|
||||
|
||||