Merge pull request #1887 from hashicorp/b-fixes
Fixes to Consul-Template when file exists + other small stuff
@@ -85,6 +85,7 @@ type Allocation struct {
     ClientStatus string
     ClientDescription string
     TaskStates map[string]*TaskState
     PreviousAllocation string
     CreateIndex uint64
     ModifyIndex uint64
+    CreateTime int64
@@ -49,9 +49,6 @@ type TaskTemplateManager struct {
     // lookup allows looking up the set of Nomad templates by their consul-template ID
     lookup map[string][]*structs.Template

-    // allRendered marks whether all the templates have been rendered
-    allRendered bool
-
     // hooks is used to signal/restart the task as templates are rendered
     hook TaskHooks
@@ -71,7 +68,7 @@ type TaskTemplateManager struct {
 }

 func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
-    allRendered bool, config *config.Config, vaultToken, taskDir string,
+    config *config.Config, vaultToken, taskDir string,
     taskEnv *env.TaskEnvironment) (*TaskTemplateManager, error) {

     // Check pre-conditions
@@ -86,10 +83,9 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
     }

     tm := &TaskTemplateManager{
-        templates:   tmpls,
-        allRendered: allRendered,
-        hook:        hook,
-        shutdownCh:  make(chan struct{}),
+        templates:  tmpls,
+        hook:       hook,
+        shutdownCh: make(chan struct{}),
     }

     // Parse the signals that we need
@@ -145,10 +141,7 @@ func (tm *TaskTemplateManager) run() {
     // Runner is nil if there is no templates
     if tm.runner == nil {
         // Unblock the start if there is nothing to do
-        if !tm.allRendered {
-            tm.hook.UnblockStart("consul-template")
-        }
-
+        tm.hook.UnblockStart("consul-template")
         return
     }
@@ -160,47 +153,41 @@ func (tm *TaskTemplateManager) run() {
     var allRenderedTime time.Time

-    // Handle the first rendering
-    if !tm.allRendered {
-        // Wait till all the templates have been rendered
-    WAIT:
-        for {
-            select {
-            case <-tm.shutdownCh:
-                return
-            case err, ok := <-tm.runner.ErrCh:
-                if !ok {
-                    continue
-                }
-
-                tm.hook.Kill("consul-template", err.Error(), true)
-            case <-tm.runner.TemplateRenderedCh():
-                // A template has been rendered, figure out what to do
-                events := tm.runner.RenderEvents()
-
-                // Not all templates have been rendered yet
-                if len(events) < len(tm.lookup) {
-                    continue
-                }
-
-                for _, event := range events {
-                    // This template hasn't been rendered
-                    if event.LastDidRender.IsZero() {
-                        continue WAIT
-                    }
-                }
-
-                break WAIT
-            }
-        }
-
-        allRenderedTime = time.Now()
-        tm.hook.UnblockStart("consul-template")
-        // TODO Thinking, I believe we could check every 30 seconds and if
-        // they are all would be rendered we should start anyways. That is
-        // the reattach mechanism when they have all been rendered
-    }
+    // Wait till all the templates have been rendered
+WAIT:
+    for {
+        select {
+        case <-tm.shutdownCh:
+            return
+        case err, ok := <-tm.runner.ErrCh:
+            if !ok {
+                continue
+            }
+
+            tm.hook.Kill("consul-template", err.Error(), true)
+        case <-tm.runner.TemplateRenderedCh():
+            // A template has been rendered, figure out what to do
+            events := tm.runner.RenderEvents()
+
+            // Not all templates have been rendered yet
+            if len(events) < len(tm.lookup) {
+                continue
+            }
+
+            for _, event := range events {
+                // This template hasn't been rendered
+                if event.LastWouldRender.IsZero() {
+                    continue WAIT
+                }
+            }
+
+            break WAIT
+        }
+    }
+
+    allRenderedTime = time.Now()
+    tm.hook.UnblockStart("consul-template")

     // If all our templates are change mode no-op, then we can exit here
     if tm.allTemplatesNoop() {
         return
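The hunk above is the heart of the fix: the old loop only counted a template as ready once LastDidRender was set, so a destination file that already existed with the right contents (and therefore was never rewritten) could block the task forever. consul-template stamps LastWouldRender in exactly that case. Below is a minimal sketch of the readiness check, using hypothetical stand-in types rather than the real runner API:

    package main

    import (
        "fmt"
        "time"
    )

    // renderEvent mirrors the two consul-template timestamps used above:
    // LastWouldRender is set when the template's output already matches the
    // file on disk; LastDidRender only when the file was actually written.
    type renderEvent struct {
        LastWouldRender time.Time
        LastDidRender   time.Time
    }

    // ready reports whether every tracked template has at least "would
    // render" status, the condition the new WAIT loop unblocks on.
    func ready(events map[string]renderEvent, tracked int) bool {
        if len(events) < tracked {
            return false // some templates have produced no event at all yet
        }
        for _, ev := range events {
            if ev.LastWouldRender.IsZero() {
                return false
            }
        }
        return true
    }

    func main() {
        now := time.Now()
        events := map[string]renderEvent{
            "a": {LastWouldRender: now},                     // file pre-existed
            "b": {LastWouldRender: now, LastDidRender: now}, // freshly written
        }
        fmt.Println(ready(events, 2)) // true: safe to unblock the task
    }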
@@ -227,17 +214,17 @@ func (tm *TaskTemplateManager) run() {
         restart := false
         var splay time.Duration

-        now := time.Now()
-        for id, event := range tm.runner.RenderEvents() {
+        events := tm.runner.RenderEvents()
+        for id, event := range events {

             // First time through
-            if allRenderedTime.After(event.LastDidRender) {
-                handledRenders[id] = now
+            if allRenderedTime.After(event.LastDidRender) || allRenderedTime.Equal(event.LastDidRender) {
+                handledRenders[id] = allRenderedTime
                 continue
             }

             // We have already handled this one
-            if htime := handledRenders[id]; htime.After(event.LastDidRender) {
+            if htime := handledRenders[id]; htime.After(event.LastDidRender) || htime.Equal(event.LastDidRender) {
                 continue
             }
@@ -276,9 +263,8 @@ func (tm *TaskTemplateManager) run() {
             }

             // Update handle time
-            now = time.Now()
             for _, id := range handling {
-                handledRenders[id] = now
+                handledRenders[id] = events[id].LastDidRender
             }

             if restart {
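A related subtlety in these two hunks: handled renders are now recorded with the event's own LastDidRender timestamp rather than a wall-clock reading taken nearby, and the comparisons accept equality. A render that lands between the time.Now() call and the bookkeeping can then no longer be mistaken for one that was already processed. A small self-contained sketch of that dedupe, with a hypothetical event type:

    package main

    import (
        "fmt"
        "time"
    )

    // event mirrors the consul-template render-event field used above.
    type event struct{ LastDidRender time.Time }

    // unhandled returns the IDs whose latest render has not been processed,
    // comparing against the per-template timestamp stored when it was handled.
    func unhandled(events map[string]event, handled map[string]time.Time) []string {
        var ids []string
        for id, ev := range events {
            if h, ok := handled[id]; ok && !h.Before(ev.LastDidRender) {
                continue // already handled this render (or a later one)
            }
            ids = append(ids, id)
        }
        return ids
    }

    func main() {
        t0 := time.Now()
        events := map[string]event{"a": {t0}, "b": {t0.Add(time.Second)}}
        handled := map[string]time.Time{"a": t0} // "a" recorded at its render time
        fmt.Println(unhandled(events, handled))  // [b]
    }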
@@ -95,7 +95,7 @@ type testHarness struct {

 // newTestHarness returns a harness starting a dev consul and vault server,
 // building the appropriate config and creating a TaskTemplateManager
-func newTestHarness(t *testing.T, templates []*structs.Template, allRendered, consul, vault bool) *testHarness {
+func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault bool) *testHarness {
     harness := &testHarness{
         mockHooks: NewMockTaskHooks(),
         templates: templates,
@@ -126,14 +126,17 @@ func newTestHarness(t *testing.T, templates []*structs.Template, allRendered, co
         harness.vaultToken = harness.vault.RootToken
     }

-    manager, err := NewTaskTemplateManager(harness.mockHooks, templates, allRendered,
-        harness.config, harness.vaultToken, harness.taskDir, harness.taskEnv)
+    return harness
+}
+
+func (h *testHarness) start(t *testing.T) {
+    manager, err := NewTaskTemplateManager(h.mockHooks, h.templates,
+        h.config, h.vaultToken, h.taskDir, h.taskEnv)
     if err != nil {
         t.Fatalf("failed to build task template manager: %v", err)
     }

-    harness.manager = manager
-    return harness
+    h.manager = manager
 }

 // stop is used to stop any running Vault or Consul server plus the task manager
@@ -160,32 +163,32 @@ func TestTaskTemplateManager_Invalid(t *testing.T) {
     vaultToken := ""
     taskEnv := env.NewTaskEnvironment(mock.Node())

-    _, err := NewTaskTemplateManager(nil, nil, false, nil, "", "", nil)
+    _, err := NewTaskTemplateManager(nil, nil, nil, "", "", nil)
     if err == nil {
         t.Fatalf("Expected error")
     }

-    _, err = NewTaskTemplateManager(nil, tmpls, false, config, vaultToken, taskDir, taskEnv)
+    _, err = NewTaskTemplateManager(nil, tmpls, config, vaultToken, taskDir, taskEnv)
     if err == nil || !strings.Contains(err.Error(), "task hook") {
         t.Fatalf("Expected invalid task hook error: %v", err)
     }

-    _, err = NewTaskTemplateManager(hooks, tmpls, false, nil, vaultToken, taskDir, taskEnv)
+    _, err = NewTaskTemplateManager(hooks, tmpls, nil, vaultToken, taskDir, taskEnv)
     if err == nil || !strings.Contains(err.Error(), "config") {
         t.Fatalf("Expected invalid config error: %v", err)
     }

-    _, err = NewTaskTemplateManager(hooks, tmpls, false, config, vaultToken, "", taskEnv)
+    _, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, "", taskEnv)
     if err == nil || !strings.Contains(err.Error(), "task directory") {
         t.Fatalf("Expected invalid task dir error: %v", err)
     }

-    _, err = NewTaskTemplateManager(hooks, tmpls, false, config, vaultToken, taskDir, nil)
+    _, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, nil)
     if err == nil || !strings.Contains(err.Error(), "task environment") {
         t.Fatalf("Expected invalid task environment error: %v", err)
     }

-    tm, err := NewTaskTemplateManager(hooks, tmpls, false, config, vaultToken, taskDir, taskEnv)
+    tm, err := NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, taskEnv)
     if err != nil {
         t.Fatalf("Unexpected error: %v", err)
     } else if tm == nil {
@@ -201,7 +204,7 @@ func TestTaskTemplateManager_Invalid(t *testing.T) {
     }

     tmpls = append(tmpls, tmpl)
-    tm, err = NewTaskTemplateManager(hooks, tmpls, false, config, vaultToken, taskDir, taskEnv)
+    tm, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, taskEnv)
     if err == nil || !strings.Contains(err.Error(), "Failed to parse signal") {
         t.Fatalf("Expected signal parsing error: %v", err)
     }
@@ -217,7 +220,8 @@ func TestTaskTemplateManager_Unblock_Static(t *testing.T) {
         ChangeMode: structs.TemplateChangeModeNoop,
     }

-    harness := newTestHarness(t, []*structs.Template{template}, false, false, false)
+    harness := newTestHarness(t, []*structs.Template{template}, false, false)
+    harness.start(t)
     defer harness.stop()

     // Wait for the unblock
@@ -239,6 +243,46 @@
     }
 }

+func TestTaskTemplateManager_Unblock_Static_AlreadyRendered(t *testing.T) {
+    // Make a template that will render immediately
+    content := "hello, world!"
+    file := "my.tmpl"
+    template := &structs.Template{
+        EmbeddedTmpl: content,
+        DestPath:     file,
+        ChangeMode:   structs.TemplateChangeModeNoop,
+    }
+
+    harness := newTestHarness(t, []*structs.Template{template}, false, false)
+
+    // Write the contents
+    path := filepath.Join(harness.taskDir, file)
+    if err := ioutil.WriteFile(path, []byte(content), 0777); err != nil {
+        t.Fatalf("Failed to write data: %v", err)
+    }
+
+    harness.start(t)
+    defer harness.stop()
+
+    // Wait for the unblock
+    select {
+    case <-harness.mockHooks.UnblockCh:
+    case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second):
+        t.Fatalf("Task unblock should have been called")
+    }
+
+    // Check the file is there
+    path = filepath.Join(harness.taskDir, file)
+    raw, err := ioutil.ReadFile(path)
+    if err != nil {
+        t.Fatalf("Failed to read rendered template from %q: %v", path, err)
+    }
+
+    if s := string(raw); s != content {
+        t.Fatalf("Unexpected template data; got %q, want %q", s, content)
+    }
+}
+
 func TestTaskTemplateManager_Unblock_Consul(t *testing.T) {
     // Make a template that will render based on a key in Consul
     key := "foo"
@@ -254,7 +298,8 @@ func TestTaskTemplateManager_Unblock_Consul(t *testing.T) {
     // Drop the retry rate
     testRetryRate = 10 * time.Millisecond

-    harness := newTestHarness(t, []*structs.Template{template}, false, true, false)
+    harness := newTestHarness(t, []*structs.Template{template}, true, false)
+    harness.start(t)
     defer harness.stop()

     // Ensure no unblock
@@ -302,7 +347,8 @@ func TestTaskTemplateManager_Unblock_Vault(t *testing.T) {
     // Drop the retry rate
     testRetryRate = 10 * time.Millisecond

-    harness := newTestHarness(t, []*structs.Template{template}, false, false, true)
+    harness := newTestHarness(t, []*structs.Template{template}, false, true)
+    harness.start(t)
     defer harness.stop()

     // Ensure no unblock
@@ -359,7 +405,8 @@ func TestTaskTemplateManager_Unblock_Multi_Template(t *testing.T) {
     // Drop the retry rate
     testRetryRate = 10 * time.Millisecond

-    harness := newTestHarness(t, []*structs.Template{template, template2}, false, true, false)
+    harness := newTestHarness(t, []*structs.Template{template, template2}, true, false)
+    harness.start(t)
     defer harness.stop()

     // Ensure no unblock
@@ -418,7 +465,8 @@ func TestTaskTemplateManager_Rerender_Noop(t *testing.T) {
     // Drop the retry rate
     testRetryRate = 10 * time.Millisecond

-    harness := newTestHarness(t, []*structs.Template{template}, false, true, false)
+    harness := newTestHarness(t, []*structs.Template{template}, true, false)
+    harness.start(t)
     defer harness.stop()

     // Ensure no unblock
@@ -502,7 +550,8 @@ func TestTaskTemplateManager_Rerender_Signal(t *testing.T) {
     // Drop the retry rate
     testRetryRate = 10 * time.Millisecond

-    harness := newTestHarness(t, []*structs.Template{template, template2}, false, true, false)
+    harness := newTestHarness(t, []*structs.Template{template, template2}, true, false)
+    harness.start(t)
     defer harness.stop()

     // Ensure no unblock
@@ -523,6 +572,10 @@ func TestTaskTemplateManager_Rerender_Signal(t *testing.T) {
         t.Fatalf("Task unblock should have been called")
     }

+    if len(harness.mockHooks.Signals) != 0 {
+        t.Fatalf("Should not have received any signals: %+v", harness.mockHooks)
+    }
+
     // Update the keys in Consul
     harness.consul.SetKV(key1, []byte(content1_2))
     harness.consul.SetKV(key2, []byte(content2_2))
@@ -582,7 +635,8 @@ func TestTaskTemplateManager_Rerender_Restart(t *testing.T) {
     // Drop the retry rate
     testRetryRate = 10 * time.Millisecond

-    harness := newTestHarness(t, []*structs.Template{template}, false, true, false)
+    harness := newTestHarness(t, []*structs.Template{template}, true, false)
+    harness.start(t)
     defer harness.stop()

     // Ensure no unblock
@@ -641,7 +695,8 @@ func TestTaskTemplateManager_Interpolate_Destination(t *testing.T) {
        ChangeMode: structs.TemplateChangeModeNoop,
     }

-    harness := newTestHarness(t, []*structs.Template{template}, false, false, false)
+    harness := newTestHarness(t, []*structs.Template{template}, false, false)
+    harness.start(t)
     defer harness.stop()

     // Ensure unblock
@@ -664,54 +719,11 @@ func TestTaskTemplateManager_Interpolate_Destination(t *testing.T) {
     }
 }

-func TestTaskTemplateManager_AllRendered_Signal(t *testing.T) {
-    // Make a template that renders based on a key in Consul and sends SIGALRM
-    key1 := "foo"
-    content1_1 := "bar"
-    embedded1 := fmt.Sprintf(`{{key "%s"}}`, key1)
-    file1 := "my.tmpl"
-    template := &structs.Template{
-        EmbeddedTmpl: embedded1,
-        DestPath:     file1,
-        ChangeMode:   structs.TemplateChangeModeSignal,
-        ChangeSignal: "SIGALRM",
-    }
-
-    // Drop the retry rate
-    testRetryRate = 10 * time.Millisecond
-
-    harness := newTestHarness(t, []*structs.Template{template}, true, true, false)
-    defer harness.stop()
-
-    // Write the key to Consul
-    harness.consul.SetKV(key1, []byte(content1_1))
-
-    // Wait for restart
-    select {
-    case <-harness.mockHooks.RestartCh:
-        t.Fatalf("Restart with signal policy: %+v", harness.mockHooks)
-    case <-harness.mockHooks.SignalCh:
-        break
-    case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second):
-        t.Fatalf("Should have received a signals: %+v", harness.mockHooks)
-    }
-
-    // Check the files have been updated
-    path := filepath.Join(harness.taskDir, file1)
-    raw, err := ioutil.ReadFile(path)
-    if err != nil {
-        t.Fatalf("Failed to read rendered template from %q: %v", path, err)
-    }
-
-    if s := string(raw); s != content1_1 {
-        t.Fatalf("Unexpected template data; got %q, want %q", s, content1_1)
-    }
-}
-
 func TestTaskTemplateManager_Signal_Error(t *testing.T) {
     // Make a template that renders based on a key in Consul and sends SIGALRM
     key1 := "foo"
-    content1_1 := "bar"
+    content1 := "bar"
+    content2 := "baz"
     embedded1 := fmt.Sprintf(`{{key "%s"}}`, key1)
     file1 := "my.tmpl"
     template := &structs.Template{
@@ -724,13 +736,24 @@ func TestTaskTemplateManager_Signal_Error(t *testing.T) {
     // Drop the retry rate
     testRetryRate = 10 * time.Millisecond

-    harness := newTestHarness(t, []*structs.Template{template}, true, true, false)
+    harness := newTestHarness(t, []*structs.Template{template}, true, false)
+    harness.start(t)
     defer harness.stop()

     harness.mockHooks.SignalError = fmt.Errorf("test error")

     // Write the key to Consul
-    harness.consul.SetKV(key1, []byte(content1_1))
+    harness.consul.SetKV(key1, []byte(content1))
+
+    // Wait a little
+    select {
+    case <-harness.mockHooks.UnblockCh:
+    case <-time.After(time.Duration(2*testutil.TestMultiplier()) * time.Second):
+        t.Fatalf("Should have received unblock: %+v", harness.mockHooks)
+    }
+
+    // Write the key to Consul
+    harness.consul.SetKV(key1, []byte(content2))

     // Wait for kill channel
     select {
@@ -97,9 +97,6 @@ type TaskRunner struct {
     // templateManager is used to manage any consul-templates this task may have
     templateManager *TaskTemplateManager

-    // templatesRendered mark whether the templates have been rendered
-    templatesRendered bool
-
     // startCh is used to trigger the start of the task
     startCh chan struct{}
@@ -132,7 +129,6 @@ type taskRunnerState struct {
     Task *structs.Task
     HandleID string
     ArtifactDownloaded bool
-    TemplatesRendered bool
 }

 // TaskStateUpdater is used to signal that tasks state has changed.
@@ -235,7 +231,6 @@ func (r *TaskRunner) RestoreState() error {
         r.task = snap.Task
     }
     r.artifactsDownloaded = snap.ArtifactDownloaded
-    r.templatesRendered = snap.TemplatesRendered

     if err := r.setTaskEnv(); err != nil {
         return fmt.Errorf("client: failed to create task environment for task %q in allocation %q: %v",
@@ -298,7 +293,6 @@ func (r *TaskRunner) SaveState() error {
         Task: r.task,
         Version: r.config.Version,
         ArtifactDownloaded: r.artifactsDownloaded,
-        TemplatesRendered: r.templatesRendered,
     }
     r.handleLock.Lock()
     if r.handle != nil {
@@ -674,7 +668,7 @@ func (r *TaskRunner) updatedTokenHandler() {

     // Create a new templateManager
     var err error
-    r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, r.templatesRendered,
+    r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
         r.config, r.vaultFuture.Get(), r.taskDir, r.getTaskEnv())
     if err != nil {
         err := fmt.Errorf("failed to build task's template manager: %v", err)
@@ -712,7 +706,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
     // Build the template manager
     if r.templateManager == nil {
         var err error
-        r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, r.templatesRendered,
+        r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
             r.config, r.vaultFuture.Get(), r.taskDir, r.getTaskEnv())
         if err != nil {
             err := fmt.Errorf("failed to build task's template manager: %v", err)
@@ -741,7 +735,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
     }

     // We don't have to wait for any template
-    if len(r.task.Templates) == 0 || r.templatesRendered {
+    if len(r.task.Templates) == 0 {
         // Send the start signal
         select {
         case r.startCh <- struct{}{}:
@@ -757,8 +751,6 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
         // perioidcally enumerate what we are still blocked on
         select {
         case <-r.unblockCh:
-            r.templatesRendered = true
-
             // Send the start signal
             select {
             case r.startCh <- struct{}{}:
@@ -940,26 +940,35 @@ func (b *batchFuture) Respond(index uint64, err error) {
 // tasks
 func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
     reply *structs.DeriveVaultTokenResponse) error {

+    // setErr is a helper for setting the recoverable error on the reply and
+    // logging it
+    setErr := func(e error, recoverable bool) {
+        reply.Error = structs.NewRecoverableError(e, recoverable)
+        n.srv.logger.Printf("[ERR] nomad.client: DeriveVaultToken failed (recoverable %v): %v", recoverable, e)
+    }
+
     if done, err := n.srv.forward("Node.DeriveVaultToken", args, args, reply); done {
-        reply.Error = structs.NewRecoverableError(err, err == structs.ErrNoLeader)
+        setErr(err, err == structs.ErrNoLeader)
         return nil
     }
     defer metrics.MeasureSince([]string{"nomad", "client", "derive_vault_token"}, time.Now())

     // Verify the arguments
     if args.NodeID == "" {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("missing node ID"), false)
+        setErr(fmt.Errorf("missing node ID"), false)
         return nil
     }
     if args.SecretID == "" {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("missing node SecretID"), false)
+        setErr(fmt.Errorf("missing node SecretID"), false)
         return nil
     }
     if args.AllocID == "" {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("missing allocation ID"), false)
+        setErr(fmt.Errorf("missing allocation ID"), false)
         return nil
     }
     if len(args.Tasks) == 0 {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("no tasks specified"), false)
+        setErr(fmt.Errorf("no tasks specified"), false)
         return nil
     }
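Folding a dozen `reply.Error = structs.NewRecoverableError(...)` sites into one setErr closure also gives every failure path a log line for free. The shape is plain Go, sketched here with stand-in types rather than Nomad's real structs:

    package main

    import (
        "errors"
        "fmt"
        "log"
    )

    // reply is a stand-in for an RPC response that carries its own error.
    type reply struct {
        Err         error
        Recoverable bool
    }

    // handle mirrors the handler's shape: validation failures are reported
    // through the reply (and logged once, inside setErr), while the RPC
    // itself still returns nil.
    func handle(nodeID string, rep *reply) error {
        setErr := func(e error, recoverable bool) {
            rep.Err = e
            rep.Recoverable = recoverable
            log.Printf("[ERR] request failed (recoverable %v): %v", recoverable, e)
        }

        if nodeID == "" {
            setErr(errors.New("missing node ID"), false)
            return nil
        }
        return nil
    }

    func main() {
        rep := &reply{}
        _ = handle("", rep)
        fmt.Println(rep.Err, rep.Recoverable) // missing node ID false
    }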
@@ -970,50 +979,50 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
     // tokens
     snap, err := n.srv.fsm.State().Snapshot()
     if err != nil {
-        reply.Error = structs.NewRecoverableError(err, false)
+        setErr(err, false)
         return nil
     }
     node, err := snap.NodeByID(args.NodeID)
     if err != nil {
-        reply.Error = structs.NewRecoverableError(err, false)
+        setErr(err, false)
         return nil
     }
     if node == nil {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("Node %q does not exist", args.NodeID), false)
+        setErr(fmt.Errorf("Node %q does not exist", args.NodeID), false)
         return nil
     }
     if node.SecretID != args.SecretID {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("SecretID mismatch"), false)
+        setErr(fmt.Errorf("SecretID mismatch"), false)
         return nil
     }

     alloc, err := snap.AllocByID(args.AllocID)
     if err != nil {
-        reply.Error = structs.NewRecoverableError(err, false)
+        setErr(err, false)
         return nil
     }
     if alloc == nil {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("Allocation %q does not exist", args.AllocID), false)
+        setErr(fmt.Errorf("Allocation %q does not exist", args.AllocID), false)
         return nil
     }
     if alloc.NodeID != args.NodeID {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("Allocation %q not running on Node %q", args.AllocID, args.NodeID), false)
+        setErr(fmt.Errorf("Allocation %q not running on Node %q", args.AllocID, args.NodeID), false)
         return nil
     }
     if alloc.TerminalStatus() {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("Can't request Vault token for terminal allocation"), false)
+        setErr(fmt.Errorf("Can't request Vault token for terminal allocation"), false)
         return nil
     }

     // Check the policies
     policies := alloc.Job.VaultPolicies()
     if policies == nil {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("Job doesn't require Vault policies"), false)
+        setErr(fmt.Errorf("Job doesn't require Vault policies"), false)
         return nil
     }
     tg, ok := policies[alloc.TaskGroup]
     if !ok {
-        reply.Error = structs.NewRecoverableError(fmt.Errorf("Task group does not require Vault policies"), false)
+        setErr(fmt.Errorf("Task group does not require Vault policies"), false)
         return nil
     }
@@ -1028,7 +1037,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
     if len(unneeded) != 0 {
         e := fmt.Errorf("Requested Vault tokens for tasks without defined Vault policies: %s",
             strings.Join(unneeded, ", "))
-        reply.Error = structs.NewRecoverableError(e, false)
+        setErr(e, false)
         return nil
     }
@@ -1116,6 +1125,8 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,

     // If there was an error revoke the created tokens
     if createErr != nil {
+        n.srv.logger.Printf("[ERR] nomad.node: Vault token creation failed: %v", createErr)
+
         if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil {
             n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation failed: %v", revokeErr)
         }
@@ -1142,7 +1153,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
         retry = true
     }

-    reply.Error = structs.NewRecoverableError(err, retry)
+    setErr(err, retry)
     return nil
 }
@@ -204,7 +204,11 @@ func (v *vaultClient) Stop() {
 // creation/lookup/revocation operation are allowed. All queued revocations are
 // cancelled if set un-active as it is assumed another instances is taking over
 func (v *vaultClient) SetActive(active bool) {
-    atomic.StoreInt32(&v.active, 1)
+    if active {
+        atomic.StoreInt32(&v.active, 1)
+    } else {
+        atomic.StoreInt32(&v.active, 0)
+    }

     // Clear out the revoking tokens
     v.revLock.Lock()
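The bug fixed in this hunk is that SetActive unconditionally stored 1, making SetActive(false) a no-op. A minimal, runnable sketch of the corrected atomic flag, with a hypothetical client type:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // client keeps its active flag as an int32 so it can be read without a lock.
    type client struct {
        active int32
    }

    // SetActive stores 1 or 0 depending on the boolean, mirroring the fix above.
    func (c *client) SetActive(active bool) {
        var v int32
        if active {
            v = 1
        }
        atomic.StoreInt32(&c.active, v)
    }

    // Active reports the flag with an atomic load.
    func (c *client) Active() bool {
        return atomic.LoadInt32(&c.active) == 1
    }

    func main() {
        c := &client{}
        c.SetActive(true)
        fmt.Println(c.Active()) // true
        c.SetActive(false)
        fmt.Println(c.Active()) // false: before the fix this stayed true
    }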
@@ -839,7 +843,7 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
             }

             if err := v.auth.RevokeAccessor(va.Accessor); err != nil {
-                return fmt.Errorf("failed to revoke token (alloc: %q, node: %q, task: %q)", va.AllocID, va.NodeID, va.Task)
+                return fmt.Errorf("failed to revoke token (alloc: %q, node: %q, task: %q): %v", va.AllocID, va.NodeID, va.Task, err)
             }
         case <-pCtx.Done():
             return nil
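The one-line change above restores the underlying Vault error that the old message dropped entirely. A tiny sketch of the difference, with a stand-in error:

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        err := errors.New("permission denied")

        // Before: the cause is formatted away, leaving only static context.
        before := fmt.Errorf("failed to revoke token (alloc: %q)", "alloc-1")
        // After: the cause is appended via %v, as the fixed line does.
        after := fmt.Errorf("failed to revoke token (alloc: %q): %v", "alloc-1", err)

        fmt.Println(before) // failed to revoke token (alloc: "alloc-1")
        fmt.Println(after)  // failed to revoke token (alloc: "alloc-1"): permission denied
    }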
vendor/github.com/hashicorp/consul-template/manager/runner.go (generated, vendored; 7 changes)
@@ -427,7 +427,7 @@ func (r *Runner) Signal(s os.Signal) error {
 func (r *Runner) Run() error {
     log.Printf("[INFO] (runner) running")

-    var renderedAny bool
+    var wouldRenderAny, renderedAny bool
     var commands []*config.ConfigTemplate
     depsMap := make(map[string]dep.Dependency)
@@ -541,6 +541,9 @@ func (r *Runner) Run() error {
         // Make a note that we have rendered this template (required for once
         // mode and just generally nice for debugging purposes).
         r.markRenderTime(tmpl.ID(), false)
+
+        // Record that at least one template would have been rendered.
+        wouldRenderAny = true
     }

     // If we _actually_ rendered the template to disk, we want to run the
@@ -572,7 +575,7 @@ func (r *Runner) Run() error {
     }

     // Check if we need to deliver any rendered signals
-    if renderedAny {
+    if wouldRenderAny || renderedAny {
         // Send the signal that a template got rendered
         select {
         case r.renderedCh <- struct{}{}:
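The send on r.renderedCh is cut off in this hunk, but it reads like Go's usual non-blocking notification idiom: if the buffer is full or nobody is listening, the event coalesces, and the consumer re-reads RenderEvents() for the full state. A self-contained sketch of that idiom (assuming, as is conventional, that the select carries a default case):

    package main

    import "fmt"

    // notify performs a coalescing send: with a one-slot buffer, repeated
    // notifications collapse into a single pending wake-up.
    func notify(ch chan struct{}) {
        select {
        case ch <- struct{}{}:
        default: // a notification is already pending; drop this one
        }
    }

    func main() {
        renderedCh := make(chan struct{}, 1)
        notify(renderedCh) // queued for the next reader
        notify(renderedCh) // coalesced into the pending notification
        <-renderedCh
        fmt.Println("woken; inspect RenderEvents() for what changed")
    }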
vendor/vendor.json (vendored; 30 changes)
@@ -497,44 +497,44 @@
     {
         "checksumSHA1": "+JUQvWp1JUVeRT5weWL9hi6Fu4Y=",
         "path": "github.com/hashicorp/consul-template/child",
-        "revision": "34f1ee1925645f22320e1224d8dc535efe4ac9e1",
-        "revisionTime": "2016-10-25T16:32:43Z"
+        "revision": "17cd016cdfa6601e82256b8d624b1331a0c188a7",
+        "revisionTime": "2016-10-28T21:56:23Z"
     },
     {
         "checksumSHA1": "UerCY17HM5DSJ/rE760qxm99Al4=",
         "path": "github.com/hashicorp/consul-template/config",
-        "revision": "34f1ee1925645f22320e1224d8dc535efe4ac9e1",
-        "revisionTime": "2016-10-25T16:32:43Z"
+        "revision": "17cd016cdfa6601e82256b8d624b1331a0c188a7",
+        "revisionTime": "2016-10-28T21:56:23Z"
     },
     {
         "checksumSHA1": "0nA6tnACi/MkE+Mb5L1gqbc3tpw=",
         "path": "github.com/hashicorp/consul-template/dependency",
-        "revision": "34f1ee1925645f22320e1224d8dc535efe4ac9e1",
-        "revisionTime": "2016-10-25T16:32:43Z"
+        "revision": "17cd016cdfa6601e82256b8d624b1331a0c188a7",
+        "revisionTime": "2016-10-28T21:56:23Z"
     },
     {
-        "checksumSHA1": "2/lmGAanNTSlm2j83xMbBzHvzdk=",
+        "checksumSHA1": "KcDxr/mNzYzTeFSCQyhpU1Nm/Ug=",
         "path": "github.com/hashicorp/consul-template/manager",
-        "revision": "34f1ee1925645f22320e1224d8dc535efe4ac9e1",
-        "revisionTime": "2016-10-25T16:32:43Z"
+        "revision": "17cd016cdfa6601e82256b8d624b1331a0c188a7",
+        "revisionTime": "2016-10-28T21:56:23Z"
     },
     {
         "checksumSHA1": "ByMIKPf7bXpyhhy80IjKLKYrjpo=",
         "path": "github.com/hashicorp/consul-template/signals",
-        "revision": "34f1ee1925645f22320e1224d8dc535efe4ac9e1",
-        "revisionTime": "2016-10-25T16:32:43Z"
+        "revision": "17cd016cdfa6601e82256b8d624b1331a0c188a7",
+        "revisionTime": "2016-10-28T21:56:23Z"
     },
     {
         "checksumSHA1": "bkSJRnR2VyZA1KoyOF/eSkxVVFg=",
         "path": "github.com/hashicorp/consul-template/template",
-        "revision": "34f1ee1925645f22320e1224d8dc535efe4ac9e1",
-        "revisionTime": "2016-10-25T16:32:43Z"
+        "revision": "17cd016cdfa6601e82256b8d624b1331a0c188a7",
+        "revisionTime": "2016-10-28T21:56:23Z"
     },
     {
         "checksumSHA1": "HfWf4Vf1fBJh5HgHLdjpF5vs0Lk=",
         "path": "github.com/hashicorp/consul-template/watch",
-        "revision": "34f1ee1925645f22320e1224d8dc535efe4ac9e1",
-        "revisionTime": "2016-10-25T16:32:43Z"
+        "revision": "17cd016cdfa6601e82256b8d624b1331a0c188a7",
+        "revisionTime": "2016-10-28T21:56:23Z"
     },
     {
         "checksumSHA1": "kWbL0V4o8vJL75mzeQzhF6p5jiQ=",