diff --git a/client/allocrunnerv2/taskrunner/interfaces/events.go b/client/allocrunnerv2/taskrunner/interfaces/events.go new file mode 100644 index 000000000..3f908cedf --- /dev/null +++ b/client/allocrunnerv2/taskrunner/interfaces/events.go @@ -0,0 +1,8 @@ +package interfaces + +import "github.com/hashicorp/nomad/nomad/structs" + +type EventEmitter interface { + SetState(state string, event *structs.TaskEvent) + EmitEvent(source, message string) +} diff --git a/client/allocrunnerv2/taskrunner/interfaces/lifecycle.go b/client/allocrunnerv2/taskrunner/interfaces/lifecycle.go new file mode 100644 index 000000000..e440943a3 --- /dev/null +++ b/client/allocrunnerv2/taskrunner/interfaces/lifecycle.go @@ -0,0 +1,11 @@ +package interfaces + +import "os" + +// XXX These should probably all return an error and we should have predefined +// error types for the task not currently running +type TaskLifecycle interface { + Restart(source, reason string, failure bool) + Signal(source, reason string, s os.Signal) error + Kill(source, reason string, fail bool) +} diff --git a/client/allocrunnerv2/taskrunner/lifecycle.go b/client/allocrunnerv2/taskrunner/lifecycle.go index e93fbf065..4d3f7bd0f 100644 --- a/client/allocrunnerv2/taskrunner/lifecycle.go +++ b/client/allocrunnerv2/taskrunner/lifecycle.go @@ -2,14 +2,6 @@ package taskrunner import "os" -// XXX These should probably all return an error and we should have predefined -// error types for the task not currently running -type TaskLifecycle interface { - Restart(source, reason string, failure bool) - Signal(source, reason string, s os.Signal) error - Kill(source, reason string, fail bool) -} - func (tr *TaskRunner) Restart(source, reason string, failure bool) { // TODO } diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index 708a63df3..f54d5d6bd 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -498,6 +498,11 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) { //} } +func (tr *TaskRunner) EmitEvent(source, message string) { + event := structs.NewTaskEvent(source).SetMessage(message) + tr.SetState("", event) +} + // WaitCh is closed when TaskRunner.Run exits. func (tr *TaskRunner) WaitCh() <-chan struct{} { return tr.waitCh diff --git a/client/allocrunnerv2/taskrunner/task_runner_hooks.go b/client/allocrunnerv2/taskrunner/task_runner_hooks.go index cb2382f8d..5205754a1 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_hooks.go +++ b/client/allocrunnerv2/taskrunner/task_runner_hooks.go @@ -9,16 +9,13 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/getter" "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" + ti "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state" cconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" ) -type EventEmitter interface { - SetState(state string, event *structs.TaskEvent) -} - // initHooks intializes the tasks hooks. func (tr *TaskRunner) initHooks() { hookLogger := tr.logger.Named("task_hook") @@ -43,6 +40,18 @@ func (tr *TaskRunner) initHooks() { task: tr.taskName, })) } + + // If there are templates is enabled, add the hook + if task := tr.Task(); len(task.Templates) != 0 { + tr.runnerHooks = append(tr.runnerHooks, newTemplateHook(&templateHookConfig{ + logger: hookLogger, + lifecycle: tr, + events: tr, + templates: task.Templates, + clientConfig: tr.clientConfig, + envBuilder: tr.envBuilder, + })) + } } // prerun is used to run the runners prerun hooks. @@ -304,11 +313,11 @@ func (h *taskDirHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequ // artifactHook downloads artifacts for a task. type artifactHook struct { - eventEmitter EventEmitter + eventEmitter ti.EventEmitter logger log.Logger } -func newArtifactHook(e EventEmitter, logger log.Logger) *artifactHook { +func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook { h := &artifactHook{ eventEmitter: e, } diff --git a/client/allocrunnerv2/taskrunner/template/template.go b/client/allocrunnerv2/taskrunner/template/template.go new file mode 100644 index 000000000..27dba209f --- /dev/null +++ b/client/allocrunnerv2/taskrunner/template/template.go @@ -0,0 +1,679 @@ +package template + +import ( + "fmt" + "math/rand" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "time" + + ctconf "github.com/hashicorp/consul-template/config" + "github.com/hashicorp/consul-template/manager" + "github.com/hashicorp/consul-template/signals" + envparse "github.com/hashicorp/go-envparse" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // consulTemplateSourceName is the source name when using the TaskHooks. + consulTemplateSourceName = "Template" + + // hostSrcOption is the Client option that determines whether the template + // source may be from the host + hostSrcOption = "template.allow_host_source" + + // missingDepEventLimit is the number of missing dependencies that will be + // logged before we switch to showing just the number of missing + // dependencies. + missingDepEventLimit = 3 + + // DefaultMaxTemplateEventRate is the default maximum rate at which a + // template event should be fired. + DefaultMaxTemplateEventRate = 3 * time.Second +) + +// TaskTemplateManager is used to run a set of templates for a given task +type TaskTemplateManager struct { + // config holds the template managers configuration + config *TaskTemplateManagerConfig + + // lookup allows looking up the set of Nomad templates by their consul-template ID + lookup map[string][]*structs.Template + + // runner is the consul-template runner + runner *manager.Runner + + // signals is a lookup map from the string representation of a signal to its + // actual signal + signals map[string]os.Signal + + // shutdownCh is used to signal and started goroutine to shutdown + shutdownCh chan struct{} + + // shutdown marks whether the manager has been shutdown + shutdown bool + shutdownLock sync.Mutex +} + +// TaskTemplateManagerConfig is used to configure an instance of the +// TaskTemplateManager +type TaskTemplateManagerConfig struct { + // UnblockCh is closed when the template has been rendered + UnblockCh chan struct{} + + // Lifecycle is used to interact with the task the template manager is being + // run for + Lifecycle interfaces.TaskLifecycle + + // Events is used to emit events for the task + Events interfaces.EventEmitter + + // Templates is the set of templates we are managing + Templates []*structs.Template + + // ClientConfig is the Nomad Client configuration + ClientConfig *config.Config + + // VaultToken is the Vault token for the task. + VaultToken string + + // TaskDir is the task's directory + TaskDir string + + // EnvBuilder is the environment variable builder for the task. + EnvBuilder *env.Builder + + // MaxTemplateEventRate is the maximum rate at which we should emit events. + MaxTemplateEventRate time.Duration + + // retryRate is only used for testing and is used to increase the retry rate + retryRate time.Duration +} + +// Validate validates the configuration. +func (c *TaskTemplateManagerConfig) Validate() error { + if c == nil { + return fmt.Errorf("Nil config passed") + } else if c.UnblockCh == nil { + return fmt.Errorf("Invalid unblock channel given") + } else if c.Lifecycle == nil { + return fmt.Errorf("Invalid lifecycle hooks given") + } else if c.Events == nil { + return fmt.Errorf("Invalid event hook given") + } else if c.ClientConfig == nil { + return fmt.Errorf("Invalid client config given") + } else if c.TaskDir == "" { + return fmt.Errorf("Invalid task directory given: %q", c.TaskDir) + } else if c.EnvBuilder == nil { + return fmt.Errorf("Invalid task environment given") + } else if c.MaxTemplateEventRate == 0 { + return fmt.Errorf("Invalid max template event rate given") + } + + return nil +} + +func NewTaskTemplateManager(config *TaskTemplateManagerConfig) (*TaskTemplateManager, error) { + // Check pre-conditions + if err := config.Validate(); err != nil { + return nil, err + } + + tm := &TaskTemplateManager{ + config: config, + shutdownCh: make(chan struct{}), + } + + // Parse the signals that we need + for _, tmpl := range config.Templates { + if tmpl.ChangeSignal == "" { + continue + } + + sig, err := signals.Parse(tmpl.ChangeSignal) + if err != nil { + return nil, fmt.Errorf("Failed to parse signal %q", tmpl.ChangeSignal) + } + + if tm.signals == nil { + tm.signals = make(map[string]os.Signal) + } + + tm.signals[tmpl.ChangeSignal] = sig + } + + // Build the consul-template runner + runner, lookup, err := templateRunner(config) + if err != nil { + return nil, err + } + tm.runner = runner + tm.lookup = lookup + + go tm.run() + return tm, nil +} + +// Stop is used to stop the consul-template runner +func (tm *TaskTemplateManager) Stop() { + tm.shutdownLock.Lock() + defer tm.shutdownLock.Unlock() + + if tm.shutdown { + return + } + + close(tm.shutdownCh) + tm.shutdown = true + + // Stop the consul-template runner + if tm.runner != nil { + tm.runner.Stop() + } +} + +// run is the long lived loop that handles errors and templates being rendered +func (tm *TaskTemplateManager) run() { + // Runner is nil if there is no templates + if tm.runner == nil { + // Unblock the start if there is nothing to do + close(tm.config.UnblockCh) + return + } + + // Start the runner + go tm.runner.Start() + + // Block till all the templates have been rendered + tm.handleFirstRender() + + // Detect if there was a shutdown. + select { + case <-tm.shutdownCh: + return + default: + } + + // Read environment variables from env templates before we unblock + envMap, err := loadTemplateEnv(tm.config.Templates, tm.config.TaskDir) + if err != nil { + tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + return + } + tm.config.EnvBuilder.SetTemplateEnv(envMap) + + // Unblock the task + close(tm.config.UnblockCh) + + // If all our templates are change mode no-op, then we can exit here + if tm.allTemplatesNoop() { + return + } + + // handle all subsequent render events. + tm.handleTemplateRerenders(time.Now()) +} + +// handleFirstRender blocks till all templates have been rendered +func (tm *TaskTemplateManager) handleFirstRender() { + // missingDependencies is the set of missing dependencies. + var missingDependencies map[string]struct{} + + // eventTimer is used to trigger the firing of an event showing the missing + // dependencies. + eventTimer := time.NewTimer(tm.config.MaxTemplateEventRate) + if !eventTimer.Stop() { + <-eventTimer.C + } + + // outstandingEvent tracks whether there is an outstanding event that should + // be fired. + outstandingEvent := false + + // Wait till all the templates have been rendered +WAIT: + for { + select { + case <-tm.shutdownCh: + return + case err, ok := <-tm.runner.ErrCh: + if !ok { + continue + } + + tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + case <-tm.runner.TemplateRenderedCh(): + // A template has been rendered, figure out what to do + events := tm.runner.RenderEvents() + + // Not all templates have been rendered yet + if len(events) < len(tm.lookup) { + continue + } + + for _, event := range events { + // This template hasn't been rendered + if event.LastWouldRender.IsZero() { + continue WAIT + } + } + + break WAIT + case <-tm.runner.RenderEventCh(): + events := tm.runner.RenderEvents() + joinedSet := make(map[string]struct{}) + for _, event := range events { + missing := event.MissingDeps + if missing == nil { + continue + } + + for _, dep := range missing.List() { + joinedSet[dep.String()] = struct{}{} + } + } + + // Check to see if the new joined set is the same as the old + different := len(joinedSet) != len(missingDependencies) + if !different { + for k := range joinedSet { + if _, ok := missingDependencies[k]; !ok { + different = true + break + } + } + } + + // Nothing to do + if !different { + continue + } + + // Update the missing set + missingDependencies = joinedSet + + // Update the event timer channel + if !outstandingEvent { + // We got new data so reset + outstandingEvent = true + eventTimer.Reset(tm.config.MaxTemplateEventRate) + } + case <-eventTimer.C: + if missingDependencies == nil { + continue + } + + // Clear the outstanding event + outstandingEvent = false + + // Build the missing set + missingSlice := make([]string, 0, len(missingDependencies)) + for k := range missingDependencies { + missingSlice = append(missingSlice, k) + } + sort.Strings(missingSlice) + + if l := len(missingSlice); l > missingDepEventLimit { + missingSlice[missingDepEventLimit] = fmt.Sprintf("and %d more", l-missingDepEventLimit) + missingSlice = missingSlice[:missingDepEventLimit+1] + } + + missingStr := strings.Join(missingSlice, ", ") + tm.config.Events.EmitEvent(consulTemplateSourceName, fmt.Sprintf("Missing: %s", missingStr)) + } + } +} + +// handleTemplateRerenders is used to handle template render events after they +// have all rendered. It takes action based on which set of templates re-render. +// The passed allRenderedTime is the time at which all templates have rendered. +// This is used to avoid signaling the task for any render event before hand. +func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time) { + // A lookup for the last time the template was handled + handledRenders := make(map[string]time.Time, len(tm.config.Templates)) + + for { + select { + case <-tm.shutdownCh: + return + case err, ok := <-tm.runner.ErrCh: + if !ok { + continue + } + + tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + case <-tm.runner.TemplateRenderedCh(): + // A template has been rendered, figure out what to do + var handling []string + signals := make(map[string]struct{}) + restart := false + var splay time.Duration + + events := tm.runner.RenderEvents() + for id, event := range events { + + // First time through + if allRenderedTime.After(event.LastDidRender) || allRenderedTime.Equal(event.LastDidRender) { + handledRenders[id] = allRenderedTime + continue + } + + // We have already handled this one + if htime := handledRenders[id]; htime.After(event.LastDidRender) || htime.Equal(event.LastDidRender) { + continue + } + + // Lookup the template and determine what to do + tmpls, ok := tm.lookup[id] + if !ok { + tm.config.Lifecycle.Kill(consulTemplateSourceName, fmt.Sprintf("template runner returned unknown template id %q", id), true) + return + } + + // Read environment variables from templates + envMap, err := loadTemplateEnv(tm.config.Templates, tm.config.TaskDir) + if err != nil { + tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + return + } + tm.config.EnvBuilder.SetTemplateEnv(envMap) + + for _, tmpl := range tmpls { + switch tmpl.ChangeMode { + case structs.TemplateChangeModeSignal: + signals[tmpl.ChangeSignal] = struct{}{} + case structs.TemplateChangeModeRestart: + restart = true + case structs.TemplateChangeModeNoop: + continue + } + + if tmpl.Splay > splay { + splay = tmpl.Splay + } + } + + handling = append(handling, id) + } + + if restart || len(signals) != 0 { + if splay != 0 { + ns := splay.Nanoseconds() + offset := rand.Int63n(ns) + t := time.Duration(offset) + + select { + case <-time.After(t): + case <-tm.shutdownCh: + return + } + } + + // Update handle time + for _, id := range handling { + handledRenders[id] = events[id].LastDidRender + } + + if restart { + const failure = false + tm.config.Lifecycle.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered", failure) + } else if len(signals) != 0 { + var mErr multierror.Error + for signal := range signals { + err := tm.config.Lifecycle.Signal(consulTemplateSourceName, "template re-rendered", tm.signals[signal]) + if err != nil { + multierror.Append(&mErr, err) + } + } + + if err := mErr.ErrorOrNil(); err != nil { + flat := make([]os.Signal, 0, len(signals)) + for signal := range signals { + flat = append(flat, tm.signals[signal]) + } + tm.config.Lifecycle.Kill(consulTemplateSourceName, fmt.Sprintf("Sending signals %v failed: %v", flat, err), true) + } + } + } + } + } +} + +// allTemplatesNoop returns whether all the managed templates have change mode noop. +func (tm *TaskTemplateManager) allTemplatesNoop() bool { + for _, tmpl := range tm.config.Templates { + if tmpl.ChangeMode != structs.TemplateChangeModeNoop { + return false + } + } + + return true +} + +// templateRunner returns a consul-template runner for the given templates and a +// lookup by destination to the template. If no templates are in the config, a +// nil template runner and lookup is returned. +func templateRunner(config *TaskTemplateManagerConfig) ( + *manager.Runner, map[string][]*structs.Template, error) { + + if len(config.Templates) == 0 { + return nil, nil, nil + } + + // Parse the templates + ctmplMapping, err := parseTemplateConfigs(config) + if err != nil { + return nil, nil, err + } + + // Create the runner configuration. + runnerConfig, err := newRunnerConfig(config, ctmplMapping) + if err != nil { + return nil, nil, err + } + + runner, err := manager.NewRunner(runnerConfig, false, false) + if err != nil { + return nil, nil, err + } + + // Set Nomad's environment variables + runner.Env = config.EnvBuilder.Build().All() + + // Build the lookup + idMap := runner.TemplateConfigMapping() + lookup := make(map[string][]*structs.Template, len(idMap)) + for id, ctmpls := range idMap { + for _, ctmpl := range ctmpls { + templates := lookup[id] + templates = append(templates, ctmplMapping[ctmpl]) + lookup[id] = templates + } + } + + return runner, lookup, nil +} + +// parseTemplateConfigs converts the tasks templates in the config into +// consul-templates +func parseTemplateConfigs(config *TaskTemplateManagerConfig) (map[ctconf.TemplateConfig]*structs.Template, error) { + allowAbs := config.ClientConfig.ReadBoolDefault(hostSrcOption, true) + taskEnv := config.EnvBuilder.Build() + + ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(config.Templates)) + for _, tmpl := range config.Templates { + var src, dest string + if tmpl.SourcePath != "" { + if filepath.IsAbs(tmpl.SourcePath) { + if !allowAbs { + return nil, fmt.Errorf("Specifying absolute template paths disallowed by client config: %q", tmpl.SourcePath) + } + + src = tmpl.SourcePath + } else { + src = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.SourcePath)) + } + } + if tmpl.DestPath != "" { + dest = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.DestPath)) + } + + ct := ctconf.DefaultTemplateConfig() + ct.Source = &src + ct.Destination = &dest + ct.Contents = &tmpl.EmbeddedTmpl + ct.LeftDelim = &tmpl.LeftDelim + ct.RightDelim = &tmpl.RightDelim + + // Set the permissions + if tmpl.Perms != "" { + v, err := strconv.ParseUint(tmpl.Perms, 8, 12) + if err != nil { + return nil, fmt.Errorf("Failed to parse %q as octal: %v", tmpl.Perms, err) + } + m := os.FileMode(v) + ct.Perms = &m + } + ct.Finalize() + + ctmpls[*ct] = tmpl + } + + return ctmpls, nil +} + +// newRunnerConfig returns a consul-template runner configuration, setting the +// Vault and Consul configurations based on the clients configs. +func newRunnerConfig(config *TaskTemplateManagerConfig, + templateMapping map[ctconf.TemplateConfig]*structs.Template) (*ctconf.Config, error) { + + cc := config.ClientConfig + conf := ctconf.DefaultConfig() + + // Gather the consul-template templates + flat := ctconf.TemplateConfigs(make([]*ctconf.TemplateConfig, 0, len(templateMapping))) + for ctmpl := range templateMapping { + local := ctmpl + flat = append(flat, &local) + } + conf.Templates = &flat + + // Go through the templates and determine the minimum Vault grace + vaultGrace := time.Duration(-1) + for _, tmpl := range templateMapping { + // Initial condition + if vaultGrace < 0 { + vaultGrace = tmpl.VaultGrace + } else if tmpl.VaultGrace < vaultGrace { + vaultGrace = tmpl.VaultGrace + } + } + + // Force faster retries + if config.retryRate != 0 { + rate := config.retryRate + conf.Consul.Retry.Backoff = &rate + } + + // Setup the Consul config + if cc.ConsulConfig != nil { + conf.Consul.Address = &cc.ConsulConfig.Addr + conf.Consul.Token = &cc.ConsulConfig.Token + + if cc.ConsulConfig.EnableSSL != nil && *cc.ConsulConfig.EnableSSL { + verify := cc.ConsulConfig.VerifySSL != nil && *cc.ConsulConfig.VerifySSL + conf.Consul.SSL = &ctconf.SSLConfig{ + Enabled: helper.BoolToPtr(true), + Verify: &verify, + Cert: &cc.ConsulConfig.CertFile, + Key: &cc.ConsulConfig.KeyFile, + CaCert: &cc.ConsulConfig.CAFile, + } + } + + if cc.ConsulConfig.Auth != "" { + parts := strings.SplitN(cc.ConsulConfig.Auth, ":", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("Failed to parse Consul Auth config") + } + + conf.Consul.Auth = &ctconf.AuthConfig{ + Enabled: helper.BoolToPtr(true), + Username: &parts[0], + Password: &parts[1], + } + } + } + + // Setup the Vault config + // Always set these to ensure nothing is picked up from the environment + emptyStr := "" + conf.Vault.RenewToken = helper.BoolToPtr(false) + conf.Vault.Token = &emptyStr + if cc.VaultConfig != nil && cc.VaultConfig.IsEnabled() { + conf.Vault.Address = &cc.VaultConfig.Addr + conf.Vault.Token = &config.VaultToken + conf.Vault.Grace = helper.TimeToPtr(vaultGrace) + + if strings.HasPrefix(cc.VaultConfig.Addr, "https") || cc.VaultConfig.TLSCertFile != "" { + skipVerify := cc.VaultConfig.TLSSkipVerify != nil && *cc.VaultConfig.TLSSkipVerify + verify := !skipVerify + conf.Vault.SSL = &ctconf.SSLConfig{ + Enabled: helper.BoolToPtr(true), + Verify: &verify, + Cert: &cc.VaultConfig.TLSCertFile, + Key: &cc.VaultConfig.TLSKeyFile, + CaCert: &cc.VaultConfig.TLSCaFile, + CaPath: &cc.VaultConfig.TLSCaPath, + ServerName: &cc.VaultConfig.TLSServerName, + } + } else { + conf.Vault.SSL = &ctconf.SSLConfig{ + Enabled: helper.BoolToPtr(false), + Verify: helper.BoolToPtr(false), + Cert: &emptyStr, + Key: &emptyStr, + CaCert: &emptyStr, + CaPath: &emptyStr, + ServerName: &emptyStr, + } + } + } + + conf.Finalize() + return conf, nil +} + +// loadTemplateEnv loads task environment variables from all templates. +func loadTemplateEnv(tmpls []*structs.Template, taskDir string) (map[string]string, error) { + all := make(map[string]string, 50) + for _, t := range tmpls { + if !t.Envvars { + continue + } + f, err := os.Open(filepath.Join(taskDir, t.DestPath)) + if err != nil { + return nil, fmt.Errorf("error opening env template: %v", err) + } + defer f.Close() + + // Parse environment fil + vars, err := envparse.Parse(f) + if err != nil { + return nil, fmt.Errorf("error parsing env template %q: %v", t.DestPath, err) + } + for k, v := range vars { + all[k] = v + } + } + return all, nil +} diff --git a/client/allocrunnerv2/taskrunner/template/template_test.go b/client/allocrunnerv2/taskrunner/template/template_test.go new file mode 100644 index 000000000..102f6e745 --- /dev/null +++ b/client/allocrunnerv2/taskrunner/template/template_test.go @@ -0,0 +1,1342 @@ +package template + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + "time" + + ctestutil "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + sconfig "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + // TestTaskName is the name of the injected task. It should appear in the + // environment variable $NOMAD_TASK_NAME + TestTaskName = "test-task" +) + +// MockTaskHooks is a mock of the TaskHooks interface useful for testing +type MockTaskHooks struct { + Restarts int + RestartCh chan struct{} + + Signals []os.Signal + SignalCh chan struct{} + + // SignalError is returned when Signal is called on the mock hook + SignalError error + + UnblockCh chan struct{} + + KillReason string + KillCh chan struct{} + + Events []string + EmitEventCh chan struct{} +} + +func NewMockTaskHooks() *MockTaskHooks { + return &MockTaskHooks{ + UnblockCh: make(chan struct{}, 1), + RestartCh: make(chan struct{}, 1), + SignalCh: make(chan struct{}, 1), + KillCh: make(chan struct{}, 1), + EmitEventCh: make(chan struct{}, 1), + } +} +func (m *MockTaskHooks) Restart(source, reason string, failure bool) { + m.Restarts++ + select { + case m.RestartCh <- struct{}{}: + default: + } +} + +func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) error { + m.Signals = append(m.Signals, s) + select { + case m.SignalCh <- struct{}{}: + default: + } + + return m.SignalError +} + +func (m *MockTaskHooks) Kill(source, reason string, fail bool) { + m.KillReason = reason + select { + case m.KillCh <- struct{}{}: + default: + } +} + +func (m *MockTaskHooks) EmitEvent(source, message string) { + m.Events = append(m.Events, message) + select { + case m.EmitEventCh <- struct{}{}: + default: + } +} + +func (m *MockTaskHooks) SetState(state string, event *structs.TaskEvent) {} + +// testHarness is used to test the TaskTemplateManager by spinning up +// Consul/Vault as needed +type testHarness struct { + manager *TaskTemplateManager + mockHooks *MockTaskHooks + templates []*structs.Template + envBuilder *env.Builder + node *structs.Node + config *config.Config + vaultToken string + taskDir string + vault *testutil.TestVault + consul *ctestutil.TestServer + emitRate time.Duration +} + +// newTestHarness returns a harness starting a dev consul and vault server, +// building the appropriate config and creating a TaskTemplateManager +func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault bool) *testHarness { + region := "global" + harness := &testHarness{ + mockHooks: NewMockTaskHooks(), + templates: templates, + node: mock.Node(), + config: &config.Config{Region: region}, + emitRate: DefaultMaxTemplateEventRate, + } + + // Build the task environment + a := mock.Alloc() + task := a.Job.TaskGroups[0].Tasks[0] + task.Name = TestTaskName + harness.envBuilder = env.NewBuilder(harness.node, a, task, region) + + // Make a tempdir + d, err := ioutil.TempDir("", "ct_test") + if err != nil { + t.Fatalf("Failed to make tmpdir: %v", err) + } + harness.taskDir = d + + if consul { + harness.consul, err = ctestutil.NewTestServer() + if err != nil { + t.Fatalf("error starting test Consul server: %v", err) + } + harness.config.ConsulConfig = &sconfig.ConsulConfig{ + Addr: harness.consul.HTTPAddr, + } + } + + if vault { + harness.vault = testutil.NewTestVault(t) + harness.config.VaultConfig = harness.vault.Config + harness.vaultToken = harness.vault.RootToken + } + + return harness +} + +func (h *testHarness) start(t *testing.T) { + if err := h.startWithErr(); err != nil { + t.Fatalf("failed to build task template manager: %v", err) + } +} + +func (h *testHarness) startWithErr() error { + var err error + h.manager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{ + UnblockCh: h.mockHooks.UnblockCh, + Lifecycle: h.mockHooks, + Events: h.mockHooks, + Templates: h.templates, + ClientConfig: h.config, + VaultToken: h.vaultToken, + TaskDir: h.taskDir, + EnvBuilder: h.envBuilder, + MaxTemplateEventRate: h.emitRate, + retryRate: 10 * time.Millisecond, + }) + + return err +} + +func (h *testHarness) setEmitRate(d time.Duration) { + h.emitRate = d +} + +// stop is used to stop any running Vault or Consul server plus the task manager +func (h *testHarness) stop() { + if h.vault != nil { + h.vault.Stop() + } + if h.consul != nil { + h.consul.Stop() + } + if h.manager != nil { + h.manager.Stop() + } + if h.taskDir != "" { + os.RemoveAll(h.taskDir) + } +} + +func TestTaskTemplateManager_InvalidConfig(t *testing.T) { + t.Parallel() + hooks := NewMockTaskHooks() + clientConfig := &config.Config{Region: "global"} + taskDir := "foo" + a := mock.Alloc() + envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], clientConfig.Region) + + cases := []struct { + name string + config *TaskTemplateManagerConfig + expectedErr string + }{ + { + name: "nil config", + config: nil, + expectedErr: "Nil config passed", + }, + { + name: "bad lifecycle hooks", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + Events: hooks, + ClientConfig: clientConfig, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "lifecycle hooks", + }, + { + name: "bad event hooks", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + Lifecycle: hooks, + ClientConfig: clientConfig, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "event hook", + }, + { + name: "bad client config", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + Lifecycle: hooks, + Events: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "client config", + }, + { + name: "bad task dir", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + ClientConfig: clientConfig, + Lifecycle: hooks, + Events: hooks, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task directory", + }, + { + name: "bad env builder", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + ClientConfig: clientConfig, + Lifecycle: hooks, + Events: hooks, + TaskDir: taskDir, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task environment", + }, + { + name: "bad max event rate", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + ClientConfig: clientConfig, + Lifecycle: hooks, + Events: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + }, + expectedErr: "template event rate", + }, + { + name: "valid", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + ClientConfig: clientConfig, + Lifecycle: hooks, + Events: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + }, + { + name: "invalid signal", + config: &TaskTemplateManagerConfig{ + UnblockCh: hooks.UnblockCh, + Templates: []*structs.Template{ + { + DestPath: "foo", + EmbeddedTmpl: "hello, world", + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: "foobarbaz", + }, + }, + ClientConfig: clientConfig, + Lifecycle: hooks, + Events: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "parse signal", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + _, err := NewTaskTemplateManager(c.config) + if err != nil { + if c.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } else if !strings.Contains(err.Error(), c.expectedErr) { + t.Fatalf("expected error to contain %q; got %q", c.expectedErr, err.Error()) + } + } else if c.expectedErr != "" { + t.Fatalf("expected an error to contain %q", c.expectedErr) + } + }) + } +} + +func TestTaskTemplateManager_HostPath(t *testing.T) { + t.Parallel() + // Make a template that will render immediately and write it to a tmp file + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("Bad: %v", err) + } + defer f.Close() + defer os.Remove(f.Name()) + + content := "hello, world!" + if _, err := io.WriteString(f, content); err != nil { + t.Fatalf("Bad: %v", err) + } + + file := "my.tmpl" + template := &structs.Template{ + SourcePath: f.Name(), + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, false, false) + harness.start(t) + defer harness.stop() + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path := filepath.Join(harness.taskDir, file) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content { + t.Fatalf("Unexpected template data; got %q, want %q", s, content) + } + + // Change the config to disallow host sources + harness = newTestHarness(t, []*structs.Template{template}, false, false) + harness.config.Options = map[string]string{ + hostSrcOption: "false", + } + if err := harness.startWithErr(); err == nil || !strings.Contains(err.Error(), "absolute") { + t.Fatalf("Expected absolute template path disallowed: %v", err) + } +} + +func TestTaskTemplateManager_Unblock_Static(t *testing.T) { + t.Parallel() + // Make a template that will render immediately + content := "hello, world!" + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: content, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, false, false) + harness.start(t) + defer harness.stop() + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path := filepath.Join(harness.taskDir, file) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content { + t.Fatalf("Unexpected template data; got %q, want %q", s, content) + } +} + +func TestTaskTemplateManager_Permissions(t *testing.T) { + t.Parallel() + // Make a template that will render immediately + content := "hello, world!" + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: content, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + Perms: "777", + } + + harness := newTestHarness(t, []*structs.Template{template}, false, false) + harness.start(t) + defer harness.stop() + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path := filepath.Join(harness.taskDir, file) + fi, err := os.Stat(path) + if err != nil { + t.Fatalf("Failed to stat file: %v", err) + } + + if m := fi.Mode(); m != os.ModePerm { + t.Fatalf("Got mode %v; want %v", m, os.ModePerm) + } +} + +func TestTaskTemplateManager_Unblock_Static_NomadEnv(t *testing.T) { + t.Parallel() + // Make a template that will render immediately + content := `Hello Nomad Task: {{env "NOMAD_TASK_NAME"}}` + expected := fmt.Sprintf("Hello Nomad Task: %s", TestTaskName) + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: content, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, false, false) + harness.start(t) + defer harness.stop() + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path := filepath.Join(harness.taskDir, file) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != expected { + t.Fatalf("Unexpected template data; got %q, want %q", s, expected) + } +} + +func TestTaskTemplateManager_Unblock_Static_AlreadyRendered(t *testing.T) { + t.Parallel() + // Make a template that will render immediately + content := "hello, world!" + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: content, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, false, false) + + // Write the contents + path := filepath.Join(harness.taskDir, file) + if err := ioutil.WriteFile(path, []byte(content), 0777); err != nil { + t.Fatalf("Failed to write data: %v", err) + } + + harness.start(t) + defer harness.stop() + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path = filepath.Join(harness.taskDir, file) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content { + t.Fatalf("Unexpected template data; got %q, want %q", s, content) + } +} + +func TestTaskTemplateManager_Unblock_Consul(t *testing.T) { + t.Parallel() + // Make a template that will render based on a key in Consul + key := "foo" + content := "barbaz" + embedded := fmt.Sprintf(`{{key "%s"}}`, key) + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, true, false) + harness.start(t) + defer harness.stop() + + // Ensure no unblock + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Write the key to Consul + harness.consul.SetKV(t, key, []byte(content)) + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path := filepath.Join(harness.taskDir, file) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content { + t.Fatalf("Unexpected template data; got %q, want %q", s, content) + } +} + +func TestTaskTemplateManager_Unblock_Vault(t *testing.T) { + t.Parallel() + require := require.New(t) + // Make a template that will render based on a key in Vault + vaultPath := "secret/data/password" + key := "password" + content := "barbaz" + embedded := fmt.Sprintf(`{{with secret "%s"}}{{.Data.data.%s}}{{end}}`, vaultPath, key) + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, false, true) + harness.start(t) + defer harness.stop() + + // Ensure no unblock + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should not have been called") + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Write the secret to Vault + logical := harness.vault.Client.Logical() + _, err := logical.Write(vaultPath, map[string]interface{}{"data": map[string]interface{}{key: content}}) + require.NoError(err) + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path := filepath.Join(harness.taskDir, file) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content { + t.Fatalf("Unexpected template data; got %q, want %q", s, content) + } +} + +func TestTaskTemplateManager_Unblock_Multi_Template(t *testing.T) { + t.Parallel() + // Make a template that will render immediately + staticContent := "hello, world!" + staticFile := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: staticContent, + DestPath: staticFile, + ChangeMode: structs.TemplateChangeModeNoop, + } + + // Make a template that will render based on a key in Consul + consulKey := "foo" + consulContent := "barbaz" + consulEmbedded := fmt.Sprintf(`{{key "%s"}}`, consulKey) + consulFile := "consul.tmpl" + template2 := &structs.Template{ + EmbeddedTmpl: consulEmbedded, + DestPath: consulFile, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template, template2}, true, false) + harness.start(t) + defer harness.stop() + + // Ensure no unblock + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Check that the static file has been rendered + path := filepath.Join(harness.taskDir, staticFile) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != staticContent { + t.Fatalf("Unexpected template data; got %q, want %q", s, staticContent) + } + + // Write the key to Consul + harness.consul.SetKV(t, consulKey, []byte(consulContent)) + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the consul file is there + path = filepath.Join(harness.taskDir, consulFile) + raw, err = ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != consulContent { + t.Fatalf("Unexpected template data; got %q, want %q", s, consulContent) + } +} + +func TestTaskTemplateManager_Rerender_Noop(t *testing.T) { + t.Parallel() + // Make a template that will render based on a key in Consul + key := "foo" + content1 := "bar" + content2 := "baz" + embedded := fmt.Sprintf(`{{key "%s"}}`, key) + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, true, false) + harness.start(t) + defer harness.stop() + + // Ensure no unblock + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Write the key to Consul + harness.consul.SetKV(t, key, []byte(content1)) + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + path := filepath.Join(harness.taskDir, file) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content1 { + t.Fatalf("Unexpected template data; got %q, want %q", s, content1) + } + + // Update the key in Consul + harness.consul.SetKV(t, key, []byte(content2)) + + select { + case <-harness.mockHooks.RestartCh: + t.Fatalf("Noop ignored: %+v", harness.mockHooks) + case <-harness.mockHooks.SignalCh: + t.Fatalf("Noop ignored: %+v", harness.mockHooks) + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Check the file has been updated + path = filepath.Join(harness.taskDir, file) + raw, err = ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content2 { + t.Fatalf("Unexpected template data; got %q, want %q", s, content2) + } +} + +func TestTaskTemplateManager_Rerender_Signal(t *testing.T) { + t.Parallel() + // Make a template that renders based on a key in Consul and sends SIGALRM + key1 := "foo" + content1_1 := "bar" + content1_2 := "baz" + embedded1 := fmt.Sprintf(`{{key "%s"}}`, key1) + file1 := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded1, + DestPath: file1, + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: "SIGALRM", + } + + // Make a template that renders based on a key in Consul and sends SIGBUS + key2 := "bam" + content2_1 := "cat" + content2_2 := "dog" + embedded2 := fmt.Sprintf(`{{key "%s"}}`, key2) + file2 := "my-second.tmpl" + template2 := &structs.Template{ + EmbeddedTmpl: embedded2, + DestPath: file2, + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: "SIGBUS", + } + + harness := newTestHarness(t, []*structs.Template{template, template2}, true, false) + harness.start(t) + defer harness.stop() + + // Ensure no unblock + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Write the key to Consul + harness.consul.SetKV(t, key1, []byte(content1_1)) + harness.consul.SetKV(t, key2, []byte(content2_1)) + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + if len(harness.mockHooks.Signals) != 0 { + t.Fatalf("Should not have received any signals: %+v", harness.mockHooks) + } + + // Update the keys in Consul + harness.consul.SetKV(t, key1, []byte(content1_2)) + harness.consul.SetKV(t, key2, []byte(content2_2)) + + // Wait for signals + timeout := time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second) +OUTER: + for { + select { + case <-harness.mockHooks.RestartCh: + t.Fatalf("Restart with signal policy: %+v", harness.mockHooks) + case <-harness.mockHooks.SignalCh: + if len(harness.mockHooks.Signals) != 2 { + continue + } + break OUTER + case <-timeout: + t.Fatalf("Should have received two signals: %+v", harness.mockHooks) + } + } + + // Check the files have been updated + path := filepath.Join(harness.taskDir, file1) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content1_2 { + t.Fatalf("Unexpected template data; got %q, want %q", s, content1_2) + } + + path = filepath.Join(harness.taskDir, file2) + raw, err = ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content2_2 { + t.Fatalf("Unexpected template data; got %q, want %q", s, content2_2) + } +} + +func TestTaskTemplateManager_Rerender_Restart(t *testing.T) { + t.Parallel() + // Make a template that renders based on a key in Consul and sends restart + key1 := "bam" + content1_1 := "cat" + content1_2 := "dog" + embedded1 := fmt.Sprintf(`{{key "%s"}}`, key1) + file1 := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded1, + DestPath: file1, + ChangeMode: structs.TemplateChangeModeRestart, + } + + harness := newTestHarness(t, []*structs.Template{template}, true, false) + harness.start(t) + defer harness.stop() + + // Ensure no unblock + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Write the key to Consul + harness.consul.SetKV(t, key1, []byte(content1_1)) + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Update the keys in Consul + harness.consul.SetKV(t, key1, []byte(content1_2)) + + // Wait for restart + timeout := time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second) +OUTER: + for { + select { + case <-harness.mockHooks.RestartCh: + break OUTER + case <-harness.mockHooks.SignalCh: + t.Fatalf("Signal with restart policy: %+v", harness.mockHooks) + case <-timeout: + t.Fatalf("Should have received a restart: %+v", harness.mockHooks) + } + } + + // Check the files have been updated + path := filepath.Join(harness.taskDir, file1) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content1_2 { + t.Fatalf("Unexpected template data; got %q, want %q", s, content1_2) + } +} + +func TestTaskTemplateManager_Interpolate_Destination(t *testing.T) { + t.Parallel() + // Make a template that will have its destination interpolated + content := "hello, world!" + file := "${node.unique.id}.tmpl" + template := &structs.Template{ + EmbeddedTmpl: content, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, false, false) + harness.start(t) + defer harness.stop() + + // Ensure unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + // Check the file is there + actual := fmt.Sprintf("%s.tmpl", harness.node.ID) + path := filepath.Join(harness.taskDir, actual) + raw, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("Failed to read rendered template from %q: %v", path, err) + } + + if s := string(raw); s != content { + t.Fatalf("Unexpected template data; got %q, want %q", s, content) + } +} + +func TestTaskTemplateManager_Signal_Error(t *testing.T) { + t.Parallel() + // Make a template that renders based on a key in Consul and sends SIGALRM + key1 := "foo" + content1 := "bar" + content2 := "baz" + embedded1 := fmt.Sprintf(`{{key "%s"}}`, key1) + file1 := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded1, + DestPath: file1, + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: "SIGALRM", + } + + harness := newTestHarness(t, []*structs.Template{template}, true, false) + harness.start(t) + defer harness.stop() + + harness.mockHooks.SignalError = fmt.Errorf("test error") + + // Write the key to Consul + harness.consul.SetKV(t, key1, []byte(content1)) + + // Wait a little + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(2*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Should have received unblock: %+v", harness.mockHooks) + } + + // Write the key to Consul + harness.consul.SetKV(t, key1, []byte(content2)) + + // Wait for kill channel + select { + case <-harness.mockHooks.KillCh: + break + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Should have received a signals: %+v", harness.mockHooks) + } + + if !strings.Contains(harness.mockHooks.KillReason, "Sending signals") { + t.Fatalf("Unexpected error: %v", harness.mockHooks.KillReason) + } +} + +// TestTaskTemplateManager_Env asserts templates with the env flag set are read +// into the task's environment. +func TestTaskTemplateManager_Env(t *testing.T) { + t.Parallel() + template := &structs.Template{ + EmbeddedTmpl: ` +# Comment lines are ok + +FOO=bar +foo=123 +ANYTHING_goes=Spaces are=ok! +`, + DestPath: "test.env", + ChangeMode: structs.TemplateChangeModeNoop, + Envvars: true, + } + harness := newTestHarness(t, []*structs.Template{template}, true, false) + harness.start(t) + defer harness.stop() + + // Wait a little + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(2*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Should have received unblock: %+v", harness.mockHooks) + } + + // Validate environment + env := harness.envBuilder.Build().Map() + if len(env) < 3 { + t.Fatalf("expected at least 3 env vars but found %d:\n%#v\n", len(env), env) + } + if env["FOO"] != "bar" { + t.Errorf("expected FOO=bar but found %q", env["FOO"]) + } + if env["foo"] != "123" { + t.Errorf("expected foo=123 but found %q", env["foo"]) + } + if env["ANYTHING_goes"] != "Spaces are=ok!" { + t.Errorf("expected ANYTHING_GOES='Spaces are ok!' but found %q", env["ANYTHING_goes"]) + } +} + +// TestTaskTemplateManager_Env_Missing asserts the core env +// template processing function returns errors when files don't exist +func TestTaskTemplateManager_Env_Missing(t *testing.T) { + t.Parallel() + d, err := ioutil.TempDir("", "ct_env_missing") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(d) + + // Fake writing the file so we don't have to run the whole template manager + err = ioutil.WriteFile(filepath.Join(d, "exists.env"), []byte("FOO=bar\n"), 0644) + if err != nil { + t.Fatalf("error writing template file: %v", err) + } + + templates := []*structs.Template{ + { + EmbeddedTmpl: "FOO=bar\n", + DestPath: "exists.env", + Envvars: true, + }, + { + EmbeddedTmpl: "WHAT=ever\n", + DestPath: "missing.env", + Envvars: true, + }, + } + + if vars, err := loadTemplateEnv(templates, d); err == nil { + t.Fatalf("expected an error but instead got env vars: %#v", vars) + } +} + +// TestTaskTemplateManager_Env_Multi asserts the core env +// template processing function returns combined env vars from multiple +// templates correctly. +func TestTaskTemplateManager_Env_Multi(t *testing.T) { + t.Parallel() + d, err := ioutil.TempDir("", "ct_env_missing") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(d) + + // Fake writing the files so we don't have to run the whole template manager + err = ioutil.WriteFile(filepath.Join(d, "zzz.env"), []byte("FOO=bar\nSHARED=nope\n"), 0644) + if err != nil { + t.Fatalf("error writing template file 1: %v", err) + } + err = ioutil.WriteFile(filepath.Join(d, "aaa.env"), []byte("BAR=foo\nSHARED=yup\n"), 0644) + if err != nil { + t.Fatalf("error writing template file 2: %v", err) + } + + // Templates will get loaded in order (not alpha sorted) + templates := []*structs.Template{ + { + DestPath: "zzz.env", + Envvars: true, + }, + { + DestPath: "aaa.env", + Envvars: true, + }, + } + + vars, err := loadTemplateEnv(templates, d) + if err != nil { + t.Fatalf("expected an error but instead got env vars: %#v", vars) + } + if vars["FOO"] != "bar" { + t.Errorf("expected FOO=bar but found %q", vars["FOO"]) + } + if vars["BAR"] != "foo" { + t.Errorf("expected BAR=foo but found %q", vars["BAR"]) + } + if vars["SHARED"] != "yup" { + t.Errorf("expected FOO=bar but found %q", vars["yup"]) + } +} + +func TestTaskTemplateManager_Rerender_Env(t *testing.T) { + t.Parallel() + // Make a template that renders based on a key in Consul and sends restart + key1 := "bam" + key2 := "bar" + content1_1 := "cat" + content1_2 := "dog" + t1 := &structs.Template{ + EmbeddedTmpl: ` +FOO={{key "bam"}} +`, + DestPath: "test.env", + ChangeMode: structs.TemplateChangeModeRestart, + Envvars: true, + } + t2 := &structs.Template{ + EmbeddedTmpl: ` +BAR={{key "bar"}} +`, + DestPath: "test2.env", + ChangeMode: structs.TemplateChangeModeRestart, + Envvars: true, + } + + harness := newTestHarness(t, []*structs.Template{t1, t2}, true, false) + harness.start(t) + defer harness.stop() + + // Ensure no unblock + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + } + + // Write the key to Consul + harness.consul.SetKV(t, key1, []byte(content1_1)) + harness.consul.SetKV(t, key2, []byte(content1_1)) + + // Wait for the unblock + select { + case <-harness.mockHooks.UnblockCh: + case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Task unblock should have been called") + } + + env := harness.envBuilder.Build().Map() + if v, ok := env["FOO"]; !ok || v != content1_1 { + t.Fatalf("Bad env for FOO: %v %v", v, ok) + } + if v, ok := env["BAR"]; !ok || v != content1_1 { + t.Fatalf("Bad env for BAR: %v %v", v, ok) + } + + // Update the keys in Consul + harness.consul.SetKV(t, key1, []byte(content1_2)) + + // Wait for restart + timeout := time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second) +OUTER: + for { + select { + case <-harness.mockHooks.RestartCh: + break OUTER + case <-harness.mockHooks.SignalCh: + t.Fatalf("Signal with restart policy: %+v", harness.mockHooks) + case <-timeout: + t.Fatalf("Should have received a restart: %+v", harness.mockHooks) + } + } + + env = harness.envBuilder.Build().Map() + if v, ok := env["FOO"]; !ok || v != content1_2 { + t.Fatalf("Bad env for FOO: %v %v", v, ok) + } + if v, ok := env["BAR"]; !ok || v != content1_1 { + t.Fatalf("Bad env for BAR: %v %v", v, ok) + } +} + +// TestTaskTemplateManager_Config_ServerName asserts the tls_server_name +// setting is propagated to consul-template's configuration. See #2776 +func TestTaskTemplateManager_Config_ServerName(t *testing.T) { + t.Parallel() + c := config.DefaultConfig() + c.VaultConfig = &sconfig.VaultConfig{ + Enabled: helper.BoolToPtr(true), + Addr: "https://localhost/", + TLSServerName: "notlocalhost", + } + config := &TaskTemplateManagerConfig{ + ClientConfig: c, + VaultToken: "token", + } + ctconf, err := newRunnerConfig(config, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if *ctconf.Vault.SSL.ServerName != c.VaultConfig.TLSServerName { + t.Fatalf("expected %q but found %q", c.VaultConfig.TLSServerName, *ctconf.Vault.SSL.ServerName) + } +} + +// TestTaskTemplateManager_Config_VaultGrace asserts the vault_grace setting is +// propagated to consul-template's configuration. +func TestTaskTemplateManager_Config_VaultGrace(t *testing.T) { + t.Parallel() + assert := assert.New(t) + c := config.DefaultConfig() + c.Node = mock.Node() + c.VaultConfig = &sconfig.VaultConfig{ + Enabled: helper.BoolToPtr(true), + Addr: "https://localhost/", + TLSServerName: "notlocalhost", + } + + alloc := mock.Alloc() + config := &TaskTemplateManagerConfig{ + ClientConfig: c, + VaultToken: "token", + + // Make a template that will render immediately + Templates: []*structs.Template{ + { + EmbeddedTmpl: "bar", + DestPath: "foo", + ChangeMode: structs.TemplateChangeModeNoop, + VaultGrace: 10 * time.Second, + }, + { + EmbeddedTmpl: "baz", + DestPath: "bam", + ChangeMode: structs.TemplateChangeModeNoop, + VaultGrace: 100 * time.Second, + }, + }, + EnvBuilder: env.NewBuilder(c.Node, alloc, alloc.Job.TaskGroups[0].Tasks[0], c.Region), + } + + ctmplMapping, err := parseTemplateConfigs(config) + assert.Nil(err, "Parsing Templates") + + ctconf, err := newRunnerConfig(config, ctmplMapping) + assert.Nil(err, "Building Runner Config") + assert.NotNil(ctconf.Vault.Grace, "Vault Grace Pointer") + assert.Equal(10*time.Second, *ctconf.Vault.Grace, "Vault Grace Value") +} + +func TestTaskTemplateManager_BlockedEvents(t *testing.T) { + t.Parallel() + // Make a template that will render based on a key in Consul + var embedded string + for i := 0; i < 5; i++ { + embedded += fmt.Sprintf(`{{key "%d"}}`, i) + } + + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, true, false) + harness.setEmitRate(100 * time.Millisecond) + harness.start(t) + defer harness.stop() + + // Ensure that we get a blocked event + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-harness.mockHooks.EmitEventCh: + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("timeout") + } + + // Check to see we got a correct message + event := harness.mockHooks.Events[0] + if !strings.Contains(event, "and 2 more") { + t.Fatalf("bad event: %q", event) + } + + // Write 3 keys to Consul + for i := 0; i < 3; i++ { + harness.consul.SetKV(t, fmt.Sprintf("%d", i), []byte{0xa}) + } + + // Ensure that we get a blocked event + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-harness.mockHooks.EmitEventCh: + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("timeout") + } + + // Check to see we got a correct message + event = harness.mockHooks.Events[len(harness.mockHooks.Events)-1] + if !strings.Contains(event, "Missing") || strings.Contains(event, "more") { + t.Fatalf("bad event: %q", event) + } +} diff --git a/client/allocrunnerv2/taskrunner/template_hook.go b/client/allocrunnerv2/taskrunner/template_hook.go new file mode 100644 index 000000000..38f1a434b --- /dev/null +++ b/client/allocrunnerv2/taskrunner/template_hook.go @@ -0,0 +1,157 @@ +package taskrunner + +import ( + "context" + "fmt" + "sync" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" + ti "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/template" + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/nomad/structs" +) + +type templateHookConfig struct { + // logger is used to log + logger log.Logger + + // lifecycle is used to interact with the task's lifecycle + lifecycle ti.TaskLifecycle + + // events is used to emit events + events ti.EventEmitter + + // templates is the set of templates we are managing + templates []*structs.Template + + // clientConfig is the Nomad Client configuration + clientConfig *config.Config + + // envBuilder is the environment variable builder for the task. + envBuilder *env.Builder +} + +type templateHook struct { + config *templateHookConfig + + // logger is used to log + logger log.Logger + + // templateManager is used to manage any consul-templates this task may have + templateManager *template.TaskTemplateManager + managerLock sync.Mutex + + // vaultToken is the current Vault token + vaultToken string + + // taskDir is the task directory + taskDir string +} + +func newTemplateHook(config *templateHookConfig) *templateHook { + h := &templateHook{ + config: config, + } + h.logger = config.logger.Named(h.Name()) + return h +} + +func (*templateHook) Name() string { + return "template" +} + +func (h *templateHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error { + h.managerLock.Lock() + defer h.managerLock.Unlock() + + // If we have already run prerun before exit early. + if h.templateManager != nil { + return nil + } + + // Store the current Vault token and the task directory + h.taskDir = req.TaskDir + h.vaultToken = req.VaultToken + unblockCh, err := h.newManager() + if err != nil { + return err + } + + // Wait for the template to render + select { + case <-ctx.Done(): + case <-unblockCh: + } + + return nil +} + +func (h *templateHook) newManager() (unblock chan struct{}, err error) { + unblock = make(chan struct{}) + m, err := template.NewTaskTemplateManager(&template.TaskTemplateManagerConfig{ + UnblockCh: unblock, + Lifecycle: h.config.lifecycle, + Events: h.config.events, + Templates: h.config.templates, + ClientConfig: h.config.clientConfig, + VaultToken: h.vaultToken, + TaskDir: h.taskDir, + EnvBuilder: h.config.envBuilder, + MaxTemplateEventRate: template.DefaultMaxTemplateEventRate, + }) + if err != nil { + h.logger.Error("failed to create template manager", "error", err) + return nil, err + } + + h.templateManager = m + return unblock, nil +} + +func (h *templateHook) Poststop(ctx context.Context, req *interfaces.TaskPoststopRequest, resp *interfaces.TaskPoststopResponse) error { + h.managerLock.Lock() + defer h.managerLock.Unlock() + + // Shutdown any created template + if h.templateManager != nil { + h.templateManager.Stop() + } + + return nil +} + +// Handle new Vault token +func (h *templateHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, resp *interfaces.TaskUpdateResponse) error { + h.managerLock.Lock() + defer h.managerLock.Unlock() + + // Nothing to do + if h.templateManager == nil { + return nil + } + + // Check if the Vault token has changed + if req.VaultToken == h.vaultToken { + return nil + } else { + h.vaultToken = req.VaultToken + } + + // Shutdown the old template + h.templateManager.Stop() + h.templateManager = nil + + // Create the new template + if _, err := h.newManager(); err != nil { + err := fmt.Errorf("failed to build template manager: %v", err) + h.logger.Error("failed to build template manager", "error", err) + // XXX I think we can skip this + // r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), false) + h.config.lifecycle.Kill(h.Name(), err.Error(), true) + } + + return nil +} diff --git a/client/allocrunnerv2/taskrunner/vault_hook.go b/client/allocrunnerv2/taskrunner/vault_hook.go index 01e896ac6..be0702ab4 100644 --- a/client/allocrunnerv2/taskrunner/vault_hook.go +++ b/client/allocrunnerv2/taskrunner/vault_hook.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" + ti "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" ) @@ -50,8 +51,8 @@ func (tr *TaskRunner) updatedVaultToken(token string) { type vaultHookConfig struct { vaultStanza *structs.Vault client vaultclient.VaultClient - events EventEmitter - lifecycle TaskLifecycle + events ti.EventEmitter + lifecycle ti.TaskLifecycle updater vaultTokenUpdateHandler logger log.Logger alloc *structs.Allocation @@ -63,10 +64,10 @@ type vaultHook struct { vaultStanza *structs.Vault // eventEmitter is used to emit events to the task - eventEmitter EventEmitter + eventEmitter ti.EventEmitter // lifecycle is used to signal, restart and kill a task - lifecycle TaskLifecycle + lifecycle ti.TaskLifecycle // updater is used to update the Vault token updater vaultTokenUpdateHandler