mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Async and sync saving of client state
This commit is contained in:
@@ -447,7 +447,7 @@ func (c *Client) Shutdown() error {
|
||||
c.shutdown = true
|
||||
close(c.shutdownCh)
|
||||
c.connPool.Shutdown()
|
||||
return c.saveState()
|
||||
return c.saveState(true)
|
||||
}
|
||||
|
||||
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
|
||||
@@ -639,32 +639,40 @@ func (c *Client) restoreState() error {
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// saveState is used to snapshot our state into the data dir
|
||||
func (c *Client) saveState() error {
|
||||
// saveState is used to snapshot our state into the data dir. If blocking is set
|
||||
// to true, the function will only return once state has been saved. If false,
|
||||
// the errors will be logged and state saving will be asyncronous
|
||||
func (c *Client) saveState(blocking bool) error {
|
||||
if c.config.DevMode {
|
||||
return nil
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var l sync.Mutex
|
||||
var mErr multierror.Error
|
||||
runners := c.getAllocRunners()
|
||||
wg.Add(len(runners))
|
||||
|
||||
for id, ar := range c.getAllocRunners() {
|
||||
go func() {
|
||||
local := ar
|
||||
if err := local.SaveState(); err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v",
|
||||
id, err)
|
||||
err := local.SaveState()
|
||||
if err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err)
|
||||
l.Lock()
|
||||
multierror.Append(&mErr, err)
|
||||
l.Unlock()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
|
||||
//var mErr multierror.Error
|
||||
//for id, ar := range c.getAllocRunners() {
|
||||
//if err := ar.SaveState(); err != nil {
|
||||
//c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v",
|
||||
//id, err)
|
||||
//mErr.Errors = append(mErr.Errors, err)
|
||||
//}
|
||||
//}
|
||||
//return mErr.ErrorOrNil()
|
||||
if blocking {
|
||||
wg.Wait()
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getAllocRunners returns a snapshot of the current set of alloc runners.
|
||||
@@ -1018,7 +1026,7 @@ func (c *Client) periodicSnapshot() {
|
||||
select {
|
||||
case <-snapshot:
|
||||
snapshot = time.After(stateSnapshotIntv)
|
||||
if err := c.saveState(); err != nil {
|
||||
if err := c.saveState(false); err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to save state: %v", err)
|
||||
}
|
||||
|
||||
@@ -1577,7 +1585,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
|
||||
}
|
||||
|
||||
// Persist our state
|
||||
if err := c.saveState(); err != nil {
|
||||
if err := c.saveState(false); err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to save state: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user