From 7e60f0ba778474160ec2de45aed0f5d2a6c966f1 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 12 Jul 2018 17:56:52 -0700 Subject: [PATCH] client: implement all-or-nothing alloc restoration Restoring calls NewAR -> Restore -> Run NewAR now calls NewTR AR.Restore calls TR.Restore AR.Run calls TR.Run --- client/allocrunnerv2/alloc_runner.go | 122 ++++++++++-------- .../allocrunnerv2/taskrunner/state/state.go | 7 + .../allocrunnerv2/taskrunner/task_runner.go | 28 +++- .../taskrunner/task_runner_hooks.go | 3 +- client/client.go | 121 ++++++++--------- client/state/state_database.go | 55 +++++++- 6 files changed, 214 insertions(+), 122 deletions(-) diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index f7d2aeabf..e1c554c10 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunnerv2/state" "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner" "github.com/hashicorp/nomad/client/config" + clientstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" @@ -57,12 +58,18 @@ type allocRunner struct { } // NewAllocRunner returns a new allocation runner. -func NewAllocRunner(config *Config) *allocRunner { +func NewAllocRunner(config *Config) (*allocRunner, error) { + alloc := config.Alloc + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup) + } + ar := &allocRunner{ + alloc: alloc, clientConfig: config.ClientConfig, vaultClient: config.Vault, - alloc: config.Alloc, - tasks: make(map[string]*taskrunner.TaskRunner), + tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), waitCh: make(chan struct{}), updateCh: make(chan *structs.Allocation), stateDB: config.StateDB, @@ -70,15 +77,44 @@ func NewAllocRunner(config *Config) *allocRunner { // Create alloc dir //XXX update AllocDir to hc log - ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, config.Alloc.ID)) + ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, alloc.ID)) // Create the logger based on the allocation ID - ar.logger = config.Logger.With("alloc_id", config.Alloc.ID) + ar.logger = config.Logger.With("alloc_id", alloc.ID) // Initialize the runners hooks. ar.initRunnerHooks() - return ar + // Create the TaskRunners + if err := ar.initTaskRunners(tg.Tasks); err != nil { + return nil, err + } + + return ar, nil +} + +// initTaskRunners creates task runners but does *not* run them. 
+func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { + for _, task := range tasks { + config := &taskrunner.Config{ + Alloc: ar.alloc, + ClientConfig: ar.clientConfig, + Task: task, + TaskDir: ar.allocDir.NewTaskDir(task.Name), + Logger: ar.logger, + StateDB: ar.stateDB, + VaultClient: ar.vaultClient, + } + + // Create, but do not Run, the task runner + tr, err := taskrunner.NewTaskRunner(config) + if err != nil { + return fmt.Errorf("failed creating runner for task %q: %v", task.Name, err) + } + + ar.tasks[task.Name] = tr + } + return nil } func (ar *allocRunner) WaitCh() <-chan struct{} { @@ -91,7 +127,6 @@ func (ar *allocRunner) Run() { // Close the wait channel defer close(ar.waitCh) - var err error var taskWaitCh <-chan struct{} // Run the prestart hooks @@ -102,10 +137,7 @@ func (ar *allocRunner) Run() { } // Run the runners - taskWaitCh, err = ar.runImpl() - if err != nil { - ar.logger.Error("starting tasks failed", "error", err) - } + taskWaitCh = ar.runImpl() for { select { @@ -130,20 +162,9 @@ POST: } // runImpl is used to run the runners. -func (ar *allocRunner) runImpl() (<-chan struct{}, error) { - // Grab the task group - alloc := ar.Alloc() - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg == nil { - // XXX Fail and exit - ar.logger.Error("failed to lookup task group", "task_group", alloc.TaskGroup) - return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup) - } - - for _, task := range tg.Tasks { - if err := ar.runTask(alloc, task); err != nil { - return nil, err - } +func (ar *allocRunner) runImpl() <-chan struct{} { + for _, task := range ar.tasks { + go task.Run() } // Return a combined WaitCh that is closed when all task runners have @@ -156,32 +177,7 @@ func (ar *allocRunner) runImpl() (<-chan struct{}, error) { } }() - return waitCh, nil -} - -// runTask is used to run a task. -func (ar *allocRunner) runTask(alloc *structs.Allocation, task *structs.Task) error { - // Create the runner - config := &taskrunner.Config{ - Alloc: alloc, - ClientConfig: ar.clientConfig, - Task: task, - TaskDir: ar.allocDir.NewTaskDir(task.Name), - Logger: ar.logger, - StateDB: ar.stateDB, - VaultClient: ar.vaultClient, - } - tr, err := taskrunner.NewTaskRunner(config) - if err != nil { - return err - } - - // Start the runner - go tr.Run() - - // Store the runner - ar.tasks[task.Name] = tr - return nil + return waitCh } // Alloc returns the current allocation being run by this runner. @@ -193,9 +189,29 @@ func (ar *allocRunner) Alloc() *structs.Allocation { } // SaveState does all the state related stuff. Who knows. FIXME -//XXX +//XXX do we need to do periodic syncing? if Saving is only called *before* Run +// *and* within Run -- *and* Updates are applid within Run -- we may be able to +// skip quite a bit of locking? maybe? func (ar *allocRunner) SaveState() error { - return nil + return ar.stateDB.Update(func(tx *bolt.Tx) error { + //XXX Track EvalID to only write alloc on change? + // Write the allocation + return clientstate.PutAllocation(tx, ar.Alloc()) + }) +} + +// Restore state from database. Must be called after NewAllocRunner but before +// Run. +func (ar *allocRunner) Restore() error { + return ar.stateDB.View(func(tx *bolt.Tx) error { + // Restore task runners + for _, tr := range ar.tasks { + if err := tr.Restore(tx); err != nil { + return err + } + } + return nil + }) } // Update the running allocation with a new version received from the server. 
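Taken together, the alloc_runner.go changes split the runner's lifecycle into construct, restore, run: NewAllocRunner looks up the task group and builds every TaskRunner up front (and so can now fail), Restore rehydrates those runners from the state database, and Run only starts them. Below is a minimal sketch of how a caller is expected to drive that sequence, mirroring the client.go changes later in this patch. It is not part of the patch: the helper name restoreOne is invented, the import paths for go-hclog and bolt are assumed, and the Config field types are inferred from how they are used in this diff.

```go
package sketch

import (
	"github.com/boltdb/bolt"
	hclog "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/client/allocrunnerv2"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/nomad/structs"
)

// restoreOne sketches the NewAR -> Restore -> Run sequence described in the
// commit message. The parameters stand in for state the client already holds.
func restoreOne(alloc *structs.Allocation, logger hclog.Logger, clientConf *config.Config, stateDB *bolt.DB) error {
	arConf := &allocrunnerv2.Config{
		Alloc:        alloc,
		Logger:       logger,
		ClientConfig: clientConf,
		StateDB:      stateDB,
	}

	// NewAllocRunner now also creates (but does not start) the task runners,
	// so it can fail, e.g. when the alloc's task group cannot be found.
	ar, err := allocrunnerv2.NewAllocRunner(arConf)
	if err != nil {
		return err
	}

	// Restore must run after NewAllocRunner and before Run; it loads each
	// task runner's persisted local state in a single read transaction.
	if err := ar.Restore(); err != nil {
		return err
	}

	// Tasks only start here, once construction and restore have both succeeded.
	go ar.Run()
	return nil
}
```

Keeping task startup out of NewAllocRunner and Restore is what makes the all-or-nothing behavior in client.go possible: every runner can be constructed and restored first, and Run is only called after all of them succeed.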
diff --git a/client/allocrunnerv2/taskrunner/state/state.go b/client/allocrunnerv2/taskrunner/state/state.go index 287fa5a8b..9e3b4d245 100644 --- a/client/allocrunnerv2/taskrunner/state/state.go +++ b/client/allocrunnerv2/taskrunner/state/state.go @@ -5,6 +5,13 @@ import ( "github.com/hashicorp/nomad/helper" ) +var ( + // taskRunnerStateAllKey holds all the task runners state. At the moment + // there is no need to split it + //XXX refactor out of client/state and taskrunner + taskRunnerStateAllKey = []byte("simple-all") +) + // LocalState is Task state which is persisted for use when restarting Nomad // agents. type LocalState struct { diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index e5fd74838..4160eea74 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -18,7 +18,7 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/driver/env" - oldstate "github.com/hashicorp/nomad/client/state" + clientstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" @@ -42,6 +42,7 @@ const ( var ( // taskRunnerStateAllKey holds all the task runners state. At the moment // there is no need to split it + //XXX refactor out of clientstate and new state taskRunnerStateAllKey = []byte("simple-all") ) @@ -446,16 +447,15 @@ func (tr *TaskRunner) persistLocalState() error { return nil } - // Start the transaction. - return tr.stateDB.Batch(func(tx *bolt.Tx) error { + return tr.stateDB.Update(func(tx *bolt.Tx) error { // Grab the task bucket //XXX move into new state pkg - taskBkt, err := oldstate.GetTaskBucket(tx, tr.allocID, tr.taskName) + taskBkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - if err := oldstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil { + if err := clientstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil { return fmt.Errorf("failed to write task_runner state: %v", err) } @@ -468,6 +468,24 @@ func (tr *TaskRunner) persistLocalState() error { }) } +// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner +// but before Run. +func (tr *TaskRunner) Restore(tx *bolt.Tx) error { + bkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName) + if err != nil { + return fmt.Errorf("failed to get task %q bucket: %v", tr.taskName, err) + } + + //XXX set persisted hash to avoid immediate write on first use? + var ls state.LocalState + if err := clientstate.GetObject(bkt, taskRunnerStateAllKey, &ls); err != nil { + return fmt.Errorf("failed to read task runner state: %v", err) + } + tr.localState = &ls + + return nil +} + // SetState sets the task runners allocation state. 
func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) { // Ensure the event is populated with human readable strings diff --git a/client/allocrunnerv2/taskrunner/task_runner_hooks.go b/client/allocrunnerv2/taskrunner/task_runner_hooks.go index 9323e683e..9affd2231 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_hooks.go +++ b/client/allocrunnerv2/taskrunner/task_runner_hooks.go @@ -124,8 +124,9 @@ func (tr *TaskRunner) prestart() error { } tr.localStateLock.Unlock() - // Persist local state if the hook state has changed + // Store and persist local state if the hook state has changed if !hookState.Equal(origHookState) { + tr.localState.Hooks[name] = hookState if err := tr.persistLocalState(); err != nil { return err } diff --git a/client/client.go b/client/client.go index be4c1dee7..f08a5b425 100644 --- a/client/client.go +++ b/client/client.go @@ -32,7 +32,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunnerv2" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/servers" - "github.com/hashicorp/nomad/client/state" + clientstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" @@ -110,6 +110,8 @@ type AllocRunner interface { SaveState() error Update(*structs.Allocation) Alloc() *structs.Allocation + Restore() error + Run() } // Client is used to implement the client interaction with Nomad. Clients @@ -725,6 +727,7 @@ func (c *Client) restoreState() error { return nil } + //XXX REMOVED! make a note in backward compat / upgrading doc // COMPAT: Remove in 0.7.0 // 0.6.0 transitioned from individual state files to a single bolt-db. // The upgrade path is to: @@ -732,74 +735,72 @@ func (c *Client) restoreState() error { // If so, restore from that and delete old state // Restore using state database - // Allocs holds the IDs of the allocations being restored - var allocs []string - - // Upgrading tracks whether this is a pre 0.6.0 upgrade path - var upgrading bool - - // Scan the directory - allocDir := filepath.Join(c.config.StateDir, "alloc") - list, err := ioutil.ReadDir(allocDir) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to list alloc state: %v", err) - } else if err == nil && len(list) != 0 { - upgrading = true - for _, entry := range list { - allocs = append(allocs, entry.Name()) - } - } else { - // Normal path - err := c.stateDB.View(func(tx *bolt.Tx) error { - allocs, err = state.GetAllAllocationIDs(tx) - if err != nil { - return fmt.Errorf("failed to list allocations: %v", err) - } - return nil - }) + // Restore allocations + var allocs []*structs.Allocation + var err error + err = c.stateDB.View(func(tx *bolt.Tx) error { + allocs, err = clientstate.GetAllAllocations(tx) if err != nil { - return err + return fmt.Errorf("failed to list allocations: %v", err) } + return nil + }) + if err != nil { + return err } // Load each alloc back var mErr multierror.Error - for _, id := range allocs { - alloc := &structs.Allocation{ID: id} - - // don't worry about blocking/migrating when restoring - watcher := allocrunner.NoopPrevAlloc{} + for _, alloc := range allocs { + //XXX FIXME create a root logger + logger := hclog.New(&hclog.LoggerOptions{ + Name: "nomad", + Level: hclog.LevelFromString(c.configCopy.LogLevel), + TimeFormat: time.RFC3339, + }) c.configLock.RLock() - ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, 
c.vaultClient, c.consulService, watcher) + arConf := &allocrunnerv2.Config{ + Alloc: alloc, + Logger: logger, + ClientConfig: c.config, + StateDB: c.stateDB, + } c.configLock.RUnlock() + ar, err := allocrunnerv2.NewAllocRunner(arConf) + if err != nil { + c.logger.Printf("[ERR] client: failed to create alloc %q: %v", alloc.ID, err) + mErr.Errors = append(mErr.Errors, err) + continue + } + + // Restore state + if err := ar.Restore(); err != nil { + c.logger.Printf("[ERR] client: failed to restore alloc %q: %v", alloc.ID, err) + mErr.Errors = append(mErr.Errors, err) + continue + } + + //XXX is this locking necessary? c.allocLock.Lock() - c.allocs[id] = ar + c.allocs[alloc.ID] = ar c.allocLock.Unlock() - - if err := ar.RestoreState(); err != nil { - c.logger.Printf("[ERR] client: failed to restore state for alloc %q: %v", id, err) - mErr.Errors = append(mErr.Errors, err) - } else { - go ar.Run() - - if upgrading { - if err := ar.SaveState(); err != nil { - c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", id, err) - } - } - } } - // Delete all the entries - if upgrading { - if err := os.RemoveAll(allocDir); err != nil { - mErr.Errors = append(mErr.Errors, err) - } + // Don't run any allocs if there were any failures + //XXX removing this check would switch from all-or-nothing restores to + // best-effort. went with all-or-nothing for now + if err := mErr.ErrorOrNil(); err != nil { + return err } - return mErr.ErrorOrNil() + // All allocs restored successfully, run them! + for _, ar := range c.allocs { + go ar.Run() + } + + return nil } // saveState is used to snapshot our state into the data dir. @@ -1918,10 +1919,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error // The long term fix is to pass in the config and node separately and then // we don't have to do a copy. //ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) - //XXX FIXME + //XXX FIXME create a root logger logger := hclog.New(&hclog.LoggerOptions{ - Name: "nomad", - Level: hclog.LevelFromString(c.configCopy.LogLevel), + Name: "nomad", + Level: hclog.LevelFromString(c.configCopy.LogLevel), + TimeFormat: time.RFC3339, }) c.configLock.RLock() @@ -1934,12 +1936,15 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error } c.configLock.RUnlock() - ar := allocrunnerv2.NewAllocRunner(arConf) + ar, err := allocrunnerv2.NewAllocRunner(arConf) + if err != nil { + return err + } // Store the alloc runner. c.allocs[alloc.ID] = ar - //XXX(schmichael) Why do we do this? + // Initialize local state if err := ar.SaveState(); err != nil { c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err) } diff --git a/client/state/state_database.go b/client/state/state_database.go index 4c3d9c2ed..0a170b9a3 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -184,20 +184,65 @@ func DeleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { return alloc.DeleteBucket(key) } -func GetAllAllocationIDs(tx *bolt.Tx) ([]string, error) { +//XXX duplicated in arv2?! 
+var ( + // The following are the key paths written to the state database + allocRunnerStateAllocKey = []byte("alloc") +) + +type allocRunnerAllocState struct { + Alloc *structs.Allocation +} + +func GetAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, error) { allocationsBkt := tx.Bucket(allocationsBucket) if allocationsBkt == nil { + // No allocs return nil, nil } + var allocs []*structs.Allocation + // Create a cursor for iteration. - var allocIDs []string c := allocationsBkt.Cursor() - // Iterate over all the buckets + // Iterate over all the allocation buckets for k, _ := c.First(); k != nil; k, _ = c.Next() { - allocIDs = append(allocIDs, string(k)) + allocID := string(k) + allocBkt := allocationsBkt.Bucket(k) + if allocBkt == nil { + //XXX merr? + return nil, fmt.Errorf("alloc %q missing", allocID) + } + + var allocState allocRunnerAllocState + if err := GetObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil { + //XXX merr? + return nil, fmt.Errorf("failed to restore alloc %q: %v", allocID, err) + } + allocs = append(allocs, allocState.Alloc) } - return allocIDs, nil + return allocs, nil +} + +// PutAllocation stores an allocation given a writable transaction. +func PutAllocation(tx *bolt.Tx, alloc *structs.Allocation) error { + // Retrieve the root allocations bucket + allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket) + if err != nil { + return err + } + + // Retrieve the specific allocations bucket + key := []byte(alloc.ID) + allocBkt, err := allocsBkt.CreateBucketIfNotExists(key) + if err != nil { + return err + } + + allocState := allocRunnerAllocState{ + Alloc: alloc, + } + return PutObject(allocBkt, allocRunnerStateAllocKey, &allocState) }
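The new state_database.go helpers give the client a simple per-alloc layout in bolt: a top-level allocations bucket, one child bucket per alloc ID, and a single "alloc" key holding the wrapped allocation. The following is a minimal sketch, not part of the patch, of the save/restore round trip those helpers support: PutAllocation inside a write transaction (what allocRunner.SaveState does per alloc) and GetAllAllocations inside a read transaction (what Client.restoreState does on agent start). The function name and the bolt import path are assumptions.

```go
package sketch

import (
	"github.com/boltdb/bolt"

	clientstate "github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// saveAndRestore is illustrative only: persist one allocation, then read every
// persisted allocation back, using the helpers added in this patch.
func saveAndRestore(db *bolt.DB, alloc *structs.Allocation) ([]*structs.Allocation, error) {
	// Write path: allocRunner.SaveState calls PutAllocation inside db.Update.
	if err := db.Update(func(tx *bolt.Tx) error {
		return clientstate.PutAllocation(tx, alloc)
	}); err != nil {
		return nil, err
	}

	// Read path: Client.restoreState calls GetAllAllocations inside db.View.
	var allocs []*structs.Allocation
	verr := db.View(func(tx *bolt.Tx) error {
		var err error
		allocs, err = clientstate.GetAllAllocations(tx)
		return err
	})
	return allocs, verr
}
```

Because GetAllAllocations returns an error as soon as any per-alloc bucket is missing or fails to decode, a single bad entry aborts the whole read, which matches the all-or-nothing stance Client.restoreState takes when any AllocRunner fails to restore.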