diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 125cfc3c8..81d2185f2 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -163,12 +163,14 @@ func (r *AllocRunner) pre060StateFilePath() string { // RestoreState is used to restore the state of the alloc runner func (r *AllocRunner) RestoreState() error { + // COMPAT: Remove in 0.7.0 // Check if the old snapshot is there oldPath := r.pre060StateFilePath() var snap allocRunnerState + var upgrading bool if err := pre060RestoreState(oldPath, &snap); err == nil { // Restore fields - r.logger.Printf("[DEBUG] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) + r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) r.alloc = snap.Alloc r.allocDir = snap.AllocDir r.allocClientStatus = snap.AllocClientStatus @@ -178,6 +180,7 @@ func (r *AllocRunner) RestoreState() error { r.taskStates = snap.Alloc.TaskStates } + // COMPAT: Remove in 0.7.0 // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old // Context struct to new AllocDir struct if snap.AllocDir == nil && snap.Context != nil { @@ -190,6 +193,7 @@ func (r *AllocRunner) RestoreState() error { // Delete the old state os.RemoveAll(oldPath) + upgrading = true } else if !os.IsNotExist(err) { // Something corrupt in the old state file return err @@ -222,6 +226,7 @@ func (r *AllocRunner) RestoreState() error { r.allocClientStatus = mutable.AllocClientStatus r.allocClientDescription = mutable.AllocClientDescription r.taskStates = mutable.TaskStates + r.alloc.ClientStatus = getClientStatus(r.taskStates) return nil }) @@ -277,6 +282,12 @@ func (r *AllocRunner) RestoreState() error { } else if !r.alloc.TerminalStatus() { // Only start if the alloc isn't in a terminal status. go tr.Run() + + if upgrading { + if err := tr.SaveState(); err != nil { + r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err) + } + } } } @@ -437,10 +448,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Unlock() // Scan the task states to determine the status of the alloc - var pending, running, dead, failed bool r.taskStatusLock.RLock() alloc.TaskStates = copyTaskStates(r.taskStates) - for _, state := range r.taskStates { + alloc.ClientStatus = getClientStatus(r.taskStates) + r.taskStatusLock.RUnlock() + + return alloc +} + +// getClientStatus takes in the task states for a given allocation and computes +// the client status +func getClientStatus(taskStates map[string]*structs.TaskState) string { + var pending, running, dead, failed bool + for _, state := range taskStates { switch state.State { case structs.TaskStateRunning: running = true @@ -454,20 +474,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation { } } } - r.taskStatusLock.RUnlock() // Determine the alloc status if failed { - alloc.ClientStatus = structs.AllocClientStatusFailed + return structs.AllocClientStatusFailed } else if running { - alloc.ClientStatus = structs.AllocClientStatusRunning + return structs.AllocClientStatusRunning } else if pending { - alloc.ClientStatus = structs.AllocClientStatusPending + return structs.AllocClientStatusPending } else if dead { - alloc.ClientStatus = structs.AllocClientStatusComplete + return structs.AllocClientStatusComplete } - return alloc + return "" } // dirtySyncState is used to watch for state being marked dirty to sync diff --git a/client/client.go b/client/client.go index a6a6ba2a4..59460bf11 100644 --- a/client/client.go +++ b/client/client.go @@ -447,7 +447,7 @@ func (c *Client) Shutdown() error { c.shutdown = true close(c.shutdownCh) c.connPool.Shutdown() - return c.saveState(true) + return c.saveState() } // RPC is used to forward an RPC call to a nomad server, or fail if no servers. @@ -663,6 +663,12 @@ func (c *Client) restoreState() error { mErr.Errors = append(mErr.Errors, err) } else { go ar.Run() + + if upgrading { + if err := ar.SaveState(); err != nil { + c.logger.Printf("[WARN] client: initial save state for alloc %s failed: %v", id, err) + } + } } } @@ -676,10 +682,8 @@ func (c *Client) restoreState() error { return mErr.ErrorOrNil() } -// saveState is used to snapshot our state into the data dir. If blocking is set -// to true, the function will only return once state has been saved. If false, -// the errors will be logged and state saving will be asyncronous -func (c *Client) saveState(blocking bool) error { +// saveState is used to snapshot our state into the data dir. +func (c *Client) saveState() error { if c.config.DevMode { return nil } @@ -703,12 +707,8 @@ func (c *Client) saveState(blocking bool) error { }(id, ar) } - if blocking { - wg.Wait() - return mErr.ErrorOrNil() - } - - return nil + wg.Wait() + return mErr.ErrorOrNil() } // getAllocRunners returns a snapshot of the current set of alloc runners. @@ -1062,7 +1062,7 @@ func (c *Client) periodicSnapshot() { select { case <-snapshot: snapshot = time.After(stateSnapshotIntv) - if err := c.saveState(false); err != nil { + if err := c.saveState(); err != nil { c.logger.Printf("[ERR] client: failed to save state: %v", err) } diff --git a/client/task_runner.go b/client/task_runner.go index 8f91d8f4c..5d1a9d23e 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -367,6 +367,7 @@ func (r *TaskRunner) RestoreState() error { // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { r.persistLock.Lock() + defer r.persistLock.Unlock() snap := taskRunnerState{ Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, @@ -384,7 +385,6 @@ func (r *TaskRunner) SaveState() error { // If nothing has changed avoid the write h := snap.Hash() if bytes.Equal(h, r.persistedHash) { - r.persistLock.Unlock() return nil } @@ -393,7 +393,6 @@ func (r *TaskRunner) SaveState() error { if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil { return fmt.Errorf("failed to serialize snapshot: %v", err) } - r.persistLock.Unlock() // Start the transaction. return r.stateDB.Batch(func(tx *bolt.Tx) error { diff --git a/client/util.go b/client/util.go index 233a02595..32f765550 100644 --- a/client/util.go +++ b/client/util.go @@ -1,16 +1,12 @@ package client import ( - "bytes" "encoding/json" "fmt" "io/ioutil" "math/rand" - "os" - "path/filepath" "github.com/hashicorp/nomad/nomad/structs" - "github.com/ugorji/go/codec" ) type allocTuple struct { @@ -78,34 +74,6 @@ func shuffleStrings(list []string) { } } -// persistState is used to help with saving state -func persistState(path string, data interface{}) error { - var buf bytes.Buffer - enc := codec.NewEncoder(&buf, structs.JsonHandlePretty) - if err := enc.Encode(data); err != nil { - return err - } - - if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { - return fmt.Errorf("failed to make dirs for %s: %v", path, err) - } - tmpPath := path + ".tmp" - if err := ioutil.WriteFile(tmpPath, buf.Bytes(), 0600); err != nil { - return fmt.Errorf("failed to save state to tmp: %v", err) - } - if err := os.Rename(tmpPath, path); err != nil { - return fmt.Errorf("failed to rename tmp to path: %v", err) - } - - // Sanity check since users have reported empty state files on disk - if stat, err := os.Stat(path); err != nil { - return fmt.Errorf("unable to stat state file %s: %v", path, err) - } else if stat.Size() == 0 { - return fmt.Errorf("persisted invalid state file %s; see https://github.com/hashicorp/nomad/issues/1367", path) - } - return nil -} - // pre060RestoreState is used to read back in the persisted state for pre v0.6.0 // state func pre060RestoreState(path string, data interface{}) error {