diff --git a/client/client.go b/client/client.go index ffb1bc219..5bb939380 100644 --- a/client/client.go +++ b/client/client.go @@ -645,15 +645,26 @@ func (c *Client) saveState() error { return nil } - var mErr multierror.Error for id, ar := range c.getAllocRunners() { - if err := ar.SaveState(); err != nil { - c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", - id, err) - mErr.Errors = append(mErr.Errors, err) - } + go func() { + local := ar + if err := local.SaveState(); err != nil { + c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", + id, err) + } + }() } - return mErr.ErrorOrNil() + return nil + + //var mErr multierror.Error + //for id, ar := range c.getAllocRunners() { + //if err := ar.SaveState(); err != nil { + //c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", + //id, err) + //mErr.Errors = append(mErr.Errors, err) + //} + //} + //return mErr.ErrorOrNil() } // getAllocRunners returns a snapshot of the current set of alloc runners. diff --git a/client/task_runner.go b/client/task_runner.go index 5444a5f12..00c760a86 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" "github.com/hashicorp/nomad/client/driver/env" dstructs "github.com/hashicorp/nomad/client/driver/structs" @@ -339,7 +340,6 @@ func (r *TaskRunner) RestoreState() error { func (r *TaskRunner) SaveState() error { // XXX needs to be updated r.persistLock.Lock() - defer r.persistLock.Unlock() snap := taskRunnerState{ Version: r.config.Version, @@ -357,9 +357,17 @@ func (r *TaskRunner) SaveState() error { h := snap.Hash() if bytes.Equal(h, r.persistedHash) { + r.persistLock.Unlock() return nil } + // Serialize the object + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil { + return fmt.Errorf("failed to serialize snapshot: %v", err) + } + r.persistLock.Unlock() + // Start the transaction. return r.stateDB.Batch(func(tx *bolt.Tx) error { // Grab the task bucket @@ -368,7 +376,7 @@ func (r *TaskRunner) SaveState() error { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - if err := putObject(taskBkt, taskRunnerStateAllKey, &snap); err != nil { + if err := putData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil { return fmt.Errorf("failed to write task_runner state: %v", err) } @@ -391,11 +399,6 @@ func (r *TaskRunner) DestroyState() error { // setState is used to update the state of the task runner func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { - // Persist our state to disk. - if err := r.SaveState(); err != nil { - r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err) - } - // Indicate the task has been updated. r.updater(r.task.Name, state, event) }