mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Implement 'batch mode' for persisting allocations on the client. (#9093)
Fixes #9047, see problem details there. As a solution, we use BoltDB's 'Batch' mode that combines multiple parallel writes into small number of transactions. See https://github.com/boltdb/bolt#batch-read-write-transactions for more information.
This commit is contained in:
committed by
GitHub
parent
fe8ffd0762
commit
1be5243d08
@@ -888,7 +888,7 @@ func (ar *allocRunner) PersistState() error {
|
||||
defer ar.destroyedLock.Unlock()
|
||||
|
||||
if ar.destroyed {
|
||||
err := ar.stateDB.DeleteAllocationBucket(ar.id)
|
||||
err := ar.stateDB.DeleteAllocationBucket(ar.id, cstate.WithBatchMode())
|
||||
if err != nil {
|
||||
ar.logger.Warn("failed to delete allocation bucket", "error", err)
|
||||
}
|
||||
@@ -896,23 +896,25 @@ func (ar *allocRunner) PersistState() error {
|
||||
}
|
||||
|
||||
// persist network status, wrapping in a func to release state lock as early as possible
|
||||
if err := func() error {
|
||||
err := func() error {
|
||||
ar.stateLock.Lock()
|
||||
defer ar.stateLock.Unlock()
|
||||
if ar.state.NetworkStatus != nil {
|
||||
if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus); err != nil {
|
||||
err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus, cstate.WithBatchMode())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: consider persisting deployment state along with task status.
|
||||
// While we study why only the alloc is persisted, I opted to maintain current
|
||||
// behavior and not risk adding yet more IO calls unnecessarily.
|
||||
return ar.stateDB.PutAllocation(ar.Alloc())
|
||||
return ar.stateDB.PutAllocation(ar.Alloc(), cstate.WithBatchMode())
|
||||
}
|
||||
|
||||
// Destroy the alloc runner by stopping it if it is still running and cleaning
|
||||
|
||||
@@ -4,7 +4,9 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
@@ -57,7 +59,7 @@ func testDB(t *testing.T, f func(*testing.T, StateDB)) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestStateDB asserts the behavior of GetAllAllocations, PutAllocation, and
|
||||
// TestStateDB_Allocations asserts the behavior of GetAllAllocations, PutAllocation, and
|
||||
// DeleteAllocationBucket for all operational StateDB implementations.
|
||||
func TestStateDB_Allocations(t *testing.T) {
|
||||
t.Parallel()
|
||||
@@ -137,6 +139,119 @@ func TestStateDB_Allocations(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// Integer division, rounded up.
|
||||
func ceilDiv(a, b int) int {
|
||||
return (a + b - 1) / b
|
||||
}
|
||||
|
||||
// TestStateDB_Batch asserts the behavior of PutAllocation, PutNetworkStatus and
|
||||
// DeleteAllocationBucket in batch mode, for all operational StateDB implementations.
|
||||
func TestStateDB_Batch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testDB(t, func(t *testing.T, db StateDB) {
|
||||
require := require.New(t)
|
||||
|
||||
// For BoltDB, get initial tx_id
|
||||
var getTxID func() int
|
||||
var prevTxID int
|
||||
var batchDelay time.Duration
|
||||
var batchSize int
|
||||
if boltStateDB, ok := db.(*BoltStateDB); ok {
|
||||
boltdb := boltStateDB.DB().BoltDB()
|
||||
getTxID = func() int {
|
||||
tx, err := boltdb.Begin(true)
|
||||
require.NoError(err)
|
||||
defer tx.Rollback()
|
||||
return tx.ID()
|
||||
}
|
||||
prevTxID = getTxID()
|
||||
batchDelay = boltdb.MaxBatchDelay
|
||||
batchSize = boltdb.MaxBatchSize
|
||||
}
|
||||
|
||||
// Write 1000 allocations and network statuses in batch mode
|
||||
startTime := time.Now()
|
||||
const numAllocs = 1000
|
||||
var allocs []*structs.Allocation
|
||||
for i := 0; i < numAllocs; i++ {
|
||||
allocs = append(allocs, mock.Alloc())
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, alloc := range allocs {
|
||||
wg.Add(1)
|
||||
go func(alloc *structs.Allocation) {
|
||||
require.NoError(db.PutNetworkStatus(alloc.ID, mock.AllocNetworkStatus(), WithBatchMode()))
|
||||
require.NoError(db.PutAllocation(alloc, WithBatchMode()))
|
||||
wg.Done()
|
||||
}(alloc)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Check BoltDB actually combined PutAllocation calls into much fewer transactions.
|
||||
// The actual number of transactions depends on how fast the goroutines are spawned,
|
||||
// with every batchDelay (10ms by default) period saved in a separate transaction,
|
||||
// plus each transaction is limited to batchSize writes (1000 by default).
|
||||
// See boltdb MaxBatchDelay and MaxBatchSize parameters for more details.
|
||||
if getTxID != nil {
|
||||
numTransactions := getTxID() - prevTxID
|
||||
writeTime := time.Now().Sub(startTime)
|
||||
expectedNumTransactions := ceilDiv(2 * numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay))
|
||||
require.LessOrEqual(numTransactions, expectedNumTransactions)
|
||||
prevTxID = getTxID()
|
||||
}
|
||||
|
||||
// Retrieve allocs and make sure they are the same (order can differ)
|
||||
readAllocs, errs, err := db.GetAllAllocations()
|
||||
require.NoError(err)
|
||||
require.NotNil(readAllocs)
|
||||
require.Len(readAllocs, len(allocs))
|
||||
require.NotNil(errs)
|
||||
require.Empty(errs)
|
||||
|
||||
readAllocsById := make(map[string]*structs.Allocation)
|
||||
for _, readAlloc := range readAllocs {
|
||||
readAllocsById[readAlloc.ID] = readAlloc
|
||||
}
|
||||
for _, alloc := range allocs {
|
||||
readAlloc, ok := readAllocsById[alloc.ID]
|
||||
if !ok {
|
||||
t.Fatalf("no alloc with ID=%q", alloc.ID)
|
||||
}
|
||||
if !reflect.DeepEqual(readAlloc, alloc) {
|
||||
pretty.Ldiff(t, readAlloc, alloc)
|
||||
t.Fatalf("alloc %q unequal", alloc.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete all allocs in batch mode
|
||||
startTime = time.Now()
|
||||
for _, alloc := range allocs {
|
||||
wg.Add(1)
|
||||
go func(alloc *structs.Allocation) {
|
||||
require.NoError(db.DeleteAllocationBucket(alloc.ID, WithBatchMode()))
|
||||
wg.Done()
|
||||
}(alloc)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Check BoltDB combined DeleteAllocationBucket calls into much fewer transactions.
|
||||
if getTxID != nil {
|
||||
numTransactions := getTxID() - prevTxID
|
||||
writeTime := time.Now().Sub(startTime)
|
||||
expectedNumTransactions := ceilDiv(numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay))
|
||||
require.LessOrEqual(numTransactions, expectedNumTransactions)
|
||||
prevTxID = getTxID()
|
||||
}
|
||||
|
||||
// Check all allocs were deleted.
|
||||
readAllocs, errs, err = db.GetAllAllocations()
|
||||
require.NoError(err)
|
||||
require.Empty(readAllocs)
|
||||
require.Empty(errs)
|
||||
})
|
||||
}
|
||||
|
||||
// TestStateDB_TaskState asserts the behavior of task state related StateDB
|
||||
// methods.
|
||||
func TestStateDB_TaskState(t *testing.T) {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
@@ -27,7 +28,7 @@ func (m *ErrDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er
|
||||
return m.Allocs, nil, nil
|
||||
}
|
||||
|
||||
func (m *ErrDB) PutAllocation(alloc *structs.Allocation) error {
|
||||
func (m *ErrDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error {
|
||||
return fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
@@ -43,7 +44,7 @@ func (m *ErrDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e
|
||||
return nil, fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error {
|
||||
func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error {
|
||||
return fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
@@ -63,7 +64,7 @@ func (m *ErrDB) DeleteTaskBucket(allocID, taskName string) error {
|
||||
return fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
func (m *ErrDB) DeleteAllocationBucket(allocID string) error {
|
||||
func (m *ErrDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error {
|
||||
return fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
@@ -71,6 +72,14 @@ func (m *ErrDB) PutDevicePluginState(ps *dmstate.PluginState) error {
|
||||
return fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
func (m *ErrDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) {
|
||||
return nil, fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
func (m *ErrDB) PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error {
|
||||
return fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
// GetDevicePluginState stores the device manager's plugin state or returns an
|
||||
// error.
|
||||
func (m *ErrDB) GetDevicePluginState() (*dmstate.PluginState, error) {
|
||||
@@ -88,3 +97,6 @@ func (m *ErrDB) PutDriverPluginState(ps *driverstate.PluginState) error {
|
||||
func (m *ErrDB) Close() error {
|
||||
return fmt.Errorf("Error!")
|
||||
}
|
||||
|
||||
// Ensure *ErrDB implements StateDB
|
||||
var _ StateDB = (*ErrDB)(nil)
|
||||
|
||||
@@ -24,9 +24,9 @@ type StateDB interface {
|
||||
// If a single error is returned then both allocations and the map will be nil.
|
||||
GetAllAllocations() ([]*structs.Allocation, map[string]error, error)
|
||||
|
||||
// PulAllocation stores an allocation or returns an error if it could
|
||||
// PutAllocation stores an allocation or returns an error if it could
|
||||
// not be stored.
|
||||
PutAllocation(*structs.Allocation) error
|
||||
PutAllocation(*structs.Allocation, ...WriteOption) error
|
||||
|
||||
// Get/Put DeploymentStatus get and put the allocation's deployment
|
||||
// status. It may be nil.
|
||||
@@ -36,7 +36,7 @@ type StateDB interface {
|
||||
// Get/Put NetworkStatus get and put the allocation's network
|
||||
// status. It may be nil.
|
||||
GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error)
|
||||
PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error
|
||||
PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error
|
||||
|
||||
// GetTaskRunnerState returns the LocalState and TaskState for a
|
||||
// TaskRunner. Either state may be nil if it is not found, but if an
|
||||
@@ -57,7 +57,7 @@ type StateDB interface {
|
||||
|
||||
// DeleteAllocationBucket deletes an allocation's state bucket if it
|
||||
// exists. No error is returned if it does not exist.
|
||||
DeleteAllocationBucket(allocID string) error
|
||||
DeleteAllocationBucket(allocID string, opts ...WriteOption) error
|
||||
|
||||
// GetDevicePluginState is used to retrieve the device manager's plugin
|
||||
// state.
|
||||
@@ -78,10 +78,43 @@ type StateDB interface {
|
||||
// GetDynamicPluginRegistryState is used to retrieve a dynamic plugin manager's state.
|
||||
GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error)
|
||||
|
||||
// PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state.
|
||||
// PutDynamicPluginRegistryState is used to store the dynamic plugin manager's state.
|
||||
PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error
|
||||
|
||||
// Close the database. Unsafe for further use after calling regardless
|
||||
// of return value.
|
||||
Close() error
|
||||
}
|
||||
|
||||
// WriteOptions adjusts the way the data is persisted by the StateDB above. Default is
|
||||
// zero/false values for all fields. To provide different values, use With* functions
|
||||
// below, like this: statedb.PutAllocation(alloc, WithBatchMode())
|
||||
type WriteOptions struct {
|
||||
// In Batch mode, concurrent writes (Put* and Delete* operations above) are
|
||||
// coalesced into a single transaction, increasing write performance. To benefit
|
||||
// from this mode, writes must happen concurrently in goroutines, as every write
|
||||
// request still waits for the shared transaction to commit before returning.
|
||||
// See https://github.com/boltdb/bolt#batch-read-write-transactions for details.
|
||||
// This mode is only supported for BoltDB state backend and is ignored in other backends.
|
||||
BatchMode bool
|
||||
}
|
||||
|
||||
// WriteOption is a function that modifies WriteOptions struct above.
|
||||
type WriteOption func(*WriteOptions)
|
||||
|
||||
// mergeWriteOptions creates a final WriteOptions struct to be used by the write methods above
|
||||
// from a list of WriteOption-s provided as variadic arguments.
|
||||
func mergeWriteOptions(opts []WriteOption) WriteOptions {
|
||||
writeOptions := WriteOptions{} // Default WriteOptions is zero value.
|
||||
for _, opt := range opts {
|
||||
opt(&writeOptions)
|
||||
}
|
||||
return writeOptions
|
||||
}
|
||||
|
||||
// Enable Batch mode for write requests (Put* and Delete* operations above).
|
||||
func WithBatchMode() WriteOption {
|
||||
return func(s *WriteOptions) {
|
||||
s.BatchMode = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er
|
||||
return allocs, map[string]error{}, nil
|
||||
}
|
||||
|
||||
func (m *MemDB) PutAllocation(alloc *structs.Allocation) error {
|
||||
func (m *MemDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.allocs[alloc.ID] = alloc
|
||||
@@ -99,7 +99,7 @@ func (m *MemDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e
|
||||
return m.networkStatus[allocID], nil
|
||||
}
|
||||
|
||||
func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error {
|
||||
func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error {
|
||||
m.mu.Lock()
|
||||
m.networkStatus[allocID] = ns
|
||||
defer m.mu.Unlock()
|
||||
@@ -175,7 +175,7 @@ func (m *MemDB) DeleteTaskBucket(allocID, taskName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MemDB) DeleteAllocationBucket(allocID string) error {
|
||||
func (m *MemDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (n NoopDB) PutAllocation(*structs.Allocation) error {
|
||||
func (n NoopDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ func (n NoopDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error {
|
||||
func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus, opts ...WriteOption) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ func (n NoopDB) DeleteTaskBucket(allocID, taskName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n NoopDB) DeleteAllocationBucket(allocID string) error {
|
||||
func (n NoopDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -234,8 +234,8 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m
|
||||
}
|
||||
|
||||
// PutAllocation stores an allocation or returns an error.
|
||||
func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error {
|
||||
return s.updateWithOptions(opts, func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root allocations bucket
|
||||
allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucketName)
|
||||
if err != nil {
|
||||
@@ -321,8 +321,8 @@ type networkStatusEntry struct {
|
||||
|
||||
// PutDeploymentStatus stores an allocation's DeploymentStatus or returns an
|
||||
// error.
|
||||
func (s *BoltStateDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
func (s *BoltStateDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus, opts ...WriteOption) error {
|
||||
return s.updateWithOptions(opts, func(tx *boltdd.Tx) error {
|
||||
return putNetworkStatusImpl(tx, allocID, ds)
|
||||
})
|
||||
}
|
||||
@@ -493,8 +493,8 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
|
||||
}
|
||||
|
||||
// DeleteAllocationBucket is used to delete an allocation bucket if it exists.
|
||||
func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
func (s *BoltStateDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error {
|
||||
return s.updateWithOptions(opts, func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root allocations bucket
|
||||
allocations := tx.Bucket(allocationsBucketName)
|
||||
if allocations == nil {
|
||||
@@ -725,6 +725,19 @@ func (s *BoltStateDB) init() error {
|
||||
})
|
||||
}
|
||||
|
||||
// updateWithOptions enables adjustments to db.Update operation, including Batch mode.
|
||||
func (s *BoltStateDB) updateWithOptions(opts []WriteOption, updateFn func(tx *boltdd.Tx) error) error {
|
||||
writeOpts := mergeWriteOptions(opts)
|
||||
|
||||
if writeOpts.BatchMode {
|
||||
// In Batch mode, BoltDB opportunistically combines multiple concurrent writes into one or
|
||||
// several transactions. See boltdb.Batch() documentation for details.
|
||||
return s.db.Batch(updateFn)
|
||||
} else {
|
||||
return s.db.Update(updateFn)
|
||||
}
|
||||
}
|
||||
|
||||
// Upgrade bolt state db from 0.8 schema to 0.9 schema. Noop if already using
|
||||
// 0.9 schema. Creates a backup before upgrading.
|
||||
func (s *BoltStateDB) Upgrade() error {
|
||||
|
||||
@@ -141,6 +141,13 @@ func (db *DB) Update(fn func(*Tx) error) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) Batch(fn func(*Tx) error) error {
|
||||
return db.bdb.Batch(func(btx *bolt.Tx) error {
|
||||
tx := newTx(db, btx)
|
||||
return fn(tx)
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) View(fn func(*Tx) error) error {
|
||||
return db.bdb.View(func(btx *bolt.Tx) error {
|
||||
tx := newTx(db, btx)
|
||||
|
||||
@@ -1493,3 +1493,15 @@ func Events(index uint64) *structs.Events {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func AllocNetworkStatus() *structs.AllocNetworkStatus {
|
||||
return &structs.AllocNetworkStatus{
|
||||
InterfaceName: "eth0",
|
||||
Address: "192.168.0.100",
|
||||
DNS: &structs.DNSConfig{
|
||||
Servers: []string{"1.1.1.1"},
|
||||
Searches: []string{"localdomain"},
|
||||
Options: []string{"ndots:5"},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user