diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 3b4e09873..82baaca53 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -4,6 +4,7 @@ import ( "log" "sync" + "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" ) @@ -14,6 +15,9 @@ type AllocRunner struct { alloc *structs.Allocation + ctx *driver.ExecContext + tasks map[string]*TaskRunner + updateCh chan *structs.Allocation destroy bool @@ -27,6 +31,7 @@ func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner { client: client, logger: client.logger, alloc: alloc, + tasks: make(map[string]*TaskRunner), updateCh: make(chan *structs.Allocation, 8), destroyCh: make(chan struct{}), } @@ -42,7 +47,26 @@ func (r *AllocRunner) Alloc() *structs.Allocation { func (r *AllocRunner) Run() { r.logger.Printf("[DEBUG] client: starting context for alloc '%s'", r.alloc.ID) - // TODO: Start + // Find our task group in the allocation + alloc := r.alloc + tg := alloc.Job.LookingTaskGroup(alloc.TaskGroup) + if tg == nil { + r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) + // TODO: Err out + return + } + + // Create the execution context + r.ctx = driver.NewExecContext() + + // Start the task runners + for _, task := range tg.Tasks { + tr := NewTaskRunner(r, r.ctx, task) + r.tasks[task.Name] = tr + go tr.Run() + } + + // Wait for updates for { select { case update := <-r.updateCh: diff --git a/client/driver/driver.go b/client/driver/driver.go index 4131d10fb..a2f7f7643 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/nomad/structs" ) // BuiltinDrivers contains the built in registered drivers @@ -37,6 +38,25 @@ type Factory func(*log.Logger) Driver type Driver interface { // Drivers must support the fingerprint interface for detection fingerprint.Fingerprint + + // Start is 
used to begin task execution + Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) + + // Open is used to re-open a handle to a task + Open(ctx *ExecContext, handleID string) (DriverHandle, error) +} + +// DriverHandle is an opaque handle into a driver used for task +// manipulation +type DriverHandle interface { + // ID returns an opaque handle that can be used to re-open the handle + ID() string + + // WaitCh is used to return a channel used to wait for task completion + WaitCh() chan error + + // Kill is used to stop the task + Kill() error } // ExecContext is shared between drivers within an allocation diff --git a/client/driver/exec.go b/client/driver/exec.go index 79cf9b00c..553423903 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -13,6 +13,11 @@ type ExecDriver struct { logger *log.Logger } +// execHandle is returned from Start/Open as a handle to the PID +type execHandle struct { + waitCh chan error +} + // NewExecDriver is used to create a new exec driver func NewExecDriver(logger *log.Logger) Driver { d := &ExecDriver{ @@ -26,3 +31,25 @@ func (d *ExecDriver) Fingerprint(node *structs.Node) (bool, error) { node.Attributes["driver.exec"] = "1" return true, nil } + +func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { + // TODO + return nil, nil +} + +func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + // TODO + return nil, nil +} + +func (h *execHandle) ID() string { + return "test" +} + +func (h *execHandle) WaitCh() chan error { + return h.waitCh +} + +func (h *execHandle) Kill() error { + return nil +} diff --git a/client/task_runner.go b/client/task_runner.go index e72319c14..d6b9e36e8 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -4,13 +4,15 @@ import ( "log" "sync" + "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" ) // TaskRunner is used to wrap a task within an allocation and provide the execution 
context. type TaskRunner struct { - ctx *AllocRunner - logger *log.Logger + allocRunner *AllocRunner + logger *log.Logger + ctx *driver.ExecContext task *structs.Task @@ -22,24 +24,56 @@ type TaskRunner struct { } // NewTaskRunner is used to create a new task context -func NewTaskRunner(ctx *AllocRunner, task *structs.Task) *TaskRunner { +func NewTaskRunner(allocRunner *AllocRunner, ctx *driver.ExecContext, task *structs.Task) *TaskRunner { tc := &TaskRunner{ - ctx: ctx, - logger: ctx.logger, - task: task, - updateCh: make(chan *structs.Task, 8), - destroyCh: make(chan struct{}), + allocRunner: allocRunner, + logger: allocRunner.logger, + ctx: ctx, + task: task, + updateCh: make(chan *structs.Task, 8), + destroyCh: make(chan struct{}), } return tc } +// Alloc returns the associated alloc +func (r *TaskRunner) Alloc() *structs.Allocation { + return r.allocRunner.Alloc() +} + // Run is a long running routine used to manage the task func (r *TaskRunner) Run() { - r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.ctx.Alloc().ID) + r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.Alloc().ID) - // TODO: Start + // Create the driver + driver, err := driver.NewDriver(r.task.Driver, r.logger) + if err != nil { + r.logger.Printf("[ERR] client: failed to create driver '%s' for alloc '%s'", r.task.Driver, r.Alloc().ID) + // TODO Err return + return + } + + // Start the job + handle, err := driver.Start(r.ctx, r.task) + if err != nil { + r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s'", r.task.Name, r.Alloc().ID) + // TODO Err return + return + } + + // Wait for updates for { select { + case err := <-handle.WaitCh(): + if err != nil { + r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", + r.task.Name, r.Alloc().ID, err) + } else { + r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", + r.task.Name, r.Alloc().ID) + } + 
return + // TODO: case update := <-r.updateCh: // TODO: Update r.task = update @@ -55,7 +89,7 @@ func (r *TaskRunner) Update(update *structs.Task) { select { case r.updateCh <- update: default: - r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", update.Name, r.ctx.Alloc().ID) + r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", update.Name, r.Alloc().ID) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fcd487a0d..725dae9e1 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -589,6 +589,16 @@ type Job struct { ModifyIndex uint64 } +// LookingTaskGroup finds a task group by name, returning nil if none matches +func (j *Job) LookingTaskGroup(name string) *TaskGroup { + for _, tg := range j.TaskGroups { + if tg.Name == name { + return tg + } + } + return nil +} + // TaskGroup is an atomic unit of placement. Each task group belongs to // a job and may contain any number of tasks. A task group support running // in many replicas using the same configuration..