client: working on runners

This commit is contained in:
Armon Dadgar
2015-08-29 15:46:10 -07:00
parent 14e7306524
commit 4106e794a8
3 changed files with 197 additions and 29 deletions

View File

@@ -1,13 +1,28 @@
package client
import (
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second
)
// taskStatus is used to track the status of a task
type taskStatus struct {
Status string
Description string
}
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
client *Client
@@ -15,9 +30,14 @@ type AllocRunner struct {
alloc *structs.Allocation
dirtyCh chan struct{}
ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskStatus map[string]taskStatus
taskStatusLock sync.RWMutex
updateCh chan *structs.Allocation
destroy bool
@@ -28,12 +48,14 @@ type AllocRunner struct {
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner {
ctx := &AllocRunner{
client: client,
logger: client.logger,
alloc: alloc,
tasks: make(map[string]*TaskRunner),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
client: client,
logger: client.logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStatus: make(map[string]taskStatus),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
}
return ctx
}
@@ -43,16 +65,107 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
}
// setAlloc is used to update the allocation of the runner
// we preserve the existing client status and description
func (r *AllocRunner) setAlloc(alloc *structs.Allocation) {
if r.alloc != nil {
alloc.ClientStatus = r.alloc.ClientStatus
alloc.ClientDescription = r.alloc.ClientDescription
}
r.alloc = alloc
}
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() {
var retryCh <-chan time.Time
for {
select {
case <-retryCh:
case <-r.dirtyCh:
case <-r.destroyCh:
return
}
// Scan the task status to termine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
pending = len(r.taskStatus) < len(r.tasks)
for _, status := range r.taskStatus {
switch status.Status {
case structs.AllocClientStatusRunning:
running = true
case structs.AllocClientStatusDead:
dead = true
case structs.AllocClientStatusFailed:
failed = true
}
}
if len(r.taskStatus) > 0 {
taskDesc, _ := json.Marshal(r.taskStatus)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()
// Determine the alloc status
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
r.alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
r.alloc.ClientStatus = structs.AllocClientStatusDead
}
// Attempt to update the status
if err := r.client.updateAllocStatus(r.alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
r.alloc.ID, r.alloc.ClientStatus, err)
retryCh = time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv))
}
retryCh = nil
}
}
// setStatus is used to update the allocation status
func (r *AllocRunner) setStatus(status, desc string) {
r.alloc.ClientStatus = status
r.alloc.ClientDescription = desc
select {
case r.dirtyCh <- struct{}{}:
default:
}
}
// setTaskStatus is used to set the status of a task
func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
r.taskStatus[taskName] = taskStatus{
Status: status,
Description: desc,
}
select {
case r.dirtyCh <- struct{}{}:
default:
}
}
// Run is a long running goroutine used to manage an allocation
func (r *AllocRunner) Run() {
r.logger.Printf("[DEBUG] client: starting context for alloc '%s'", r.alloc.ID)
go r.syncStatus()
// Find our task group in the allocation
// Check if the allocation is in a terminal status
alloc := r.alloc
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID)
return
}
r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID)
// Find the task group to run in the allocation
tg := alloc.Job.LookingTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
// TODO: Err out
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup))
return
}
@@ -66,17 +179,38 @@ func (r *AllocRunner) Run() {
go tr.Run()
}
OUTER:
// Wait for updates
for {
select {
case update := <-r.updateCh:
// TODO: Update
r.alloc = update
// Check if we're in a terminal status
if update.TerminalStatus() {
r.setAlloc(update)
break OUTER
}
// Update the task groups
for _, task := range tg.Tasks {
tr := r.tasks[task.Name]
tr.Update(task)
}
case <-r.destroyCh:
// TODO: Destroy
return
break OUTER
}
}
// Destroy each sub-task
for _, tr := range r.tasks {
tr.Destroy()
}
// Wait for termination of the task runners
for _, tr := range r.tasks {
<-tr.WaitCh()
}
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}
// Update is used to update the allocation of the context

View File

@@ -55,6 +55,9 @@ type DriverHandle interface {
// WaitCh is used to return a channel used wait for task completion
WaitCh() chan error
// Update is used to update the task if possible
Update(task *structs.Task) error
// Kill is used to stop the task
Kill() error
}

View File

@@ -1,6 +1,7 @@
package client
import (
"fmt"
"log"
"sync"
@@ -14,13 +15,16 @@ type TaskRunner struct {
logger *log.Logger
ctx *driver.ExecContext
task *structs.Task
allocID string
task *structs.Task
updateCh chan *structs.Task
destroy bool
destroyCh chan struct{}
destroyLock sync.Mutex
waitCh chan struct{}
}
// NewTaskRunner is used to create a new task context
@@ -29,37 +33,51 @@ func NewTaskRunner(allocRunner *AllocRunner, ctx *driver.ExecContext, task *stru
allocRunner: allocRunner,
logger: allocRunner.logger,
ctx: ctx,
allocID: allocRunner.Alloc().ID,
task: task,
updateCh: make(chan *structs.Task, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
return tc
}
// Alloc returns the associated alloc
func (r *TaskRunner) Alloc() *structs.Allocation {
return r.allocRunner.Alloc()
// WaitCh returns a channel to wait for termination
func (r *TaskRunner) WaitCh() <-chan struct{} {
return r.waitCh
}
// setStatus is used to update the status of the task runner
func (r *TaskRunner) setStatus(status, desc string) {
r.allocRunner.setTaskStatus(r.task.Name, status, desc)
}
// Run is a long running routine used to manage the task
func (r *TaskRunner) Run() {
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.Alloc().ID)
defer close(r.waitCh)
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.allocID)
// Create the driver
driver, err := driver.NewDriver(r.task.Driver, r.logger)
if err != nil {
r.logger.Printf("[ERR] client: failed to create driver '%s' for alloc '%s'", r.task.Driver, r.Alloc().ID)
// TODO Err return
r.logger.Printf("[ERR] client: failed to create driver '%s' for alloc '%s'",
r.task.Driver, r.allocID)
r.setStatus(structs.AllocClientStatusFailed,
fmt.Sprintf("failed to create driver '%s'", r.task.Driver))
return
}
// Start the job
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s'", r.task.Name, r.Alloc().ID)
// TODO Err return
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.setStatus(structs.AllocClientStatusFailed,
fmt.Sprintf("failed to start: %v", err))
return
}
r.setStatus(structs.AllocClientStatusRunning, "task started")
// Wait for updates
for {
@@ -67,19 +85,31 @@ func (r *TaskRunner) Run() {
case err := <-handle.WaitCh():
if err != nil {
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v",
r.task.Name, r.Alloc().ID, err)
r.task.Name, r.allocID, err)
r.setStatus(structs.AllocClientStatusDead,
fmt.Sprintf("task failed with: %v", err))
} else {
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'",
r.task.Name, r.Alloc().ID)
r.task.Name, r.allocID)
r.setStatus(structs.AllocClientStatusDead,
"task completed")
}
return
// TODO:
case update := <-r.updateCh:
// TODO: Update
// Update
r.task = update
if err := handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
}
case <-r.destroyCh:
// TODO: Destroy
return
// Send the kill signal, and use the WaitCh to block until complete
if err := handle.Kill(); err != nil {
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
}
}
}
}
@@ -89,7 +119,8 @@ func (r *TaskRunner) Update(update *structs.Task) {
select {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", update.Name, r.Alloc().ID)
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.allocID)
}
}