Merge pull request #2054 from hashicorp/f-prestart

Add Driver.Prestart method
This commit is contained in:
Michael Schurter
2016-12-20 16:18:56 -08:00
committed by GitHub
21 changed files with 280 additions and 67 deletions

View File

@@ -238,6 +238,7 @@ type TaskState struct {
const (
TaskSetupFailure = "Setup Failure"
TaskDriverFailure = "Driver Failure"
TaskDriverMessage = "Driver"
TaskReceived = "Received"
TaskFailedValidation = "Failed Validation"
TaskStarted = "Started"
@@ -263,6 +264,7 @@ type TaskEvent struct {
RestartReason string
SetupError string
DriverError string
DriverMessage string
ExitCode int
Signal int
Message string

View File

@@ -810,7 +810,7 @@ func (c *Client) setupDrivers() error {
var avail []string
var skipped []string
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil)
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil)
for name := range driver.BuiltinDrivers {
// Skip fingerprinting drivers that are not in the whitelist if it is
// enabled.

View File

@@ -91,6 +91,9 @@ const (
type DockerDriver struct {
DriverContext
imageID string
driverConfig *DockerDriverConfig
}
type DockerDriverAuth struct {
@@ -344,31 +347,29 @@ func (d *DockerDriver) Abilities() DriverAbilities {
}
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
// Set environment variables.
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath).
SetTaskLocalDir(allocdir.TaskLocalContainerPath).SetSecretsDir(allocdir.TaskSecretsContainerPath).Build()
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
if err != nil {
return nil, err
return err
}
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
return fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
// Initialize docker API clients
client, waitClient, err := d.dockerClients()
client, _, err := d.dockerClients()
if err != nil {
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
return fmt.Errorf("Failed to connect to docker daemon: %s", err)
}
if err := d.createImage(driverConfig, client, taskDir); err != nil {
return nil, err
return err
}
image := driverConfig.ImageName
@@ -376,14 +377,26 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
dockerImage, err := client.InspectImage(image)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err)
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
}
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
// Set state needed by Start()
d.imageID = dockerImage.ID
d.driverConfig = driverConfig
return nil
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", pluginLogFile),
@@ -409,9 +422,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
// Only launch syslog server if we're going to use it!
syslogAddr := ""
if runtime.GOOS == "darwin" && len(driverConfig.Logging) == 0 {
if runtime.GOOS == "darwin" && len(d.driverConfig.Logging) == 0 {
d.logger.Printf("[DEBUG] driver.docker: disabling syslog driver as Docker for Mac workaround")
} else if len(driverConfig.Logging) == 0 || driverConfig.Logging[0].Type == "syslog" {
} else if len(d.driverConfig.Logging) == 0 || d.driverConfig.Logging[0].Type == "syslog" {
ss, err := exec.LaunchSyslogServer()
if err != nil {
pluginClient.Kill()
@@ -420,11 +433,11 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
syslogAddr = ss.Addr
}
config, err := d.createContainerConfig(ctx, task, driverConfig, syslogAddr)
config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err)
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err)
}
container, rerr := d.createContainer(config)
@@ -437,17 +450,17 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
// We don't need to start the container if the container is already running
// since we don't create containers which are already present on the host
// and are running
if !container.State.Running {
// Start the container
err := d.startContainer(container)
if err != nil {
if err := d.startContainer(container); err != nil {
d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err)
pluginClient.Kill()
err.Err = fmt.Sprintf("Failed to start container %s: %s", container.ID, err)
return nil, err
return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err)
}
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
} else {
@@ -464,7 +477,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
pluginClient: pluginClient,
cleanupImage: cleanupImage,
logger: d.logger,
imageID: dockerImage.ID,
imageID: d.imageID,
containerID: container.ID,
version: d.config.Version,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
@@ -955,6 +968,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
}
}
d.emitEvent("Downloading image %s:%s", repo, tag)
err := client.PullImage(pullOptions, authOptions)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)

View File

