Refactor TaskEnvironment into Builder and TaskEnv

This commit is contained in:
Michael Schurter
2017-05-16 17:33:50 -07:00
parent 7dac668adf
commit ace00980a1
13 changed files with 519 additions and 620 deletions

View File

@@ -76,7 +76,7 @@ type TaskTemplateManager struct {
func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
config *config.Config, vaultToken, taskDir string,
taskEnv *env.TaskEnvironment) (*TaskTemplateManager, error) {
taskEnv *env.TaskEnv) (*TaskTemplateManager, error) {
// Check pre-conditions
if hook == nil {
@@ -317,7 +317,7 @@ func (tm *TaskTemplateManager) allTemplatesNoop() bool {
// lookup by destination to the template. If no templates are given, a nil
// template runner and lookup is returned.
func templateRunner(tmpls []*structs.Template, config *config.Config,
vaultToken, taskDir string, taskEnv *env.TaskEnvironment) (
vaultToken, taskDir string, taskEnv *env.TaskEnv) (
*manager.Runner, map[string][]*structs.Template, error) {
if len(tmpls) == 0 {
@@ -350,7 +350,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config,
}
// Set Nomad's environment variables
runner.Env = taskEnv.Build().EnvMapAll()
runner.Env = taskEnv.All()
// Build the lookup
idMap := runner.TemplateConfigMapping()
@@ -368,9 +368,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config,
// parseTemplateConfigs converts the tasks templates into consul-templates
func parseTemplateConfigs(tmpls []*structs.Template, taskDir string,
taskEnv *env.TaskEnvironment, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) {
// Build the task environment
taskEnv.Build()
taskEnv *env.TaskEnv, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) {
ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(tmpls))
for _, tmpl := range tmpls {

View File

@@ -180,7 +180,7 @@ func (c *DockerDriverConfig) Validate() error {
// NewDockerDriverConfig returns a docker driver config by parsing the HCL
// config
func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*DockerDriverConfig, error) {
func NewDockerDriverConfig(task *structs.Task, envBuilder *env.Builder) (*DockerDriverConfig, error) {
var dconf DockerDriverConfig
if err := mapstructure.WeakDecode(task.Config, &dconf); err != nil {
@@ -188,6 +188,7 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke
}
// Interpolate everything that is a string
env := envBuilder.Build()
dconf.ImageName = env.ReplaceEnv(dconf.ImageName)
dconf.Command = env.ReplaceEnv(dconf.Command)
dconf.IpcMode = env.ReplaceEnv(dconf.IpcMode)
@@ -456,7 +457,7 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoord
}
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
driverConfig, err := NewDockerDriverConfig(task, d.envBuilder)
if err != nil {
return nil, err
}
@@ -495,7 +496,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: d.envBuilder.Build(),
Task: task,
Driver: "docker",
AllocID: d.DriverContext.allocID,
@@ -899,14 +900,14 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
d.logger.Printf("[DEBUG] driver.docker: exposed port %s", containerPort)
}
d.taskEnv.SetPortMap(driverConfig.PortMap)
d.envBuilder.SetPortMap(driverConfig.PortMap)
hostConfig.PortBindings = publishedPorts
config.ExposedPorts = exposedPorts
}
d.taskEnv.Build()
parsedArgs := d.taskEnv.ParseAndReplace(driverConfig.Args)
taskEnv := d.envBuilder.Build()
parsedArgs := taskEnv.ParseAndReplace(driverConfig.Args)
// If the user specified a custom command to run, we'll inject it here.
if driverConfig.Command != "" {
@@ -930,7 +931,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
d.logger.Printf("[DEBUG] driver.docker: applied labels on the container: %+v", config.Labels)
}
config.Env = d.taskEnv.EnvList()
config.Env = taskEnv.List()
containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID)
d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)

View File

@@ -1,8 +1,6 @@
package driver
import (
"bufio"
"bytes"
"context"
"crypto/md5"
"errors"
@@ -10,8 +8,6 @@ import (
"io"
"log"
"os"
"path/filepath"
"strings"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
@@ -50,8 +46,8 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) {
}
// Instantiate the driver
f := factory(ctx)
return f, nil
d := factory(ctx)
return d, nil
}
// Factory is used to instantiate a new Driver
@@ -224,12 +220,12 @@ type LogEventFn func(message string, args ...interface{})
// node attributes into a Driver without having to change the Driver interface
// each time we do it. Used in conjunction with Factory, above.
type DriverContext struct {
taskName string
allocID string
config *config.Config
logger *log.Logger
node *structs.Node
taskEnv *env.TaskEnvironment
taskName string
allocID string
config *config.Config
logger *log.Logger
node *structs.Node
envBuilder *env.Builder
emitEvent LogEventFn
}
@@ -245,15 +241,15 @@ func NewEmptyDriverContext() *DriverContext {
// private to the driver. If we want to change this later we can gorename all of
// the fields in DriverContext.
func NewDriverContext(taskName, allocID string, config *config.Config, node *structs.Node,
logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext {
logger *log.Logger, envBuilder *env.Builder, eventEmitter LogEventFn) *DriverContext {
return &DriverContext{
taskName: taskName,
allocID: allocID,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
emitEvent: eventEmitter,
taskName: taskName,
allocID: allocID,
config: config,
node: node,
logger: logger,
envBuilder: envBuilder,
emitEvent: eventEmitter,
}
}
@@ -303,116 +299,6 @@ func NewExecContext(td *allocdir.TaskDir) *ExecContext {
}
}
// GetTaskEnv converts the alloc dir, the node, task and alloc into a
// TaskEnvironment.
func GetTaskEnv(taskDir *allocdir.TaskDir, node *structs.Node,
task *structs.Task, alloc *structs.Allocation, conf *config.Config,
vaultToken string) (*env.TaskEnvironment, error) {
env := env.NewTaskEnvironment(node).
SetTaskMeta(alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)).
SetJobName(alloc.Job.Name).
SetDatacenterName(node.Datacenter).
SetRegionName(conf.Region).
SetEnvvars(task.Env).
SetTaskName(task.Name)
// Set env vars from env files
for _, tmpl := range task.Templates {
if !tmpl.Envvars {
continue
}
f, err := os.Open(filepath.Join(taskDir.Dir, tmpl.DestPath))
if err != nil {
//FIXME GetTaskEnv may be called before env files are written
log.Printf("[DEBUG] driver: XXX FIXME Templates not rendered yet, skipping")
continue
}
defer f.Close()
vars, err := parseEnvFile(f)
if err != nil {
//TODO soft or hard fail?!
return nil, err
}
env.AppendEnvvars(vars)
}
// Vary paths by filesystem isolation used
drv, err := NewDriver(task.Driver, NewEmptyDriverContext())
if err != nil {
return nil, err
}
switch drv.FSIsolation() {
case cstructs.FSIsolationNone:
// Use host paths
env.SetAllocDir(taskDir.SharedAllocDir)
env.SetTaskLocalDir(taskDir.LocalDir)
env.SetSecretsDir(taskDir.SecretsDir)
default:
// filesystem isolation; use container paths
env.SetAllocDir(allocdir.SharedAllocContainerPath)
env.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
env.SetSecretsDir(allocdir.TaskSecretsContainerPath)
}
if task.Resources != nil {
env.SetMemLimit(task.Resources.MemoryMB).
SetCpuLimit(task.Resources.CPU).
SetNetworks(task.Resources.Networks)
}
if alloc != nil {
env.SetAlloc(alloc)
}
if task.Vault != nil {
env.SetVaultToken(vaultToken, task.Vault.Env)
}
// Set the host environment variables for non-image based drivers
if drv.FSIsolation() != cstructs.FSIsolationImage {
filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
env.AppendHostEnvvars(filter)
}
return env.Build(), nil
}
// parseEnvFile parses the given reader and returns a map of the environment
// variables suitable for TaskEnvironment.AppendEnvvars, or an error.
//
// See nomad/structs#Template.Envvars comment for format.
func parseEnvFile(r io.Reader) (map[string]string, error) {
vars := make(map[string]string, 50)
lines := 0
scanner := bufio.NewScanner(r)
for scanner.Scan() {
lines++
buf := scanner.Bytes()
if len(buf) == 0 {
// Skip empty lines
continue
}
if buf[0] == '#' {
// Skip lines starting with a #
continue
}
n := bytes.IndexByte(buf, '=')
if n == -1 {
return nil, fmt.Errorf("error on line %d: no '=' sign: %q", lines, string(buf))
}
if len(buf) > n {
vars[string(buf[0:n])] = string(buf[n+1 : len(buf)])
} else {
vars[string(buf[0:n])] = ""
}
}
if err := scanner.Err(); err != nil {
return nil, err
}
return vars, nil
}
func mapMergeStrInt(maps ...map[string]int) map[string]int {
out := map[string]int{}
for _, in := range maps {

View File

@@ -6,6 +6,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"github.com/hashicorp/nomad/helper"
hargs "github.com/hashicorp/nomad/helper/args"
@@ -89,464 +90,431 @@ const (
nodeMetaPrefix = "meta."
)
// TaskEnvironment is used to expose information to a task via environment
// variables and provide interpolation of Nomad variables.
type TaskEnvironment struct {
Env map[string]string
TaskMeta map[string]string
AllocDir string
TaskDir string
SecretsDir string
CpuLimit int
MemLimit int
TaskName string
AllocIndex int
Datacenter string
Region string
AllocId string
AllocName string
Node *structs.Node
Networks []*structs.NetworkResource
PortMap map[string]int
VaultToken string
InjectVaultToken bool
JobName string
Alloc *structs.Allocation
// TaskEnv is a task's environment as well as node attribute's for
// interpolation.
type TaskEnv struct {
// NodeAttrs is the map of node attributes for interpolation
NodeAttrs map[string]string
// taskEnv is the variables that will be set in the tasks environment
TaskEnv map[string]string
// EnvMap is the map of environment variables
EnvMap map[string]string
// nodeValues are the values that are allowed for interpolation from the
// node.
NodeValues map[string]string
// envList is a memoized list created by List()
envList []string
}
func NewTaskEnvironment(node *structs.Node) *TaskEnvironment {
return &TaskEnvironment{Node: node, AllocIndex: -1}
// NewTaskEnv creates a new task environment with the given environment and
// node attribute maps.
func NewTaskEnv(env, node map[string]string) *TaskEnv {
return &TaskEnv{
NodeAttrs: node,
EnvMap: env,
}
}
// ParseAndReplace takes the user supplied args and replaces any instance of an
// environment variable or nomad variable in the args with the actual value.
func (t *TaskEnvironment) ParseAndReplace(args []string) []string {
replaced := make([]string, len(args))
for i, arg := range args {
replaced[i] = hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
// List returns the task's environment as a slice of NAME=value pair strings.
func (t *TaskEnv) List() []string {
if t.envList != nil {
return t.envList
}
return replaced
}
// ReplaceEnv takes an arg and replaces all occurrences of environment variables
// and nomad variables. If the variable is found in the passed map it is
// replaced, otherwise the original string is returned.
func (t *TaskEnvironment) ReplaceEnv(arg string) string {
return hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
}
// Build must be called after all the tasks environment values have been set.
func (t *TaskEnvironment) Build() *TaskEnvironment {
t.NodeValues = make(map[string]string)
t.TaskEnv = make(map[string]string)
// Build the meta
for k, v := range t.TaskMeta {
t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
}
// Build the ports
for _, network := range t.Networks {
for label, value := range network.MapLabelToValues(nil) {
t.TaskEnv[fmt.Sprintf("%s%s", IpPrefix, label)] = network.IP
t.TaskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(value)
if forwardedPort, ok := t.PortMap[label]; ok {
value = forwardedPort
}
t.TaskEnv[fmt.Sprintf("%s%s", PortPrefix, label)] = strconv.Itoa(value)
IPPort := net.JoinHostPort(network.IP, strconv.Itoa(value))
t.TaskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort
}
}
// Build the directories
if t.AllocDir != "" {
t.TaskEnv[AllocDir] = t.AllocDir
}
if t.TaskDir != "" {
t.TaskEnv[TaskLocalDir] = t.TaskDir
}
if t.SecretsDir != "" {
t.TaskEnv[SecretsDir] = t.SecretsDir
}
// Build the resource limits
if t.MemLimit != 0 {
t.TaskEnv[MemLimit] = strconv.Itoa(t.MemLimit)
}
if t.CpuLimit != 0 {
t.TaskEnv[CpuLimit] = strconv.Itoa(t.CpuLimit)
}
// Build the tasks ids
if t.AllocId != "" {
t.TaskEnv[AllocID] = t.AllocId
}
if t.AllocName != "" {
t.TaskEnv[AllocName] = t.AllocName
}
if t.AllocIndex != -1 {
t.TaskEnv[AllocIndex] = strconv.Itoa(t.AllocIndex)
}
if t.TaskName != "" {
t.TaskEnv[TaskName] = t.TaskName
}
if t.JobName != "" {
t.TaskEnv[JobName] = t.JobName
}
if t.Datacenter != "" {
t.TaskEnv[Datacenter] = t.Datacenter
}
if t.Region != "" {
t.TaskEnv[Region] = t.Region
}
// Build the addr of the other tasks
if t.Alloc != nil {
for taskName, resources := range t.Alloc.TaskResources {
if taskName == t.TaskName {
continue
}
for _, nw := range resources.Networks {
ports := make([]structs.Port, 0, len(nw.ReservedPorts)+len(nw.DynamicPorts))
for _, port := range nw.ReservedPorts {
ports = append(ports, port)
}
for _, port := range nw.DynamicPorts {
ports = append(ports, port)
}
for _, p := range ports {
key := fmt.Sprintf("%s%s_%s", AddrPrefix, taskName, p.Label)
t.TaskEnv[key] = fmt.Sprintf("%s:%d", nw.IP, p.Value)
key = fmt.Sprintf("%s%s_%s", IpPrefix, taskName, p.Label)
t.TaskEnv[key] = nw.IP
key = fmt.Sprintf("%s%s_%s", PortPrefix, taskName, p.Label)
t.TaskEnv[key] = strconv.Itoa(p.Value)
}
}
}
}
// Build the node
if t.Node != nil {
// Set up the node values.
t.NodeValues[nodeIdKey] = t.Node.ID
t.NodeValues[nodeDcKey] = t.Node.Datacenter
t.NodeValues[nodeRegionKey] = t.Region
t.NodeValues[nodeNameKey] = t.Node.Name
t.NodeValues[nodeClassKey] = t.Node.NodeClass
// Set up the attributes.
for k, v := range t.Node.Attributes {
t.NodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
}
// Set up the meta.
for k, v := range t.Node.Meta {
t.NodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
}
}
// Build the Vault Token
if t.InjectVaultToken && t.VaultToken != "" {
t.TaskEnv[VaultToken] = t.VaultToken
}
// Interpret the environment variables
interpreted := make(map[string]string, len(t.Env))
for k, v := range t.Env {
interpreted[k] = hargs.ReplaceEnv(v, t.NodeValues, t.TaskEnv)
}
for k, v := range interpreted {
t.TaskEnv[k] = v
}
// Clean keys (see #2405)
cleanedEnv := make(map[string]string, len(t.TaskEnv))
for k, v := range t.TaskEnv {
cleanedK := helper.CleanEnvVar(k, '_')
cleanedEnv[cleanedK] = v
}
t.TaskEnv = cleanedEnv
return t
}
// EnvList returns a list of strings with NAME=value pairs.
func (t *TaskEnvironment) EnvList() []string {
env := []string{}
for k, v := range t.TaskEnv {
for k, v := range t.EnvMap {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
return env
}
// EnvMap returns a copy of the tasks environment variables.
func (t *TaskEnvironment) EnvMap() map[string]string {
m := make(map[string]string, len(t.TaskEnv))
for k, v := range t.TaskEnv {
// Map of the task's environment variables.
func (t *TaskEnv) Map() map[string]string {
m := make(map[string]string, len(t.EnvMap))
for k, v := range t.EnvMap {
m[k] = v
}
return m
}
// EnvMapAll returns the environment variables that will be set as well as node
// meta/attrs in the map. This is appropriate for interpolation.
func (t *TaskEnvironment) EnvMapAll() map[string]string {
m := make(map[string]string, len(t.TaskEnv))
for k, v := range t.TaskEnv {
// All of the task's environment variables and the node's attributes in a
// single map.
func (t *TaskEnv) All() map[string]string {
m := make(map[string]string, len(t.EnvMap)+len(t.NodeAttrs))
for k, v := range t.EnvMap {
m[k] = v
}
for k, v := range t.NodeValues {
for k, v := range t.NodeAttrs {
m[k] = v
}
return m
}
// Builder methods to build the TaskEnvironment
func (t *TaskEnvironment) SetAllocDir(dir string) *TaskEnvironment {
t.AllocDir = dir
return t
// ParseAndReplace takes the user supplied args and replaces any instance of an
// environment variable or Nomad variable in the args with the actual value.
func (t *TaskEnv) ParseAndReplace(args []string) []string {
replaced := make([]string, len(args))
for i, arg := range args {
replaced[i] = hargs.ReplaceEnv(arg, t.EnvMap, t.NodeAttrs)
}
return replaced
}
func (t *TaskEnvironment) ClearAllocDir() *TaskEnvironment {
t.AllocDir = ""
return t
// ReplaceEnv takes an arg and replaces all occurrences of environment variables
// and Nomad variables. If the variable is found in the passed map it is
// replaced, otherwise the original string is returned.
func (t *TaskEnv) ReplaceEnv(arg string) string {
return hargs.ReplaceEnv(arg, t.EnvMap, t.NodeAttrs)
}
func (t *TaskEnvironment) SetTaskLocalDir(dir string) *TaskEnvironment {
t.TaskDir = dir
return t
// Builder is used to build task environments and is safe for concurrent use.
type Builder struct {
// envvars are custom set environment variables
envvars map[string]string
// templateEnv are env vars set from templates
templateEnv map[string]string
// hostEnv are environment variables filtered from the host
hostEnv map[string]string
// nodeAttrs are Node attributes and metadata
nodeAttrs map[string]string
// taskMeta are the meta attributes on the task
taskMeta map[string]string
// allocDir from task's perspective; eg /alloc
allocDir string
// localDir from task's perspective; eg /local
localDir string
// secretsDir from task's perspective; eg /secrets
secretsDir string
cpuLimit int
memLimit int
taskName string
allocIndex int
datacenter string
region string
allocId string
allocName string
portMap map[string]string
vaultToken string
injectVaultToken bool
jobName string
// otherPorts for tasks in the same alloc
otherPorts map[string]string
// networks related environment variables
networks map[string]string
mu *sync.RWMutex
}
func (t *TaskEnvironment) ClearTaskLocalDir() *TaskEnvironment {
t.TaskDir = ""
return t
// NewBuilder creates a new task environment builder.
func NewBuilder(node *structs.Node, taskName string) *Builder {
b := &Builder{
taskName: taskName,
envvars: make(map[string]string),
nodeAttrs: make(map[string]string),
mu: &sync.RWMutex{},
}
return b.setNode(node)
}
func (t *TaskEnvironment) SetSecretsDir(dir string) *TaskEnvironment {
t.SecretsDir = dir
return t
// Build must be called after all the tasks environment values have been set.
func (b *Builder) Build() *TaskEnv {
nodeAttrs := make(map[string]string)
envMap := make(map[string]string)
b.mu.RLock()
defer b.mu.RUnlock()
// Add the directories
if b.allocDir != "" {
envMap[AllocDir] = b.allocDir
}
if b.localDir != "" {
envMap[TaskLocalDir] = b.localDir
}
if b.secretsDir != "" {
envMap[SecretsDir] = b.secretsDir
}
// Add the resource limits
if b.memLimit != 0 {
envMap[MemLimit] = strconv.Itoa(b.memLimit)
}
if b.cpuLimit != 0 {
envMap[CpuLimit] = strconv.Itoa(b.cpuLimit)
}
// Add the task metadata
if b.allocId != "" {
envMap[AllocID] = b.allocId
}
if b.allocName != "" {
envMap[AllocName] = b.allocName
}
if b.allocIndex != -1 {
envMap[AllocIndex] = strconv.Itoa(b.allocIndex)
}
if b.taskName != "" {
envMap[TaskName] = b.taskName
}
if b.jobName != "" {
envMap[JobName] = b.jobName
}
if b.datacenter != "" {
envMap[Datacenter] = b.datacenter
}
if b.region != "" {
envMap[Region] = b.region
// Copy region over to node attrs
nodeAttrs[nodeRegionKey] = b.region
}
// Build the addr of the other tasks
for k, v := range b.otherPorts {
envMap[k] = v
}
// Build the Vault Token
if b.injectVaultToken && b.vaultToken != "" {
envMap[VaultToken] = b.vaultToken
}
// Copy node attributes
for k, v := range b.nodeAttrs {
nodeAttrs[k] = v
}
// Interpolate and add environment variables
for k, v := range b.hostEnv {
envMap[k] = hargs.ReplaceEnv(v, nodeAttrs, envMap)
}
// Copy task env vars second as they override host env vars
for k, v := range b.envvars {
envMap[k] = v
}
// Copy template env vars third as they override task env vars
for k, v := range b.templateEnv {
envMap[k] = v
}
// Clean keys (see #2405)
cleanedEnv := make(map[string]string, len(envMap))
for k, v := range envMap {
cleanedK := helper.CleanEnvVar(k, '_')
cleanedEnv[cleanedK] = v
}
return NewTaskEnv(cleanedEnv, nodeAttrs)
}
func (t *TaskEnvironment) ClearSecretsDir() *TaskEnvironment {
t.SecretsDir = ""
return t
// setNode is called from NewBuilder to populate node attributes.
func (b *Builder) setNode(n *structs.Node) *Builder {
b.nodeAttrs[nodeIdKey] = n.ID
b.nodeAttrs[nodeDcKey] = n.Datacenter
b.nodeAttrs[nodeNameKey] = n.Name
b.nodeAttrs[nodeClassKey] = n.NodeClass
// Set up the attributes.
for k, v := range n.Attributes {
b.nodeAttrs[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
}
// Set up the meta.
for k, v := range n.Meta {
b.nodeAttrs[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
}
return b
}
func (t *TaskEnvironment) SetMemLimit(limit int) *TaskEnvironment {
t.MemLimit = limit
return t
func (b *Builder) SetJobName(name string) *Builder {
b.mu.Lock()
b.jobName = name
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) ClearMemLimit() *TaskEnvironment {
t.MemLimit = 0
return t
func (b *Builder) SetRegion(r string) *Builder {
b.mu.Lock()
b.region = r
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) SetCpuLimit(limit int) *TaskEnvironment {
t.CpuLimit = limit
return t
func (b *Builder) SetAllocDir(dir string) *Builder {
b.mu.Lock()
b.allocDir = dir
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) ClearCpuLimit() *TaskEnvironment {
t.CpuLimit = 0
return t
func (b *Builder) SetTaskLocalDir(dir string) *Builder {
b.mu.Lock()
b.localDir = dir
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) SetNetworks(networks []*structs.NetworkResource) *TaskEnvironment {
t.Networks = networks
return t
func (b *Builder) SetSecretsDir(dir string) *Builder {
b.mu.Lock()
b.secretsDir = dir
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) clearNetworks() *TaskEnvironment {
t.Networks = nil
return t
func (b *Builder) SetResources(r *structs.Resources) *Builder {
if r == nil {
return b
}
// Build up env map for network addresses
newNetworks := make(map[string]string, len(r.Networks)*4)
for _, network := range r.Networks {
for label, intVal := range network.MapLabelToValues(nil) {
value := strconv.Itoa(intVal)
newNetworks[fmt.Sprintf("%s%s", IpPrefix, label)] = network.IP
newNetworks[fmt.Sprintf("%s%s", HostPortPrefix, label)] = value
if forwardedPort, ok := b.portMap[label]; ok {
value = forwardedPort
}
newNetworks[fmt.Sprintf("%s%s", PortPrefix, label)] = value
IPPort := net.JoinHostPort(network.IP, value)
newNetworks[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort
}
}
b.mu.Lock()
b.memLimit = r.MemoryMB
b.cpuLimit = r.CPU
b.networks = newNetworks
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) SetPortMap(portMap map[string]int) *TaskEnvironment {
t.PortMap = portMap
return t
}
func (t *TaskEnvironment) clearPortMap() *TaskEnvironment {
t.PortMap = nil
return t
func (b *Builder) SetPortMap(portMap map[string]int) *Builder {
newPortMap := make(map[string]string, len(portMap))
for k, v := range portMap {
newPortMap[k] = strconv.Itoa(v)
}
b.mu.Lock()
b.portMap = newPortMap
b.mu.Unlock()
return b
}
// Takes a map of meta values to be passed to the task. The keys are capitalized
// when the environment variable is set.
func (t *TaskEnvironment) SetTaskMeta(m map[string]string) *TaskEnvironment {
t.TaskMeta = m
return t
}
func (t *TaskEnvironment) ClearTaskMeta() *TaskEnvironment {
t.TaskMeta = nil
return t
}
func (t *TaskEnvironment) SetEnvvars(m map[string]string) *TaskEnvironment {
t.Env = m
return t
}
// Appends the given environment variables.
func (t *TaskEnvironment) AppendEnvvars(m map[string]string) *TaskEnvironment {
if t.Env == nil {
t.Env = make(map[string]string, len(m))
}
func (b *Builder) SetTaskMeta(m map[string]string) *Builder {
newM := make(map[string]string, len(m)*2)
for k, v := range m {
t.Env[k] = v
newM[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
newM[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
}
return t
b.mu.Lock()
b.taskMeta = newM
b.mu.Unlock()
return b
}
// AppendHostEnvvars adds the host environment variables to the tasks. The
// filter parameter can be used to filter host environment from entering the
// tasks.
func (t *TaskEnvironment) AppendHostEnvvars(filter []string) *TaskEnvironment {
hostEnv := os.Environ()
if t.Env == nil {
t.Env = make(map[string]string, len(hostEnv))
// Merge the given environment variables into existing ones.
func (b *Builder) MergeEnvvars(m map[string]string) *Builder {
b.mu.Lock()
for k, v := range m {
b.envvars[k] = v
}
b.mu.Unlock()
return b
}
// Index the filtered environment variables.
index := make(map[string]struct{}, len(filter))
// SetHostEnvvars adds the host environment variables to the tasks. The filter
// parameter can be used to filter host environment from entering the tasks.
func (b *Builder) SetHostEnvvars(filter []string) *Builder {
filterMap := make(map[string]struct{}, len(filter))
for _, f := range filter {
index[f] = struct{}{}
filterMap[f] = struct{}{}
}
for _, e := range hostEnv {
fullHostEnv := os.Environ()
filteredHostEnv := make(map[string]string, len(fullHostEnv))
for _, e := range fullHostEnv {
parts := strings.SplitN(e, "=", 2)
key, value := parts[0], parts[1]
// Skip filtered environment variables
if _, filtered := index[key]; filtered {
if _, filtered := filterMap[key]; filtered {
continue
}
// Don't override the tasks environment variables.
if _, existing := t.Env[key]; !existing {
t.Env[key] = value
}
filteredHostEnv[key] = value
}
return t
b.mu.Lock()
b.hostEnv = filteredHostEnv
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) ClearEnvvars() *TaskEnvironment {
t.Env = nil
return t
// SetAlloc sets alloc-related environment variables.
func (b *Builder) SetAlloc(alloc *structs.Allocation) *Builder {
b.mu.Lock()
defer b.mu.Unlock()
b.allocId = alloc.ID
b.allocName = alloc.Name
b.allocIndex = alloc.Index()
// Add ports from other tasks
for taskName, resources := range alloc.TaskResources {
if taskName == b.taskName {
continue
}
for _, nw := range resources.Networks {
for _, p := range nw.ReservedPorts {
addPort(b.otherPorts, b.taskName, nw.IP, p.Label, p.Value)
}
for _, p := range nw.DynamicPorts {
addPort(b.otherPorts, b.taskName, nw.IP, p.Label, p.Value)
}
}
}
return b
}
// Helper method for setting all fields from an allocation.
func (t *TaskEnvironment) SetAlloc(alloc *structs.Allocation) *TaskEnvironment {
t.AllocId = alloc.ID
t.AllocName = alloc.Name
t.AllocIndex = alloc.Index()
t.Alloc = alloc
return t
func (b *Builder) SetTemplateEnv(m map[string]string) *Builder {
b.mu.Lock()
b.templateEnv = m
b.mu.Unlock()
return b
}
// Helper method for clearing all fields from an allocation.
func (t *TaskEnvironment) ClearAlloc(alloc *structs.Allocation) *TaskEnvironment {
return t.ClearAllocId().ClearAllocName().ClearAllocIndex()
func (b *Builder) SetVaultToken(token string, inject bool) *Builder {
b.mu.Lock()
b.vaultToken = token
b.injectVaultToken = inject
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) SetAllocIndex(index int) *TaskEnvironment {
t.AllocIndex = index
return t
}
func (t *TaskEnvironment) ClearAllocIndex() *TaskEnvironment {
t.AllocIndex = -1
return t
}
func (t *TaskEnvironment) SetAllocId(id string) *TaskEnvironment {
t.AllocId = id
return t
}
func (t *TaskEnvironment) ClearAllocId() *TaskEnvironment {
t.AllocId = ""
return t
}
func (t *TaskEnvironment) SetAllocName(name string) *TaskEnvironment {
t.AllocName = name
return t
}
func (t *TaskEnvironment) ClearAllocName() *TaskEnvironment {
t.AllocName = ""
return t
}
func (t *TaskEnvironment) SetTaskName(name string) *TaskEnvironment {
t.TaskName = name
return t
}
func (t *TaskEnvironment) ClearTaskName() *TaskEnvironment {
t.TaskName = ""
return t
}
func (t *TaskEnvironment) SetJobName(name string) *TaskEnvironment {
t.JobName = name
return t
}
func (t *TaskEnvironment) ClearJobName() *TaskEnvironment {
t.JobName = ""
return t
}
func (t *TaskEnvironment) SetDatacenterName(name string) *TaskEnvironment {
t.Datacenter = name
return t
}
func (t *TaskEnvironment) ClearDatacenterName() *TaskEnvironment {
t.Datacenter = ""
return t
}
func (t *TaskEnvironment) SetRegionName(name string) *TaskEnvironment {
t.Region = name
return t
}
func (t *TaskEnvironment) ClearRegionName() *TaskEnvironment {
t.Region = ""
return t
}
func (t *TaskEnvironment) SetVaultToken(token string, inject bool) *TaskEnvironment {
t.VaultToken = token
t.InjectVaultToken = inject
return t
}
func (t *TaskEnvironment) ClearVaultToken() *TaskEnvironment {
t.VaultToken = ""
t.InjectVaultToken = false
return t
// addPort adds port keys and values for other tasks to an env var map
func addPort(m map[string]string, taskName, ip, portLabel string, port int) {
key := fmt.Sprintf("%s%s_%s", AddrPrefix, taskName, portLabel)
m[key] = fmt.Sprintf("%s:%d", ip, port)
key = fmt.Sprintf("%s%s_%s", IpPrefix, taskName, portLabel)
m[key] = ip
key = fmt.Sprintf("%s%s_%s", PortPrefix, taskName, portLabel)
m[key] = strconv.Itoa(port)
}

View File

@@ -124,7 +124,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: d.envBuilder.Build(),
Driver: "exec",
AllocID: d.DriverContext.allocID,
LogDir: ctx.TaskDir.LogDir,

View File

@@ -66,7 +66,7 @@ type Executor interface {
// wants to run and isolate it
type ExecutorContext struct {
// TaskEnv holds information about the environment of a Task
TaskEnv *env.TaskEnvironment
TaskEnv *env.TaskEnv
// Task is the task whose executor is being launched
Task *structs.Task
@@ -229,7 +229,6 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
// set the task dir as the working directory for the command
e.cmd.Dir = e.ctx.TaskDir
e.ctx.TaskEnv.Build()
// configuring the chroot, resource container, and start the plugin
// process in the chroot.
if err := e.configureIsolation(); err != nil {
@@ -274,7 +273,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
// Set the commands arguments
e.cmd.Path = path
e.cmd.Args = append([]string{e.cmd.Path}, e.ctx.TaskEnv.ParseAndReplace(command.Args)...)
e.cmd.Env = e.ctx.TaskEnv.EnvList()
e.cmd.Env = e.ctx.TaskEnv.List()
// Start the process
if err := e.cmd.Start(); err != nil {
@@ -295,7 +294,7 @@ func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string)
// ExecScript executes cmd with args and returns the output, exit code, and
// error. Output is truncated to client/driver/structs.CheckBufSize
func ExecScript(ctx context.Context, dir string, env *env.TaskEnvironment, attrs *syscall.SysProcAttr,
func ExecScript(ctx context.Context, dir string, env *env.TaskEnv, attrs *syscall.SysProcAttr,
name string, args []string) ([]byte, int, error) {
name = env.ReplaceEnv(name)
cmd := exec.CommandContext(ctx, name, env.ParseAndReplace(args)...)
@@ -303,7 +302,7 @@ func ExecScript(ctx context.Context, dir string, env *env.TaskEnvironment, attrs
// Copy runtime environment from the main command
cmd.SysProcAttr = attrs
cmd.Dir = dir
cmd.Env = env.EnvList()
cmd.Env = env.List()
// Capture output
buf, _ := circbuf.NewBuffer(int64(dstructs.CheckBufSize))

View File

@@ -179,7 +179,7 @@ func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, e
return nil, nil
}
func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDriverConfig, error) {
func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnv) (*JavaDriverConfig, error) {
var driverConfig JavaDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
@@ -203,7 +203,7 @@ func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDri
}
func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
driverConfig, err := NewJavaDriverConfig(task, d.taskEnv)
driverConfig, err := NewJavaDriverConfig(task, d.envBuilder.Build())
if err != nil {
return nil, err
}
@@ -249,7 +249,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Set the context
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: d.envBuilder.Build(),
Driver: "java",
AllocID: d.DriverContext.allocID,
Task: task,

View File

@@ -238,7 +238,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: d.envBuilder.Build(),
Driver: "qemu",
AllocID: d.DriverContext.allocID,
Task: task,

View File

@@ -50,7 +50,7 @@ type rawExecHandle struct {
logger *log.Logger
waitCh chan *dstructs.WaitResult
doneCh chan struct{}
taskEnv *env.TaskEnvironment
taskEnv *env.TaskEnv
taskDir *allocdir.TaskDir
}
@@ -133,7 +133,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: d.envBuilder.Build(),
Driver: "raw_exec",
AllocID: d.DriverContext.allocID,
Task: task,
@@ -169,7 +169,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
taskEnv: d.taskEnv,
taskEnv: d.envBuilder.Build(),
taskDir: ctx.TaskDir,
}
go h.run()
@@ -218,7 +218,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
version: id.Version,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
taskEnv: d.taskEnv,
taskEnv: d.envBuilder.Build(),
taskDir: ctx.TaskDir,
}
go h.run()

View File

@@ -88,7 +88,7 @@ type RktDriverConfig struct {
// rktHandle is returned from Start/Open as a handle to the PID
type rktHandle struct {
uuid string
env *env.TaskEnvironment
env *env.TaskEnv
taskDir *allocdir.TaskDir
pluginClient *plugin.Client
executorPid int
@@ -310,7 +310,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug))
// Inject environment variables
for k, v := range d.taskEnv.EnvMap() {
taskEnv := d.envBuilder.Build()
for k, v := range taskEnv.Map() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%q", k, v))
}
@@ -400,7 +401,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
// Add user passed arguments.
if len(driverConfig.Args) != 0 {
parsed := d.taskEnv.ParseAndReplace(driverConfig.Args)
parsed := taskEnv.ParseAndReplace(driverConfig.Args)
// Need to start arguments with "--"
if len(parsed) > 0 {
@@ -412,9 +413,11 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
}
}
// Set the host environment variables.
// Set the host environment variables as by default they aren't set for
// image based drivers.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
d.envBuilder.SetHostEnvvars(filter)
taskEnv = d.envBuilder.Build()
pluginLogFile := filepath.Join(ctx.TaskDir.Dir, fmt.Sprintf("%s-executor.out", task.Name))
executorConfig := &dstructs.ExecutorConfig{
@@ -427,7 +430,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: taskEnv,
Driver: "rkt",
AllocID: d.DriverContext.allocID,
Task: task,
@@ -477,7 +480,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
maxKill := d.DriverContext.config.MaxKillTimeout
h := &rktHandle{
uuid: uuid,
env: d.taskEnv,
env: taskEnv,
taskDir: ctx.TaskDir,
pluginClient: pluginClient,
executor: execIntf,
@@ -519,7 +522,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
// Return a driver handle
h := &rktHandle{
uuid: id.UUID,
env: d.taskEnv,
env: d.envBuilder.Build(),
taskDir: ctx.TaskDir,
pluginClient: pluginClient,
executorPid: id.ExecutorPid,

View File

@@ -51,8 +51,7 @@ func getClient(src, dst string) *gg.Client {
}
// getGetterUrl returns the go-getter URL to download the artifact.
func getGetterUrl(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact) (string, error) {
taskEnv.Build()
func getGetterUrl(taskEnv *env.TaskEnv, artifact *structs.TaskArtifact) (string, error) {
source := taskEnv.ReplaceEnv(artifact.GetterSource)
// Handle an invalid URL when given a go-getter url such as
@@ -85,7 +84,7 @@ func getGetterUrl(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact)
}
// GetArtifact downloads an artifact into the specified task directory.
func GetArtifact(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact, taskDir string) error {
func GetArtifact(taskEnv *env.TaskEnv, artifact *structs.TaskArtifact, taskDir string) error {
url, err := getGetterUrl(taskEnv, artifact)
if err != nil {
return newGetError(artifact.GetterSource, err, false)

View File

@@ -85,9 +85,8 @@ type TaskRunner struct {
task *structs.Task
taskDir *allocdir.TaskDir
// taskEnv is the environment variables of the task
taskEnv *env.TaskEnvironment
taskEnvLock sync.Mutex
// envBuilder is used to build the task's environment
envBuilder *env.Builder
// updateCh is used to receive updated versions of the allocation
updateCh chan *structs.Allocation
@@ -215,6 +214,15 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
// Initialize the environment builder
envBuilder := env.NewBuilder(config.Node, task.Name).
SetTaskMeta(alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)).
SetJobName(alloc.Job.Name).
SetRegion(config.Region).
MergeEnvvars(task.Env).
SetResources(task.Resources).
SetAlloc(alloc)
tc := &TaskRunner{
config: config,
stateDB: stateDB,
@@ -224,6 +232,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
alloc: alloc,
task: task,
taskDir: taskDir,
envBuilder: envBuilder,
createdResources: driver.NewCreatedResources(),
consul: consulClient,
vaultClient: vaultClient,
@@ -310,11 +319,6 @@ func (r *TaskRunner) RestoreState() (string, error) {
r.payloadRendered = snap.PayloadRendered
r.setCreatedResources(snap.CreatedResources)
if err := r.setTaskEnv(); err != nil {
return "", fmt.Errorf("client: failed to create task environment for task %q in allocation %q: %v",
r.task.Name, r.alloc.ID, err)
}
if r.task.Vault != nil {
// Read the token from the secret directory
tokenPath := filepath.Join(r.taskDir.SecretsDir, vaultTokenFile)
@@ -480,35 +484,8 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
r.updater(r.task.Name, state, event)
}
// setTaskEnv sets the task environment. It returns an error if it could not be
// created.
func (r *TaskRunner) setTaskEnv() error {
r.taskEnvLock.Lock()
defer r.taskEnvLock.Unlock()
taskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node,
r.task.Copy(), r.alloc, r.config, r.vaultFuture.Get())
if err != nil {
return err
}
r.taskEnv = taskEnv
return nil
}
// getTaskEnv returns the task environment
func (r *TaskRunner) getTaskEnv() *env.TaskEnvironment {
r.taskEnvLock.Lock()
defer r.taskEnvLock.Unlock()
return r.taskEnv
}
// createDriver makes a driver for the task
func (r *TaskRunner) createDriver() (driver.Driver, error) {
env := r.getTaskEnv()
if env == nil {
return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID)
}
// Create a task-specific event emitter callback to expose minimal
// state to drivers
eventEmitter := func(m string, args ...interface{}) {
@@ -517,13 +494,34 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
}
driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, env, eventEmitter)
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, r.envBuilder, eventEmitter)
d, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
r.task.Driver, r.alloc.ID, err)
}
return driver, err
// Set driver-specific environment variables
switch d.FSIsolation() {
case cstructs.FSIsolationNone:
// Use host paths
r.envBuilder.SetAllocDir(r.taskDir.SharedAllocDir)
r.envBuilder.SetTaskLocalDir(r.taskDir.LocalDir)
r.envBuilder.SetSecretsDir(r.taskDir.SecretsDir)
default:
// filesystem isolation; use container paths
r.envBuilder.SetAllocDir(allocdir.SharedAllocContainerPath)
r.envBuilder.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
r.envBuilder.SetSecretsDir(allocdir.TaskSecretsContainerPath)
}
// Set the host environment variables for non-image based drivers
if d.FSIsolation() != cstructs.FSIsolationImage {
filter := strings.Split(r.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
r.envBuilder.SetHostEnvvars(filter)
}
return d, err
}
// Run is a long running routine used to manage the task
@@ -532,15 +530,6 @@ func (r *TaskRunner) Run() {
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.alloc.ID)
// Create the initial environment, this will be recreated if a Vault token
// is needed
if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
return
}
if err := r.validateTask(); err != nil {
r.setState(
structs.TaskStateDead,
@@ -613,8 +602,9 @@ func (r *TaskRunner) validateTask() error {
}
// Validate the Service names
taskEnv := r.envBuilder.Build()
for i, service := range r.task.Services {
name := r.taskEnv.ReplaceEnv(service.Name)
name := taskEnv.ReplaceEnv(service.Name)
if err := service.ValidateName(name); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("service (%d) failed validation: %v", i, err))
}
@@ -851,12 +841,7 @@ func (r *TaskRunner) writeToken(token string) error {
func (r *TaskRunner) updatedTokenHandler() {
// Update the tasks environment
if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
return
}
r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env)
if r.templateManager != nil {
r.templateManager.Stop()
@@ -864,7 +849,7 @@ func (r *TaskRunner) updatedTokenHandler() {
// Create a new templateManager
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.getTaskEnv())
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder.Build())
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
@@ -888,14 +873,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
return
}
r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID)
}
if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
resultCh <- false
return
r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env)
}
// If the job is a dispatch job and there is a payload write it to disk
@@ -932,6 +910,8 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
}
for {
taskEnv := r.envBuilder.Build()
r.persistLock.Lock()
downloaded := r.artifactsDownloaded
r.persistLock.Unlock()
@@ -940,7 +920,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
if !downloaded && len(r.task.Artifacts) > 0 {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
for _, artifact := range r.task.Artifacts {
if err := getter.GetArtifact(r.getTaskEnv(), artifact, r.taskDir.Dir); err != nil {
if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
r.logger.Printf("[DEBUG] client: %v", wrapped)
r.setState(structs.TaskStatePending,
@@ -971,7 +951,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
if r.templateManager == nil {
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.getTaskEnv())
r.config, r.vaultFuture.Get(), r.taskDir.Dir, taskEnv)
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
@@ -1339,10 +1319,10 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) {
// startTask creates the driver, task dir, and starts the task.
func (r *TaskRunner) startTask() error {
// Env vars may have been updated prior to task starting, so update the
// env vars before starting the task
if err := r.setTaskEnv(); err != nil {
return fmt.Errorf("failed updating environment before starting task: %v", err)
// Load any env templates into environment
if err := r.loadTemplateEnv(); err != nil {
//FIXME should we soft fail here?
return fmt.Errorf("failed to load env vars from templates: %v", err)
}
// Create a driver
@@ -1398,6 +1378,37 @@ func (r *TaskRunner) startTask() error {
return nil
}
// loadTemplateEnv loads task environment variables from rendered templates
// that have Envvars=true and stores them in the task runner's env builder
// via SetTemplateEnv.
//
// A destination file that does not exist yet is skipped silently (the
// template may simply not have rendered); any other open or parse error is
// collected and returned as a multierror after all templates are processed.
func (r *TaskRunner) loadTemplateEnv() error {
	var merr multierror.Error
	all := make(map[string]string)
	for _, tmpl := range r.task.Templates {
		if !tmpl.Envvars {
			continue
		}
		f, err := os.Open(filepath.Join(r.taskDir.Dir, tmpl.DestPath))
		if err != nil {
			r.logger.Printf("[DEBUG] client: cannot load env vars from %q", tmpl.DestPath)
			// It's not an error for the template to not be rendered yet
			if !os.IsNotExist(err) {
				merr.Errors = append(merr.Errors, err)
			}
			continue
		}

		vars, err := parseEnvFile(f)

		// Close immediately: a defer inside the loop would keep every
		// template file open until the method returns.
		f.Close()

		if err != nil {
			merr.Errors = append(merr.Errors, err)
			continue
		}
		for k, v := range vars {
			all[k] = v
		}
	}
	r.envBuilder.SetTemplateEnv(all)
	return merr.ErrorOrNil()
}
// registerServices and checks with Consul.
func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor) error {
var exec driver.ScriptExecutor
@@ -1405,13 +1416,13 @@ func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor)
// Allow set the script executor if the driver supports it
exec = h
}
interpolateServices(r.getTaskEnv(), r.task)
interpolateServices(r.envBuilder.Build(), r.task)
return r.consul.RegisterTask(r.alloc.ID, r.task, exec)
}
// interpolateServices interpolates tags in a service and checks with values from the
// task's environment.
func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) {
func interpolateServices(taskEnv *env.TaskEnv, task *structs.Task) {
for _, service := range task.Services {
for _, check := range service.Checks {
check.Name = taskEnv.ReplaceEnv(check.Name)
@@ -1580,11 +1591,7 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol
// Allow set the script executor if the driver supports it
exec = h
}
newTaskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node, new, newAlloc, r.config, r.vaultFuture.Get())
if err != nil {
return err
}
interpolateServices(newTaskEnv, new)
interpolateServices(r.envBuilder.Build(), new)
return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
}

View File

@@ -1,8 +1,11 @@
package client
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
@@ -86,3 +89,38 @@ func pre060RestoreState(path string, data interface{}) error {
}
return nil
}
// parseEnvFile and return a map of the environment variables suitable for
// TaskEnvironment.AppendEnvvars or an error.
//
// See nomad/structs#Template.Envvars comment for format.
func parseEnvFile(r io.Reader) (map[string]string, error) {
vars := make(map[string]string, 50)
lines := 0
scanner := bufio.NewScanner(r)
for scanner.Scan() {
lines++
buf := scanner.Bytes()
if len(buf) == 0 {
// Skip empty lines
continue
}
if buf[0] == '#' {
// Skip lines starting with a #
continue
}
n := bytes.IndexByte(buf, '=')
if n == -1 {
return nil, fmt.Errorf("error on line %d: no '=' sign: %q", lines, string(buf))
}
if len(buf) > n {
vars[string(buf[0:n])] = string(buf[n+1 : len(buf)])
} else {
vars[string(buf[0:n])] = ""
}
}
if err := scanner.Err(); err != nil {
return nil, err
}
return vars, nil
}