mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
Move env template handling into consul_template.go
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -76,7 +79,7 @@ type TaskTemplateManager struct {
|
||||
|
||||
func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
|
||||
config *config.Config, vaultToken, taskDir string,
|
||||
taskEnv *env.TaskEnv) (*TaskTemplateManager, error) {
|
||||
envBuilder *env.Builder) (*TaskTemplateManager, error) {
|
||||
|
||||
// Check pre-conditions
|
||||
if hook == nil {
|
||||
@@ -85,7 +88,7 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
|
||||
return nil, fmt.Errorf("Invalid config given")
|
||||
} else if taskDir == "" {
|
||||
return nil, fmt.Errorf("Invalid task directory given")
|
||||
} else if taskEnv == nil {
|
||||
} else if envBuilder == nil {
|
||||
return nil, fmt.Errorf("Invalid task environment given")
|
||||
}
|
||||
|
||||
@@ -114,14 +117,14 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
|
||||
}
|
||||
|
||||
// Build the consul-template runner
|
||||
runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, taskEnv)
|
||||
runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, envBuilder.Build())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tm.runner = runner
|
||||
tm.lookup = lookup
|
||||
|
||||
go tm.run()
|
||||
go tm.run(envBuilder, taskDir)
|
||||
return tm, nil
|
||||
}
|
||||
|
||||
@@ -144,7 +147,7 @@ func (tm *TaskTemplateManager) Stop() {
|
||||
}
|
||||
|
||||
// run is the long lived loop that handles errors and templates being rendered
|
||||
func (tm *TaskTemplateManager) run() {
|
||||
func (tm *TaskTemplateManager) run(envBuilder *env.Builder, taskDir string) {
|
||||
// Runner is nil if there is no templates
|
||||
if tm.runner == nil {
|
||||
// Unblock the start if there is nothing to do
|
||||
@@ -192,6 +195,11 @@ WAIT:
|
||||
}
|
||||
}
|
||||
|
||||
for _, t := range tm.templates {
|
||||
if err := loadTemplateEnv(envBuilder, taskDir, t); err != nil {
|
||||
tm.hook.Kill("consul-template", err.Error(), true)
|
||||
}
|
||||
}
|
||||
allRenderedTime = time.Now()
|
||||
tm.hook.UnblockStart("consul-template")
|
||||
|
||||
@@ -243,6 +251,10 @@ WAIT:
|
||||
}
|
||||
|
||||
for _, tmpl := range tmpls {
|
||||
if err := loadTemplateEnv(envBuilder, taskDir, tmpl); err != nil {
|
||||
|
||||
tm.hook.Kill("consul-template", err.Error(), true)
|
||||
}
|
||||
switch tmpl.ChangeMode {
|
||||
case structs.TemplateChangeModeSignal:
|
||||
signals[tmpl.ChangeSignal] = struct{}{}
|
||||
@@ -490,3 +502,60 @@ func runnerConfig(config *config.Config, vaultToken string) (*ctconf.Config, err
|
||||
conf.Finalize()
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
// loadTemplateEnv loads task environment variables from templates.
|
||||
func loadTemplateEnv(builder *env.Builder, taskDir string, t *structs.Template) error {
|
||||
if !t.Envvars {
|
||||
return nil
|
||||
}
|
||||
f, err := os.Open(filepath.Join(taskDir, t.DestPath))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening env template: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Parse environment fil
|
||||
vars, err := parseEnvFile(f)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing env template %q: %v", t.DestPath, err)
|
||||
}
|
||||
|
||||
// Set the environment variables
|
||||
builder.SetTemplateEnv(vars)
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseEnvFile and return a map of the environment variables suitable for
|
||||
// TaskEnvironment.AppendEnvvars or an error.
|
||||
//
|
||||
// See nomad/structs#Template.Envvars comment for format.
|
||||
func parseEnvFile(r io.Reader) (map[string]string, error) {
|
||||
vars := make(map[string]string, 50)
|
||||
lines := 0
|
||||
scanner := bufio.NewScanner(r)
|
||||
for scanner.Scan() {
|
||||
lines++
|
||||
buf := scanner.Bytes()
|
||||
if len(buf) == 0 {
|
||||
// Skip empty lines
|
||||
continue
|
||||
}
|
||||
if buf[0] == '#' {
|
||||
// Skip lines starting with a #
|
||||
continue
|
||||
}
|
||||
n := bytes.IndexByte(buf, '=')
|
||||
if n == -1 {
|
||||
return nil, fmt.Errorf("line %d: no '=' sign: %q", lines, string(buf))
|
||||
}
|
||||
if len(buf) > n {
|
||||
vars[string(buf[0:n])] = string(buf[n+1 : len(buf)])
|
||||
} else {
|
||||
vars[string(buf[0:n])] = ""
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return vars, nil
|
||||
}
|
||||
|
||||
@@ -825,7 +825,7 @@ func (r *TaskRunner) updatedTokenHandler() {
|
||||
// Create a new templateManager
|
||||
var err error
|
||||
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
|
||||
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder.Build())
|
||||
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to build task's template manager: %v", err)
|
||||
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
|
||||
@@ -886,8 +886,6 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
}
|
||||
|
||||
for {
|
||||
taskEnv := r.envBuilder.Build()
|
||||
|
||||
r.persistLock.Lock()
|
||||
downloaded := r.artifactsDownloaded
|
||||
r.persistLock.Unlock()
|
||||
@@ -895,6 +893,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
// Download the task's artifacts
|
||||
if !downloaded && len(r.task.Artifacts) > 0 {
|
||||
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
|
||||
taskEnv := r.envBuilder.Build()
|
||||
for _, artifact := range r.task.Artifacts {
|
||||
if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil {
|
||||
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
|
||||
@@ -927,7 +926,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
if r.templateManager == nil {
|
||||
var err error
|
||||
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
|
||||
r.config, r.vaultFuture.Get(), r.taskDir.Dir, taskEnv)
|
||||
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to build task's template manager: %v", err)
|
||||
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
|
||||
@@ -1295,12 +1294,6 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) {
|
||||
|
||||
// startTask creates the driver, task dir, and starts the task.
|
||||
func (r *TaskRunner) startTask() error {
|
||||
// Load any env templates into environment
|
||||
if err := r.loadTemplateEnv(); err != nil {
|
||||
//FIXME should we soft fail here?
|
||||
return fmt.Errorf("failed to load env vars from templates: %v", err)
|
||||
}
|
||||
|
||||
// Create a driver
|
||||
drv, err := r.createDriver()
|
||||
if err != nil {
|
||||
@@ -1354,37 +1347,6 @@ func (r *TaskRunner) startTask() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadTemplateEnv loads task environment variables from templates.
|
||||
func (r *TaskRunner) loadTemplateEnv() error {
|
||||
var merr multierror.Error
|
||||
all := make(map[string]string)
|
||||
for _, tmpl := range r.task.Templates {
|
||||
if !tmpl.Envvars {
|
||||
continue
|
||||
}
|
||||
f, err := os.Open(filepath.Join(r.taskDir.Dir, tmpl.DestPath))
|
||||
if err != nil {
|
||||
r.logger.Printf("[DEBUG] client: cannot load env vars from %q", tmpl.DestPath)
|
||||
// It's not an error for the template to not be rendered yet
|
||||
if !os.IsNotExist(err) {
|
||||
merr.Errors = append(merr.Errors, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
defer f.Close()
|
||||
vars, err := parseEnvFile(f)
|
||||
if err != nil {
|
||||
merr.Errors = append(merr.Errors, err)
|
||||
continue
|
||||
}
|
||||
for k, v := range vars {
|
||||
all[k] = v
|
||||
}
|
||||
}
|
||||
r.envBuilder.SetTemplateEnv(all)
|
||||
return merr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// registerServices and checks with Consul.
|
||||
func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor) error {
|
||||
var exec driver.ScriptExecutor
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
|
||||
@@ -89,38 +86,3 @@ func pre060RestoreState(path string, data interface{}) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseEnvFile and return a map of the environment variables suitable for
|
||||
// TaskEnvironment.AppendEnvvars or an error.
|
||||
//
|
||||
// See nomad/structs#Template.Envvars comment for format.
|
||||
func parseEnvFile(r io.Reader) (map[string]string, error) {
|
||||
vars := make(map[string]string, 50)
|
||||
lines := 0
|
||||
scanner := bufio.NewScanner(r)
|
||||
for scanner.Scan() {
|
||||
lines++
|
||||
buf := scanner.Bytes()
|
||||
if len(buf) == 0 {
|
||||
// Skip empty lines
|
||||
continue
|
||||
}
|
||||
if buf[0] == '#' {
|
||||
// Skip lines starting with a #
|
||||
continue
|
||||
}
|
||||
n := bytes.IndexByte(buf, '=')
|
||||
if n == -1 {
|
||||
return nil, fmt.Errorf("error on line %d: no '=' sign: %q", lines, string(buf))
|
||||
}
|
||||
if len(buf) > n {
|
||||
vars[string(buf[0:n])] = string(buf[n+1 : len(buf)])
|
||||
} else {
|
||||
vars[string(buf[0:n])] = ""
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return vars, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user