Cleaned up the logic to calculate restart duration

Diptanu Choudhury
2015-11-05 11:12:31 -08:00
parent 2a9dd21a53
commit 2e168af9d3
5 changed files with 204 additions and 116 deletions

client/alloc_runner.go

@@ -59,10 +59,10 @@ type AllocRunner struct {
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Alloc *structs.Allocation
RestartPolicy *structs.RestartPolicy
TaskStatus map[string]taskStatus
Context *driver.ExecContext
}
// NewAllocRunner is used to create a new allocation context
@@ -102,10 +102,11 @@ func (r *AllocRunner) RestoreState() error {
r.ctx = snap.Context
// Restore the task runners
jobType := r.alloc.Job.Type
var mErr multierror.Error
for name := range r.taskStatus {
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, jobType, r.RestartPolicy)
r.tasks[name] = tr
if err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
@@ -121,10 +122,10 @@ func (r *AllocRunner) RestoreState() error {
func (r *AllocRunner) SaveState() error {
r.taskStatusLock.RLock()
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
TaskStatus: r.taskStatus,
Context: r.ctx,
}
err := persistState(r.stateFilePath(), &snap)
r.taskStatusLock.RUnlock()
@@ -307,8 +308,8 @@ func (r *AllocRunner) Run() {
// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, r.RestartPolicy)
jobType := r.alloc.Job.Type
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, jobType, r.RestartPolicy)
r.tasks[task.Name] = tr
go tr.Run()
}

client/restarts.go (new file, 81 additions)

@@ -0,0 +1,81 @@
package client
import (
"github.com/hashicorp/nomad/nomad/structs"
"time"
)
// The restartTracker keeps track of the number of times a task has exited.
// It returns whether a task should be restarted and the duration to wait
// before restarting it. For batch jobs the interval is left at its zero
// value, since those tasks will be restarted only up to maxAttempts times.
type restartTracker interface {
nextRestart() (bool, time.Duration)
increment()
}
func newRestartTracker(jobType string, restartPolicy *structs.RestartPolicy) restartTracker {
switch jobType {
case structs.JobTypeService:
return &serviceRestartTracker{
maxAttempts: restartPolicy.Attempts,
startTime: time.Now(),
interval: restartPolicy.Interval,
delay: restartPolicy.Delay,
}
default:
return &batchRestartTracker{
maxAttempts: restartPolicy.Attempts,
delay: restartPolicy.Delay,
}
}
}
type batchRestartTracker struct {
maxAttempts int
delay time.Duration
count int
}
func (b *batchRestartTracker) increment() {
b.count = b.count + 1
}
func (b *batchRestartTracker) nextRestart() (bool, time.Duration) {
if b.count < b.maxAttempts {
return true, b.delay
}
return false, 0
}
type serviceRestartTracker struct {
maxAttempts int
delay time.Duration
interval time.Duration
count int
startTime time.Time
}
func (c *serviceRestartTracker) increment() {
if c.count <= c.maxAttempts {
c.count = c.count + 1
}
}
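// nextRestart returns whether to restart the task and how long to wait before
// doing so. A service task is always restarted eventually: once the interval
// window has passed, the counter resets and the task restarts after the
// configured delay; while attempts remain inside the window it also restarts
// after the delay; otherwise the restart is deferred until the window closes.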
func (c *serviceRestartTracker) nextRestart() (bool, time.Duration) {
windowEndTime := c.startTime.Add(c.interval)
now := time.Now()
if now.After(windowEndTime) {
c.count = 0
c.startTime = time.Now()
return true, c.delay
}
if c.count < c.maxAttempts {
return true, c.delay
}
return true, windowEndTime.Sub(now)
}
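
The tracker is driven in two steps: increment records a failed exit, and nextRestart answers whether to try again and how long to wait first. A minimal sketch of that contract, assuming it lives in the client package next to restarts.go (exampleRestartLoop and runOnce are illustrative, not part of this commit):

package client

import (
	"log"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// exampleRestartLoop runs a task via runOnce and consults the restart tracker
// after every failed exit to decide whether, and after how long, to retry.
func exampleRestartLoop(jobType string, policy *structs.RestartPolicy, runOnce func() error) {
	rt := newRestartTracker(jobType, policy)
	for {
		if err := runOnce(); err == nil {
			return // clean exit, nothing to restart
		}
		rt.increment()
		shouldRestart, wait := rt.nextRestart()
		if !shouldRestart {
			log.Printf("[INFO] client: giving up after exhausting restart attempts")
			return
		}
		time.Sleep(wait) // the real TaskRunner also watches destroyCh while waiting
	}
}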

client/restarts_test.go (new file, 39 additions)

@@ -0,0 +1,39 @@
package client
import (
"github.com/hashicorp/nomad/nomad/structs"
"testing"
"time"
)
func TestTaskRunner_ServiceRestartCounter(t *testing.T) {
rt := newRestartTracker(structs.JobTypeService, &structs.RestartPolicy{Attempts: 2, Interval: 2 * time.Minute, Delay: 1 * time.Second})
rt.increment()
rt.increment()
rt.increment()
rt.increment()
rt.increment()
actual, _ := rt.nextRestart()
if !actual {
t.Fatalf("Expect %v, Actual: %v", true, actual)
}
}
func TestTaskRunner_BatchRestartCounter(t *testing.T) {
rt := newRestartTracker(structs.JobTypeBatch, &structs.RestartPolicy{Attempts: 2, Interval: 1 * time.Second, Delay: 1 * time.Second})
rt.increment()
rt.increment()
rt.increment()
rt.increment()
rt.increment()
actual, _ := rt.nextRestart()
if actual {
t.Fatalf("Expect %v, Actual: %v", false, actual)
}
time.Sleep(1 * time.Second)
actual, _ = rt.nextRestart()
if actual {
t.Fatalf("Expect %v, Actual: %v", false, actual)
}
}
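
With the service policy above (Attempts: 2, Interval: 2 * time.Minute, Delay: 1 * time.Second), all five increments land inside the first window, so nextRestart still answers true but hands back the time remaining in the window (just under two minutes) rather than the one-second delay. A sketch of asserting that, assuming it sits alongside restarts_test.go with the same imports (the test name and bounds are illustrative, not part of this commit):

func TestTaskRunner_ServiceRestartCounter_WindowDelay(t *testing.T) {
	policy := &structs.RestartPolicy{Attempts: 2, Interval: 2 * time.Minute, Delay: 1 * time.Second}
	rt := newRestartTracker(structs.JobTypeService, policy)
	for i := 0; i < 5; i++ {
		rt.increment()
	}
	ok, when := rt.nextRestart()
	if !ok {
		t.Fatalf("Expect %v, Actual: %v", true, ok)
	}
	// Attempts are exhausted, so the restart is deferred until the current
	// interval window closes: more than the plain delay, at most the interval.
	if when <= policy.Delay || when > policy.Interval {
		t.Fatalf("Expect a delay in (%v, %v], Actual: %v", policy.Delay, policy.Interval, when)
	}
}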

client/task_runner.go

@@ -15,39 +15,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
type errorCounter struct {
count int
maxAttempts int
startTime time.Time
interval time.Duration
}
func newErrorCounter(maxAttempts int, interval time.Duration) *errorCounter {
return &errorCounter{maxAttempts: maxAttempts, startTime: time.Now(), interval: interval}
}
func (c *errorCounter) Increment() {
if c.count <= c.maxAttempts {
c.count = c.count + 1
}
}
func (c *errorCounter) shouldRestart() bool {
if time.Now().After(c.startTime.Add(c.interval)) {
c.count = 0
c.startTime = time.Now()
}
return c.count < c.maxAttempts
}
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
config *config.Config
updater TaskStateUpdater
logger *log.Logger
ctx *driver.ExecContext
allocID string
errorCounter *errorCounter
restartTracker restartTracker
task *structs.Task
restartPolicy *structs.RestartPolicy
@@ -72,22 +47,22 @@ type TaskStateUpdater func(taskName, status, desc string)
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
allocID string, task *structs.Task,
allocID string, task *structs.Task, taskType string,
restartPolicy *structs.RestartPolicy) *TaskRunner {
ec := newErrorCounter(restartPolicy.Attempts, restartPolicy.Interval)
rt := newRestartTracker(taskType, restartPolicy)
tc := &TaskRunner{
config: config,
updater: updater,
logger: logger,
errorCounter: ec,
restartTracker: rt,
ctx: ctx,
allocID: allocID,
task: task,
restartPolicy: restartPolicy,
updateCh: make(chan *structs.Task, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
return tc
}
@@ -195,27 +170,9 @@ func (r *TaskRunner) startTask() error {
return nil
}
func (r *TaskRunner) restartTask() (bool, error) {
r.errorCounter.Increment()
if !r.errorCounter.shouldRestart() {
r.logger.Printf("[INFO] client: Not restarting task since it has been started %v times since %v", r.errorCounter.count, r.errorCounter.startTime)
return false, nil
}
r.logger.Printf("[DEBUG] client: Sleeping for %v before restaring task: %v", r.restartPolicy.Delay, r.task.Name)
time.Sleep(r.restartPolicy.Delay)
r.logger.Printf("[DEBUG] client: Restarting Task: %v", r.task.Name)
if err := r.startTask(); err != nil {
r.logger.Printf("[ERR] client: Couldn't re-start task: %v because of error: %v", r.task.Name, err)
return false, err
}
r.logger.Printf("[INFO] client: Successfuly restated Task: %v", r.task.Name)
return true, nil
}
// Run is a long running routine used to manage the task
func (r *TaskRunner) Run() {
var err error
defer close(r.waitCh)
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.allocID)
@@ -227,31 +184,62 @@ func (r *TaskRunner) Run() {
}
}
// Monitoring the Driver
err = r.monitorDriver(r.handle.WaitCh(), r.updateCh, r.destroyCh)
for err != nil {
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.restartTracker.increment()
shouldRestart, when := r.restartTracker.nextRestart()
if !shouldRestart {
r.logger.Printf("[INFO] Not restarting")
r.setStatus(structs.AllocClientStatusDead, fmt.Sprintf("task failed with: %v", err))
break
}
r.logger.Printf("[INFO] Restarting Task: %v", r.task.Name)
r.logger.Printf("[DEBUG] Sleeping for %v before restarting Task %v", when, r.task.Name)
ch := time.After(when)
L:
for {
select {
case <-ch:
break L
case <-r.destroyCh:
break L
}
}
r.destroyLock.Lock()
if r.destroy {
r.logger.Printf("[DEBUG] Not restarting task: %v because it's destroyed by user", r.task.Name)
r.destroyLock.Unlock()
break
}
if err = r.startTask(); err != nil {
r.destroyLock.Unlock()
continue
}
r.destroyLock.Unlock()
err = r.monitorDriver(r.handle.WaitCh(), r.updateCh, r.destroyCh)
}
// Cleanup after ourselves
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'",
r.task.Name, r.allocID)
r.setStatus(structs.AllocClientStatusDead,
"task completed")
r.DestroyState()
}
func (r *TaskRunner) monitorDriver(waitCh chan error, updateCh chan *structs.Task, destroyCh chan struct{}) error {
var err error
OUTER:
// Wait for updates
for {
select {
case err := <-r.handle.WaitCh():
if err != nil {
// Trying to restart the task
if _, err := r.restartTask(); err == nil {
// We have successfully restarted the task, going
// back to listening to events
continue
}
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.setStatus(structs.AllocClientStatusDead,
fmt.Sprintf("task failed with: %v", err))
} else {
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'",
r.task.Name, r.allocID)
r.setStatus(structs.AllocClientStatusDead,
"task completed")
}
case err = <-waitCh:
break OUTER
case update := <-r.updateCh:
case update := <-updateCh:
// Update
r.task = update
if err := r.handle.Update(update); err != nil {
@@ -259,7 +247,7 @@ OUTER:
r.task.Name, r.allocID, err)
}
case <-r.destroyCh:
case <-destroyCh:
// Send the kill signal, and use the WaitCh to block until complete
if err := r.handle.Kill(); err != nil {
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v",
@@ -267,9 +255,7 @@ OUTER:
}
}
}
// Cleanup after ourselves
r.DestroyState()
return err
}
// Update is used to update the task of the context
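
The labelled for/select added to Run above is essentially a cancellable sleep: wait out the restart delay, but wake early if the task is destroyed (the same shape can be written as a single select). A standalone sketch of that pattern (waitOrCancel is illustrative, not part of this commit):

package main

import (
	"fmt"
	"time"
)

// waitOrCancel blocks for the given delay, but returns false early if the stop
// channel fires first -- mirroring the wait on time.After and destroyCh in Run.
func waitOrCancel(delay time.Duration, stop <-chan struct{}) bool {
	select {
	case <-time.After(delay):
		return true
	case <-stop:
		return false
	}
}

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(stop) // simulate the task being destroyed mid-wait
	}()
	if !waitOrCancel(5*time.Second, stop) {
		fmt.Println("destroy requested, not restarting")
	}
}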

client/task_runner_test.go

@@ -53,7 +53,7 @@ func testTaskRunner() (*MockTaskStateUpdater, *TaskRunner) {
ctx := driver.NewExecContext(allocDir)
rp := structs.NewRestartPolicy(structs.JobTypeService)
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, rp)
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, structs.JobTypeService, rp)
return upd, tr
}
@@ -155,25 +155,6 @@ func TestTaskRunner_Update(t *testing.T) {
})
}
func TestTaskRunner_RestartCounter(t *testing.T) {
rc := newErrorCounter(3, 1*time.Second)
rc.Increment()
rc.Increment()
rc.Increment()
rc.Increment()
rc.Increment()
actual := rc.shouldRestart()
if actual {
t.Fatalf("Expect %v, Actual: %v", false, actual)
}
time.Sleep(1 * time.Second)
actual = rc.shouldRestart()
if !actual {
t.Fatalf("Expect %v, Actual: %v", true, actual)
}
}
/*
TODO: This test is disabled til a follow-up api changes the restore state interface.
The driver/executor interface will be changed from Open to Cleanup, in which