Merge pull request #8772 from hashicorp/state/track-changes

track transaction changes
This commit is contained in:
Drew Bailey
2020-09-04 14:12:27 -04:00
committed by GitHub
8 changed files with 405 additions and 236 deletions

13
nomad/event/event.go Normal file
View File

@@ -0,0 +1,13 @@
package event
type Event struct {
Topic string
Key string
Index uint64
Payload interface{}
}
type EventPublisher struct{}
func NewPublisher() *EventPublisher { return &EventPublisher{} }
func (e EventPublisher) Publish(events []Event) {}

View File

@@ -574,7 +574,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.state.WithWriteTransaction(index, func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)
if err != nil {
@@ -612,7 +612,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
// Perform all store updates atomically to ensure a consistent view for store readers.
// A partial update may increment the snapshot index, allowing eval brokers to process
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.state.WithWriteTransaction(index, func(tx state.Txn) error {
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS, "error", err)

View File

@@ -2866,22 +2866,22 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
// Add a node
node := mock.Node()
state.UpsertNode(800, node)
require.NoError(t, state.UpsertNode(800, node))
// Make a job so that none of the tasks can be placed
job1 := mock.Job()
job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000
state.UpsertJob(1000, job1)
require.NoError(t, state.UpsertJob(1000, job1))
// make a job which can make partial progress
alloc := mock.Alloc()
alloc.NodeID = node.ID
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})
require.NoError(t, state.UpsertJob(1010, alloc.Job))
require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc}))
// Delete the summaries
state.DeleteJobSummary(1030, job1.Namespace, job1.ID)
state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID)
require.NoError(t, state.DeleteJobSummary(1030, job1.Namespace, job1.ID))
require.NoError(t, state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID))
req := structs.GenericRequest{}
buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req)
@@ -2895,7 +2895,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
ws := memdb.NewWatchSet()
out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
out1, err := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
require.NoError(t, err)
expected := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
@@ -2914,7 +2916,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
// This exercises the code path which adds the allocations made by the
// planner and the number of unplaced allocations in the reconcile summaries
// codepath
out2, _ := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID)
out2, err := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID)
require.NoError(t, err)
expected = structs.JobSummary{
JobID: alloc.Job.ID,
Namespace: alloc.Job.Namespace,

View File

@@ -27,7 +27,7 @@ func autopilotConfigTableSchema() *memdb.TableSchema {
// AutopilotConfig is used to get the current Autopilot configuration.
func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
tx := s.db.Txn(false)
tx := s.db.ReadTxn()
defer tx.Abort()
// Get the autopilot config
@@ -45,11 +45,11 @@ func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error)
}
// AutopilotSetConfig is used to set the current Autopilot configuration.
func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
tx := s.db.Txn(true)
func (s *StateStore) AutopilotSetConfig(index uint64, config *structs.AutopilotConfig) error {
tx := s.db.WriteTxn(index)
defer tx.Abort()
s.autopilotSetConfigTxn(idx, tx, config)
s.autopilotSetConfigTxn(index, tx, config)
tx.Commit()
return nil
@@ -58,8 +58,8 @@ func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotCon
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
tx := s.db.Txn(true)
func (s *StateStore) AutopilotCASConfig(index, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
tx := s.db.WriteTxn(index)
defer tx.Abort()
// Check for an existing config
@@ -76,13 +76,13 @@ func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.Autopi
return false, nil
}
s.autopilotSetConfigTxn(idx, tx, config)
s.autopilotSetConfigTxn(index, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *txn, config *structs.AutopilotConfig) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {

View File

@@ -0,0 +1,152 @@
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/event"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
First(table, index string, args ...interface{}) (interface{}, error)
FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error)
Abort()
}
// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
}
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
publisher eventPublisher
processChanges func(ReadTxn, Changes) ([]event.Event, error)
}
func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB {
return &changeTrackerDB{
db: db,
publisher: event.NewPublisher(),
processChanges: changesFn,
}
}
type changeProcessor func(ReadTxn, Changes) ([]event.Event, error)
type eventPublisher interface {
Publish(events []event.Event)
}
// noOpPublisher satisfies the eventPublisher interface and does nothing
type noOpPublisher struct{}
func (n *noOpPublisher) Publish(events []event.Event) {}
func noOpProcessChanges(ReadTxn, Changes) ([]event.Event, error) { return []event.Event{}, nil }
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
//
// TODO: this could return a regular memdb.Txn if all the state functions accepted
// the ReadTxn interface
func (c *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: c.db.Txn(false)}
}
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
// It will track changes and publish events for the changes when Commit
// is called.
//
// The idx argument must be the index of the current Raft operation. Almost
// all mutations to state should happen as part of a raft apply so the index of
// the log being applied can be passed to WriteTxn.
// The exceptional cases are transactions that are executed on an empty
// memdb.DB as part of Restore, and those executed by tests where we insert
// data directly into the DB. These cases may use WriteTxnRestore.
func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
}
t.Txn.TrackChanges()
return t
}
func (c *changeTrackerDB) publish(changes Changes) error {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
events, err := c.processChanges(readOnlyTx, changes)
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.Publish(events)
return nil
}
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes.
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
// at one index - the effect is to write many values that were previously
// written across many indexes.
func (c *changeTrackerDB) WriteTxnRestore() *txn {
return &txn{
Txn: c.db.Txn(true),
Index: 0,
}
}
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
//
// This can not be done with txn.Defer because the callback passed to Defer is
// invoked after commit completes, and because the callback can not return an
// error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed.
type txn struct {
*memdb.Txn
// Index in raft where the write is occurring. The value is zero for a
// read-only, or WriteTxnRestore transaction.
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
publish func(changes Changes) error
}
// Commit first pushes changes to EventPublisher, then calls Commit on the
// underlying transaction.
//
// Note that this function, unlike memdb.Txn, returns an error which must be checked
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
func (tx *txn) Commit() error {
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
if err := tx.publish(changes); err != nil {
return err
}
}
tx.Txn.Commit()
return nil
}
func processDBChanges(tx ReadTxn, changes Changes) ([]event.Event, error) {
// TODO: add handlers here.
return []event.Event{}, nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,6 @@
package state
import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -14,13 +13,13 @@ func (s *StateStore) enterpriseInit() error {
}
// namespaceExists returns whether a namespace exists
func (s *StateStore) namespaceExists(txn *memdb.Txn, namespace string) (bool, error) {
func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) {
return namespace == structs.DefaultNamespace, nil
}
// updateEntWithAlloc is used to update Nomad Enterprise objects when an allocation is
// added/modified/deleted
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *memdb.Txn) error {
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error {
return nil
}

View File

@@ -1903,7 +1903,7 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) {
// Actually delete
const deletionIndex = uint64(10001)
err = state.WithWriteTransaction(func(txn Txn) error {
err = state.WithWriteTransaction(deletionIndex, func(txn Txn) error {
for i, job := range jobs {
err := state.DeleteJobTxn(deletionIndex, job.Namespace, job.ID, txn)
require.NoError(t, err, "failed at %d %e", i, err)
@@ -6053,19 +6053,20 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
t.Parallel()
index := uint64(0)
state := testStateStore(t)
txn := state.db.Txn(true)
txn := state.db.WriteTxn(index)
// Create and insert a mock job.
job := mock.Job()
job.Status = ""
job.ModifyIndex = 0
job.ModifyIndex = index
if err := txn.Insert("jobs", job); err != nil {
t.Fatalf("job insert failed: %v", err)
}
exp := "foobar"
index := uint64(1000)
index = uint64(1000)
if err := state.setJobStatus(index, txn, job, false, exp); err != nil {
t.Fatalf("setJobStatus() failed: %v", err)
}
@@ -6088,8 +6089,9 @@ func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
func TestStateStore_SetJobStatus_NoOp(t *testing.T) {
t.Parallel()
index := uint64(0)
state := testStateStore(t)
txn := state.db.Txn(true)
txn := state.db.WriteTxn(index)
// Create and insert a mock job that should be pending.
job := mock.Job()
@@ -6099,7 +6101,7 @@ func TestStateStore_SetJobStatus_NoOp(t *testing.T) {
t.Fatalf("job insert failed: %v", err)
}
index := uint64(1000)
index = uint64(1000)
if err := state.setJobStatus(index, txn, job, false, ""); err != nil {
t.Fatalf("setJobStatus() failed: %v", err)
}
@@ -6119,7 +6121,7 @@ func TestStateStore_SetJobStatus(t *testing.T) {
t.Parallel()
state := testStateStore(t)
txn := state.db.Txn(true)
txn := state.db.WriteTxn(uint64(0))
// Create and insert a mock job that should be pending but has an incorrect
// status.
@@ -6155,7 +6157,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) {
job := mock.Job()
state := testStateStore(t)
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6171,7 +6173,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) {
job := mock.PeriodicJob()
state := testStateStore(t)
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6187,7 +6189,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) {
job := mock.Job()
state := testStateStore(t)
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6221,7 +6223,7 @@ func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6247,7 +6249,7 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6264,7 +6266,7 @@ func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) {
state := testStateStore(t)
job := mock.PeriodicJob()
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6293,7 +6295,7 @@ func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) {
job := mock.Job()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6329,7 +6331,7 @@ func TestStateStore_SetJobStatus_PendingEval(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -6357,7 +6359,7 @@ func TestStateStore_SetJobStatus_SystemJob(t *testing.T) {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
txn := state.db.ReadTxn()
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
@@ -8694,7 +8696,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
require.ElementsMatch([]string{policy2.ID}, policiesInNS2)
}
func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
t.Parallel()