From ee17940dfe98e2857b09814d5f4a86bdc850021d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 29 Nov 2016 16:39:36 -0800 Subject: [PATCH 01/10] Add Driver.Prestart method The Driver.Prestart method currently does very little but lays the foundation for where lifecycle plugins can interleave execution _after_ task environment setup but _before_ the task starts. Currently Prestart does two things: * Any driver specific task environment building * Download Docker images This change also attaches a TaskEvent emitter to Drivers, so they can emit events during task initialization. --- api/tasks.go | 44 ++++++++++++++-------------- client/client.go | 2 +- client/driver/docker.go | 53 ++++++++++++++++++++++------------ client/driver/docker_test.go | 45 +++++++++++++++++++++++++++-- client/driver/driver.go | 30 ++++++++++--------- client/driver/driver_test.go | 6 +++- client/driver/exec.go | 11 ++++--- client/driver/exec_test.go | 21 ++++++++++++++ client/driver/java.go | 11 ++++--- client/driver/java_test.go | 15 ++++++++++ client/driver/lxc.go | 4 +++ client/driver/lxc_test.go | 6 ++++ client/driver/mock_driver.go | 4 +++ client/driver/qemu.go | 4 +++ client/driver/raw_exec.go | 11 ++++--- client/driver/raw_exec_test.go | 18 ++++++++++++ client/driver/rkt.go | 11 ++++--- client/driver/rkt_test.go | 22 ++++++++++++++ client/task_runner.go | 30 ++++++++++++++++--- command/alloc_status.go | 2 ++ nomad/structs/structs.go | 12 ++++++++ 21 files changed, 284 insertions(+), 78 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 2ca804e62..8aa078232 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -252,30 +252,32 @@ const ( TaskSiblingFailed = "Sibling task failed" TaskSignaling = "Signaling" TaskRestartSignal = "Restart Signaled" + TaskInitializing = "Initializing" ) // TaskEvent is an event that effects the state of a task and contains meta-data // appropriate to the events type. type TaskEvent struct { - Type string - Time int64 - FailsTask bool - RestartReason string - SetupError string - DriverError string - ExitCode int - Signal int - Message string - KillReason string - KillTimeout time.Duration - KillError string - StartDelay int64 - DownloadError string - ValidationError string - DiskLimit int64 - DiskSize int64 - FailedSibling string - VaultError string - TaskSignalReason string - TaskSignal string + Type string + Time int64 + FailsTask bool + RestartReason string + SetupError string + DriverError string + ExitCode int + Signal int + Message string + KillReason string + KillTimeout time.Duration + KillError string + StartDelay int64 + DownloadError string + ValidationError string + DiskLimit int64 + DiskSize int64 + FailedSibling string + VaultError string + TaskSignalReason string + TaskSignal string + InitializationMessage string } diff --git a/client/client.go b/client/client.go index e691b972c..d147eb745 100644 --- a/client/client.go +++ b/client/client.go @@ -792,7 +792,7 @@ func (c *Client) setupDrivers() error { var avail []string var skipped []string - driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil) + driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil) for name := range driver.BuiltinDrivers { // Skip fingerprinting drivers that are not in the whitelist if it is // enabled. diff --git a/client/driver/docker.go b/client/driver/docker.go index 0dacbac10..b0078b7e6 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -91,6 +91,10 @@ const ( type DockerDriver struct { DriverContext + + imageID string + waitClient *docker.Client + driverConfig *DockerDriverConfig } type DockerDriverAuth struct { @@ -339,31 +343,29 @@ func (d *DockerDriver) Abilities() DriverAbilities { } } -func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { +func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error { // Set environment variables. d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath). SetTaskLocalDir(allocdir.TaskLocalContainerPath).SetSecretsDir(allocdir.TaskSecretsContainerPath).Build() driverConfig, err := NewDockerDriverConfig(task, d.taskEnv) if err != nil { - return nil, err + return err } - cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) - taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { - return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + return fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } // Initialize docker API clients client, waitClient, err := d.dockerClients() if err != nil { - return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) + return fmt.Errorf("Failed to connect to docker daemon: %s", err) } if err := d.createImage(driverConfig, client, taskDir); err != nil { - return nil, err + return err } image := driverConfig.ImageName @@ -371,14 +373,27 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle dockerImage, err := client.InspectImage(image) if err != nil { d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err) - return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) + return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) } d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) + // Set state needed by Start() + d.imageID = dockerImage.ID + d.waitClient = waitClient + d.driverConfig = driverConfig + return nil +} + +func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } + + taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) pluginConfig := &plugin.ClientConfig{ Cmd: exec.Command(bin, "executor", pluginLogFile), @@ -404,9 +419,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle // Only launch syslog server if we're going to use it! syslogAddr := "" - if runtime.GOOS == "darwin" && len(driverConfig.Logging) == 0 { + if runtime.GOOS == "darwin" && len(d.driverConfig.Logging) == 0 { d.logger.Printf("[DEBUG] driver.docker: disabling syslog driver as Docker for Mac workaround") - } else if len(driverConfig.Logging) == 0 || driverConfig.Logging[0].Type == "syslog" { + } else if len(d.driverConfig.Logging) == 0 || d.driverConfig.Logging[0].Type == "syslog" { ss, err := exec.LaunchSyslogServer() if err != nil { pluginClient.Kill() @@ -415,11 +430,11 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle syslogAddr = ss.Addr } - config, err := d.createContainerConfig(ctx, task, driverConfig, syslogAddr) + config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) + d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err) pluginClient.Kill() - return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err) + return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err) } container, rerr := d.createContainer(config) @@ -432,17 +447,17 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle d.logger.Printf("[INFO] driver.docker: created container %s", container.ID) + cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) + // We don't need to start the container if the container is already running // since we don't create containers which are already present on the host // and are running if !container.State.Running { // Start the container - err := d.startContainer(container) - if err != nil { + if err := client.StartContainer(container.ID, container.HostConfig); err != nil { d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err) pluginClient.Kill() - err.Err = fmt.Sprintf("Failed to start container %s: %s", container.ID, err) - return nil, err + return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: started container %s", container.ID) } else { @@ -454,12 +469,12 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle maxKill := d.DriverContext.config.MaxKillTimeout h := &DockerHandle{ client: client, - waitClient: waitClient, + waitClient: d.waitClient, executor: exec, pluginClient: pluginClient, cleanupImage: cleanupImage, logger: d.logger, - imageID: dockerImage.ID, + imageID: d.imageID, containerID: container.ID, version: d.config.Version, killTimeout: GetKillTimeout(task.KillTimeout, maxKill), diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 69cd9f01c..d54e57acc 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -101,6 +101,10 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle driver := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := driver.Prestart(execCtx, task); err != nil { + execCtx.AllocDir.Destroy() + t.Fatalf("error in prestart: %v", err) + } handle, err := driver.Start(execCtx, task) if err != nil { execCtx.AllocDir.Destroy() @@ -167,6 +171,9 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { d := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -256,6 +263,9 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { // Copy the image into the task's directory copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -316,9 +326,9 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) - _, err := d.Start(execCtx, task) + err := d.Prestart(execCtx, task) if err == nil { - t.Fatalf("want err: %v", err) + t.Fatalf("want error in prestart: %v", err) } if rerr, ok := err.(*structs.RecoverableError); !ok { @@ -366,6 +376,9 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { d := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -456,6 +469,9 @@ func TestDockerDriver_StartN(t *testing.T) { d := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart #%d: %v", idx+1, err) + } handles[idx], err = d.Start(execCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -513,6 +529,9 @@ func TestDockerDriver_StartNVersions(t *testing.T) { copyImage(execCtx, task, "busybox_musl.tar", t) copyImage(execCtx, task, "busybox_glibc.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart #%d: %v", idx+1, err) + } handles[idx], err = d.Start(execCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -804,6 +823,10 @@ func TestDockerDriver_User(t *testing.T) { defer execCtx.AllocDir.Destroy() copyImage(execCtx, task, "busybox.tar", t) + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } + // It should fail because the user "alice" does not exist on the given // image. handle, err := driver.Start(execCtx, task) @@ -953,6 +976,9 @@ done fmt.Errorf("Failed to write data") } + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1035,7 +1061,11 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str t.Fatalf("Failed to get task env: %v", err) } - driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv) + logger := testLogger() + emitter := func(m string, args ...interface{}) { + logger.Printf("[EVENT] "+m, args...) + } + driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter) driver := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) @@ -1058,6 +1088,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } if _, err := driver.Start(execCtx, task); err == nil { t.Fatalf("Started driver successfully when volumes should have been disabled.") } @@ -1068,6 +1101,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".") defer cleanup() + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1106,6 +1142,9 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("Failed to start docker driver: %v", err) diff --git a/client/driver/driver.go b/client/driver/driver.go index 6e7a9d010..5d45ca4c8 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -51,6 +51,10 @@ type Driver interface { // Drivers must support the fingerprint interface for detection fingerprint.Fingerprint + // Prestart prepares the task environment and performs expensive + // intialization steps like downloading images. + Prestart(*ExecContext, *structs.Task) error + // Start is used to being task execution Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) @@ -70,6 +74,9 @@ type DriverAbilities struct { SendSignals bool } +// LogEventFn is a callback which allows Drivers to emit task events. +type LogEventFn func(message string, args ...interface{}) + // DriverContext is a means to inject dependencies such as loggers, configs, and // node attributes into a Driver without having to change the Driver interface // each time we do it. Used in conjection with Factory, above. @@ -79,18 +86,14 @@ type DriverContext struct { logger *log.Logger node *structs.Node taskEnv *env.TaskEnvironment + + emitEvent LogEventFn } // NewEmptyDriverContext returns a DriverContext with all fields set to their // zero value. func NewEmptyDriverContext() *DriverContext { - return &DriverContext{ - taskName: "", - config: nil, - node: nil, - logger: nil, - taskEnv: nil, - } + return &DriverContext{} } // NewDriverContext initializes a new DriverContext with the specified fields. @@ -98,13 +101,14 @@ func NewEmptyDriverContext() *DriverContext { // private to the driver. If we want to change this later we can gorename all of // the fields in DriverContext. func NewDriverContext(taskName string, config *config.Config, node *structs.Node, - logger *log.Logger, taskEnv *env.TaskEnvironment) *DriverContext { + logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext { return &DriverContext{ - taskName: taskName, - config: config, - node: node, - logger: logger, - taskEnv: taskEnv, + taskName: taskName, + config: config, + node: node, + logger: logger, + taskEnv: taskEnv, + emitEvent: eventEmitter, } } diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 6ca271301..903689966 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -89,7 +89,11 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) { return nil, nil } - driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv) + logger := testLogger() + emitter := func(m string, args ...interface{}) { + logger.Printf("[EVENT] "+m, args...) + } + driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, logger, taskEnv, emitter) return driverCtx, execCtx } diff --git a/client/driver/exec.go b/client/driver/exec.go index 89c1061bb..2bc39f113 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -92,6 +92,13 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } +func (d *ExecDriver) Prestart(execctx *ExecContext, task *structs.Task) error { + // Set the host environment variables. + filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") + d.taskEnv.AppendHostEnvvars(filter) + return nil +} + func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig ExecDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { @@ -104,10 +111,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } - // Set the host environment variables. - filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) - // Get the task directory for storing the executor logs. taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 125ff1de6..0a1fa9c8b 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -65,6 +65,9 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -105,6 +108,9 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -165,6 +171,9 @@ func TestExecDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -215,6 +224,9 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -265,6 +277,9 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -327,6 +342,9 @@ done fmt.Errorf("Failed to write data") } + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -388,6 +406,9 @@ func TestExecDriverUser(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() diff --git a/client/driver/java.go b/client/driver/java.go index 770e1292c..9f3a92701 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -163,16 +163,19 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } +func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + // Set the host environment variables. + filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") + d.taskEnv.AppendHostEnvvars(filter) + return nil +} + func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig JavaDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { return nil, err } - // Set the host environment variables. - filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) - taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 466589715..225458540 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -92,6 +92,9 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -142,6 +145,9 @@ func TestJavaDriver_Start_Wait(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -204,6 +210,9 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -262,6 +271,9 @@ func TestJavaDriver_Signal(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -317,6 +329,9 @@ func TestJavaDriverUser(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewJavaDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() diff --git a/client/driver/lxc.go b/client/driver/lxc.go index 5aae27cb5..7e867decd 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -168,6 +168,10 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e return true, nil } +func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + return nil +} + // Start starts the LXC Driver func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig LxcDriverConfig diff --git a/client/driver/lxc_test.go b/client/driver/lxc_test.go index d8df51873..fab36075e 100644 --- a/client/driver/lxc_test.go +++ b/client/driver/lxc_test.go @@ -69,6 +69,9 @@ func TestLxcDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewLxcDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -141,6 +144,9 @@ func TestLxcDriver_Open_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewLxcDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index e13cad580..9ad11579b 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -75,6 +75,10 @@ func (d *MockDriver) Abilities() DriverAbilities { } } +func (d *MockDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + return nil +} + // Start starts the mock driver func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig MockDriverConfig diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 0d1346fd4..0c916a377 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -135,6 +135,10 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } +func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + return nil +} + // Run an existing Qemu image. Start() will pull down an existing, valid Qemu // image and save it to the Drivers Allocation Dir func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 2c8a6c6e5..d881dfddc 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -107,6 +107,13 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo return false, nil } +func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + // Set the host environment variables. + filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") + d.taskEnv.AppendHostEnvvars(filter) + return nil +} + func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig ExecDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { @@ -125,10 +132,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } - // Set the host environment variables. - filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) - bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 0103a7007..2b8d4f4fa 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -75,6 +75,9 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -120,6 +123,9 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -170,6 +176,9 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -219,6 +228,9 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -268,6 +280,9 @@ func TestRawExecDriverUser(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() @@ -313,6 +328,9 @@ done fmt.Errorf("Failed to write data") } + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 8e100dd3c..6e1683ab7 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -206,6 +206,13 @@ func (d *RktDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } +func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath) + d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath) + d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath) + return nil +} + // Run an existing Rkt image. func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig RktDriverConfig @@ -289,10 +296,6 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug)) // Inject environment variables - d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath) - d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath) - d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath) - d.taskEnv.Build() for k, v := range d.taskEnv.EnvMap() { cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v)) } diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index 9a3ec06bc..514bb23ce 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -98,6 +98,9 @@ func TestRktDriver_Start_DNS(t *testing.T) { d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -145,6 +148,9 @@ func TestRktDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -202,6 +208,9 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -252,6 +261,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) { "-c", fmt.Sprintf(`echo -n %s > foo/%s`, string(exp), file), }, + "net": []string{"none"}, "volumes": []string{fmt.Sprintf("%s:/foo", tmpvol)}, }, LogConfig: &structs.LogConfig{ @@ -268,6 +278,9 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -326,6 +339,9 @@ func TestRktDriverUser(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() @@ -364,6 +380,9 @@ func TestRktTrustPrefix(t *testing.T) { d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() @@ -438,6 +457,9 @@ func TestRktDriver_PortsMapping(t *testing.T) { d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/task_runner.go b/client/task_runner.go index 4c068c570..f059d2764 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -346,7 +346,15 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID) } - driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env) + // Create a task-specific event emitter callback to expose minimal + // state to drivers + eventEmitter := func(m string, args ...interface{}) { + msg := fmt.Sprintf(m, args...) + r.logger.Printf("[DEBUG] client: initialization event for alloc %q: %s", r.alloc.ID, msg) + r.setState("", structs.NewTaskEvent(structs.TaskInitializing).SetInitializationMessage(msg)) + } + + driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env, eventEmitter) driver, err := driver.NewDriver(r.task.Driver, driverCtx) if err != nil { return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v", @@ -1019,17 +1027,31 @@ func (r *TaskRunner) startTask() error { // Create a driver driver, err := r.createDriver() if err != nil { - return fmt.Errorf("failed to create driver of task '%s' for alloc '%s': %v", + return fmt.Errorf("failed to create driver of task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) } + // Run prestart + if err := driver.Prestart(r.ctx, r.task); err != nil { + wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v", + r.task.Name, r.alloc.ID, err) + + r.logger.Printf("[WARN] client: %v", wrapped) + + if rerr, ok := err.(*structs.RecoverableError); ok { + return structs.NewRecoverableError(wrapped, rerr.Recoverable) + } + + return wrapped + } + // Start the job handle, err := driver.Start(r.ctx, r.task) if err != nil { - wrapped := fmt.Errorf("failed to start task '%s' for alloc '%s': %v", + wrapped := fmt.Errorf("failed to start task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) - r.logger.Printf("[INFO] client: %v", wrapped) + r.logger.Printf("[WARN] client: %v", wrapped) if rerr, ok := err.(*structs.RecoverableError); ok { return structs.NewRecoverableError(wrapped, rerr.Recoverable) diff --git a/command/alloc_status.go b/command/alloc_status.go index 4d4a9094d..128ef40cf 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -378,6 +378,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { } else { desc = "Task signaled to restart" } + case api.TaskInitializing: + desc = event.InitializationMessage } // Reverse order so we are sorted by time diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9f94bcacf..391c824fe 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2555,6 +2555,10 @@ const ( // TaskSiblingFailed indicates that a sibling task in the task group has // failed. TaskSiblingFailed = "Sibling task failed" + + // TaskInitializing indicates that a task is performing a potentially + // slow initialization action such as downloading a Docker image. + TaskInitializing = "Initializing" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -2613,6 +2617,9 @@ type TaskEvent struct { // TaskSignal is the signal that was sent to the task TaskSignal string + + // InitializationMessage indicates the initialization step being executed. + InitializationMessage string } func (te *TaskEvent) GoString() string { @@ -2741,6 +2748,11 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent { return e } +func (e *TaskEvent) SetInitializationMessage(m string) *TaskEvent { + e.InitializationMessage = m + return e +} + // TaskArtifact is an artifact to download before running the task. type TaskArtifact struct { // GetterSource is the source to download an artifact using go-getter From 50934da2c4f45a07b386d3f82e6d1ca8371df1d3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 11:40:34 -0800 Subject: [PATCH 02/10] Emit "Downloading image" event --- client/driver/docker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/driver/docker.go b/client/driver/docker.go index b0078b7e6..68df26d42 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -950,6 +950,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke } } + d.emitEvent("Downloading image") err := client.PullImage(pullOptions, authOptions) if err != nil { d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) From 750d59773c27db5ca4bb510dc8f165befb1984e5 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 11:51:09 -0800 Subject: [PATCH 03/10] Rename InitializationMessage to DriverMessage --- api/tasks.go | 46 ++++++++++++++++++++-------------------- client/task_runner.go | 4 ++-- command/alloc_status.go | 4 ++-- nomad/structs/structs.go | 15 +++++++------ 4 files changed, 35 insertions(+), 34 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 8aa078232..c1b95c32e 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -238,6 +238,7 @@ type TaskState struct { const ( TaskSetupFailure = "Setup Failure" TaskDriverFailure = "Driver Failure" + TaskDriverMessage = "Driver" TaskReceived = "Received" TaskFailedValidation = "Failed Validation" TaskStarted = "Started" @@ -252,32 +253,31 @@ const ( TaskSiblingFailed = "Sibling task failed" TaskSignaling = "Signaling" TaskRestartSignal = "Restart Signaled" - TaskInitializing = "Initializing" ) // TaskEvent is an event that effects the state of a task and contains meta-data // appropriate to the events type. type TaskEvent struct { - Type string - Time int64 - FailsTask bool - RestartReason string - SetupError string - DriverError string - ExitCode int - Signal int - Message string - KillReason string - KillTimeout time.Duration - KillError string - StartDelay int64 - DownloadError string - ValidationError string - DiskLimit int64 - DiskSize int64 - FailedSibling string - VaultError string - TaskSignalReason string - TaskSignal string - InitializationMessage string + Type string + Time int64 + FailsTask bool + RestartReason string + SetupError string + DriverError string + DriverMessage string + ExitCode int + Signal int + Message string + KillReason string + KillTimeout time.Duration + KillError string + StartDelay int64 + DownloadError string + ValidationError string + DiskLimit int64 + DiskSize int64 + FailedSibling string + VaultError string + TaskSignalReason string + TaskSignal string } diff --git a/client/task_runner.go b/client/task_runner.go index f059d2764..46b47c1d0 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -350,8 +350,8 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { // state to drivers eventEmitter := func(m string, args ...interface{}) { msg := fmt.Sprintf(m, args...) - r.logger.Printf("[DEBUG] client: initialization event for alloc %q: %s", r.alloc.ID, msg) - r.setState("", structs.NewTaskEvent(structs.TaskInitializing).SetInitializationMessage(msg)) + r.logger.Printf("[DEBUG] client: driver event for alloc %q: %s", r.alloc.ID, msg) + r.setState("", structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg)) } driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env, eventEmitter) diff --git a/command/alloc_status.go b/command/alloc_status.go index 128ef40cf..8117d4f34 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -378,8 +378,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { } else { desc = "Task signaled to restart" } - case api.TaskInitializing: - desc = event.InitializationMessage + case api.TaskDriverMessage: + desc = event.DriverMessage } // Reverse order so we are sorted by time diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 391c824fe..969bdec34 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2556,9 +2556,10 @@ const ( // failed. TaskSiblingFailed = "Sibling task failed" - // TaskInitializing indicates that a task is performing a potentially - // slow initialization action such as downloading a Docker image. - TaskInitializing = "Initializing" + // TaskDriverMessage is an informational event message emitted by + // drivers such as when they're performing a long running action like + // downloading an image. + TaskDriverMessage = "Driver" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -2618,8 +2619,8 @@ type TaskEvent struct { // TaskSignal is the signal that was sent to the task TaskSignal string - // InitializationMessage indicates the initialization step being executed. - InitializationMessage string + // DriverMessage indicates a driver action being taken. + DriverMessage string } func (te *TaskEvent) GoString() string { @@ -2748,8 +2749,8 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent { return e } -func (e *TaskEvent) SetInitializationMessage(m string) *TaskEvent { - e.InitializationMessage = m +func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent { + e.DriverMessage = m return e } From bbcc27ef1b14caaeada85710cb53d56002065fc2 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 11:55:40 -0800 Subject: [PATCH 04/10] Use startContainer wrapper --- client/driver/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 68df26d42..de4e50947 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -454,7 +454,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle // and are running if !container.State.Running { // Start the container - if err := client.StartContainer(container.ID, container.HostConfig); err != nil { + if err := d.startContainer(container); err != nil { d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err) pluginClient.Kill() return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err) From 67028132333539bccc94a671a289f5e14fb32248 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 11:57:26 -0800 Subject: [PATCH 05/10] Fix formatting of downloading image message --- client/driver/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index de4e50947..1f5a2e323 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -950,7 +950,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke } } - d.emitEvent("Downloading image") + d.emitEvent("Downloading image %s:%s", repo, tag) err := client.PullImage(pullOptions, authOptions) if err != nil { d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) From aaa70ab7b80645de41a9465198fde2cbae69751d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 12:24:24 -0800 Subject: [PATCH 06/10] Append host env vars on every task env --- client/driver/driver.go | 8 +++++++- client/driver/exec.go | 5 ----- client/task_runner.go | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/client/driver/driver.go b/client/driver/driver.go index 5d45ca4c8..4060fee56 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -5,6 +5,7 @@ import ( "log" "os" "path/filepath" + "strings" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -152,7 +153,8 @@ func NewExecContext(alloc *allocdir.AllocDir, allocID string) *ExecContext { // GetTaskEnv converts the alloc dir, the node, task and alloc into a // TaskEnvironment. func GetTaskEnv(allocDir *allocdir.AllocDir, node *structs.Node, - task *structs.Task, alloc *structs.Allocation, vaultToken string) (*env.TaskEnvironment, error) { + task *structs.Task, alloc *structs.Allocation, conf *config.Config, + vaultToken string) (*env.TaskEnvironment, error) { tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) env := env.NewTaskEnvironment(node). @@ -188,6 +190,10 @@ func GetTaskEnv(allocDir *allocdir.AllocDir, node *structs.Node, env.SetVaultToken(vaultToken, task.Vault.Env) } + // Set the host environment variables. + filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") + env.AppendHostEnvvars(filter) + return env.Build(), nil } diff --git a/client/driver/exec.go b/client/driver/exec.go index 2bc39f113..1e99e885f 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -7,13 +7,11 @@ import ( "os" "os/exec" "path/filepath" - "strings" "time" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" dstructs "github.com/hashicorp/nomad/client/driver/structs" cstructs "github.com/hashicorp/nomad/client/structs" @@ -93,9 +91,6 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) { } func (d *ExecDriver) Prestart(execctx *ExecContext, task *structs.Task) error { - // Set the host environment variables. - filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) return nil } diff --git a/client/task_runner.go b/client/task_runner.go index 46b47c1d0..1d1e297a1 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -324,7 +324,8 @@ func (r *TaskRunner) setTaskEnv() error { r.taskEnvLock.Lock() defer r.taskEnvLock.Unlock() - taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task.Copy(), r.alloc, r.vaultFuture.Get()) + taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, + r.task.Copy(), r.alloc, r.config, r.vaultFuture.Get()) if err != nil { return err } From 5ad7eeaed551614d949ca0814845c4a9019dd4e0 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 14:29:57 -0800 Subject: [PATCH 07/10] Remove unneeded waitClient field --- client/driver/docker.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 1f5a2e323..c7d7db8a4 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -93,7 +93,6 @@ type DockerDriver struct { DriverContext imageID string - waitClient *docker.Client driverConfig *DockerDriverConfig } @@ -359,7 +358,7 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error { } // Initialize docker API clients - client, waitClient, err := d.dockerClients() + client, _, err := d.dockerClients() if err != nil { return fmt.Errorf("Failed to connect to docker daemon: %s", err) } @@ -379,7 +378,6 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error { // Set state needed by Start() d.imageID = dockerImage.ID - d.waitClient = waitClient d.driverConfig = driverConfig return nil } @@ -469,7 +467,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle maxKill := d.DriverContext.config.MaxKillTimeout h := &DockerHandle{ client: client, - waitClient: d.waitClient, + waitClient: waitClient, executor: exec, pluginClient: pluginClient, cleanupImage: cleanupImage, From 630812bbdd0fb5789f60bf5189ee888f43a3e3fc Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 14:37:18 -0800 Subject: [PATCH 08/10] lxc: Set image local env vars --- client/driver/lxc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/driver/lxc.go b/client/driver/lxc.go index 7e867decd..afff9cba7 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -169,6 +169,9 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e } func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath) + d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath) + d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath) return nil } From decee199780075ad5a41b834d536808f5cb79ba4 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 14:37:35 -0800 Subject: [PATCH 09/10] Fix tests broken by TaskEnv change --- client/driver/docker_test.go | 2 +- client/driver/driver_test.go | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index d54e57acc..7ba95e8d5 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -1055,7 +1055,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str } } - taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, "") + taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, cfg, "") if err != nil { cleanup() t.Fatalf("Failed to get task env: %v", err) diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 903689966..a8b1742fd 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -84,7 +84,7 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) { alloc := mock.Alloc() execCtx := NewExecContext(allocDir, alloc.ID) - taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, "") + taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, cfg, "") if err != nil { return nil, nil } @@ -123,7 +123,7 @@ func TestDriver_GetTaskEnv(t *testing.T) { alloc := mock.Alloc() alloc.Name = "Bar" - env, err := GetTaskEnv(nil, nil, task, alloc, "") + env, err := GetTaskEnv(nil, nil, task, alloc, testConfig(), "") if err != nil { t.Fatalf("GetTaskEnv() failed: %v", err) } @@ -161,8 +161,17 @@ func TestDriver_GetTaskEnv(t *testing.T) { } act := env.EnvMap() - if !reflect.DeepEqual(act, exp) { - t.Fatalf("GetTaskEnv() returned %#v; want %#v", act, exp) + + // Since host env vars are included only ensure expected env vars are present + for expk, expv := range exp { + v, ok := act[expk] + if !ok { + t.Errorf("%q not found in task env", expk) + continue + } + if v != expv { + t.Errorf("Expected %s=%q but found %q", expk, expv, v) + } } } From 904543ef1bfe0bfa3ae11790c2b7e1b84ad59660 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Dec 2016 16:14:42 -0800 Subject: [PATCH 10/10] Remove unneeded env building --- client/driver/java.go | 3 --- client/driver/raw_exec.go | 4 ---- 2 files changed, 7 deletions(-) diff --git a/client/driver/java.go b/client/driver/java.go index 9f3a92701..919c738db 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -164,9 +164,6 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, } func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - // Set the host environment variables. - filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) return nil } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index d881dfddc..c83fde4b6 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -7,7 +7,6 @@ import ( "os" "os/exec" "path/filepath" - "strings" "time" "github.com/hashicorp/go-plugin" @@ -108,9 +107,6 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo } func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - // Set the host environment variables. - filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) return nil }