diff --git a/client/consul_template.go b/client/consul_template.go index e3f534f7e..8dbe58f91 100644 --- a/client/consul_template.go +++ b/client/consul_template.go @@ -76,7 +76,7 @@ type TaskTemplateManager struct { func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template, config *config.Config, vaultToken, taskDir string, - taskEnv *env.TaskEnvironment) (*TaskTemplateManager, error) { + taskEnv *env.TaskEnv) (*TaskTemplateManager, error) { // Check pre-conditions if hook == nil { @@ -317,7 +317,7 @@ func (tm *TaskTemplateManager) allTemplatesNoop() bool { // lookup by destination to the template. If no templates are given, a nil // template runner and lookup is returned. func templateRunner(tmpls []*structs.Template, config *config.Config, - vaultToken, taskDir string, taskEnv *env.TaskEnvironment) ( + vaultToken, taskDir string, taskEnv *env.TaskEnv) ( *manager.Runner, map[string][]*structs.Template, error) { if len(tmpls) == 0 { @@ -350,7 +350,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config, } // Set Nomad's environment variables - runner.Env = taskEnv.Build().EnvMapAll() + runner.Env = taskEnv.All() // Build the lookup idMap := runner.TemplateConfigMapping() @@ -368,9 +368,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config, // parseTemplateConfigs converts the tasks templates into consul-templates func parseTemplateConfigs(tmpls []*structs.Template, taskDir string, - taskEnv *env.TaskEnvironment, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) { - // Build the task environment - taskEnv.Build() + taskEnv *env.TaskEnv, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) { ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(tmpls)) for _, tmpl := range tmpls { diff --git a/client/driver/docker.go b/client/driver/docker.go index e108f68ef..3388394e0 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -180,7 +180,7 @@ func (c *DockerDriverConfig) Validate() error { // NewDockerDriverConfig returns a docker driver config by parsing the HCL // config -func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*DockerDriverConfig, error) { +func NewDockerDriverConfig(task *structs.Task, envBuilder *env.Builder) (*DockerDriverConfig, error) { var dconf DockerDriverConfig if err := mapstructure.WeakDecode(task.Config, &dconf); err != nil { @@ -188,6 +188,7 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke } // Interpolate everthing that is a string + env := envBuilder.Build() dconf.ImageName = env.ReplaceEnv(dconf.ImageName) dconf.Command = env.ReplaceEnv(dconf.Command) dconf.IpcMode = env.ReplaceEnv(dconf.IpcMode) @@ -456,7 +457,7 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoord } func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) { - driverConfig, err := NewDockerDriverConfig(task, d.taskEnv) + driverConfig, err := NewDockerDriverConfig(task, d.envBuilder) if err != nil { return nil, err } @@ -495,7 +496,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, + TaskEnv: d.envBuilder.Build(), Task: task, Driver: "docker", AllocID: d.DriverContext.allocID, @@ -899,14 +900,14 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas d.logger.Printf("[DEBUG] driver.docker: exposed port %s", containerPort) } - d.taskEnv.SetPortMap(driverConfig.PortMap) + d.envBuilder.SetPortMap(driverConfig.PortMap) hostConfig.PortBindings = publishedPorts config.ExposedPorts = exposedPorts } - d.taskEnv.Build() - parsedArgs := d.taskEnv.ParseAndReplace(driverConfig.Args) + taskEnv := d.envBuilder.Build() + parsedArgs := taskEnv.ParseAndReplace(driverConfig.Args) // If the user specified a custom command to run, we'll inject it here. if driverConfig.Command != "" { @@ -930,7 +931,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas d.logger.Printf("[DEBUG] driver.docker: applied labels on the container: %+v", config.Labels) } - config.Env = d.taskEnv.EnvList() + config.Env = taskEnv.List() containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID) d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName) diff --git a/client/driver/driver.go b/client/driver/driver.go index 8a3af3b8f..0f2796ac3 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -1,8 +1,6 @@ package driver import ( - "bufio" - "bytes" "context" "crypto/md5" "errors" @@ -10,8 +8,6 @@ import ( "io" "log" "os" - "path/filepath" - "strings" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -50,8 +46,8 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) { } // Instantiate the driver - f := factory(ctx) - return f, nil + d := factory(ctx) + return d, nil } // Factory is used to instantiate a new Driver @@ -224,12 +220,12 @@ type LogEventFn func(message string, args ...interface{}) // node attributes into a Driver without having to change the Driver interface // each time we do it. Used in conjection with Factory, above. type DriverContext struct { - taskName string - allocID string - config *config.Config - logger *log.Logger - node *structs.Node - taskEnv *env.TaskEnvironment + taskName string + allocID string + config *config.Config + logger *log.Logger + node *structs.Node + envBuilder *env.Builder emitEvent LogEventFn } @@ -245,15 +241,15 @@ func NewEmptyDriverContext() *DriverContext { // private to the driver. If we want to change this later we can gorename all of // the fields in DriverContext. func NewDriverContext(taskName, allocID string, config *config.Config, node *structs.Node, - logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext { + logger *log.Logger, envBuilder *env.Builder, eventEmitter LogEventFn) *DriverContext { return &DriverContext{ - taskName: taskName, - allocID: allocID, - config: config, - node: node, - logger: logger, - taskEnv: taskEnv, - emitEvent: eventEmitter, + taskName: taskName, + allocID: allocID, + config: config, + node: node, + logger: logger, + envBuilder: envBuilder, + emitEvent: eventEmitter, } } @@ -303,116 +299,6 @@ func NewExecContext(td *allocdir.TaskDir) *ExecContext { } } -// GetTaskEnv converts the alloc dir, the node, task and alloc into a -// TaskEnvironment. -func GetTaskEnv(taskDir *allocdir.TaskDir, node *structs.Node, - task *structs.Task, alloc *structs.Allocation, conf *config.Config, - vaultToken string) (*env.TaskEnvironment, error) { - - env := env.NewTaskEnvironment(node). - SetTaskMeta(alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)). - SetJobName(alloc.Job.Name). - SetDatacenterName(node.Datacenter). - SetRegionName(conf.Region). - SetEnvvars(task.Env). - SetTaskName(task.Name) - - // Set env vars from env files - for _, tmpl := range task.Templates { - if !tmpl.Envvars { - continue - } - f, err := os.Open(filepath.Join(taskDir.Dir, tmpl.DestPath)) - if err != nil { - //FIXME GetTaskEnv may be called before env files are written - log.Printf("[DEBUG] driver: XXX FIXME Templates not rendered yet, skipping") - continue - } - defer f.Close() - vars, err := parseEnvFile(f) - if err != nil { - //TODO soft or hard fail?! - return nil, err - } - env.AppendEnvvars(vars) - } - - // Vary paths by filesystem isolation used - drv, err := NewDriver(task.Driver, NewEmptyDriverContext()) - if err != nil { - return nil, err - } - switch drv.FSIsolation() { - case cstructs.FSIsolationNone: - // Use host paths - env.SetAllocDir(taskDir.SharedAllocDir) - env.SetTaskLocalDir(taskDir.LocalDir) - env.SetSecretsDir(taskDir.SecretsDir) - default: - // filesystem isolation; use container paths - env.SetAllocDir(allocdir.SharedAllocContainerPath) - env.SetTaskLocalDir(allocdir.TaskLocalContainerPath) - env.SetSecretsDir(allocdir.TaskSecretsContainerPath) - } - - if task.Resources != nil { - env.SetMemLimit(task.Resources.MemoryMB). - SetCpuLimit(task.Resources.CPU). - SetNetworks(task.Resources.Networks) - } - - if alloc != nil { - env.SetAlloc(alloc) - } - - if task.Vault != nil { - env.SetVaultToken(vaultToken, task.Vault.Env) - } - - // Set the host environment variables for non-image based drivers - if drv.FSIsolation() != cstructs.FSIsolationImage { - filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - env.AppendHostEnvvars(filter) - } - - return env.Build(), nil -} - -// parseEnvFile and return a map of the environment variables suitable for -// TaskEnvironment.AppendEnvvars or an error. -// -// See nomad/structs#Template.Envvars comment for format. -func parseEnvFile(r io.Reader) (map[string]string, error) { - vars := make(map[string]string, 50) - lines := 0 - scanner := bufio.NewScanner(r) - for scanner.Scan() { - lines++ - buf := scanner.Bytes() - if len(buf) == 0 { - // Skip empty lines - continue - } - if buf[0] == '#' { - // Skip lines starting with a # - continue - } - n := bytes.IndexByte(buf, '=') - if n == -1 { - return nil, fmt.Errorf("error on line %d: no '=' sign: %q", lines, string(buf)) - } - if len(buf) > n { - vars[string(buf[0:n])] = string(buf[n+1 : len(buf)]) - } else { - vars[string(buf[0:n])] = "" - } - } - if err := scanner.Err(); err != nil { - return nil, err - } - return vars, nil -} - func mapMergeStrInt(maps ...map[string]int) map[string]int { out := map[string]int{} for _, in := range maps { diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 4059f7472..fbfff166f 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "strings" + "sync" "github.com/hashicorp/nomad/helper" hargs "github.com/hashicorp/nomad/helper/args" @@ -89,464 +90,431 @@ const ( nodeMetaPrefix = "meta." ) -// TaskEnvironment is used to expose information to a task via environment -// variables and provide interpolation of Nomad variables. -type TaskEnvironment struct { - Env map[string]string - TaskMeta map[string]string - AllocDir string - TaskDir string - SecretsDir string - CpuLimit int - MemLimit int - TaskName string - AllocIndex int - Datacenter string - Region string - AllocId string - AllocName string - Node *structs.Node - Networks []*structs.NetworkResource - PortMap map[string]int - VaultToken string - InjectVaultToken bool - JobName string - Alloc *structs.Allocation +// TaskEnv is a task's environment as well as node attribute's for +// interpolation. +type TaskEnv struct { + // NodeAttrs is the map of node attributes for interpolation + NodeAttrs map[string]string - // taskEnv is the variables that will be set in the tasks environment - TaskEnv map[string]string + // EnvMap is the map of environment variables + EnvMap map[string]string - // nodeValues is the values that are allowed for interprolation from the - // node. - NodeValues map[string]string + // envList is a memoized list created by List() + envList []string } -func NewTaskEnvironment(node *structs.Node) *TaskEnvironment { - return &TaskEnvironment{Node: node, AllocIndex: -1} +// NewTaskEnv creates a new task environment with the given environment and +// node attribute maps. +func NewTaskEnv(env, node map[string]string) *TaskEnv { + return &TaskEnv{ + NodeAttrs: node, + EnvMap: env, + } } -// ParseAndReplace takes the user supplied args replaces any instance of an -// environment variable or nomad variable in the args with the actual value. -func (t *TaskEnvironment) ParseAndReplace(args []string) []string { - replaced := make([]string, len(args)) - for i, arg := range args { - replaced[i] = hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues) +// List returns the task's environment as a slice of NAME=value pair strings. +func (t *TaskEnv) List() []string { + if t.envList != nil { + return t.envList } - return replaced -} - -// ReplaceEnv takes an arg and replaces all occurrences of environment variables -// and nomad variables. If the variable is found in the passed map it is -// replaced, otherwise the original string is returned. -func (t *TaskEnvironment) ReplaceEnv(arg string) string { - return hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues) -} - -// Build must be called after all the tasks environment values have been set. -func (t *TaskEnvironment) Build() *TaskEnvironment { - t.NodeValues = make(map[string]string) - t.TaskEnv = make(map[string]string) - - // Build the meta - for k, v := range t.TaskMeta { - t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v - t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, k)] = v - } - - // Build the ports - for _, network := range t.Networks { - for label, value := range network.MapLabelToValues(nil) { - t.TaskEnv[fmt.Sprintf("%s%s", IpPrefix, label)] = network.IP - t.TaskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(value) - if forwardedPort, ok := t.PortMap[label]; ok { - value = forwardedPort - } - t.TaskEnv[fmt.Sprintf("%s%s", PortPrefix, label)] = strconv.Itoa(value) - IPPort := net.JoinHostPort(network.IP, strconv.Itoa(value)) - t.TaskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort - - } - } - - // Build the directories - if t.AllocDir != "" { - t.TaskEnv[AllocDir] = t.AllocDir - } - if t.TaskDir != "" { - t.TaskEnv[TaskLocalDir] = t.TaskDir - } - if t.SecretsDir != "" { - t.TaskEnv[SecretsDir] = t.SecretsDir - } - - // Build the resource limits - if t.MemLimit != 0 { - t.TaskEnv[MemLimit] = strconv.Itoa(t.MemLimit) - } - if t.CpuLimit != 0 { - t.TaskEnv[CpuLimit] = strconv.Itoa(t.CpuLimit) - } - - // Build the tasks ids - if t.AllocId != "" { - t.TaskEnv[AllocID] = t.AllocId - } - if t.AllocName != "" { - t.TaskEnv[AllocName] = t.AllocName - } - if t.AllocIndex != -1 { - t.TaskEnv[AllocIndex] = strconv.Itoa(t.AllocIndex) - } - if t.TaskName != "" { - t.TaskEnv[TaskName] = t.TaskName - } - if t.JobName != "" { - t.TaskEnv[JobName] = t.JobName - } - if t.Datacenter != "" { - t.TaskEnv[Datacenter] = t.Datacenter - } - if t.Region != "" { - t.TaskEnv[Region] = t.Region - } - - // Build the addr of the other tasks - if t.Alloc != nil { - for taskName, resources := range t.Alloc.TaskResources { - if taskName == t.TaskName { - continue - } - for _, nw := range resources.Networks { - ports := make([]structs.Port, 0, len(nw.ReservedPorts)+len(nw.DynamicPorts)) - for _, port := range nw.ReservedPorts { - ports = append(ports, port) - } - for _, port := range nw.DynamicPorts { - ports = append(ports, port) - } - for _, p := range ports { - key := fmt.Sprintf("%s%s_%s", AddrPrefix, taskName, p.Label) - t.TaskEnv[key] = fmt.Sprintf("%s:%d", nw.IP, p.Value) - key = fmt.Sprintf("%s%s_%s", IpPrefix, taskName, p.Label) - t.TaskEnv[key] = nw.IP - key = fmt.Sprintf("%s%s_%s", PortPrefix, taskName, p.Label) - t.TaskEnv[key] = strconv.Itoa(p.Value) - } - } - } - } - - // Build the node - if t.Node != nil { - // Set up the node values. - t.NodeValues[nodeIdKey] = t.Node.ID - t.NodeValues[nodeDcKey] = t.Node.Datacenter - t.NodeValues[nodeRegionKey] = t.Region - t.NodeValues[nodeNameKey] = t.Node.Name - t.NodeValues[nodeClassKey] = t.Node.NodeClass - - // Set up the attributes. - for k, v := range t.Node.Attributes { - t.NodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v - } - - // Set up the meta. - for k, v := range t.Node.Meta { - t.NodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v - } - } - - // Build the Vault Token - if t.InjectVaultToken && t.VaultToken != "" { - t.TaskEnv[VaultToken] = t.VaultToken - } - - // Interpret the environment variables - interpreted := make(map[string]string, len(t.Env)) - for k, v := range t.Env { - interpreted[k] = hargs.ReplaceEnv(v, t.NodeValues, t.TaskEnv) - } - - for k, v := range interpreted { - t.TaskEnv[k] = v - } - - // Clean keys (see #2405) - cleanedEnv := make(map[string]string, len(t.TaskEnv)) - for k, v := range t.TaskEnv { - cleanedK := helper.CleanEnvVar(k, '_') - cleanedEnv[cleanedK] = v - } - t.TaskEnv = cleanedEnv - - return t -} - -// EnvList returns a list of strings with NAME=value pairs. -func (t *TaskEnvironment) EnvList() []string { env := []string{} - for k, v := range t.TaskEnv { + for k, v := range t.EnvMap { env = append(env, fmt.Sprintf("%s=%s", k, v)) } return env } -// EnvMap returns a copy of the tasks environment variables. -func (t *TaskEnvironment) EnvMap() map[string]string { - m := make(map[string]string, len(t.TaskEnv)) - for k, v := range t.TaskEnv { +// Map of the task's environment variables. +func (t *TaskEnv) Map() map[string]string { + m := make(map[string]string, len(t.EnvMap)) + for k, v := range t.EnvMap { m[k] = v } return m } -// EnvMapAll returns the environment variables that will be set as well as node -// meta/attrs in the map. This is appropriate for interpolation. -func (t *TaskEnvironment) EnvMapAll() map[string]string { - m := make(map[string]string, len(t.TaskEnv)) - for k, v := range t.TaskEnv { +// All of the task's environment variables and the node's attributes in a +// single map. +func (t *TaskEnv) All() map[string]string { + m := make(map[string]string, len(t.EnvMap)+len(t.NodeAttrs)) + for k, v := range t.EnvMap { m[k] = v } - for k, v := range t.NodeValues { + for k, v := range t.NodeAttrs { m[k] = v } return m } -// Builder methods to build the TaskEnvironment -func (t *TaskEnvironment) SetAllocDir(dir string) *TaskEnvironment { - t.AllocDir = dir - return t +// ParseAndReplace takes the user supplied args replaces any instance of an +// environment variable or Nomad variable in the args with the actual value. +func (t *TaskEnv) ParseAndReplace(args []string) []string { + replaced := make([]string, len(args)) + for i, arg := range args { + replaced[i] = hargs.ReplaceEnv(arg, t.EnvMap, t.NodeAttrs) + } + + return replaced } -func (t *TaskEnvironment) ClearAllocDir() *TaskEnvironment { - t.AllocDir = "" - return t +// ReplaceEnv takes an arg and replaces all occurrences of environment variables +// and Nomad variables. If the variable is found in the passed map it is +// replaced, otherwise the original string is returned. +func (t *TaskEnv) ReplaceEnv(arg string) string { + return hargs.ReplaceEnv(arg, t.EnvMap, t.NodeAttrs) } -func (t *TaskEnvironment) SetTaskLocalDir(dir string) *TaskEnvironment { - t.TaskDir = dir - return t +// Builder is used to build task environment's and is safe for concurrent use. +type Builder struct { + // envvars are custom set environment variables + envvars map[string]string + + // templateEnv are env vars set from templates + templateEnv map[string]string + + // hostEnv are environment variables filtered from the host + hostEnv map[string]string + + // nodeAttrs are Node attributes and metadata + nodeAttrs map[string]string + + // taskMeta are the meta attributes on the task + taskMeta map[string]string + + // allocDir from task's perspective; eg /alloc + allocDir string + + // localDir from task's perspective; eg /local + localDir string + + // secrestsDir from task's perspective; eg /secrets + secretsDir string + + cpuLimit int + memLimit int + taskName string + allocIndex int + datacenter string + region string + allocId string + allocName string + portMap map[string]string + vaultToken string + injectVaultToken bool + jobName string + + // otherPorts for tasks in the same alloc + otherPorts map[string]string + + // networks related environment variables + networks map[string]string + + mu *sync.RWMutex } -func (t *TaskEnvironment) ClearTaskLocalDir() *TaskEnvironment { - t.TaskDir = "" - return t +// NewBuilder creates a new task environment builder. +func NewBuilder(node *structs.Node, taskName string) *Builder { + b := &Builder{ + taskName: taskName, + envvars: make(map[string]string), + nodeAttrs: make(map[string]string), + mu: &sync.RWMutex{}, + } + return b.setNode(node) } -func (t *TaskEnvironment) SetSecretsDir(dir string) *TaskEnvironment { - t.SecretsDir = dir - return t +// Build must be called after all the tasks environment values have been set. +func (b *Builder) Build() *TaskEnv { + nodeAttrs := make(map[string]string) + envMap := make(map[string]string) + + b.mu.RLock() + defer b.mu.RUnlock() + + // Add the directories + if b.allocDir != "" { + envMap[AllocDir] = b.allocDir + } + if b.localDir != "" { + envMap[TaskLocalDir] = b.localDir + } + if b.secretsDir != "" { + envMap[SecretsDir] = b.secretsDir + } + + // Add the resource limits + if b.memLimit != 0 { + envMap[MemLimit] = strconv.Itoa(b.memLimit) + } + if b.cpuLimit != 0 { + envMap[CpuLimit] = strconv.Itoa(b.cpuLimit) + } + + // Add the task metadata + if b.allocId != "" { + envMap[AllocID] = b.allocId + } + if b.allocName != "" { + envMap[AllocName] = b.allocName + } + if b.allocIndex != -1 { + envMap[AllocIndex] = strconv.Itoa(b.allocIndex) + } + if b.taskName != "" { + envMap[TaskName] = b.taskName + } + if b.jobName != "" { + envMap[JobName] = b.jobName + } + if b.datacenter != "" { + envMap[Datacenter] = b.datacenter + } + if b.region != "" { + envMap[Region] = b.region + + // Copy region over to node attrs + nodeAttrs[nodeRegionKey] = b.region + } + + // Build the addr of the other tasks + for k, v := range b.otherPorts { + envMap[k] = v + } + + // Build the Vault Token + if b.injectVaultToken && b.vaultToken != "" { + envMap[VaultToken] = b.vaultToken + } + + // Copy node attributes + for k, v := range b.nodeAttrs { + nodeAttrs[k] = v + } + + // Interpolate and add environment variables + for k, v := range b.hostEnv { + envMap[k] = hargs.ReplaceEnv(v, nodeAttrs, envMap) + } + + // Copy task env vars second as they override host env vars + for k, v := range b.envvars { + envMap[k] = v + } + + // Copy template env vars third as they override task env vars + for k, v := range b.templateEnv { + envMap[k] = v + } + + // Clean keys (see #2405) + cleanedEnv := make(map[string]string, len(envMap)) + for k, v := range envMap { + cleanedK := helper.CleanEnvVar(k, '_') + cleanedEnv[cleanedK] = v + } + + return NewTaskEnv(cleanedEnv, nodeAttrs) } -func (t *TaskEnvironment) ClearSecretsDir() *TaskEnvironment { - t.SecretsDir = "" - return t +// setNode is called from NewBuilder to populate node attributes. +func (b *Builder) setNode(n *structs.Node) *Builder { + b.nodeAttrs[nodeIdKey] = n.ID + b.nodeAttrs[nodeDcKey] = n.Datacenter + b.nodeAttrs[nodeNameKey] = n.Name + b.nodeAttrs[nodeClassKey] = n.NodeClass + + // Set up the attributes. + for k, v := range n.Attributes { + b.nodeAttrs[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v + } + + // Set up the meta. + for k, v := range n.Meta { + b.nodeAttrs[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v + } + return b } -func (t *TaskEnvironment) SetMemLimit(limit int) *TaskEnvironment { - t.MemLimit = limit - return t +func (b *Builder) SetJobName(name string) *Builder { + b.mu.Lock() + b.jobName = name + b.mu.Unlock() + return b } -func (t *TaskEnvironment) ClearMemLimit() *TaskEnvironment { - t.MemLimit = 0 - return t +func (b *Builder) SetRegion(r string) *Builder { + b.mu.Lock() + b.region = r + b.mu.Unlock() + return b } -func (t *TaskEnvironment) SetCpuLimit(limit int) *TaskEnvironment { - t.CpuLimit = limit - return t +func (b *Builder) SetAllocDir(dir string) *Builder { + b.mu.Lock() + b.allocDir = dir + b.mu.Unlock() + return b } -func (t *TaskEnvironment) ClearCpuLimit() *TaskEnvironment { - t.CpuLimit = 0 - return t +func (b *Builder) SetTaskLocalDir(dir string) *Builder { + b.mu.Lock() + b.localDir = dir + b.mu.Unlock() + return b } -func (t *TaskEnvironment) SetNetworks(networks []*structs.NetworkResource) *TaskEnvironment { - t.Networks = networks - return t +func (b *Builder) SetSecretsDir(dir string) *Builder { + b.mu.Lock() + b.secretsDir = dir + b.mu.Unlock() + return b } -func (t *TaskEnvironment) clearNetworks() *TaskEnvironment { - t.Networks = nil - return t +func (b *Builder) SetResources(r *structs.Resources) *Builder { + if r == nil { + return b + } + + // Build up env map for network addresses + newNetworks := make(map[string]string, len(r.Networks)*4) + for _, network := range r.Networks { + for label, intVal := range network.MapLabelToValues(nil) { + value := strconv.Itoa(intVal) + newNetworks[fmt.Sprintf("%s%s", IpPrefix, label)] = network.IP + newNetworks[fmt.Sprintf("%s%s", HostPortPrefix, label)] = value + if forwardedPort, ok := b.portMap[label]; ok { + value = forwardedPort + } + newNetworks[fmt.Sprintf("%s%s", PortPrefix, label)] = value + IPPort := net.JoinHostPort(network.IP, value) + newNetworks[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort + + } + } + + b.mu.Lock() + b.memLimit = r.MemoryMB + b.cpuLimit = r.CPU + b.networks = newNetworks + b.mu.Unlock() + return b } -func (t *TaskEnvironment) SetPortMap(portMap map[string]int) *TaskEnvironment { - t.PortMap = portMap - return t -} - -func (t *TaskEnvironment) clearPortMap() *TaskEnvironment { - t.PortMap = nil - return t +func (b *Builder) SetPortMap(portMap map[string]int) *Builder { + newPortMap := make(map[string]string, len(portMap)) + for k, v := range portMap { + newPortMap[k] = strconv.Itoa(v) + } + b.mu.Lock() + b.portMap = newPortMap + b.mu.Unlock() + return b } // Takes a map of meta values to be passed to the task. The keys are capatilized // when the environent variable is set. -func (t *TaskEnvironment) SetTaskMeta(m map[string]string) *TaskEnvironment { - t.TaskMeta = m - return t -} - -func (t *TaskEnvironment) ClearTaskMeta() *TaskEnvironment { - t.TaskMeta = nil - return t -} - -func (t *TaskEnvironment) SetEnvvars(m map[string]string) *TaskEnvironment { - t.Env = m - return t -} - -// Appends the given environment variables. -func (t *TaskEnvironment) AppendEnvvars(m map[string]string) *TaskEnvironment { - if t.Env == nil { - t.Env = make(map[string]string, len(m)) - } +func (b *Builder) SetTaskMeta(m map[string]string) *Builder { + newM := make(map[string]string, len(m)*2) for k, v := range m { - t.Env[k] = v + newM[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v + newM[fmt.Sprintf("%s%s", MetaPrefix, k)] = v } - return t + + b.mu.Lock() + b.taskMeta = newM + b.mu.Unlock() + return b } -// AppendHostEnvvars adds the host environment variables to the tasks. The -// filter parameter can be use to filter host environment from entering the -// tasks. -func (t *TaskEnvironment) AppendHostEnvvars(filter []string) *TaskEnvironment { - hostEnv := os.Environ() - if t.Env == nil { - t.Env = make(map[string]string, len(hostEnv)) +// Merge the given environment variables into existing ones. +func (b *Builder) MergeEnvvars(m map[string]string) *Builder { + b.mu.Lock() + for k, v := range m { + b.envvars[k] = v } + b.mu.Unlock() + return b +} - // Index the filtered environment variables. - index := make(map[string]struct{}, len(filter)) +// SetHostEnvvars adds the host environment variables to the tasks. The filter +// parameter can be use to filter host environment from entering the tasks. +func (b *Builder) SetHostEnvvars(filter []string) *Builder { + filterMap := make(map[string]struct{}, len(filter)) for _, f := range filter { - index[f] = struct{}{} + filterMap[f] = struct{}{} } - for _, e := range hostEnv { + fullHostEnv := os.Environ() + filteredHostEnv := make(map[string]string, len(fullHostEnv)) + for _, e := range fullHostEnv { parts := strings.SplitN(e, "=", 2) key, value := parts[0], parts[1] // Skip filtered environment variables - if _, filtered := index[key]; filtered { + if _, filtered := filterMap[key]; filtered { continue } - // Don't override the tasks environment variables. - if _, existing := t.Env[key]; !existing { - t.Env[key] = value - } + filteredHostEnv[key] = value } - return t + b.mu.Lock() + b.hostEnv = filteredHostEnv + b.mu.Unlock() + return b } -func (t *TaskEnvironment) ClearEnvvars() *TaskEnvironment { - t.Env = nil - return t +// SetAlloc related environment variables. +func (b *Builder) SetAlloc(alloc *structs.Allocation) *Builder { + b.mu.Lock() + defer b.mu.Unlock() + b.allocId = alloc.ID + b.allocName = alloc.Name + b.allocIndex = alloc.Index() + + // Add ports from other tasks + for taskName, resources := range alloc.TaskResources { + if taskName == b.taskName { + continue + } + for _, nw := range resources.Networks { + for _, p := range nw.ReservedPorts { + addPort(b.otherPorts, b.taskName, nw.IP, p.Label, p.Value) + } + for _, p := range nw.DynamicPorts { + addPort(b.otherPorts, b.taskName, nw.IP, p.Label, p.Value) + } + } + } + return b } -// Helper method for setting all fields from an allocation. -func (t *TaskEnvironment) SetAlloc(alloc *structs.Allocation) *TaskEnvironment { - t.AllocId = alloc.ID - t.AllocName = alloc.Name - t.AllocIndex = alloc.Index() - t.Alloc = alloc - return t +func (b *Builder) SetTemplateEnv(m map[string]string) *Builder { + b.mu.Lock() + b.templateEnv = m + b.mu.Unlock() + return b } -// Helper method for clearing all fields from an allocation. -func (t *TaskEnvironment) ClearAlloc(alloc *structs.Allocation) *TaskEnvironment { - return t.ClearAllocId().ClearAllocName().ClearAllocIndex() +func (b *Builder) SetVaultToken(token string, inject bool) *Builder { + b.mu.Lock() + b.vaultToken = token + b.injectVaultToken = inject + b.mu.Unlock() + return b } -func (t *TaskEnvironment) SetAllocIndex(index int) *TaskEnvironment { - t.AllocIndex = index - return t -} - -func (t *TaskEnvironment) ClearAllocIndex() *TaskEnvironment { - t.AllocIndex = -1 - return t -} - -func (t *TaskEnvironment) SetAllocId(id string) *TaskEnvironment { - t.AllocId = id - return t -} - -func (t *TaskEnvironment) ClearAllocId() *TaskEnvironment { - t.AllocId = "" - return t -} - -func (t *TaskEnvironment) SetAllocName(name string) *TaskEnvironment { - t.AllocName = name - return t -} - -func (t *TaskEnvironment) ClearAllocName() *TaskEnvironment { - t.AllocName = "" - return t -} - -func (t *TaskEnvironment) SetTaskName(name string) *TaskEnvironment { - t.TaskName = name - return t -} - -func (t *TaskEnvironment) ClearTaskName() *TaskEnvironment { - t.TaskName = "" - return t -} - -func (t *TaskEnvironment) SetJobName(name string) *TaskEnvironment { - t.JobName = name - return t -} - -func (t *TaskEnvironment) ClearJobName() *TaskEnvironment { - t.JobName = "" - return t -} - -func (t *TaskEnvironment) SetDatacenterName(name string) *TaskEnvironment { - t.Datacenter = name - return t -} - -func (t *TaskEnvironment) ClearDatacenterName() *TaskEnvironment { - t.Datacenter = "" - return t -} - -func (t *TaskEnvironment) SetRegionName(name string) *TaskEnvironment { - t.Region = name - return t -} - -func (t *TaskEnvironment) ClearRegionName() *TaskEnvironment { - t.Region = "" - return t -} - -func (t *TaskEnvironment) SetVaultToken(token string, inject bool) *TaskEnvironment { - t.VaultToken = token - t.InjectVaultToken = inject - return t -} - -func (t *TaskEnvironment) ClearVaultToken() *TaskEnvironment { - t.VaultToken = "" - t.InjectVaultToken = false - return t +// addPort keys and values for other tasks to an env var map +func addPort(m map[string]string, taskName, ip, portLabel string, port int) { + key := fmt.Sprintf("%s%s_%s", AddrPrefix, taskName, portLabel) + m[key] = fmt.Sprintf("%s:%d", ip, port) + key = fmt.Sprintf("%s%s_%s", IpPrefix, taskName, portLabel) + m[key] = ip + key = fmt.Sprintf("%s%s_%s", PortPrefix, taskName, portLabel) + m[key] = strconv.Itoa(port) } diff --git a/client/driver/exec.go b/client/driver/exec.go index fdc3bbc01..dc544bb95 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -124,7 +124,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, + TaskEnv: d.envBuilder.Build(), Driver: "exec", AllocID: d.DriverContext.allocID, LogDir: ctx.TaskDir.LogDir, diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 9cf3ad2b5..0d556f5bf 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -66,7 +66,7 @@ type Executor interface { // wants to run and isolate it type ExecutorContext struct { // TaskEnv holds information about the environment of a Task - TaskEnv *env.TaskEnvironment + TaskEnv *env.TaskEnv // Task is the task whose executor is being launched Task *structs.Task @@ -229,7 +229,6 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro // set the task dir as the working directory for the command e.cmd.Dir = e.ctx.TaskDir - e.ctx.TaskEnv.Build() // configuring the chroot, resource container, and start the plugin // process in the chroot. if err := e.configureIsolation(); err != nil { @@ -274,7 +273,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro // Set the commands arguments e.cmd.Path = path e.cmd.Args = append([]string{e.cmd.Path}, e.ctx.TaskEnv.ParseAndReplace(command.Args)...) - e.cmd.Env = e.ctx.TaskEnv.EnvList() + e.cmd.Env = e.ctx.TaskEnv.List() // Start the process if err := e.cmd.Start(); err != nil { @@ -295,7 +294,7 @@ func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string) // ExecScript executes cmd with args and returns the output, exit code, and // error. Output is truncated to client/driver/structs.CheckBufSize -func ExecScript(ctx context.Context, dir string, env *env.TaskEnvironment, attrs *syscall.SysProcAttr, +func ExecScript(ctx context.Context, dir string, env *env.TaskEnv, attrs *syscall.SysProcAttr, name string, args []string) ([]byte, int, error) { name = env.ReplaceEnv(name) cmd := exec.CommandContext(ctx, name, env.ParseAndReplace(args)...) @@ -303,7 +302,7 @@ func ExecScript(ctx context.Context, dir string, env *env.TaskEnvironment, attrs // Copy runtime environment from the main command cmd.SysProcAttr = attrs cmd.Dir = dir - cmd.Env = env.EnvList() + cmd.Env = env.List() // Capture output buf, _ := circbuf.NewBuffer(int64(dstructs.CheckBufSize)) diff --git a/client/driver/java.go b/client/driver/java.go index 5801d2e83..cc21bf4af 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -179,7 +179,7 @@ func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, e return nil, nil } -func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDriverConfig, error) { +func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnv) (*JavaDriverConfig, error) { var driverConfig JavaDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { return nil, err @@ -203,7 +203,7 @@ func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDri } func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { - driverConfig, err := NewJavaDriverConfig(task, d.taskEnv) + driverConfig, err := NewJavaDriverConfig(task, d.envBuilder.Build()) if err != nil { return nil, err } @@ -249,7 +249,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Set the context executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, + TaskEnv: d.envBuilder.Build(), Driver: "java", AllocID: d.DriverContext.allocID, Task: task, diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 856b2c023..ba2139da3 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -238,7 +238,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, + TaskEnv: d.envBuilder.Build(), Driver: "qemu", AllocID: d.DriverContext.allocID, Task: task, diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 558289fdd..b391409e8 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -50,7 +50,7 @@ type rawExecHandle struct { logger *log.Logger waitCh chan *dstructs.WaitResult doneCh chan struct{} - taskEnv *env.TaskEnvironment + taskEnv *env.TaskEnv taskDir *allocdir.TaskDir } @@ -133,7 +133,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, + TaskEnv: d.envBuilder.Build(), Driver: "raw_exec", AllocID: d.DriverContext.allocID, Task: task, @@ -169,7 +169,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl logger: d.logger, doneCh: make(chan struct{}), waitCh: make(chan *dstructs.WaitResult, 1), - taskEnv: d.taskEnv, + taskEnv: d.envBuilder.Build(), taskDir: ctx.TaskDir, } go h.run() @@ -218,7 +218,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e version: id.Version, doneCh: make(chan struct{}), waitCh: make(chan *dstructs.WaitResult, 1), - taskEnv: d.taskEnv, + taskEnv: d.envBuilder.Build(), taskDir: ctx.TaskDir, } go h.run() diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 0c5e350ef..54fb4450f 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -88,7 +88,7 @@ type RktDriverConfig struct { // rktHandle is returned from Start/Open as a handle to the PID type rktHandle struct { uuid string - env *env.TaskEnvironment + env *env.TaskEnv taskDir *allocdir.TaskDir pluginClient *plugin.Client executorPid int @@ -310,7 +310,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug)) // Inject environment variables - for k, v := range d.taskEnv.EnvMap() { + taskEnv := d.envBuilder.Build() + for k, v := range taskEnv.Map() { cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%q", k, v)) } @@ -400,7 +401,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e // Add user passed arguments. if len(driverConfig.Args) != 0 { - parsed := d.taskEnv.ParseAndReplace(driverConfig.Args) + parsed := taskEnv.ParseAndReplace(driverConfig.Args) // Need to start arguments with "--" if len(parsed) > 0 { @@ -412,9 +413,11 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e } } - // Set the host environment variables. + // Set the host environment variables as by default they aren't set for + // image based drivers. filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) + d.envBuilder.SetHostEnvvars(filter) + taskEnv = d.envBuilder.Build() pluginLogFile := filepath.Join(ctx.TaskDir.Dir, fmt.Sprintf("%s-executor.out", task.Name)) executorConfig := &dstructs.ExecutorConfig{ @@ -427,7 +430,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, + TaskEnv: taskEnv, Driver: "rkt", AllocID: d.DriverContext.allocID, Task: task, @@ -477,7 +480,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e maxKill := d.DriverContext.config.MaxKillTimeout h := &rktHandle{ uuid: uuid, - env: d.taskEnv, + env: taskEnv, taskDir: ctx.TaskDir, pluginClient: pluginClient, executor: execIntf, @@ -519,7 +522,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error // Return a driver handle h := &rktHandle{ uuid: id.UUID, - env: d.taskEnv, + env: d.envBuilder.Build(), taskDir: ctx.TaskDir, pluginClient: pluginClient, executorPid: id.ExecutorPid, diff --git a/client/getter/getter.go b/client/getter/getter.go index cff9f965e..6b0394e98 100644 --- a/client/getter/getter.go +++ b/client/getter/getter.go @@ -51,8 +51,7 @@ func getClient(src, dst string) *gg.Client { } // getGetterUrl returns the go-getter URL to download the artifact. -func getGetterUrl(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact) (string, error) { - taskEnv.Build() +func getGetterUrl(taskEnv *env.TaskEnv, artifact *structs.TaskArtifact) (string, error) { source := taskEnv.ReplaceEnv(artifact.GetterSource) // Handle an invalid URL when given a go-getter url such as @@ -85,7 +84,7 @@ func getGetterUrl(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact) } // GetArtifact downloads an artifact into the specified task directory. -func GetArtifact(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact, taskDir string) error { +func GetArtifact(taskEnv *env.TaskEnv, artifact *structs.TaskArtifact, taskDir string) error { url, err := getGetterUrl(taskEnv, artifact) if err != nil { return newGetError(artifact.GetterSource, err, false) diff --git a/client/task_runner.go b/client/task_runner.go index 7afc1958a..9cf18586c 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -85,9 +85,8 @@ type TaskRunner struct { task *structs.Task taskDir *allocdir.TaskDir - // taskEnv is the environment variables of the task - taskEnv *env.TaskEnvironment - taskEnvLock sync.Mutex + // envBuilder is used to build the task's environment + envBuilder *env.Builder // updateCh is used to receive updated versions of the allocation updateCh chan *structs.Allocation @@ -215,6 +214,15 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, } restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) + // Initialize the environment builder + envBuilder := env.NewBuilder(config.Node, task.Name). + SetTaskMeta(alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)). + SetJobName(alloc.Job.Name). + SetRegion(config.Region). + MergeEnvvars(task.Env). + SetResources(task.Resources). + SetAlloc(alloc) + tc := &TaskRunner{ config: config, stateDB: stateDB, @@ -224,6 +232,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, alloc: alloc, task: task, taskDir: taskDir, + envBuilder: envBuilder, createdResources: driver.NewCreatedResources(), consul: consulClient, vaultClient: vaultClient, @@ -310,11 +319,6 @@ func (r *TaskRunner) RestoreState() (string, error) { r.payloadRendered = snap.PayloadRendered r.setCreatedResources(snap.CreatedResources) - if err := r.setTaskEnv(); err != nil { - return "", fmt.Errorf("client: failed to create task environment for task %q in allocation %q: %v", - r.task.Name, r.alloc.ID, err) - } - if r.task.Vault != nil { // Read the token from the secret directory tokenPath := filepath.Join(r.taskDir.SecretsDir, vaultTokenFile) @@ -480,35 +484,8 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { r.updater(r.task.Name, state, event) } -// setTaskEnv sets the task environment. It returns an error if it could not be -// created. -func (r *TaskRunner) setTaskEnv() error { - r.taskEnvLock.Lock() - defer r.taskEnvLock.Unlock() - - taskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node, - r.task.Copy(), r.alloc, r.config, r.vaultFuture.Get()) - if err != nil { - return err - } - r.taskEnv = taskEnv - return nil -} - -// getTaskEnv returns the task environment -func (r *TaskRunner) getTaskEnv() *env.TaskEnvironment { - r.taskEnvLock.Lock() - defer r.taskEnvLock.Unlock() - return r.taskEnv -} - // createDriver makes a driver for the task func (r *TaskRunner) createDriver() (driver.Driver, error) { - env := r.getTaskEnv() - if env == nil { - return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID) - } - // Create a task-specific event emitter callback to expose minimal // state to drivers eventEmitter := func(m string, args ...interface{}) { @@ -517,13 +494,34 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg)) } - driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, env, eventEmitter) - driver, err := driver.NewDriver(r.task.Driver, driverCtx) + driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, r.envBuilder, eventEmitter) + d, err := driver.NewDriver(r.task.Driver, driverCtx) if err != nil { return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v", r.task.Driver, r.alloc.ID, err) } - return driver, err + + // Set driver-specific environment variables + switch d.FSIsolation() { + case cstructs.FSIsolationNone: + // Use host paths + r.envBuilder.SetAllocDir(r.taskDir.SharedAllocDir) + r.envBuilder.SetTaskLocalDir(r.taskDir.LocalDir) + r.envBuilder.SetSecretsDir(r.taskDir.SecretsDir) + default: + // filesystem isolation; use container paths + r.envBuilder.SetAllocDir(allocdir.SharedAllocContainerPath) + r.envBuilder.SetTaskLocalDir(allocdir.TaskLocalContainerPath) + r.envBuilder.SetSecretsDir(allocdir.TaskSecretsContainerPath) + } + + // Set the host environment variables for non-image based drivers + if d.FSIsolation() != cstructs.FSIsolationImage { + filter := strings.Split(r.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") + r.envBuilder.SetHostEnvvars(filter) + } + + return d, err } // Run is a long running routine used to manage the task @@ -532,15 +530,6 @@ func (r *TaskRunner) Run() { r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.alloc.ID) - // Create the initial environment, this will be recreated if a Vault token - // is needed - if err := r.setTaskEnv(); err != nil { - r.setState( - structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err)) - return - } - if err := r.validateTask(); err != nil { r.setState( structs.TaskStateDead, @@ -613,8 +602,9 @@ func (r *TaskRunner) validateTask() error { } // Validate the Service names + taskEnv := r.envBuilder.Build() for i, service := range r.task.Services { - name := r.taskEnv.ReplaceEnv(service.Name) + name := taskEnv.ReplaceEnv(service.Name) if err := service.ValidateName(name); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("service (%d) failed validation: %v", i, err)) } @@ -851,12 +841,7 @@ func (r *TaskRunner) writeToken(token string) error { func (r *TaskRunner) updatedTokenHandler() { // Update the tasks environment - if err := r.setTaskEnv(); err != nil { - r.setState( - structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) - return - } + r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env) if r.templateManager != nil { r.templateManager.Stop() @@ -864,7 +849,7 @@ func (r *TaskRunner) updatedTokenHandler() { // Create a new templateManager var err error r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, - r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.getTaskEnv()) + r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder.Build()) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) @@ -888,14 +873,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) { return } r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID) - } - - if err := r.setTaskEnv(); err != nil { - r.setState( - structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) - resultCh <- false - return + r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env) } // If the job is a dispatch job and there is a payload write it to disk @@ -932,6 +910,8 @@ func (r *TaskRunner) prestart(resultCh chan bool) { } for { + taskEnv := r.envBuilder.Build() + r.persistLock.Lock() downloaded := r.artifactsDownloaded r.persistLock.Unlock() @@ -940,7 +920,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) { if !downloaded && len(r.task.Artifacts) > 0 { r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) for _, artifact := range r.task.Artifacts { - if err := getter.GetArtifact(r.getTaskEnv(), artifact, r.taskDir.Dir); err != nil { + if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil { wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) r.logger.Printf("[DEBUG] client: %v", wrapped) r.setState(structs.TaskStatePending, @@ -971,7 +951,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) { if r.templateManager == nil { var err error r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, - r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.getTaskEnv()) + r.config, r.vaultFuture.Get(), r.taskDir.Dir, taskEnv) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) @@ -1339,10 +1319,10 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { // startTask creates the driver, task dir, and starts the task. func (r *TaskRunner) startTask() error { - // Env vars may have been updated prior to task starting, so update the - // env vars before starting the task - if err := r.setTaskEnv(); err != nil { - return fmt.Errorf("failed updating environment before starting task: %v", err) + // Load any env templates into environment + if err := r.loadTemplateEnv(); err != nil { + //FIXME should we soft fail here? + return fmt.Errorf("failed to load env vars from templates: %v", err) } // Create a driver @@ -1398,6 +1378,37 @@ func (r *TaskRunner) startTask() error { return nil } +// loadTemplateEnv loads task environment variables from templates. +func (r *TaskRunner) loadTemplateEnv() error { + var merr multierror.Error + all := make(map[string]string) + for _, tmpl := range r.task.Templates { + if !tmpl.Envvars { + continue + } + f, err := os.Open(filepath.Join(r.taskDir.Dir, tmpl.DestPath)) + if err != nil { + r.logger.Printf("[DEBUG] client: cannot load env vars from %q", tmpl.DestPath) + // It's not an error for the template to not be rendered yet + if !os.IsNotExist(err) { + merr.Errors = append(merr.Errors, err) + } + continue + } + defer f.Close() + vars, err := parseEnvFile(f) + if err != nil { + merr.Errors = append(merr.Errors, err) + continue + } + for k, v := range vars { + all[k] = v + } + } + r.envBuilder.SetTemplateEnv(all) + return merr.ErrorOrNil() +} + // registerServices and checks with Consul. func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor) error { var exec driver.ScriptExecutor @@ -1405,13 +1416,13 @@ func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor) // Allow set the script executor if the driver supports it exec = h } - interpolateServices(r.getTaskEnv(), r.task) + interpolateServices(r.envBuilder.Build(), r.task) return r.consul.RegisterTask(r.alloc.ID, r.task, exec) } // interpolateServices interpolates tags in a service and checks with values from the // task's environment. -func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) { +func interpolateServices(taskEnv *env.TaskEnv, task *structs.Task) { for _, service := range task.Services { for _, check := range service.Checks { check.Name = taskEnv.ReplaceEnv(check.Name) @@ -1580,11 +1591,7 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol // Allow set the script executor if the driver supports it exec = h } - newTaskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node, new, newAlloc, r.config, r.vaultFuture.Get()) - if err != nil { - return err - } - interpolateServices(newTaskEnv, new) + interpolateServices(r.envBuilder.Build(), new) return r.consul.UpdateTask(r.alloc.ID, old, new, exec) } diff --git a/client/util.go b/client/util.go index 32f765550..8c6c0657e 100644 --- a/client/util.go +++ b/client/util.go @@ -1,8 +1,11 @@ package client import ( + "bufio" + "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "math/rand" @@ -86,3 +89,38 @@ func pre060RestoreState(path string, data interface{}) error { } return nil } + +// parseEnvFile and return a map of the environment variables suitable for +// TaskEnvironment.AppendEnvvars or an error. +// +// See nomad/structs#Template.Envvars comment for format. +func parseEnvFile(r io.Reader) (map[string]string, error) { + vars := make(map[string]string, 50) + lines := 0 + scanner := bufio.NewScanner(r) + for scanner.Scan() { + lines++ + buf := scanner.Bytes() + if len(buf) == 0 { + // Skip empty lines + continue + } + if buf[0] == '#' { + // Skip lines starting with a # + continue + } + n := bytes.IndexByte(buf, '=') + if n == -1 { + return nil, fmt.Errorf("error on line %d: no '=' sign: %q", lines, string(buf)) + } + if len(buf) > n { + vars[string(buf[0:n])] = string(buf[n+1 : len(buf)]) + } else { + vars[string(buf[0:n])] = "" + } + } + if err := scanner.Err(); err != nil { + return nil, err + } + return vars, nil +}