Add PortMap to struct returned by Driver.Prestart

Moves env.Builder out of drivers entirely so one less thing to worry
about when implementing driver plugins.
This commit is contained in:
Michael Schurter
2017-05-19 11:08:49 -07:00
parent 37c1cbc9cf
commit 6db35013d2
10 changed files with 71 additions and 58 deletions

View File

@@ -180,7 +180,7 @@ func (c *DockerDriverConfig) Validate() error {
// NewDockerDriverConfig returns a docker driver config by parsing the HCL
// config
func NewDockerDriverConfig(task *structs.Task, envBuilder *env.Builder) (*DockerDriverConfig, error) {
func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnv) (*DockerDriverConfig, error) {
var dconf DockerDriverConfig
if err := mapstructure.WeakDecode(task.Config, &dconf); err != nil {
@@ -188,7 +188,6 @@ func NewDockerDriverConfig(task *structs.Task, envBuilder *env.Builder) (*Docker
}
// Interpolate everthing that is a string
env := envBuilder.Build()
dconf.ImageName = env.ReplaceEnv(dconf.ImageName)
dconf.Command = env.ReplaceEnv(dconf.Command)
dconf.IpcMode = env.ReplaceEnv(dconf.IpcMode)
@@ -456,8 +455,8 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoord
return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName)
}
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
driverConfig, err := NewDockerDriverConfig(task, d.envBuilder)
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*PrestartResponse, error) {
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
if err != nil {
return nil, err
}
@@ -477,10 +476,11 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR
return nil, err
}
res := NewCreatedResources()
res.Add(dockerImageResKey, id)
resp := NewPrestartResponse()
resp.CreatedResources.Add(dockerImageResKey, id)
resp.PortMap = d.driverConfig.PortMap
d.imageID = id
return res, nil
return resp, nil
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -496,7 +496,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.envBuilder.Build(),
TaskEnv: d.taskEnv,
Task: task,
Driver: "docker",
AllocID: d.DriverContext.allocID,
@@ -900,14 +900,11 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
d.logger.Printf("[DEBUG] driver.docker: exposed port %s", containerPort)
}
d.envBuilder.SetPortMap(driverConfig.PortMap)
hostConfig.PortBindings = publishedPorts
config.ExposedPorts = exposedPorts
}
taskEnv := d.envBuilder.Build()
parsedArgs := taskEnv.ParseAndReplace(driverConfig.Args)
parsedArgs := d.taskEnv.ParseAndReplace(driverConfig.Args)
// If the user specified a custom command to run, we'll inject it here.
if driverConfig.Command != "" {
@@ -931,7 +928,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
d.logger.Printf("[DEBUG] driver.docker: applied labels on the container: %+v", config.Labels)
}
config.Env = taskEnv.List()
config.Env = d.taskEnv.List()
containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID)
d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)

View File

@@ -53,6 +53,24 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) {
// Factory is used to instantiate a new Driver
type Factory func(*DriverContext) Driver
// PrestartResponse is driver state returned by Driver.Prestart.
type PrestartResponse struct {
// CreatedResources by the driver.
CreatedResources *CreatedResources
// PortMap can be set by drivers to replace ports in environment
// variables with driver-specific mappings.
PortMap map[string]int
}
// NewPrestartResponse creates a new PrestartResponse with CreatedResources
// initialized.
func NewPrestartResponse() *PrestartResponse {
return &PrestartResponse{
CreatedResources: NewCreatedResources(),
}
}
// CreatedResources is a map of resources (eg downloaded images) created by a driver
// that must be cleaned up.
type CreatedResources struct {
@@ -176,7 +194,7 @@ type Driver interface {
// intialization steps like downloading images.
//
// CreatedResources may be non-nil even when an error occurs.
Prestart(*ExecContext, *structs.Task) (*CreatedResources, error)
Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error)
// Start is used to being task execution
Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
@@ -220,12 +238,12 @@ type LogEventFn func(message string, args ...interface{})
// node attributes into a Driver without having to change the Driver interface
// each time we do it. Used in conjection with Factory, above.
type DriverContext struct {
taskName string
allocID string
config *config.Config
logger *log.Logger
node *structs.Node
envBuilder *env.Builder
taskName string
allocID string
config *config.Config
logger *log.Logger
node *structs.Node
taskEnv *env.TaskEnv
emitEvent LogEventFn
}
@@ -241,15 +259,15 @@ func NewEmptyDriverContext() *DriverContext {
// private to the driver. If we want to change this later we can gorename all of
// the fields in DriverContext.
func NewDriverContext(taskName, allocID string, config *config.Config, node *structs.Node,
logger *log.Logger, envBuilder *env.Builder, eventEmitter LogEventFn) *DriverContext {
logger *log.Logger, taskEnv *env.TaskEnv, eventEmitter LogEventFn) *DriverContext {
return &DriverContext{
taskName: taskName,
allocID: allocID,
config: config,
node: node,
logger: logger,
envBuilder: envBuilder,
emitEvent: eventEmitter,
taskName: taskName,
allocID: allocID,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
emitEvent: eventEmitter,
}
}

View File

@@ -98,7 +98,7 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@@ -124,7 +124,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.envBuilder.Build(),
TaskEnv: d.taskEnv,
Driver: "exec",
AllocID: d.DriverContext.allocID,
LogDir: ctx.TaskDir.LogDir,

View File

@@ -175,7 +175,7 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@@ -203,7 +203,7 @@ func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnv) (*JavaDriverConfi
}
func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
driverConfig, err := NewJavaDriverConfig(task, d.envBuilder.Build())
driverConfig, err := NewJavaDriverConfig(task, d.taskEnv)
if err != nil {
return nil, err
}
@@ -249,7 +249,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Set the context
executorCtx := &executor.ExecutorContext{
TaskEnv: d.envBuilder.Build(),
TaskEnv: d.taskEnv,
Driver: "java",
AllocID: d.DriverContext.allocID,
Task: task,

View File

@@ -173,7 +173,7 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
return true, nil
}
func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}

View File

@@ -1,4 +1,4 @@
// +build nomad_test
//+build nomad_test
package driver
@@ -84,7 +84,7 @@ func (d *MockDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationNone
}
func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}

