From 784706a1e51c7d8eff558b91db645ea02c1cbf92 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 6 Dec 2018 17:24:43 -0800 Subject: [PATCH] client/state: support upgrading from 0.8->0.9 Also persist and load DeploymentStatus to avoid rechecking health after client restarts. --- .gitignore | 3 + client/allocrunner/alloc_runner.go | 25 ++ client/allocrunner/alloc_runner_hooks.go | 9 + client/allocrunner/health_hook.go | 5 +- .../allocrunner/taskrunner/artifact_hook.go | 2 + .../allocrunner/taskrunner/dispatch_hook.go | 2 + .../allocrunner/taskrunner/task_dir_hook.go | 2 + client/client.go | 13 +- client/state/08types.go | 78 +++++ client/state/db_test.go | 13 +- client/state/interface.go | 10 + client/state/memdb.go | 21 ++ client/state/noopdb.go | 12 + client/state/state_database.go | 214 +++++++++++-- client/state/testdata/state-0.7.1.db.gz | Bin 0 -> 6163 bytes client/state/testdata/state-0.8.6-empty.db.gz | Bin 0 -> 153 bytes .../testdata/state-0.8.6-no-deploy.db.gz | Bin 0 -> 2802 bytes client/state/upgrade.go | 299 ++++++++++++++++++ client/state/upgrade_int_test.go | 159 ++++++++++ client/state/upgrade_test.go | 109 +++++++ 20 files changed, 950 insertions(+), 26 deletions(-) create mode 100644 client/state/08types.go create mode 100644 client/state/testdata/state-0.7.1.db.gz create mode 100644 client/state/testdata/state-0.8.6-empty.db.gz create mode 100644 client/state/testdata/state-0.8.6-no-deploy.db.gz create mode 100644 client/state/upgrade.go create mode 100644 client/state/upgrade_int_test.go create mode 100644 client/state/upgrade_test.go diff --git a/.gitignore b/.gitignore index c5cdb872a..cd2712dcc 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,9 @@ website/npm-debug.log # Test file exit-code +# Don't commit uncompressed state test files +client/state/testdata/*.db + ui/.sass-cache ui/static/base.css diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7cc2802d8..1bd8f3020 100644 --- 
a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -334,6 +334,18 @@ func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir { // Restore state from database. Must be called after NewAllocRunner but before // Run. func (ar *allocRunner) Restore() error { + // Retrieve deployment status to avoid resetting it across agent + // restarts. Once a deployment status is set Nomad no longer monitors + // alloc health, so we must persist deployment state across restarts. + ds, err := ar.stateDB.GetDeploymentStatus(ar.id) + if err != nil { + return err + } + + ar.stateLock.Lock() + ar.state.DeploymentStatus = ds + ar.stateLock.Unlock() + // Restore task runners for _, tr := range ar.tasks { if err := tr.Restore(); err != nil { @@ -344,6 +356,19 @@ func (ar *allocRunner) Restore() error { return nil } +// persistDeploymentStatus stores AllocDeploymentStatus. +func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) { + if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil { + // While any persistence errors are very bad, the worst case + // scenario for failing to persist deployment status is that if + // the agent is restarted it will monitor the deployment status + // again. This could cause a deployment's status to change when + // that shouldn't happen. However, allowing that seems better + // than failing the entire allocation. + ar.logger.Error("error storing deployment status", "error", err) + } +} + // TaskStateUpdated is called by TaskRunner when a task's state has been // updated. It does not process the update synchronously but instead notifies a // goroutine the state has change. 
Since processing the state change may cause diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 9119663e3..85587a658 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -16,6 +16,13 @@ type allocHealthSetter struct { ar *allocRunner } +// HasHealth returns true if a deployment status is already set. +func (a *allocHealthSetter) HasHealth() bool { + a.ar.stateLock.Lock() + defer a.ar.stateLock.Unlock() + return a.ar.state.DeploymentStatus.HasHealth() +} + // ClearHealth allows the health watcher hook to clear the alloc's deployment // health if the deployment id changes. It does not update the server as the // status is only cleared when already receiving an update from the server. @@ -24,6 +31,7 @@ type allocHealthSetter struct { func (a *allocHealthSetter) ClearHealth() { a.ar.stateLock.Lock() a.ar.state.ClearDeploymentStatus() + a.ar.persistDeploymentStatus(nil) a.ar.stateLock.Unlock() } @@ -37,6 +45,7 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents // ModifyIndex as they're only mutated by the server. a.ar.stateLock.Lock() a.ar.state.SetDeploymentStatus(time.Now(), healthy) + a.ar.persistDeploymentStatus(a.ar.state.DeploymentStatus) a.ar.stateLock.Unlock() // If deployment is unhealthy emit task events explaining why diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go index c7b26a717..698548d4e 100644 --- a/client/allocrunner/health_hook.go +++ b/client/allocrunner/health_hook.go @@ -16,6 +16,9 @@ import ( // healthMutator is able to set/clear alloc health. type healthSetter interface { + // HasHealth returns true if health is already set. 
+ HasHealth() bool + // Set health via the mutator SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent) @@ -100,7 +103,7 @@ func (h *allocHealthWatcherHook) Name() string { // Not threadsafe so the caller should lock since Updates occur concurrently. func (h *allocHealthWatcherHook) init() error { // No need to watch health as it's already set - if h.alloc.DeploymentStatus.HasHealth() { + if h.healthSetter.HasHealth() { return nil } diff --git a/client/allocrunner/taskrunner/artifact_hook.go b/client/allocrunner/taskrunner/artifact_hook.go index cf0e81c62..7bed440ab 100644 --- a/client/allocrunner/taskrunner/artifact_hook.go +++ b/client/allocrunner/taskrunner/artifact_hook.go @@ -26,6 +26,8 @@ func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook { } func (*artifactHook) Name() string { + // Copied in client/state when upgrading from <0.9 schemas, so if you + // change it here you also must change it there. return "artifacts" } diff --git a/client/allocrunner/taskrunner/dispatch_hook.go b/client/allocrunner/taskrunner/dispatch_hook.go index 25a8f09f2..2564f8046 100644 --- a/client/allocrunner/taskrunner/dispatch_hook.go +++ b/client/allocrunner/taskrunner/dispatch_hook.go @@ -28,6 +28,8 @@ func newDispatchHook(alloc *structs.Allocation, logger hclog.Logger) *dispatchHo } func (*dispatchHook) Name() string { + // Copied in client/state when upgrading from <0.9 schemas, so if you + // change it here you also must change it there. 
return "dispatch_payload" } diff --git a/client/allocrunner/taskrunner/task_dir_hook.go b/client/allocrunner/taskrunner/task_dir_hook.go index 6a09d1e38..42c5ecc25 100644 --- a/client/allocrunner/taskrunner/task_dir_hook.go +++ b/client/allocrunner/taskrunner/task_dir_hook.go @@ -27,6 +27,8 @@ func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook { } func (h *taskDirHook) Name() string { + // Copied in client/state when upgrading from <0.9 schemas, so if you + // change it here you also must change it there. return "task_dir" } diff --git a/client/client.go b/client/client.go index 1e5552c75..c8a347632 100644 --- a/client/client.go +++ b/client/client.go @@ -462,10 +462,21 @@ func (c *Client) init() error { c.logger.Info("using state directory", "state_dir", c.config.StateDir) // Open the state database - db, err := state.GetStateDBFactory(c.config.DevMode)(c.config.StateDir) + db, err := state.GetStateDBFactory(c.config.DevMode)(c.logger, c.config.StateDir) if err != nil { return fmt.Errorf("failed to open state database: %v", err) } + + // Upgrade the state database + if err := db.Upgrade(); err != nil { + // Upgrade only returns an error on critical persistence + // failures in which an operator should intervene before the + // node is accessible. Upgrade drops and logs corrupt state it + // encounters, so failing to start the agent should be extremely + // rare. 
+ return fmt.Errorf("failed to upgrade state database: %v", err) + } + c.stateDB = db // Ensure the alloc dir exists if we have one diff --git a/client/state/08types.go b/client/state/08types.go new file mode 100644 index 000000000..758e7ef6e --- /dev/null +++ b/client/state/08types.go @@ -0,0 +1,78 @@ +package state + +import ( + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" +) + +// allocRunnerMutableState08 is state that had to be written on each save as it +// changed over the life-cycle of the alloc_runner in Nomad 0.8. +// +// https://github.com/hashicorp/nomad/blob/v0.8.6/client/alloc_runner.go#L146-L153 +// +type allocRunnerMutableState08 struct { + // AllocClientStatus does not need to be upgraded as it is computed + // from task states. + AllocClientStatus string + + // AllocClientDescription does not need to be upgraded as it is computed + // from task states. + AllocClientDescription string + + TaskStates map[string]*structs.TaskState + DeploymentStatus *structs.AllocDeploymentStatus +} + +// taskRunnerState08 was used to snapshot the state of the task runner in Nomad +// 0.8. +// +// https://github.com/hashicorp/nomad/blob/v0.8.6/client/task_runner.go#L188-L197 +// +type taskRunnerState08 struct { + Version string + HandleID string + ArtifactDownloaded bool + TaskDirBuilt bool + PayloadRendered bool + DriverNetwork *cstructs.DriverNetwork + // Created Resources are no longer used. 
+ //CreatedResources *driver.CreatedResources +} + +func (t *taskRunnerState08) Upgrade() *state.LocalState { + ls := state.NewLocalState() + + // Reuse DriverNetwork + ls.DriverNetwork = t.DriverNetwork + + // Upgrade artifact state + ls.Hooks["artifacts"] = &state.HookState{ + PrestartDone: t.ArtifactDownloaded, + } + + // Upgrade task dir state + ls.Hooks["task_dir"] = &state.HookState{ + PrestartDone: t.TaskDirBuilt, + } + + // Upgrade dispatch payload state + ls.Hooks["dispatch_payload"] = &state.HookState{ + PrestartDone: t.PayloadRendered, + } + + //TODO How to convert handles?! This does not work. + ls.TaskHandle = drivers.NewTaskHandle("TODO") + + //TODO where do we get this from? + ls.TaskHandle.Config = nil + + //TODO do we need to set this accurately? Or will RecoverTask handle it? + ls.TaskHandle.State = drivers.TaskStateUnknown + + //TODO do we need an envelope so drivers know this is an old state? + ls.TaskHandle.SetDriverState(t.HandleID) + + return ls +} diff --git a/client/state/db_test.go b/client/state/db_test.go index c28078657..68005a21a 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -9,6 +9,7 @@ import ( trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" @@ -19,7 +20,7 @@ func setupBoltDB(t *testing.T) (*BoltStateDB, func()) { dir, err := ioutil.TempDir("", "nomadtest") require.NoError(t, err) - db, err := NewBoltStateDB(dir) + db, err := NewBoltStateDB(testlog.HCLogger(t), dir) if err != nil { if err := os.RemoveAll(dir); err != nil { t.Logf("error removing boltdb dir: %v", err) @@ -242,3 +243,13 @@ func TestStateDB_DriverManager(t *testing.T) { require.Equal(state, ps) }) } + +// TestStateDB_Upgrade 
asserts calling Upgrade on new databases always +// succeeds. +func TestStateDB_Upgrade(t *testing.T) { + t.Parallel() + + testDB(t, func(t *testing.T, db StateDB) { + require.NoError(t, db.Upgrade()) + }) +} diff --git a/client/state/interface.go b/client/state/interface.go index 7f2cac7a9..2624b46ea 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -12,6 +12,11 @@ type StateDB interface { // Name of implementation. Name() string + // Upgrade ensures the layout of the database is at the latest version + // or returns an error. Corrupt data will be dropped when possible. + // Errors should be considered critical and unrecoverable. + Upgrade() error + // GetAllAllocations returns all valid allocations and a map of // allocation IDs to retrieval errors. // @@ -22,6 +27,11 @@ type StateDB interface { // not be stored. PutAllocation(*structs.Allocation) error + // Get/Put DeploymentStatus get and put the allocation's deployment + // status. It may be nil. + GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) + PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error + // GetTaskRunnerState returns the LocalState and TaskState for a // TaskRunner. Either state may be nil if it is not found, but if an // error is encountered only the error will be non-nil. 
diff --git a/client/state/memdb.go b/client/state/memdb.go index 0e01c01dc..f0a8d4243 100644 --- a/client/state/memdb.go +++ b/client/state/memdb.go @@ -15,6 +15,9 @@ type MemDB struct { // alloc_id -> value allocs map[string]*structs.Allocation + // alloc_id -> value + deployStatus map[string]*structs.AllocDeploymentStatus + // alloc_id -> task_name -> value localTaskState map[string]map[string]*state.LocalState taskState map[string]map[string]*structs.TaskState @@ -31,6 +34,7 @@ type MemDB struct { func NewMemDB() *MemDB { return &MemDB{ allocs: make(map[string]*structs.Allocation), + deployStatus: make(map[string]*structs.AllocDeploymentStatus), localTaskState: make(map[string]map[string]*state.LocalState), taskState: make(map[string]map[string]*structs.TaskState), } @@ -40,6 +44,10 @@ func (m *MemDB) Name() string { return "memdb" } +func (m *MemDB) Upgrade() error { + return nil +} + func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { m.mu.RLock() defer m.mu.RUnlock() @@ -59,6 +67,19 @@ func (m *MemDB) PutAllocation(alloc *structs.Allocation) error { return nil } +func (m *MemDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.deployStatus[allocID], nil +} + +func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error { + m.mu.Lock() + m.deployStatus[allocID] = ds + defer m.mu.Unlock() + return nil +} + func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/client/state/noopdb.go b/client/state/noopdb.go index cb03a55f5..53364ecba 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -14,6 +14,10 @@ func (n NoopDB) Name() string { return "noopdb" } +func (n NoopDB) Upgrade() error { + return nil +} + func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { return nil, 
nil, nil } @@ -22,6 +26,14 @@ func (n NoopDB) PutAllocation(*structs.Allocation) error { return nil } +func (n NoopDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) { + return nil, nil +} + +func (n NoopDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error { + return nil +} + func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { return nil, nil, nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index c56722f66..9686a5e5f 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -2,8 +2,11 @@ package state import ( "fmt" + "os" "path/filepath" + "time" + hclog "github.com/hashicorp/go-hclog" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" @@ -14,9 +17,13 @@ import ( /* The client has a boltDB backed state store. The schema as of 0.9 looks as follows: +meta/ +|--> version -> "1" +|--> upgraded -> time.Now().Format(time.RFC3339) allocations/ |--> / - |--> alloc -> allocEntry{*structs.Allocation} + |--> alloc -> allocEntry{*structs.Allocation} + |--> deploy_status -> deployStatusEntry{*structs.AllocDeploymentStatus} |--> task-/ |--> local_state -> *trstate.LocalState # Local-only state |--> task_state -> *structs.TaskState # Sync'd to servers @@ -29,6 +36,21 @@ drivermanager/ */ var ( + // metaBucketName is the name of the metadata bucket + metaBucketName = []byte("meta") + + // metaVersionKey is the key the state schema version is stored under. + metaVersionKey = []byte("version") + + // metaVersion is the value of the state schema version to detect when + // an upgrade is needed. It skips the usual boltdd/msgpack backend to + // be as portable and futureproof as possible. 
+ metaVersion = "1" + + // metaUpgradedKey is the key that stores the timestamp of the last + // time the schema was upgraded. + metaUpgradedKey = []byte("upgraded") + // allocationsBucketName is the bucket name containing all allocation related // data allocationsBucketName = []byte("allocations") @@ -37,6 +59,10 @@ var ( // allocEntry structs. allocKey = []byte("alloc") + // allocDeployStatusKey is the key *structs.AllocDeploymentStatus is + // stored under. + allocDeployStatusKey = []byte("deploy_status") + // allocations -> $allocid -> task-$taskname -> the keys below taskLocalStateKey = []byte("local_state") taskStateKey = []byte("task_state") @@ -60,13 +86,13 @@ func taskBucketName(taskName string) []byte { } // NewStateDBFunc creates a StateDB given a state directory. -type NewStateDBFunc func(stateDir string) (StateDB, error) +type NewStateDBFunc func(logger hclog.Logger, stateDir string) (StateDB, error) // GetStateDBFactory returns a func for creating a StateDB func GetStateDBFactory(devMode bool) NewStateDBFunc { // Return a noop state db implementation when in debug mode if devMode { - return func(string) (StateDB, error) { + return func(hclog.Logger, string) (StateDB, error) { return NoopDB{}, nil } } @@ -77,21 +103,42 @@ func GetStateDBFactory(devMode bool) NewStateDBFunc { // BoltStateDB persists and restores Nomad client state in a boltdb. All // methods are safe for concurrent access. type BoltStateDB struct { - db *boltdd.DB + stateDir string + db *boltdd.DB + logger hclog.Logger } // NewBoltStateDB creates or opens an existing boltdb state file or returns an // error. 
-func NewBoltStateDB(stateDir string) (StateDB, error) { +func NewBoltStateDB(logger hclog.Logger, stateDir string) (StateDB, error) { + fn := filepath.Join(stateDir, "state.db") + + // Check to see if the DB already exists + fi, err := os.Stat(fn) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + firstRun := fi == nil + // Create or open the boltdb state database - db, err := boltdd.Open(filepath.Join(stateDir, "state.db"), 0600, nil) + db, err := boltdd.Open(fn, 0600, nil) if err != nil { return nil, fmt.Errorf("failed to create state database: %v", err) } sdb := &BoltStateDB{ - db: db, + stateDir: stateDir, + db: db, + logger: logger, } + + // If db did not already exist, initialize metadata fields + if firstRun { + if err := sdb.init(); err != nil { + return nil, err + } + } + return sdb, nil } @@ -182,6 +229,59 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { }) } +// deployStatusEntry wraps values for DeploymentStatus keys. +type deployStatusEntry struct { + DeploymentStatus *structs.AllocDeploymentStatus +} + +// PutDeploymentStatus stores an allocation's DeploymentStatus or returns an +// error. +func (s *BoltStateDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error { + return s.db.Update(func(tx *boltdd.Tx) error { + return putDeploymentStatusImpl(tx, allocID, ds) + }) +} + +func putDeploymentStatusImpl(tx *boltdd.Tx, allocID string, ds *structs.AllocDeploymentStatus) error { + allocBkt, err := getAllocationBucket(tx, allocID) + if err != nil { + return err + } + + entry := deployStatusEntry{ + DeploymentStatus: ds, + } + return allocBkt.Put(allocDeployStatusKey, &entry) +} + +// GetDeploymentStatus retrieves an allocation's DeploymentStatus or returns an +// error. 
+func (s *BoltStateDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) { + var entry deployStatusEntry + + err := s.db.View(func(tx *boltdd.Tx) error { + allAllocsBkt := tx.Bucket(allocationsBucketName) + if allAllocsBkt == nil { + // No state, return + return nil + } + + allocBkt := allAllocsBkt.Bucket([]byte(allocID)) + if allocBkt == nil { + // No state for alloc, return + return nil + } + + return allocBkt.Get(allocDeployStatusKey, &entry) + }) + + if err != nil { + return nil, err + } + + return entry.DeploymentStatus, nil +} + // GetTaskRunnerState returns the LocalState and TaskState for a // TaskRunner. LocalState or TaskState will be nil if they do not exist. // @@ -244,31 +344,43 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc // PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error. func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error { return s.db.Update(func(tx *boltdd.Tx) error { - taskBkt, err := getTaskBucket(tx, allocID, taskName) - if err != nil { - return fmt.Errorf("failed to retrieve allocation bucket: %v", err) - } - - if err := taskBkt.Put(taskLocalStateKey, val); err != nil { - return fmt.Errorf("failed to write task_runner state: %v", err) - } - - return nil + return putTaskRunnerLocalStateImpl(tx, allocID, taskName, val) }) } +// putTaskRunnerLocalStateImpl stores TaskRunner's LocalState in an ongoing +// transaction or returns an error. +func putTaskRunnerLocalStateImpl(tx *boltdd.Tx, allocID, taskName string, val *trstate.LocalState) error { + taskBkt, err := getTaskBucket(tx, allocID, taskName) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) + } + + if err := taskBkt.Put(taskLocalStateKey, val); err != nil { + return fmt.Errorf("failed to write task_runner state: %v", err) + } + + return nil +} + // PutTaskState stores a task's state or returns an error. 
func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error { return s.db.Update(func(tx *boltdd.Tx) error { - taskBkt, err := getTaskBucket(tx, allocID, taskName) - if err != nil { - return fmt.Errorf("failed to retrieve allocation bucket: %v", err) - } - - return taskBkt.Put(taskStateKey, state) + return putTaskStateImpl(tx, allocID, taskName, state) }) } +// putTaskStateImpl stores a task's state in an ongoing transaction or returns +// an error. +func putTaskStateImpl(tx *boltdd.Tx, allocID, taskName string, state *structs.TaskState) error { + taskBkt, err := getTaskBucket(tx, allocID, taskName) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) + } + + return taskBkt.Put(taskStateKey, state) +} + // DeleteTaskBucket is used to delete a task bucket if it exists. func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { return s.db.Update(func(tx *boltdd.Tx) error { @@ -469,3 +581,59 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) { return ps, nil } + +// init initializes metadata entries in a newly created state database. +func (s *BoltStateDB) init() error { + return s.db.Update(func(tx *boltdd.Tx) error { + return addMeta(tx.BoltTx()) + }) +} + +// Upgrade bolt state db from 0.8 schema to 0.9 schema. Noop if already using +// 0.9 schema. Creates a backup before upgrading. +func (s *BoltStateDB) Upgrade() error { + // Check to see if the underlying DB needs upgrading. + upgrade, err := NeedsUpgrade(s.db.BoltDB()) + if err != nil { + return err + } + if !upgrade { + // No upgrade needed! + return nil + } + + // Upgraded needed. Backup the boltdb first. 
+ backupFileName := filepath.Join(s.stateDir, "state.db.backup") + if err := backupDB(s.db.BoltDB(), backupFileName); err != nil { + return fmt.Errorf("error backing up state db: %v", err) + } + + // Perform the upgrade + if err := s.db.Update(func(tx *boltdd.Tx) error { + if err := UpgradeAllocs(s.logger, tx); err != nil { + return err + } + + // Add standard metadata + if err := addMeta(tx.BoltTx()); err != nil { + return err + } + + // Write the time the upgrade was done + bkt, err := tx.CreateBucketIfNotExists(metaBucketName) + if err != nil { + return err + } + return bkt.Put(metaUpgradedKey, time.Now().Format(time.RFC3339)) + }); err != nil { + return err + } + + s.logger.Info("successfully upgraded state") + return nil +} + +// DB allows access to the underlying BoltDB for testing purposes. +func (s *BoltStateDB) DB() *boltdd.DB { + return s.db +} diff --git a/client/state/testdata/state-0.7.1.db.gz b/client/state/testdata/state-0.7.1.db.gz new file mode 100644 index 0000000000000000000000000000000000000000..f319821546c9c58aee327329ab25bb06f93c772a GIT binary patch literal 6163 zcmch5XH-;6vo;0TIgk|gIKN*eNz zbIxfPavGuoGcc(yob!I)xoh2Z|K9qscdhQK-o2{2tDa}`hSE~WAeanJp4#{Uwc~2@ ztp8qYq4r1Fw>@k#jmjqm)jFUCg;pb=6h6t8C`eY?1I+4ehuORmS(n9lX!Qz8KBRQ+ zRpf%q!}A$-hV{XQ3#K!n8IM&Vkry>iCf-QDKAZDYv+Z5Ly;WqH8JJ_YrsvnOa>Mh; z#SccLiD(jhsVb#Pu#<6)?kfq7mN5$_f6~Kw6D#NZT{qf#J`(As?{CqFO>>Hk{2JN*6A*FYCLqZ|h>EhC& zX4-Fdt%7KKJY)wxZ4{~S3U8K(@3K5E&0&w0j7JEmc#4a@t%0mF!z0udV+;6AZK@6~ zy~b#;k}nFbZkJ3O27a@$T2ZHIPceC&sWdeDwX!0yXv9>95TOhyl8VFAv3RhaX)Vs! zV76-RSItC=+qLbqHmm9@qh@^7uVIH&7Y7z0@Vi?Fr$^I=zm}5ea0#O!PhcDdoRRzDOzhU2U zrz^SFycw`llUIJg>jt)DeK$JfZ+jqA={BYOd8?;cE=m}$W@YBH5+bDb$lRqCc73Dv z66kgaI#JVCY02>QVI$IM=>(thMk0Ru(;{1CoKe@#dGjo}#ieOdcm0gE&3L4A>m?nuW8 zq342^QNnv$+s~Y$eVS#>Wwq-l zRp0mvYO#SyGm9TdwzcF#Q}oj&7b)=@S-MI$tJM%*5NXTQFR8?zyXh@Kcl&R}NjGw5 zOPfbzGwi2}4?U{TPtoNP_UF7$ z?5O3}@wIUKB?YtoRLMKM7lmt_lEsEzvUBlYkkW9;?qeX3_ywgjgwcCJq`ba99LLL9 z9<#9qZU$~u{p&-&Xr)Fpwm+ThwAgSP5! 
zcFlx}LaUeG^38>Fpfw&QchWW(ysLtq<`A@9b> zM?CJ8;Hq}w+X_Ga{2s(a>++P{!db;=%jDcEb_;jN9zzUSepGZM(giL!{mMj0@}t3b zz3+@?<}8Y3A+r8A;T^P>Gc3mEK1~n2M2l8LkTN3KzH2j=RqPwPS+$ug4381Y_4g71zjnE;!anDP;HDOH(=K7It`9 zPL_?AT+sgMRq$OO7$=n@JxZzeVEsS|AdC0G)Mz***>fpm5J(gih>(Hl{(%Mjid`Hz z?t@WdApsm{*eV0`pm&13UIuWq9snv7V*fU5A`K9IyGFp&`czpqKtx?!1xKACnPF^n zAH3oJP)NggAG9^LoKTTDNrt(w;~aCUqj=D%A7AxZtyNNU@1yOc8w3gN4ExUXc?OJ0 z3Tw&9`L2yorDHvWg42D;v@_;mz7r9TigtINq@)Ga=Gs^==p;XZF~0ee`*_?UOpcMM zTzD!y!ahQ9>B^rEkl}IG)Y@I@71R4es#!Q3XY8Sx^ZVBs=x+MUFH@dB;PDXBl4XlR zJd8Dcq^^wZ)@yyszn8-8yuN@uSK_-Ekk>lzY!Ln^$(P_UuBdoVcCNrm5(e=`ZFUdT zjppk2Rj(QcM06~Ycn%{1DXH61uvKIW8(#kDox@M^-&(l8WL;3}5B8~Bt?n#sGt(re zGRp$qCrn!{VKjJjG|T~+KFzL)o7WO|8z#OeHy zyj(};dNEym5&c)LEA1AjUl(_sCk(^QWDnQw`JPm<7|WU4?>7?BJvS`(Q!imrFpiIQ zE#Gf{@zvzVkh?FUrml=TO|nV-LRZqs3C1!18cQLtNU;oCboS?HNyRdh7GD`Yi{v0N z5C3_4u&5kSWVq9<+?-aGl=Nr&vv=VPiQSMAeP?94Ps*WSWEvj(UOcm?i$S#|Y?Vhx zM_=fD?wc|%1+Csh#aW%13rjnKZ*Mo!sPOt&-Pk3k8oN6%W~y^y2FaW_8#Wscu~UD; z4DXi(3~=sLGSZS*)I(Px&qbFL#&Xb9vhsp(;$Iw6u~8u^=SPO!AR;SD$;Ql+uB*}`eOr#UjPTV)Lg4Y{0tPSkgNcXEl;&1YF3-`Da~jk%rl z+2+eLy7<7Wv%sTfer*<+gJ%=CF?a z0$b)t)D|#a3p(MuXyM2{+h&8hS&!c<3rK5yd8????SxWZgYStkh%NF%j|N@G7z%M1 zCFZ=j&G!6{OIZ1Dv-*R8>$GJks>Vr!Uo7<@Y(GNPA-@Y7wRXEqa}RbWvU~((RbU6gHfKRoWJTJ#cq7JDJo3EBjfy z<>hE!P&+dXK_svh9=(fkUIz{fY+{IlS{ym18dp__H;395O{wO&Ztkz#+2fkA%a=bO zEhyGerk{y9?|2?0_3nn(2R+> zZ4>g_W9pGcVBJb)&UOA6+u_r-L}wFa!veN#fH$uIXm`<4z#E*sprx;MP!UASYlXTW z_&FBd!Otxa9GjI01ISC8K&+ku`vnv?01Xi|q+HklNI!b4Go~nq44g4z{p`aLM^%j6 zHwD`juyEAH&<>&%PF3TR9Z%;LCh0->9&hqfSL2TkRl%h@No^ERG&lbN^Lhik5W#^; zRw9U?n*b3!LjtOdo{-R6&jthfVUOnpbV3EqmHM^+sw3cFCpN$vK%>L70dx^$*kRS! 
zCv4wSigQz%ST;HO!YC2DW@KNxG&xi|sk+`-G6R+??b&Q=(VaT(e*3-xqr^f?RaU(h zEVvMl_t`^ADRU9B;-{AKpFY-XAdKMRYr|2Cv>d;y1q3{(r!b(wDWc*yFnLZ>2N2BuhYMoHnmEvATo>7v!c%jY zl`VURd`MSl_8f_(Hr|#+vunSWFTu5@OsbySIIi8cffx+oIG||YzYQ9W*8F_xNF^W7 z9m!8^2U<8Afnsc*NL2@D{uw-e+QK=%_XDZ2RWSCE!tme5V-gzqx`D9`1zE92f~mfR zQjnKJ^FyvQo9RxG0BM>M`UCuZPj3R402ZKO?DX{{pe_%NFKR4cz=D4+y(pcH3^{NE zlsTRJ1^y`%$6u+RTv*;Yus+Jy&a~|GeIn2k{RNPyN^c^#j-MC@*zZWq+7D+NQy-Ft zLm=xKR-$&AFemHKs@=}*@GYuP!a2x#tKy93-sxAvieqlE^!Hr@iBPFYQ&S&3L$#2; zcMg+cI^V5maQmWg9h(5u*NC5`o9_>m;|wDL_=VCwKQIbYKgs)jAr^i^vr$;>$_7WcK5A({Ge#@b>_aQNi&ehJWwM+K#3{bj}fVI^EmJE;n-L5k2gBznB zTS}wm)O{ThK~#zQ*QwM6sa~iZM`Si{mi4%;(~x=p>F@thj>9+`vNF0eE(2hOtHuey znDg3)!~Z1q*z*^zLgc8V0-IuhB$VKhbT|+_4vfhMnG6jSSll>pw}L@;aWLj*ZJDJR zJ5KaW3Zk|rix`=(;&B=2{7gd^#dkd?fNlvSIg<>N?DBLy@>V3xglvMsAo87+Qy<8@ ze;+ccom{)K@IoA*PlfeRjP8@0{u9s36LmBEoLS0ZeZo z_JqRL`Q=kJQ2U#(*3xyYGGxrQB(EVflzEAknN!%uf|$w#;aUire!imQ65B^8yv-Q*BBHLJUpY$QZ?W zLHwk@iIcF;+xG|)X7`~d2f`L|ssuNO_<|lSr~J&?(OT9W zEfG*IEpjgFs-cKjL2CW@oi_|D!p7&aO1_meo4PPRJ=429aT5|?8F@I#b=g5EdgxY= zZi&|gbPsHMYN)|3>6k!qX6MRrto^PRSEuXWNGb?*yCTMB=dK?es;?mVz)`y9rl2mo zLxkHUXlW+a`C_MgHM4Hk$O?NEu@M4ouGmnGHgk!UvGkUT7JzH0&HD|Z_}Xuh*Hktr zTd`~65w96u{II2kp~RlJy{BC=fM+hK+jbdSzKZqSh5WRvL>+^RX9u~riBG6aIGr6y^Gmr`>Q5>Hi*~an&+_@~h}(%0 z)8ENO&H8R0q9i55f>RdBXWe5c_Vg_YmBw~Xz$KKZE8>7F zny4*nsm<;XY4=NGzEski)=3vBI@oJ|8LO!$7@6``D7uYPzZ3GmH)=bFV3upIuAEg} z63`p;xSA)fAo8}9sHWWZ4~XoIxx2aH>}spdmC^QIm3dXCJ?#lp_x~f0)!Mw=``%{i ziEfrd4%Wd#PhnebjN8C*o8mc@(?k_sopVp~t@b&7@^?iYh|IUB-W6H2XI^vj>o-kx zE_e8!qp2>f)8%Lf)t#1cGtCWP_h5)-kL6&0W12eN(U(P5in7(0yi4}ZSRPPy($;g@ zDC)xQ=#vP)f4vp;~Nu3rAFf?N5=TYW?adTcg~V9x5TbX_theat!oz+w~!e z26^7-?lLht>%!OFa}&`xeF2-7&vRsM6zAE-TMis&$O_;XBgFKPNVQ}b4jew`BZ3W3 z0{SROFh)Vf5GnJLA8}yf2x0(hN)@>Y2YQmPy`%8{<3%PM&sl-wW9R>=JUm`%`}D-! 
zaEfEem+d7y>={7LQ$Y4XvajGhfPs-=-elNf`1TfPfX!GjI*>*>cqoUPVotyj1Vr?K+*mH^ivUlcyuqHsm2|#HWATD5lCaMVQJ-MwBajC=H{Z^bW$C9c-A%1R; z&C4Lu|2rDwQpNzm7$9fJFia(ejNnBcZVkl2e$;}Rm)kTW(0pj6+y2b5%8wc^uz+h5 z)89t*G9>5V(}$t>uubARITk}sM{gVIZm-Md;nyejlU52b@)&$)sUpOApF)__pT3EE zhU;_SF@6eMo9NBmyrpC)3NjRFnIs?2KNXzZFhaAFeujl)Zv2F5mXIpei@ld` z7Q1LI{j=Eaz;X{J8b9DMtZl7D4|R=P()jVMw&uw?swyW`RWH-R>rT~^1S~J-fJKn! zOUsGt>Ul~EJ~;;G<1>tLf~tDBwgt46YD9}J#urUhzt|! q#+uaeF4LI!vM=k$VoZ$QjrAX6vI0s$!y@#Iz>3*{Y=DFiTmfwwgx+gxOLpVV4q=n;ar!={bWN(Q&cac8>W_Z& z*gX5L2mk;8z~v-=-v4pF{0!ae-PSe=0002j6Yu%TDgXcg;I^{U9{>OV0Jxt85OFoT H06+i$$bvw9 literal 0 HcmV?d00001 diff --git a/client/state/testdata/state-0.8.6-no-deploy.db.gz b/client/state/testdata/state-0.8.6-no-deploy.db.gz new file mode 100644 index 0000000000000000000000000000000000000000..917041eced42cacdf85702d31c36ad7c7786ec8c GIT binary patch literal 2802 zcmYjSc{CJ!7gkhADbZqoClQUMsIelE2* zknGDeg~@I(GuHWiz2|(s^WHz6d(U&9KkjzUm5Af!Ft$48u%B}=%ide`_L+7W)DlYf zTCvsRSAB9q0;|&Vjec^;>W=k12`7G`?WC?Qj^HZii`?C90^r_;B_CwD{&L(umCG*) zt6(`}!ed_*^53wh9Vn72DlC-bx1q?1^Zs15`0H|5z(^bnJuL0x3BmEa-%QcOg7AA9AI$daH9vAceWS*p=cvSmxZL&HjBo#NQD@|ock><;je zXZiWjffx00$KPgvS~I#|ig!=)R_n@OS>`esg7@p>nb!_~8V{;Xhb6tBA^sp1sUz^V(1%E~-&oIoJ345oXKCe@{IFM`}}j@86-Z?xLN4nKMAuOimru#=kVc(TbtnunR*Zk??7mFZc0ch1QGC0Qh_~Eu4j9U>&Zq|Q}1*AvvcxAGp+j6E`Kz!$wD4S$4`F< z#f-2Ga4`+%>3o%f0e7Fz3f_$B(D@-AF6SZgSvD&|hYT{5n zI+W+bbL;mimb|Z@yl+n9Exq(CxYUrCfqeM)SG!C@oL$6yY1e`PB4U{nyF=^!@kL2l zUt;L;1wZ`uEN`tb%QMw8&}wV-tY$l!@LWp2=er6#%3|WsQRGPPjn?j;zc*4YMAa2% zN|pEaZoMLO{M<+~(js&Q1osasWliylklaY-+if#g9wC`wzoI-OJ;md8Op)7c%sYW^ zniD|_X|%+{y7wO?sIhpbPZUEe>kagTZ%!8eb;r%Ze$6(UH`d1%g3vm|TmC z>u72W$``R=QF{|M&36KR`ap`YKDkM?bsdV1zax1|u*xB^C}pf!C?&IufyFtP!LuH#{ezDrGFDrkxI%rQ-=n{Gz<6iKw-g zge2hsi)Ys%mXV(4E<58=0u32ln#Q_T25RgQ0}IPP}ldw8*sjIlrg`P!#e$Ao168ww0dQh8w*)k3d_F{W@% z7B-+zliWXkan4k~2X5%DlXs$8cI1ctD6<=K243YapL`3|Too2o>t~4ThXVm8jDshu5ixU8Du&yhui@{EK|H$}8buCoqMp(Xdbgc4u4YIqEz5yhHhYhgm` 
z%lzM+M4$GFFQRMOpK;d`ysj+kRyYdzm9P3{z)>&UV5UDKH59m&Ww#Fw@zy6`+T)_y z+!YyCyJ`(v3*BjAaSp;iA#V=C*UT>6NX5&vmSo;jp6D@oka>P)ym)u%`MC>BE9WF6 zS=<78%4F_!NjJJm(Cg-S$>56ex5zfb#FKcgP`ty~K#Tm=vX`l^X;kfd8Lg(953|#I zux0Fi%OuTT9hK=1M6gD8PmUAhzpnRlxb=SQy+`)*U{3mur)zcb^_O_fW6mi~9_w<|Q5`JK+_N)YK!6q8Hge4VoHaPNIx+3@*~j^fp<2yAp7A8y8a+)R3Xpjhx4+wB^UPy$T$)EP=UM8Aab$C|1q?y&9n1v%Yn7L-g1D{OaY|E{vo=Vc&HA z{h?>ex{nEpizAb-pqodz@^d{Jx?zR32n1RtzlpPF(~}YUP*aPoW%v_k&YhEac7^3Fm~3~Cf|m{ zG>C;}zh5tgZ4C2&2o9buUmMv3Vk=P<=!^;eu9VcO88&ES0?NtBuv2XCMy3bT#LA((JpxGfL7PMF%X&cPq~iog`aH%2`BZ<1;9;re z`Efw88hW=a{c+I$5t|vJUft3nPpu&M!@943F8sWUGtit$+dsAR3@S*EJ{w<Y_yXwOy0Nm|HGh}^K9^S9{Bm9TkP!y!5jNQa6x_^=zQJ+G7i9x zCeJ$vD%eR3>|0P%3Rt_I$WmYy_H!3tozyMa^VMUBM*cet_EPC&jdFHXZ3xBG#;|3=Y@4B2Y(0rAC*^upPxx-cc%uX|oS`l1g=IxRL}Oby`#!5X zHIf3VKmHjdKagPS>|i&*-faIx=&;ye(0&7`f~2+o<=Z#0c+f7wwFjHnhHD@H;XoK3 zPzLkHj%52{LT=mSWS!3Rx7)whylsv*tCT-TW}hc*pXc`ez@VY5$*TVPrO4lVibrU@ zucUT12(1OAEHJ^b4Jv@dKR`kQ&{9VE6Z1z%aZxr_c0w`b09&TvIMK!iz6*doLfKpG4*bP8uqcMty&v4GQnEzdU4X>~dlG?(l?_(hT1wro6!Bzep!9AMA) zu@$*L>ro9(L#ZTFEzXCI9&AMlO-e{*}vfBQ%EZGdCTOc%+)v5(`wg*U~* literal 0 HcmV?d00001 diff --git a/client/state/upgrade.go b/client/state/upgrade.go new file mode 100644 index 000000000..53dc48017 --- /dev/null +++ b/client/state/upgrade.go @@ -0,0 +1,299 @@ +package state + +import ( + "bytes" + "fmt" + "os" + + "github.com/boltdb/bolt" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper/boltdd" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" +) + +// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is +// already up to date. 
+func NeedsUpgrade(bdb *bolt.DB) (bool, error) { + needsUpgrade := true + err := bdb.View(func(tx *bolt.Tx) error { + b := tx.Bucket(metaBucketName) + if b == nil { + // No meta bucket; upgrade + return nil + } + + v := b.Get(metaVersionKey) + if len(v) == 0 { + // No version; upgrade + return nil + } + + if string(v) != metaVersion { + // Version exists but does not match. Abort. + return fmt.Errorf("incompatible state version. expected %q but found %q", + metaVersion, v) + } + + // Version matches! Assume migrated! + needsUpgrade = false + return nil + }) + + return needsUpgrade, err +} + +// addMeta adds version metadata to BoltDB to mark it as upgraded and +// should be run at the end of the upgrade transaction. +func addMeta(tx *bolt.Tx) error { + // Create the meta bucket if it doesn't exist + bkt, err := tx.CreateBucketIfNotExists(metaBucketName) + if err != nil { + return err + } + + return bkt.Put(metaVersionKey, []byte(metaVersion)) +} + +// backupDB backs up the existing state database prior to upgrade, overwriting +// previous backups. +func backupDB(bdb *bolt.DB, dst string) error { + fd, err := os.Create(dst) + if err != nil { + return err + } + + return bdb.View(func(tx *bolt.Tx) error { + if _, err := tx.WriteTo(fd); err != nil { + fd.Close() + return err + } + + return fd.Close() + }) +} + +// UpgradeAllocs upgrades the boltdb schema. Example 0.8 schema: +// +// * allocations +// * 15d83e8a-74a2-b4da-3f17-ed5c12895ea8 +// * echo +// - simple-all (342 bytes) +// - alloc (2827 bytes) +// - alloc-dir (166 bytes) +// - immutable (15 bytes) +// - mutable (1294 bytes) +// +func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error { + btx := tx.BoltTx() + allocationsBucket := btx.Bucket(allocationsBucketName) + if allocationsBucket == nil { + // No state! 
+ return nil + } + + // Gather alloc buckets and remove unexpected key/value pairs + allocBuckets := [][]byte{} + cur := allocationsBucket.Cursor() + for k, v := cur.First(); k != nil; k, v = cur.Next() { + if v != nil { + logger.Warn("deleting unexpected key in state db", + "key", string(k), "value_bytes", len(v), + ) + + if err := cur.Delete(); err != nil { + return err + } + continue + } + + allocBuckets = append(allocBuckets, k) + } + + for _, allocBucket := range allocBuckets { + allocID := string(allocBucket) + + bkt := allocationsBucket.Bucket(allocBucket) + if bkt == nil { + // This should never happen as we just read the bucket. + return fmt.Errorf("unexpected bucket missing %q", allocID) + } + + allocLogger := logger.With("alloc_id", allocID) + if err := upgradeAllocBucket(allocLogger, tx, bkt, allocID); err != nil { + // Log and drop invalid allocs + allocLogger.Error("dropping invalid allocation due to error while upgrading state", + "error", err, + ) + + // If we can't delete the bucket something is seriously + // wrong, fail hard. + if err := allocationsBucket.DeleteBucket(allocBucket); err != nil { + return fmt.Errorf("error deleting invalid allocation state: %v", err) + } + } + } + + return nil +} + +// upgradeAllocBucket upgrades an alloc bucket. +func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, allocID string) error { + allocFound := false + taskBuckets := [][]byte{} + cur := bkt.Cursor() + for k, v := cur.First(); k != nil; k, v = cur.Next() { + switch string(k) { + case "alloc": + // Alloc has not changed; leave it be + allocFound = true + case "alloc-dir": + // Delete alloc-dir entries as they're no longer + // needed. + cur.Delete() + case "immutable": + // Skip decoding immutable state. Nothing from it needs + // to be upgraded. 
+ cur.Delete() + case "mutable": + // Decode and upgrade + if err := upgradeOldAllocMutable(tx, allocID, v); err != nil { + return err + } + cur.Delete() + default: + if v != nil { + logger.Warn("deleting unexpected state entry for allocation", + "key", string(k), "value_bytes", len(v), + ) + + if err := cur.Delete(); err != nil { + return err + } + + continue + } + + // Nested buckets are tasks + taskBuckets = append(taskBuckets, k) + } + } + + // If the alloc entry was not found, abandon this allocation as the + // state has been corrupted. + if !allocFound { + return fmt.Errorf("alloc entry not found") + } + + // Upgrade tasks + for _, taskBucket := range taskBuckets { + taskName := string(taskBucket) + taskLogger := logger.With("task_name", taskName) + + taskBkt := bkt.Bucket(taskBucket) + if taskBkt == nil { + // This should never happen as we just read the bucket. + return fmt.Errorf("unexpected bucket missing %q", taskName) + } + + oldState, err := upgradeTaskBucket(taskLogger, taskBkt) + if err != nil { + taskLogger.Warn("dropping invalid task due to error while upgrading state", + "error", err, + ) + // Delete the invalid task bucket (failures here are + // unrecoverable) and skip it: oldState is nil on error. + if err := bkt.DeleteBucket(taskBucket); err != nil { + return fmt.Errorf("error deleting invalid task state for task %q: %v", + taskName, err, + ) + } + continue + } + + // Convert 0.8 task state to 0.9 task state + localTaskState := oldState.Upgrade() + + // Insert the new task state + if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil { + return err + } + + // Delete the old task bucket + if err := bkt.DeleteBucket(taskBucket); err != nil { + return err + } + + taskLogger.Trace("upgraded", "from", oldState.Version) + } + + return nil +} + +// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys +// and returning the 0.8 version of the state. 
+func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState08, error) { + simpleFound := false + var trState taskRunnerState08 + + cur := bkt.Cursor() + for k, v := cur.First(); k != nil; k, v = cur.Next() { + if !bytes.Equal(k, []byte("simple-all")) { + if v == nil { + // Delete Bucket + logger.Warn("deleting unexpected task state bucket", + "bucket", string(k), + ) + + if err := bkt.DeleteBucket(k); err != nil { + return nil, err + } + } else { + // Delete entry + logger.Warn("deleting unexpected task state entry", + "key", string(k), "value_bytes", len(v), + ) + + if err := cur.Delete(); err != nil { + return nil, err + } + } + continue + } + + // Decode simple-all + simpleFound = true + if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil { + return nil, fmt.Errorf("failed to decode task state: %v", err) + } + } + + if !simpleFound { + return nil, fmt.Errorf("task state entry not found") + } + + return &trState, nil +} + +// upgradeOldAllocMutable upgrades Nomad 0.8 alloc runner state. 
+func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) error { + var oldMutable allocRunnerMutableState08 + err := codec.NewDecoderBytes(oldBytes, structs.MsgpackHandle).Decode(&oldMutable) + if err != nil { + return err + } + + // Upgrade Deployment Status + if err := putDeploymentStatusImpl(tx, allocID, oldMutable.DeploymentStatus); err != nil { + return err + } + + // Upgrade Task States + for taskName, taskState := range oldMutable.TaskStates { + if err := putTaskStateImpl(tx, allocID, taskName, taskState); err != nil { + return err + } + } + + return nil +} diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go new file mode 100644 index 000000000..043b00be3 --- /dev/null +++ b/client/state/upgrade_int_test.go @@ -0,0 +1,159 @@ +package state_test + +import ( + "compress/gzip" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/hashicorp/nomad/client/allocrunner" + "github.com/hashicorp/nomad/client/allocwatcher" + clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/devicemanager" + dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + . "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/helper/boltdd" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/shared" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestBoltStateDB_UpgradeOld_Ok asserts upgrading an old state db does not error +// during upgrade and restore. 
+func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) { + t.Parallel() + + files, err := filepath.Glob("testdata/*.db*") + require.NoError(t, err) + + for _, fn := range files { + t.Run(fn, func(t *testing.T) { + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + defer os.RemoveAll(dir) + + var src io.ReadCloser + src, err = os.Open(fn) + require.NoError(t, err) + defer src.Close() + + // testdata may be gzip'd; decode on copy + if strings.HasSuffix(fn, ".gz") { + src, err = gzip.NewReader(src) + require.NoError(t, err) + } + + dst, err := os.Create(filepath.Join(dir, "state.db")) + require.NoError(t, err) + + // Copy test files before testing them for safety + _, err = io.Copy(dst, src) + require.NoError(t, err) + + require.NoError(t, src.Close()) + + dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir) + require.NoError(t, err) + defer dbI.Close() + + db := dbI.(*BoltStateDB) + + // Simply opening old files should *not* alter them + require.NoError(t, db.DB().View(func(tx *boltdd.Tx) error { + b := tx.Bucket([]byte("meta")) + if b != nil { + return fmt.Errorf("meta bucket found but should not exist yet!") + } + return nil + })) + + needsUpgrade, err := NeedsUpgrade(db.DB().BoltDB()) + require.NoError(t, err) + require.True(t, needsUpgrade) + + // Attempt the upgrade + require.NoError(t, db.Upgrade()) + + needsUpgrade, err = NeedsUpgrade(db.DB().BoltDB()) + require.NoError(t, err) + require.False(t, needsUpgrade) + + // Ensure Allocations can be restored and + // NewAR/AR.Restore do not error. 
+ allocs, errs, err := db.GetAllAllocations() + require.NoError(t, err) + assert.Len(t, errs, 0) + + for _, alloc := range allocs { + checkUpgradedAlloc(t, dir, db, alloc) + } + + // Should be nil for all upgrades + ps, err := db.GetDevicePluginState() + require.NoError(t, err) + require.Nil(t, ps) + + ps = &dmstate.PluginState{ + ReattachConfigs: map[string]*shared.ReattachConfig{ + "test": &shared.ReattachConfig{Pid: 1}, + }, + } + require.NoError(t, db.PutDevicePluginState(ps)) + + require.NoError(t, db.Close()) + }) + } +} + +// checkUpgradedAlloc creates and restores an AllocRunner from an upgraded +// database. +// +// It does not call AR.Run as it's intended to be used against a wide test +// corpus in testdata that may be expensive to run and require unavailable +// dependencies. +func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Allocation) { + _, err := db.GetDeploymentStatus(alloc.ID) + assert.NoError(t, err) + + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + for _, task := range tg.Tasks { + _, _, err := db.GetTaskRunnerState(alloc.ID, task.Name) + require.NoError(t, err) + } + + clientConf, cleanup := clientconfig.TestClientConfig(t) + + // Does *not* clean up overridden StateDir below. That's left alone for + // the caller to clean up. 
+ defer cleanup() + + clientConf.StateDir = path + + conf := &allocrunner.Config{ + Alloc: alloc, + Logger: clientConf.Logger, + ClientConfig: clientConf, + StateDB: db, + Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger), + Vault: vaultclient.NewMockVaultClient(), + StateUpdater: &allocrunner.MockStateUpdater{}, + PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, + PrevAllocMigrator: allocwatcher.NoopPrevAlloc{}, + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + } + ar, err := allocrunner.NewAllocRunner(conf) + require.NoError(t, err) + + // AllocRunner.Restore should not error + require.NoError(t, ar.Restore()) +} diff --git a/client/state/upgrade_test.go b/client/state/upgrade_test.go new file mode 100644 index 000000000..52a2ac7d9 --- /dev/null +++ b/client/state/upgrade_test.go @@ -0,0 +1,109 @@ +package state + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/boltdb/bolt" + "github.com/hashicorp/nomad/helper/boltdd" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/stretchr/testify/require" +) + +// TestUpgrade_NeedsUpgrade_New asserts new state dbs do not need upgrading. +func TestUpgrade_NeedsUpgrade_New(t *testing.T) { + t.Parallel() + + db, cleanup := setupBoltDB(t) + defer cleanup() + + up, err := NeedsUpgrade(db.DB().BoltDB()) + require.NoError(t, err) + require.False(t, up) +} + +// TestUpgrade_NeedsUpgrade_Old asserts state dbs with just the allocations +// bucket *do* need upgrading. 
+func TestUpgrade_NeedsUpgrade_Old(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + + defer os.RemoveAll(dir) + + db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil) + require.NoError(t, err) + defer db.Close() + + // Create the allocations bucket which exists in both the old and 0.9 + // schemas + require.NoError(t, db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucket(allocationsBucketName) + return err + })) + + up, err := NeedsUpgrade(db) + require.NoError(t, err) + require.True(t, up) + + // Adding meta should mark it as upgraded + require.NoError(t, db.Update(addMeta)) + + up, err = NeedsUpgrade(db) + require.NoError(t, err) + require.False(t, up) +} + +// TestUpgrade_DeleteInvalidAllocs_NoAlloc asserts invalid allocations are deleted +// during state upgrades instead of failing the entire agent. +func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + + defer os.RemoveAll(dir) + + db, err := boltdd.Open(filepath.Join(dir, "state.db"), 0666, nil) + require.NoError(t, err) + defer db.Close() + + allocID := []byte(uuid.Generate()) + + // Create an allocation bucket with no `alloc` key. This is an observed + // pre-0.9 state corruption that should result in the allocation being + // dropped while allowing the upgrade to continue. 
+ require.NoError(t, db.Update(func(tx *boltdd.Tx) error { + parentBkt, err := tx.CreateBucket(allocationsBucketName) + if err != nil { + return err + } + + _, err = parentBkt.CreateBucket(allocID) + return err + })) + + // Perform the Upgrade + require.NoError(t, db.Update(func(tx *boltdd.Tx) error { + return UpgradeAllocs(testlog.HCLogger(t), tx) + })) + + // Assert invalid allocation bucket was removed + require.NoError(t, db.View(func(tx *boltdd.Tx) error { + parentBkt := tx.Bucket(allocationsBucketName) + if parentBkt == nil { + return fmt.Errorf("parent allocations bucket should not have been removed") + } + + if parentBkt.Bucket(allocID) != nil { + return fmt.Errorf("invalid alloc bucket should have been deleted") + } + + return nil + })) +}