From b97bbd9d30c40ab8b694581d58cd18367565473e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 17 Aug 2018 10:34:44 -0700 Subject: [PATCH] persist alloc state on changes, not periodically Allow alloc and task runners to persist their own state when something changes instead of periodically syncing all state. --- client/allocrunnerv2/alloc_runner.go | 42 ++++++---------------------- client/client.go | 42 +++++++++++----------------- client/interfaces/client.go | 2 +- 3 files changed, 27 insertions(+), 59 deletions(-) diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index 68e1f90ee..b9cce6f12 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -177,16 +177,6 @@ MAIN: case <-taskWaitCh: // TaskRunners have all exited break MAIN - case updated := <-ar.updateCh: - // Update ar.alloc - ar.setAlloc(updated) - - //TODO Run AR Update hooks - - // Update task runners - for _, tr := range ar.tasks { - tr.Update(updated) - } } } @@ -230,18 +220,6 @@ func (ar *allocRunner) setAlloc(updated *structs.Allocation) { ar.allocLock.Unlock() } -// SaveState does all the state related stuff. Who knows. FIXME -//XXX do we need to do periodic syncing? if Saving is only called *before* Run -// *and* within Run -- *and* Updates are applid within Run -- we may be able to -// skip quite a bit of locking? maybe? -func (ar *allocRunner) SaveState() error { - //XXX Do we move this to the client - //XXX Track AllocModifyIndex to only write alloc on change? - - // Write the allocation - return ar.stateDB.PutAllocation(ar.Alloc()) -} - // Restore state from database. Must be called after NewAllocRunner but before // Run. func (ar *allocRunner) Restore() error { @@ -308,9 +286,7 @@ func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskStat calloc := ar.clientAlloc(states) // Update the server - if err := ar.stateUpdater.AllocStateUpdated(calloc); err != nil { - ar.logger.Error("failed to update remote allocation state", "error", err) - } + ar.stateUpdater.AllocStateUpdated(calloc) } // clientAlloc takes in the task states and returns an Allocation populated @@ -408,14 +384,14 @@ func getClientStatus(taskStates map[string]*structs.TaskState) (status, descript // If there is already a pending update it will be discarded and replaced by // the latest update. func (ar *allocRunner) Update(update *structs.Allocation) { - select { - case ar.updateCh <- update: - // Updated alloc sent - case <-ar.updateCh: - // There was a pending update; replace it with the new update. - // This also prevents Update() from blocking if its called - // concurrently with Run() exiting. - ar.updateCh <- update + // Update ar.alloc + ar.setAlloc(update) + + //TODO Run AR Update hooks + + // Update task runners + for _, tr := range ar.tasks { + tr.Update(update) } } diff --git a/client/client.go b/client/client.go index 13b3ec415..7605f22e3 100644 --- a/client/client.go +++ b/client/client.go @@ -107,7 +107,6 @@ type AllocRunner interface { IsWaiting() bool IsMigrating() bool WaitCh() <-chan struct{} - SaveState() error Update(*structs.Allocation) Alloc() *structs.Allocation Restore() error @@ -547,7 +546,7 @@ func (c *Client) Shutdown() error { c.shutdown = true close(c.shutdownCh) c.connPool.Shutdown() - return c.saveState() + return nil } // Stats is used to return statistics for debugging and insight @@ -811,10 +810,6 @@ func (c *Client) restoreState() error { // saveState is used to snapshot our state into the data dir. func (c *Client) saveState() error { - if c.config.DevMode { - return nil - } - var wg sync.WaitGroup var l sync.Mutex var mErr multierror.Error @@ -823,7 +818,7 @@ func (c *Client) saveState() error { for id, ar := range runners { go func(id string, ar AllocRunner) { - err := ar.SaveState() + err := c.stateDB.PutAllocation(ar.Alloc()) if err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %q: %v", id, err) l.Lock() @@ -1506,18 +1501,9 @@ func (c *Client) updateNodeStatus() error { return nil } -// XXX combine with below -func (c *Client) AllocStateUpdated(alloc *structs.Allocation) error { - if alloc == nil { - return fmt.Errorf("nil allocation provided to AllocStateUpdated") - } - - c.updateAllocStatus(alloc) - return nil -} - -// updateAllocStatus is used to update the status of an allocation -func (c *Client) updateAllocStatus(alloc *structs.Allocation) { +// AllocStateUpdated asynchronously updates the server with the current state +// of an allocations and its tasks. +func (c *Client) AllocStateUpdated(alloc *structs.Allocation) { if alloc.Terminated() { // Terminated, mark for GC if we're still tracking this alloc // runner. If it's not being tracked that means the server has @@ -1901,6 +1887,12 @@ func (c *Client) updateAlloc(update *structs.Allocation) { return } + // Update local copy of alloc + if err := c.stateDB.PutAllocation(update); err != nil { + c.logger.Printf("[ERR] client: dropping alloc update. Unable to persist locally: %q", update.ID) + } + + // Update alloc runner ar.Update(update) } @@ -1915,6 +1907,12 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error return nil } + // Initialize local copy of alloc before creating the alloc runner so + // we can't end up with an alloc runner that does not have an alloc. + if err := c.stateDB.PutAllocation(alloc); err != nil { + return err + } + //FIXME disabled previous alloc waiting/migrating // get the previous alloc runner - if one exists - for the // blocking/migrating watcher @@ -1931,7 +1929,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error // Copy the config since the node can be swapped out as it is being updated. // The long term fix is to pass in the config and node separately and then // we don't have to do a copy. - //ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) //XXX FIXME create a root logger logger := hclog.New(&hclog.LoggerOptions{ Name: "nomad", @@ -1959,11 +1956,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error // Store the alloc runner. c.allocs[alloc.ID] = ar - // Initialize local state - if err := ar.SaveState(); err != nil { - c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err) - } - go ar.Run() return nil } diff --git a/client/interfaces/client.go b/client/interfaces/client.go index 6525df693..f79db6620 100644 --- a/client/interfaces/client.go +++ b/client/interfaces/client.go @@ -10,5 +10,5 @@ type Client interface { type AllocStateHandler interface { // AllocStateUpdated is used to emit an updated allocation. This allocation // is stripped to only include client settable fields. - AllocStateUpdated(alloc *structs.Allocation) error + AllocStateUpdated(alloc *structs.Allocation) }