Merge pull request #2610 from hashicorp/f-bolt-db

Client persists state using BoltDB and more efficient write patterns
Alex Dadgar committed 2017-05-09 13:01:36 -07:00 (committed via GitHub)
15 changed files with 779 additions and 236 deletions
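
Before the file-by-file diff, a minimal sketch of the new persistence approach (not code from this commit; openStateDB is a hypothetical helper and it assumes the github.com/boltdb/bolt, path/filepath, and fmt imports). The state.db path and the allocations bucket mirror what the client sets up below, and db.Batch is the piece that coalesces concurrent commits into fewer transactions.

// openStateDB opens (or creates) the single BoltDB file that replaces the
// per-alloc JSON state files, then performs one batched write.
func openStateDB(stateDir string) (*bolt.DB, error) {
	db, err := bolt.Open(filepath.Join(stateDir, "state.db"), 0600, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to open state database: %v", err)
	}
	err = db.Batch(func(tx *bolt.Tx) error {
		// Batch groups writes from concurrent goroutines into one transaction.
		bkt, err := tx.CreateBucketIfNotExists([]byte("allocations"))
		if err != nil {
			return err
		}
		return bkt.Put([]byte("example-key"), []byte("example-value"))
	})
	return db, err
}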


@@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
@@ -27,6 +28,13 @@ const (
taskReceivedSyncLimit = 30 * time.Second
)
var (
// The following are the key paths written to the state database
allocRunnerStateImmutableKey = []byte("immutable")
allocRunnerStateMutableKey = []byte("mutable")
allocRunnerStateAllocDirKey = []byte("alloc-dir")
)
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)
@@ -69,10 +77,18 @@ type AllocRunner struct {
destroyLock sync.Mutex
waitCh chan struct{}
// serialize saveAllocRunnerState calls
persistLock sync.Mutex
// State related fields
// stateDB is used to store the alloc runners state
stateDB *bolt.DB
// immutablePersisted and allocDirPersisted are used to track whether the
// immutable data and the alloc dir have been persisted. Once persisted we
// can lower write volume by not re-writing these values
immutablePersisted bool
allocDirPersisted bool
}
// COMPAT: Remove in 0.7.0
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Version string
@@ -95,13 +111,29 @@ type allocRunnerState struct {
} `json:"Context,omitempty"`
}
// allocRunnerImmutableState is state that only has to be written once as it
// doesn't change over the life-cycle of the alloc_runner.
type allocRunnerImmutableState struct {
Version string
Alloc *structs.Allocation
}
// allocRunnerMutableState is state that has to be written on each save as it
// changes over the life-cycle of the alloc_runner.
type allocRunnerMutableState struct {
AllocClientStatus string
AllocClientDescription string
TaskStates map[string]*structs.TaskState
}
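
A rough sketch of why this split lowers write volume (encodedSizes is a hypothetical helper, assuming the bytes and ugorji codec imports): the mutable payload re-encoded on every sync carries only a couple of strings plus task states, while the large allocation is encoded once.

// encodedSizes compares the one-time immutable payload against the small
// mutable payload that gets re-written on every save.
func encodedSizes(alloc *structs.Allocation, states map[string]*structs.TaskState) (immutable, mutable int, err error) {
	var ibuf, mbuf bytes.Buffer
	if err = codec.NewEncoder(&ibuf, structs.MsgpackHandle).Encode(&allocRunnerImmutableState{Alloc: alloc}); err != nil {
		return
	}
	if err = codec.NewEncoder(&mbuf, structs.MsgpackHandle).Encode(&allocRunnerMutableState{TaskStates: states}); err != nil {
		return
	}
	return ibuf.Len(), mbuf.Len(), nil
}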
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {
ar := &AllocRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
@@ -118,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
return ar
}
// stateFilePath returns the path to our state file
func (r *AllocRunner) stateFilePath() string {
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func (r *AllocRunner) pre060StateFilePath() string {
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
@@ -128,28 +162,79 @@ func (r *AllocRunner) stateFilePath() string {
// RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error {
// Load the snapshot
var snap allocRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
return err
}
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
snap.AllocDir.NewTaskDir(taskName)
// COMPAT: Remove in 0.7.0
// Check if the old snapshot is there
oldPath := r.pre060StateFilePath()
var snap allocRunnerState
var upgrading bool
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Restore fields
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID)
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
if r.alloc != nil {
r.taskStates = snap.Alloc.TaskStates
}
// COMPAT: Remove in 0.7.0
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
r.allocDir.NewTaskDir(taskName)
}
}
// Delete the old state
os.RemoveAll(oldPath)
upgrading = true
} else if !os.IsNotExist(err) {
// Something corrupt in the old state file
return err
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to get allocation bucket: %v", err)
}
// Get the state objects
var mutable allocRunnerMutableState
var immutable allocRunnerImmutableState
var allocDir allocdir.AllocDir
if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to read alloc runner immutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to read alloc runner mutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil {
return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err)
}
// Populate the fields
r.alloc = immutable.Alloc
r.allocDir = &allocDir
r.allocClientStatus = mutable.AllocClientStatus
r.allocClientDescription = mutable.AllocClientDescription
r.taskStates = mutable.TaskStates
r.alloc.ClientStatus = getClientStatus(r.taskStates)
return nil
})
if err != nil {
return fmt.Errorf("failed to read allocation state: %v", err)
}
}
// Restore fields
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
var snapshotErrors multierror.Error
if r.alloc == nil {
snapshotErrors.Errors = append(snapshotErrors.Errors, fmt.Errorf("alloc_runner snapshot includes a nil allocation"))
@@ -161,11 +246,17 @@ func (r *AllocRunner) RestoreState() error {
return e
}
r.taskStates = snap.Alloc.TaskStates
tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup)
}
// Restore the task runners
var mErr multierror.Error
for name, state := range r.taskStates {
for _, task := range tg.Tasks {
name := task.Name
state := r.taskStates[name]
// Mark the task as restored.
r.restored[name] = struct{}{}
@@ -177,8 +268,7 @@ func (r *AllocRunner) RestoreState() error {
return err
}
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr
// Skip tasks in terminal states.
@@ -193,22 +283,23 @@ func (r *AllocRunner) RestoreState() error {
// Only start if the alloc isn't in a terminal status.
go tr.Run()
if upgrading {
if err := tr.SaveState(); err != nil {
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err)
}
}
// Restart task runner if RestoreState gave a reason
if restartReason != "" {
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason)
tr.Restart("upgrade", restartReason)
}
}
}
return mErr.ErrorOrNil()
}
// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}
// SaveState is used to snapshot the state of the alloc runner.
// If fullSync is false, only the state of the Alloc Runner
// is snapshotted. If fullSync is true, we snapshot
@@ -230,10 +321,7 @@ func (r *AllocRunner) SaveState() error {
}
func (r *AllocRunner) saveAllocRunnerState() error {
r.persistLock.Lock()
defer r.persistLock.Unlock()
// Create the snapshot.
// Grab all the relevant data
alloc := r.Alloc()
r.allocLock.Lock()
@@ -245,14 +333,55 @@ func (r *AllocRunner) saveAllocRunnerState() error {
allocDir := r.allocDir
r.allocDirLock.Unlock()
snap := allocRunnerState{
Version: r.config.Version,
Alloc: alloc,
AllocDir: allocDir,
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
}
return persistState(r.stateFilePath(), &snap)
// Start the transaction.
return r.stateDB.Batch(func(tx *bolt.Tx) error {
// Grab the allocation bucket
allocBkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
// Write the immutable data
if !r.immutablePersisted {
immutable := &allocRunnerImmutableState{
Alloc: alloc,
Version: r.config.Version,
}
if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to write alloc_runner immutable state: %v", err)
}
tx.OnCommit(func() {
r.immutablePersisted = true
})
}
// Write the alloc dir data if it hasn't been written before and it exists.
if !r.allocDirPersisted && r.allocDir != nil {
if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err)
}
tx.OnCommit(func() {
r.allocDirPersisted = true
})
}
// Write the mutable state every time
mutable := &allocRunnerMutableState{
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
TaskStates: alloc.TaskStates,
}
if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to write alloc_runner mutable state: %v", err)
}
return nil
})
}
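
A side note on the tx.OnCommit calls above: bolt may re-invoke a function passed to Batch if the batch is retried, so in-memory flags like immutablePersisted should only flip once the write has actually committed. A minimal sketch of the pattern (writeOnce is a hypothetical helper):

// writeOnce persists a value at most once, flipping the caller's flag only
// after the batch transaction has committed.
func writeOnce(db *bolt.DB, persisted *bool, key, payload []byte) error {
	return db.Batch(func(tx *bolt.Tx) error {
		// This function may run more than once; defer side effects to OnCommit.
		bkt, err := tx.CreateBucketIfNotExists([]byte("allocations"))
		if err != nil {
			return err
		}
		if !*persisted {
			if err := bkt.Put(key, payload); err != nil {
				return err
			}
			tx.OnCommit(func() { *persisted = true })
		}
		return nil
	})
}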
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
@@ -265,7 +394,12 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
}
return nil
})
}
// DestroyContext is used to destroy the context
@@ -273,6 +407,11 @@ func (r *AllocRunner) DestroyContext() error {
return r.allocDir.Destroy()
}
// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}
// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
@@ -285,8 +424,20 @@ func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.Ta
// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
// Clear the job before copying
job := r.alloc.Job
// Since we are clearing the job, anything that accesses the alloc.Job field
// must acquire the lock or access it via this method.
r.alloc.Job = nil
alloc := r.alloc.Copy()
// Restore
r.alloc.Job = job
alloc.Job = job
// The status has explicitly been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
@@ -303,10 +454,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Unlock()
// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
alloc.TaskStates = copyTaskStates(r.taskStates)
for _, state := range r.taskStates {
alloc.ClientStatus = getClientStatus(r.taskStates)
r.taskStatusLock.RUnlock()
return alloc
}
// getClientStatus takes in the task states for a given allocation and computes
// the client status
func getClientStatus(taskStates map[string]*structs.TaskState) string {
var pending, running, dead, failed bool
for _, state := range taskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
@@ -320,20 +480,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
}
}
}
r.taskStatusLock.RUnlock()
// Determine the alloc status
if failed {
alloc.ClientStatus = structs.AllocClientStatusFailed
return structs.AllocClientStatusFailed
} else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning
return structs.AllocClientStatusRunning
} else if pending {
alloc.ClientStatus = structs.AllocClientStatusPending
return structs.AllocClientStatusPending
} else if dead {
alloc.ClientStatus = structs.AllocClientStatusComplete
return structs.AllocClientStatusComplete
}
return alloc
return ""
}
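
A small usage sketch of the precedence encoded here (the pending and dead cases sit in the elided part of the switch above):

// exampleClientStatus is a hypothetical snippet: a running task outranks a
// pending one, so the allocation reports running.
func exampleClientStatus() string {
	states := map[string]*structs.TaskState{
		"web":     {State: structs.TaskStateRunning},
		"sidecar": {State: structs.TaskStatePending},
	}
	return getClientStatus(states) // structs.AllocClientStatusRunning
}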
// dirtySyncState is used to watch for state being marked dirty to sync
@@ -469,7 +628,7 @@ func (r *AllocRunner) Run() {
go r.dirtySyncState()
// Find the task group to run in the allocation
alloc := r.alloc
alloc := r.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
@@ -522,7 +681,7 @@ func (r *AllocRunner) Run() {
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
r.tasks[task.Name] = tr
tr.MarkReceived()
@@ -555,6 +714,10 @@ OUTER:
for _, tr := range runners {
tr.Update(update)
}
if err := r.syncStatus(); err != nil {
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
}
case <-r.destroyCh:
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
break OUTER


@@ -10,6 +10,7 @@ import (
"text/template"
"time"
"github.com/boltdb/bolt"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -36,13 +37,15 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
conf.Node = mock.Node()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, _ := ioutil.TempFile("", "state-db")
db, _ := bolt.Open(tmp.Name(), 0600, nil)
upd := &MockAllocStateUpdater{}
if !restarts {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient())
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient())
return upd, ar
}
@@ -171,9 +174,15 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}
// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
// Check the allocation state still exists
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if !allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("no bucket for alloc")
}
return nil
}); err != nil {
return false, fmt.Errorf("state destroyed")
}
// Check the alloc directory still exists
@@ -201,10 +210,14 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
}
// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}
return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}
// Check the alloc directory was cleaned
@@ -249,10 +262,14 @@ func TestAllocRunner_Destroy(t *testing.T) {
}
// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}
return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}
// Check the alloc directory was cleaned
@@ -324,7 +341,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, ar.config, upd.Update,
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
err = ar2.RestoreState()
@@ -368,12 +385,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}
func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
ar.logger = prefixedTestLogger("ar1: ")
// Ensure task takes some time
ar.alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["run_for"] = "10s"
@@ -410,14 +425,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
}
// Ensure ar1 doesn't recreate the state file
ar.persistLock.Lock()
defer ar.persistLock.Unlock()
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
// Ensure both alloc runners don't destroy
ar.destroy = true
// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
@@ -429,8 +444,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if !allocationBucketExists(tx, ar2.Alloc().ID) {
return fmt.Errorf("no bucket for alloc")
}
return nil
}); err != nil {
return false, fmt.Errorf("state destroyed")
}
// Check the alloc directory still exists
@@ -459,10 +480,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
}
// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
if err := ar.stateDB.View(func(tx *bolt.Tx) error {
if allocationBucketExists(tx, ar2.Alloc().ID) {
return fmt.Errorf("bucket for alloc exists")
}
return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
}
// Check the alloc directory was cleaned
@@ -497,7 +522,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
// Snapshot state
testutil.WaitForResult(func() (bool, error) {
return len(ar.tasks) == 1, nil
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("task never started: %v", err)
})
@@ -509,9 +541,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, origConfig, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
@@ -527,14 +557,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
return false, nil
}
for _, ev := range ar2.alloc.TaskStates["web"].Events {
for _, ev := range ar2.taskStates["web"].Events {
if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) {
return true, nil
}
}
return false, fmt.Errorf("no restart with proper reason found")
}, func(err error) {
t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.alloc.TaskStates["web"])
t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.taskStates["web"])
})
// Destroy and wait
@@ -584,6 +614,14 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
conf := config.DefaultConfig()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, err := ioutil.TempFile("", "state-db")
if err != nil {
t.Fatalf("error creating state db file: %v", err)
}
db, err := bolt.Open(tmp.Name(), 0600, nil)
if err != nil {
t.Fatalf("error creating state db: %v", err)
}
if err := os.MkdirAll(filepath.Join(conf.StateDir, "alloc", alloc.ID), 0777); err != nil {
t.Fatalf("error creating state dir: %v", err)
@@ -655,7 +693,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient)
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient)
defer ar.Destroy()
// RestoreState should fail on the task state since we only test the
@@ -671,7 +709,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
if len(merr.Errors) != 1 {
t.Fatalf("expected exactly 1 error from RestoreState but found: %d: %v", len(merr.Errors), err)
}
if expected := "task runner snapshot includes nil Task"; merr.Errors[0].Error() != expected {
if expected := "failed to get task bucket"; !strings.Contains(merr.Errors[0].Error(), expected) {
t.Fatalf("expected %q but got: %q", expected, merr.Errors[0].Error())
}


@@ -16,6 +16,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/boltdb/bolt"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
@@ -99,6 +100,9 @@ type Client struct {
config *config.Config
start time.Time
// stateDB is used to efficiently store client state.
stateDB *bolt.DB
// configCopy is a copy that should be passed to alloc-runners.
configCopy *config.Config
configLock sync.RWMutex
@@ -340,6 +344,13 @@ func (c *Client) init() error {
}
c.logger.Printf("[INFO] client: using state directory %v", c.config.StateDir)
// Create or open the state database
db, err := bolt.Open(filepath.Join(c.config.StateDir, "state.db"), 0600, nil)
if err != nil {
return fmt.Errorf("failed to create state database", err)
}
c.stateDB = db
// Ensure the alloc dir exists if we have one
if c.config.AllocDir != "" {
if err := os.MkdirAll(c.config.AllocDir, 0755); err != nil {
@@ -410,6 +421,13 @@ func (c *Client) Shutdown() error {
return nil
}
// Defer closing the database
defer func() {
if err := c.stateDB.Close(); err != nil {
c.logger.Printf("[ERR] client: failed to close state database on shutdown: %v", err)
}
}()
// Stop renewing tokens and secrets
if c.vaultClient != nil {
c.vaultClient.Stop()
@@ -590,49 +608,106 @@ func (c *Client) restoreState() error {
return nil
}
// COMPAT: Remove in 0.7.0
// 0.6.0 transitioned from individual state files to a single bolt-db.
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database
// Allocs holds the IDs of the allocations being restored
var allocs []string
// Upgrading tracks whether this is a pre 0.6.0 upgrade path
var upgrading bool
// Scan the directory
list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc"))
if err != nil && os.IsNotExist(err) {
return nil
} else if err != nil {
allocDir := filepath.Join(c.config.StateDir, "alloc")
list, err := ioutil.ReadDir(allocDir)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to list alloc state: %v", err)
} else if err == nil && len(list) != 0 {
upgrading = true
for _, entry := range list {
allocs = append(allocs, entry.Name())
}
} else {
// Normal path
err := c.stateDB.View(func(tx *bolt.Tx) error {
allocs, err = getAllAllocationIDs(tx)
if err != nil {
return fmt.Errorf("failed to list allocations: %v", err)
}
return nil
})
if err != nil {
return err
}
}
// Load each alloc back
var mErr multierror.Error
for _, entry := range list {
id := entry.Name()
for _, id := range allocs {
alloc := &structs.Allocation{ID: id}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
c.allocLock.Unlock()
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
mErr.Errors = append(mErr.Errors, err)
} else {
go ar.Run()
if upgrading {
if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %s failed: %v", id, err)
}
}
}
}
// Delete all the entries
if upgrading {
if err := os.RemoveAll(allocDir); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// saveState is used to snapshot our state into the data dir
// saveState is used to snapshot our state into the data dir.
func (c *Client) saveState() error {
if c.config.DevMode {
return nil
}
var wg sync.WaitGroup
var l sync.Mutex
var mErr multierror.Error
for id, ar := range c.getAllocRunners() {
if err := ar.SaveState(); err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v",
id, err)
mErr.Errors = append(mErr.Errors, err)
}
runners := c.getAllocRunners()
wg.Add(len(runners))
for id, ar := range runners {
go func(id string, ar *AllocRunner) {
err := ar.SaveState()
if err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err)
l.Lock()
multierror.Append(&mErr, err)
l.Unlock()
}
wg.Done()
}(id, ar)
}
wg.Wait()
return mErr.ErrorOrNil()
}
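
An alternative shape for the same fan-out (a sketch only, not this PR's approach; it assumes the golang.org/x/sync/errgroup package): errgroup keeps the goroutine bookkeeping but surfaces only the first error, whereas the multierror above preserves every failure.

// saveAll is a hypothetical variant of the loop above using errgroup.
func saveAll(runners map[string]*AllocRunner) error {
	var g errgroup.Group
	for id, ar := range runners {
		id, ar := id, ar // capture loop variables for the goroutine
		g.Go(func() error {
			if err := ar.SaveState(); err != nil {
				return fmt.Errorf("failed to save state for alloc %s: %v", id, err)
			}
			return nil
		})
	}
	return g.Wait()
}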
@@ -1466,8 +1541,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Remove the old allocations
for _, remove := range diff.removed {
if err := c.removeAlloc(remove); err != nil {
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v",
remove.ID, err)
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
}
}
@@ -1542,11 +1616,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
add.ID, err)
}
}
// Persist our state
if err := c.saveState(); err != nil {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}
}
// blockForRemoteAlloc blocks until the previous allocation of an allocation has
@@ -1892,9 +1961,14 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
defer c.allocLock.Unlock()
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()
if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}
go ar.Run()
// Store the alloc runner.


@@ -2,8 +2,10 @@ package driver
import (
"context"
"crypto/md5"
"errors"
"fmt"
"io"
"log"
"os"
"strings"
@@ -150,6 +152,20 @@ func (r *CreatedResources) Merge(o *CreatedResources) {
}
}
func (r *CreatedResources) Hash() []byte {
h := md5.New()
for k, values := range r.Resources {
io.WriteString(h, k)
io.WriteString(h, "values")
for i, v := range values {
io.WriteString(h, fmt.Sprintf("%d-%v", i, v))
}
}
return h.Sum(nil)
}
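
One caveat worth noting (an observation, not a change in this diff): Go randomizes map iteration order, so ranging over r.Resources can produce different digests for identical contents, which makes the hash-based write skipping later in this diff less effective than it could be. A deterministic variant would sort the keys first; sketched here over a plain map[string][]string, assuming a sort import alongside the existing crypto/md5, fmt, and io imports:

// hashResources is a hypothetical deterministic variant: sort keys before
// feeding them to the digest so equal maps always hash equally.
func hashResources(res map[string][]string) []byte {
	h := md5.New()
	keys := make([]string, 0, len(res))
	for k := range res {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		io.WriteString(h, k)
		for i, v := range res[k] {
			io.WriteString(h, fmt.Sprintf("%d-%v", i, v))
		}
	}
	return h.Sum(nil)
}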
// Driver is used for execution of tasks. This allows Nomad
// to support many pluggable implementations of task drivers.
// Examples could include LXC, Docker, Qemu, etc.

client/state_database.go (new file, 214 lines)

@@ -0,0 +1,214 @@
package client
import (
"bytes"
"fmt"
"github.com/boltdb/bolt"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
)
/*
The client has a BoltDB-backed state store. The schema as of 0.6 looks as follows:
allocations/ (bucket)
|--> <alloc-id>/ (bucket)
|--> alloc_runner persisted objects (k/v)
|--> <task-name>/ (bucket)
|--> task_runner persisted objects (k/v)
*/
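
For orientation, a read-side sketch that walks this hierarchy directly (readTaskSnapshot is a hypothetical helper; the accessors below wrap the same bucket calls, and "simple-all" is the task_runner key defined later in this diff):

// readTaskSnapshot walks allocations/<alloc-id>/<task-name> and decodes the
// single object stored under the task_runner key.
func readTaskSnapshot(db *bolt.DB, allocID, taskName string, out interface{}) error {
	return db.View(func(tx *bolt.Tx) error {
		allocations := tx.Bucket(allocationsBucket)
		if allocations == nil {
			return fmt.Errorf("no allocations bucket")
		}
		allocBkt := allocations.Bucket([]byte(allocID))
		if allocBkt == nil {
			return fmt.Errorf("no bucket for alloc %q", allocID)
		}
		taskBkt := allocBkt.Bucket([]byte(taskName))
		if taskBkt == nil {
			return fmt.Errorf("no bucket for task %q", taskName)
		}
		data := taskBkt.Get([]byte("simple-all"))
		if data == nil {
			return fmt.Errorf("no state stored for task %q", taskName)
		}
		return codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(out)
	})
}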
var (
// allocationsBucket is the bucket name containing all allocation related
// data
allocationsBucket = []byte("allocations")
)
func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
if !bkt.Writable() {
return fmt.Errorf("bucket must be writable")
}
// Serialize the object
var buf bytes.Buffer
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(obj); err != nil {
return fmt.Errorf("failed to encode passed object: %v", err)
}
if err := bkt.Put(key, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write data at key %v: %v", string(key), err)
}
return nil
}
func putData(bkt *bolt.Bucket, key, value []byte) error {
if !bkt.Writable() {
return fmt.Errorf("bucket must be writable")
}
if err := bkt.Put(key, value); err != nil {
return fmt.Errorf("failed to write data at key %v: %v", string(key), err)
}
return nil
}
func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
// Get the data
data := bkt.Get(key)
if data == nil {
return fmt.Errorf("no data at key %v", string(key))
}
// Deserialize the object
if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil {
return fmt.Errorf("failed to decode data into passed object: %v", err)
}
return nil
}
// getAllocationBucket returns the bucket used to persist state about a
// particular allocation. If the root allocation bucket or the specific
// allocation bucket doesn't exist, it will be created as long as the
// transaction is writable.
func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
var err error
w := tx.Writable()
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
if !w {
return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable")
}
allocations, err = tx.CreateBucket(allocationsBucket)
if err != nil {
return nil, err
}
}
// Retrieve the specific allocations bucket
key := []byte(allocID)
alloc := allocations.Bucket(key)
if alloc == nil {
if !w {
return nil, fmt.Errorf("Allocation bucket doesn't exist and transaction is not writable")
}
alloc, err = allocations.CreateBucket(key)
if err != nil {
return nil, err
}
}
return alloc, nil
}
func allocationBucketExists(tx *bolt.Tx, allocID string) bool {
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
return false
}
// Retrieve the specific allocations bucket
alloc := allocations.Bucket([]byte(allocID))
return alloc != nil
}
// getTaskBucket returns the bucket used to persist state about a
// particular task. If the root allocation bucket, the specific
// allocation or task bucket doesn't exist, they will be created as long as the
// transaction is writable.
func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) {
alloc, err := getAllocationBucket(tx, allocID)
if err != nil {
return nil, err
}
// Retrieve the specific task bucket
w := tx.Writable()
key := []byte(taskName)
task := alloc.Bucket(key)
if task == nil {
if !w {
return nil, fmt.Errorf("Task bucket doesn't exist and transaction is not writable")
}
task, err = alloc.CreateBucket(key)
if err != nil {
return nil, err
}
}
return task, nil
}
// deleteAllocationBucket is used to delete an allocation bucket if it exists.
func deleteAllocationBucket(tx *bolt.Tx, allocID string) error {
if !tx.Writable() {
return fmt.Errorf("transaction must be writable")
}
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
return nil
}
// Check if the bucket exists
key := []byte(allocID)
if allocBkt := allocations.Bucket(key); allocBkt == nil {
return nil
}
return allocations.DeleteBucket(key)
}
// deleteTaskBucket is used to delete a task bucket if it exists.
func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
if !tx.Writable() {
return fmt.Errorf("transaction must be writable")
}
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
return nil
}
// Retrieve the specific allocations bucket
alloc := allocations.Bucket([]byte(allocID))
if alloc == nil {
return nil
}
// Check if the bucket exists
key := []byte(taskName)
if taskBkt := alloc.Bucket(key); taskBkt == nil {
return nil
}
return alloc.DeleteBucket(key)
}
func getAllAllocationIDs(tx *bolt.Tx) ([]string, error) {
allocationsBkt := tx.Bucket(allocationsBucket)
if allocationsBkt == nil {
return nil, nil
}
// Create a cursor for iteration.
var allocIDs []string
c := allocationsBkt.Cursor()
// Iterate over all the buckets
for k, _ := c.First(); k != nil; k, _ = c.Next() {
allocIDs = append(allocIDs, string(k))
}
return allocIDs, nil
}


@@ -1,9 +1,11 @@
package client
import (
"bytes"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"log"
"os"
@@ -13,6 +15,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-multierror"
@@ -23,6 +26,7 @@ import (
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
"github.com/hashicorp/nomad/client/driver/env"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
@@ -55,8 +59,15 @@ const (
vaultTokenFile = "vault_token"
)
var (
// taskRunnerStateAllKey holds all of the task runner's state. At the moment
// there is no need to split it
taskRunnerStateAllKey = []byte("simple-all")
)
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
stateDB *bolt.DB
config *config.Config
updater TaskStateUpdater
logger *log.Logger
@@ -143,17 +154,33 @@ type TaskRunner struct {
// AllocRunner, so all state fields must be synchronized using this
// lock.
persistLock sync.Mutex
// persistedHash is the hash of the last persisted snapshot. It is used to
// detect if a new snapshot has to be written to disk.
persistedHash []byte
}
// taskRunnerState is used to snapshot the state of the task runner
type taskRunnerState struct {
Version string
Task *structs.Task
HandleID string
ArtifactDownloaded bool
TaskDirBuilt bool
CreatedResources *driver.CreatedResources
PayloadRendered bool
CreatedResources *driver.CreatedResources
}
func (s *taskRunnerState) Hash() []byte {
h := md5.New()
io.WriteString(h, s.Version)
io.WriteString(h, s.HandleID)
io.WriteString(h, fmt.Sprintf("%v", s.ArtifactDownloaded))
io.WriteString(h, fmt.Sprintf("%v", s.TaskDirBuilt))
io.WriteString(h, fmt.Sprintf("%v", s.PayloadRendered))
h.Write(s.CreatedResources.Hash())
return h.Sum(nil)
}
// TaskStateUpdater is used to signal that tasks state has changed.
@@ -173,7 +200,7 @@ type SignalEvent struct {
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, taskDir *allocdir.TaskDir,
stateDB *bolt.DB, updater TaskStateUpdater, taskDir *allocdir.TaskDir,
alloc *structs.Allocation, task *structs.Task,
vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *TaskRunner {
@@ -190,6 +217,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
tc := &TaskRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
restartTracker: restartTracker,
@@ -222,17 +250,17 @@ func (r *TaskRunner) WaitCh() <-chan struct{} {
return r.waitCh
}
// stateFilePath returns the path to our state file
func (r *TaskRunner) stateFilePath() string {
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func (r *TaskRunner) pre060StateFilePath() string {
// Get the MD5 of the task name
hashVal := md5.Sum([]byte(r.task.Name))
hashHex := hex.EncodeToString(hashVal[:])
dirName := fmt.Sprintf("task-%s", hashHex)
// Generate the path
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID,
dirName, "state.json")
return path
return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, dirName, "state.json")
}
// RestoreState is used to restore our state. If a non-empty string is returned
@@ -240,22 +268,46 @@ func (r *TaskRunner) stateFilePath() string {
// backwards incompatible upgrades that need to restart tasks with a new
// executor.
func (r *TaskRunner) RestoreState() (string, error) {
// Load the snapshot
// COMPAT: Remove in 0.7.0
// 0.6.0 transitioned from individual state files to a single bolt-db.
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database
var snap taskRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
// Check if the old snapshot is there
oldPath := r.pre060StateFilePath()
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Delete the old state
os.RemoveAll(oldPath)
} else if !os.IsNotExist(err) {
// Something corrupt in the old state file
return "", err
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name)
if err != nil {
return fmt.Errorf("failed to get task bucket: %v", err)
}
if err := getObject(bkt, taskRunnerStateAllKey, &snap); err != nil {
return fmt.Errorf("failed to read task runner state: %v", err)
}
return nil
})
if err != nil {
return "", err
}
}
// Restore fields
if snap.Task == nil {
return "", fmt.Errorf("task runner snapshot includes nil Task")
} else {
r.task = snap.Task
}
// Restore fields from the snapshot
r.artifactsDownloaded = snap.ArtifactDownloaded
r.taskDirBuilt = snap.TaskDirBuilt
r.payloadRendered = snap.PayloadRendered
r.setCreatedResources(snap.CreatedResources)
if err := r.setTaskEnv(); err != nil {
@@ -357,9 +409,7 @@ func pre06ScriptCheck(ver, driver string, services []*structs.Service) bool {
func (r *TaskRunner) SaveState() error {
r.persistLock.Lock()
defer r.persistLock.Unlock()
snap := taskRunnerState{
Task: r.task,
Version: r.config.Version,
ArtifactDownloaded: r.artifactsDownloaded,
TaskDirBuilt: r.taskDirBuilt,
@@ -372,7 +422,38 @@ func (r *TaskRunner) SaveState() error {
snap.HandleID = r.handle.ID()
}
r.handleLock.Unlock()
return persistState(r.stateFilePath(), &snap)
// If nothing has changed avoid the write
h := snap.Hash()
if bytes.Equal(h, r.persistedHash) {
return nil
}
// Serialize the object
var buf bytes.Buffer
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil {
return fmt.Errorf("failed to serialize snapshot: %v", err)
}
// Start the transaction.
return r.stateDB.Batch(func(tx *bolt.Tx) error {
// Grab the task bucket
taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
if err := putData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
// Store the hash that was persisted
tx.OnCommit(func() {
r.persistedHash = h
})
return nil
})
}
// DestroyState is used to cleanup after ourselves
@@ -380,7 +461,12 @@ func (r *TaskRunner) DestroyState() error {
r.persistLock.Lock()
defer r.persistLock.Unlock()
return os.RemoveAll(r.stateFilePath())
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil {
return fmt.Errorf("failed to delete task bucket: %v", err)
}
return nil
})
}
// setState is used to update the state of the task runner


@@ -13,6 +13,7 @@ import (
"testing"
"time"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
@@ -77,6 +78,16 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat
conf.Node = mock.Node()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, err := ioutil.TempFile("", "state-db")
if err != nil {
t.Fatalf("error creating state db file: %v", err)
}
db, err := bolt.Open(tmp.Name(), 0600, nil)
if err != nil {
t.Fatalf("error creating state db: %v", err)
}
upd := &MockTaskStateUpdater{}
task := alloc.Job.TaskGroups[0].Tasks[0]
// Initialize the port listing. This should be done by the offer process but
@@ -106,7 +117,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat
vclient := vaultclient.NewMockVaultClient()
cclient := newMockConsulServiceClient()
tr := NewTaskRunner(logger, conf, upd.Update, taskDir, alloc, task, vclient, cclient)
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, cclient)
if !restarts {
tr.restartTracker = noRestartsTracker()
}
@@ -366,8 +377,8 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
}
// Create a new task runner
task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver}
tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.upd.Update,
task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver, Vault: ctx.tr.task.Vault}
tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.tr.stateDB, ctx.upd.Update,
ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient, ctx.tr.consul)
tr2.restartTracker = noRestartsTracker()
if _, err := tr2.RestoreState(); err != nil {


@@ -5,8 +5,6 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -76,40 +74,12 @@ func shuffleStrings(list []string) {
}
}
// persistState is used to help with saving state
func persistState(path string, data interface{}) error {
buf, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to encode state: %v", err)
}
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return fmt.Errorf("failed to make dirs for %s: %v", path, err)
}
tmpPath := path + ".tmp"
if err := ioutil.WriteFile(tmpPath, buf, 0600); err != nil {
return fmt.Errorf("failed to save state to tmp: %v", err)
}
if err := os.Rename(tmpPath, path); err != nil {
return fmt.Errorf("failed to rename tmp to path: %v", err)
}
// Sanity check since users have reported empty state files on disk
if stat, err := os.Stat(path); err != nil {
return fmt.Errorf("unable to stat state file %s: %v", path, err)
} else if stat.Size() == 0 {
return fmt.Errorf("persisted invalid state file %s; see https://github.com/hashicorp/nomad/issues/1367", path)
}
return nil
}
// restoreState is used to read back in the persisted state
func restoreState(path string, data interface{}) error {
// pre060RestoreState is used to read back in state persisted before v0.6.0
func pre060RestoreState(path string, data interface{}) error {
buf, err := ioutil.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("failed to read state: %v", err)
return err
}
if err := json.Unmarshal(buf, data); err != nil {
return fmt.Errorf("failed to decode state: %v", err)


@@ -1,9 +1,6 @@
package client
import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"testing"
@@ -75,42 +72,3 @@ func TestShuffleStrings(t *testing.T) {
t.Fatalf("shuffle failed")
}
}
func TestPersistRestoreState(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "nomad")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(dir)
// Use a state path inside a non-existent directory. This
// verifies that the directory is created properly.
statePath := filepath.Join(dir, "subdir", "test-persist")
type stateTest struct {
Foo int
Bar string
Baz bool
}
state := stateTest{
Foo: 42,
Bar: "the quick brown fox",
Baz: true,
}
err = persistState(statePath, &state)
if err != nil {
t.Fatalf("err: %v", err)
}
var out stateTest
err = restoreState(statePath, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(state, out) {
t.Fatalf("bad: %#v %#v", state, out)
}
}


@@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/boltdb/bolt"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/client"
@@ -75,6 +76,15 @@ func TestConsul_Integration(t *testing.T) {
}
defer os.RemoveAll(conf.AllocDir)
tmp, err := ioutil.TempFile("", "state-db")
if err != nil {
t.Fatalf("error creating state db file: %v", err)
}
db, err := bolt.Open(tmp.Name(), 0600, nil)
if err != nil {
t.Fatalf("error creating state db: %v", err)
}
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
@@ -135,7 +145,7 @@ func TestConsul_Integration(t *testing.T) {
serviceClient.Run()
close(consulRan)
}()
tr := client.NewTaskRunner(logger, conf, logUpdate, taskDir, alloc, task, vclient, serviceClient)
tr := client.NewTaskRunner(logger, conf, db, logUpdate, taskDir, alloc, task, vclient, serviceClient)
tr.MarkReceived()
go tr.Run()
defer func() {


@@ -21,6 +21,7 @@ import (
"github.com/docker/docker/pkg/ioutils"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hpcloud/tail/watch"
"github.com/ugorji/go/codec"
)
@@ -290,7 +291,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool,
heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
// Create a JSON encoder
enc := codec.NewEncoder(out, jsonHandle)
enc := codec.NewEncoder(out, structs.JsonHandle)
// Create the heartbeat and flush ticker
heartbeat := time.NewTicker(heartbeatRate)


@@ -19,6 +19,7 @@ import (
"time"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/ugorji/go/codec"
)
@@ -123,7 +124,7 @@ func TestStreamFramer_Flush(t *testing.T) {
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
f := "foo"
fe := "bar"
@@ -191,7 +192,7 @@ func TestStreamFramer_Batch(t *testing.T) {
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
f := "foo"
fe := "bar"
@@ -268,7 +269,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
// Start the reader
resultCh := make(chan struct{})
@@ -320,7 +321,7 @@ func TestStreamFramer_Order(t *testing.T) {
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
@@ -592,7 +593,7 @@ func TestHTTP_Stream_Modify(t *testing.T) {
r, w := io.Pipe()
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
data := []byte("helloworld")
@@ -668,7 +669,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
r, w := io.Pipe()
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
data := []byte("helloworld")
@@ -778,7 +779,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
data := []byte("helloworld")
@@ -869,7 +870,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
var received []byte
@@ -955,7 +956,7 @@ func TestHTTP_Logs_Follow(t *testing.T) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
var received []byte
@@ -1071,7 +1072,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
dec := codec.NewDecoder(r, structs.JsonHandle)
var received []byte


@@ -29,18 +29,6 @@ const (
scadaHTTPAddr = "SCADA"
)
var (
// jsonHandle and jsonHandlePretty are the codec handles to JSON encode
// structs. The pretty handle will add indents for easier human consumption.
jsonHandle = &codec.JsonHandle{
HTMLCharsAsIs: true,
}
jsonHandlePretty = &codec.JsonHandle{
HTMLCharsAsIs: true,
Indent: 4,
}
)
// HTTPServer is used to wrap an Agent and expose it over an HTTP interface
type HTTPServer struct {
agent *Agent
@@ -186,6 +174,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
s.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
}
@@ -248,13 +237,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
if obj != nil {
var buf bytes.Buffer
if prettyPrint {
enc := codec.NewEncoder(&buf, jsonHandlePretty)
enc := codec.NewEncoder(&buf, structs.JsonHandlePretty)
err = enc.Encode(obj)
if err == nil {
buf.Write([]byte("\n"))
}
} else {
enc := codec.NewEncoder(&buf, jsonHandle)
enc := codec.NewEncoder(&buf, structs.JsonHandle)
err = enc.Encode(obj)
}
if err != nil {

Binary file not shown.


@@ -4231,6 +4231,18 @@ var MsgpackHandle = func() *codec.MsgpackHandle {
return h
}()
var (
// JsonHandle and JsonHandlePretty are the codec handles to JSON encode
// structs. The pretty handle will add indents for easier human consumption.
JsonHandle = &codec.JsonHandle{
HTMLCharsAsIs: true,
}
JsonHandlePretty = &codec.JsonHandle{
HTMLCharsAsIs: true,
Indent: 4,
}
)
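
A small usage sketch of the relocated handles (encodeJSON is a hypothetical helper, assuming the bytes and ugorji codec imports):

// encodeJSON renders obj with the shared JSON handles; the pretty handle
// adds 4-space indents for human consumption.
func encodeJSON(obj interface{}, pretty bool) ([]byte, error) {
	var buf bytes.Buffer
	handle := JsonHandle
	if pretty {
		handle = JsonHandlePretty
	}
	if err := codec.NewEncoder(&buf, handle).Encode(obj); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}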
var HashiMsgpackHandle = func() *hcodec.MsgpackHandle {
h := &hcodec.MsgpackHandle{RawToString: true}