diff --git a/nomad/memdb/index.go b/nomad/memdb/index.go new file mode 100644 index 000000000..774cdb895 --- /dev/null +++ b/nomad/memdb/index.go @@ -0,0 +1,32 @@ +package memdb + +import ( + "fmt" + "reflect" + "strings" +) + +// StringFieldIndex is used to extract a field from an object +// using reflection and builds an index on that field. +func StringFieldIndex(field string, lowercase bool) IndexerFunc { + return func(obj interface{}) (bool, []byte, error) { + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) // Derefence the pointer if any + + fv := v.FieldByName(field) + if !fv.IsValid() { + return false, nil, + fmt.Errorf("field '%s' for %#v is invalid", field, obj) + } + + val := fv.String() + if val == "" { + return false, nil, nil + } + + if lowercase { + val = strings.ToLower(val) + } + return true, []byte(val), nil + } +} diff --git a/nomad/memdb/index_test.go b/nomad/memdb/index_test.go new file mode 100644 index 000000000..f2daac63f --- /dev/null +++ b/nomad/memdb/index_test.go @@ -0,0 +1,62 @@ +package memdb + +import "testing" + +type TestObject struct { + Foo string + Bar int + Baz string + Empty string +} + +func testObj() *TestObject { + obj := &TestObject{ + Foo: "Testing", + Bar: 42, + Baz: "yep", + } + return obj +} + +func TestStringFieldIndex(t *testing.T) { + obj := testObj() + indexer := StringFieldIndex("Foo", false) + + ok, val, err := indexer(obj) + if err != nil { + t.Fatalf("err: %v", err) + } + if string(val) != "Testing" { + t.Fatalf("bad: %s", val) + } + if !ok { + t.Fatalf("should be ok") + } + + lower := StringFieldIndex("Foo", true) + ok, val, err = lower(obj) + if err != nil { + t.Fatalf("err: %v", err) + } + if string(val) != "testing" { + t.Fatalf("bad: %s", val) + } + if !ok { + t.Fatalf("should be ok") + } + + badField := StringFieldIndex("NA", true) + ok, val, err = badField(obj) + if err == nil { + t.Fatalf("should get error") + } + + emptyField := StringFieldIndex("Empty", true) + ok, val, err = emptyField(obj) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("should not ok") + } +} diff --git a/nomad/memdb/memdb.go b/nomad/memdb/memdb.go new file mode 100644 index 000000000..89465983a --- /dev/null +++ b/nomad/memdb/memdb.go @@ -0,0 +1,48 @@ +package memdb + +import ( + "sync" + + "github.com/hashicorp/go-immutable-radix" +) + +// MemDB is an in-memory database. It provides a table abstraction, +// which is used to store objects (rows) with multiple indexes based +// on values. The database makes use of immutable radix trees to provide +// transactions and MVCC. +type MemDB struct { + schema *DBSchema + root *iradix.Tree + + // There can only be a single writter at once + writer sync.Mutex +} + +// NewMemDB creates a new MemDB with the given schema +func NewMemDB(schema *DBSchema) (*MemDB, error) { + // Validate the schema + if err := schema.Validate(); err != nil { + return nil, err + } + + // Create the MemDB + db := &MemDB{ + schema: schema, + root: iradix.New(), + } + return db, nil +} + +// Txn is used to start a new transaction, in either read or write mode. +// There can only be a single concurrent writer, but any number of readers. +func (db *MemDB) Txn(write bool) *Txn { + txn := &Txn{ + db: db, + write: write, + root: db.root, + } + if write { + txn.rootTxn = txn.root.Txn() + } + return txn +} diff --git a/nomad/memdb/schema.go b/nomad/memdb/schema.go new file mode 100644 index 000000000..b732129ed --- /dev/null +++ b/nomad/memdb/schema.go @@ -0,0 +1,68 @@ +package memdb + +import "fmt" + +// DBSchema contains the full database schema used for MemDB +type DBSchema struct { + Tables []*TableSchema +} + +// Validate is used to validate the database schema +func (s *DBSchema) Validate() error { + if s == nil { + return fmt.Errorf("missing schema") + } + if len(s.Tables) == 0 { + return fmt.Errorf("no tables defined") + } + for _, table := range s.Tables { + if err := table.Validate(); err != nil { + return err + } + } + return nil +} + +// TableSchema contains the schema for a single table +type TableSchema struct { + Name string + Indexes []*IndexSchema +} + +// Validate is used to validate the table schema +func (s *TableSchema) Validate() error { + if s.Name == "" { + return fmt.Errorf("missing table name") + } + if len(s.Indexes) == 0 { + return fmt.Errorf("missing table schemas for '%s'", s.Name) + } + for _, index := range s.Indexes { + if err := index.Validate(); err != nil { + return err + } + } + return nil +} + +// IndexerFunc is used to extract an index value from an +// object or to indicate that the index value is missing. +type IndexerFunc func(interface{}) (bool, []byte, error) + +// IndexSchema contains the schema for an index +type IndexSchema struct { + Name string + AllowMissing bool + Unique bool + Indexer IndexerFunc +} + +func (s *IndexSchema) Validate() error { + if s.Name == "" { + return fmt.Errorf("missing index name") + } + if s.Indexer == nil { + return fmt.Errorf("missing index function for '%s'", s.Name) + } + return nil +} diff --git a/nomad/memdb/txn.go b/nomad/memdb/txn.go new file mode 100644 index 000000000..42091f705 --- /dev/null +++ b/nomad/memdb/txn.go @@ -0,0 +1,79 @@ +package memdb + +import ( + "fmt" + + "github.com/hashicorp/go-immutable-radix" +) + +// Txn is a transaction against a MemDB. This can be a read or write transaction. +type Txn struct { + db *MemDB + write bool + root *iradix.Tree + rootTxn *iradix.Txn +} + +// Abort is used to cancel this transaction. This is a noop for read transactions. +func (txn *Txn) Abort() { + // Noop for a read transaction + if !txn.write { + return + } + + // Check if already aborted or committed + if txn.root == nil { + return + } + + // Release the writer lock since this is invalid + txn.db.writer.Unlock() + txn.root = nil + txn.rootTxn = nil +} + +// Commit is used to finalize this transaction. This is a noop for read transactions. +func (txn *Txn) Commit() { + // Noop for a read transaction + if !txn.write { + return + } + + // Check if already aborted or committed + if txn.root == nil { + return + } + + // Update the root of the DB + txn.db.root = txn.rootTxn.Commit() + + // Clear the txn + txn.root = nil + txn.rootTxn = nil + + // Release the writer lock since this is invalid + txn.db.writer.Unlock() +} + +// Insert is used to add or update an object into the given table +func (txn *Txn) Insert(table string, obj interface{}) error { + if !txn.write { + return fmt.Errorf("cannot insert in read-only transaction") + } + return nil +} + +func (txn *Txn) Delete(table, index string, args ...interface{}) error { + if !txn.write { + return fmt.Errorf("cannot delete in read-only transaction") + } + return nil +} + +type ResultIterator interface { + Next() interface{} +} + +func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) { + return nil, nil +}