Oss license support for ent builds (#8054)

* changes necessary to support oss licesning shims

revert nomad fmt changes

update test to work with enterprise changes

update tests to work with new ent enforcements

make check

update cas test to use scheduler algorithm

back out preemption changes

add comments

* remove unused method
This commit is contained in:
Drew Bailey
2020-05-27 13:46:52 -04:00
committed by GitHub
parent 5453e84ca3
commit 7fc495e30e
27 changed files with 953 additions and 85 deletions

View File

@@ -289,6 +289,9 @@ type Client struct {
// dynamicRegistry provides access to plugins that are dynamically registered
// with a nomad client. Currently only used for CSI.
dynamicRegistry dynamicplugins.Registry
// EnterpriseClient is used to set and check enterprise features for clients
EnterpriseClient *EnterpriseClient
}
var (
@@ -341,6 +344,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
invalidAllocs: make(map[string]struct{}),
serversContactedCh: make(chan struct{}),
serversContactedOnce: sync.Once{},
EnterpriseClient: newEnterpriseClient(),
}
c.batchNodeUpdates = newBatchNodeUpdates(
@@ -1839,6 +1843,7 @@ func (c *Client) updateNodeStatus() error {
c.triggerDiscovery()
}
c.EnterpriseClient.SetFeatures(resp.Features)
return nil
}

View File

@@ -0,0 +1,15 @@
// +build !ent
package client
// EnterpriseClient holds information and methods for enterprise functionality
type EnterpriseClient struct{}
func newEnterpriseClient() *EnterpriseClient {
return &EnterpriseClient{}
}
// SetFeatures is used for enterprise builds to configure enterprise features
func (ec *EnterpriseClient) SetFeatures(features uint64) {
return
}

View File

@@ -59,6 +59,9 @@ type Agent struct {
httpLogger log.Logger
logOutput io.Writer
// EnterpriseAgent holds information and methods for enterprise functionality
EnterpriseAgent *EnterpriseAgent
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService *consul.ServiceClient
@@ -121,6 +124,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i
if err := a.setupClient(); err != nil {
return nil, err
}
if err := a.setupEnterpriseAgent(logger); err != nil {
return nil, err
}

View File

@@ -7,6 +7,10 @@ import (
"github.com/hashicorp/nomad/nomad/structs/config"
)
// EnterpriseAgent holds information and methods for enterprise functionality
// in OSS it is an empty struct.
type EnterpriseAgent struct{}
func (a *Agent) setupEnterpriseAgent(log hclog.Logger) error {
// configure eventer
a.auditor = &noOpAuditor{}

View File

@@ -348,6 +348,9 @@ type Config struct {
// RPCMaxConnsPerClient is the maximum number of concurrent RPC
// connections from a single IP address. nil/0 means no limit.
RPCMaxConnsPerClient int
// LicenseConfig is a tunable knob for enterprise license testing.
LicenseConfig *LicenseConfig
}
// CheckVersion is used to check if the ProtocolVersion is valid
@@ -413,6 +416,7 @@ func DefaultConfig() *Config {
TLSConfig: &config.TLSConfig{},
ReplicationBackoff: 30 * time.Second,
SentinelGCInterval: 30 * time.Second,
LicenseConfig: &LicenseConfig{},
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,

View File

@@ -244,6 +244,8 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
}
}
reply.Features = n.srv.EnterpriseState.Features()
return nil
}

View File

@@ -6,8 +6,16 @@ import (
"github.com/hashicorp/consul/agent/consul/autopilot"
)
// LicenseConfig allows for tunable licensing config
// primarily used for enterprise testing
type LicenseConfig struct{}
type EnterpriseState struct{}
func (es *EnterpriseState) Features() uint64 {
return 0
}
func (s *Server) setupEnterprise(config *Config) error {
// Set up the OSS version of autopilot
apDelegate := &AutopilotDelegate{s}

View File

@@ -1167,6 +1167,9 @@ type NodeUpdateResponse struct {
EvalCreateIndex uint64
NodeModifyIndex uint64
// Features informs clients what enterprise features are allowed
Features uint64
// LeaderRPCAddr is the RPC address of the current Raft Leader. If
// empty, the current Nomad Server is in the minority of a partition.
LeaderRPCAddr string

View File

@@ -0,0 +1,9 @@
# UNRELEASED
FEATURES
* Add `SeekLowerBound` to allow for range scans. [[GH-24](https://github.com/hashicorp/go-immutable-radix/pull/24)]
# 1.0.0 (August 30th, 2018)
* go mod adopted

View File

@@ -1,4 +1,4 @@
go-immutable-radix [![Build Status](https://travis-ci.org/hashicorp/go-immutable-radix.png)](https://travis-ci.org/hashicorp/go-immutable-radix)
go-immutable-radix [![CircleCI](https://circleci.com/gh/hashicorp/go-immutable-radix/tree/master.svg?style=svg)](https://circleci.com/gh/hashicorp/go-immutable-radix/tree/master)
=========
Provides the `iradix` package that implements an immutable [radix tree](http://en.wikipedia.org/wiki/Radix_tree).
@@ -39,3 +39,28 @@ if string(m) != "foo" {
}
```
Here is an example of performing a range scan of the keys.
```go
// Create a tree
r := iradix.New()
r, _, _ = r.Insert([]byte("001"), 1)
r, _, _ = r.Insert([]byte("002"), 2)
r, _, _ = r.Insert([]byte("005"), 5)
r, _, _ = r.Insert([]byte("010"), 10)
r, _, _ = r.Insert([]byte("100"), 10)
// Range scan over the keys that sort lexicographically between [003, 050)
it := r.Root().Iterator()
it.SeekLowerBound([]byte("003"))
for key, _, ok := it.Next(); ok; key, _, ok = it.Next() {
if key >= "050" {
break
}
fmt.Println(key)
}
// Output:
// 005
// 010
```

View File

@@ -0,0 +1,6 @@
module github.com/hashicorp/go-immutable-radix
require (
github.com/hashicorp/go-uuid v1.0.0
github.com/hashicorp/golang-lru v0.5.0
)

View File

@@ -0,0 +1,4 @@
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=

View File

@@ -86,6 +86,20 @@ func (t *Tree) Txn() *Txn {
return txn
}
// Clone makes an independent copy of the transaction. The new transaction
// does not track any nodes and has TrackMutate turned off. The cloned transaction will contain any uncommitted writes in the original transaction but further mutations to either will be independent and result in different radix trees on Commit. A cloned transaction may be passed to another goroutine and mutated there independently however each transaction may only be mutated in a single thread.
func (t *Txn) Clone() *Txn {
// reset the writable node cache to avoid leaking future writes into the clone
t.writable = nil
txn := &Txn{
root: t.root,
snap: t.snap,
size: t.size,
}
return txn
}
// TrackMutate can be used to toggle if mutations are tracked. If this is enabled
// then notifications will be issued for affected internal nodes and leaves when
// the transaction is committed.
@@ -338,6 +352,11 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
if !n.isLeaf() {
return nil, nil
}
// Copy the pointer in case we are in a transaction that already
// modified this node since the node will be reused. Any changes
// made to the node will not affect returning the original leaf
// value.
oldLeaf := n.leaf
// Remove the leaf node
nc := t.writeNode(n, true)
@@ -347,7 +366,7 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
if n != t.root && len(nc.edges) == 1 {
t.mergeChild(nc)
}
return nc, n.leaf
return nc, oldLeaf
}
// Look for an edge

View File

@@ -1,6 +1,8 @@
package iradix
import "bytes"
import (
"bytes"
)
// Iterator is used to iterate over a set of nodes
// in pre-order
@@ -53,12 +55,107 @@ func (i *Iterator) SeekPrefix(prefix []byte) {
i.SeekPrefixWatch(prefix)
}
func (i *Iterator) recurseMin(n *Node) *Node {
// Traverse to the minimum child
if n.leaf != nil {
return n
}
if len(n.edges) > 0 {
// Add all the other edges to the stack (the min node will be added as
// we recurse)
i.stack = append(i.stack, n.edges[1:])
return i.recurseMin(n.edges[0].node)
}
// Shouldn't be possible
return nil
}
// SeekLowerBound is used to seek the iterator to the smallest key that is
// greater or equal to the given key. There is no watch variant as it's hard to
// predict based on the radix structure which node(s) changes might affect the
// result.
func (i *Iterator) SeekLowerBound(key []byte) {
// Wipe the stack. Unlike Prefix iteration, we need to build the stack as we
// go because we need only a subset of edges of many nodes in the path to the
// leaf with the lower bound.
i.stack = []edges{}
n := i.node
search := key
found := func(n *Node) {
i.node = n
i.stack = append(i.stack, edges{edge{node: n}})
}
for {
// Compare current prefix with the search key's same-length prefix.
var prefixCmp int
if len(n.prefix) < len(search) {
prefixCmp = bytes.Compare(n.prefix, search[0:len(n.prefix)])
} else {
prefixCmp = bytes.Compare(n.prefix, search)
}
if prefixCmp > 0 {
// Prefix is larger, that means the lower bound is greater than the search
// and from now on we need to follow the minimum path to the smallest
// leaf under this subtree.
n = i.recurseMin(n)
if n != nil {
found(n)
}
return
}
if prefixCmp < 0 {
// Prefix is smaller than search prefix, that means there is no lower
// bound
i.node = nil
return
}
// Prefix is equal, we are still heading for an exact match. If this is a
// leaf we're done.
if n.leaf != nil {
if bytes.Compare(n.leaf.key, key) < 0 {
i.node = nil
return
}
found(n)
return
}
// Consume the search prefix
if len(n.prefix) > len(search) {
search = []byte{}
} else {
search = search[len(n.prefix):]
}
// Otherwise, take the lower bound next edge.
idx, lbNode := n.getLowerBoundEdge(search[0])
if lbNode == nil {
i.node = nil
return
}
// Create stack edges for the all strictly higher edges in this node.
if idx+1 < len(n.edges) {
i.stack = append(i.stack, n.edges[idx+1:])
}
i.node = lbNode
// Recurse
n = lbNode
}
}
// Next returns the next node in order
func (i *Iterator) Next() ([]byte, interface{}, bool) {
// Initialize our stack if needed
if i.stack == nil && i.node != nil {
i.stack = []edges{
edges{
{
edge{node: i.node},
},
}

View File

@@ -79,6 +79,18 @@ func (n *Node) getEdge(label byte) (int, *Node) {
return -1, nil
}
func (n *Node) getLowerBoundEdge(label byte) (int, *Node) {
num := len(n.edges)
idx := sort.Search(num, func(i int) bool {
return n.edges[i].label >= label
})
// we want lower bound behavior so return even if it's not an exact match
if idx < num {
return idx, n.edges[idx].node
}
return -1, nil
}
func (n *Node) delEdge(label byte) {
num := len(n.edges)
idx := sort.Search(num, func(i int) bool {

View File

@@ -41,7 +41,7 @@ func (i *rawIterator) Next() {
// Initialize our stack if needed.
if i.stack == nil && i.node != nil {
i.stack = []rawStackEntry{
rawStackEntry{
{
edges: edges{
edge{node: i.node},
},

View File

@@ -1,9 +1,9 @@
# go-memdb
# go-memdb [![CircleCI](https://circleci.com/gh/hashicorp/go-memdb/tree/master.svg?style=svg)](https://circleci.com/gh/hashicorp/go-memdb/tree/master)
Provides the `memdb` package that implements a simple in-memory database
built on immutable radix trees. The database provides Atomicity, Consistency
and Isolation from ACID. Being that it is in-memory, it does not provide durability.
The database is instantiated with a schema that specifies the tables and indicies
The database is instantiated with a schema that specifies the tables and indices
that exist and allows transactions to be executed.
The database provides the following:
@@ -21,7 +21,7 @@ The database provides the following:
a single field index, or more advanced compound field indexes. Certain types like
UUID can be efficiently compressed from strings into byte indexes for reduced
storage requirements.
* Watches - Callers can populate a watch set as part of a query, which can be used to
detect when a modification has been made to the database which affects the query
results. This lets callers easily watch for changes in the database in a very general
@@ -32,50 +32,62 @@ For the underlying immutable radix trees, see [go-immutable-radix](https://githu
Documentation
=============
The full documentation is available on [Godoc](http://godoc.org/github.com/hashicorp/go-memdb).
The full documentation is available on [Godoc](https://pkg.go.dev/github.com/hashicorp/go-memdb).
Example
=======
Below is a simple example of usage
Below is a [simple example](https://play.golang.org/p/gCGE9FA4og1) of usage
```go
// Create a sample struct
type Person struct {
Email string
Name string
Age int
Email string
Name string
Age int
}
// Create the DB schema
schema := &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
"person": &memdb.TableSchema{
Name: "person",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "Email"},
},
},
},
},
Tables: map[string]*memdb.TableSchema{
"person": &memdb.TableSchema{
Name: "person",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "Email"},
},
"age": &memdb.IndexSchema{
Name: "age",
Unique: false,
Indexer: &memdb.IntFieldIndex{Field: "Age"},
},
},
},
},
}
// Create a new data base
db, err := memdb.NewMemDB(schema)
if err != nil {
panic(err)
panic(err)
}
// Create a write transaction
txn := db.Txn(true)
// Insert a new person
p := &Person{"joe@aol.com", "Joe", 30}
if err := txn.Insert("person", p); err != nil {
panic(err)
// Insert some people
people := []*Person{
&Person{"joe@aol.com", "Joe", 30},
&Person{"lucy@aol.com", "Lucy", 35},
&Person{"tariq@aol.com", "Tariq", 21},
&Person{"dorothy@aol.com", "Dorothy", 53},
}
for _, p := range people {
if err := txn.Insert("person", p); err != nil {
panic(err)
}
}
// Commit the transaction
@@ -88,11 +100,47 @@ defer txn.Abort()
// Lookup by email
raw, err := txn.First("person", "id", "joe@aol.com")
if err != nil {
panic(err)
panic(err)
}
// Say hi!
fmt.Printf("Hello %s!", raw.(*Person).Name)
fmt.Printf("Hello %s!\n", raw.(*Person).Name)
// List all the people
it, err := txn.Get("person", "id")
if err != nil {
panic(err)
}
fmt.Println("All the people:")
for obj := it.Next(); obj != nil; obj = it.Next() {
p := obj.(*Person)
fmt.Printf(" %s\n", p.Name)
}
// Range scan over people with ages between 25 and 35 inclusive
it, err = txn.LowerBound("person", "age", 25)
if err != nil {
panic(err)
}
fmt.Println("People aged 25 - 35:")
for obj := it.Next(); obj != nil; obj = it.Next() {
p := obj.(*Person)
if p.Age > 35 {
break
}
fmt.Printf(" %s is aged %d\n", p.Name, p.Age)
}
// Output:
// Hello Joe!
// All the people:
// Dorothy
// Joe
// Lucy
// Tariq
// People aged 25 - 35:
// Joe is aged 30
// Lucy is aged 35
```

34
vendor/github.com/hashicorp/go-memdb/changes.go generated vendored Normal file
View File

@@ -0,0 +1,34 @@
package memdb
// Changes describes a set of mutations to memDB tables performed during a
// transaction.
type Changes []Change
// Change describes a mutation to an object in a table.
type Change struct {
Table string
Before interface{}
After interface{}
// primaryKey stores the raw key value from the primary index so that we can
// de-duplicate multiple updates of the same object in the same transaction
// but we don't expose this implementation detail to the consumer.
primaryKey []byte
}
// Created returns true if the mutation describes a new object being inserted.
func (m *Change) Created() bool {
return m.Before == nil && m.After != nil
}
// Updated returns true if the mutation describes an existing object being
// updated.
func (m *Change) Updated() bool {
return m.Before != nil && m.After != nil
}
// Deleted returns true if the mutation describes an existing object being
// deleted.
func (m *Change) Deleted() bool {
return m.Before != nil && m.After == nil
}

8
vendor/github.com/hashicorp/go-memdb/go.mod generated vendored Normal file
View File

@@ -0,0 +1,8 @@
module github.com/hashicorp/go-memdb
go 1.12
require (
github.com/hashicorp/go-immutable-radix v1.2.0
github.com/hashicorp/golang-lru v0.5.4 // indirect
)

8
vendor/github.com/hashicorp/go-memdb/go.sum generated vendored Normal file
View File

@@ -0,0 +1,8 @@
github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8=
github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=

View File

@@ -3,40 +3,52 @@ package memdb
import (
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"reflect"
"strings"
)
// Indexer is an interface used for defining indexes
// Indexer is an interface used for defining indexes. Indexes are used
// for efficient lookup of objects in a MemDB table. An Indexer must also
// implement one of SingleIndexer or MultiIndexer.
//
// Indexers are primarily responsible for returning the lookup key as
// a byte slice. The byte slice is the key data in the underlying data storage.
type Indexer interface {
// ExactFromArgs is used to build an exact index lookup
// based on arguments
// FromArgs is called to build the exact index key from a list of arguments.
FromArgs(args ...interface{}) ([]byte, error)
}
// SingleIndexer is an interface used for defining indexes
// generating a single entry per object
// SingleIndexer is an interface used for defining indexes that generate a
// single value per object
type SingleIndexer interface {
// FromObject is used to extract an index value from an
// object or to indicate that the index value is missing.
// FromObject extracts the index value from an object. The return values
// are whether the index value was found, the index value, and any error
// while extracting the index value, respectively.
FromObject(raw interface{}) (bool, []byte, error)
}
// MultiIndexer is an interface used for defining indexes
// generating multiple entries per object
// MultiIndexer is an interface used for defining indexes that generate
// multiple values per object. Each value is stored as a seperate index
// pointing to the same object.
//
// For example, an index that extracts the first and last name of a person
// and allows lookup based on eitherd would be a MultiIndexer. The FromObject
// of this example would split the first and last name and return both as
// values.
type MultiIndexer interface {
// FromObject is used to extract index values from an
// object or to indicate that the index value is missing.
// FromObject extracts index values from an object. The return values
// are the same as a SingleIndexer except there can be multiple index
// values.
FromObject(raw interface{}) (bool, [][]byte, error)
}
// PrefixIndexer can optionally be implemented for any
// indexes that support prefix based iteration. This may
// not apply to all indexes.
// PrefixIndexer is an optional interface on top of an Indexer that allows
// indexes to support prefix-based iteration.
type PrefixIndexer interface {
// PrefixFromArgs returns a prefix that should be used
// for scanning based on the arguments
// PrefixFromArgs is the same as FromArgs for an Indexer except that
// the index value returned should return all prefix-matched values.
PrefixFromArgs(args ...interface{}) ([]byte, error)
}
@@ -52,9 +64,16 @@ func (s *StringFieldIndex) FromObject(obj interface{}) (bool, []byte, error) {
v = reflect.Indirect(v) // Dereference the pointer if any
fv := v.FieldByName(s.Field)
if !fv.IsValid() {
isPtr := fv.Kind() == reflect.Ptr
fv = reflect.Indirect(fv)
if !isPtr && !fv.IsValid() {
return false, nil,
fmt.Errorf("field '%s' for %#v is invalid", s.Field, obj)
fmt.Errorf("field '%s' for %#v is invalid %v ", s.Field, obj, isPtr)
}
if isPtr && !fv.IsValid() {
val := ""
return false, []byte(val), nil
}
val := fv.String()
@@ -101,8 +120,9 @@ func (s *StringFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
return val, nil
}
// StringSliceFieldIndex is used to extract a field from an object
// using reflection and builds an index on that field.
// StringSliceFieldIndex builds an index from a field on an object that is a
// string slice ([]string). Each value within the string slice can be used for
// lookup.
type StringSliceFieldIndex struct {
Field string
Lowercase bool
@@ -176,6 +196,16 @@ func (s *StringSliceFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, err
// StringMapFieldIndex is used to extract a field of type map[string]string
// from an object using reflection and builds an index on that field.
//
// Note that although FromArgs in theory supports using either one or
// two arguments, there is a bug: FromObject only creates an index
// using key/value, and does not also create an index using key. This
// means a lookup using one argument will never actually work.
//
// It is currently left as-is to prevent backwards compatibility
// issues.
//
// TODO: Fix this in the next major bump.
type StringMapFieldIndex struct {
Field string
Lowercase bool
@@ -221,6 +251,8 @@ func (s *StringMapFieldIndex) FromObject(obj interface{}) (bool, [][]byte, error
return true, vals, nil
}
// WARNING: Because of a bug in FromObject, this function will never return
// a value when using the single-argument version.
func (s *StringMapFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) > 2 || len(args) == 0 {
return nil, fmt.Errorf("must provide one or two arguments")
@@ -250,6 +282,79 @@ func (s *StringMapFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
return []byte(key), nil
}
// IntFieldIndex is used to extract an int field from an object using
// reflection and builds an index on that field.
type IntFieldIndex struct {
Field string
}
func (i *IntFieldIndex) FromObject(obj interface{}) (bool, []byte, error) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v) // Dereference the pointer if any
fv := v.FieldByName(i.Field)
if !fv.IsValid() {
return false, nil,
fmt.Errorf("field '%s' for %#v is invalid", i.Field, obj)
}
// Check the type
k := fv.Kind()
size, ok := IsIntType(k)
if !ok {
return false, nil, fmt.Errorf("field %q is of type %v; want an int", i.Field, k)
}
// Get the value and encode it
val := fv.Int()
buf := make([]byte, size)
binary.PutVarint(buf, val)
return true, buf, nil
}
func (i *IntFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
v := reflect.ValueOf(args[0])
if !v.IsValid() {
return nil, fmt.Errorf("%#v is invalid", args[0])
}
k := v.Kind()
size, ok := IsIntType(k)
if !ok {
return nil, fmt.Errorf("arg is of type %v; want a int", k)
}
val := v.Int()
buf := make([]byte, size)
binary.PutVarint(buf, val)
return buf, nil
}
// IsIntType returns whether the passed type is a type of int and the number
// of bytes needed to encode the type.
func IsIntType(k reflect.Kind) (size int, okay bool) {
switch k {
case reflect.Int:
return binary.MaxVarintLen64, true
case reflect.Int8:
return 2, true
case reflect.Int16:
return binary.MaxVarintLen16, true
case reflect.Int32:
return binary.MaxVarintLen32, true
case reflect.Int64:
return binary.MaxVarintLen64, true
default:
return 0, false
}
}
// UintFieldIndex is used to extract a uint field from an object using
// reflection and builds an index on that field.
type UintFieldIndex struct {
@@ -323,6 +428,41 @@ func IsUintType(k reflect.Kind) (size int, okay bool) {
}
}
// BoolFieldIndex is used to extract an boolean field from an object using
// reflection and builds an index on that field.
type BoolFieldIndex struct {
Field string
}
func (i *BoolFieldIndex) FromObject(obj interface{}) (bool, []byte, error) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v) // Dereference the pointer if any
fv := v.FieldByName(i.Field)
if !fv.IsValid() {
return false, nil,
fmt.Errorf("field '%s' for %#v is invalid", i.Field, obj)
}
// Check the type
k := fv.Kind()
if k != reflect.Bool {
return false, nil, fmt.Errorf("field %q is of type %v; want a bool", i.Field, k)
}
// Get the value and encode it
buf := make([]byte, 1)
if fv.Bool() {
buf[0] = 1
}
return true, buf, nil
}
func (i *BoolFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
return fromBoolArgs(args)
}
// UUIDFieldIndex is used to extract a field from an object
// using reflection and builds an index on that field by treating
// it as a UUID. This is an optimization to using a StringFieldIndex
@@ -528,7 +668,7 @@ func (c *CompoundIndex) FromObject(raw interface{}) (bool, []byte, error) {
func (c *CompoundIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != len(c.Indexes) {
return nil, fmt.Errorf("less arguments than index fields")
return nil, fmt.Errorf("non-equivalent argument count and index fields")
}
var out []byte
for i, arg := range args {
@@ -567,3 +707,177 @@ func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
}
return out, nil
}
// CompoundMultiIndex is used to build an index using multiple
// sub-indexes.
//
// Unlike CompoundIndex, CompoundMultiIndex can have both
// SingleIndexer and MultiIndexer sub-indexers. However, each
// MultiIndexer adds considerable overhead/complexity in terms of
// the number of indexes created under-the-hood. It is not suggested
// to use more than one or two, if possible.
//
// Another change from CompoundIndexer is that if AllowMissing is
// set, not only is it valid to have empty index fields, but it will
// still create index values up to the first empty index. This means
// that if you have a value with an empty field, rather than using a
// prefix for lookup, you can simply pass in less arguments. As an
// example, if {Foo, Bar} is indexed but Bar is missing for a value
// and AllowMissing is set, an index will still be created for {Foo}
// and it is valid to do a lookup passing in only Foo as an argument.
// Note that the ordering isn't guaranteed -- it's last-insert wins,
// but this is true if you have two objects that have the same
// indexes not using AllowMissing anyways.
//
// Because StringMapFieldIndexers can take a varying number of args,
// it is currently a requirement that whenever it is used, two
// arguments must _always_ be provided for it. In theory we only
// need one, except a bug in that indexer means the single-argument
// version will never work. You can leave the second argument nil,
// but it will never produce a value. We support this for whenever
// that bug is fixed, likely in a next major version bump.
//
// Prefix-based indexing is not currently supported.
type CompoundMultiIndex struct {
Indexes []Indexer
// AllowMissing results in an index based on only the indexers
// that return data. If true, you may end up with 2/3 columns
// indexed which might be useful for an index scan. Otherwise,
// CompoundMultiIndex requires all indexers to be satisfied.
AllowMissing bool
}
func (c *CompoundMultiIndex) FromObject(raw interface{}) (bool, [][]byte, error) {
// At each entry, builder is storing the results from the next index
builder := make([][][]byte, 0, len(c.Indexes))
// Start with something higher to avoid resizing if possible
out := make([][]byte, 0, len(c.Indexes)^3)
forloop:
// This loop goes through each indexer and adds the value(s) provided to the next
// entry in the slice. We can then later walk it like a tree to construct the indices.
for i, idxRaw := range c.Indexes {
switch idx := idxRaw.(type) {
case SingleIndexer:
ok, val, err := idx.FromObject(raw)
if err != nil {
return false, nil, fmt.Errorf("single sub-index %d error: %v", i, err)
}
if !ok {
if c.AllowMissing {
break forloop
} else {
return false, nil, nil
}
}
builder = append(builder, [][]byte{val})
case MultiIndexer:
ok, vals, err := idx.FromObject(raw)
if err != nil {
return false, nil, fmt.Errorf("multi sub-index %d error: %v", i, err)
}
if !ok {
if c.AllowMissing {
break forloop
} else {
return false, nil, nil
}
}
// Add each of the new values to each of the old values
builder = append(builder, vals)
default:
return false, nil, fmt.Errorf("sub-index %d does not satisfy either SingleIndexer or MultiIndexer", i)
}
}
// We are walking through the builder slice essentially in a depth-first fashion,
// building the prefix and leaves as we go. If AllowMissing is false, we only insert
// these full paths to leaves. Otherwise, we also insert each prefix along the way.
// This allows for lookup in FromArgs when AllowMissing is true that does not contain
// the full set of arguments. e.g. for {Foo, Bar} where an object has only the Foo
// field specified as "abc", it is valid to call FromArgs with just "abc".
var walkVals func([]byte, int)
walkVals = func(currPrefix []byte, depth int) {
if depth == len(builder)-1 {
// These are the "leaves", so append directly
for _, v := range builder[depth] {
out = append(out, append(currPrefix, v...))
}
return
}
for _, v := range builder[depth] {
nextPrefix := append(currPrefix, v...)
if c.AllowMissing {
out = append(out, nextPrefix)
}
walkVals(nextPrefix, depth+1)
}
}
walkVals(nil, 0)
return true, out, nil
}
func (c *CompoundMultiIndex) FromArgs(args ...interface{}) ([]byte, error) {
var stringMapCount int
var argCount int
for _, index := range c.Indexes {
if argCount >= len(args) {
break
}
if _, ok := index.(*StringMapFieldIndex); ok {
// We require pairs for StringMapFieldIndex, but only got one
if argCount+1 >= len(args) {
return nil, errors.New("invalid number of arguments")
}
stringMapCount++
argCount += 2
} else {
argCount++
}
}
argCount = 0
switch c.AllowMissing {
case true:
if len(args) > len(c.Indexes)+stringMapCount {
return nil, errors.New("too many arguments")
}
default:
if len(args) != len(c.Indexes)+stringMapCount {
return nil, errors.New("number of arguments does not equal number of indexers")
}
}
var out []byte
var val []byte
var err error
for i, idx := range c.Indexes {
if argCount >= len(args) {
// We're done; should only hit this if AllowMissing
break
}
if _, ok := idx.(*StringMapFieldIndex); ok {
if args[argCount+1] == nil {
val, err = idx.FromArgs(args[argCount])
} else {
val, err = idx.FromArgs(args[argCount : argCount+2]...)
}
argCount += 2
} else {
val, err = idx.FromArgs(args[argCount])
argCount++
}
if err != nil {
return nil, fmt.Errorf("sub-index %d error: %v", i, err)
}
out = append(out, val...)
}
return out, nil
}

View File

@@ -1,3 +1,5 @@
// Package memdb provides an in-memory database that supports transactions
// and MVCC.
package memdb
import (
@@ -8,16 +10,17 @@ import (
"github.com/hashicorp/go-immutable-radix"
)
// MemDB is an in-memory database. It provides a table abstraction,
// which is used to store objects (rows) with multiple indexes based
// on values. The database makes use of immutable radix trees to provide
// transactions and MVCC.
// MemDB is an in-memory database.
//
// MemDB provides a table abstraction to store objects (rows) with multiple
// indexes based on inserted values. The database makes use of immutable radix
// trees to provide transactions and MVCC.
type MemDB struct {
schema *DBSchema
root unsafe.Pointer // *iradix.Tree underneath
primary bool
// There can only be a single writter at once
// There can only be a single writer at once
writer sync.Mutex
}
@@ -37,6 +40,7 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
if err := db.initialize(); err != nil {
return nil, err
}
return db, nil
}
@@ -72,7 +76,8 @@ func (db *MemDB) Snapshot() *MemDB {
return clone
}
// initialize is used to setup the DB for use after creation
// initialize is used to setup the DB for use after creation. This should
// be called only once after allocating a MemDB.
func (db *MemDB) initialize() error {
root := db.getRoot()
for tName, tableSchema := range db.schema.Tables {

View File

@@ -2,33 +2,47 @@ package memdb
import "fmt"
// DBSchema contains the full database schema used for MemDB
// DBSchema is the schema to use for the full database with a MemDB instance.
//
// MemDB will require a valid schema. Schema validation can be tested using
// the Validate function. Calling this function is recommended in unit tests.
type DBSchema struct {
// Tables is the set of tables within this database. The key is the
// table name and must match the Name in TableSchema.
Tables map[string]*TableSchema
}
// Validate is used to validate the database schema
// Validate validates the schema.
func (s *DBSchema) Validate() error {
if s == nil {
return fmt.Errorf("missing schema")
return fmt.Errorf("schema is nil")
}
if len(s.Tables) == 0 {
return fmt.Errorf("no tables defined")
return fmt.Errorf("schema has no tables defined")
}
for name, table := range s.Tables {
if name != table.Name {
return fmt.Errorf("table name mis-match for '%s'", name)
}
if err := table.Validate(); err != nil {
return err
return fmt.Errorf("table %q: %s", name, err)
}
}
return nil
}
// TableSchema contains the schema for a single table
// TableSchema is the schema for a single table.
type TableSchema struct {
Name string
// Name of the table. This must match the key in the Tables map in DBSchema.
Name string
// Indexes is the set of indexes for querying this table. The key
// is a unique name for the index and must match the Name in the
// IndexSchema.
Indexes map[string]*IndexSchema
}
@@ -37,35 +51,50 @@ func (s *TableSchema) Validate() error {
if s.Name == "" {
return fmt.Errorf("missing table name")
}
if len(s.Indexes) == 0 {
return fmt.Errorf("missing table indexes for '%s'", s.Name)
}
if _, ok := s.Indexes["id"]; !ok {
return fmt.Errorf("must have id index")
}
if !s.Indexes["id"].Unique {
return fmt.Errorf("id index must be unique")
}
if _, ok := s.Indexes["id"].Indexer.(SingleIndexer); !ok {
return fmt.Errorf("id index must be a SingleIndexer")
}
for name, index := range s.Indexes {
if name != index.Name {
return fmt.Errorf("index name mis-match for '%s'", name)
}
if err := index.Validate(); err != nil {
return err
return fmt.Errorf("index %q: %s", name, err)
}
}
return nil
}
// IndexSchema contains the schema for an index
// IndexSchema is the schema for an index. An index defines how a table is
// queried.
type IndexSchema struct {
Name string
// Name of the index. This must be unique among a tables set of indexes.
// This must match the key in the map of Indexes for a TableSchema.
Name string
// AllowMissing if true ignores this index if it doesn't produce a
// value. For example, an index that extracts a field that doesn't
// exist from a structure.
AllowMissing bool
Unique bool
Indexer Indexer
Unique bool
Indexer Indexer
}
func (s *IndexSchema) Validate() error {

View File

@@ -7,13 +7,18 @@ import (
"sync/atomic"
"unsafe"
"github.com/hashicorp/go-immutable-radix"
iradix "github.com/hashicorp/go-immutable-radix"
)
const (
id = "id"
)
var (
// ErrNotFound is returned when the requested item is not found
ErrNotFound = fmt.Errorf("not found")
)
// tableIndex is a tuple of (Table, Index) used for lookups
type tableIndex struct {
Table string
@@ -28,9 +33,25 @@ type Txn struct {
rootTxn *iradix.Txn
after []func()
// changes is used to track the changes performed during the transaction. If
// it is nil at transaction start then changes are not tracked.
changes Changes
modified map[tableIndex]*iradix.Txn
}
// TrackChanges enables change tracking for the transaction. If called at any
// point before commit, subsequent mutations will be recorded and can be
// retrieved using ChangeSet. Once this has been called on a transaction it
// can't be unset. As with other Txn methods it's not safe to call this from a
// different goroutine than the one making mutations or committing the
// transaction.
func (txn *Txn) TrackChanges() {
if txn.changes == nil {
txn.changes = make(Changes, 0, 1)
}
}
// readableIndex returns a transaction usable for reading the given
// index in a table. If a write transaction is in progress, we may need
// to use an existing modified txn.
@@ -96,6 +117,7 @@ func (txn *Txn) Abort() {
// Clear the txn
txn.rootTxn = nil
txn.modified = nil
txn.changes = nil
// Release the writer lock since this is invalid
txn.db.writer.Unlock()
@@ -260,6 +282,14 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
indexTxn.Insert(val, obj)
}
}
if txn.changes != nil {
txn.changes = append(txn.changes, Change{
Table: table,
Before: existing, // might be nil on a create
After: obj,
primaryKey: idVal,
})
}
return nil
}
@@ -291,7 +321,7 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
idTxn := txn.writableIndex(table, id)
existing, ok := idTxn.Get(idVal)
if !ok {
return fmt.Errorf("not found")
return ErrNotFound
}
// Remove the object from all the indexes
@@ -327,6 +357,14 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
}
}
}
if txn.changes != nil {
txn.changes = append(txn.changes, Change{
Table: table,
Before: existing,
After: nil, // Now nil indicates deletion
primaryKey: idVal,
})
}
return nil
}
@@ -371,6 +409,19 @@ func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (
if !ok {
return false, fmt.Errorf("object missing primary index")
}
if txn.changes != nil {
// Record the deletion
idTxn := txn.writableIndex(table, id)
existing, ok := idTxn.Get(idVal)
if ok {
txn.changes = append(txn.changes, Change{
Table: table,
Before: existing,
After: nil, // Now nil indicates deletion
primaryKey: idVal,
})
}
}
// Remove the object from all the indexes except the given prefix index
for name, indexSchema := range tableSchema.Indexes {
if name == deletePrefixIndex {
@@ -408,6 +459,7 @@ func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (
}
}
}
}
if foundAny {
indexTxn := txn.writableIndex(table, deletePrefixIndex)
@@ -586,19 +638,11 @@ type ResultIterator interface {
// Get is used to construct a ResultIterator over all the
// rows that match the given constraints of an index.
func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
// Get the index value to scan
indexSchema, val, err := txn.getIndexValue(table, index, args...)
indexIter, val, err := txn.getIndexIterator(table, index, args...)
if err != nil {
return nil, err
}
// Get the index itself
indexTxn := txn.readableIndex(table, indexSchema.Name)
indexRoot := indexTxn.Root()
// Get an interator over the index
indexIter := indexRoot.Iterator()
// Seek the iterator to the appropriate sub-set
watchCh := indexIter.SeekPrefixWatch(val)
@@ -610,6 +654,129 @@ func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, e
return iter, nil
}
// LowerBound is used to construct a ResultIterator over all the the range of
// rows that have an index value greater than or equal to the provide args.
// Calling this then iterating until the rows are larger than required allows
// range scans within an index. It is not possible to watch the resulting
// iterator since the radix tree doesn't efficiently allow watching on lower
// bound changes. The WatchCh returned will be nill and so will block forever.
func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
indexIter, val, err := txn.getIndexIterator(table, index, args...)
if err != nil {
return nil, err
}
// Seek the iterator to the appropriate sub-set
indexIter.SeekLowerBound(val)
// Create an iterator
iter := &radixIterator{
iter: indexIter,
}
return iter, nil
}
// objectID is a tuple of table name and the raw internal id byte slice
// converted to a string. It's only converted to a string to make it comparable
// so this struct can be used as a map index.
type objectID struct {
Table string
IndexVal string
}
// mutInfo stores metadata about mutations to allow collapsing multiple
// mutations to the same object into one.
type mutInfo struct {
firstBefore interface{}
lastIdx int
}
// Changes returns the set of object changes that have been made in the
// transaction so far. If change tracking is not enabled it wil always return
// nil. It can be called before or after Commit. If it is before Commit it will
// return all changes made so far which may not be the same as the final
// Changes. After abort it will always return nil. As with other Txn methods
// it's not safe to call this from a different goroutine than the one making
// mutations or committing the transaction. Mutations will appear in the order
// they were performed in the transaction but multiple operations to the same
// object will be collapsed so only the effective overall change to that object
// is present. If transaction operations are dependent (e.g. copy object X to Y
// then delete X) this might mean the set of mutations is incomplete to verify
// history, but it is complete in that the net effect is preserved (Y got a new
// value, X got removed).
func (txn *Txn) Changes() Changes {
if txn.changes == nil {
return nil
}
// De-duplicate mutations by key so all take effect at the point of the last
// write but we keep the mutations in order.
dups := make(map[objectID]mutInfo)
for i, m := range txn.changes {
oid := objectID{
Table: m.Table,
IndexVal: string(m.primaryKey),
}
// Store the latest mutation index for each key value
mi, ok := dups[oid]
if !ok {
// First entry for key, store the before value
mi.firstBefore = m.Before
}
mi.lastIdx = i
dups[oid] = mi
}
if len(dups) == len(txn.changes) {
// No duplicates found, fast path return it as is
return txn.changes
}
// Need to remove the duplicates
cs := make(Changes, 0, len(dups))
for i, m := range txn.changes {
oid := objectID{
Table: m.Table,
IndexVal: string(m.primaryKey),
}
mi := dups[oid]
if mi.lastIdx == i {
// This was the latest value for this key copy it with the before value in
// case it's different. Note that m is not a pointer so we are not
// modifying the txn.changeSet here - it's already a copy.
m.Before = mi.firstBefore
// Edge case - if the object was inserted and then eventually deleted in
// the same transaction, then the net affect on that key is a no-op. Don't
// emit a mutation with nil for before and after as it's meaningless and
// might violate expectations and cause a panic in code that assumes at
// least one must be set.
if m.Before == nil && m.After == nil {
continue
}
cs = append(cs, m)
}
}
// Store the de-duped version in case this is called again
txn.changes = cs
return cs
}
func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*iradix.Iterator, []byte, error) {
// Get the index value to scan
indexSchema, val, err := txn.getIndexValue(table, index, args...)
if err != nil {
return nil, nil, err
}
// Get the index itself
indexTxn := txn.readableIndex(table, indexSchema.Name)
indexRoot := indexTxn.Root()
// Get an interator over the index
indexIter := indexRoot.Iterator()
return indexIter, val, nil
}
// Defer is used to push a new arbitrary function onto a stack which
// gets called when a transaction is committed and finished. Deferred
// functions are called in LIFO order, and only invoked at the end of
@@ -637,3 +804,26 @@ func (r *radixIterator) Next() interface{} {
}
return value
}
// Snapshot creates a snapshot of the current state of the transaction.
// Returns a new read-only transaction or nil if the transaction is already
// aborted or committed.
func (txn *Txn) Snapshot() *Txn {
if txn.rootTxn == nil {
return nil
}
snapshot := &Txn{
db: txn.db,
rootTxn: txn.rootTxn.Clone(),
}
// Commit sub-transactions into the snapshot
for key, subTxn := range txn.modified {
path := indexPath(key.Table, key.Index)
final := subTxn.CommitOnly()
snapshot.rootTxn.Insert(path, final)
}
return snapshot
}

View File

@@ -127,3 +127,18 @@ func (w WatchSet) watchMany(ctx context.Context) error {
return ctx.Err()
}
}
// WatchCh returns a channel that is used to wait for either the watch set to trigger
// or for the context to be cancelled. WatchCh creates a new goroutine each call, so
// callers may need to cache the returned channel to avoid creating extra goroutines.
func (w WatchSet) WatchCh(ctx context.Context) <-chan error {
// Create the outgoing channel
triggerCh := make(chan error, 1)
// Create a goroutine to collect the error from WatchCtx
go func() {
triggerCh <- w.WatchCtx(ctx)
}()
return triggerCh
}

View File

@@ -2,7 +2,7 @@ package memdb
//go:generate sh -c "go run watch-gen/main.go >watch_few.go"
import(
import (
"context"
)

4
vendor/vendor.json vendored
View File

@@ -234,8 +234,8 @@
{"path":"github.com/hashicorp/go-getter","checksumSHA1":"d4brua17AGQqMNtngK4xKOUwboY=","revision":"f5101da0117392c6e7960c934f05a2fd689a5b5f","revisionTime":"2019-08-22T19:45:07Z"},
{"path":"github.com/hashicorp/go-getter/helper/url","checksumSHA1":"9J+kDr29yDrwsdu2ULzewmqGjpA=","revision":"b345bfcec894fb7ff3fdf9b21baf2f56ea423d98","revisionTime":"2018-04-10T17:49:45Z"},
{"path":"github.com/hashicorp/go-hclog","checksumSHA1":"tNgHh706sto5/99XYD5jIuBDqa8=","revision":"0e86804c9e4bede0738cbbc370e705ef82580e7e","revisionTime":"2020-01-11T00:06:39Z","version":"v0.11.0","versionExact":"v0.11.0"},
{"path":"github.com/hashicorp/go-immutable-radix","checksumSHA1":"Cas2nprG6pWzf05A2F/OlnjUu2Y=","revision":"8aac2701530899b64bdea735a1de8da899815220","revisionTime":"2017-07-25T22:12:15Z"},
{"path":"github.com/hashicorp/go-memdb","checksumSHA1":"FMAvwDar2bQyYAW4XMFhAt0J5xA=","revision":"20ff6434c1cc49b80963d45bf5c6aa89c78d8d57","revisionTime":"2017-08-31T20:15:40Z"},
{"path":"github.com/hashicorp/go-immutable-radix","checksumSHA1":"nNkX7goZHHtAU8PhNwDsYmL/0+A=","revision":"96211587acecb2c23e5dbab9d6ee225346740a48","revisionTime":"2020-05-13T13:04:37Z"},
{"path":"github.com/hashicorp/go-memdb","checksumSHA1":"XdIjWJf8DU3ZErcDrEfBcUJzKG0=","revision":"ac8c839d3bac603f96bbb715e0d267c565567d0f","revisionTime":"2020-04-22T23:39:21Z"},
{"path":"github.com/hashicorp/go-msgpack/codec","checksumSHA1":"CKGYNUDKre3Z2g4hHNVfp5nTcfA=","revision":"cc7dbc9ee9335986a7245d8c29e1a9e2aa0cc8c7","revisionTime":"2019-11-01T19:38:46Z","version":"v1.1.5","versionExact":"v1.1.5"},
{"path":"github.com/hashicorp/go-multierror","checksumSHA1":"qb0WcbXXZyWv538gMYVHAq/qOlw=","revision":"72917a1559e17f38638ade54020ab372ba848d67","revisionTime":"2019-11-20T19:21:20Z"},
{"path":"github.com/hashicorp/go-plugin","checksumSHA1":"Nwod22KYiOycjys2ITllhNE9mtE=","revision":"809113480b559c989ea9cfcff62e9d387961f60b","revisionTime":"2019-10-04T17:18:45Z"},