From 3596d073eeb62fb6f9adf3af5a1427aef1bb7232 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 29 Aug 2015 18:16:49 -0700 Subject: [PATCH] client: first pass at save/restore of state --- client/alloc_runner.go | 97 ++++++++++++++++++++++++++++++++++++++++-- client/client.go | 60 ++++++++++++++++++++------ client/task_runner.go | 63 ++++++++++++++++++++++++++- 3 files changed, 200 insertions(+), 20 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 468c4216f..47057b4c0 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -4,9 +4,13 @@ import ( "encoding/json" "fmt" "log" + "os" + "path/filepath" "sync" "time" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" ) @@ -25,6 +29,7 @@ type taskStatus struct { // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { + config *config.Config client *Client logger *log.Logger @@ -32,8 +37,9 @@ type AllocRunner struct { dirtyCh chan struct{} - ctx *driver.ExecContext - tasks map[string]*TaskRunner + ctx *driver.ExecContext + tasks map[string]*TaskRunner + taskLock sync.RWMutex taskStatus map[string]taskStatus taskStatusLock sync.RWMutex @@ -45,9 +51,16 @@ type AllocRunner struct { destroyLock sync.Mutex } +// allocRunnerState is used to snapshot the state of the alloc runner +type allocRunnerState struct { + Alloc *structs.Allocation + TaskStatus map[string]taskStatus +} + // NewAllocRunner is used to create a new allocation context -func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner { +func NewAllocRunner(config *config.Config, client *Client, alloc *structs.Allocation) *AllocRunner { ctx := &AllocRunner{ + config: config, client: client, logger: client.logger, alloc: alloc, @@ -60,6 +73,71 @@ func NewAllocRunner(client *Client, alloc *structs.Allocation) *AllocRunner { return ctx } +// stateFilePath returns the path to our state file +func (r *AllocRunner) stateFilePath() string { + return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json") +} + +// RestoreState is used to restore the state of the alloc runner +func (r *AllocRunner) RestoreState() error { + // Load the snapshot + var snap allocRunnerState + if err := restoreState(r.stateFilePath(), &snap); err != nil { + return err + } + + // Restore fields + r.alloc = snap.Alloc + r.taskStatus = snap.TaskStatus + + // Restore the task runners + var mErr multierror.Error + for name := range r.taskStatus { + task := &structs.Task{Name: name} + tr := NewTaskRunner(r.config, r, r.ctx, task) + r.tasks[name] = tr + if err := tr.RestoreState(); err != nil { + r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err) + mErr.Errors = append(mErr.Errors, err) + } else { + go tr.Run() + } + } + return mErr.ErrorOrNil() +} + +// SaveState is used to snapshot our state +func (r *AllocRunner) SaveState() error { + r.taskStatusLock.RLock() + snap := allocRunnerState{ + Alloc: r.alloc, + TaskStatus: r.taskStatus, + } + err := persistState(r.stateFilePath(), &snap) + r.taskStatusLock.RUnlock() + if err != nil { + return err + } + + // Save state for each task + r.taskLock.RLock() + defer r.taskLock.RUnlock() + var mErr multierror.Error + for name, tr := range r.tasks { + if err := tr.SaveState(); err != nil { + r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", + r.alloc.ID, name, err) + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() +} + +// DestroyState is used to cleanup after ourselves +func (r *AllocRunner) DestroyState() error { + return os.RemoveAll(filepath.Dir(r.stateFilePath())) +} + // Alloc returns the associated allocation func (r *AllocRunner) Alloc() *structs.Allocation { return r.alloc @@ -173,11 +251,13 @@ func (r *AllocRunner) Run() { r.ctx = driver.NewExecContext() // Start the task runners + r.taskLock.Lock() for _, task := range tg.Tasks { - tr := NewTaskRunner(r, r.ctx, task) + tr := NewTaskRunner(r.config, r, r.ctx, task) r.tasks[task.Name] = tr go tr.Run() } + r.taskLock.Unlock() OUTER: // Wait for updates @@ -191,10 +271,12 @@ OUTER: } // Update the task groups + r.taskLock.RLock() for _, task := range tg.Tasks { tr := r.tasks[task.Name] tr.Update(task) } + r.taskLock.RUnlock() case <-r.destroyCh: break OUTER @@ -202,6 +284,8 @@ OUTER: } // Destroy each sub-task + r.taskLock.RLock() + defer r.taskLock.RUnlock() for _, tr := range r.tasks { tr.Destroy() } @@ -210,6 +294,11 @@ OUTER: for _, tr := range r.tasks { <-tr.WaitCh() } + + // Check if we should destroy our state + if r.destroy { + r.DestroyState() + } r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) } diff --git a/client/client.go b/client/client.go index 3ac5a7f5c..ce9f93fb5 100644 --- a/client/client.go +++ b/client/client.go @@ -2,13 +2,16 @@ package client import ( "fmt" + "io/ioutil" "log" "net" "os" + "path/filepath" "strconv" "sync" "time" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/fingerprint" @@ -128,7 +131,7 @@ func (c *Client) Shutdown() error { c.shutdown = true close(c.shutdownCh) c.connPool.Shutdown() - return nil + return c.saveState() } // RPC is used to forward an RPC call to a nomad server, or fail if no servers @@ -222,8 +225,28 @@ func (c *Client) restoreState() error { return nil } - // TODO - return nil + // Scan the directory + list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc")) + if err != nil { + return fmt.Errorf("failed to list alloc state: %v", err) + } + + // Load each alloc back + var mErr multierror.Error + for _, entry := range list { + id := entry.Name() + alloc := &structs.Allocation{ID: id} + ar := NewAllocRunner(c.config, c, alloc) + c.allocs[id] = ar + if err := ar.RestoreState(); err != nil { + c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", + id, err) + mErr.Errors = append(mErr.Errors, err) + } else { + go ar.Run() + } + } + return mErr.ErrorOrNil() } // saveState is used to snapshot our state into the data dir @@ -232,8 +255,17 @@ func (c *Client) saveState() error { return nil } - // TODO - return nil + var mErr multierror.Error + c.allocLock.RLock() + defer c.allocLock.RUnlock() + for id, ar := range c.allocs { + if err := ar.SaveState(); err != nil { + c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", + id, err) + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } // setupNode is used to setup the initial node @@ -474,8 +506,8 @@ func (c *Client) runAllocs(updated []*structs.Allocation) { // Get the existing allocs c.allocLock.RLock() exist := make([]*structs.Allocation, 0, len(c.allocs)) - for _, ctx := range c.allocs { - exist = append(exist, ctx.Alloc()) + for _, ar := range c.allocs { + exist = append(exist, ar.Alloc()) } c.allocLock.RUnlock() @@ -517,12 +549,12 @@ func (c *Client) runAllocs(updated []*structs.Allocation) { func (c *Client) removeAlloc(alloc *structs.Allocation) error { c.allocLock.Lock() defer c.allocLock.Unlock() - ctx, ok := c.allocs[alloc.ID] + ar, ok := c.allocs[alloc.ID] if !ok { c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID) return nil } - ctx.Destroy() + ar.Destroy() delete(c.allocs, alloc.ID) return nil } @@ -531,12 +563,12 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error { func (c *Client) updateAlloc(exist, update *structs.Allocation) error { c.allocLock.RLock() defer c.allocLock.RUnlock() - ctx, ok := c.allocs[exist.ID] + ar, ok := c.allocs[exist.ID] if !ok { c.logger.Printf("[WARN] client: missing context for alloc '%s'", exist.ID) return nil } - ctx.Update(update) + ar.Update(update) return nil } @@ -544,8 +576,8 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { func (c *Client) addAlloc(alloc *structs.Allocation) error { c.allocLock.Lock() defer c.allocLock.Unlock() - ctx := NewAllocRunner(c, alloc) - c.allocs[alloc.ID] = ctx - go ctx.Run() + ar := NewAllocRunner(c.config, c, alloc) + c.allocs[alloc.ID] = ar + go ar.Run() return nil } diff --git a/client/task_runner.go b/client/task_runner.go index 7f243d268..bbe4bd6e5 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1,16 +1,22 @@ package client import ( + "crypto/md5" + "encoding/hex" "fmt" "log" + "os" + "path/filepath" "sync" + "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" ) // TaskRunner is used to wrap a task within an allocation and provide the execution context. type TaskRunner struct { + config *config.Config allocRunner *AllocRunner logger *log.Logger ctx *driver.ExecContext @@ -27,9 +33,15 @@ type TaskRunner struct { waitCh chan struct{} } +// taskRunnerState is used to snapshot the state of the task runner +type taskRunnerState struct { + Task *structs.Task +} + // NewTaskRunner is used to create a new task context -func NewTaskRunner(allocRunner *AllocRunner, ctx *driver.ExecContext, task *structs.Task) *TaskRunner { +func NewTaskRunner(config *config.Config, allocRunner *AllocRunner, ctx *driver.ExecContext, task *structs.Task) *TaskRunner { tc := &TaskRunner{ + config: config, allocRunner: allocRunner, logger: allocRunner.logger, ctx: ctx, @@ -47,6 +59,47 @@ func (r *TaskRunner) WaitCh() <-chan struct{} { return r.waitCh } +// stateFilePath returns the path to our state file +func (r *TaskRunner) stateFilePath() string { + // Get the MD5 of the task name + hashVal := md5.Sum([]byte(r.task.Name)) + hashHex := hex.EncodeToString(hashVal[:]) + dirName := fmt.Sprintf("task-%s", hashHex) + + // Generate the path + path := filepath.Join(r.config.StateDir, "alloc", r.allocID, + dirName, "state.json") + return path +} + +// RestoreState is used to restore our state +func (r *TaskRunner) RestoreState() error { + // Load the snapshot + var snap taskRunnerState + if err := restoreState(r.stateFilePath(), &snap); err != nil { + return err + } + + // Restore fields + r.task = snap.Task + + // TODO: Restore the driver + return nil +} + +// SaveState is used to snapshot our state +func (r *TaskRunner) SaveState() error { + snap := taskRunnerState{ + Task: r.task, + } + return persistState(r.stateFilePath(), &snap) +} + +// DestroyState is used to cleanup after ourselves +func (r *TaskRunner) DestroyState() error { + return os.RemoveAll(r.stateFilePath()) +} + // setStatus is used to update the status of the task runner func (r *TaskRunner) setStatus(status, desc string) { r.allocRunner.setTaskStatus(r.task.Name, status, desc) @@ -79,6 +132,7 @@ func (r *TaskRunner) Run() { } r.setStatus(structs.AllocClientStatusRunning, "task started") +OUTER: // Wait for updates for { select { @@ -94,7 +148,7 @@ func (r *TaskRunner) Run() { r.setStatus(structs.AllocClientStatusDead, "task completed") } - return + break OUTER case update := <-r.updateCh: // Update @@ -112,6 +166,11 @@ func (r *TaskRunner) Run() { } } } + + // Check if we should destroy our state + if r.destroy { + r.DestroyState() + } } // Update is used to update the task of the context