mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
memdb: initial skeleton
This commit is contained in:
32
nomad/memdb/index.go
Normal file
32
nomad/memdb/index.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package memdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// StringFieldIndex is used to extract a field from an object
|
||||
// using reflection and builds an index on that field.
|
||||
func StringFieldIndex(field string, lowercase bool) IndexerFunc {
|
||||
return func(obj interface{}) (bool, []byte, error) {
|
||||
v := reflect.ValueOf(obj)
|
||||
v = reflect.Indirect(v) // Derefence the pointer if any
|
||||
|
||||
fv := v.FieldByName(field)
|
||||
if !fv.IsValid() {
|
||||
return false, nil,
|
||||
fmt.Errorf("field '%s' for %#v is invalid", field, obj)
|
||||
}
|
||||
|
||||
val := fv.String()
|
||||
if val == "" {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
if lowercase {
|
||||
val = strings.ToLower(val)
|
||||
}
|
||||
return true, []byte(val), nil
|
||||
}
|
||||
}
|
||||
62
nomad/memdb/index_test.go
Normal file
62
nomad/memdb/index_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package memdb
|
||||
|
||||
import "testing"
|
||||
|
||||
type TestObject struct {
|
||||
Foo string
|
||||
Bar int
|
||||
Baz string
|
||||
Empty string
|
||||
}
|
||||
|
||||
func testObj() *TestObject {
|
||||
obj := &TestObject{
|
||||
Foo: "Testing",
|
||||
Bar: 42,
|
||||
Baz: "yep",
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
||||
func TestStringFieldIndex(t *testing.T) {
|
||||
obj := testObj()
|
||||
indexer := StringFieldIndex("Foo", false)
|
||||
|
||||
ok, val, err := indexer(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if string(val) != "Testing" {
|
||||
t.Fatalf("bad: %s", val)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("should be ok")
|
||||
}
|
||||
|
||||
lower := StringFieldIndex("Foo", true)
|
||||
ok, val, err = lower(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if string(val) != "testing" {
|
||||
t.Fatalf("bad: %s", val)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("should be ok")
|
||||
}
|
||||
|
||||
badField := StringFieldIndex("NA", true)
|
||||
ok, val, err = badField(obj)
|
||||
if err == nil {
|
||||
t.Fatalf("should get error")
|
||||
}
|
||||
|
||||
emptyField := StringFieldIndex("Empty", true)
|
||||
ok, val, err = emptyField(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("should not ok")
|
||||
}
|
||||
}
|
||||
48
nomad/memdb/memdb.go
Normal file
48
nomad/memdb/memdb.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package memdb
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-immutable-radix"
|
||||
)
|
||||
|
||||
// MemDB is an in-memory database. It provides a table abstraction,
|
||||
// which is used to store objects (rows) with multiple indexes based
|
||||
// on values. The database makes use of immutable radix trees to provide
|
||||
// transactions and MVCC.
|
||||
type MemDB struct {
|
||||
schema *DBSchema
|
||||
root *iradix.Tree
|
||||
|
||||
// There can only be a single writter at once
|
||||
writer sync.Mutex
|
||||
}
|
||||
|
||||
// NewMemDB creates a new MemDB with the given schema
|
||||
func NewMemDB(schema *DBSchema) (*MemDB, error) {
|
||||
// Validate the schema
|
||||
if err := schema.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create the MemDB
|
||||
db := &MemDB{
|
||||
schema: schema,
|
||||
root: iradix.New(),
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// Txn is used to start a new transaction, in either read or write mode.
|
||||
// There can only be a single concurrent writer, but any number of readers.
|
||||
func (db *MemDB) Txn(write bool) *Txn {
|
||||
txn := &Txn{
|
||||
db: db,
|
||||
write: write,
|
||||
root: db.root,
|
||||
}
|
||||
if write {
|
||||
txn.rootTxn = txn.root.Txn()
|
||||
}
|
||||
return txn
|
||||
}
|
||||
68
nomad/memdb/schema.go
Normal file
68
nomad/memdb/schema.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package memdb
|
||||
|
||||
import "fmt"
|
||||
|
||||
// DBSchema contains the full database schema used for MemDB
|
||||
type DBSchema struct {
|
||||
Tables []*TableSchema
|
||||
}
|
||||
|
||||
// Validate is used to validate the database schema
|
||||
func (s *DBSchema) Validate() error {
|
||||
if s == nil {
|
||||
return fmt.Errorf("missing schema")
|
||||
}
|
||||
if len(s.Tables) == 0 {
|
||||
return fmt.Errorf("no tables defined")
|
||||
}
|
||||
for _, table := range s.Tables {
|
||||
if err := table.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TableSchema contains the schema for a single table
|
||||
type TableSchema struct {
|
||||
Name string
|
||||
Indexes []*IndexSchema
|
||||
}
|
||||
|
||||
// Validate is used to validate the table schema
|
||||
func (s *TableSchema) Validate() error {
|
||||
if s.Name == "" {
|
||||
return fmt.Errorf("missing table name")
|
||||
}
|
||||
if len(s.Indexes) == 0 {
|
||||
return fmt.Errorf("missing table schemas for '%s'", s.Name)
|
||||
}
|
||||
for _, index := range s.Indexes {
|
||||
if err := index.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IndexerFunc is used to extract an index value from an
|
||||
// object or to indicate that the index value is missing.
|
||||
type IndexerFunc func(interface{}) (bool, []byte, error)
|
||||
|
||||
// IndexSchema contains the schema for an index
|
||||
type IndexSchema struct {
|
||||
Name string
|
||||
AllowMissing bool
|
||||
Unique bool
|
||||
Indexer IndexerFunc
|
||||
}
|
||||
|
||||
func (s *IndexSchema) Validate() error {
|
||||
if s.Name == "" {
|
||||
return fmt.Errorf("missing index name")
|
||||
}
|
||||
if s.Indexer == nil {
|
||||
return fmt.Errorf("missing index function for '%s'", s.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
79
nomad/memdb/txn.go
Normal file
79
nomad/memdb/txn.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package memdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/go-immutable-radix"
|
||||
)
|
||||
|
||||
// Txn is a transaction against a MemDB. This can be a read or write transaction.
|
||||
type Txn struct {
|
||||
db *MemDB
|
||||
write bool
|
||||
root *iradix.Tree
|
||||
rootTxn *iradix.Txn
|
||||
}
|
||||
|
||||
// Abort is used to cancel this transaction. This is a noop for read transactions.
|
||||
func (txn *Txn) Abort() {
|
||||
// Noop for a read transaction
|
||||
if !txn.write {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if already aborted or committed
|
||||
if txn.root == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Release the writer lock since this is invalid
|
||||
txn.db.writer.Unlock()
|
||||
txn.root = nil
|
||||
txn.rootTxn = nil
|
||||
}
|
||||
|
||||
// Commit is used to finalize this transaction. This is a noop for read transactions.
|
||||
func (txn *Txn) Commit() {
|
||||
// Noop for a read transaction
|
||||
if !txn.write {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if already aborted or committed
|
||||
if txn.root == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Update the root of the DB
|
||||
txn.db.root = txn.rootTxn.Commit()
|
||||
|
||||
// Clear the txn
|
||||
txn.root = nil
|
||||
txn.rootTxn = nil
|
||||
|
||||
// Release the writer lock since this is invalid
|
||||
txn.db.writer.Unlock()
|
||||
}
|
||||
|
||||
// Insert is used to add or update an object into the given table
|
||||
func (txn *Txn) Insert(table string, obj interface{}) error {
|
||||
if !txn.write {
|
||||
return fmt.Errorf("cannot insert in read-only transaction")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *Txn) Delete(table, index string, args ...interface{}) error {
|
||||
if !txn.write {
|
||||
return fmt.Errorf("cannot delete in read-only transaction")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type ResultIterator interface {
|
||||
Next() interface{}
|
||||
}
|
||||
|
||||
func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
|
||||
return nil, nil
|
||||
}
|
||||
Reference in New Issue
Block a user