diff --git a/api/tasks.go b/api/tasks.go index 2ca804e62..c1b95c32e 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -238,6 +238,7 @@ type TaskState struct { const ( TaskSetupFailure = "Setup Failure" TaskDriverFailure = "Driver Failure" + TaskDriverMessage = "Driver" TaskReceived = "Received" TaskFailedValidation = "Failed Validation" TaskStarted = "Started" @@ -263,6 +264,7 @@ type TaskEvent struct { RestartReason string SetupError string DriverError string + DriverMessage string ExitCode int Signal int Message string diff --git a/client/client.go b/client/client.go index 18fbb4ae8..d4b5b7f16 100644 --- a/client/client.go +++ b/client/client.go @@ -810,7 +810,7 @@ func (c *Client) setupDrivers() error { var avail []string var skipped []string - driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil) + driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil) for name := range driver.BuiltinDrivers { // Skip fingerprinting drivers that are not in the whitelist if it is // enabled. diff --git a/client/driver/docker.go b/client/driver/docker.go index 6a443c034..c90279574 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -91,6 +91,9 @@ const ( type DockerDriver struct { DriverContext + + imageID string + driverConfig *DockerDriverConfig } type DockerDriverAuth struct { @@ -344,31 +347,29 @@ func (d *DockerDriver) Abilities() DriverAbilities { } } -func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { +func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error { // Set environment variables. d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath). 
SetTaskLocalDir(allocdir.TaskLocalContainerPath).SetSecretsDir(allocdir.TaskSecretsContainerPath).Build() driverConfig, err := NewDockerDriverConfig(task, d.taskEnv) if err != nil { - return nil, err + return err } - cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) - taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { - return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + return fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } // Initialize docker API clients - client, waitClient, err := d.dockerClients() + client, _, err := d.dockerClients() if err != nil { - return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) + return fmt.Errorf("Failed to connect to docker daemon: %s", err) } if err := d.createImage(driverConfig, client, taskDir); err != nil { - return nil, err + return err } image := driverConfig.ImageName @@ -376,14 +377,26 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle dockerImage, err := client.InspectImage(image) if err != nil { d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err) - return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) + return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) } d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) + // Set state needed by Start() + d.imageID = dockerImage.ID + d.driverConfig = driverConfig + return nil +} + +func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } + + taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } pluginLogFile := 
filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) pluginConfig := &plugin.ClientConfig{ Cmd: exec.Command(bin, "executor", pluginLogFile), @@ -409,9 +422,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle // Only launch syslog server if we're going to use it! syslogAddr := "" - if runtime.GOOS == "darwin" && len(driverConfig.Logging) == 0 { + if runtime.GOOS == "darwin" && len(d.driverConfig.Logging) == 0 { d.logger.Printf("[DEBUG] driver.docker: disabling syslog driver as Docker for Mac workaround") - } else if len(driverConfig.Logging) == 0 || driverConfig.Logging[0].Type == "syslog" { + } else if len(d.driverConfig.Logging) == 0 || d.driverConfig.Logging[0].Type == "syslog" { ss, err := exec.LaunchSyslogServer() if err != nil { pluginClient.Kill() @@ -420,11 +433,11 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle syslogAddr = ss.Addr } - config, err := d.createContainerConfig(ctx, task, driverConfig, syslogAddr) + config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) + d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err) pluginClient.Kill() - return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err) + return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err) } container, rerr := d.createContainer(config) @@ -437,17 +450,17 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle d.logger.Printf("[INFO] driver.docker: created container %s", container.ID) + cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) + // We don't need to start the container if the container is already running // since we don't create containers which are already present 
on the host // and are running if !container.State.Running { // Start the container - err := d.startContainer(container) - if err != nil { + if err := d.startContainer(container); err != nil { d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err) pluginClient.Kill() - err.Err = fmt.Sprintf("Failed to start container %s: %s", container.ID, err) - return nil, err + return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: started container %s", container.ID) } else { @@ -464,7 +477,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle pluginClient: pluginClient, cleanupImage: cleanupImage, logger: d.logger, - imageID: dockerImage.ID, + imageID: d.imageID, containerID: container.ID, version: d.config.Version, killTimeout: GetKillTimeout(task.KillTimeout, maxKill), @@ -955,6 +968,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke } } + d.emitEvent("Downloading image %s:%s", repo, tag) err := client.PullImage(pullOptions, authOptions) if err != nil { d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index ff90747d8..f743a7f25 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -97,6 +97,10 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie driver := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := driver.Prestart(execCtx, task); err != nil { + execCtx.AllocDir.Destroy() + t.Fatalf("error in prestart: %v", err) + } handle, err := driver.Start(execCtx, task) if err != nil { execCtx.AllocDir.Destroy() @@ -175,6 +179,9 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { d := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + 
t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -264,6 +271,9 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { // Copy the image into the task's directory copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -324,9 +334,9 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewDockerDriver(driverCtx) - _, err := d.Start(execCtx, task) + err := d.Prestart(execCtx, task) if err == nil { - t.Fatalf("want err: %v", err) + t.Fatalf("want error in prestart: %v", err) } if rerr, ok := err.(*structs.RecoverableError); !ok { @@ -374,6 +384,9 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { d := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -464,6 +477,9 @@ func TestDockerDriver_StartN(t *testing.T) { d := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart #%d: %v", idx+1, err) + } handles[idx], err = d.Start(execCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -521,6 +537,9 @@ func TestDockerDriver_StartNVersions(t *testing.T) { copyImage(execCtx, task, "busybox_musl.tar", t) copyImage(execCtx, task, "busybox_glibc.tar", t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart #%d: %v", idx+1, err) + } handles[idx], err = d.Start(execCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -857,6 +876,10 @@ func TestDockerDriver_User(t *testing.T) { defer 
execCtx.AllocDir.Destroy() copyImage(execCtx, task, "busybox.tar", t) + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } + // It should fail because the user "alice" does not exist on the given // image. handle, err := driver.Start(execCtx, task) @@ -1006,6 +1029,9 @@ done fmt.Errorf("Failed to write data") } + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1082,13 +1108,17 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str } } - taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, "") + taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, cfg, "") if err != nil { cleanup() t.Fatalf("Failed to get task env: %v", err) } - driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv) + logger := testLogger() + emitter := func(m string, args ...interface{}) { + logger.Printf("[EVENT] "+m, args...) 
+ } + driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter) driver := NewDockerDriver(driverCtx) copyImage(execCtx, task, "busybox.tar", t) @@ -1111,6 +1141,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } if _, err := driver.Start(execCtx, task); err == nil { t.Fatalf("Started driver successfully when volumes should have been disabled.") } @@ -1121,6 +1154,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".") defer cleanup() + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1159,6 +1195,9 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() + if err := driver.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("Failed to start docker driver: %v", err) diff --git a/client/driver/driver.go b/client/driver/driver.go index 6e7a9d010..4060fee56 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -5,6 +5,7 @@ import ( "log" "os" "path/filepath" + "strings" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -51,6 +52,10 @@ type Driver interface { // Drivers must support the fingerprint interface for detection fingerprint.Fingerprint + // Prestart prepares the task environment and performs expensive + // initialization steps like downloading images. 
+ Prestart(*ExecContext, *structs.Task) error + // Start is used to being task execution Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) @@ -70,6 +75,9 @@ type DriverAbilities struct { SendSignals bool } +// LogEventFn is a callback which allows Drivers to emit task events. +type LogEventFn func(message string, args ...interface{}) + // DriverContext is a means to inject dependencies such as loggers, configs, and // node attributes into a Driver without having to change the Driver interface // each time we do it. Used in conjection with Factory, above. @@ -79,18 +87,14 @@ type DriverContext struct { logger *log.Logger node *structs.Node taskEnv *env.TaskEnvironment + + emitEvent LogEventFn } // NewEmptyDriverContext returns a DriverContext with all fields set to their // zero value. func NewEmptyDriverContext() *DriverContext { - return &DriverContext{ - taskName: "", - config: nil, - node: nil, - logger: nil, - taskEnv: nil, - } + return &DriverContext{} } // NewDriverContext initializes a new DriverContext with the specified fields. @@ -98,13 +102,14 @@ func NewEmptyDriverContext() *DriverContext { // private to the driver. If we want to change this later we can gorename all of // the fields in DriverContext. func NewDriverContext(taskName string, config *config.Config, node *structs.Node, - logger *log.Logger, taskEnv *env.TaskEnvironment) *DriverContext { + logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext { return &DriverContext{ - taskName: taskName, - config: config, - node: node, - logger: logger, - taskEnv: taskEnv, + taskName: taskName, + config: config, + node: node, + logger: logger, + taskEnv: taskEnv, + emitEvent: eventEmitter, } } @@ -148,7 +153,8 @@ func NewExecContext(alloc *allocdir.AllocDir, allocID string) *ExecContext { // GetTaskEnv converts the alloc dir, the node, task and alloc into a // TaskEnvironment. 
func GetTaskEnv(allocDir *allocdir.AllocDir, node *structs.Node, - task *structs.Task, alloc *structs.Allocation, vaultToken string) (*env.TaskEnvironment, error) { + task *structs.Task, alloc *structs.Allocation, conf *config.Config, + vaultToken string) (*env.TaskEnvironment, error) { tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) env := env.NewTaskEnvironment(node). @@ -184,6 +190,10 @@ func GetTaskEnv(allocDir *allocdir.AllocDir, node *structs.Node, env.SetVaultToken(vaultToken, task.Vault.Env) } + // Set the host environment variables. + filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") + env.AppendHostEnvvars(filter) + return env.Build(), nil } diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 6ca271301..a8b1742fd 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -84,12 +84,16 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) { alloc := mock.Alloc() execCtx := NewExecContext(allocDir, alloc.ID) - taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, "") + taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, cfg, "") if err != nil { return nil, nil } - driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv) + logger := testLogger() + emitter := func(m string, args ...interface{}) { + logger.Printf("[EVENT] "+m, args...) 
+ } + driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, logger, taskEnv, emitter) return driverCtx, execCtx } @@ -119,7 +123,7 @@ func TestDriver_GetTaskEnv(t *testing.T) { alloc := mock.Alloc() alloc.Name = "Bar" - env, err := GetTaskEnv(nil, nil, task, alloc, "") + env, err := GetTaskEnv(nil, nil, task, alloc, testConfig(), "") if err != nil { t.Fatalf("GetTaskEnv() failed: %v", err) } @@ -157,8 +161,17 @@ func TestDriver_GetTaskEnv(t *testing.T) { } act := env.EnvMap() - if !reflect.DeepEqual(act, exp) { - t.Fatalf("GetTaskEnv() returned %#v; want %#v", act, exp) + + // Since host env vars are included only ensure expected env vars are present + for expk, expv := range exp { + v, ok := act[expk] + if !ok { + t.Errorf("%q not found in task env", expk) + continue + } + if v != expv { + t.Errorf("Expected %s=%q but found %q", expk, expv, v) + } } } diff --git a/client/driver/exec.go b/client/driver/exec.go index 89c1061bb..1e99e885f 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -7,13 +7,11 @@ import ( "os" "os/exec" "path/filepath" - "strings" "time" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" dstructs "github.com/hashicorp/nomad/client/driver/structs" cstructs "github.com/hashicorp/nomad/client/structs" @@ -92,6 +90,10 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } +func (d *ExecDriver) Prestart(execctx *ExecContext, task *structs.Task) error { + return nil +} + func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig ExecDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { @@ -104,10 +106,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } - // Set the host environment variables. 
- filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) - // Get the task directory for storing the executor logs. taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 125ff1de6..0a1fa9c8b 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -65,6 +65,9 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -105,6 +108,9 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -165,6 +171,9 @@ func TestExecDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -215,6 +224,9 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -265,6 +277,9 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -327,6 +342,9 
@@ done fmt.Errorf("Failed to write data") } + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -388,6 +406,9 @@ func TestExecDriverUser(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() diff --git a/client/driver/java.go b/client/driver/java.go index 770e1292c..919c738db 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -163,16 +163,16 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } +func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + return nil +} + func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig JavaDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { return nil, err } - // Set the host environment variables. 
- filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) - taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] if !ok { return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 466589715..225458540 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -92,6 +92,9 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -142,6 +145,9 @@ func TestJavaDriver_Start_Wait(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -204,6 +210,9 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -262,6 +271,9 @@ func TestJavaDriver_Signal(t *testing.T) { dst, _ := execCtx.AllocDir.TaskDirs[task.Name] copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -317,6 +329,9 @@ func TestJavaDriverUser(t 
*testing.T) { defer execCtx.AllocDir.Destroy() d := NewJavaDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() diff --git a/client/driver/lxc.go b/client/driver/lxc.go index 314f52924..9cb48ebb6 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -168,6 +168,13 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e return true, nil } +func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath) + d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath) + d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath) + return nil +} + // Start starts the LXC Driver func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig LxcDriverConfig diff --git a/client/driver/lxc_test.go b/client/driver/lxc_test.go index d8df51873..fab36075e 100644 --- a/client/driver/lxc_test.go +++ b/client/driver/lxc_test.go @@ -69,6 +69,9 @@ func TestLxcDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewLxcDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -141,6 +144,9 @@ func TestLxcDriver_Open_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewLxcDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index e13cad580..9ad11579b 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -75,6 +75,10 @@ func (d *MockDriver) Abilities() DriverAbilities { } } +func (d *MockDriver) Prestart(ctx *ExecContext, task 
*structs.Task) error { + return nil +} + // Start starts the mock driver func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig MockDriverConfig diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 0d1346fd4..0c916a377 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -135,6 +135,10 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } +func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + return nil +} + // Run an existing Qemu image. Start() will pull down an existing, valid Qemu // image and save it to the Drivers Allocation Dir func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 2c8a6c6e5..c83fde4b6 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -7,7 +7,6 @@ import ( "os" "os/exec" "path/filepath" - "strings" "time" "github.com/hashicorp/go-plugin" @@ -107,6 +106,10 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo return false, nil } +func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + return nil +} + func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig ExecDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { @@ -125,10 +128,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } - // Set the host environment variables. 
- filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") - d.taskEnv.AppendHostEnvvars(filter) - bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 0103a7007..2b8d4f4fa 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -75,6 +75,9 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -120,6 +123,9 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -170,6 +176,9 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -219,6 +228,9 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -268,6 +280,9 @@ func TestRawExecDriverUser(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRawExecDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() @@ -313,6 +328,9 @@ 
done fmt.Errorf("Failed to write data") } + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("prestart err: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 8e100dd3c..6e1683ab7 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -206,6 +206,13 @@ func (d *RktDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } +func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error { + d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath) + d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath) + d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath) + return nil +} + // Run an existing Rkt image. func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig RktDriverConfig @@ -289,10 +296,6 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug)) // Inject environment variables - d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath) - d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath) - d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath) - d.taskEnv.Build() for k, v := range d.taskEnv.EnvMap() { cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v)) } diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index 9a3ec06bc..514bb23ce 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -98,6 +98,9 @@ func TestRktDriver_Start_DNS(t *testing.T) { d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -145,6 +148,9 @@ func TestRktDriver_Start_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != 
nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -202,6 +208,9 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -252,6 +261,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) { "-c", fmt.Sprintf(`echo -n %s > foo/%s`, string(exp), file), }, + "net": []string{"none"}, "volumes": []string{fmt.Sprintf("%s:/foo", tmpvol)}, }, LogConfig: &structs.LogConfig{ @@ -268,6 +278,9 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -326,6 +339,9 @@ func TestRktDriverUser(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() @@ -364,6 +380,9 @@ func TestRktTrustPrefix(t *testing.T) { d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err == nil { handle.Kill() @@ -438,6 +457,9 @@ func TestRktDriver_PortsMapping(t *testing.T) { d := NewRktDriver(driverCtx) + if err := d.Prestart(execCtx, task); err != nil { + t.Fatalf("error in prestart: %v", err) + } handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/task_runner.go b/client/task_runner.go index 4c068c570..1d1e297a1 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -324,7 +324,8 @@ func 
(r *TaskRunner) setTaskEnv() error { r.taskEnvLock.Lock() defer r.taskEnvLock.Unlock() - taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task.Copy(), r.alloc, r.vaultFuture.Get()) + taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, + r.task.Copy(), r.alloc, r.config, r.vaultFuture.Get()) if err != nil { return err } @@ -346,7 +347,15 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID) } - driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env) + // Create a task-specific event emitter callback to expose minimal + // state to drivers + eventEmitter := func(m string, args ...interface{}) { + msg := fmt.Sprintf(m, args...) + r.logger.Printf("[DEBUG] client: driver event for alloc %q: %s", r.alloc.ID, msg) + r.setState("", structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg)) + } + + driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env, eventEmitter) driver, err := driver.NewDriver(r.task.Driver, driverCtx) if err != nil { return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v", @@ -1019,17 +1028,31 @@ func (r *TaskRunner) startTask() error { // Create a driver driver, err := r.createDriver() if err != nil { - return fmt.Errorf("failed to create driver of task '%s' for alloc '%s': %v", + return fmt.Errorf("failed to create driver of task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) } + // Run prestart + if err := driver.Prestart(r.ctx, r.task); err != nil { + wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v", + r.task.Name, r.alloc.ID, err) + + r.logger.Printf("[WARN] client: %v", wrapped) + + if rerr, ok := err.(*structs.RecoverableError); ok { + return structs.NewRecoverableError(wrapped, rerr.Recoverable) + } + + return wrapped + } + // Start the job handle, err := driver.Start(r.ctx, 
r.task) if err != nil { - wrapped := fmt.Errorf("failed to start task '%s' for alloc '%s': %v", + wrapped := fmt.Errorf("failed to start task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) - r.logger.Printf("[INFO] client: %v", wrapped) + r.logger.Printf("[WARN] client: %v", wrapped) if rerr, ok := err.(*structs.RecoverableError); ok { return structs.NewRecoverableError(wrapped, rerr.Recoverable) diff --git a/command/alloc_status.go b/command/alloc_status.go index f936e0df7..45cc15b7c 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -380,6 +380,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { } else { desc = "Task signaled to restart" } + case api.TaskDriverMessage: + desc = event.DriverMessage } // Reverse order so we are sorted by time diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5eef36fd7..eddd2cac5 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2556,6 +2556,11 @@ const ( // TaskSiblingFailed indicates that a sibling task in the task group has // failed. TaskSiblingFailed = "Sibling task failed" + + // TaskDriverMessage is an informational event message emitted by + // drivers such as when they're performing a long running action like + // downloading an image. + TaskDriverMessage = "Driver" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -2614,6 +2619,9 @@ type TaskEvent struct { // TaskSignal is the signal that was sent to the task TaskSignal string + + // DriverMessage indicates a driver action being taken. + DriverMessage string } func (te *TaskEvent) GoString() string { @@ -2742,6 +2750,11 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent { return e } +func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent { + e.DriverMessage = m + return e +} + // TaskArtifact is an artifact to download before running the task. 
type TaskArtifact struct { // GetterSource is the source to download an artifact using go-getter