diff --git a/.gitignore b/.gitignore
index c5cdb872a..cd2712dcc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -39,6 +39,9 @@ website/npm-debug.log
 
 # Test file
 exit-code
 
+# Don't commit uncompressed state test files
+client/state/testdata/*.db
+
 ui/.sass-cache
 ui/static/base.css
diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 7cc2802d8..1bd8f3020 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -334,6 +334,18 @@ func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir {
 // Restore state from database. Must be called after NewAllocRunner but before
 // Run.
 func (ar *allocRunner) Restore() error {
+	// Retrieve deployment status to avoid resetting it across agent
+	// restarts. Once a deployment status is set Nomad no longer monitors
+	// alloc health, so we must persist deployment state across restarts.
+	ds, err := ar.stateDB.GetDeploymentStatus(ar.id)
+	if err != nil {
+		return err
+	}
+
+	ar.stateLock.Lock()
+	ar.state.DeploymentStatus = ds
+	ar.stateLock.Unlock()
+
 	// Restore task runners
 	for _, tr := range ar.tasks {
 		if err := tr.Restore(); err != nil {
@@ -344,6 +356,19 @@ func (ar *allocRunner) Restore() error {
 	return nil
 }
 
+// persistDeploymentStatus stores AllocDeploymentStatus.
+func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
+	if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
+		// While any persistence errors are very bad, the worst case
+		// scenario for failing to persist deployment status is that if
+		// the agent is restarted it will monitor the deployment status
+		// again. This could cause a deployment's status to change when
+		// that shouldn't happen. However, allowing that seems better
+		// than failing the entire allocation.
+		ar.logger.Error("error storing deployment status", "error", err)
+	}
+}
+
 // TaskStateUpdated is called by TaskRunner when a task's state has been
 // updated. It does not process the update synchronously but instead notifies a
 // goroutine the state has change. Since processing the state change may cause
diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go
index 9119663e3..85587a658 100644
--- a/client/allocrunner/alloc_runner_hooks.go
+++ b/client/allocrunner/alloc_runner_hooks.go
@@ -16,6 +16,13 @@ type allocHealthSetter struct {
 	ar *allocRunner
 }
 
+// HasHealth returns true if a deployment status is already set.
+func (a *allocHealthSetter) HasHealth() bool {
+	a.ar.stateLock.Lock()
+	defer a.ar.stateLock.Unlock()
+	return a.ar.state.DeploymentStatus.HasHealth()
+}
+
 // ClearHealth allows the health watcher hook to clear the alloc's deployment
 // health if the deployment id changes. It does not update the server as the
 // status is only cleared when already receiving an update from the server.
@@ -24,6 +31,7 @@ type allocHealthSetter struct {
 func (a *allocHealthSetter) ClearHealth() {
 	a.ar.stateLock.Lock()
 	a.ar.state.ClearDeploymentStatus()
+	a.ar.persistDeploymentStatus(nil)
 	a.ar.stateLock.Unlock()
 }
 
@@ -37,6 +45,7 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents
 	// ModifyIndex as they're only mutated by the server.
 	a.ar.stateLock.Lock()
 	a.ar.state.SetDeploymentStatus(time.Now(), healthy)
+	a.ar.persistDeploymentStatus(a.ar.state.DeploymentStatus)
 	a.ar.stateLock.Unlock()
 
 	// If deployment is unhealthy emit task events explaining why
diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go
index c7b26a717..698548d4e 100644
--- a/client/allocrunner/health_hook.go
+++ b/client/allocrunner/health_hook.go
@@ -16,6 +16,9 @@ import (
 
 // healthMutator is able to set/clear alloc health.
 type healthSetter interface {
+	// HasHealth returns true if health is already set.
+	HasHealth() bool
+
 	// Set health via the mutator
 	SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent)
 
@@ -100,7 +103,7 @@ func (h *allocHealthWatcherHook) Name() string {
 // Not threadsafe so the caller should lock since Updates occur concurrently.
 func (h *allocHealthWatcherHook) init() error {
 	// No need to watch health as it's already set
-	if h.alloc.DeploymentStatus.HasHealth() {
+	if h.healthSetter.HasHealth() {
 		return nil
 	}
 
diff --git a/client/allocrunner/taskrunner/artifact_hook.go b/client/allocrunner/taskrunner/artifact_hook.go
index cf0e81c62..7bed440ab 100644
--- a/client/allocrunner/taskrunner/artifact_hook.go
+++ b/client/allocrunner/taskrunner/artifact_hook.go
@@ -26,6 +26,8 @@ func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook {
 }
 
 func (*artifactHook) Name() string {
+	// Copied in client/state when upgrading from <0.9 schemas, so if you
+	// change it here you must also change it there.
 	return "artifacts"
 }
 
diff --git a/client/allocrunner/taskrunner/dispatch_hook.go b/client/allocrunner/taskrunner/dispatch_hook.go
index 25a8f09f2..2564f8046 100644
--- a/client/allocrunner/taskrunner/dispatch_hook.go
+++ b/client/allocrunner/taskrunner/dispatch_hook.go
@@ -28,6 +28,8 @@ func newDispatchHook(alloc *structs.Allocation, logger hclog.Logger) *dispatchHo
 }
 
 func (*dispatchHook) Name() string {
+	// Copied in client/state when upgrading from <0.9 schemas, so if you
+	// change it here you must also change it there.
 	return "dispatch_payload"
 }
 
diff --git a/client/allocrunner/taskrunner/task_dir_hook.go b/client/allocrunner/taskrunner/task_dir_hook.go
index 6a09d1e38..42c5ecc25 100644
--- a/client/allocrunner/taskrunner/task_dir_hook.go
+++ b/client/allocrunner/taskrunner/task_dir_hook.go
@@ -27,6 +27,8 @@ func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook {
 }
 
 func (h *taskDirHook) Name() string {
+	// Copied in client/state when upgrading from <0.9 schemas, so if you
+	// change it here you must also change it there.
 	return "task_dir"
 }
 
diff --git a/client/client.go b/client/client.go
index 1e5552c75..c8a347632 100644
--- a/client/client.go
+++ b/client/client.go
@@ -462,10 +462,21 @@ func (c *Client) init() error {
 	c.logger.Info("using state directory", "state_dir", c.config.StateDir)
 
 	// Open the state database
-	db, err := state.GetStateDBFactory(c.config.DevMode)(c.config.StateDir)
+	db, err := state.GetStateDBFactory(c.config.DevMode)(c.logger, c.config.StateDir)
 	if err != nil {
 		return fmt.Errorf("failed to open state database: %v", err)
 	}
+
+	// Upgrade the state database
+	if err := db.Upgrade(); err != nil {
+		// Upgrade only returns an error on critical persistence
+		// failures in which an operator should intervene before the
+		// node is accessible. Upgrade drops and logs corrupt state it
+		// encounters, so failing to start the agent should be extremely
+		// rare.
+		return fmt.Errorf("failed to upgrade state database: %v", err)
+	}
+
 	c.stateDB = db
 
 	// Ensure the alloc dir exists if we have one
diff --git a/client/state/08types.go b/client/state/08types.go
new file mode 100644
index 000000000..758e7ef6e
--- /dev/null
+++ b/client/state/08types.go
@@ -0,0 +1,78 @@
+package state
+
+import (
+	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/plugins/drivers"
+)
+
+// allocRunnerMutableState08 is state that had to be written on each save as it
+// changed over the life-cycle of the alloc_runner in Nomad 0.8.
+//
+// https://github.com/hashicorp/nomad/blob/v0.8.6/client/alloc_runner.go#L146-L153
+//
+type allocRunnerMutableState08 struct {
+	// AllocClientStatus does not need to be upgraded as it is computed
+	// from task states.
+	AllocClientStatus string
+
+	// AllocClientDescription does not need to be upgraded as it is computed
+	// from task states.
+	AllocClientDescription string
+
+	TaskStates       map[string]*structs.TaskState
+	DeploymentStatus *structs.AllocDeploymentStatus
+}
+
+// taskRunnerState08 was used to snapshot the state of the task runner in Nomad
+// 0.8.
+//
+// https://github.com/hashicorp/nomad/blob/v0.8.6/client/task_runner.go#L188-L197
+//
+type taskRunnerState08 struct {
+	Version            string
+	HandleID           string
+	ArtifactDownloaded bool
+	TaskDirBuilt       bool
+	PayloadRendered    bool
+	DriverNetwork      *cstructs.DriverNetwork
+	// Created Resources are no longer used.
+	//CreatedResources *driver.CreatedResources
+}
+
+func (t *taskRunnerState08) Upgrade() *state.LocalState {
+	ls := state.NewLocalState()
+
+	// Reuse DriverNetwork
+	ls.DriverNetwork = t.DriverNetwork
+
+	// Upgrade artifact state
+	ls.Hooks["artifacts"] = &state.HookState{
+		PrestartDone: t.ArtifactDownloaded,
+	}
+
+	// Upgrade task dir state
+	ls.Hooks["task_dir"] = &state.HookState{
+		PrestartDone: t.TaskDirBuilt,
+	}
+
+	// Upgrade dispatch payload state
+	ls.Hooks["dispatch_payload"] = &state.HookState{
+		PrestartDone: t.PayloadRendered,
+	}
+
+	//TODO How to convert handles?! This does not work.
+	ls.TaskHandle = drivers.NewTaskHandle("TODO")
+
+	//TODO where do we get this from?
+	ls.TaskHandle.Config = nil
+
+	//TODO do we need to set this accurately? Or will RecoverTask handle it?
+	ls.TaskHandle.State = drivers.TaskStateUnknown
+
+	//TODO do we need an envelope so drivers know this is an old state?
+	ls.TaskHandle.SetDriverState(t.HandleID)
+
+	return ls
+}
diff --git a/client/state/db_test.go b/client/state/db_test.go
index c28078657..68005a21a 100644
--- a/client/state/db_test.go
+++ b/client/state/db_test.go
@@ -9,6 +9,7 @@ import (
 	trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
 	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
 	driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
+	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/kr/pretty"
@@ -19,7 +20,7 @@ func setupBoltDB(t *testing.T) (*BoltStateDB, func()) {
 	dir, err := ioutil.TempDir("", "nomadtest")
 	require.NoError(t, err)
 
-	db, err := NewBoltStateDB(dir)
+	db, err := NewBoltStateDB(testlog.HCLogger(t), dir)
 	if err != nil {
 		if err := os.RemoveAll(dir); err != nil {
 			t.Logf("error removing boltdb dir: %v", err)
@@ -242,3 +243,13 @@ func TestStateDB_DriverManager(t *testing.T) {
 		require.Equal(state, ps)
 	})
 }
+
+// TestStateDB_Upgrade asserts calling Upgrade on new databases always
+// succeeds.
+func TestStateDB_Upgrade(t *testing.T) {
+	t.Parallel()
+
+	testDB(t, func(t *testing.T, db StateDB) {
+		require.NoError(t, db.Upgrade())
+	})
+}
diff --git a/client/state/interface.go b/client/state/interface.go
index 7f2cac7a9..2624b46ea 100644
--- a/client/state/interface.go
+++ b/client/state/interface.go
@@ -12,6 +12,11 @@ type StateDB interface {
 	// Name of implementation.
 	Name() string
 
+	// Upgrade ensures the layout of the database is at the latest version
+	// or returns an error. Corrupt data will be dropped when possible.
+	// Errors should be considered critical and unrecoverable.
+	Upgrade() error
+
 	// GetAllAllocations returns all valid allocations and a map of
 	// allocation IDs to retrieval errors.
 	//
@@ -22,6 +27,11 @@
 	// not be stored.
 	PutAllocation(*structs.Allocation) error
 
+	// Get/Put DeploymentStatus get and put the allocation's deployment
+	// status. It may be nil.
+	GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error)
+	PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error
+
 	// GetTaskRunnerState returns the LocalState and TaskState for a
 	// TaskRunner. Either state may be nil if it is not found, but if an
 	// error is encountered only the error will be non-nil.
diff --git a/client/state/memdb.go b/client/state/memdb.go
index 0e01c01dc..f0a8d4243 100644
--- a/client/state/memdb.go
+++ b/client/state/memdb.go
@@ -15,6 +15,9 @@ type MemDB struct {
 	// alloc_id -> value
 	allocs map[string]*structs.Allocation
 
+	// alloc_id -> value
+	deployStatus map[string]*structs.AllocDeploymentStatus
+
 	// alloc_id -> task_name -> value
 	localTaskState map[string]map[string]*state.LocalState
 	taskState      map[string]map[string]*structs.TaskState
@@ -31,6 +34,7 @@ type MemDB struct {
 func NewMemDB() *MemDB {
 	return &MemDB{
 		allocs:         make(map[string]*structs.Allocation),
+		deployStatus:   make(map[string]*structs.AllocDeploymentStatus),
 		localTaskState: make(map[string]map[string]*state.LocalState),
 		taskState:      make(map[string]map[string]*structs.TaskState),
 	}
@@ -40,6 +44,10 @@ func (m *MemDB) Name() string {
 	return "memdb"
 }
 
+func (m *MemDB) Upgrade() error {
+	return nil
+}
+
 func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
@@ -59,6 +67,19 @@ func (m *MemDB) PutAllocation(alloc *structs.Allocation) error {
 	return nil
 }
 
+func (m *MemDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.deployStatus[allocID], nil
+}
+
+func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.deployStatus[allocID] = ds
+	return nil
+}
+
 func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
diff --git a/client/state/noopdb.go b/client/state/noopdb.go
index cb03a55f5..53364ecba 100644
--- a/client/state/noopdb.go
+++ b/client/state/noopdb.go
@@ -14,6 +14,10 @@ func (n NoopDB) Name() string {
 	return "noopdb"
 }
 
+func (n NoopDB) Upgrade() error {
+	return nil
+}
+
 func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
 	return nil, nil, nil
 }
@@ -22,6 +26,14 @@ func (n NoopDB) PutAllocation(*structs.Allocation) error {
 	return nil
 }
 
+func (n NoopDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
+	return nil, nil
+}
+
+func (n NoopDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
+	return nil
+}
+
 func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
 	return nil, nil, nil
 }
diff --git a/client/state/state_database.go b/client/state/state_database.go
index c56722f66..9686a5e5f 100644
--- a/client/state/state_database.go
+++ b/client/state/state_database.go
@@ -2,8 +2,11 @@ package state
 
 import (
 	"fmt"
+	"os"
 	"path/filepath"
+	"time"
 
+	hclog "github.com/hashicorp/go-hclog"
 	trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
 	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
 	driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
@@ -14,9 +17,13 @@ import (
 
 /*
 The client has a boltDB backed state store.
 The schema as of 0.9 looks as follows:
 
+meta/
+|--> version -> "1"
+|--> upgraded -> time.Now().Format(timeRFC3339)
+
 allocations/
 |--> <alloc-id>/
-   |--> alloc -> allocEntry{*structs.Allocation}
+   |--> alloc          -> allocEntry{*structs.Allocation}
+   |--> deploy_status  -> deployStatusEntry{*structs.AllocDeploymentStatus}
    |--> task-<name>/
       |--> local_state -> *trstate.LocalState # Local-only state
       |--> task_state  -> *structs.TaskState  # Sync'd to servers
@@ -29,6 +36,21 @@ drivermanager/
 */
 
 var (
+	// metaBucketName is the name of the metadata bucket
+	metaBucketName = []byte("meta")
+
+	// metaVersionKey is the key the state schema version is stored under.
+	metaVersionKey = []byte("version")
+
+	// metaVersion is the value of the state schema version to detect when
+	// an upgrade is needed. It skips the usual boltdd/msgpack backend to
+	// be as portable and futureproof as possible.
+	metaVersion = "1"
+
+	// metaUpgradedKey is the key that stores the timestamp of the last
+	// time the schema was upgraded.
+	metaUpgradedKey = []byte("upgraded")
+
 	// allocationsBucketName is the bucket name containing all allocation related
 	// data
 	allocationsBucketName = []byte("allocations")
@@ -37,6 +59,10 @@ var (
 	// allocEntry structs.
 	allocKey = []byte("alloc")
 
+	// allocDeployStatusKey is the key *structs.AllocDeploymentStatus is
+	// stored under.
+	allocDeployStatusKey = []byte("deploy_status")
+
 	// allocations -> $allocid -> task-$taskname -> the keys below
 	taskLocalStateKey = []byte("local_state")
 	taskStateKey      = []byte("task_state")
@@ -60,13 +86,13 @@ func taskBucketName(taskName string) []byte {
 }
 
 // NewStateDBFunc creates a StateDB given a state directory.
-type NewStateDBFunc func(stateDir string) (StateDB, error)
+type NewStateDBFunc func(logger hclog.Logger, stateDir string) (StateDB, error)
 
 // GetStateDBFactory returns a func for creating a StateDB
 func GetStateDBFactory(devMode bool) NewStateDBFunc {
 	// Return a noop state db implementation when in debug mode
 	if devMode {
-		return func(string) (StateDB, error) {
+		return func(hclog.Logger, string) (StateDB, error) {
 			return NoopDB{}, nil
 		}
 	}
@@ -77,21 +103,42 @@ func GetStateDBFactory(devMode bool) NewStateDBFunc {
 
 // BoltStateDB persists and restores Nomad client state in a boltdb. All
 // methods are safe for concurrent access.
 type BoltStateDB struct {
-	db *boltdd.DB
+	stateDir string
+	db       *boltdd.DB
+	logger   hclog.Logger
 }
 
 // NewBoltStateDB creates or opens an existing boltdb state file or returns an
 // error.
-func NewBoltStateDB(stateDir string) (StateDB, error) {
+func NewBoltStateDB(logger hclog.Logger, stateDir string) (StateDB, error) {
+	fn := filepath.Join(stateDir, "state.db")
+
+	// Check to see if the DB already exists
+	fi, err := os.Stat(fn)
+	if err != nil && !os.IsNotExist(err) {
+		return nil, err
+	}
+	firstRun := fi == nil
+
 	// Create or open the boltdb state database
-	db, err := boltdd.Open(filepath.Join(stateDir, "state.db"), 0600, nil)
+	db, err := boltdd.Open(fn, 0600, nil)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create state database: %v", err)
 	}
 
 	sdb := &BoltStateDB{
-		db: db,
+		stateDir: stateDir,
+		db:       db,
+		logger:   logger,
 	}
+
+	// If db did not already exist, initialize metadata fields
+	if firstRun {
+		if err := sdb.init(); err != nil {
+			return nil, err
+		}
+	}
+
 	return sdb, nil
 }
 
@@ -182,6 +229,59 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
 	})
 }
 
+// deployStatusEntry wraps values for DeploymentStatus keys.
+type deployStatusEntry struct {
+	DeploymentStatus *structs.AllocDeploymentStatus
+}
+
+// PutDeploymentStatus stores an allocation's DeploymentStatus or returns an
+// error.
+func (s *BoltStateDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
+	return s.db.Update(func(tx *boltdd.Tx) error {
+		return putDeploymentStatusImpl(tx, allocID, ds)
+	})
+}
+
+func putDeploymentStatusImpl(tx *boltdd.Tx, allocID string, ds *structs.AllocDeploymentStatus) error {
+	allocBkt, err := getAllocationBucket(tx, allocID)
+	if err != nil {
+		return err
+	}
+
+	entry := deployStatusEntry{
+		DeploymentStatus: ds,
+	}
+	return allocBkt.Put(allocDeployStatusKey, &entry)
+}
+
+// GetDeploymentStatus retrieves an allocation's DeploymentStatus or returns an
+// error.
+func (s *BoltStateDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
+	var entry deployStatusEntry
+
+	err := s.db.View(func(tx *boltdd.Tx) error {
+		allAllocsBkt := tx.Bucket(allocationsBucketName)
+		if allAllocsBkt == nil {
+			// No state, return
+			return nil
+		}
+
+		allocBkt := allAllocsBkt.Bucket([]byte(allocID))
+		if allocBkt == nil {
+			// No state for alloc, return
+			return nil
+		}
+
+		return allocBkt.Get(allocDeployStatusKey, &entry)
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return entry.DeploymentStatus, nil
+}
+
 // GetTaskRunnerState returns the LocalState and TaskState for a
 // TaskRunner. LocalState or TaskState will be nil if they do not exist.
 //
@@ -244,31 +344,43 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
 // PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error.
 func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error {
 	return s.db.Update(func(tx *boltdd.Tx) error {
-		taskBkt, err := getTaskBucket(tx, allocID, taskName)
-		if err != nil {
-			return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
-		}
-
-		if err := taskBkt.Put(taskLocalStateKey, val); err != nil {
-			return fmt.Errorf("failed to write task_runner state: %v", err)
-		}
-
-		return nil
+		return putTaskRunnerLocalStateImpl(tx, allocID, taskName, val)
 	})
 }
 
+// putTaskRunnerLocalStateImpl stores TaskRunner's LocalState in an ongoing
+// transaction or returns an error.
+func putTaskRunnerLocalStateImpl(tx *boltdd.Tx, allocID, taskName string, val *trstate.LocalState) error {
+	taskBkt, err := getTaskBucket(tx, allocID, taskName)
+	if err != nil {
+		return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
+	}
+
+	if err := taskBkt.Put(taskLocalStateKey, val); err != nil {
+		return fmt.Errorf("failed to write task_runner state: %v", err)
+	}
+
+	return nil
+}
+
 // PutTaskState stores a task's state or returns an error.
 func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error {
 	return s.db.Update(func(tx *boltdd.Tx) error {
-		taskBkt, err := getTaskBucket(tx, allocID, taskName)
-		if err != nil {
-			return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
-		}
-
-		return taskBkt.Put(taskStateKey, state)
+		return putTaskStateImpl(tx, allocID, taskName, state)
 	})
 }
 
+// putTaskStateImpl stores a task's state in an ongoing transaction or returns
+// an error.
+func putTaskStateImpl(tx *boltdd.Tx, allocID, taskName string, state *structs.TaskState) error {
+	taskBkt, err := getTaskBucket(tx, allocID, taskName)
+	if err != nil {
+		return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
+	}
+
+	return taskBkt.Put(taskStateKey, state)
+}
+
 // DeleteTaskBucket is used to delete a task bucket if it exists.
 func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
 	return s.db.Update(func(tx *boltdd.Tx) error {
@@ -469,3 +581,59 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) {
 
 	return ps, nil
 }
+
+// init initializes metadata entries in a newly created state database.
+func (s *BoltStateDB) init() error {
+	return s.db.Update(func(tx *boltdd.Tx) error {
+		return addMeta(tx.BoltTx())
+	})
+}
+
+// Upgrade bolt state db from 0.8 schema to 0.9 schema. Noop if already using
+// 0.9 schema. Creates a backup before upgrading.
+func (s *BoltStateDB) Upgrade() error {
+	// Check to see if the underlying DB needs upgrading.
+	upgrade, err := NeedsUpgrade(s.db.BoltDB())
+	if err != nil {
+		return err
+	}
+	if !upgrade {
+		// No upgrade needed!
+		return nil
+	}
+
+	// Upgrade needed. Back up the boltdb first.
+	backupFileName := filepath.Join(s.stateDir, "state.db.backup")
+	if err := backupDB(s.db.BoltDB(), backupFileName); err != nil {
+		return fmt.Errorf("error backing up state db: %v", err)
+	}
+
+	// Perform the upgrade
+	if err := s.db.Update(func(tx *boltdd.Tx) error {
+		if err := UpgradeAllocs(s.logger, tx); err != nil {
+			return err
+		}
+
+		// Add standard metadata
+		if err := addMeta(tx.BoltTx()); err != nil {
+			return err
+		}
+
+		// Write the time the upgrade was done
+		bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
+		if err != nil {
+			return err
+		}
+		return bkt.Put(metaUpgradedKey, time.Now().Format(time.RFC3339))
+	}); err != nil {
+		return err
+	}
+
+	s.logger.Info("successfully upgraded state")
+	return nil
+}
+
+// DB allows access to the underlying BoltDB for testing purposes.
+func (s *BoltStateDB) DB() *boltdd.DB {
+	return s.db
+}
diff --git a/client/state/testdata/state-0.7.1.db.gz b/client/state/testdata/state-0.7.1.db.gz
new file mode 100644
index 000000000..f31982154
Binary files /dev/null and b/client/state/testdata/state-0.7.1.db.gz differ
diff --git a/client/state/testdata/state-0.8.6-empty.db.gz b/client/state/testdata/state-0.8.6-empty.db.gz
new file mode 100644
index 000000000..5e8ccde2a
Binary files /dev/null and b/client/state/testdata/state-0.8.6-empty.db.gz differ
diff --git a/client/state/testdata/state-0.8.6-no-deploy.db.gz b/client/state/testdata/state-0.8.6-no-deploy.db.gz
new file mode 100644
index 000000000..917041ece
Binary files /dev/null and b/client/state/testdata/state-0.8.6-no-deploy.db.gz differ
diff --git a/client/state/upgrade.go b/client/state/upgrade.go
new file mode 100644
index 000000000..53dc48017
--- /dev/null
+++ b/client/state/upgrade.go
@@ -0,0 +1,299 @@
+package state
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+
+	"github.com/boltdb/bolt"
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/helper/boltdd"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/ugorji/go/codec"
+)
+
+// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
+// already up to date.
+func NeedsUpgrade(bdb *bolt.DB) (bool, error) {
+	needsUpgrade := true
+	err := bdb.View(func(tx *bolt.Tx) error {
+		b := tx.Bucket(metaBucketName)
+		if b == nil {
+			// No meta bucket; upgrade
+			return nil
+		}
+
+		v := b.Get(metaVersionKey)
+		if len(v) == 0 {
+			// No version; upgrade
+			return nil
+		}
+
+		if string(v) != metaVersion {
+			// Version exists but does not match. Abort.
+			return fmt.Errorf("incompatible state version. expected %q but found %q",
+				metaVersion, v)
+		}
+
+		// Version matches! Assume migrated!
+		needsUpgrade = false
+		return nil
+	})
+
+	return needsUpgrade, err
+}
+
+// addMeta adds version metadata to BoltDB to mark it as upgraded and
+// should be run at the end of the upgrade transaction.
+func addMeta(tx *bolt.Tx) error {
+	// Create the meta bucket if it doesn't exist
+	bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
+	if err != nil {
+		return err
+	}
+
+	return bkt.Put(metaVersionKey, []byte(metaVersion))
+}
+
+// backupDB backs up the existing state database prior to upgrade, overwriting
+// previous backups.
+func backupDB(bdb *bolt.DB, dst string) error {
+	fd, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+
+	return bdb.View(func(tx *bolt.Tx) error {
+		if _, err := tx.WriteTo(fd); err != nil {
+			fd.Close()
+			return err
+		}
+
+		return fd.Close()
+	})
+}
+
+// UpgradeAllocs upgrades the boltdb schema. Example 0.8 schema:
+//
+//   * allocations
+//     * 15d83e8a-74a2-b4da-3f17-ed5c12895ea8
+//       * echo
+//         - simple-all (342 bytes)
+//       - alloc (2827 bytes)
+//       - alloc-dir (166 bytes)
+//       - immutable (15 bytes)
+//       - mutable (1294 bytes)
+//
+func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error {
+	btx := tx.BoltTx()
+	allocationsBucket := btx.Bucket(allocationsBucketName)
+	if allocationsBucket == nil {
+		// No state!
+		return nil
+	}
+
+	// Gather alloc buckets and remove unexpected key/value pairs
+	allocBuckets := [][]byte{}
+	cur := allocationsBucket.Cursor()
+	for k, v := cur.First(); k != nil; k, v = cur.Next() {
+		if v != nil {
+			logger.Warn("deleting unexpected key in state db",
+				"key", string(k), "value_bytes", len(v),
+			)
+
+			if err := cur.Delete(); err != nil {
+				return err
+			}
+			continue
+		}
+
+		allocBuckets = append(allocBuckets, k)
+	}
+
+	for _, allocBucket := range allocBuckets {
+		allocID := string(allocBucket)
+
+		bkt := allocationsBucket.Bucket(allocBucket)
+		if bkt == nil {
+			// This should never happen as we just read the bucket.
+			return fmt.Errorf("unexpected bucket missing %q", allocID)
+		}
+
+		allocLogger := logger.With("alloc_id", allocID)
+		if err := upgradeAllocBucket(allocLogger, tx, bkt, allocID); err != nil {
+			// Log and drop invalid allocs
+			allocLogger.Error("dropping invalid allocation due to error while upgrading state",
+				"error", err,
+			)
+
+			// If we can't delete the bucket something is seriously
+			// wrong, fail hard.
+			if err := allocationsBucket.DeleteBucket(allocBucket); err != nil {
+				return fmt.Errorf("error deleting invalid allocation state: %v", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// upgradeAllocBucket upgrades an alloc bucket.
+func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, allocID string) error {
+	allocFound := false
+	taskBuckets := [][]byte{}
+	cur := bkt.Cursor()
+	for k, v := cur.First(); k != nil; k, v = cur.Next() {
+		switch string(k) {
+		case "alloc":
+			// Alloc has not changed; leave it be
+			allocFound = true
+		case "alloc-dir":
+			// Delete alloc-dir entries as they're no longer
+			// needed.
+			cur.Delete()
+		case "immutable":
+			// Skip decoding immutable state. Nothing from it needs
+			// to be upgraded.
+			cur.Delete()
+		case "mutable":
+			// Decode and upgrade
+			if err := upgradeOldAllocMutable(tx, allocID, v); err != nil {
+				return err
+			}
+			cur.Delete()
+		default:
+			if v != nil {
+				logger.Warn("deleting unexpected state entry for allocation",
+					"key", string(k), "value_bytes", len(v),
+				)
+
+				if err := cur.Delete(); err != nil {
+					return err
+				}
+
+				continue
+			}
+
+			// Nested buckets are tasks
+			taskBuckets = append(taskBuckets, k)
+		}
+	}
+
+	// If the alloc entry was not found, abandon this allocation as the
+	// state has been corrupted.
+	if !allocFound {
+		return fmt.Errorf("alloc entry not found")
+	}
+
+	// Upgrade tasks
+	for _, taskBucket := range taskBuckets {
+		taskName := string(taskBucket)
+		taskLogger := logger.With("task_name", taskName)
+
+		taskBkt := bkt.Bucket(taskBucket)
+		if taskBkt == nil {
+			// This should never happen as we just read the bucket.
+			return fmt.Errorf("unexpected bucket missing %q", taskName)
+		}
+
+		oldState, err := upgradeTaskBucket(taskLogger, taskBkt)
+		if err != nil {
+			taskLogger.Warn("dropping invalid task due to error while upgrading state",
+				"error", err,
+			)
+
+			// Delete the invalid task bucket and treat failures
+			// here as unrecoverable errors.
+			if err := bkt.DeleteBucket(taskBucket); err != nil {
+				return fmt.Errorf("error deleting invalid task state for task %q: %v",
+					taskName, err,
+				)
+			}
+
+			// Skip the dropped task; oldState is nil here.
+			continue
+		}
+
+		// Convert 0.8 task state to 0.9 task state
+		localTaskState := oldState.Upgrade()
+
+		// Insert the new task state
+		if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil {
+			return err
+		}
+
+		// Delete the old task bucket
+		if err := bkt.DeleteBucket(taskBucket); err != nil {
+			return err
+		}
+
+		taskLogger.Trace("upgraded", "from", oldState.Version)
+	}
+
+	return nil
+}
+
+// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
+// and returning the 0.8 version of the state.
+func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState08, error) {
+	simpleFound := false
+	var trState taskRunnerState08
+
+	cur := bkt.Cursor()
+	for k, v := cur.First(); k != nil; k, v = cur.Next() {
+		if !bytes.Equal(k, []byte("simple-all")) {
+			if v == nil {
+				// Delete Bucket
+				logger.Warn("deleting unexpected task state bucket",
+					"bucket", string(k),
+				)
+
+				if err := bkt.DeleteBucket(k); err != nil {
+					return nil, err
+				}
+			} else {
+				// Delete entry
+				logger.Warn("deleting unexpected task state entry",
+					"key", string(k), "value_bytes", len(v),
+				)
+
+				if err := cur.Delete(); err != nil {
+					return nil, err
+				}
+			}
+			continue
+		}
+
+		// Decode simple-all
+		simpleFound = true
+		if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil {
+			return nil, fmt.Errorf("failed to decode task state: %v", err)
+		}
+	}
+
+	if !simpleFound {
+		return nil, fmt.Errorf("task state entry not found")
+	}
+
+	return &trState, nil
+}
+
+// upgradeOldAllocMutable upgrades Nomad 0.8 alloc runner state.
+func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) error {
+	var oldMutable allocRunnerMutableState08
+	err := codec.NewDecoderBytes(oldBytes, structs.MsgpackHandle).Decode(&oldMutable)
+	if err != nil {
+		return err
+	}
+
+	// Upgrade Deployment Status
+	if err := putDeploymentStatusImpl(tx, allocID, oldMutable.DeploymentStatus); err != nil {
+		return err
+	}
+
+	// Upgrade Task States
+	for taskName, taskState := range oldMutable.TaskStates {
+		if err := putTaskStateImpl(tx, allocID, taskName, taskState); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go
new file mode 100644
index 000000000..043b00be3
--- /dev/null
+++ b/client/state/upgrade_int_test.go
@@ -0,0 +1,159 @@
+package state_test
+
+import (
+	"compress/gzip"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/hashicorp/nomad/client/allocrunner"
+	"github.com/hashicorp/nomad/client/allocwatcher"
+	clientconfig "github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/client/consul"
+	"github.com/hashicorp/nomad/client/devicemanager"
+	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
+	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
+	. "github.com/hashicorp/nomad/client/state"
+	"github.com/hashicorp/nomad/client/vaultclient"
+	"github.com/hashicorp/nomad/helper/boltdd"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/plugins/shared"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestBoltStateDB_UpgradeOld_Ok asserts upgrading an old state db does not
+// error during upgrade and restore.
+func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
+	t.Parallel()
+
+	files, err := filepath.Glob("testdata/*.db*")
+	require.NoError(t, err)
+
+	for _, fn := range files {
+		t.Run(fn, func(t *testing.T) {
+			dir, err := ioutil.TempDir("", "nomadtest")
+			require.NoError(t, err)
+			defer os.RemoveAll(dir)
+
+			var src io.ReadCloser
+			src, err = os.Open(fn)
+			require.NoError(t, err)
+			defer src.Close()
+
+			// testdata may be gzip'd; decode on copy
+			if strings.HasSuffix(fn, ".gz") {
+				src, err = gzip.NewReader(src)
+				require.NoError(t, err)
+			}
+
+			dst, err := os.Create(filepath.Join(dir, "state.db"))
+			require.NoError(t, err)
+
+			// Copy test files before testing them for safety
+			_, err = io.Copy(dst, src)
+			require.NoError(t, err)
+
+			require.NoError(t, src.Close())
+
+			dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir)
+			require.NoError(t, err)
+			defer dbI.Close()
+
+			db := dbI.(*BoltStateDB)
+
+			// Simply opening old files should *not* alter them
+			require.NoError(t, db.DB().View(func(tx *boltdd.Tx) error {
+				b := tx.Bucket([]byte("meta"))
+				if b != nil {
+					return fmt.Errorf("meta bucket found but should not exist yet!")
+				}
+				return nil
+			}))
+
+			needsUpgrade, err := NeedsUpgrade(db.DB().BoltDB())
+			require.NoError(t, err)
+			require.True(t, needsUpgrade)
+
+			// Attempt the upgrade
+			require.NoError(t, db.Upgrade())
+
+			needsUpgrade, err = NeedsUpgrade(db.DB().BoltDB())
+			require.NoError(t, err)
+			require.False(t, needsUpgrade)
+
+			// Ensure Allocations can be restored and
+			// NewAR/AR.Restore do not error.
+			allocs, errs, err := db.GetAllAllocations()
+			require.NoError(t, err)
+			assert.Len(t, errs, 0)
+
+			for _, alloc := range allocs {
+				checkUpgradedAlloc(t, dir, db, alloc)
+			}
+
+			// Should be nil for all upgrades
+			ps, err := db.GetDevicePluginState()
+			require.NoError(t, err)
+			require.Nil(t, ps)
+
+			ps = &dmstate.PluginState{
+				ReattachConfigs: map[string]*shared.ReattachConfig{
+					"test": &shared.ReattachConfig{Pid: 1},
+				},
+			}
+			require.NoError(t, db.PutDevicePluginState(ps))
+
+			require.NoError(t, db.Close())
+		})
+	}
+}
+
+// checkUpgradedAlloc creates and restores an AllocRunner from an upgraded
+// database.
+//
+// It does not call AR.Run as it's intended to be used against a wide test
+// corpus in testdata that may be expensive to run and require unavailable
+// dependencies.
+func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Allocation) {
+	_, err := db.GetDeploymentStatus(alloc.ID)
+	assert.NoError(t, err)
+
+	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+	for _, task := range tg.Tasks {
+		_, _, err := db.GetTaskRunnerState(alloc.ID, task.Name)
+		require.NoError(t, err)
+	}
+
+	clientConf, cleanup := clientconfig.TestClientConfig(t)
+
+	// Does *not* cleanup overridden StateDir below. That's left alone for
+	// the caller to cleanup.
+	defer cleanup()
+
+	clientConf.StateDir = path
+
+	conf := &allocrunner.Config{
+		Alloc:             alloc,
+		Logger:            clientConf.Logger,
+		ClientConfig:      clientConf,
+		StateDB:           db,
+		Consul:            consul.NewMockConsulServiceClient(t, clientConf.Logger),
+		Vault:             vaultclient.NewMockVaultClient(),
+		StateUpdater:      &allocrunner.MockStateUpdater{},
+		PrevAllocWatcher:  allocwatcher.NoopPrevAlloc{},
+		PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
+		DeviceManager:     devicemanager.NoopMockManager(),
+		DriverManager:     drivermanager.TestDriverManager(t),
+	}
+	ar, err := allocrunner.NewAllocRunner(conf)
+	require.NoError(t, err)
+
+	// AllocRunner.Restore should not error
+	require.NoError(t, ar.Restore())
+}
diff --git a/client/state/upgrade_test.go b/client/state/upgrade_test.go
new file mode 100644
index 000000000..52a2ac7d9
--- /dev/null
+++ b/client/state/upgrade_test.go
@@ -0,0 +1,109 @@
+package state
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/boltdb/bolt"
+	"github.com/hashicorp/nomad/helper/boltdd"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+// TestUpgrade_NeedsUpgrade_New asserts new state dbs do not need upgrading.
+func TestUpgrade_NeedsUpgrade_New(t *testing.T) {
+	t.Parallel()
+
+	db, cleanup := setupBoltDB(t)
+	defer cleanup()
+
+	up, err := NeedsUpgrade(db.DB().BoltDB())
+	require.NoError(t, err)
+	require.False(t, up)
+}
+
+// TestUpgrade_NeedsUpgrade_Old asserts state dbs with just the allocations
+// bucket *do* need upgrading.
+func TestUpgrade_NeedsUpgrade_Old(t *testing.T) {
+	t.Parallel()
+
+	dir, err := ioutil.TempDir("", "nomadtest")
+	require.NoError(t, err)
+
+	defer os.RemoveAll(dir)
+
+	db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
+	require.NoError(t, err)
+	defer db.Close()
+
+	// Create the allocations bucket which exists in both the old and 0.9
+	// schemas
+	require.NoError(t, db.Update(func(tx *bolt.Tx) error {
+		_, err := tx.CreateBucket(allocationsBucketName)
+		return err
+	}))
+
+	up, err := NeedsUpgrade(db)
+	require.NoError(t, err)
+	require.True(t, up)
+
+	// Adding meta should mark it as upgraded
+	require.NoError(t, db.Update(addMeta))
+
+	up, err = NeedsUpgrade(db)
+	require.NoError(t, err)
+	require.False(t, up)
+}
+
+// TestUpgrade_DeleteInvalidAllocs_NoAlloc asserts invalid allocations are
+// deleted during state upgrades instead of failing the entire agent.
+func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) {
+	t.Parallel()
+
+	dir, err := ioutil.TempDir("", "nomadtest")
+	require.NoError(t, err)
+
+	defer os.RemoveAll(dir)
+
+	db, err := boltdd.Open(filepath.Join(dir, "state.db"), 0666, nil)
+	require.NoError(t, err)
+	defer db.Close()
+
+	allocID := []byte(uuid.Generate())
+
+	// Create an allocation bucket with no `alloc` key. This is an observed
+	// pre-0.9 state corruption that should result in the allocation being
+	// dropped while allowing the upgrade to continue.
+	require.NoError(t, db.Update(func(tx *boltdd.Tx) error {
+		parentBkt, err := tx.CreateBucket(allocationsBucketName)
+		if err != nil {
+			return err
+		}
+
+		_, err = parentBkt.CreateBucket(allocID)
+		return err
+	}))
+
+	// Perform the Upgrade
+	require.NoError(t, db.Update(func(tx *boltdd.Tx) error {
+		return UpgradeAllocs(testlog.HCLogger(t), tx)
+	}))
+
+	// Assert invalid allocation bucket was removed
+	require.NoError(t, db.View(func(tx *boltdd.Tx) error {
+		parentBkt := tx.Bucket(allocationsBucketName)
+		if parentBkt == nil {
+			return fmt.Errorf("parent allocations bucket should not have been removed")
+		}
+
+		if parentBkt.Bucket(allocID) != nil {
+			return fmt.Errorf("invalid alloc bucket should have been deleted")
+		}
+
+		return nil
+	}))
+}