client: making progress

This commit is contained in:
Armon Dadgar
2015-08-23 16:49:48 -07:00
parent 8b97638318
commit 0bf1aec4a5
5 changed files with 127 additions and 12 deletions

View File

@@ -4,6 +4,7 @@ import (
"log"
"sync"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -14,6 +15,9 @@ type AllocRunner struct {
alloc *structs.Allocation
ctx *driver.ExecContext
tasks map[string]*TaskRunner
updateCh chan *structs.Allocation
destroy bool
@@ -27,6 +31,7 @@ func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner {
client: client,
logger: client.logger,
alloc: alloc,
tasks: make(map[string]*TaskRunner),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
}
@@ -42,7 +47,26 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
func (r *AllocRunner) Run() {
r.logger.Printf("[DEBUG] client: starting context for alloc '%s'", r.alloc.ID)
// TODO: Start
// Find our task group in the allocation
alloc := r.alloc
tg := alloc.Job.LookingTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
// TODO: Err out
return
}
// Create the execution context
r.ctx = driver.NewExecContext()
// Start the task runners
for _, task := range tg.Tasks {
tr := NewTaskRunner(r, r.ctx, task)
r.tasks[task.Name] = tr
go tr.Run()
}
// Wait for updates
for {
select {
case update := <-r.updateCh:

View File

@@ -6,6 +6,7 @@ import (
"sync"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"
)
// BuiltinDrivers contains the built in registered drivers
@@ -37,6 +38,25 @@ type Factory func(*log.Logger) Driver
type Driver interface {
// Drivers must support the fingerprint interface for detection
fingerprint.Fingerprint
// Start is used to being task execution
Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
// Open is used to re-open a handle to a task
Open(ctx *ExecContext, handleID string) (DriverHandle, error)
}
// DriverHandle is an opaque handle into a driver used for task
// manipulation
type DriverHandle interface {
// Returns an opaque handle that can be used to re-open the handle
ID() string
// WaitCh is used to return a channel used wait for task completion
WaitCh() chan error
// Kill is used to stop the task
Kill() error
}
// ExecContext is shared between drivers within an allocation

View File

@@ -13,6 +13,11 @@ type ExecDriver struct {
logger *log.Logger
}
// execHandle is returned from Start/Open as a handle to the PID
type execHandle struct {
waitCh chan error
}
// NewExecDriver is used to create a new exec driver
func NewExecDriver(logger *log.Logger) Driver {
d := &ExecDriver{
@@ -26,3 +31,25 @@ func (d *ExecDriver) Fingerprint(node *structs.Node) (bool, error) {
node.Attributes["driver.exec"] = "1"
return true, nil
}
func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// TODO
return nil, nil
}
func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
// TODO
return nil, nil
}
func (h *execHandle) ID() string {
return "test"
}
func (h *execHandle) WaitCh() chan error {
return h.waitCh
}
func (h *execHandle) Kill() error {
return nil
}

View File

@@ -4,13 +4,15 @@ import (
"log"
"sync"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
ctx *AllocRunner
logger *log.Logger
allocRunner *AllocRunner
logger *log.Logger
ctx *driver.ExecContext
task *structs.Task
@@ -22,24 +24,56 @@ type TaskRunner struct {
}
// NewTaskRunner is used to create a new task context
func NewTaskRunner(ctx *AllocRunner, task *structs.Task) *TaskRunner {
func NewTaskRunner(allocRunner *AllocRunner, ctx *driver.ExecContext, task *structs.Task) *TaskRunner {
tc := &TaskRunner{
ctx: ctx,
logger: ctx.logger,
task: task,
updateCh: make(chan *structs.Task, 8),
destroyCh: make(chan struct{}),
allocRunner: allocRunner,
logger: allocRunner.logger,
ctx: ctx,
task: task,
updateCh: make(chan *structs.Task, 8),
destroyCh: make(chan struct{}),
}
return tc
}
// Alloc returns the associated alloc
func (r *TaskRunner) Alloc() *structs.Allocation {
return r.allocRunner.Alloc()
}
// Run is a long running routine used to manage the task
func (r *TaskRunner) Run() {
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.ctx.Alloc().ID)
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.Alloc().ID)
// TODO: Start
// Create the driver
driver, err := driver.NewDriver(r.task.Driver, r.logger)
if err != nil {
r.logger.Printf("[ERR] client: failed to create driver '%s' for alloc '%s'", r.task.Driver, r.Alloc().ID)
// TODO Err return
return
}
// Start the job
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s'", r.task.Name, r.Alloc().ID)
// TODO Err return
return
}
// Wait for updates
for {
select {
case err := <-handle.WaitCh():
if err != nil {
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v",
r.task.Name, r.Alloc().ID, err)
} else {
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'",
r.task.Name, r.Alloc().ID)
}
return
// TODO:
case update := <-r.updateCh:
// TODO: Update
r.task = update
@@ -55,7 +89,7 @@ func (r *TaskRunner) Update(update *structs.Task) {
select {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", update.Name, r.ctx.Alloc().ID)
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", update.Name, r.Alloc().ID)
}
}

View File

@@ -589,6 +589,16 @@ type Job struct {
ModifyIndex uint64
}
// LookupTaskGroup finds a task group by name
func (j *Job) LookingTaskGroup(name string) *TaskGroup {
for _, tg := range j.TaskGroups {
if tg.Name == name {
return tg
}
}
return nil
}
// TaskGroup is an atomic unit of placement. Each task group belongs to
// a job and may contain any number of tasks. A task group support running
// in many replicas using the same configuration..