mirror of
https://github.com/kemko/nomad.git
synced 2026-01-09 20:05:42 +03:00
merge
This commit is contained in:
@@ -533,7 +533,7 @@ func (c *Client) setupDrivers() error {
|
||||
|
||||
var avail []string
|
||||
var skipped []string
|
||||
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger)
|
||||
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil)
|
||||
for name := range driver.BuiltinDrivers {
|
||||
// Skip fingerprinting drivers that are not in the whitelist if it is
|
||||
// enabled.
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/helper/args"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
@@ -188,9 +187,12 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
|
||||
}
|
||||
|
||||
// Create environment variables.
|
||||
env := TaskEnvironmentVariables(ctx, task)
|
||||
env.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName))
|
||||
env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))
|
||||
taskEnv, err := GetTaskEnv(ctx.AllocDir, d.node, task)
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
taskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName))
|
||||
taskEnv.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))
|
||||
|
||||
config := &docker.Config{
|
||||
Image: driverConfig.ImageName,
|
||||
@@ -343,20 +345,20 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
|
||||
d.logger.Printf("[DEBUG] driver.docker: exposed port %s", containerPort)
|
||||
}
|
||||
|
||||
// This was set above in a call to TaskEnvironmentVariables but if we
|
||||
// This was set above in a call to GetTaskEnv but if we
|
||||
// have mapped any ports we will need to override them.
|
||||
//
|
||||
// TODO refactor the implementation in TaskEnvironmentVariables to match
|
||||
// TODO refactor the implementation in GetTaskEnv to match
|
||||
// the 0.2 ports world view. Docker seems to be the only place where
|
||||
// this is actually needed, but this is kinda hacky.
|
||||
if len(driverConfig.PortMap) > 0 {
|
||||
env.SetPorts(network.MapLabelToValues(driverConfig.PortMap))
|
||||
taskEnv.SetPorts(network.MapLabelToValues(driverConfig.PortMap))
|
||||
}
|
||||
hostConfig.PortBindings = publishedPorts
|
||||
config.ExposedPorts = exposedPorts
|
||||
}
|
||||
|
||||
parsedArgs := args.ParseAndReplace(driverConfig.Args, env.Map())
|
||||
parsedArgs := taskEnv.ParseAndReplace(driverConfig.Args)
|
||||
|
||||
// If the user specified a custom command to run as their entrypoint, we'll
|
||||
// inject it here.
|
||||
@@ -376,7 +378,7 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
|
||||
d.logger.Printf("[DEBUG] driver.docker: applied labels on the container: %+v", config.Labels)
|
||||
}
|
||||
|
||||
config.Env = env.List()
|
||||
config.Env = taskEnv.EnvList()
|
||||
|
||||
containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID)
|
||||
d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver/environment"
|
||||
"github.com/hashicorp/nomad/client/driver/env"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
@@ -66,18 +66,21 @@ type DriverContext struct {
|
||||
config *config.Config
|
||||
logger *log.Logger
|
||||
node *structs.Node
|
||||
taskEnv *env.TaskEnvironment
|
||||
}
|
||||
|
||||
// NewDriverContext initializes a new DriverContext with the specified fields.
|
||||
// This enables other packages to create DriverContexts but keeps the fields
|
||||
// private to the driver. If we want to change this later we can gorename all of
|
||||
// the fields in DriverContext.
|
||||
func NewDriverContext(taskName string, config *config.Config, node *structs.Node, logger *log.Logger) *DriverContext {
|
||||
func NewDriverContext(taskName string, config *config.Config, node *structs.Node,
|
||||
logger *log.Logger, taskEnv *env.TaskEnvironment) *DriverContext {
|
||||
return &DriverContext{
|
||||
taskName: taskName,
|
||||
config: config,
|
||||
node: node,
|
||||
logger: logger,
|
||||
taskEnv: taskEnv,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,17 +128,18 @@ func NewExecContext(alloc *allocdir.AllocDir, allocID string) *ExecContext {
|
||||
return &ExecContext{AllocDir: alloc, AllocID: allocID}
|
||||
}
|
||||
|
||||
// TaskEnvironmentVariables converts exec context and task configuration into a
|
||||
// GetTaskEnv converts the alloc dir, the node and task configuration into a
|
||||
// TaskEnvironment.
|
||||
func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.TaskEnvironment {
|
||||
env := environment.NewTaskEnivornment()
|
||||
env.SetMeta(task.Meta)
|
||||
func GetTaskEnv(alloc *allocdir.AllocDir, node *structs.Node, task *structs.Task) (*env.TaskEnvironment, error) {
|
||||
env := env.NewTaskEnvironment(node).
|
||||
SetMeta(task.Meta).
|
||||
SetEnvvars(task.Env)
|
||||
|
||||
if ctx.AllocDir != nil {
|
||||
env.SetAllocDir(ctx.AllocDir.SharedDir)
|
||||
taskdir, ok := ctx.AllocDir.TaskDirs[task.Name]
|
||||
if alloc != nil {
|
||||
env.SetAllocDir(alloc.SharedDir)
|
||||
taskdir, ok := alloc.TaskDirs[task.Name]
|
||||
if !ok {
|
||||
// TODO: Update this to return an error
|
||||
return nil, fmt.Errorf("failed to get task directory for task %q", task.Name)
|
||||
}
|
||||
|
||||
env.SetTaskLocalDir(filepath.Join(taskdir, allocdir.TaskLocal))
|
||||
@@ -152,11 +156,7 @@ func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.
|
||||
}
|
||||
}
|
||||
|
||||
if task.Env != nil {
|
||||
env.SetEnvvars(task.Env)
|
||||
}
|
||||
|
||||
return env
|
||||
return env.Build(), nil
|
||||
}
|
||||
|
||||
func mapMergeStrInt(maps ...map[string]int) map[string]int {
|
||||
|
||||
@@ -81,7 +81,6 @@ func TestDriver_KillTimeout(t *testing.T) {
|
||||
|
||||
func TestDriver_TaskEnvironmentVariables(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := &ExecContext{}
|
||||
task := &structs.Task{
|
||||
Env: map[string]string{
|
||||
"HELLO": "world",
|
||||
@@ -104,7 +103,10 @@ func TestDriver_TaskEnvironmentVariables(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
env := TaskEnvironmentVariables(ctx, task)
|
||||
env, err := GetTaskEnv(nil, nil, task)
|
||||
if err != nil {
|
||||
t.Fatalf("GetTaskEnv() failed: %v", err)
|
||||
}
|
||||
exp := map[string]string{
|
||||
"NOMAD_CPU_LIMIT": "1000",
|
||||
"NOMAD_MEMORY_LIMIT": "500",
|
||||
@@ -123,7 +125,7 @@ func TestDriver_TaskEnvironmentVariables(t *testing.T) {
|
||||
|
||||
act := env.Map()
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("TaskEnvironmentVariables(%#v, %#v) returned %#v; want %#v", ctx, task, act, exp)
|
||||
t.Fatalf("GetTaskEnv() returned %#v; want %#v", act, exp)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,169 +0,0 @@
|
||||
package environment
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// A set of environment variables that are exported by each driver.
|
||||
const (
|
||||
// The path to the alloc directory that is shared across tasks within a task
|
||||
// group.
|
||||
AllocDir = "NOMAD_ALLOC_DIR"
|
||||
|
||||
// The path to the tasks local directory where it can store data that is
|
||||
// persisted to the alloc is removed.
|
||||
TaskLocalDir = "NOMAD_TASK_DIR"
|
||||
|
||||
// The tasks memory limit in MBs.
|
||||
MemLimit = "NOMAD_MEMORY_LIMIT"
|
||||
|
||||
// The tasks limit in MHz.
|
||||
CpuLimit = "NOMAD_CPU_LIMIT"
|
||||
|
||||
// The IP address for the task.
|
||||
TaskIP = "NOMAD_IP"
|
||||
|
||||
// Prefix for passing both dynamic and static port allocations to
|
||||
// tasks.
|
||||
// E.g. $NOMAD_PORT_1 or $NOMAD_PORT_http
|
||||
PortPrefix = "NOMAD_PORT_"
|
||||
|
||||
// Prefix for passing task meta data.
|
||||
MetaPrefix = "NOMAD_META_"
|
||||
)
|
||||
|
||||
var (
|
||||
nomadVars = []string{AllocDir, TaskLocalDir, MemLimit, CpuLimit, TaskIP, PortPrefix, MetaPrefix}
|
||||
)
|
||||
|
||||
type TaskEnvironment map[string]string
|
||||
|
||||
func NewTaskEnivornment() TaskEnvironment {
|
||||
return make(map[string]string)
|
||||
}
|
||||
|
||||
// ParseFromList parses a list of strings with NAME=value pairs and returns a
|
||||
// TaskEnvironment.
|
||||
func ParseFromList(envVars []string) (TaskEnvironment, error) {
|
||||
t := NewTaskEnivornment()
|
||||
|
||||
for _, pair := range envVars {
|
||||
// Start the search from the second byte to skip a possible leading
|
||||
// "=". Cmd.exe on Windows creates some special environment variables
|
||||
// that start with an "=" and they can be properly retrieved by OS
|
||||
// functions so we should handle them properly here.
|
||||
idx := strings.Index(pair[1:], "=")
|
||||
if idx == -1 {
|
||||
return nil, fmt.Errorf("Couldn't parse environment variable: %v", pair)
|
||||
}
|
||||
idx++ // adjust for slice offset above
|
||||
t[pair[:idx]] = pair[idx+1:]
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// Returns a list of strings with NAME=value pairs.
|
||||
func (t TaskEnvironment) List() []string {
|
||||
env := []string{}
|
||||
for k, v := range t {
|
||||
env = append(env, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
|
||||
return env
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) Map() map[string]string {
|
||||
return t
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) SetAllocDir(dir string) {
|
||||
t[AllocDir] = dir
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearAllocDir() {
|
||||
delete(t, AllocDir)
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) SetTaskLocalDir(dir string) {
|
||||
t[TaskLocalDir] = dir
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearTaskLocalDir() {
|
||||
delete(t, TaskLocalDir)
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) SetMemLimit(limit int) {
|
||||
t[MemLimit] = strconv.Itoa(limit)
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearMemLimit() {
|
||||
delete(t, MemLimit)
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) SetCpuLimit(limit int) {
|
||||
t[CpuLimit] = strconv.Itoa(limit)
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearCpuLimit() {
|
||||
delete(t, CpuLimit)
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) SetTaskIp(ip string) {
|
||||
t[TaskIP] = ip
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearTaskIp() {
|
||||
delete(t, TaskIP)
|
||||
}
|
||||
|
||||
// Takes a map of port labels to their port value.
|
||||
func (t TaskEnvironment) SetPorts(ports map[string]int) {
|
||||
for label, port := range ports {
|
||||
t[fmt.Sprintf("%s%s", PortPrefix, label)] = strconv.Itoa(port)
|
||||
}
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearPorts() {
|
||||
for k, _ := range t {
|
||||
if strings.HasPrefix(k, PortPrefix) {
|
||||
delete(t, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Takes a map of meta values to be passed to the task. The keys are capatilized
|
||||
// when the environent variable is set.
|
||||
func (t TaskEnvironment) SetMeta(m map[string]string) {
|
||||
for k, v := range m {
|
||||
t[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearMeta() {
|
||||
for k, _ := range t {
|
||||
if strings.HasPrefix(k, MetaPrefix) {
|
||||
delete(t, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) SetEnvvars(m map[string]string) {
|
||||
for k, v := range m {
|
||||
t[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (t TaskEnvironment) ClearEnvvars() {
|
||||
OUTER:
|
||||
for k, _ := range t {
|
||||
for _, nomadPrefix := range nomadVars {
|
||||
if strings.HasPrefix(k, nomadPrefix) {
|
||||
continue OUTER
|
||||
}
|
||||
}
|
||||
delete(t, k)
|
||||
}
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
package environment
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestEnvironment_AsList(t *testing.T) {
|
||||
env := NewTaskEnivornment()
|
||||
env.SetTaskIp("127.0.0.1")
|
||||
env.SetPorts(map[string]int{"http": 80})
|
||||
env.SetMeta(map[string]string{"foo": "baz"})
|
||||
|
||||
act := env.List()
|
||||
exp := []string{"NOMAD_IP=127.0.0.1", "NOMAD_PORT_http=80", "NOMAD_META_FOO=baz"}
|
||||
sort.Strings(act)
|
||||
sort.Strings(exp)
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("env.List() returned %v; want %v", act, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnvironment_ParseFromList(t *testing.T) {
|
||||
input := []string{
|
||||
"foo=bar",
|
||||
"BAZ=baM",
|
||||
"bar=emb=edded", // This can be done in multiple OSes.
|
||||
"=ExitCode=00000000", // A Windows cmd.exe annoyance
|
||||
}
|
||||
env, err := ParseFromList(input)
|
||||
if err != nil {
|
||||
t.Fatalf("ParseFromList(%#v) failed: %v", input, err)
|
||||
}
|
||||
|
||||
exp := map[string]string{
|
||||
"foo": "bar",
|
||||
"BAZ": "baM",
|
||||
"bar": "emb=edded",
|
||||
"=ExitCode": "00000000",
|
||||
}
|
||||
|
||||
if len(env) != len(exp) {
|
||||
t.Errorf("ParseFromList(%#v) has length %v; want %v", input, len(env), len(exp))
|
||||
}
|
||||
|
||||
for k, v := range exp {
|
||||
if actV, ok := env[k]; !ok {
|
||||
t.Errorf("ParseFromList(%#v) doesn't contain expected %v", input, k)
|
||||
} else if actV != v {
|
||||
t.Errorf("ParseFromList(%#v) has incorrect value for %v; got %v; want %v", input, k, actV, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnvironment_ClearEnvvars(t *testing.T) {
|
||||
env := NewTaskEnivornment()
|
||||
env.SetTaskIp("127.0.0.1")
|
||||
env.SetEnvvars(map[string]string{"foo": "baz", "bar": "bang"})
|
||||
|
||||
act := env.List()
|
||||
exp := []string{"NOMAD_IP=127.0.0.1", "bar=bang", "foo=baz"}
|
||||
sort.Strings(act)
|
||||
sort.Strings(exp)
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("env.List() returned %v; want %v", act, exp)
|
||||
}
|
||||
|
||||
// Clear the environent variables.
|
||||
env.ClearEnvvars()
|
||||
|
||||
act = env.List()
|
||||
exp = []string{"NOMAD_IP=127.0.0.1"}
|
||||
sort.Strings(act)
|
||||
sort.Strings(exp)
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("env.List() returned %v; want %v", act, exp)
|
||||
}
|
||||
}
|
||||
@@ -92,17 +92,15 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
}
|
||||
}
|
||||
|
||||
// Get the environment variables.
|
||||
envVars := TaskEnvironmentVariables(ctx, task)
|
||||
|
||||
// Setup the command
|
||||
cmd := executor.Command(command, driverConfig.Args...)
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd := executor.Command(execCtx, command, driverConfig.Args...)
|
||||
if err := cmd.Limit(task.Resources); err != nil {
|
||||
return nil, fmt.Errorf("failed to constrain resources: %s", err)
|
||||
}
|
||||
|
||||
// Populate environment variables
|
||||
cmd.Command().Env = envVars.List()
|
||||
cmd.Command().Env = d.taskEnv.EnvList()
|
||||
|
||||
if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil {
|
||||
return nil, fmt.Errorf("failed to configure task directory: %v", err)
|
||||
@@ -136,7 +134,8 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
||||
}
|
||||
|
||||
// Find the process
|
||||
cmd, err := executor.OpenId(id.ExecutorId)
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd, err := executor.OpenId(execCtx, id.ExecutorId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
"github.com/hashicorp/nomad/client/driver/env"
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
)
|
||||
|
||||
@@ -75,9 +76,23 @@ type Executor interface {
|
||||
Command() *exec.Cmd
|
||||
}
|
||||
|
||||
// Command is a mirror of exec.Command that returns a platform-specific Executor
|
||||
func Command(name string, args ...string) Executor {
|
||||
executor := NewExecutor()
|
||||
// ExecutorContext is a means to inject dependencies such as loggers, configs, and
|
||||
// node attributes into a Driver without having to change the Driver interface
|
||||
// each time we do it. Used in conjection with Factory, above.
|
||||
type ExecutorContext struct {
|
||||
taskEnv *env.TaskEnvironment
|
||||
}
|
||||
|
||||
// NewExecutorContext initializes a new DriverContext with the specified fields.
|
||||
func NewExecutorContext(taskEnv *env.TaskEnvironment) *ExecutorContext {
|
||||
return &ExecutorContext{
|
||||
taskEnv: taskEnv,
|
||||
}
|
||||
}
|
||||
|
||||
// Command returns a platform-specific Executor
|
||||
func Command(ctx *ExecutorContext, name string, args ...string) Executor {
|
||||
executor := NewExecutor(ctx)
|
||||
SetCommand(executor, name, args)
|
||||
return executor
|
||||
}
|
||||
@@ -98,8 +113,8 @@ func SetCommand(e Executor, name string, args []string) {
|
||||
|
||||
// OpenId is similar to executor.Command but will attempt to reopen with the
|
||||
// passed ID.
|
||||
func OpenId(id string) (Executor, error) {
|
||||
executor := NewExecutor()
|
||||
func OpenId(ctx *ExecutorContext, id string) (Executor, error) {
|
||||
executor := NewExecutor(ctx)
|
||||
err := executor.Open(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -11,9 +11,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/driver/environment"
|
||||
"github.com/hashicorp/nomad/client/driver/spawn"
|
||||
"github.com/hashicorp/nomad/helper/args"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
@@ -22,6 +20,7 @@ import (
|
||||
// BasicExecutor should work everywhere, and as a result does not include
|
||||
// any resource restrictions or runas capabilities.
|
||||
type BasicExecutor struct {
|
||||
*ExecutorContext
|
||||
cmd exec.Cmd
|
||||
spawn *spawn.Spawner
|
||||
taskName string
|
||||
@@ -29,8 +28,8 @@ type BasicExecutor struct {
|
||||
allocDir string
|
||||
}
|
||||
|
||||
func NewBasicExecutor() Executor {
|
||||
return &BasicExecutor{}
|
||||
func NewBasicExecutor(ctx *ExecutorContext) Executor {
|
||||
return &BasicExecutor{ExecutorContext: ctx}
|
||||
}
|
||||
|
||||
func (e *BasicExecutor) Limit(resources *structs.Resources) error {
|
||||
@@ -56,13 +55,8 @@ func (e *BasicExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocD
|
||||
func (e *BasicExecutor) Start() error {
|
||||
// Parse the commands arguments and replace instances of Nomad environment
|
||||
// variables.
|
||||
envVars, err := environment.ParseFromList(e.cmd.Env)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.cmd.Path = args.ReplaceEnv(e.cmd.Path, envVars.Map())
|
||||
e.cmd.Args = args.ParseAndReplace(e.cmd.Args, envVars.Map())
|
||||
e.cmd.Path = e.taskEnv.ReplaceEnv(e.cmd.Path)
|
||||
e.cmd.Args = e.taskEnv.ParseAndReplace(e.cmd.Args)
|
||||
|
||||
spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
|
||||
e.spawn = spawn.NewSpawner(spawnState)
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"github.com/hashicorp/nomad/client/driver/environment"
|
||||
"github.com/hashicorp/nomad/client/driver/spawn"
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/helper/args"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
@@ -42,16 +41,17 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func NewExecutor() Executor {
|
||||
return NewLinuxExecutor()
|
||||
func NewExecutor(ctx *ExecutorContext) Executor {
|
||||
return NewLinuxExecutor(ctx)
|
||||
}
|
||||
|
||||
func NewLinuxExecutor() Executor {
|
||||
return &LinuxExecutor{}
|
||||
func NewLinuxExecutor(ctx *ExecutorContext) Executor {
|
||||
return &LinuxExecutor{ExecutorContext: ctx}
|
||||
}
|
||||
|
||||
// Linux executor is designed to run on linux kernel 2.8+.
|
||||
type LinuxExecutor struct {
|
||||
*ExecutorContext
|
||||
cmd exec.Cmd
|
||||
user *user.User
|
||||
|
||||
@@ -161,13 +161,8 @@ func (e *LinuxExecutor) Start() error {
|
||||
|
||||
// Parse the commands arguments and replace instances of Nomad environment
|
||||
// variables.
|
||||
envVars, err := environment.ParseFromList(e.cmd.Env)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.cmd.Path = args.ReplaceEnv(e.cmd.Path, envVars.Map())
|
||||
e.cmd.Args = args.ParseAndReplace(e.cmd.Args, envVars.Map())
|
||||
e.cmd.Path = e.taskEnv.ReplaceEnv(e.cmd.Path, envVars.Map())
|
||||
e.cmd.Args = e.taskEnv.ParseAndReplace(e.cmd.Args, envVars.Map())
|
||||
|
||||
spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
|
||||
e.spawn = spawn.NewSpawner(spawnState)
|
||||
|
||||
@@ -2,11 +2,13 @@
|
||||
|
||||
package executor
|
||||
|
||||
func NewExecutor() Executor {
|
||||
return &UniversalExecutor{BasicExecutor{}}
|
||||
func NewExecutor(ctx *ExecutorContext) Executor {
|
||||
return &UniversalExecutor{
|
||||
BasicExecutor: NewBasicExecutor(ctx).(*BasicExecutor),
|
||||
}
|
||||
}
|
||||
|
||||
// UniversalExecutor wraps the BasicExecutor
|
||||
type UniversalExecutor struct {
|
||||
BasicExecutor
|
||||
*BasicExecutor
|
||||
}
|
||||
|
||||
@@ -125,9 +125,6 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
|
||||
jarName := filepath.Base(path)
|
||||
|
||||
// Get the environment variables.
|
||||
envVars := TaskEnvironmentVariables(ctx, task)
|
||||
|
||||
args := []string{}
|
||||
// Look for jvm options
|
||||
if len(driverConfig.JvmOpts) != 0 {
|
||||
@@ -143,10 +140,11 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
|
||||
// Setup the command
|
||||
// Assumes Java is in the $PATH, but could probably be detected
|
||||
cmd := executor.Command("java", args...)
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd := executor.Command(execCtx, "java", args...)
|
||||
|
||||
// Populate environment variables
|
||||
cmd.Command().Env = envVars.List()
|
||||
cmd.Command().Env = d.taskEnv.EnvList()
|
||||
|
||||
if err := cmd.Limit(task.Resources); err != nil {
|
||||
return nil, fmt.Errorf("failed to constrain resources: %s", err)
|
||||
@@ -185,7 +183,8 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
||||
}
|
||||
|
||||
// Find the process
|
||||
cmd, err := executor.OpenId(id.ExecutorId)
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd, err := executor.OpenId(execCtx, id.ExecutorId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
|
||||
}
|
||||
|
||||
@@ -184,8 +184,8 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
}
|
||||
|
||||
// Setup the command
|
||||
cmd := executor.NewBasicExecutor()
|
||||
executor.SetCommand(cmd, args[0], args[1:])
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd := executor.Command(execCtx, args[0], args[1:]...)
|
||||
if err := cmd.Limit(task.Resources); err != nil {
|
||||
return nil, fmt.Errorf("failed to constrain resources: %s", err)
|
||||
}
|
||||
@@ -225,8 +225,9 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
||||
}
|
||||
|
||||
// Find the process
|
||||
cmd := executor.NewBasicExecutor()
|
||||
if err := cmd.Open(id.ExecutorId); err != nil {
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd, err := executor.OpenId(execCtx, id.ExecutorId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -90,18 +90,16 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
|
||||
}
|
||||
}
|
||||
|
||||
// Get the environment variables.
|
||||
envVars := TaskEnvironmentVariables(ctx, task)
|
||||
|
||||
// Setup the command
|
||||
cmd := executor.NewBasicExecutor()
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd := executor.NewBasicExecutor(execCtx)
|
||||
executor.SetCommand(cmd, command, driverConfig.Args)
|
||||
if err := cmd.Limit(task.Resources); err != nil {
|
||||
return nil, fmt.Errorf("failed to constrain resources: %s", err)
|
||||
}
|
||||
|
||||
// Populate environment variables
|
||||
cmd.Command().Env = envVars.List()
|
||||
cmd.Command().Env = d.taskEnv.EnvList()
|
||||
|
||||
if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil {
|
||||
return nil, fmt.Errorf("failed to configure task directory: %v", err)
|
||||
@@ -135,7 +133,8 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
|
||||
}
|
||||
|
||||
// Find the process
|
||||
cmd := executor.NewBasicExecutor()
|
||||
execCtx := executor.NewExecutorContext(d.taskEnv)
|
||||
cmd := executor.NewBasicExecutor(execCtx)
|
||||
if err := cmd.Open(id.ExecutorId); err != nil {
|
||||
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
|
||||
}
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/helper/args"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
@@ -148,13 +147,10 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
||||
cmdArgs = append(cmdArgs, "--insecure-options=all")
|
||||
}
|
||||
|
||||
// Inject the environment variables.
|
||||
envVars := TaskEnvironmentVariables(ctx, task)
|
||||
d.taskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)).
|
||||
SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).Build()
|
||||
|
||||
envVars.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName))
|
||||
envVars.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))
|
||||
|
||||
for k, v := range envVars.Map() {
|
||||
for k, v := range d.taskEnv.EnvMap() {
|
||||
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v))
|
||||
}
|
||||
|
||||
@@ -188,7 +184,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
||||
|
||||
// Add user passed arguments.
|
||||
if len(driverConfig.Args) != 0 {
|
||||
parsed := args.ParseAndReplace(driverConfig.Args, envVars.Map())
|
||||
parsed := d.taskEnv.ParseAndReplace(driverConfig.Args)
|
||||
|
||||
// Need to start arguments with "--"
|
||||
if len(parsed) > 0 {
|
||||
|
||||
@@ -172,12 +172,22 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
|
||||
|
||||
// createDriver makes a driver for the task
|
||||
func (r *TaskRunner) createDriver() (driver.Driver, error) {
|
||||
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger)
|
||||
taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
|
||||
r.task.Driver, r.alloc.ID, err)
|
||||
r.logger.Printf("[ERR] client: %s", err)
|
||||
return nil, err
|
||||
|
||||
}
|
||||
|
||||
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, taskEnv)
|
||||
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
|
||||
r.task.Driver, r.alloc.ID, err)
|
||||
r.logger.Printf("[ERR] client: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
return driver, err
|
||||
}
|
||||
|
||||
@@ -3,33 +3,23 @@ package args
|
||||
import "regexp"
|
||||
|
||||
var (
|
||||
envRe = regexp.MustCompile(`\$({[a-zA-Z0-9_]+}|[a-zA-Z0-9_]+)`)
|
||||
envRe = regexp.MustCompile(`\$({[a-zA-Z0-9_\.]+}|[a-zA-Z0-9_\.]+)`)
|
||||
)
|
||||
|
||||
// ParseAndReplace takes the user supplied args and a map of environment
|
||||
// variables. It replaces any instance of an environment variable in the args
|
||||
// with the actual value.
|
||||
func ParseAndReplace(args []string, env map[string]string) []string {
|
||||
replaced := make([]string, len(args))
|
||||
for i, arg := range args {
|
||||
replaced[i] = ReplaceEnv(arg, env)
|
||||
}
|
||||
|
||||
return replaced
|
||||
}
|
||||
|
||||
// ReplaceEnv takes an arg and replaces all occurences of environment variables.
|
||||
// If the variable is found in the passed map it is replaced, otherwise the
|
||||
// original string is returned.
|
||||
func ReplaceEnv(arg string, env map[string]string) string {
|
||||
func ReplaceEnv(arg string, environents ...map[string]string) string {
|
||||
return envRe.ReplaceAllStringFunc(arg, func(arg string) string {
|
||||
stripped := arg[1:]
|
||||
if stripped[0] == '{' {
|
||||
stripped = stripped[1 : len(stripped)-1]
|
||||
}
|
||||
|
||||
if value, ok := env[stripped]; ok {
|
||||
return value
|
||||
for _, env := range environents {
|
||||
if value, ok := env[stripped]; ok {
|
||||
return value
|
||||
}
|
||||
}
|
||||
|
||||
return arg
|
||||
|
||||
@@ -7,45 +7,58 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
ipKey = "NOMAD_IP"
|
||||
ipVal = "127.0.0.1"
|
||||
portKey = "NOMAD_PORT_WEB"
|
||||
portVal = ":80"
|
||||
ipKey = "NOMAD_IP"
|
||||
ipVal = "127.0.0.1"
|
||||
portKey = "NOMAD_PORT_WEB"
|
||||
portVal = ":80"
|
||||
periodKey = "NOMAD.PERIOD"
|
||||
periodVal = "period"
|
||||
)
|
||||
|
||||
var (
|
||||
envVars = map[string]string{
|
||||
ipKey: ipVal,
|
||||
portKey: portVal,
|
||||
ipKey: ipVal,
|
||||
portKey: portVal,
|
||||
periodKey: periodVal,
|
||||
}
|
||||
)
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceInvalidEnv(t *testing.T) {
|
||||
input := []string{"invalid", "$FOO"}
|
||||
exp := []string{"invalid", "$FOO"}
|
||||
act := ParseAndReplace(input, envVars)
|
||||
func TestArgs_ReplaceEnv_Invalid(t *testing.T) {
|
||||
input := "$FOO"
|
||||
exp := "$FOO"
|
||||
act := ReplaceEnv(input, envVars)
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
t.Fatalf("ReplaceEnv(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceValidEnv(t *testing.T) {
|
||||
input := []string{"nomad_ip", fmt.Sprintf(`"$%v"!`, ipKey)}
|
||||
exp := []string{"nomad_ip", fmt.Sprintf("\"%s\"!", ipVal)}
|
||||
act := ParseAndReplace(input, envVars)
|
||||
func TestArgs_ReplaceEnv_Valid(t *testing.T) {
|
||||
input := fmt.Sprintf(`"$%v"!`, ipKey)
|
||||
exp := fmt.Sprintf("\"%s\"!", ipVal)
|
||||
act := ReplaceEnv(input, envVars)
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
t.Fatalf("ReplaceEnv(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDriverArgs_ParseAndReplaceChainedEnv(t *testing.T) {
|
||||
input := []string{"-foo", fmt.Sprintf("$%s$%s", ipKey, portKey)}
|
||||
exp := []string{"-foo", fmt.Sprintf("%s%s", ipVal, portVal)}
|
||||
act := ParseAndReplace(input, envVars)
|
||||
func TestArgs_ReplaceEnv_Period(t *testing.T) {
|
||||
input := fmt.Sprintf(`"$%v"!`, periodKey)
|
||||
exp := fmt.Sprintf("\"%s\"!", periodVal)
|
||||
act := ReplaceEnv(input, envVars)
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ParseAndReplace(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
t.Fatalf("ReplaceEnv(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestArgs_ReplaceEnv_Chained(t *testing.T) {
|
||||
input := fmt.Sprintf("$%s$%s", ipKey, portKey)
|
||||
exp := fmt.Sprintf("%s%s", ipVal, portVal)
|
||||
act := ReplaceEnv(input, envVars)
|
||||
|
||||
if !reflect.DeepEqual(act, exp) {
|
||||
t.Fatalf("ReplaceEnv(%v, %v) returned %#v; want %#v", input, envVars, act, exp)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user