mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
client/state: improve upgradeTaskBucket error handling
And add a test
This commit is contained in:
@@ -16,7 +16,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupBoltDB(t *testing.T) (*BoltStateDB, func()) {
|
||||
func setupBoltStateDB(t *testing.T) (*BoltStateDB, func()) {
|
||||
dir, err := ioutil.TempDir("", "nomadtest")
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -41,7 +41,7 @@ func setupBoltDB(t *testing.T) (*BoltStateDB, func()) {
|
||||
}
|
||||
|
||||
func testDB(t *testing.T, f func(*testing.T, StateDB)) {
|
||||
boltdb, cleanup := setupBoltDB(t)
|
||||
boltdb, cleanup := setupBoltStateDB(t)
|
||||
defer cleanup()
|
||||
|
||||
memdb := NewMemDB()
|
||||
|
||||
@@ -237,19 +237,19 @@ func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState0
|
||||
|
||||
cur := bkt.Cursor()
|
||||
for k, v := cur.First(); k != nil; k, v = cur.Next() {
|
||||
if !bytes.Equal(k, []byte("simple-all")) {
|
||||
if v == nil {
|
||||
// value is nil: delete unexpected bucket
|
||||
logger.Warn("deleting unexpected task state bucket",
|
||||
"bucket", string(k),
|
||||
)
|
||||
if v == nil {
|
||||
// value is nil: delete unexpected bucket
|
||||
logger.Warn("deleting unexpected task state bucket",
|
||||
"bucket", string(k),
|
||||
)
|
||||
|
||||
if err := bkt.DeleteBucket(k); err != nil {
|
||||
return nil, fmt.Errorf("error deleting unexpected task bucket %q: %v", string(k), err)
|
||||
}
|
||||
continue
|
||||
if err := bkt.DeleteBucket(k); err != nil {
|
||||
return nil, fmt.Errorf("error deleting unexpected task bucket %q: %v", string(k), err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !bytes.Equal(k, []byte("simple-all")) {
|
||||
// value is non-nil: delete unexpected entry
|
||||
logger.Warn("deleting unexpected task state entry",
|
||||
"key", string(k), "value_bytes", len(v),
|
||||
@@ -264,7 +264,7 @@ func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState0
|
||||
// Decode simple-all
|
||||
simpleFound = true
|
||||
if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode task state: %v", err)
|
||||
return nil, fmt.Errorf("failed to decode task state from 'simple-all' entry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,11 +14,28 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupBoltDB(t *testing.T) (*bolt.DB, func()) {
|
||||
dir, err := ioutil.TempDir("", "nomadtest")
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
|
||||
if err != nil {
|
||||
os.RemoveAll(dir)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
return db, func() {
|
||||
require.NoError(t, db.Close())
|
||||
require.NoError(t, os.RemoveAll(dir))
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpgrade_NeedsUpgrade_New asserts new state dbs do not need upgrading.
|
||||
func TestUpgrade_NeedsUpgrade_New(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
// Setting up a new StateDB should initialize it at the latest version.
|
||||
db, cleanup := setupBoltStateDB(t)
|
||||
defer cleanup()
|
||||
|
||||
up, err := NeedsUpgrade(db.DB().BoltDB())
|
||||
@@ -31,14 +48,8 @@ func TestUpgrade_NeedsUpgrade_New(t *testing.T) {
|
||||
func TestUpgrade_NeedsUpgrade_Old(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir, err := ioutil.TempDir("", "nomadtest")
|
||||
require.NoError(t, err)
|
||||
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create the allocations bucket which exists in both the old and 0.9
|
||||
// schemas
|
||||
@@ -77,14 +88,14 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) {
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
require.NoError(t, db.DB().BoltDB().Update(func(tx *bolt.Tx) error {
|
||||
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
|
||||
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
|
||||
require.NoError(t, err)
|
||||
|
||||
return bkt.Put(metaVersionKey, tc)
|
||||
}))
|
||||
|
||||
_, err := NeedsUpgrade(db.DB().BoltDB())
|
||||
_, err := NeedsUpgrade(db)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
@@ -95,14 +106,10 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) {
|
||||
func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir, err := ioutil.TempDir("", "nomadtest")
|
||||
require.NoError(t, err)
|
||||
bdb, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
db, err := boltdd.Open(filepath.Join(dir, "state.db"), 0666, nil)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
db := boltdd.New(bdb)
|
||||
|
||||
allocID := []byte(uuid.Generate())
|
||||
|
||||
@@ -138,3 +145,46 @@ func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
// TestUpgrade_DeleteInvalidTaskEntries asserts invalid entries under a task
|
||||
// bucket are deleted.
|
||||
func TestUpgrade_upgradeTaskBucket_InvalidEntries(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
taskName := []byte("fake-task")
|
||||
|
||||
// Insert unexpected bucket, unexpected key, and missing simple-all
|
||||
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
|
||||
bkt, err := tx.CreateBucket(taskName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = bkt.CreateBucket([]byte("unexpectedBucket"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bkt.Put([]byte("unexepectedKey"), []byte{'x'})
|
||||
}))
|
||||
|
||||
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(taskName)
|
||||
|
||||
// upgradeTaskBucket should fail
|
||||
state, err := upgradeTaskBucket(testlog.HCLogger(t), bkt)
|
||||
require.Nil(t, state)
|
||||
require.Error(t, err)
|
||||
|
||||
// Invalid entries should have been deleted
|
||||
cur := bkt.Cursor()
|
||||
for k, v := cur.First(); k != nil; k, v = cur.Next() {
|
||||
t.Errorf("unexpected entry found: key=%q value=%q", k, v)
|
||||
}
|
||||
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// boltdd contains a wrapper around BoltDB to deduplicate writes and encode
|
||||
// BOLTdd contains a wrapper around BoltDB to deduplicate writes and encode
|
||||
// values using mgspack. (dd stands for DeDuplicate)
|
||||
package boltdd
|
||||
|
||||
@@ -54,10 +54,15 @@ func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return New(bdb), nil
|
||||
}
|
||||
|
||||
// New deduplicating wrapper for the given boltdb.
|
||||
func New(bdb *bolt.DB) *DB {
|
||||
return &DB{
|
||||
rootBuckets: make(map[string]*bucketMeta),
|
||||
bdb: bdb,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {
|
||||
|
||||
Reference in New Issue
Block a user