client: first pass at save/restore of state

This commit is contained in:
Armon Dadgar
2015-08-29 18:16:49 -07:00
parent 17e8860a03
commit 3596d073ee
3 changed files with 200 additions and 20 deletions

View File

@@ -4,9 +4,13 @@ import (
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -25,6 +29,7 @@ type taskStatus struct {
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
client *Client
logger *log.Logger
@@ -32,8 +37,9 @@ type AllocRunner struct {
dirtyCh chan struct{}
ctx *driver.ExecContext
tasks map[string]*TaskRunner
ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskLock sync.RWMutex
taskStatus map[string]taskStatus
taskStatusLock sync.RWMutex
@@ -45,9 +51,16 @@ type AllocRunner struct {
destroyLock sync.Mutex
}
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Alloc *structs.Allocation
TaskStatus map[string]taskStatus
}
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner {
func NewAllocRunner(config *config.Config, client *Client, alloc *structs.Allocation) *AllocRunner {
ctx := &AllocRunner{
config: config,
client: client,
logger: client.logger,
alloc: alloc,
@@ -60,6 +73,71 @@ func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner {
return ctx
}
// stateFilePath returns the path to our state file
func (r *AllocRunner) stateFilePath() string {
return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
}
// RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error {
// Load the snapshot
var snap allocRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
return err
}
// Restore fields
r.alloc = snap.Alloc
r.taskStatus = snap.TaskStatus
// Restore the task runners
var mErr multierror.Error
for name := range r.taskStatus {
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.config, r, r.ctx, task)
r.tasks[name] = tr
if err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
} else {
go tr.Run()
}
}
return mErr.ErrorOrNil()
}
// SaveState is used to snapshot our state
func (r *AllocRunner) SaveState() error {
r.taskStatusLock.RLock()
snap := allocRunnerState{
Alloc: r.alloc,
TaskStatus: r.taskStatus,
}
err := persistState(r.stateFilePath(), &snap)
r.taskStatusLock.RUnlock()
if err != nil {
return err
}
// Save state for each task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
var mErr multierror.Error
for name, tr := range r.tasks {
if err := tr.SaveState(); err != nil {
r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v",
r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
}
// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
@@ -173,11 +251,13 @@ func (r *AllocRunner) Run() {
r.ctx = driver.NewExecContext()
// Start the task runners
r.taskLock.Lock()
for _, task := range tg.Tasks {
tr := NewTaskRunner(r, r.ctx, task)
tr := NewTaskRunner(r.config, r, r.ctx, task)
r.tasks[task.Name] = tr
go tr.Run()
}
r.taskLock.Unlock()
OUTER:
// Wait for updates
@@ -191,10 +271,12 @@ OUTER:
}
// Update the task groups
r.taskLock.RLock()
for _, task := range tg.Tasks {
tr := r.tasks[task.Name]
tr.Update(task)
}
r.taskLock.RUnlock()
case <-r.destroyCh:
break OUTER
@@ -202,6 +284,8 @@ OUTER:
}
// Destroy each sub-task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
for _, tr := range r.tasks {
tr.Destroy()
}
@@ -210,6 +294,11 @@ OUTER:
for _, tr := range r.tasks {
<-tr.WaitCh()
}
// Check if we should destroy our state
if r.destroy {
r.DestroyState()
}
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}

View File

@@ -2,13 +2,16 @@ package client
import (
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/fingerprint"
@@ -128,7 +131,7 @@ func (c *Client) Shutdown() error {
c.shutdown = true
close(c.shutdownCh)
c.connPool.Shutdown()
return nil
return c.saveState()
}
// RPC is used to forward an RPC call to a nomad server, or fail if no servers
@@ -222,8 +225,28 @@ func (c *Client) restoreState() error {
return nil
}
// TODO
return nil
// Scan the directory
list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc"))
if err != nil {
return fmt.Errorf("failed to list alloc state: %v", err)
}
// Load each alloc back
var mErr multierror.Error
for _, entry := range list {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
ar := NewAllocRunner(c.config, c, alloc)
c.allocs[id] = ar
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v",
id, err)
mErr.Errors = append(mErr.Errors, err)
} else {
go ar.Run()
}
}
return mErr.ErrorOrNil()
}
// saveState is used to snapshot our state into the data dir
@@ -232,8 +255,17 @@ func (c *Client) saveState() error {
return nil
}
// TODO
return nil
var mErr multierror.Error
c.allocLock.RLock()
defer c.allocLock.RUnlock()
for id, ar := range c.allocs {
if err := ar.SaveState(); err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v",
id, err)
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// setupNode is used to setup the initial node
@@ -474,8 +506,8 @@ func (c *Client) runAllocs(updated []*structs.Allocation) {
// Get the existing allocs
c.allocLock.RLock()
exist := make([]*structs.Allocation, 0, len(c.allocs))
for _, ctx := range c.allocs {
exist = append(exist, ctx.Alloc())
for _, ar := range c.allocs {
exist = append(exist, ar.Alloc())
}
c.allocLock.RUnlock()
@@ -517,12 +549,12 @@ func (c *Client) runAllocs(updated []*structs.Allocation) {
func (c *Client) removeAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ctx, ok := c.allocs[alloc.ID]
ar, ok := c.allocs[alloc.ID]
if !ok {
c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
return nil
}
ctx.Destroy()
ar.Destroy()
delete(c.allocs, alloc.ID)
return nil
}
@@ -531,12 +563,12 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error {
func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
ctx, ok := c.allocs[exist.ID]
ar, ok := c.allocs[exist.ID]
if !ok {
c.logger.Printf("[WARN] client: missing context for alloc '%s'", exist.ID)
return nil
}
ctx.Update(update)
ar.Update(update)
return nil
}
@@ -544,8 +576,8 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ctx := NewAllocRunner(c, alloc)
c.allocs[alloc.ID] = ctx
go ctx.Run()
ar := NewAllocRunner(c.config, c, alloc)
c.allocs[alloc.ID] = ar
go ar.Run()
return nil
}

View File

@@ -1,16 +1,22 @@
package client
import (
"crypto/md5"
"encoding/hex"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
config *config.Config
allocRunner *AllocRunner
logger *log.Logger
ctx *driver.ExecContext
@@ -27,9 +33,15 @@ type TaskRunner struct {
waitCh chan struct{}
}
// taskRunnerState is used to snapshot the state of the task runner
type taskRunnerState struct {
Task *structs.Task
}
// NewTaskRunner is used to create a new task context
func NewTaskRunner(allocRunner *AllocRunner, ctx *driver.ExecContext, task *structs.Task) *TaskRunner {
func NewTaskRunner(config *config.Config, allocRunner *AllocRunner, ctx *driver.ExecContext, task *structs.Task) *TaskRunner {
tc := &TaskRunner{
config: config,
allocRunner: allocRunner,
logger: allocRunner.logger,
ctx: ctx,
@@ -47,6 +59,47 @@ func (r *TaskRunner) WaitCh() <-chan struct{} {
return r.waitCh
}
// stateFilePath returns the path to our state file
func (r *TaskRunner) stateFilePath() string {
// Get the MD5 of the task name
hashVal := md5.Sum([]byte(r.task.Name))
hashHex := hex.EncodeToString(hashVal[:])
dirName := fmt.Sprintf("task-%s", hashHex)
// Generate the path
path := filepath.Join(r.config.StateDir, "alloc", r.allocID,
dirName, "state.json")
return path
}
// RestoreState is used to restore our state
func (r *TaskRunner) RestoreState() error {
// Load the snapshot
var snap taskRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
return err
}
// Restore fields
r.task = snap.Task
// TODO: Restore the driver
return nil
}
// SaveState is used to snapshot our state
func (r *TaskRunner) SaveState() error {
snap := taskRunnerState{
Task: r.task,
}
return persistState(r.stateFilePath(), &snap)
}
// DestroyState is used to cleanup after ourselves
func (r *TaskRunner) DestroyState() error {
return os.RemoveAll(r.stateFilePath())
}
// setStatus is used to update the status of the task runner
func (r *TaskRunner) setStatus(status, desc string) {
r.allocRunner.setTaskStatus(r.task.Name, status, desc)
@@ -79,6 +132,7 @@ func (r *TaskRunner) Run() {
}
r.setStatus(structs.AllocClientStatusRunning, "task started")
OUTER:
// Wait for updates
for {
select {
@@ -94,7 +148,7 @@ func (r *TaskRunner) Run() {
r.setStatus(structs.AllocClientStatusDead,
"task completed")
}
return
break OUTER
case update := <-r.updateCh:
// Update
@@ -112,6 +166,11 @@ func (r *TaskRunner) Run() {
}
}
}
// Check if we should destroy our state
if r.destroy {
r.DestroyState()
}
}
// Update is used to update the task of the context