From 4106e794a8ef84a9fa5952881236d3193e89b262 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 29 Aug 2015 15:46:10 -0700 Subject: [PATCH] client: working on runners --- client/alloc_runner.go | 160 ++++++++++++++++++++++++++++++++++++---- client/driver/driver.go | 3 + client/task_runner.go | 63 ++++++++++++---- 3 files changed, 197 insertions(+), 29 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 82baaca53..468c4216f 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -1,13 +1,28 @@ package client import ( + "encoding/json" + "fmt" "log" "sync" + "time" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // allocSyncRetryIntv is the interval on which we retry updating + // the status of the allocation + allocSyncRetryIntv = 15 * time.Second +) + +// taskStatus is used to track the status of a task +type taskStatus struct { + Status string + Description string +} + // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { client *Client @@ -15,9 +30,14 @@ type AllocRunner struct { alloc *structs.Allocation + dirtyCh chan struct{} + ctx *driver.ExecContext tasks map[string]*TaskRunner + taskStatus map[string]taskStatus + taskStatusLock sync.RWMutex + updateCh chan *structs.Allocation destroy bool @@ -28,12 +48,14 @@ type AllocRunner struct { // NewAllocRunner is used to create a new allocation context func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner { ctx := &AllocRunner{ - client: client, - logger: client.logger, - alloc: alloc, - tasks: make(map[string]*TaskRunner), - updateCh: make(chan *structs.Allocation, 8), - destroyCh: make(chan struct{}), + client: client, + logger: client.logger, + alloc: alloc, + dirtyCh: make(chan struct{}, 1), + tasks: make(map[string]*TaskRunner), + taskStatus: make(map[string]taskStatus), + updateCh: make(chan *structs.Allocation, 8), + destroyCh: make(chan struct{}), } return ctx } @@ -43,16 +65,107 @@ func (r *AllocRunner) Alloc() *structs.Allocation { return r.alloc } +// setAlloc is used to update the allocation of the runner +// we preserve the existing client status and description +func (r *AllocRunner) setAlloc(alloc *structs.Allocation) { + if r.alloc != nil { + alloc.ClientStatus = r.alloc.ClientStatus + alloc.ClientDescription = r.alloc.ClientDescription + } + r.alloc = alloc +} + +// syncStatus is used to run and sync the status when it changes +func (r *AllocRunner) syncStatus() { + var retryCh <-chan time.Time + for { + select { + case <-retryCh: + case <-r.dirtyCh: + case <-r.destroyCh: + return + } + + // Scan the task status to termine the status of the alloc + var pending, running, dead, failed bool + r.taskStatusLock.RLock() + pending = len(r.taskStatus) < len(r.tasks) + for _, status := range r.taskStatus { + switch status.Status { + case structs.AllocClientStatusRunning: + running = true + case structs.AllocClientStatusDead: + dead = true + case structs.AllocClientStatusFailed: + failed = true + } + } + if len(r.taskStatus) > 0 { + taskDesc, _ := json.Marshal(r.taskStatus) + r.alloc.ClientDescription = string(taskDesc) + } + r.taskStatusLock.RUnlock() + + // Determine the alloc status + if failed { + r.alloc.ClientStatus = structs.AllocClientStatusFailed + } else if running { + r.alloc.ClientStatus = structs.AllocClientStatusRunning + } else if dead && !pending { + r.alloc.ClientStatus = structs.AllocClientStatusDead + } + + // Attempt to update the status + if err := r.client.updateAllocStatus(r.alloc); err != nil { + r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s", + r.alloc.ID, r.alloc.ClientStatus, err) + retryCh = time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)) + } + retryCh = nil + } +} + +// setStatus is used to update the allocation status +func (r *AllocRunner) setStatus(status, desc string) { + r.alloc.ClientStatus = status + r.alloc.ClientDescription = desc + select { + case r.dirtyCh <- struct{}{}: + default: + } +} + +// setTaskStatus is used to set the status of a task +func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { + r.taskStatusLock.Lock() + defer r.taskStatusLock.Unlock() + r.taskStatus[taskName] = taskStatus{ + Status: status, + Description: desc, + } + select { + case r.dirtyCh <- struct{}{}: + default: + } +} + // Run is a long running goroutine used to manage an allocation func (r *AllocRunner) Run() { - r.logger.Printf("[DEBUG] client: starting context for alloc '%s'", r.alloc.ID) + go r.syncStatus() - // Find our task group in the allocation + // Check if the allocation is in a terminal status alloc := r.alloc + if alloc.TerminalStatus() { + r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID) + return + } + r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID) + + // Find the task group to run in the allocation tg := alloc.Job.LookingTaskGroup(alloc.TaskGroup) if tg == nil { r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) - // TODO: Err out + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup)) return } @@ -66,17 +179,38 @@ func (r *AllocRunner) Run() { go tr.Run() } +OUTER: // Wait for updates for { select { case update := <-r.updateCh: - // TODO: Update - r.alloc = update + // Check if we're in a terminal status + if update.TerminalStatus() { + r.setAlloc(update) + break OUTER + } + + // Update the task groups + for _, task := range tg.Tasks { + tr := r.tasks[task.Name] + tr.Update(task) + } + case <-r.destroyCh: - // TODO: Destroy - return + break OUTER } } + + // Destroy each sub-task + for _, tr := range r.tasks { + tr.Destroy() + } + + // Wait for termination of the task runners + for _, tr := range r.tasks { + <-tr.WaitCh() + } + r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) } // Update is used to update the allocation of the context diff --git a/client/driver/driver.go b/client/driver/driver.go index a2f7f7643..262c3a71a 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -55,6 +55,9 @@ type DriverHandle interface { // WaitCh is used to return a channel used wait for task completion WaitCh() chan error + // Update is used to update the task if possible + Update(task *structs.Task) error + // Kill is used to stop the task Kill() error } diff --git a/client/task_runner.go b/client/task_runner.go index d6b9e36e8..7f243d268 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1,6 +1,7 @@ package client import ( + "fmt" "log" "sync" @@ -14,13 +15,16 @@ type TaskRunner struct { logger *log.Logger ctx *driver.ExecContext - task *structs.Task + allocID string + task *structs.Task updateCh chan *structs.Task destroy bool destroyCh chan struct{} destroyLock sync.Mutex + + waitCh chan struct{} } // NewTaskRunner is used to create a new task context @@ -29,37 +33,51 @@ func NewTaskRunner(allocRunner *AllocRunner, ctx *driver.ExecContext, task *stru allocRunner: allocRunner, logger: allocRunner.logger, ctx: ctx, + allocID: allocRunner.Alloc().ID, task: task, updateCh: make(chan *structs.Task, 8), destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), } return tc } -// Alloc returns the associated alloc -func (r *TaskRunner) Alloc() *structs.Allocation { - return r.allocRunner.Alloc() +// WaitCh returns a channel to wait for termination +func (r *TaskRunner) WaitCh() <-chan struct{} { + return r.waitCh +} + +// setStatus is used to update the status of the task runner +func (r *TaskRunner) setStatus(status, desc string) { + r.allocRunner.setTaskStatus(r.task.Name, status, desc) } // Run is a long running routine used to manage the task func (r *TaskRunner) Run() { - r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.Alloc().ID) + defer close(r.waitCh) + r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", + r.task.Name, r.allocID) // Create the driver driver, err := driver.NewDriver(r.task.Driver, r.logger) if err != nil { - r.logger.Printf("[ERR] client: failed to create driver '%s' for alloc '%s'", r.task.Driver, r.Alloc().ID) - // TODO Err return + r.logger.Printf("[ERR] client: failed to create driver '%s' for alloc '%s'", + r.task.Driver, r.allocID) + r.setStatus(structs.AllocClientStatusFailed, + fmt.Sprintf("failed to create driver '%s'", r.task.Driver)) return } // Start the job handle, err := driver.Start(r.ctx, r.task) if err != nil { - r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s'", r.task.Name, r.Alloc().ID) - // TODO Err return + r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v", + r.task.Name, r.allocID, err) + r.setStatus(structs.AllocClientStatusFailed, + fmt.Sprintf("failed to start: %v", err)) return } + r.setStatus(structs.AllocClientStatusRunning, "task started") // Wait for updates for { @@ -67,19 +85,31 @@ func (r *TaskRunner) Run() { case err := <-handle.WaitCh(): if err != nil { r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", - r.task.Name, r.Alloc().ID, err) + r.task.Name, r.allocID, err) + r.setStatus(structs.AllocClientStatusDead, + fmt.Sprintf("task failed with: %v", err)) } else { r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", - r.task.Name, r.Alloc().ID) + r.task.Name, r.allocID) + r.setStatus(structs.AllocClientStatusDead, + "task completed") } return - // TODO: + case update := <-r.updateCh: - // TODO: Update + // Update r.task = update + if err := handle.Update(update); err != nil { + r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", + r.task.Name, r.allocID, err) + } + case <-r.destroyCh: - // TODO: Destroy - return + // Send the kill signal, and use the WaitCh to block until complete + if err := handle.Kill(); err != nil { + r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", + r.task.Name, r.allocID, err) + } } } } @@ -89,7 +119,8 @@ func (r *TaskRunner) Update(update *structs.Task) { select { case r.updateCh <- update: default: - r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", update.Name, r.Alloc().ID) + r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", + update.Name, r.allocID) } }