View File

@@ -131,7 +131,7 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@@ -238,7 +238,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.envBuilder.Build(),
TaskEnv: d.taskEnv,
Driver: "qemu",
AllocID: d.DriverContext.allocID,
Task: task,

View File

@@ -106,7 +106,7 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
return false, nil
}
func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@@ -133,7 +133,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.envBuilder.Build(),
TaskEnv: d.taskEnv,
Driver: "raw_exec",
AllocID: d.DriverContext.allocID,
Task: task,
@@ -169,7 +169,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
taskEnv: d.envBuilder.Build(),
taskEnv: d.taskEnv,
taskDir: ctx.TaskDir,
}
go h.run()
@@ -218,7 +218,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
version: id.Version,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
taskEnv: d.envBuilder.Build(),
taskEnv: d.taskEnv,
taskDir: ctx.TaskDir,
}
go h.run()

View File

@@ -223,7 +223,7 @@ func (d *RktDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@@ -310,8 +310,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug))
// Inject environment variables
taskEnv := d.envBuilder.Build()
for k, v := range taskEnv.Map() {
for k, v := range d.taskEnv.Map() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%q", k, v))
}
@@ -401,7 +400,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
// Add user passed arguments.
if len(driverConfig.Args) != 0 {
parsed := taskEnv.ParseAndReplace(driverConfig.Args)
parsed := d.taskEnv.ParseAndReplace(driverConfig.Args)
// Need to start arguments with "--"
if len(parsed) > 0 {
@@ -413,12 +412,6 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
}
}
// Set the host environment variables as by default they aren't set for
// image based drivers.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.envBuilder.SetHostEnvvars(filter)
taskEnv = d.envBuilder.Build()
pluginLogFile := filepath.Join(ctx.TaskDir.Dir, fmt.Sprintf("%s-executor.out", task.Name))
executorConfig := &dstructs.ExecutorConfig{
LogFile: pluginLogFile,
@@ -430,7 +423,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: taskEnv,
TaskEnv: d.taskEnv,
Driver: "rkt",
AllocID: d.DriverContext.allocID,
Task: task,
@@ -480,7 +473,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
maxKill := d.DriverContext.config.MaxKillTimeout
h := &rktHandle{
uuid: uuid,
env: taskEnv,
env: d.taskEnv,
taskDir: ctx.TaskDir,
pluginClient: pluginClient,
executor: execIntf,
@@ -522,7 +515,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
// Return a driver handle
h := &rktHandle{
uuid: id.UUID,
env: d.envBuilder.Build(),
env: d.taskEnv,
taskDir: ctx.TaskDir,
pluginClient: pluginClient,
executorPid: id.ExecutorPid,

View File

@@ -488,7 +488,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
}
driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, r.envBuilder, eventEmitter)
driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, r.envBuilder.Build(), eventEmitter)
d, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
@@ -1303,13 +1303,18 @@ func (r *TaskRunner) startTask() error {
// Run prestart
ctx := driver.NewExecContext(r.taskDir)
res, err := drv.Prestart(ctx, r.task)
resp, err := drv.Prestart(ctx, r.task)
// Merge newly created resources into previously created resources
r.createdResourcesLock.Lock()
r.createdResources.Merge(res)
r.createdResources.Merge(resp.CreatedResources)
r.createdResourcesLock.Unlock()
// Update environment with PortMap if it was returned
if len(resp.PortMap) > 0 {
r.envBuilder.SetPortMap(resp.PortMap)
}
if err != nil {
wrapped := fmt.Sprintf("failed to initialize task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)