mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
agent: switch to go.etc.io/bbolt for state store
This PR modifies the server and client agents to use `go.etc.io/bbolt` as the implementation for their state stores.
This commit is contained in:
@@ -3,7 +3,7 @@ deps: Update hashicorp/raft-boltdb to v2.2.0
|
|||||||
```
|
```
|
||||||
|
|
||||||
```release-note:improvement
|
```release-note:improvement
|
||||||
core: Switch from boltdb/bolt to go.etcd.io/bbolt
|
agent: Switch from boltdb/bolt to go.etcd.io/bbolt
|
||||||
```
|
```
|
||||||
|
|
||||||
```release-note:improvement
|
```release-note:improvement
|
||||||
@@ -15,5 +15,6 @@ metrics: Emit metrics regarding raft boltdb operations
|
|||||||
```
|
```
|
||||||
|
|
||||||
```release-note:breaking-change
|
```release-note:breaking-change
|
||||||
core: The server raft implementation will automatically migrate its underlying raft.db database on startup. Downgrading to a previous version of the server after upgrading it to Nomad 1.3 is not supported.
|
agent: The state database on both clients and servers will automatically migrate its underlying database on startup. Downgrading to a previous version of an agent after upgrading it to Nomad 1.3 is not supported.
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -6,8 +6,6 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
|
|
||||||
hclog "github.com/hashicorp/go-hclog"
|
hclog "github.com/hashicorp/go-hclog"
|
||||||
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||||
@@ -15,6 +13,7 @@ import (
|
|||||||
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||||
"github.com/hashicorp/nomad/helper/boltdd"
|
"github.com/hashicorp/nomad/helper/boltdd"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -139,11 +138,11 @@ func NewBoltStateDB(logger hclog.Logger, stateDir string) (StateDB, error) {
|
|||||||
firstRun := fi == nil
|
firstRun := fi == nil
|
||||||
|
|
||||||
// Timeout to force failure when accessing a data dir that is already in use
|
// Timeout to force failure when accessing a data dir that is already in use
|
||||||
timeout := &bolt.Options{Timeout: 5 * time.Second}
|
timeout := &bbolt.Options{Timeout: 5 * time.Second}
|
||||||
|
|
||||||
// Create or open the boltdb state database
|
// Create or open the boltdb state database
|
||||||
db, err := boltdd.Open(fn, 0600, timeout)
|
db, err := boltdd.Open(fn, 0600, timeout)
|
||||||
if err == bolt.ErrTimeout {
|
if err == bbolt.ErrTimeout {
|
||||||
return nil, fmt.Errorf("timed out while opening database, is another Nomad process accessing data_dir %s?", stateDir)
|
return nil, fmt.Errorf("timed out while opening database, is another Nomad process accessing data_dir %s?", stateDir)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create state database: %v", err)
|
return nil, fmt.Errorf("failed to create state database: %v", err)
|
||||||
|
|||||||
@@ -6,20 +6,19 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
hclog "github.com/hashicorp/go-hclog"
|
hclog "github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
"github.com/hashicorp/nomad/client/dynamicplugins"
|
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||||
"github.com/hashicorp/nomad/helper/boltdd"
|
"github.com/hashicorp/nomad/helper/boltdd"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
|
// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
|
||||||
// already up to date.
|
// already up to date.
|
||||||
func NeedsUpgrade(bdb *bolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
|
func NeedsUpgrade(bdb *bbolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
|
||||||
upgradeTo09 = true
|
upgradeTo09 = true
|
||||||
upgradeTo13 = true
|
upgradeTo13 = true
|
||||||
err = bdb.View(func(tx *bolt.Tx) error {
|
err = bdb.View(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(metaBucketName)
|
b := tx.Bucket(metaBucketName)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
// No meta bucket; upgrade
|
// No meta bucket; upgrade
|
||||||
@@ -53,7 +52,7 @@ func NeedsUpgrade(bdb *bolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
|
|||||||
|
|
||||||
// addMeta adds version metadata to BoltDB to mark it as upgraded and
|
// addMeta adds version metadata to BoltDB to mark it as upgraded and
|
||||||
// should be run at the end of the upgrade transaction.
|
// should be run at the end of the upgrade transaction.
|
||||||
func addMeta(tx *bolt.Tx) error {
|
func addMeta(tx *bbolt.Tx) error {
|
||||||
// Create the meta bucket if it doesn't exist
|
// Create the meta bucket if it doesn't exist
|
||||||
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
|
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -64,13 +63,13 @@ func addMeta(tx *bolt.Tx) error {
|
|||||||
|
|
||||||
// backupDB backs up the existing state database prior to upgrade overwriting
|
// backupDB backs up the existing state database prior to upgrade overwriting
|
||||||
// previous backups.
|
// previous backups.
|
||||||
func backupDB(bdb *bolt.DB, dst string) error {
|
func backupDB(bdb *bbolt.DB, dst string) error {
|
||||||
fd, err := os.Create(dst)
|
fd, err := os.Create(dst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return bdb.View(func(tx *bolt.Tx) error {
|
return bdb.View(func(tx *bbolt.Tx) error {
|
||||||
if _, err := tx.WriteTo(fd); err != nil {
|
if _, err := tx.WriteTo(fd); err != nil {
|
||||||
fd.Close()
|
fd.Close()
|
||||||
return err
|
return err
|
||||||
@@ -145,7 +144,7 @@ func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// upgradeAllocBucket upgrades an alloc bucket.
|
// upgradeAllocBucket upgrades an alloc bucket.
|
||||||
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, allocID string) error {
|
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bbolt.Bucket, allocID string) error {
|
||||||
allocFound := false
|
allocFound := false
|
||||||
taskBuckets := [][]byte{}
|
taskBuckets := [][]byte{}
|
||||||
cur := bkt.Cursor()
|
cur := bkt.Cursor()
|
||||||
@@ -253,7 +252,7 @@ func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, al
|
|||||||
|
|
||||||
// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
|
// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
|
||||||
// and returning the 0.8 version of the state.
|
// and returning the 0.8 version of the state.
|
||||||
func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState08, error) {
|
func upgradeTaskBucket(logger hclog.Logger, bkt *bbolt.Bucket) (*taskRunnerState08, error) {
|
||||||
simpleFound := false
|
simpleFound := false
|
||||||
var trState taskRunnerState08
|
var trState taskRunnerState08
|
||||||
|
|
||||||
|
|||||||
@@ -7,18 +7,18 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
"github.com/hashicorp/nomad/helper/boltdd"
|
"github.com/hashicorp/nomad/helper/boltdd"
|
||||||
"github.com/hashicorp/nomad/helper/testlog"
|
"github.com/hashicorp/nomad/helper/testlog"
|
||||||
"github.com/hashicorp/nomad/helper/uuid"
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupBoltDB(t *testing.T) (*bolt.DB, func()) {
|
func setupBoltDB(t *testing.T) (*bbolt.DB, func()) {
|
||||||
dir, err := ioutil.TempDir("", "nomadtest")
|
dir, err := ioutil.TempDir("", "nomadtest")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
|
db, err := bbolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.RemoveAll(dir)
|
os.RemoveAll(dir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -54,7 +54,7 @@ func TestUpgrade_NeedsUpgrade_Old(t *testing.T) {
|
|||||||
|
|
||||||
// Create the allocations bucket which exists in both the old and 0.9
|
// Create the allocations bucket which exists in both the old and 0.9
|
||||||
// schemas
|
// schemas
|
||||||
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
|
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
|
||||||
_, err := tx.CreateBucket(allocationsBucketName)
|
_, err := tx.CreateBucket(allocationsBucketName)
|
||||||
return err
|
return err
|
||||||
}))
|
}))
|
||||||
@@ -91,7 +91,7 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) {
|
|||||||
db, cleanup := setupBoltDB(t)
|
db, cleanup := setupBoltDB(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
|
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
|
||||||
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
|
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@@ -160,7 +160,7 @@ func TestUpgrade_upgradeTaskBucket_InvalidEntries(t *testing.T) {
|
|||||||
taskName := []byte("fake-task")
|
taskName := []byte("fake-task")
|
||||||
|
|
||||||
// Insert unexpected bucket, unexpected key, and missing simple-all
|
// Insert unexpected bucket, unexpected key, and missing simple-all
|
||||||
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
|
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
|
||||||
bkt, err := tx.CreateBucket(taskName)
|
bkt, err := tx.CreateBucket(taskName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -174,7 +174,7 @@ func TestUpgrade_upgradeTaskBucket_InvalidEntries(t *testing.T) {
|
|||||||
return bkt.Put([]byte("unexepectedKey"), []byte{'x'})
|
return bkt.Put([]byte("unexepectedKey"), []byte{'x'})
|
||||||
}))
|
}))
|
||||||
|
|
||||||
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
|
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
|
||||||
bkt := tx.Bucket(taskName)
|
bkt := tx.Bucket(taskName)
|
||||||
|
|
||||||
// upgradeTaskBucket should fail
|
// upgradeTaskBucket should fail
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
// BOLTdd contains a wrapper around BoltDB to deduplicate writes and encode
|
// Package boltdd contains a wrapper around BBoltDB to deduplicate writes and encode
|
||||||
// values using mgspack. (dd stands for DeDuplicate)
|
// values using mgspack. (dd stands for de-duplicate)
|
||||||
package boltdd
|
package boltdd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -8,9 +8,9 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
"golang.org/x/crypto/blake2b"
|
"golang.org/x/crypto/blake2b"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -37,19 +37,19 @@ func IsErrNotFound(e error) bool {
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB wraps an underlying bolt.DB to create write deduplicating buckets and
|
// DB wraps an underlying bolt.DB to create write de-duplicating buckets and
|
||||||
// msgpack encoded values.
|
// msgpack encoded values.
|
||||||
type DB struct {
|
type DB struct {
|
||||||
rootBuckets map[string]*bucketMeta
|
rootBuckets map[string]*bucketMeta
|
||||||
rootBucketsLock sync.Mutex
|
rootBucketsLock sync.Mutex
|
||||||
|
|
||||||
bdb *bolt.DB
|
boltDB *bbolt.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open a bolt.DB and wrap it in a write-deduplicating msgpack-encoding
|
// Open a bolt.DB and wrap it in a write-de-duplicating msgpack-encoding
|
||||||
// implementation.
|
// implementation.
|
||||||
func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) {
|
func Open(path string, mode os.FileMode, options *bbolt.Options) (*DB, error) {
|
||||||
bdb, err := bolt.Open(path, mode, options)
|
bdb, err := bbolt.Open(path, mode, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -57,15 +57,15 @@ func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) {
|
|||||||
return New(bdb), nil
|
return New(bdb), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// New deduplicating wrapper for the given boltdb.
|
// New de-duplicating wrapper for the given bboltdb.
|
||||||
func New(bdb *bolt.DB) *DB {
|
func New(bdb *bbolt.DB) *DB {
|
||||||
return &DB{
|
return &DB{
|
||||||
rootBuckets: make(map[string]*bucketMeta),
|
rootBuckets: make(map[string]*bucketMeta),
|
||||||
bdb: bdb,
|
boltDB: bdb,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {
|
func (db *DB) bucket(btx *bbolt.Tx, name []byte) *Bucket {
|
||||||
bb := btx.Bucket(name)
|
bb := btx.Bucket(name)
|
||||||
if bb == nil {
|
if bb == nil {
|
||||||
return nil
|
return nil
|
||||||
@@ -87,7 +87,7 @@ func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {
|
|||||||
return newBucket(b, bb)
|
return newBucket(b, bb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) {
|
func (db *DB) createBucket(btx *bbolt.Tx, name []byte) (*Bucket, error) {
|
||||||
bb, err := btx.CreateBucket(name)
|
bb, err := btx.CreateBucket(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -99,7 +99,7 @@ func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) {
|
|||||||
// While creating a bucket on a closed db would error, we must recheck
|
// While creating a bucket on a closed db would error, we must recheck
|
||||||
// after acquiring the lock to avoid races.
|
// after acquiring the lock to avoid races.
|
||||||
if db.isClosed() {
|
if db.isClosed() {
|
||||||
return nil, bolt.ErrDatabaseNotOpen
|
return nil, bbolt.ErrDatabaseNotOpen
|
||||||
}
|
}
|
||||||
|
|
||||||
// Always create a new Bucket since CreateBucket above fails if the
|
// Always create a new Bucket since CreateBucket above fails if the
|
||||||
@@ -110,7 +110,7 @@ func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) {
|
|||||||
return newBucket(b, bb), nil
|
return newBucket(b, bb), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error) {
|
func (db *DB) createBucketIfNotExists(btx *bbolt.Tx, name []byte) (*Bucket, error) {
|
||||||
bb, err := btx.CreateBucketIfNotExists(name)
|
bb, err := btx.CreateBucketIfNotExists(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -122,7 +122,7 @@ func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error
|
|||||||
// While creating a bucket on a closed db would error, we must recheck
|
// While creating a bucket on a closed db would error, we must recheck
|
||||||
// after acquiring the lock to avoid races.
|
// after acquiring the lock to avoid races.
|
||||||
if db.isClosed() {
|
if db.isClosed() {
|
||||||
return nil, bolt.ErrDatabaseNotOpen
|
return nil, bbolt.ErrDatabaseNotOpen
|
||||||
}
|
}
|
||||||
|
|
||||||
b, ok := db.rootBuckets[string(name)]
|
b, ok := db.rootBuckets[string(name)]
|
||||||
@@ -135,21 +135,21 @@ func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Update(fn func(*Tx) error) error {
|
func (db *DB) Update(fn func(*Tx) error) error {
|
||||||
return db.bdb.Update(func(btx *bolt.Tx) error {
|
return db.boltDB.Update(func(btx *bbolt.Tx) error {
|
||||||
tx := newTx(db, btx)
|
tx := newTx(db, btx)
|
||||||
return fn(tx)
|
return fn(tx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Batch(fn func(*Tx) error) error {
|
func (db *DB) Batch(fn func(*Tx) error) error {
|
||||||
return db.bdb.Batch(func(btx *bolt.Tx) error {
|
return db.boltDB.Batch(func(btx *bbolt.Tx) error {
|
||||||
tx := newTx(db, btx)
|
tx := newTx(db, btx)
|
||||||
return fn(tx)
|
return fn(tx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) View(fn func(*Tx) error) error {
|
func (db *DB) View(fn func(*Tx) error) error {
|
||||||
return db.bdb.View(func(btx *bolt.Tx) error {
|
return db.boltDB.View(func(btx *bbolt.Tx) error {
|
||||||
tx := newTx(db, btx)
|
tx := newTx(db, btx)
|
||||||
return fn(tx)
|
return fn(tx)
|
||||||
})
|
})
|
||||||
@@ -167,20 +167,20 @@ func (db *DB) Close() error {
|
|||||||
db.rootBucketsLock.Lock()
|
db.rootBucketsLock.Lock()
|
||||||
db.rootBuckets = nil
|
db.rootBuckets = nil
|
||||||
db.rootBucketsLock.Unlock()
|
db.rootBucketsLock.Unlock()
|
||||||
return db.bdb.Close()
|
return db.boltDB.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// BoltDB returns the underlying bolt.DB.
|
// BoltDB returns the underlying bolt.DB.
|
||||||
func (db *DB) BoltDB() *bolt.DB {
|
func (db *DB) BoltDB() *bbolt.DB {
|
||||||
return db.bdb
|
return db.boltDB
|
||||||
}
|
}
|
||||||
|
|
||||||
type Tx struct {
|
type Tx struct {
|
||||||
db *DB
|
db *DB
|
||||||
btx *bolt.Tx
|
btx *bbolt.Tx
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTx(db *DB, btx *bolt.Tx) *Tx {
|
func newTx(db *DB, btx *bbolt.Tx) *Tx {
|
||||||
return &Tx{
|
return &Tx{
|
||||||
db: db,
|
db: db,
|
||||||
btx: btx,
|
btx: btx,
|
||||||
@@ -208,7 +208,7 @@ func (tx *Tx) Writable() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// BoltTx returns the underlying bolt.Tx.
|
// BoltTx returns the underlying bolt.Tx.
|
||||||
func (tx *Tx) BoltTx() *bolt.Tx {
|
func (tx *Tx) BoltTx() *bbolt.Tx {
|
||||||
return tx.btx
|
return tx.btx
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -290,12 +290,12 @@ func (bm *bucketMeta) getOrCreateBucket(name []byte) *bucketMeta {
|
|||||||
|
|
||||||
type Bucket struct {
|
type Bucket struct {
|
||||||
bm *bucketMeta
|
bm *bucketMeta
|
||||||
boltBucket *bolt.Bucket
|
boltBucket *bbolt.Bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
// newBucket creates a new view into a bucket backed by a boltdb
|
// newBucket creates a new view into a bucket backed by a boltdb
|
||||||
// transaction.
|
// transaction.
|
||||||
func newBucket(b *bucketMeta, bb *bolt.Bucket) *Bucket {
|
func newBucket(b *bucketMeta, bb *bbolt.Bucket) *Bucket {
|
||||||
return &Bucket{
|
return &Bucket{
|
||||||
bm: b,
|
bm: b,
|
||||||
boltBucket: bb,
|
boltBucket: bb,
|
||||||
@@ -408,7 +408,7 @@ func (b *Bucket) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
|
|||||||
func (b *Bucket) DeleteBucket(name []byte) error {
|
func (b *Bucket) DeleteBucket(name []byte) error {
|
||||||
// Delete the bucket from the underlying boltdb
|
// Delete the bucket from the underlying boltdb
|
||||||
err := b.boltBucket.DeleteBucket(name)
|
err := b.boltBucket.DeleteBucket(name)
|
||||||
if err == bolt.ErrBucketNotFound {
|
if err == bbolt.ErrBucketNotFound {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -419,6 +419,6 @@ func (b *Bucket) DeleteBucket(name []byte) error {
|
|||||||
|
|
||||||
// BoltBucket returns the internal bolt.Bucket for this Bucket. Only valid
|
// BoltBucket returns the internal bolt.Bucket for this Bucket. Only valid
|
||||||
// for the duration of the current transaction.
|
// for the duration of the current transaction.
|
||||||
func (b *Bucket) BoltBucket() *bolt.Bucket {
|
func (b *Bucket) BoltBucket() *bbolt.Bucket {
|
||||||
return b.boltBucket
|
return b.boltBucket
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,11 +8,11 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
"github.com/hashicorp/nomad/nomad/mock"
|
"github.com/hashicorp/nomad/nomad/mock"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testingT interface {
|
type testingT interface {
|
||||||
@@ -66,12 +66,12 @@ func TestDB_Close(t *testing.T) {
|
|||||||
require.Equal(t, db.Update(func(tx *Tx) error {
|
require.Equal(t, db.Update(func(tx *Tx) error {
|
||||||
_, err := tx.CreateBucketIfNotExists([]byte("foo"))
|
_, err := tx.CreateBucketIfNotExists([]byte("foo"))
|
||||||
return err
|
return err
|
||||||
}), bolt.ErrDatabaseNotOpen)
|
}), bbolt.ErrDatabaseNotOpen)
|
||||||
|
|
||||||
require.Equal(t, db.Update(func(tx *Tx) error {
|
require.Equal(t, db.Update(func(tx *Tx) error {
|
||||||
_, err := tx.CreateBucket([]byte("foo"))
|
_, err := tx.CreateBucket([]byte("foo"))
|
||||||
return err
|
return err
|
||||||
}), bolt.ErrDatabaseNotOpen)
|
}), bbolt.ErrDatabaseNotOpen)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBucket_Create(t *testing.T) {
|
func TestBucket_Create(t *testing.T) {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import (
|
|||||||
"github.com/hashicorp/nomad/nomad"
|
"github.com/hashicorp/nomad/nomad"
|
||||||
"github.com/hashicorp/nomad/nomad/state"
|
"github.com/hashicorp/nomad/nomad/state"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrNoMoreLogs = fmt.Errorf("no more logs")
|
var ErrNoMoreLogs = fmt.Errorf("no more logs")
|
||||||
|
|||||||
@@ -9,11 +9,11 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -24,7 +24,7 @@ var (
|
|||||||
func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, lastIdx uint64, err error) {
|
func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, lastIdx uint64, err error) {
|
||||||
opts := raftboltdb.Options{
|
opts := raftboltdb.Options{
|
||||||
Path: p,
|
Path: p,
|
||||||
BoltOptions: &bolt.Options{
|
BoltOptions: &bbolt.Options{
|
||||||
ReadOnly: true,
|
ReadOnly: true,
|
||||||
Timeout: 1 * time.Second,
|
Timeout: 1 * time.Second,
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user