@@ -97,6 +97,10 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie
driver := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := driver.Prestart(execCtx, task); err != nil {
execCtx.AllocDir.Destroy()
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
if err != nil {
execCtx.AllocDir.Destroy()
@@ -175,6 +179,9 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
d := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -264,6 +271,9 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) {
// Copy the image into the task's directory
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -324,9 +334,9 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)
_, err := d.Start(execCtx, task)
err := d.Prestart(execCtx, task)
if err == nil {
t.Fatalf("want err: %v", err)
t.Fatalf("want error in prestart: %v", err)
}
if rerr, ok := err.(*structs.RecoverableError); !ok {
@@ -374,6 +384,9 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
d := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -464,6 +477,9 @@ func TestDockerDriver_StartN(t *testing.T) {
d := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
handles[idx], err = d.Start(execCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
@@ -521,6 +537,9 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
copyImage(execCtx, task, "busybox_musl.tar", t)
copyImage(execCtx, task, "busybox_glibc.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
handles[idx], err = d.Start(execCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
@@ -857,6 +876,10 @@ func TestDockerDriver_User(t *testing.T) {
defer execCtx.AllocDir.Destroy()
copyImage(execCtx, task, "busybox.tar", t)
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
// It should fail because the user "alice" does not exist on the given
// image.
handle, err := driver.Start(execCtx, task)
@@ -1006,6 +1029,9 @@ done
fmt.Errorf("Failed to write data")
}
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1082,13 +1108,17 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
}
}
taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, "")
taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, cfg, "")
if err != nil {
cleanup()
t.Fatalf("Failed to get task env: %v", err)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv)
logger := testLogger()
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter)
driver := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
@@ -1111,6 +1141,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
if _, err := driver.Start(execCtx, task); err == nil {
t.Fatalf("Started driver successfully when volumes should have been disabled.")
}
@@ -1121,6 +1154,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".")
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1159,6 +1195,9 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) {
task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
if err != nil {
t.Fatalf("Failed to start docker driver: %v", err)

View File

@@ -5,6 +5,7 @@ import (
"log"
"os"
"path/filepath"
"strings"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
@@ -51,6 +52,10 @@ type Driver interface {
// Drivers must support the fingerprint interface for detection
fingerprint.Fingerprint
// Prestart prepares the task environment and performs expensive
// intialization steps like downloading images.
Prestart(*ExecContext, *structs.Task) error
// Start is used to being task execution
Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
@@ -70,6 +75,9 @@ type DriverAbilities struct {
SendSignals bool
}
// LogEventFn is a callback which allows Drivers to emit task events.
type LogEventFn func(message string, args ...interface{})
// DriverContext is a means to inject dependencies such as loggers, configs, and
// node attributes into a Driver without having to change the Driver interface
// each time we do it. Used in conjection with Factory, above.
@@ -79,18 +87,14 @@ type DriverContext struct {
logger *log.Logger
node *structs.Node
taskEnv *env.TaskEnvironment
emitEvent LogEventFn
}
// NewEmptyDriverContext returns a DriverContext with all fields set to their
// zero value.
func NewEmptyDriverContext() *DriverContext {
return &DriverContext{
taskName: "",
config: nil,
node: nil,
logger: nil,
taskEnv: nil,
}
return &DriverContext{}
}
// NewDriverContext initializes a new DriverContext with the specified fields.
@@ -98,13 +102,14 @@ func NewEmptyDriverContext() *DriverContext {
// private to the driver. If we want to change this later we can gorename all of
// the fields in DriverContext.
func NewDriverContext(taskName string, config *config.Config, node *structs.Node,
logger *log.Logger, taskEnv *env.TaskEnvironment) *DriverContext {
logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext {
return &DriverContext{
taskName: taskName,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
taskName: taskName,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
emitEvent: eventEmitter,
}
}
@@ -148,7 +153,8 @@ func NewExecContext(alloc *allocdir.AllocDir, allocID string) *ExecContext {
// GetTaskEnv converts the alloc dir, the node, task and alloc into a
// TaskEnvironment.
func GetTaskEnv(allocDir *allocdir.AllocDir, node *structs.Node,
task *structs.Task, alloc *structs.Allocation, vaultToken string) (*env.TaskEnvironment, error) {
task *structs.Task, alloc *structs.Allocation, conf *config.Config,
vaultToken string) (*env.TaskEnvironment, error) {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
env := env.NewTaskEnvironment(node).
@@ -184,6 +190,10 @@ func GetTaskEnv(allocDir *allocdir.AllocDir, node *structs.Node,
env.SetVaultToken(vaultToken, task.Vault.Env)
}
// Set the host environment variables.
filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
env.AppendHostEnvvars(filter)
return env.Build(), nil
}

View File

@@ -84,12 +84,16 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
alloc := mock.Alloc()
execCtx := NewExecContext(allocDir, alloc.ID)
taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, "")
taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task, alloc, cfg, "")
if err != nil {
return nil, nil
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv)
logger := testLogger()
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, logger, taskEnv, emitter)
return driverCtx, execCtx
}
@@ -119,7 +123,7 @@ func TestDriver_GetTaskEnv(t *testing.T) {
alloc := mock.Alloc()
alloc.Name = "Bar"
env, err := GetTaskEnv(nil, nil, task, alloc, "")
env, err := GetTaskEnv(nil, nil, task, alloc, testConfig(), "")
if err != nil {
t.Fatalf("GetTaskEnv() failed: %v", err)
}
@@ -157,8 +161,17 @@ func TestDriver_GetTaskEnv(t *testing.T) {
}
act := env.EnvMap()
if !reflect.DeepEqual(act, exp) {
t.Fatalf("GetTaskEnv() returned %#v; want %#v", act, exp)
// Since host env vars are included only ensure expected env vars are present
for expk, expv := range exp {
v, ok := act[expk]
if !ok {
t.Errorf("%q not found in task env", expk)
continue
}
if v != expv {
t.Errorf("Expected %s=%q but found %q", expk, expv, v)
}
}
}

View File

@@ -7,13 +7,11 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
@@ -92,6 +90,10 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *ExecDriver) Prestart(execctx *ExecContext, task *structs.Task) error {
return nil
}
func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig ExecDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
@@ -104,10 +106,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
// Get the task directory for storing the executor logs.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {

View File

@@ -65,6 +65,9 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -105,6 +108,9 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -165,6 +171,9 @@ func TestExecDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -215,6 +224,9 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -265,6 +277,9 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -327,6 +342,9 @@ done
fmt.Errorf("Failed to write data")
}
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -388,6 +406,9 @@ func TestExecDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()

View File

@@ -163,16 +163,16 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
}
func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig JavaDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)

View File

@@ -92,6 +92,9 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -142,6 +145,9 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -204,6 +210,9 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -262,6 +271,9 @@ func TestJavaDriver_Signal(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -317,6 +329,9 @@ func TestJavaDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewJavaDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()

View File

@@ -168,6 +168,13 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
return true, nil
}
func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath)
d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath)
return nil
}
// Start starts the LXC Driver
func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig LxcDriverConfig

View File

@@ -69,6 +69,9 @@ func TestLxcDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewLxcDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -141,6 +144,9 @@ func TestLxcDriver_Open_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewLxcDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)

View File

@@ -75,6 +75,10 @@ func (d *MockDriver) Abilities() DriverAbilities {
}
}
func (d *MockDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
}
// Start starts the mock driver
func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig MockDriverConfig

View File

@@ -135,6 +135,10 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
}
// Run an existing Qemu image. Start() will pull down an existing, valid Qemu
// image and save it to the Drivers Allocation Dir
func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {

View File

@@ -7,7 +7,6 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/hashicorp/go-plugin"
@@ -107,6 +106,10 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
return false, nil
}
func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
}
func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig ExecDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
@@ -125,10 +128,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return nil, err
}
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)

View File

@@ -75,6 +75,9 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -120,6 +123,9 @@ func TestRawExecDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -170,6 +176,9 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -219,6 +228,9 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -268,6 +280,9 @@ func TestRawExecDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()
@@ -313,6 +328,9 @@ done
fmt.Errorf("Failed to write data")
}
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)

View File

