From fc7be6b7f021ce075bded60a1ba80cdfbd8215c2 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 18 Dec 2018 15:36:32 -0800 Subject: [PATCH] client/state: improve upgradeTaskBucket error handling And add a test --- client/state/db_test.go | 4 +- client/state/upgrade.go | 22 ++++----- client/state/upgrade_test.go | 86 ++++++++++++++++++++++++++++-------- helper/boltdd/boltdd.go | 9 +++- 4 files changed, 88 insertions(+), 33 deletions(-) diff --git a/client/state/db_test.go b/client/state/db_test.go index 68005a21a..87431d980 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/require" ) -func setupBoltDB(t *testing.T) (*BoltStateDB, func()) { +func setupBoltStateDB(t *testing.T) (*BoltStateDB, func()) { dir, err := ioutil.TempDir("", "nomadtest") require.NoError(t, err) @@ -41,7 +41,7 @@ func setupBoltDB(t *testing.T) (*BoltStateDB, func()) { } func testDB(t *testing.T, f func(*testing.T, StateDB)) { - boltdb, cleanup := setupBoltDB(t) + boltdb, cleanup := setupBoltStateDB(t) defer cleanup() memdb := NewMemDB() diff --git a/client/state/upgrade.go b/client/state/upgrade.go index a712fb8a6..36fe6e5e8 100644 --- a/client/state/upgrade.go +++ b/client/state/upgrade.go @@ -237,19 +237,19 @@ func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState0 cur := bkt.Cursor() for k, v := cur.First(); k != nil; k, v = cur.Next() { - if !bytes.Equal(k, []byte("simple-all")) { - if v == nil { - // value is nil: delete unexpected bucket - logger.Warn("deleting unexpected task state bucket", - "bucket", string(k), - ) + if v == nil { + // value is nil: delete unexpected bucket + logger.Warn("deleting unexpected task state bucket", + "bucket", string(k), + ) - if err := bkt.DeleteBucket(k); err != nil { - return nil, fmt.Errorf("error deleting unexpected task bucket %q: %v", string(k), err) - } - continue + if err := bkt.DeleteBucket(k); err != nil { + return nil, fmt.Errorf("error deleting unexpected task bucket %q: %v", string(k), err) } + continue + } + if !bytes.Equal(k, []byte("simple-all")) { // value is non-nil: delete unexpected entry logger.Warn("deleting unexpected task state entry", "key", string(k), "value_bytes", len(v), @@ -264,7 +264,7 @@ func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState0 // Decode simple-all simpleFound = true if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil { - return nil, fmt.Errorf("failed to decode task state: %v", err) + return nil, fmt.Errorf("failed to decode task state from 'simple-all' entry: %v", err) } } diff --git a/client/state/upgrade_test.go b/client/state/upgrade_test.go index 42380335e..571c12a55 100644 --- a/client/state/upgrade_test.go +++ b/client/state/upgrade_test.go @@ -14,11 +14,28 @@ import ( "github.com/stretchr/testify/require" ) +func setupBoltDB(t *testing.T) (*bolt.DB, func()) { + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + + db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil) + if err != nil { + os.RemoveAll(dir) + require.NoError(t, err) + } + + return db, func() { + require.NoError(t, db.Close()) + require.NoError(t, os.RemoveAll(dir)) + } +} + // TestUpgrade_NeedsUpgrade_New asserts new state dbs do not need upgrading. func TestUpgrade_NeedsUpgrade_New(t *testing.T) { t.Parallel() - db, cleanup := setupBoltDB(t) + // Setting up a new StateDB should initialize it at the latest version. + db, cleanup := setupBoltStateDB(t) defer cleanup() up, err := NeedsUpgrade(db.DB().BoltDB()) @@ -31,14 +48,8 @@ func TestUpgrade_NeedsUpgrade_New(t *testing.T) { func TestUpgrade_NeedsUpgrade_Old(t *testing.T) { t.Parallel() - dir, err := ioutil.TempDir("", "nomadtest") - require.NoError(t, err) - - defer os.RemoveAll(dir) - - db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil) - require.NoError(t, err) - defer db.Close() + db, cleanup := setupBoltDB(t) + defer cleanup() // Create the allocations bucket which exists in both the old and 0.9 // schemas @@ -77,14 +88,14 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) { db, cleanup := setupBoltDB(t) defer cleanup() - require.NoError(t, db.DB().BoltDB().Update(func(tx *bolt.Tx) error { + require.NoError(t, db.Update(func(tx *bolt.Tx) error { bkt, err := tx.CreateBucketIfNotExists(metaBucketName) require.NoError(t, err) return bkt.Put(metaVersionKey, tc) })) - _, err := NeedsUpgrade(db.DB().BoltDB()) + _, err := NeedsUpgrade(db) require.Error(t, err) }) } @@ -95,14 +106,10 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) { func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) { t.Parallel() - dir, err := ioutil.TempDir("", "nomadtest") - require.NoError(t, err) + bdb, cleanup := setupBoltDB(t) + defer cleanup() - defer os.RemoveAll(dir) - - db, err := boltdd.Open(filepath.Join(dir, "state.db"), 0666, nil) - require.NoError(t, err) - defer db.Close() + db := boltdd.New(bdb) allocID := []byte(uuid.Generate()) @@ -138,3 +145,46 @@ func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) { return nil })) } + +// TestUpgrade_DeleteInvalidTaskEntries asserts invalid entries under a task +// bucket are deleted. +func TestUpgrade_upgradeTaskBucket_InvalidEntries(t *testing.T) { + t.Parallel() + + db, cleanup := setupBoltDB(t) + defer cleanup() + + taskName := []byte("fake-task") + + // Insert unexpected bucket, unexpected key, and missing simple-all + require.NoError(t, db.Update(func(tx *bolt.Tx) error { + bkt, err := tx.CreateBucket(taskName) + if err != nil { + return err + } + + _, err = bkt.CreateBucket([]byte("unexpectedBucket")) + if err != nil { + return err + } + + return bkt.Put([]byte("unexepectedKey"), []byte{'x'}) + })) + + require.NoError(t, db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(taskName) + + // upgradeTaskBucket should fail + state, err := upgradeTaskBucket(testlog.HCLogger(t), bkt) + require.Nil(t, state) + require.Error(t, err) + + // Invalid entries should have been deleted + cur := bkt.Cursor() + for k, v := cur.First(); k != nil; k, v = cur.Next() { + t.Errorf("unexpected entry found: key=%q value=%q", k, v) + } + + return nil + })) +} diff --git a/helper/boltdd/boltdd.go b/helper/boltdd/boltdd.go index b3cb177af..93e0b713e 100644 --- a/helper/boltdd/boltdd.go +++ b/helper/boltdd/boltdd.go @@ -1,4 +1,4 @@ -// boltdd contains a wrapper around BoltDB to deduplicate writes and encode +// BOLTdd contains a wrapper around BoltDB to deduplicate writes and encode // values using mgspack. (dd stands for DeDuplicate) package boltdd @@ -54,10 +54,15 @@ func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) { return nil, err } + return New(bdb), nil +} + +// New deduplicating wrapper for the given boltdb. +func New(bdb *bolt.DB) *DB { return &DB{ rootBuckets: make(map[string]*bucketMeta), bdb: bdb, - }, nil + } } func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {