client/state: support upgrading from 0.8->0.9
Also persist and load DeploymentStatus to avoid rechecking health after client restarts.
committed by Michael Schurter
parent bd53ccf548
commit 784706a1e5
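The idea, condensed: the alloc runner now persists the allocation's deployment health to the client state database when it is set, restores it in Restore(), and the health watcher hook skips monitoring when a status already exists. Below is a minimal, self-contained Go sketch of that flow; all types here are illustrative stand-ins rather than Nomad's own API (only the names GetDeploymentStatus, PutDeploymentStatus, HasHealth, and Restore echo identifiers that appear in the diff that follows).

// Illustrative sketch only: stand-in types, not Nomad's API.
package main

import (
	"fmt"
	"sync"
	"time"
)

// DeploymentStatus is a stand-in for structs.AllocDeploymentStatus.
type DeploymentStatus struct {
	Healthy   *bool
	Timestamp time.Time
}

// HasHealth reports whether health has already been determined.
func (d *DeploymentStatus) HasHealth() bool {
	return d != nil && d.Healthy != nil
}

// StateDB is a stand-in for the client's state database: it stores the
// deployment status keyed by allocation ID.
type StateDB struct {
	mu     sync.RWMutex
	deploy map[string]*DeploymentStatus // alloc_id -> status
}

func NewStateDB() *StateDB {
	return &StateDB{deploy: make(map[string]*DeploymentStatus)}
}

// PutDeploymentStatus persists the status so it survives agent restarts.
func (db *StateDB) PutDeploymentStatus(allocID string, ds *DeploymentStatus) {
	db.mu.Lock()
	defer db.mu.Unlock()
	db.deploy[allocID] = ds
}

// GetDeploymentStatus loads whatever was persisted (possibly nil).
func (db *StateDB) GetDeploymentStatus(allocID string) *DeploymentStatus {
	db.mu.RLock()
	defer db.mu.RUnlock()
	return db.deploy[allocID]
}

func main() {
	db := NewStateDB()
	allocID := "example-alloc"

	// First run: the health watcher decides the alloc is healthy and the
	// result is written to the state database.
	healthy := true
	db.PutDeploymentStatus(allocID, &DeploymentStatus{Healthy: &healthy, Timestamp: time.Now()})

	// Simulated agent restart: Restore() reads the stored status back
	// before task runners are restored.
	restored := db.GetDeploymentStatus(allocID)

	// Health watcher hook init: if health is already set there is nothing
	// to watch, so monitoring is skipped instead of starting over.
	if restored.HasHealth() {
		fmt.Println("deployment status already set; skipping health monitoring")
		return
	}
	fmt.Println("no deployment status yet; starting health watcher")
}

With this in place, a restart after health has been set no longer resets the watcher, which is the behavior the persistDeploymentStatus/Restore changes below implement for real allocations.
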
.gitignore (vendored, 3 changes)
@@ -39,6 +39,9 @@ website/npm-debug.log
# Test file
exit-code

# Don't commit uncompressed state test files
client/state/testdata/*.db

ui/.sass-cache
ui/static/base.css

@@ -334,6 +334,18 @@ func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir {
// Restore state from database. Must be called after NewAllocRunner but before
// Run.
func (ar *allocRunner) Restore() error {
	// Retrieve deployment status to avoid resetting it across agent
	// restarts. Once a deployment status is set Nomad no longer monitors
	// alloc health, so we must persist deployment state across restarts.
	ds, err := ar.stateDB.GetDeploymentStatus(ar.id)
	if err != nil {
		return err
	}

	ar.stateLock.Lock()
	ar.state.DeploymentStatus = ds
	ar.stateLock.Unlock()

	// Restore task runners
	for _, tr := range ar.tasks {
		if err := tr.Restore(); err != nil {
@@ -344,6 +356,19 @@ func (ar *allocRunner) Restore() error {
	return nil
}

// persistDeploymentStatus stores AllocDeploymentStatus.
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
	if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
		// While any persistence errors are very bad, the worst case
		// scenario for failing to persist deployment status is that if
		// the agent is restarted it will monitor the deployment status
		// again. This could cause a deployment's status to change when
		// that shouldn't happen. However, allowing that seems better
		// than failing the entire allocation.
		ar.logger.Error("error storing deployment status", "error", err)
	}
}

// TaskStateUpdated is called by TaskRunner when a task's state has been
// updated. It does not process the update synchronously but instead notifies a
// goroutine the state has changed. Since processing the state change may cause

@@ -16,6 +16,13 @@ type allocHealthSetter struct {
	ar *allocRunner
}

// HasHealth returns true if a deployment status is already set.
func (a *allocHealthSetter) HasHealth() bool {
	a.ar.stateLock.Lock()
	defer a.ar.stateLock.Unlock()
	return a.ar.state.DeploymentStatus.HasHealth()
}

// ClearHealth allows the health watcher hook to clear the alloc's deployment
// health if the deployment id changes. It does not update the server as the
// status is only cleared when already receiving an update from the server.
@@ -24,6 +31,7 @@ type allocHealthSetter struct {
func (a *allocHealthSetter) ClearHealth() {
	a.ar.stateLock.Lock()
	a.ar.state.ClearDeploymentStatus()
	a.ar.persistDeploymentStatus(nil)
	a.ar.stateLock.Unlock()
}

@@ -37,6 +45,7 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents
	// ModifyIndex as they're only mutated by the server.
	a.ar.stateLock.Lock()
	a.ar.state.SetDeploymentStatus(time.Now(), healthy)
	a.ar.persistDeploymentStatus(a.ar.state.DeploymentStatus)
	a.ar.stateLock.Unlock()

	// If deployment is unhealthy emit task events explaining why

@@ -16,6 +16,9 @@ import (

// healthMutator is able to set/clear alloc health.
type healthSetter interface {
	// HasHealth returns true if health is already set.
	HasHealth() bool

	// Set health via the mutator
	SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent)

@@ -100,7 +103,7 @@ func (h *allocHealthWatcherHook) Name() string {
// Not threadsafe so the caller should lock since Updates occur concurrently.
func (h *allocHealthWatcherHook) init() error {
	// No need to watch health as it's already set
	if h.alloc.DeploymentStatus.HasHealth() {
	if h.healthSetter.HasHealth() {
		return nil
	}

@@ -26,6 +26,8 @@ func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook {
}

func (*artifactHook) Name() string {
	// Copied in client/state when upgrading from <0.9 schemas, so if you
	// change it here you also must change it there.
	return "artifacts"
}

@@ -28,6 +28,8 @@ func newDispatchHook(alloc *structs.Allocation, logger hclog.Logger) *dispatchHo
}

func (*dispatchHook) Name() string {
	// Copied in client/state when upgrading from <0.9 schemas, so if you
	// change it here you also must change it there.
	return "dispatch_payload"
}

@@ -27,6 +27,8 @@ func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook {
}

func (h *taskDirHook) Name() string {
	// Copied in client/state when upgrading from <0.9 schemas, so if you
	// change it here you also must change it there.
	return "task_dir"
}

@@ -462,10 +462,21 @@ func (c *Client) init() error {
	c.logger.Info("using state directory", "state_dir", c.config.StateDir)

	// Open the state database
	db, err := state.GetStateDBFactory(c.config.DevMode)(c.config.StateDir)
	db, err := state.GetStateDBFactory(c.config.DevMode)(c.logger, c.config.StateDir)
	if err != nil {
		return fmt.Errorf("failed to open state database: %v", err)
	}

	// Upgrade the state database
	if err := db.Upgrade(); err != nil {
		// Upgrade only returns an error on critical persistence
		// failures in which an operator should intervene before the
		// node is accessible. Upgrade drops and logs corrupt state it
		// encounters, so failing to start the agent should be extremely
		// rare.
		return fmt.Errorf("failed to upgrade state database: %v", err)
	}

	c.stateDB = db

	// Ensure the alloc dir exists if we have one

client/state/08types.go (new file, 78 lines)
@@ -0,0 +1,78 @@
package state

import (
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/drivers"
)

// allocRunnerMutableState08 is state that had to be written on each save as it
// changed over the life-cycle of the alloc_runner in Nomad 0.8.
//
// https://github.com/hashicorp/nomad/blob/v0.8.6/client/alloc_runner.go#L146-L153
//
type allocRunnerMutableState08 struct {
	// AllocClientStatus does not need to be upgraded as it is computed
	// from task states.
	AllocClientStatus string

	// AllocClientDescription does not need to be upgraded as it is computed
	// from task states.
	AllocClientDescription string

	TaskStates map[string]*structs.TaskState
	DeploymentStatus *structs.AllocDeploymentStatus
}

// taskRunnerState08 was used to snapshot the state of the task runner in Nomad
// 0.8.
//
// https://github.com/hashicorp/nomad/blob/v0.8.6/client/task_runner.go#L188-L197
//
type taskRunnerState08 struct {
	Version string
	HandleID string
	ArtifactDownloaded bool
	TaskDirBuilt bool
	PayloadRendered bool
	DriverNetwork *cstructs.DriverNetwork
	// Created Resources are no longer used.
	//CreatedResources *driver.CreatedResources
}

func (t *taskRunnerState08) Upgrade() *state.LocalState {
	ls := state.NewLocalState()

	// Reuse DriverNetwork
	ls.DriverNetwork = t.DriverNetwork

	// Upgrade artifact state
	ls.Hooks["artifacts"] = &state.HookState{
		PrestartDone: t.ArtifactDownloaded,
	}

	// Upgrade task dir state
	ls.Hooks["task_dir"] = &state.HookState{
		PrestartDone: t.TaskDirBuilt,
	}

	// Upgrade dispatch payload state
	ls.Hooks["dispatch_payload"] = &state.HookState{
		PrestartDone: t.PayloadRendered,
	}

	//TODO How to convert handles?! This does not work.
	ls.TaskHandle = drivers.NewTaskHandle("TODO")

	//TODO where do we get this from?
	ls.TaskHandle.Config = nil

	//TODO do we need to set this accurately? Or will RecoverTask handle it?
	ls.TaskHandle.State = drivers.TaskStateUnknown

	//TODO do we need an envelope so drivers know this is an old state?
	ls.TaskHandle.SetDriverState(t.HandleID)

	return ls
}

@@ -9,6 +9,7 @@ import (
	trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
	driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/kr/pretty"
@@ -19,7 +20,7 @@ func setupBoltDB(t *testing.T) (*BoltStateDB, func()) {
	dir, err := ioutil.TempDir("", "nomadtest")
	require.NoError(t, err)

	db, err := NewBoltStateDB(dir)
	db, err := NewBoltStateDB(testlog.HCLogger(t), dir)
	if err != nil {
		if err := os.RemoveAll(dir); err != nil {
			t.Logf("error removing boltdb dir: %v", err)
@@ -242,3 +243,13 @@ func TestStateDB_DriverManager(t *testing.T) {
		require.Equal(state, ps)
	})
}

// TestStateDB_Upgrade asserts calling Upgrade on new databases always
// succeeds.
func TestStateDB_Upgrade(t *testing.T) {
	t.Parallel()

	testDB(t, func(t *testing.T, db StateDB) {
		require.NoError(t, db.Upgrade())
	})
}

@@ -12,6 +12,11 @@ type StateDB interface {
	// Name of implementation.
	Name() string

	// Upgrade ensures the layout of the database is at the latest version
	// or returns an error. Corrupt data will be dropped when possible.
	// Errors should be considered critical and unrecoverable.
	Upgrade() error

	// GetAllAllocations returns all valid allocations and a map of
	// allocation IDs to retrieval errors.
	//
@@ -22,6 +27,11 @@ type StateDB interface {
	// not be stored.
	PutAllocation(*structs.Allocation) error

	// Get/Put DeploymentStatus get and put the allocation's deployment
	// status. It may be nil.
	GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error)
	PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error

	// GetTaskRunnerState returns the LocalState and TaskState for a
	// TaskRunner. Either state may be nil if it is not found, but if an
	// error is encountered only the error will be non-nil.

@@ -15,6 +15,9 @@ type MemDB struct {
	// alloc_id -> value
	allocs map[string]*structs.Allocation

	// alloc_id -> value
	deployStatus map[string]*structs.AllocDeploymentStatus

	// alloc_id -> task_name -> value
	localTaskState map[string]map[string]*state.LocalState
	taskState map[string]map[string]*structs.TaskState
@@ -31,6 +34,7 @@ type MemDB struct {
func NewMemDB() *MemDB {
	return &MemDB{
		allocs: make(map[string]*structs.Allocation),
		deployStatus: make(map[string]*structs.AllocDeploymentStatus),
		localTaskState: make(map[string]map[string]*state.LocalState),
		taskState: make(map[string]map[string]*structs.TaskState),
	}
@@ -40,6 +44,10 @@ func (m *MemDB) Name() string {
	return "memdb"
}

func (m *MemDB) Upgrade() error {
	return nil
}

func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
@@ -59,6 +67,19 @@ func (m *MemDB) PutAllocation(alloc *structs.Allocation) error {
	return nil
}

func (m *MemDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.deployStatus[allocID], nil
}

func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
	m.mu.Lock()
	m.deployStatus[allocID] = ds
	defer m.mu.Unlock()
	return nil
}

func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

@@ -14,6 +14,10 @@ func (n NoopDB) Name() string {
	return "noopdb"
}

func (n NoopDB) Upgrade() error {
	return nil
}

func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
	return nil, nil, nil
}
@@ -22,6 +26,14 @@ func (n NoopDB) PutAllocation(*structs.Allocation) error {
	return nil
}

func (n NoopDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
	return nil, nil
}

func (n NoopDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
	return nil
}

func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
	return nil, nil, nil
}

@@ -2,8 +2,11 @@ package state

import (
	"fmt"
	"os"
	"path/filepath"
	"time"

	hclog "github.com/hashicorp/go-hclog"
	trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
	driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
@@ -14,9 +17,13 @@ import (
/*
The client has a boltDB backed state store. The schema as of 0.9 looks as follows:

meta/
|--> version -> "1"
|--> upgraded -> time.Now().Format(timeRFC3339)
allocations/
|--> <alloc-id>/
   |--> alloc -> allocEntry{*structs.Allocation}
   |--> alloc -> allocEntry{*structs.Allocation}
   |--> deploy_status -> deployStatusEntry{*structs.AllocDeploymentStatus}
   |--> task-<name>/
      |--> local_state -> *trstate.LocalState # Local-only state
      |--> task_state -> *structs.TaskState # Sync'd to servers
@@ -29,6 +36,21 @@ drivermanager/
*/

var (
	// metaBucketName is the name of the metadata bucket
	metaBucketName = []byte("meta")

	// metaVersionKey is the key the state schema version is stored under.
	metaVersionKey = []byte("version")

	// metaVersion is the value of the state schema version to detect when
	// an upgrade is needed. It skips the usual boltdd/msgpack backend to
	// be as portable and futureproof as possible.
	metaVersion = "1"

	// metaUpgradedKey is the key that stores the timestamp of the last
	// time the schema was upgraded.
	metaUpgradedKey = []byte("upgraded")

	// allocationsBucketName is the bucket name containing all allocation related
	// data
	allocationsBucketName = []byte("allocations")
@@ -37,6 +59,10 @@ var (
	// allocEntry structs.
	allocKey = []byte("alloc")

	// allocDeployStatusKey is the key *structs.AllocDeploymentStatus is
	// stored under.
	allocDeployStatusKey = []byte("deploy_status")

	// allocations -> $allocid -> task-$taskname -> the keys below
	taskLocalStateKey = []byte("local_state")
	taskStateKey = []byte("task_state")
@@ -60,13 +86,13 @@ func taskBucketName(taskName string) []byte {
}

// NewStateDBFunc creates a StateDB given a state directory.
type NewStateDBFunc func(stateDir string) (StateDB, error)
type NewStateDBFunc func(logger hclog.Logger, stateDir string) (StateDB, error)

// GetStateDBFactory returns a func for creating a StateDB
func GetStateDBFactory(devMode bool) NewStateDBFunc {
	// Return a noop state db implementation when in debug mode
	if devMode {
		return func(string) (StateDB, error) {
		return func(hclog.Logger, string) (StateDB, error) {
			return NoopDB{}, nil
		}
	}
@@ -77,21 +103,42 @@ func GetStateDBFactory(devMode bool) NewStateDBFunc {
// BoltStateDB persists and restores Nomad client state in a boltdb. All
// methods are safe for concurrent access.
type BoltStateDB struct {
	db *boltdd.DB
	stateDir string
	db *boltdd.DB
	logger hclog.Logger
}

// NewBoltStateDB creates or opens an existing boltdb state file or returns an
// error.
func NewBoltStateDB(stateDir string) (StateDB, error) {
func NewBoltStateDB(logger hclog.Logger, stateDir string) (StateDB, error) {
	fn := filepath.Join(stateDir, "state.db")

	// Check to see if the DB already exists
	fi, err := os.Stat(fn)
	if err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	firstRun := fi == nil

	// Create or open the boltdb state database
	db, err := boltdd.Open(filepath.Join(stateDir, "state.db"), 0600, nil)
	db, err := boltdd.Open(fn, 0600, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create state database: %v", err)
	}

	sdb := &BoltStateDB{
		db: db,
		stateDir: stateDir,
		db: db,
		logger: logger,
	}

	// If db did not already exist, initialize metadata fields
	if firstRun {
		if err := sdb.init(); err != nil {
			return nil, err
		}
	}

	return sdb, nil
}

@@ -182,6 +229,59 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
	})
}

// deployStatusEntry wraps values for DeploymentStatus keys.
type deployStatusEntry struct {
	DeploymentStatus *structs.AllocDeploymentStatus
}

// PutDeploymentStatus stores an allocation's DeploymentStatus or returns an
// error.
func (s *BoltStateDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
	return s.db.Update(func(tx *boltdd.Tx) error {
		return putDeploymentStatusImpl(tx, allocID, ds)
	})
}

func putDeploymentStatusImpl(tx *boltdd.Tx, allocID string, ds *structs.AllocDeploymentStatus) error {
	allocBkt, err := getAllocationBucket(tx, allocID)
	if err != nil {
		return err
	}

	entry := deployStatusEntry{
		DeploymentStatus: ds,
	}
	return allocBkt.Put(allocDeployStatusKey, &entry)
}

// GetDeploymentStatus retrieves an allocation's DeploymentStatus or returns an
// error.
func (s *BoltStateDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
	var entry deployStatusEntry

	err := s.db.View(func(tx *boltdd.Tx) error {
		allAllocsBkt := tx.Bucket(allocationsBucketName)
		if allAllocsBkt == nil {
			// No state, return
			return nil
		}

		allocBkt := allAllocsBkt.Bucket([]byte(allocID))
		if allocBkt == nil {
			// No state for alloc, return
			return nil
		}

		return allocBkt.Get(allocDeployStatusKey, &entry)
	})

	if err != nil {
		return nil, err
	}

	return entry.DeploymentStatus, nil
}

// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. LocalState or TaskState will be nil if they do not exist.
//
@@ -244,31 +344,43 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
// PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error.
func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error {
	return s.db.Update(func(tx *boltdd.Tx) error {
		taskBkt, err := getTaskBucket(tx, allocID, taskName)
		if err != nil {
			return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
		}

		if err := taskBkt.Put(taskLocalStateKey, val); err != nil {
			return fmt.Errorf("failed to write task_runner state: %v", err)
		}

		return nil
		return putTaskRunnerLocalStateImpl(tx, allocID, taskName, val)
	})
}

// putTaskRunnerLocalStateImpl stores TaskRunner's LocalState in an ongoing
// transaction or returns an error.
func putTaskRunnerLocalStateImpl(tx *boltdd.Tx, allocID, taskName string, val *trstate.LocalState) error {
	taskBkt, err := getTaskBucket(tx, allocID, taskName)
	if err != nil {
		return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
	}

	if err := taskBkt.Put(taskLocalStateKey, val); err != nil {
		return fmt.Errorf("failed to write task_runner state: %v", err)
	}

	return nil
}

// PutTaskState stores a task's state or returns an error.
func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error {
	return s.db.Update(func(tx *boltdd.Tx) error {
		taskBkt, err := getTaskBucket(tx, allocID, taskName)
		if err != nil {
			return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
		}

		return taskBkt.Put(taskStateKey, state)
		return putTaskStateImpl(tx, allocID, taskName, state)
	})
}

// putTaskStateImpl stores a task's state in an ongoing transaction or returns
// an error.
func putTaskStateImpl(tx *boltdd.Tx, allocID, taskName string, state *structs.TaskState) error {
	taskBkt, err := getTaskBucket(tx, allocID, taskName)
	if err != nil {
		return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
	}

	return taskBkt.Put(taskStateKey, state)
}

// DeleteTaskBucket is used to delete a task bucket if it exists.
func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
	return s.db.Update(func(tx *boltdd.Tx) error {
@@ -469,3 +581,59 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) {

	return ps, nil
}

// init initializes metadata entries in a newly created state database.
func (s *BoltStateDB) init() error {
	return s.db.Update(func(tx *boltdd.Tx) error {
		return addMeta(tx.BoltTx())
	})
}

// Upgrade bolt state db from 0.8 schema to 0.9 schema. Noop if already using
// 0.9 schema. Creates a backup before upgrading.
func (s *BoltStateDB) Upgrade() error {
	// Check to see if the underlying DB needs upgrading.
	upgrade, err := NeedsUpgrade(s.db.BoltDB())
	if err != nil {
		return err
	}
	if !upgrade {
		// No upgrade needed!
		return nil
	}

	// Upgrade needed. Backup the boltdb first.
	backupFileName := filepath.Join(s.stateDir, "state.db.backup")
	if err := backupDB(s.db.BoltDB(), backupFileName); err != nil {
		return fmt.Errorf("error backing up state db: %v", err)
	}

	// Perform the upgrade
	if err := s.db.Update(func(tx *boltdd.Tx) error {
		if err := UpgradeAllocs(s.logger, tx); err != nil {
			return err
		}

		// Add standard metadata
		if err := addMeta(tx.BoltTx()); err != nil {
			return err
		}

		// Write the time the upgrade was done
		bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
		if err != nil {
			return err
		}
		return bkt.Put(metaUpgradedKey, time.Now().Format(time.RFC3339))
	}); err != nil {
		return err
	}

	s.logger.Info("successfully upgraded state")
	return nil
}

// DB allows access to the underlying BoltDB for testing purposes.
func (s *BoltStateDB) DB() *boltdd.DB {
	return s.db
}

client/state/testdata/state-0.7.1.db.gz (vendored, new binary file; not shown)
client/state/testdata/state-0.8.6-empty.db.gz (vendored, new binary file; not shown)
client/state/testdata/state-0.8.6-no-deploy.db.gz (vendored, new binary file; not shown)
client/state/upgrade.go (new file, 299 lines)
@@ -0,0 +1,299 @@
package state

import (
	"bytes"
	"fmt"
	"os"

	"github.com/boltdb/bolt"
	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/helper/boltdd"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/ugorji/go/codec"
)

// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
// already up to date.
func NeedsUpgrade(bdb *bolt.DB) (bool, error) {
	needsUpgrade := true
	err := bdb.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(metaBucketName)
		if b == nil {
			// No meta bucket; upgrade
			return nil
		}

		v := b.Get(metaVersionKey)
		if len(v) == 0 {
			// No version; upgrade
			return nil
		}

		if string(v) != metaVersion {
			// Version exists but does not match. Abort.
			return fmt.Errorf("incompatible state version. expected %q but found %q",
				metaVersion, v)
		}

		// Version matches! Assume migrated!
		needsUpgrade = false
		return nil
	})

	return needsUpgrade, err
}

// addMeta adds version metadata to BoltDB to mark it as upgraded and
// should be run at the end of the upgrade transaction.
func addMeta(tx *bolt.Tx) error {
	// Create the meta bucket if it doesn't exist
	bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
	if err != nil {
		return err
	}

	return bkt.Put(metaVersionKey, []byte(metaVersion))
}

// backupDB backs up the existing state database prior to upgrade overwriting
// previous backups.
func backupDB(bdb *bolt.DB, dst string) error {
	fd, err := os.Create(dst)
	if err != nil {
		return err
	}

	return bdb.View(func(tx *bolt.Tx) error {
		if _, err := tx.WriteTo(fd); err != nil {
			fd.Close()
			return err
		}

		return fd.Close()
	})
}

// UpgradeAllocs upgrades the boltdb schema. Example 0.8 schema:
//
// * allocations
//   * 15d83e8a-74a2-b4da-3f17-ed5c12895ea8
//     * echo
//       - simple-all (342 bytes)
//     - alloc (2827 bytes)
//     - alloc-dir (166 bytes)
//     - immutable (15 bytes)
//     - mutable (1294 bytes)
//
func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error {
	btx := tx.BoltTx()
	allocationsBucket := btx.Bucket(allocationsBucketName)
	if allocationsBucket == nil {
		// No state!
		return nil
	}

	// Gather alloc buckets and remove unexpected key/value pairs
	allocBuckets := [][]byte{}
	cur := allocationsBucket.Cursor()
	for k, v := cur.First(); k != nil; k, v = cur.Next() {
		if v != nil {
			logger.Warn("deleting unexpected key in state db",
				"key", string(k), "value_bytes", len(v),
			)

			if err := cur.Delete(); err != nil {
				return err
			}
			continue
		}

		allocBuckets = append(allocBuckets, k)
	}

	for _, allocBucket := range allocBuckets {
		allocID := string(allocBucket)

		bkt := allocationsBucket.Bucket(allocBucket)
		if bkt == nil {
			// This should never happen as we just read the bucket.
			return fmt.Errorf("unexpected bucket missing %q", allocID)
		}

		allocLogger := logger.With("alloc_id", allocID)
		if err := upgradeAllocBucket(allocLogger, tx, bkt, allocID); err != nil {
			// Log and drop invalid allocs
			allocLogger.Error("dropping invalid allocation due to error while upgrading state",
				"error", err,
			)

			// If we can't delete the bucket something is seriously
			// wrong, fail hard.
			if err := allocationsBucket.DeleteBucket(allocBucket); err != nil {
				return fmt.Errorf("error deleting invalid allocation state: %v", err)
			}
		}
	}

	return nil
}

// upgradeAllocBucket upgrades an alloc bucket.
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, allocID string) error {
	allocFound := false
	taskBuckets := [][]byte{}
	cur := bkt.Cursor()
	for k, v := cur.First(); k != nil; k, v = cur.Next() {
		switch string(k) {
		case "alloc":
			// Alloc has not changed; leave it be
			allocFound = true
		case "alloc-dir":
			// Delete alloc-dir entries as they're no longer
			// needed.
			cur.Delete()
		case "immutable":
			// Skip decoding immutable state. Nothing from it needs
			// to be upgraded.
			cur.Delete()
		case "mutable":
			// Decode and upgrade
			if err := upgradeOldAllocMutable(tx, allocID, v); err != nil {
				return err
			}
			cur.Delete()
		default:
			if v != nil {
				logger.Warn("deleting unexpected state entry for allocation",
					"key", string(k), "value_bytes", len(v),
				)

				if err := cur.Delete(); err != nil {
					return err
				}

				continue
			}

			// Nested buckets are tasks
			taskBuckets = append(taskBuckets, k)
		}
	}

	// If the alloc entry was not found, abandon this allocation as the
	// state has been corrupted.
	if !allocFound {
		return fmt.Errorf("alloc entry not found")
	}

	// Upgrade tasks
	for _, taskBucket := range taskBuckets {
		taskName := string(taskBucket)
		taskLogger := logger.With("task_name", taskName)

		taskBkt := bkt.Bucket(taskBucket)
		if taskBkt == nil {
			// This should never happen as we just read the bucket.
			return fmt.Errorf("unexpected bucket missing %q", taskName)
		}

		oldState, err := upgradeTaskBucket(taskLogger, taskBkt)
		if err != nil {
			taskLogger.Warn("dropping invalid task due to error while upgrading state",
				"error", err,
			)

			// Delete the invalid task bucket and treat failures
			// here as unrecoverable errors.
			if err := bkt.DeleteBucket(taskBucket); err != nil {
				return fmt.Errorf("error deleting invalid task state for task %q: %v",
					taskName, err,
				)
			}
		}

		// Convert 0.8 task state to 0.9 task state
		localTaskState := oldState.Upgrade()

		// Insert the new task state
		if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil {
			return err
		}

		// Delete the old task bucket
		if err := bkt.DeleteBucket(taskBucket); err != nil {
			return err
		}

		taskLogger.Trace("upgraded", "from", oldState.Version)
	}

	return nil
}

// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
// and returning the 0.8 version of the state.
func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState08, error) {
	simpleFound := false
	var trState taskRunnerState08

	cur := bkt.Cursor()
	for k, v := cur.First(); k != nil; k, v = cur.Next() {
		if !bytes.Equal(k, []byte("simple-all")) {
			if v == nil {
				// Delete Bucket
				logger.Warn("deleting unexpected task state bucket",
					"bucket", string(k),
				)

				if err := bkt.DeleteBucket(k); err != nil {
					return nil, err
				}
			} else {
				// Delete entry
				logger.Warn("deleting unexpected task state entry",
					"key", string(k), "value_bytes", len(v),
				)

				if err := cur.Delete(); err != nil {
					return nil, err
				}
			}
			continue
		}

		// Decode simple-all
		simpleFound = true
		if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil {
			return nil, fmt.Errorf("failed to decode task state: %v", err)
		}
	}

	if !simpleFound {
		return nil, fmt.Errorf("task state entry not found")
	}

	return &trState, nil
}

// upgradeOldAllocMutable upgrades Nomad 0.8 alloc runner state.
func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) error {
	var oldMutable allocRunnerMutableState08
	err := codec.NewDecoderBytes(oldBytes, structs.MsgpackHandle).Decode(&oldMutable)
	if err != nil {
		return err
	}

	// Upgrade Deployment Status
	if err := putDeploymentStatusImpl(tx, allocID, oldMutable.DeploymentStatus); err != nil {
		return err
	}

	// Upgrade Task States
	for taskName, taskState := range oldMutable.TaskStates {
		if err := putTaskStateImpl(tx, allocID, taskName, taskState); err != nil {
			return err
		}
	}

	return nil
}

client/state/upgrade_int_test.go (new file, 159 lines)
@@ -0,0 +1,159 @@
package state_test

import (
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"testing"

	"github.com/hashicorp/nomad/client/allocrunner"
	"github.com/hashicorp/nomad/client/allocwatcher"
	clientconfig "github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/devicemanager"
	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
	. "github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/helper/boltdd"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/shared"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestBoltStateDB_UpgradeOld_Ok asserts upgrading an old state db does not
// error during upgrade and restore.
func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
	t.Parallel()

	files, err := filepath.Glob("testdata/*.db*")
	require.NoError(t, err)

	for _, fn := range files {
		t.Run(fn, func(t *testing.T) {
			dir, err := ioutil.TempDir("", "nomadtest")
			require.NoError(t, err)
			defer os.RemoveAll(dir)

			var src io.ReadCloser
			src, err = os.Open(fn)
			require.NoError(t, err)
			defer src.Close()

			// testdata may be gzip'd; decode on copy
			if strings.HasSuffix(fn, ".gz") {
				src, err = gzip.NewReader(src)
				require.NoError(t, err)
			}

			dst, err := os.Create(filepath.Join(dir, "state.db"))
			require.NoError(t, err)

			// Copy test files before testing them for safety
			_, err = io.Copy(dst, src)
			require.NoError(t, err)

			require.NoError(t, src.Close())

			dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir)
			require.NoError(t, err)
			defer dbI.Close()

			db := dbI.(*BoltStateDB)

			// Simply opening old files should *not* alter them
			require.NoError(t, db.DB().View(func(tx *boltdd.Tx) error {
				b := tx.Bucket([]byte("meta"))
				if b != nil {
					return fmt.Errorf("meta bucket found but should not exist yet!")
				}
				return nil
			}))

			needsUpgrade, err := NeedsUpgrade(db.DB().BoltDB())
			require.NoError(t, err)
			require.True(t, needsUpgrade)

			// Attempt the upgrade
			require.NoError(t, db.Upgrade())

			needsUpgrade, err = NeedsUpgrade(db.DB().BoltDB())
			require.NoError(t, err)
			require.False(t, needsUpgrade)

			// Ensure Allocations can be restored and
			// NewAR/AR.Restore do not error.
			allocs, errs, err := db.GetAllAllocations()
			require.NoError(t, err)
			assert.Len(t, errs, 0)

			for _, alloc := range allocs {
				checkUpgradedAlloc(t, dir, db, alloc)
			}

			// Should be nil for all upgrades
			ps, err := db.GetDevicePluginState()
			require.NoError(t, err)
			require.Nil(t, ps)

			ps = &dmstate.PluginState{
				ReattachConfigs: map[string]*shared.ReattachConfig{
					"test": &shared.ReattachConfig{Pid: 1},
				},
			}
			require.NoError(t, db.PutDevicePluginState(ps))

			require.NoError(t, db.Close())
		})
	}
}

// checkUpgradedAlloc creates and restores an AllocRunner from an upgraded
// database.
//
// It does not call AR.Run as it's intended to be used against a wide test
// corpus in testdata that may be expensive to run and require unavailable
// dependencies.
func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Allocation) {
	_, err := db.GetDeploymentStatus(alloc.ID)
	assert.NoError(t, err)

	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	for _, task := range tg.Tasks {
		_, _, err := db.GetTaskRunnerState(alloc.ID, task.Name)
		require.NoError(t, err)
	}

	clientConf, cleanup := clientconfig.TestClientConfig(t)

	// Does *not* cleanup overridden StateDir below. That's left alone for
	// the caller to cleanup.
	defer cleanup()

	clientConf.StateDir = path

	conf := &allocrunner.Config{
		Alloc: alloc,
		Logger: clientConf.Logger,
		ClientConfig: clientConf,
		StateDB: db,
		Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
		Vault: vaultclient.NewMockVaultClient(),
		StateUpdater: &allocrunner.MockStateUpdater{},
		PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
		PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
		DeviceManager: devicemanager.NoopMockManager(),
		DriverManager: drivermanager.TestDriverManager(t),
	}
	ar, err := allocrunner.NewAllocRunner(conf)
	require.NoError(t, err)

	// AllocRunner.Restore should not error
	require.NoError(t, ar.Restore())
}

client/state/upgrade_test.go (new file, 109 lines)
@@ -0,0 +1,109 @@
package state

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"

	"github.com/boltdb/bolt"
	"github.com/hashicorp/nomad/helper/boltdd"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/stretchr/testify/require"
)

// TestUpgrade_NeedsUpgrade_New asserts new state dbs do not need upgrading.
func TestUpgrade_NeedsUpgrade_New(t *testing.T) {
	t.Parallel()

	db, cleanup := setupBoltDB(t)
	defer cleanup()

	up, err := NeedsUpgrade(db.DB().BoltDB())
	require.NoError(t, err)
	require.False(t, up)
}

// TestUpgrade_NeedsUpgrade_Old asserts state dbs with just the allocations
// bucket *do* need upgrading.
func TestUpgrade_NeedsUpgrade_Old(t *testing.T) {
	t.Parallel()

	dir, err := ioutil.TempDir("", "nomadtest")
	require.NoError(t, err)

	defer os.RemoveAll(dir)

	db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
	require.NoError(t, err)
	defer db.Close()

	// Create the allocations bucket which exists in both the old and 0.9
	// schemas
	require.NoError(t, db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucket(allocationsBucketName)
		return err
	}))

	up, err := NeedsUpgrade(db)
	require.NoError(t, err)
	require.True(t, up)

	// Adding meta should mark it as upgraded
	require.NoError(t, db.Update(addMeta))

	up, err = NeedsUpgrade(db)
	require.NoError(t, err)
	require.False(t, up)
}

// TestUpgrade_DeleteInvalidAllocs asserts invalid allocations are deleted
// during state upgrades instead of failing the entire agent.
func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) {
	t.Parallel()

	dir, err := ioutil.TempDir("", "nomadtest")
	require.NoError(t, err)

	defer os.RemoveAll(dir)

	db, err := boltdd.Open(filepath.Join(dir, "state.db"), 0666, nil)
	require.NoError(t, err)
	defer db.Close()

	allocID := []byte(uuid.Generate())

	// Create an allocation bucket with no `alloc` key. This is an observed
	// pre-0.9 state corruption that should result in the allocation being
	// dropped while allowing the upgrade to continue.
	require.NoError(t, db.Update(func(tx *boltdd.Tx) error {
		parentBkt, err := tx.CreateBucket(allocationsBucketName)
		if err != nil {
			return err
		}

		_, err = parentBkt.CreateBucket(allocID)
		return err
	}))

	// Perform the Upgrade
	require.NoError(t, db.Update(func(tx *boltdd.Tx) error {
		return UpgradeAllocs(testlog.HCLogger(t), tx)
	}))

	// Assert invalid allocation bucket was removed
	require.NoError(t, db.View(func(tx *boltdd.Tx) error {
		parentBkt := tx.Bucket(allocationsBucketName)
		if parentBkt == nil {
			return fmt.Errorf("parent allocations bucket should not have been removed")
		}

		if parentBkt.Bucket(allocID) != nil {
			return fmt.Errorf("invalid alloc bucket should have been deleted")
		}

		return nil
	}))
}