Template emits events explaining why it is blocked

This PR does the following:
* Adds a mechanism to emit events in the TaskRunner
* Vendors a new version of Consul-Template that allows extraction of
missing dependencies
* Adds logic to our consul_template.go to determine missing events and
emit them in a batched fashion.
* Refactors the consul_template code to split the run method and take in
a config struct rather than many parameters.

Fixes https://github.com/hashicorp/nomad/issues/2578
This commit is contained in:
Alex Dadgar
2017-08-09 17:25:29 -07:00
parent 6c0bb2ba2c
commit 1e7ae913e2
4 changed files with 469 additions and 198 deletions

View File

@@ -620,8 +620,9 @@ func (r *AllocRunner) setStatus(status, desc string) {
}
}
// setTaskState is used to set the status of a task. If state is empty then the
// event is appended but not synced with the server. The event may be omitted
// setTaskState is used to set the status of a task. If lazySync is set then the
// event is appended but not synced with the server. If state is omitted, the
// last known state is used.
func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent, lazySync bool) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()

View File

@@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
@@ -25,6 +26,15 @@ const (
// hostSrcOption is the Client option that determines whether the template
// source may be from the host
hostSrcOption = "template.allow_host_source"
// missingDepEventLimit is the number of missing dependencies that will be
// logged before we switch to showing just the number of missing
// dependencies.
missingDepEventLimit = 3
// DefaultMaxTemplateEventRate is the default maximum rate at which a
// template event should be fired.
DefaultMaxTemplateEventRate = 3 * time.Second
)
var (
@@ -55,15 +65,12 @@ type TaskHooks interface {
// TaskTemplateManager is used to run a set of templates for a given task
type TaskTemplateManager struct {
// templates is the set of templates we are managing
templates []*structs.Template
// config holds the template managers configuration
config *TaskTemplateManagerConfig
// lookup allows looking up the set of Nomad templates by their consul-template ID
lookup map[string][]*structs.Template
// hooks is used to signal/restart the task as templates are rendered
hook TaskHooks
// runner is the consul-template runner
runner *manager.Runner
@@ -79,29 +86,67 @@ type TaskTemplateManager struct {
shutdownLock sync.Mutex
}
func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
config *config.Config, vaultToken, taskDir string,
envBuilder *env.Builder) (*TaskTemplateManager, error) {
// TaskTemplateManagerConfig is used to configure an instance of the
// TaskTemplateManager
type TaskTemplateManagerConfig struct {
// Hooks is used to interact with the task the template manager is being run
// for
Hooks TaskHooks
// Templates is the set of templates we are managing
Templates []*structs.Template
// ClientConfig is the Nomad Client configuration
ClientConfig *config.Config
// VaultToken is the Vault token for the task.
VaultToken string
// TaskDir is the task's directory
TaskDir string
// EnvBuilder is the environment variable builder for the task.
EnvBuilder *env.Builder
// MaxTemplateEventRate is the maximum rate at which we should emit events.
MaxTemplateEventRate time.Duration
// retryRate is only used for testing and is used to increase the retry rate
retryRate time.Duration
}
// Validate validates the configuration.
func (c *TaskTemplateManagerConfig) Validate() error {
if c == nil {
return fmt.Errorf("Nil config passed")
} else if c.Hooks == nil {
return fmt.Errorf("Invalid task hooks given")
} else if c.ClientConfig == nil {
return fmt.Errorf("Invalid client config given")
} else if c.TaskDir == "" {
return fmt.Errorf("Invalid task directory given")
} else if c.EnvBuilder == nil {
return fmt.Errorf("Invalid task environment given")
} else if c.MaxTemplateEventRate == 0 {
return fmt.Errorf("Invalid max template event rate given")
}
return nil
}
func NewTaskTemplateManager(config *TaskTemplateManagerConfig) (*TaskTemplateManager, error) {
// Check pre-conditions
if hook == nil {
return nil, fmt.Errorf("Invalid task hook given")
} else if config == nil {
return nil, fmt.Errorf("Invalid config given")
} else if taskDir == "" {
return nil, fmt.Errorf("Invalid task directory given")
} else if envBuilder == nil {
return nil, fmt.Errorf("Invalid task environment given")
if err := config.Validate(); err != nil {
return nil, err
}
tm := &TaskTemplateManager{
templates: tmpls,
hook: hook,
config: config,
shutdownCh: make(chan struct{}),
}
// Parse the signals that we need
for _, tmpl := range tmpls {
for _, tmpl := range config.Templates {
if tmpl.ChangeSignal == "" {
continue
}
@@ -119,14 +164,14 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
}
// Build the consul-template runner
runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, envBuilder.Build())
runner, lookup, err := templateRunner(config)
if err != nil {
return nil, err
}
tm.runner = runner
tm.lookup = lookup
go tm.run(envBuilder, taskDir)
go tm.run()
return tm, nil
}
@@ -149,22 +194,61 @@ func (tm *TaskTemplateManager) Stop() {
}
// run is the long lived loop that handles errors and templates being rendered
func (tm *TaskTemplateManager) run(envBuilder *env.Builder, taskDir string) {
func (tm *TaskTemplateManager) run() {
// Runner is nil if there is no templates
if tm.runner == nil {
// Unblock the start if there is nothing to do
tm.hook.UnblockStart("consul-template")
tm.config.Hooks.UnblockStart("Template")
return
}
// Start the runner
go tm.runner.Start()
// Track when they have all been rendered so we don't signal the task for
// any render event before hand
var allRenderedTime time.Time
// Block till all the templates have been rendered
tm.handleFirstRender()
// Detect if there was a shutdown.
select {
case <-tm.shutdownCh:
return
default:
}
// Read environment variables from env templates before we unblock
envMap, err := loadTemplateEnv(tm.config.Templates, tm.config.TaskDir)
if err != nil {
tm.config.Hooks.Kill("Template", err.Error(), true)
return
}
tm.config.EnvBuilder.SetTemplateEnv(envMap)
// Unblock the task
tm.config.Hooks.UnblockStart("Template")
// If all our templates are change mode no-op, then we can exit here
if tm.allTemplatesNoop() {
return
}
// handle all subsequent render events.
tm.handleTemplateRerenders(time.Now())
}
// handleFirstRender blocks till all templates have been rendered
func (tm *TaskTemplateManager) handleFirstRender() {
// missingDependencies is the set of missing dependencies.
var missingDependencies map[string]struct{}
// eventTimer is used to trigger the firing of an event showing the missing
// dependencies.
var eventTimer *time.Timer
var eventTimerCh <-chan time.Time
// outstandingEvent tracks whether there is an outstanding event that should
// be fired.
outstandingEvent := false
// Handle the first rendering
// Wait till all the templates have been rendered
WAIT:
for {
@@ -176,7 +260,7 @@ WAIT:
continue
}
tm.hook.Kill("consul-template", err.Error(), true)
tm.config.Hooks.Kill("template", err.Error(), true)
case <-tm.runner.TemplateRenderedCh():
// A template has been rendered, figure out what to do
events := tm.runner.RenderEvents()
@@ -194,28 +278,85 @@ WAIT:
}
break WAIT
case <-tm.runner.RenderEventCh():
events := tm.runner.RenderEvents()
joinedSet := make(map[string]struct{})
for _, event := range events {
missing := event.MissingDeps
if missing == nil {
continue
}
for _, dep := range missing.List() {
joinedSet[dep.String()] = struct{}{}
}
}
// Check to see if the new joined set is the same as the old
different := len(joinedSet) != len(missingDependencies)
if !different {
for k := range joinedSet {
if _, ok := missingDependencies[k]; !ok {
different = true
break
}
}
}
// Nothing to do
if !different {
continue
}
// Update the missing set
missingDependencies = joinedSet
// Update the event timer channel
if eventTimer == nil {
// We are creating the event channel for the first time.
eventTimer = time.NewTimer(tm.config.MaxTemplateEventRate)
eventTimerCh = eventTimer.C
defer eventTimer.Stop()
outstandingEvent = true
continue
} else if !outstandingEvent {
// We got new data so reset
outstandingEvent = true
eventTimer.Reset(tm.config.MaxTemplateEventRate)
}
case <-eventTimerCh:
if missingDependencies == nil {
continue
}
// Clear the outstanding event
outstandingEvent = false
// Build the missing set
missingSlice := make([]string, 0, len(missingDependencies))
for k := range missingDependencies {
missingSlice = append(missingSlice, k)
}
sort.Strings(missingSlice)
if l := len(missingSlice); l > missingDepEventLimit {
missingSlice[missingDepEventLimit] = fmt.Sprintf("and %d more", l-missingDepEventLimit)
missingSlice = missingSlice[:missingDepEventLimit+1]
}
missingStr := strings.Join(missingSlice, ", ")
tm.config.Hooks.EmitEvent("Template", fmt.Sprintf("Missing: %s", missingStr))
}
}
}
// Read environment variables from env templates
envMap, err := loadTemplateEnv(tm.templates, taskDir)
if err != nil {
tm.hook.Kill("consul-template", err.Error(), true)
return
}
envBuilder.SetTemplateEnv(envMap)
allRenderedTime = time.Now()
tm.hook.UnblockStart("consul-template")
// If all our templates are change mode no-op, then we can exit here
if tm.allTemplatesNoop() {
return
}
// handleTemplateRerenders is used to handle template render events after they
// have all rendered. It takes action based on which set of templates re-render.
// The passed allRenderedTime is the time at which all templates have rendered.
// This is used to avoid signaling the task for any render event before hand.
func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time) {
// A lookup for the last time the template was handled
numTemplates := len(tm.templates)
handledRenders := make(map[string]time.Time, numTemplates)
handledRenders := make(map[string]time.Time, len(tm.config.Templates))
for {
select {
@@ -226,7 +367,7 @@ WAIT:
continue
}
tm.hook.Kill("consul-template", err.Error(), true)
tm.config.Hooks.Kill("Template", err.Error(), true)
case <-tm.runner.TemplateRenderedCh():
// A template has been rendered, figure out what to do
var handling []string
@@ -251,17 +392,17 @@ WAIT:
// Lookup the template and determine what to do
tmpls, ok := tm.lookup[id]
if !ok {
tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id), true)
tm.config.Hooks.Kill("Template", fmt.Sprintf("template runner returned unknown template id %q", id), true)
return
}
// Read environment variables from templates
envMap, err := loadTemplateEnv(tmpls, taskDir)
envMap, err := loadTemplateEnv(tmpls, tm.config.TaskDir)
if err != nil {
tm.hook.Kill("consul-template", err.Error(), true)
tm.config.Hooks.Kill("Template", err.Error(), true)
return
}
envBuilder.SetTemplateEnv(envMap)
tm.config.EnvBuilder.SetTemplateEnv(envMap)
for _, tmpl := range tmpls {
switch tmpl.ChangeMode {
@@ -300,11 +441,11 @@ WAIT:
}
if restart {
tm.hook.Restart("consul-template", "template with change_mode restart re-rendered")
tm.config.Hooks.Restart("template", "template with change_mode restart re-rendered")
} else if len(signals) != 0 {
var mErr multierror.Error
for signal := range signals {
err := tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal])
err := tm.config.Hooks.Signal("Template", "template re-rendered", tm.signals[signal])
if err != nil {
multierror.Append(&mErr, err)
}
@@ -315,7 +456,7 @@ WAIT:
for signal := range signals {
flat = append(flat, tm.signals[signal])
}
tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true)
tm.config.Hooks.Kill("Template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true)
}
}
}
@@ -325,7 +466,7 @@ WAIT:
// allTemplatesNoop returns whether all the managed templates have change mode noop.
func (tm *TaskTemplateManager) allTemplatesNoop() bool {
for _, tmpl := range tm.templates {
for _, tmpl := range tm.config.Templates {
if tmpl.ChangeMode != structs.TemplateChangeModeNoop {
return false
}
@@ -335,25 +476,23 @@ func (tm *TaskTemplateManager) allTemplatesNoop() bool {
}
// templateRunner returns a consul-template runner for the given templates and a
// lookup by destination to the template. If no templates are given, a nil
// template runner and lookup is returned.
func templateRunner(tmpls []*structs.Template, config *config.Config,
vaultToken, taskDir string, taskEnv *env.TaskEnv) (
// lookup by destination to the template. If no templates are in the config, a
// nil template runner and lookup is returned.
func templateRunner(config *TaskTemplateManagerConfig) (
*manager.Runner, map[string][]*structs.Template, error) {
if len(tmpls) == 0 {
if len(config.Templates) == 0 {
return nil, nil, nil
}
// Parse the templates
allowAbs := config.ReadBoolDefault(hostSrcOption, true)
ctmplMapping, err := parseTemplateConfigs(tmpls, taskDir, taskEnv, allowAbs)
ctmplMapping, err := parseTemplateConfigs(config)
if err != nil {
return nil, nil, err
}
// Create the runner configuration.
runnerConfig, err := newRunnerConfig(config, vaultToken, ctmplMapping)
runnerConfig, err := newRunnerConfig(config, ctmplMapping)
if err != nil {
return nil, nil, err
}
@@ -364,7 +503,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config,
}
// Set Nomad's environment variables
runner.Env = taskEnv.All()
runner.Env = config.EnvBuilder.Build().All()
// Build the lookup
idMap := runner.TemplateConfigMapping()
@@ -380,12 +519,14 @@ func templateRunner(tmpls []*structs.Template, config *config.Config,
return runner, lookup, nil
}
// parseTemplateConfigs converts the tasks templates into consul-templates
func parseTemplateConfigs(tmpls []*structs.Template, taskDir string,
taskEnv *env.TaskEnv, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) {
// parseTemplateConfigs converts the tasks templates in the config into
// consul-templates
func parseTemplateConfigs(config *TaskTemplateManagerConfig) (map[ctconf.TemplateConfig]*structs.Template, error) {
allowAbs := config.ClientConfig.ReadBoolDefault(hostSrcOption, true)
taskEnv := config.EnvBuilder.Build()
ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(tmpls))
for _, tmpl := range tmpls {
ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(config.Templates))
for _, tmpl := range config.Templates {
var src, dest string
if tmpl.SourcePath != "" {
if filepath.IsAbs(tmpl.SourcePath) {
@@ -395,11 +536,11 @@ func parseTemplateConfigs(tmpls []*structs.Template, taskDir string,
src = tmpl.SourcePath
} else {
src = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.SourcePath))
src = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.SourcePath))
}
}
if tmpl.DestPath != "" {
dest = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.DestPath))
dest = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.DestPath))
}
ct := ctconf.DefaultTemplateConfig()
@@ -427,12 +568,11 @@ func parseTemplateConfigs(tmpls []*structs.Template, taskDir string,
}
// newRunnerConfig returns a consul-template runner configuration, setting the
// Vault and Consul configurations based on the clients configs. The parameters
// are the client config, Vault token if set and the mapping of consul-templates
// to Nomad templates.
func newRunnerConfig(config *config.Config, vaultToken string,
// Vault and Consul configurations based on the clients configs.
func newRunnerConfig(config *TaskTemplateManagerConfig,
templateMapping map[ctconf.TemplateConfig]*structs.Template) (*ctconf.Config, error) {
cc := config.ClientConfig
conf := ctconf.DefaultConfig()
// Gather the consul-template tempates
@@ -455,29 +595,29 @@ func newRunnerConfig(config *config.Config, vaultToken string,
}
// Force faster retries
if testRetryRate != 0 {
rate := testRetryRate
if config.retryRate != 0 {
rate := config.retryRate
conf.Consul.Retry.Backoff = &rate
}
// Setup the Consul config
if config.ConsulConfig != nil {
conf.Consul.Address = &config.ConsulConfig.Addr
conf.Consul.Token = &config.ConsulConfig.Token
if cc.ConsulConfig != nil {
conf.Consul.Address = &cc.ConsulConfig.Addr
conf.Consul.Token = &cc.ConsulConfig.Token
if config.ConsulConfig.EnableSSL != nil && *config.ConsulConfig.EnableSSL {
verify := config.ConsulConfig.VerifySSL != nil && *config.ConsulConfig.VerifySSL
if cc.ConsulConfig.EnableSSL != nil && *cc.ConsulConfig.EnableSSL {
verify := cc.ConsulConfig.VerifySSL != nil && *cc.ConsulConfig.VerifySSL
conf.Consul.SSL = &ctconf.SSLConfig{
Enabled: helper.BoolToPtr(true),
Verify: &verify,
Cert: &config.ConsulConfig.CertFile,
Key: &config.ConsulConfig.KeyFile,
CaCert: &config.ConsulConfig.CAFile,
Cert: &cc.ConsulConfig.CertFile,
Key: &cc.ConsulConfig.KeyFile,
CaCert: &cc.ConsulConfig.CAFile,
}
}
if config.ConsulConfig.Auth != "" {
parts := strings.SplitN(config.ConsulConfig.Auth, ":", 2)
if cc.ConsulConfig.Auth != "" {
parts := strings.SplitN(cc.ConsulConfig.Auth, ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("Failed to parse Consul Auth config")
}
@@ -495,22 +635,22 @@ func newRunnerConfig(config *config.Config, vaultToken string,
emptyStr := ""
conf.Vault.RenewToken = helper.BoolToPtr(false)
conf.Vault.Token = &emptyStr
if config.VaultConfig != nil && config.VaultConfig.IsEnabled() {
conf.Vault.Address = &config.VaultConfig.Addr
conf.Vault.Token = &vaultToken
if cc.VaultConfig != nil && cc.VaultConfig.IsEnabled() {
conf.Vault.Address = &cc.VaultConfig.Addr
conf.Vault.Token = &config.VaultToken
conf.Vault.Grace = helper.TimeToPtr(vaultGrace)
if strings.HasPrefix(config.VaultConfig.Addr, "https") || config.VaultConfig.TLSCertFile != "" {
skipVerify := config.VaultConfig.TLSSkipVerify != nil && *config.VaultConfig.TLSSkipVerify
if strings.HasPrefix(cc.VaultConfig.Addr, "https") || cc.VaultConfig.TLSCertFile != "" {
skipVerify := cc.VaultConfig.TLSSkipVerify != nil && *cc.VaultConfig.TLSSkipVerify
verify := !skipVerify
conf.Vault.SSL = &ctconf.SSLConfig{
Enabled: helper.BoolToPtr(true),
Verify: &verify,
Cert: &config.VaultConfig.TLSCertFile,
Key: &config.VaultConfig.TLSKeyFile,
CaCert: &config.VaultConfig.TLSCaFile,
CaPath: &config.VaultConfig.TLSCaPath,
ServerName: &config.VaultConfig.TLSServerName,
Cert: &cc.VaultConfig.TLSCertFile,
Key: &cc.VaultConfig.TLSKeyFile,
CaCert: &cc.VaultConfig.TLSCaFile,
CaPath: &cc.VaultConfig.TLSCaPath,
ServerName: &cc.VaultConfig.TLSServerName,
}
} else {
conf.Vault.SSL = &ctconf.SSLConfig{

View File

@@ -112,6 +112,7 @@ type testHarness struct {
taskDir string
vault *testutil.TestVault
consul *ctestutil.TestServer
emitRate time.Duration
}
// newTestHarness returns a harness starting a dev consul and vault server,
@@ -123,6 +124,7 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b
templates: templates,
node: mock.Node(),
config: &config.Config{Region: region},
emitRate: DefaultMaxTemplateEventRate,
}
// Build the task environment
@@ -158,22 +160,31 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b
}
func (h *testHarness) start(t *testing.T) {
manager, err := NewTaskTemplateManager(h.mockHooks, h.templates,
h.config, h.vaultToken, h.taskDir, h.envBuilder)
if err != nil {
if err := h.startWithErr(); err != nil {
t.Fatalf("failed to build task template manager: %v", err)
}
h.manager = manager
}
func (h *testHarness) startWithErr() error {
manager, err := NewTaskTemplateManager(h.mockHooks, h.templates,
h.config, h.vaultToken, h.taskDir, h.envBuilder)
h.manager = manager
var err error
h.manager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{
Hooks: h.mockHooks,
Templates: h.templates,
ClientConfig: h.config,
VaultToken: h.vaultToken,
TaskDir: h.taskDir,
EnvBuilder: h.envBuilder,
MaxTemplateEventRate: h.emitRate,
retryRate: 10 * time.Millisecond,
})
return err
}
func (h *testHarness) setEmitRate(d time.Duration) {
h.emitRate = d
}
// stop is used to stop any running Vault or Consul server plus the task manager
func (h *testHarness) stop() {
if h.vault != nil {
@@ -190,61 +201,118 @@ func (h *testHarness) stop() {
}
}
func TestTaskTemplateManager_Invalid(t *testing.T) {
func TestTaskTemplateManager_InvalidConfig(t *testing.T) {
t.Parallel()
hooks := NewMockTaskHooks()
var tmpls []*structs.Template
region := "global"
config := &config.Config{Region: region}
clientConfig := &config.Config{Region: "global"}
taskDir := "foo"
vaultToken := ""
a := mock.Alloc()
envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], config.Region)
envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], clientConfig.Region)
_, err := NewTaskTemplateManager(nil, nil, nil, "", "", nil)
if err == nil {
t.Fatalf("Expected error")
cases := []struct {
name string
config *TaskTemplateManagerConfig
expectedErr string
}{
{
name: "nil config",
config: nil,
expectedErr: "Nil config passed",
},
{
name: "bad hooks",
config: &TaskTemplateManagerConfig{
ClientConfig: clientConfig,
TaskDir: taskDir,
EnvBuilder: envBuilder,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
},
expectedErr: "task hooks",
},
{
name: "bad client config",
config: &TaskTemplateManagerConfig{
Hooks: hooks,
TaskDir: taskDir,
EnvBuilder: envBuilder,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
},
expectedErr: "client config",
},
{
name: "bad task dir",
config: &TaskTemplateManagerConfig{
ClientConfig: clientConfig,
Hooks: hooks,
EnvBuilder: envBuilder,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
},
expectedErr: "task directory",
},
{
name: "bad env builder",
config: &TaskTemplateManagerConfig{
ClientConfig: clientConfig,
Hooks: hooks,
TaskDir: taskDir,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
},
expectedErr: "task environment",
},
{
name: "bad max event rate",
config: &TaskTemplateManagerConfig{
ClientConfig: clientConfig,
Hooks: hooks,
TaskDir: taskDir,
EnvBuilder: envBuilder,
},
expectedErr: "template event rate",
},
{
name: "valid",
config: &TaskTemplateManagerConfig{
ClientConfig: clientConfig,
Hooks: hooks,
TaskDir: taskDir,
EnvBuilder: envBuilder,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
},
},
{
name: "invalid signal",
config: &TaskTemplateManagerConfig{
Templates: []*structs.Template{
{
DestPath: "foo",
EmbeddedTmpl: "hello, world",
ChangeMode: structs.TemplateChangeModeSignal,
ChangeSignal: "foobarbaz",
},
},
ClientConfig: clientConfig,
Hooks: hooks,
TaskDir: taskDir,
EnvBuilder: envBuilder,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
},
expectedErr: "parse signal",
},
}
_, err = NewTaskTemplateManager(nil, tmpls, config, vaultToken, taskDir, envBuilder)
if err == nil || !strings.Contains(err.Error(), "task hook") {
t.Fatalf("Expected invalid task hook error: %v", err)
}
_, err = NewTaskTemplateManager(hooks, tmpls, nil, vaultToken, taskDir, envBuilder)
if err == nil || !strings.Contains(err.Error(), "config") {
t.Fatalf("Expected invalid config error: %v", err)
}
_, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, "", envBuilder)
if err == nil || !strings.Contains(err.Error(), "task directory") {
t.Fatalf("Expected invalid task dir error: %v", err)
}
_, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, nil)
if err == nil || !strings.Contains(err.Error(), "task environment") {
t.Fatalf("Expected invalid task environment error: %v", err)
}
tm, err := NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if tm == nil {
t.Fatalf("Bad %v", tm)
}
// Build a template with a bad signal
tmpl := &structs.Template{
DestPath: "foo",
EmbeddedTmpl: "hello, world",
ChangeMode: structs.TemplateChangeModeSignal,
ChangeSignal: "foobarbaz",
}
tmpls = append(tmpls, tmpl)
tm, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder)
if err == nil || !strings.Contains(err.Error(), "Failed to parse signal") {
t.Fatalf("Expected signal parsing error: %v", err)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
_, err := NewTaskTemplateManager(c.config)
if err != nil {
if c.expectedErr == "" {
t.Fatalf("unexpected error: %v", err)
} else if !strings.Contains(err.Error(), c.expectedErr) {
t.Fatalf("expected error to contain %q; got %q", c.expectedErr, err.Error())
}
} else if c.expectedErr != "" {
t.Fatalf("expected an error to contain %q", c.expectedErr)
}
})
}
}
@@ -460,9 +528,6 @@ func TestTaskTemplateManager_Unblock_Consul(t *testing.T) {
ChangeMode: structs.TemplateChangeModeNoop,
}
// Drop the retry rate
testRetryRate = 10 * time.Millisecond
harness := newTestHarness(t, []*structs.Template{template}, true, false)
harness.start(t)
defer harness.stop()
@@ -510,9 +575,6 @@ func TestTaskTemplateManager_Unblock_Vault(t *testing.T) {
ChangeMode: structs.TemplateChangeModeNoop,
}
// Drop the retry rate
testRetryRate = 10 * time.Millisecond
harness := newTestHarness(t, []*structs.Template{template}, false, true)
harness.start(t)
defer harness.stop()
@@ -569,9 +631,6 @@ func TestTaskTemplateManager_Unblock_Multi_Template(t *testing.T) {
ChangeMode: structs.TemplateChangeModeNoop,
}
// Drop the retry rate
testRetryRate = 10 * time.Millisecond
harness := newTestHarness(t, []*structs.Template{template, template2}, true, false)
harness.start(t)
defer harness.stop()
@@ -630,9 +689,6 @@ func TestTaskTemplateManager_Rerender_Noop(t *testing.T) {
ChangeMode: structs.TemplateChangeModeNoop,
}
// Drop the retry rate
testRetryRate = 10 * time.Millisecond
harness := newTestHarness(t, []*structs.Template{template}, true, false)
harness.start(t)
defer harness.stop()
@@ -716,9 +772,6 @@ func TestTaskTemplateManager_Rerender_Signal(t *testing.T) {
ChangeSignal: "SIGBUS",
}
// Drop the retry rate
testRetryRate = 10 * time.Millisecond
harness := newTestHarness(t, []*structs.Template{template, template2}, true, false)
harness.start(t)
defer harness.stop()
@@ -802,9 +855,6 @@ func TestTaskTemplateManager_Rerender_Restart(t *testing.T) {
ChangeMode: structs.TemplateChangeModeRestart,
}
// Drop the retry rate
testRetryRate = 10 * time.Millisecond
harness := newTestHarness(t, []*structs.Template{template}, true, false)
harness.start(t)
defer harness.stop()
@@ -905,9 +955,6 @@ func TestTaskTemplateManager_Signal_Error(t *testing.T) {
ChangeSignal: "SIGALRM",
}
// Drop the retry rate
testRetryRate = 10 * time.Millisecond
harness := newTestHarness(t, []*structs.Template{template}, true, false)
harness.start(t)
defer harness.stop()
@@ -1075,7 +1122,11 @@ func TestTaskTemplateManager_Config_ServerName(t *testing.T) {
Addr: "https://localhost/",
TLSServerName: "notlocalhost",
}
ctconf, err := newRunnerConfig(c, "token", nil)
config := &TaskTemplateManagerConfig{
ClientConfig: c,
VaultToken: "token",
}
ctconf, err := newRunnerConfig(config, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -1091,34 +1142,97 @@ func TestTaskTemplateManager_Config_VaultGrace(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c := config.DefaultConfig()
c.Node = mock.Node()
c.VaultConfig = &sconfig.VaultConfig{
Enabled: helper.BoolToPtr(true),
Addr: "https://localhost/",
TLSServerName: "notlocalhost",
}
// Make a template that will render immediately
templates := []*structs.Template{
{
EmbeddedTmpl: "bar",
DestPath: "foo",
ChangeMode: structs.TemplateChangeModeNoop,
VaultGrace: 10 * time.Second,
},
{
EmbeddedTmpl: "baz",
DestPath: "bam",
ChangeMode: structs.TemplateChangeModeNoop,
VaultGrace: 100 * time.Second,
alloc := mock.Alloc()
config := &TaskTemplateManagerConfig{
ClientConfig: c,
VaultToken: "token",
// Make a template that will render immediately
Templates: []*structs.Template{
{
EmbeddedTmpl: "bar",
DestPath: "foo",
ChangeMode: structs.TemplateChangeModeNoop,
VaultGrace: 10 * time.Second,
},
{
EmbeddedTmpl: "baz",
DestPath: "bam",
ChangeMode: structs.TemplateChangeModeNoop,
VaultGrace: 100 * time.Second,
},
},
EnvBuilder: env.NewBuilder(c.Node, alloc, alloc.Job.TaskGroups[0].Tasks[0], c.Region),
}
taskEnv := env.NewTaskEnv(nil, nil)
ctmplMapping, err := parseTemplateConfigs(templates, "/fake/dir", taskEnv, false)
ctmplMapping, err := parseTemplateConfigs(config)
assert.Nil(err, "Parsing Templates")
ctconf, err := newRunnerConfig(c, "token", ctmplMapping)
ctconf, err := newRunnerConfig(config, ctmplMapping)
assert.Nil(err, "Building Runner Config")
assert.NotNil(ctconf.Vault.Grace, "Vault Grace Pointer")
assert.Equal(10*time.Second, *ctconf.Vault.Grace, "Vault Grace Value")
}
func TestTaskTemplateManager_BlockedEvents(t *testing.T) {
t.Parallel()
// Make a template that will render based on a key in Consul
var embedded string
for i := 0; i < 5; i++ {
embedded += fmt.Sprintf(`{{key "%d"}}`, i)
}
file := "my.tmpl"
template := &structs.Template{
EmbeddedTmpl: embedded,
DestPath: file,
ChangeMode: structs.TemplateChangeModeNoop,
}
harness := newTestHarness(t, []*structs.Template{template}, true, false)
harness.setEmitRate(100 * time.Millisecond)
harness.start(t)
defer harness.stop()
// Ensure that we get a blocked event
select {
case <-harness.mockHooks.UnblockCh:
t.Fatalf("Task unblock should have not have been called")
case <-harness.mockHooks.EmitEventCh:
case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second):
t.Fatalf("timeout")
}
// Check to see we got a correct message
event := harness.mockHooks.Events[0]
if !strings.Contains(event, "and 2 more") {
t.Fatalf("bad event: %q", event)
}
// Write 3 keys to Consul
for i := 0; i < 3; i++ {
harness.consul.SetKV(t, fmt.Sprintf("%d", i), []byte{0xa})
}
// Ensure that we get a blocked event
select {
case <-harness.mockHooks.UnblockCh:
t.Fatalf("Task unblock should have not have been called")
case <-harness.mockHooks.EmitEventCh:
case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second):
t.Fatalf("timeout")
}
// Check to see we got a correct message
event = harness.mockHooks.Events[len(harness.mockHooks.Events)-1]
if !strings.Contains(event, "Missing") || strings.Contains(event, "more") {
t.Fatalf("bad event: %q", event)
}
}

View File

@@ -188,7 +188,8 @@ func (s *taskRunnerState) Hash() []byte {
return h.Sum(nil)
}
// TaskStateUpdater is used to signal that tasks state has changed.
// TaskStateUpdater is used to signal that tasks state has changed. If lazySync
// is set the event won't be immediately pushed to the server.
type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent, lazySync bool)
// SignalEvent is a tuple of the signal and the event generating it
@@ -857,8 +858,16 @@ func (r *TaskRunner) updatedTokenHandler() {
// Create a new templateManager
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
r.templateManager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{
Hooks: r,
Templates: r.task.Templates,
ClientConfig: r.config,
VaultToken: r.vaultFuture.Get(),
TaskDir: r.taskDir.Dir,
EnvBuilder: r.envBuilder,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
})
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead,
@@ -965,8 +974,15 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res
// Build the template manager
if r.templateManager == nil {
var err error
r.templateManager, err = NewTaskTemplateManager(r, task.Templates,
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
r.templateManager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{
Hooks: r,
Templates: r.task.Templates,
ClientConfig: r.config,
VaultToken: r.vaultFuture.Get(),
TaskDir: r.taskDir.Dir,
EnvBuilder: r.envBuilder,
MaxTemplateEventRate: DefaultMaxTemplateEventRate,
})
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), false)