@@ -206,6 +206,13 @@ func (d *RktDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath)
d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath)
return nil
}
// Run an existing Rkt image.
func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig RktDriverConfig
@@ -289,10 +296,6 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug))
// Inject environment variables
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath)
d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath)
d.taskEnv.Build()
for k, v := range d.taskEnv.EnvMap() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v))
}

View File

@@ -98,6 +98,9 @@ func TestRktDriver_Start_DNS(t *testing.T) {
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -145,6 +148,9 @@ func TestRktDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -202,6 +208,9 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -252,6 +261,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) {
"-c",
fmt.Sprintf(`echo -n %s > foo/%s`, string(exp), file),
},
"net": []string{"none"},
"volumes": []string{fmt.Sprintf("%s:/foo", tmpvol)},
},
LogConfig: &structs.LogConfig{
@@ -268,6 +278,9 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -326,6 +339,9 @@ func TestRktDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()
@@ -364,6 +380,9 @@ func TestRktTrustPrefix(t *testing.T) {
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()
@@ -438,6 +457,9 @@ func TestRktDriver_PortsMapping(t *testing.T) {
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)

View File

@@ -324,7 +324,8 @@ func (r *TaskRunner) setTaskEnv() error {
r.taskEnvLock.Lock()
defer r.taskEnvLock.Unlock()
taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task.Copy(), r.alloc, r.vaultFuture.Get())
taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node,
r.task.Copy(), r.alloc, r.config, r.vaultFuture.Get())
if err != nil {
return err
}
@@ -346,7 +347,15 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID)
}
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env)
// Create a task-specific event emitter callback to expose minimal
// state to drivers
eventEmitter := func(m string, args ...interface{}) {
msg := fmt.Sprintf(m, args...)
r.logger.Printf("[DEBUG] client: driver event for alloc %q: %s", r.alloc.ID, msg)
r.setState("", structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
}
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env, eventEmitter)
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
@@ -1019,17 +1028,31 @@ func (r *TaskRunner) startTask() error {
// Create a driver
driver, err := r.createDriver()
if err != nil {
return fmt.Errorf("failed to create driver of task '%s' for alloc '%s': %v",
return fmt.Errorf("failed to create driver of task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)
}
// Run prestart
if err := driver.Prestart(r.ctx, r.task); err != nil {
wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)
r.logger.Printf("[WARN] client: %v", wrapped)
if rerr, ok := err.(*structs.RecoverableError); ok {
return structs.NewRecoverableError(wrapped, rerr.Recoverable)
}
return wrapped
}
// Start the job
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
wrapped := fmt.Errorf("failed to start task '%s' for alloc '%s': %v",
wrapped := fmt.Errorf("failed to start task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)
r.logger.Printf("[INFO] client: %v", wrapped)
r.logger.Printf("[WARN] client: %v", wrapped)
if rerr, ok := err.(*structs.RecoverableError); ok {
return structs.NewRecoverableError(wrapped, rerr.Recoverable)

View File

@@ -380,6 +380,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
} else {
desc = "Task signaled to restart"
}
case api.TaskDriverMessage:
desc = event.DriverMessage
}
// Reverse order so we are sorted by time

View File

@@ -2556,6 +2556,11 @@ const (
// TaskSiblingFailed indicates that a sibling task in the task group has
// failed.
TaskSiblingFailed = "Sibling task failed"
// TaskDriverMessage is an informational event message emitted by
// drivers such as when they're performing a long running action like
// downloading an image.
TaskDriverMessage = "Driver"
)
// TaskEvent is an event that effects the state of a task and contains meta-data
@@ -2614,6 +2619,9 @@ type TaskEvent struct {
// TaskSignal is the signal that was sent to the task
TaskSignal string
// DriverMessage indicates a driver action being taken.
DriverMessage string
}
func (te *TaskEvent) GoString() string {
@@ -2742,6 +2750,11 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent {
return e
}
func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent {
e.DriverMessage = m
return e
}
// TaskArtifact is an artifact to download before running the task.
type TaskArtifact struct {
// GetterSource is the source to download an artifact using go-getter