mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
memdb: testing writer concurrency
This commit is contained in:
@@ -36,13 +36,13 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
|
||||
// Txn is used to start a new transaction, in either read or write mode.
|
||||
// There can only be a single concurrent writer, but any number of readers.
|
||||
func (db *MemDB) Txn(write bool) *Txn {
|
||||
txn := &Txn{
|
||||
db: db,
|
||||
write: write,
|
||||
root: db.root,
|
||||
}
|
||||
if write {
|
||||
txn.rootTxn = txn.root.Txn()
|
||||
db.writer.Lock()
|
||||
}
|
||||
txn := &Txn{
|
||||
db: db,
|
||||
write: write,
|
||||
rootTxn: db.root.Txn(),
|
||||
}
|
||||
return txn
|
||||
}
|
||||
|
||||
41
nomad/memdb/memdb_test.go
Normal file
41
nomad/memdb/memdb_test.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package memdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMemDB_SingleWriter_MultiReader(t *testing.T) {
|
||||
db, err := NewMemDB(testValidSchema())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
tx1 := db.Txn(true)
|
||||
tx2 := db.Txn(false) // Should not block!
|
||||
tx3 := db.Txn(false) // Should not block!
|
||||
tx4 := db.Txn(false) // Should not block!
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
db.Txn(true)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-doneCh:
|
||||
t.Fatalf("should not allow another writer")
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
|
||||
tx1.Abort()
|
||||
tx2.Abort()
|
||||
tx3.Abort()
|
||||
tx4.Abort()
|
||||
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
t.Fatalf("should allow another writer")
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
type Txn struct {
|
||||
db *MemDB
|
||||
write bool
|
||||
root *iradix.Tree
|
||||
rootTxn *iradix.Txn
|
||||
}
|
||||
|
||||
@@ -22,14 +21,15 @@ func (txn *Txn) Abort() {
|
||||
}
|
||||
|
||||
// Check if already aborted or committed
|
||||
if txn.root == nil {
|
||||
if txn.rootTxn == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Clear the txn
|
||||
txn.rootTxn = nil
|
||||
|
||||
// Release the writer lock since this is invalid
|
||||
txn.db.writer.Unlock()
|
||||
txn.root = nil
|
||||
txn.rootTxn = nil
|
||||
}
|
||||
|
||||
// Commit is used to finalize this transaction. This is a noop for read transactions.
|
||||
@@ -40,7 +40,7 @@ func (txn *Txn) Commit() {
|
||||
}
|
||||
|
||||
// Check if already aborted or committed
|
||||
if txn.root == nil {
|
||||
if txn.rootTxn == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -48,7 +48,6 @@ func (txn *Txn) Commit() {
|
||||
txn.db.root = txn.rootTxn.Commit()
|
||||
|
||||
// Clear the txn
|
||||
txn.root = nil
|
||||
txn.rootTxn = nil
|
||||
|
||||
// Release the writer lock since this is invalid
|
||||
|
||||
Reference in New Issue
Block a user