diff --git a/nomad/event/event.go b/nomad/event/event.go new file mode 100644 index 000000000..49e809ac8 --- /dev/null +++ b/nomad/event/event.go @@ -0,0 +1,13 @@ +package event + +type Event struct { + Topic string + Key string + Index uint64 + Payload interface{} +} + +type EventPublisher struct{} + +func NewPublisher() *EventPublisher { return &EventPublisher{} } +func (e EventPublisher) Publish(events []Event) {} diff --git a/nomad/fsm.go b/nomad/fsm.go index 98a65590b..22663fe17 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -574,7 +574,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - err := n.state.WithWriteTransaction(func(tx state.Txn) error { + err := n.state.WithWriteTransaction(index, func(tx state.Txn) error { err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) if err != nil { @@ -612,7 +612,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} // Perform all store updates atomically to ensure a consistent view for store readers. // A partial update may increment the snapshot index, allowing eval brokers to process // evals for jobs whose deregistering didn't get committed yet. 
- err := n.state.WithWriteTransaction(func(tx state.Txn) error { + err := n.state.WithWriteTransaction(index, func(tx state.Txn) error { for jobNS, options := range req.Jobs { if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { n.logger.Error("deregistering job failed", "job", jobNS, "error", err) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index e371b5d2a..714471654 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2866,22 +2866,22 @@ func TestFSM_ReconcileSummaries(t *testing.T) { // Add a node node := mock.Node() - state.UpsertNode(800, node) + require.NoError(t, state.UpsertNode(800, node)) // Make a job so that none of the tasks can be placed job1 := mock.Job() job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000 - state.UpsertJob(1000, job1) + require.NoError(t, state.UpsertJob(1000, job1)) // make a job which can make partial progress alloc := mock.Alloc() alloc.NodeID = node.ID - state.UpsertJob(1010, alloc.Job) - state.UpsertAllocs(1011, []*structs.Allocation{alloc}) + require.NoError(t, state.UpsertJob(1010, alloc.Job)) + require.NoError(t, state.UpsertAllocs(1011, []*structs.Allocation{alloc})) // Delete the summaries - state.DeleteJobSummary(1030, job1.Namespace, job1.ID) - state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID) + require.NoError(t, state.DeleteJobSummary(1030, job1.Namespace, job1.ID)) + require.NoError(t, state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID)) req := structs.GenericRequest{} buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req) @@ -2895,7 +2895,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) { } ws := memdb.NewWatchSet() - out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID) + out1, err := state.JobSummaryByID(ws, job1.Namespace, job1.ID) + require.NoError(t, err) + expected := structs.JobSummary{ JobID: job1.ID, Namespace: job1.Namespace, @@ -2914,7 +2916,9 @@ func TestFSM_ReconcileSummaries(t *testing.T) { // This 
exercises the code path which adds the allocations made by the // planner and the number of unplaced allocations in the reconcile summaries // codepath - out2, _ := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID) + out2, err := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID) + require.NoError(t, err) + expected = structs.JobSummary{ JobID: alloc.Job.ID, Namespace: alloc.Job.Namespace, diff --git a/nomad/state/autopilot.go b/nomad/state/autopilot.go index 79447b2ce..ebb7e05bb 100644 --- a/nomad/state/autopilot.go +++ b/nomad/state/autopilot.go @@ -27,7 +27,7 @@ func autopilotConfigTableSchema() *memdb.TableSchema { // AutopilotConfig is used to get the current Autopilot configuration. func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { - tx := s.db.Txn(false) + tx := s.db.ReadTxn() defer tx.Abort() // Get the autopilot config @@ -45,11 +45,11 @@ func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) } // AutopilotSetConfig is used to set the current Autopilot configuration. -func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error { - tx := s.db.Txn(true) +func (s *StateStore) AutopilotSetConfig(index uint64, config *structs.AutopilotConfig) error { + tx := s.db.WriteTxn(index) defer tx.Abort() - s.autopilotSetConfigTxn(idx, tx, config) + s.autopilotSetConfigTxn(index, tx, config) tx.Commit() return nil @@ -58,8 +58,8 @@ func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotCon // AutopilotCASConfig is used to try updating the Autopilot configuration with a // given Raft index. 
If the CAS index specified is not equal to the last observed index // for the config, then the call is a noop, -func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) { - tx := s.db.Txn(true) +func (s *StateStore) AutopilotCASConfig(index, cidx uint64, config *structs.AutopilotConfig) (bool, error) { + tx := s.db.WriteTxn(index) defer tx.Abort() // Check for an existing config @@ -76,13 +76,13 @@ func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.Autopi return false, nil } - s.autopilotSetConfigTxn(idx, tx, config) + s.autopilotSetConfigTxn(index, tx, config) tx.Commit() return true, nil } -func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error { +func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *txn, config *structs.AutopilotConfig) error { // Check for an existing config existing, err := tx.First("autopilot-config", "id") if err != nil { diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go new file mode 100644 index 000000000..22e32f31b --- /dev/null +++ b/nomad/state/state_changes.go @@ -0,0 +1,152 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/event" +) + +// ReadTxn is implemented by memdb.Txn to perform read operations. +type ReadTxn interface { + Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) + First(table, index string, args ...interface{}) (interface{}, error) + FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) + Abort() +} + +// Changes wraps a memdb.Changes to include the index at which these changes +// were made. +type Changes struct { + // Index is the latest index at the time these changes were committed. + Index uint64 + Changes memdb.Changes +} + +// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on +// all write transactions. 
When the transaction is committed the changes are +// sent to the eventPublisher which will create and emit change events. +type changeTrackerDB struct { + db *memdb.MemDB + publisher eventPublisher + processChanges func(ReadTxn, Changes) ([]event.Event, error) +} + +func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB { + return &changeTrackerDB{ + db: db, + publisher: publisher, + processChanges: changesFn, + } +} + +type changeProcessor func(ReadTxn, Changes) ([]event.Event, error) + +type eventPublisher interface { + Publish(events []event.Event) +} + +// noOpPublisher satisfies the eventPublisher interface and does nothing +type noOpPublisher struct{} + +func (n *noOpPublisher) Publish(events []event.Event) {} +func noOpProcessChanges(ReadTxn, Changes) ([]event.Event, error) { return []event.Event{}, nil } + +// ReadTxn returns a read-only transaction which behaves exactly the same as +// memdb.Txn +// +// TODO: this could return a regular memdb.Txn if all the state functions accepted +// the ReadTxn interface +func (c *changeTrackerDB) ReadTxn() *txn { + return &txn{Txn: c.db.Txn(false)} +} + +// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. +// It will track changes and publish events for the changes when Commit +// is called. +// +// The idx argument must be the index of the current Raft operation. Almost +// all mutations to state should happen as part of a raft apply so the index of +// the log being applied can be passed to WriteTxn. +// The exceptional cases are transactions that are executed on an empty +// memdb.DB as part of Restore, and those executed by tests where we insert +// data directly into the DB. These cases may use WriteTxnRestore. 
+func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { + t := &txn{ + Txn: c.db.Txn(true), + Index: idx, + publish: c.publish, + } + t.Txn.TrackChanges() + return t +} + +func (c *changeTrackerDB) publish(changes Changes) error { + readOnlyTx := c.db.Txn(false) + defer readOnlyTx.Abort() + + events, err := c.processChanges(readOnlyTx, changes) + if err != nil { + return fmt.Errorf("failed generating events from changes: %v", err) + } + c.publisher.Publish(events) + return nil +} + +// WriteTxnRestore returns a wrapped RW transaction that does NOT have change +// tracking enabled. This should only be used in Restore where we need to +// replace the entire contents of the Store without a need to track the changes. +// WriteTxnRestore uses a zero index since the whole restore doesn't really occur +// at one index - the effect is to write many values that were previously +// written across many indexes. +func (c *changeTrackerDB) WriteTxnRestore() *txn { + return &txn{ + Txn: c.db.Txn(true), + Index: 0, + } +} + +// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. +// +// This can not be done with txn.Defer because the callback passed to Defer is +// invoked after commit completes, and because the callback can not return an +// error. Any errors from the callback would be lost, which would result in a +// missing change event, even though the state store had changed. +type txn struct { + *memdb.Txn + // Index in raft where the write is occurring. The value is zero for a + // read-only, or WriteTxnRestore transaction. + // Index is stored so that it may be passed along to any subscribers as part + // of a change event. + Index uint64 + publish func(changes Changes) error +} + +// Commit first pushes changes to EventPublisher, then calls Commit on the +// underlying transaction. +// +// Note that this function, unlike memdb.Txn, returns an error which must be checked +// by the caller. 
A non-nil error indicates that a commit failed and was not +// applied. +func (tx *txn) Commit() error { + // publish may be nil if this is a read-only or WriteTxnRestore transaction. + // In those cases changes should also be empty, and there will be nothing + // to publish. + if tx.publish != nil { + changes := Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + if err := tx.publish(changes); err != nil { + return err + } + } + + tx.Txn.Commit() + return nil +} + +func processDBChanges(tx ReadTxn, changes Changes) ([]event.Event, error) { + // TODO: add handlers here. + return []event.Event{}, nil +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d8ebbabd9..fb905fce0 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -13,12 +13,13 @@ import ( "github.com/pkg/errors" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/event" "github.com/hashicorp/nomad/nomad/structs" ) // Txn is a transaction against a state store. // This can be a read or write transaction. -type Txn = *memdb.Txn +type Txn = *txn const ( // NodeRegisterEventReregistered is the message used when the node becomes @@ -55,7 +56,7 @@ type StateStoreConfig struct { // considered a constant and NEVER modified in place. type StateStore struct { logger log.Logger - db *memdb.MemDB + db *changeTrackerDB // config is the passed in configuration config *StateStoreConfig @@ -76,10 +77,10 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { // Create the state store s := &StateStore{ logger: config.Logger.Named("state_store"), - db: db, config: config, abandonCh: make(chan struct{}), } + s.db = NewChangeTrackerDB(db, event.NewPublisher(), processDBChanges) // Initialize the state store with required enterprise objects if err := s.enterpriseInit(); err != nil { @@ -98,12 +99,17 @@ func (s *StateStore) Config() *StateStoreConfig { // we use MemDB, we just need to snapshot the state of the underlying // database. 
func (s *StateStore) Snapshot() (*StateSnapshot, error) { + memDBSnap := s.db.db.Snapshot() + + store := StateStore{ + logger: s.logger, + config: s.config, + } + + store.db = NewChangeTrackerDB(memDBSnap, &noOpPublisher{}, noOpProcessChanges) + snap := &StateSnapshot{ - StateStore: StateStore{ - logger: s.logger, - config: s.config, - db: s.db.Snapshot(), - }, + StateStore: store, } return snap, nil } @@ -167,7 +173,7 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State // state by minimizing the number of transactions and checking // overhead. func (s *StateStore) Restore() (*StateRestore, error) { - txn := s.db.Txn(true) + txn := s.db.WriteTxnRestore() r := &StateRestore{ txn: txn, } @@ -253,7 +259,7 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return err } - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Upsert the newly created or updated deployment @@ -347,7 +353,7 @@ func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) { // upsertDeploymentUpdates updates the deployments given the passed status // updates. -func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error { +func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *txn) error { for _, u := range updates { if err := s.updateDeploymentStatusImpl(index, u, txn); err != nil { return err @@ -359,7 +365,7 @@ func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.De // UpsertJobSummary upserts a job summary into the state store. 
func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if the job summary already exists @@ -394,7 +400,7 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma // DeleteJobSummary deletes the job summary with the given ID. This is for // testing purposes only. func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Delete the job summary @@ -411,7 +417,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error // UpsertDeployment is used to insert a new deployment. If cancelPrior is set to // true, all prior deployments for the same job will be cancelled. func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.upsertDeploymentImpl(index, deployment, txn); err != nil { return err @@ -420,7 +426,7 @@ func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployme return nil } -func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error { +func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *txn) error { // Check if the deployment already exists existing, err := txn.First("deployment", "id", deployment.ID) if err != nil { @@ -457,7 +463,7 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl } func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("deployment", "id") @@ -470,7 +476,7 @@ func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error } func (s *StateStore) 
DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("deployment", "namespace", namespace) @@ -483,7 +489,7 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) } func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("deployment", "id_prefix", deploymentID) @@ -512,11 +518,11 @@ func deploymentNamespaceFilter(namespace string) func(interface{}) bool { } func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.deploymentByIDImpl(ws, deploymentID, txn) } -func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *memdb.Txn) (*structs.Deployment, error) { +func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *txn) (*structs.Deployment, error) { watchCh, existing, err := txn.FirstWatch("deployment", "id", deploymentID) if err != nil { return nil, fmt.Errorf("deployment lookup failed: %v", err) @@ -531,7 +537,7 @@ func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, } func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() var job *structs.Job // Read job from state store @@ -574,7 +580,7 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri // LatestDeploymentByJobID returns the latest deployment for the given job. The // latest is determined strictly by CreateIndex. 
func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the deployments iter, err := txn.Get("deployment", "job", namespace, jobID) @@ -602,7 +608,7 @@ func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID // DeleteDeployment is used to delete a set of deployments by ID func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if len(deploymentIDs) == 0 { @@ -636,7 +642,7 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro // UpsertScalingEvent is used to insert a new scaling event. // Only the most recent JobTrackedScalingEvents will be kept. func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Get the existing events @@ -687,7 +693,7 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR // ScalingEvents returns an iterator over all the job scaling events func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire scaling_event table iter, err := txn.Get("scaling_event", "id") @@ -701,7 +707,7 @@ func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, err } func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID) if err != nil { @@ -720,7 +726,7 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri // This is assumed to be triggered by the client, so we retain the value // of 
drain/eligibility which is set by the scheduler. func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if the node already exists @@ -781,7 +787,7 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { return fmt.Errorf("node ids missing") } - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, nodeID := range nodes { @@ -814,9 +820,19 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { // UpdateNodeStatus is used to update the status of a node func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() + if err := s.updateNodeStatusTxn(txn, nodeID, status, updatedAt, event); err != nil { + return err + } + + txn.Commit() + return nil +} + +func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error { + // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -833,28 +849,26 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updat // Add the event if given if event != nil { - appendNodeEvents(index, copyNode, []*structs.NodeEvent{event}) + appendNodeEvents(txn.Index, copyNode, []*structs.NodeEvent{event}) } // Update the status in the copy copyNode.Status = status - copyNode.ModifyIndex = index + copyNode.ModifyIndex = txn.Index // Insert the node if err := txn.Insert("nodes", copyNode); err != nil { return fmt.Errorf("node update failed: %v", err) } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil { return fmt.Errorf("index update failed: %v", err) } - - txn.Commit() return nil } // BatchUpdateNodeDrain is used to update the drain of a node set of 
nodes func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for node, update := range updates { if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil { @@ -869,7 +883,7 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { return err @@ -878,7 +892,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, return nil } -func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string, +func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { // Lookup the node @@ -925,7 +939,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st // UpdateNodeEligibility is used to update the scheduling eligibility of a node func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Lookup the node @@ -971,7 +985,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil // UpsertNodeEvents adds the node events to the nodes, rotating events as // necessary. 
func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for nodeID, events := range nodeEvents { @@ -987,7 +1001,7 @@ func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*str // upsertNodeEvent upserts a node event for a respective node. It also maintains // that a fixed number of node events are ever stored simultaneously, deleting // older events once this bound has been reached. -func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *memdb.Txn) error { +func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *txn) error { // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -1031,7 +1045,7 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv // upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health. 
It's called // on upsertNodeEvents, so that event driven health changes are updated -func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error { +func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error { loop := func(info *structs.CSIInfo) error { raw, err := txn.First("csi_plugins", "id", info.PluginID) @@ -1147,7 +1161,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } // deleteNodeCSIPlugins cleans up CSIInfo node health status, called in DeleteNode -func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error { +func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error { if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 { return nil } @@ -1190,7 +1204,7 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } // updateOrGCPlugin updates a plugin but will delete it if the plugin is empty -func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error { +func updateOrGCPlugin(index uint64, txn *txn, plug *structs.CSIPlugin) error { plug.ModifyIndex = index if plug.IsEmpty() { @@ -1209,7 +1223,7 @@ func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) err // deleteJobFromPlugins removes the allocations of this job from any plugins the job is // running, possibly deleting the plugin if it's no longer in use. 
It's called in DeleteJobTxn -func (s *StateStore) deleteJobFromPlugins(index uint64, txn *memdb.Txn, job *structs.Job) error { +func (s *StateStore) deleteJobFromPlugins(index uint64, txn *txn, job *structs.Job) error { ws := memdb.NewWatchSet() summary, err := s.JobSummaryByID(ws, job.Namespace, job.ID) if err != nil { @@ -1304,7 +1318,7 @@ func (s *StateStore) deleteJobFromPlugins(index uint64, txn *memdb.Txn, job *str // NodeByID is used to lookup a node by ID func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("nodes", "id", nodeID) if err != nil { @@ -1320,7 +1334,7 @@ func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, // NodesByIDPrefix is used to lookup nodes by prefix func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("nodes", "id_prefix", nodeID) if err != nil { @@ -1333,7 +1347,7 @@ func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.Re // NodeBySecretID is used to lookup a node by SecretID func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*structs.Node, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("nodes", "secret_id", secretID) if err != nil { @@ -1349,7 +1363,7 @@ func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*struct // Nodes returns an iterator over all the nodes func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire nodes table iter, err := txn.Get("nodes", "id") @@ -1362,7 +1376,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { // UpsertJob is used to register a job or update a job definition func (s *StateStore) 
UpsertJob(index uint64, job *structs.Job) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.upsertJobImpl(index, job, false, txn); err != nil { return err @@ -1378,7 +1392,7 @@ func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error } // upsertJobImpl is the implementation for registering a job or updating a job definition -func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error { +func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *txn) error { // Assert the namespace exists if exists, err := s.namespaceExists(txn, job.Namespace); err != nil { return err @@ -1465,7 +1479,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b // DeleteJob is used to deregister a job func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() err := s.DeleteJobTxn(index, namespace, jobID, txn) @@ -1577,7 +1591,7 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn } // deleteJobScalingPolicies deletes any scaling policies associated with the job -func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error { iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn) if err != nil { return fmt.Errorf("getting job scaling policies for deletion failed: %v", err) @@ -1610,7 +1624,7 @@ func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, tx } // deleteJobVersions deletes all versions of the given job. 
-func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *txn) error { iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID) if err != nil { return err @@ -1650,7 +1664,7 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd // upsertJobVersion inserts a job into its historic version table and limits the // number of job versions that are tracked. -func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) error { // Insert the job if err := txn.Insert("job_version", job); err != nil { return fmt.Errorf("failed to insert job into job_version table: %v", err) @@ -1700,7 +1714,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb // JobByID is used to lookup a job by its ID. JobByID returns the current/latest job // version. func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.JobByIDTxn(ws, namespace, id, txn) } @@ -1721,7 +1735,7 @@ func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn // JobsByIDPrefix is used to lookup a job by prefix func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("jobs", "id_prefix", namespace, id) if err != nil { @@ -1735,7 +1749,7 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me // JobVersionsByID returns all the tracked versions of a job. 
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.jobVersionByID(txn, &ws, namespace, id) } @@ -1743,7 +1757,7 @@ func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([ // jobVersionByID is the underlying implementation for retrieving all tracked // versions of a job and is called under an existing transaction. A watch set // can optionally be passed in to add the job histories to the watch set. -func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { +func (s *StateStore) jobVersionByID(txn *txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { // Get all the historic jobs for this ID iter, err := txn.Get("job_version", "id_prefix", namespace, id) if err != nil { @@ -1781,14 +1795,14 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespac // JobByIDAndVersion returns the job identified by its ID and Version. The // passed watchset may be nil. func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn) } // jobByIDAndVersionImpl returns the job identified by its ID and Version. The // passed watchset may be nil. 
func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string, - version uint64, txn *memdb.Txn) (*structs.Job, error) { + version uint64, txn *txn) (*structs.Job, error) { watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version) if err != nil { @@ -1808,7 +1822,7 @@ func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id stri } func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("job_version", "id") @@ -1822,7 +1836,7 @@ func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error // Jobs returns an iterator over all the jobs func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire jobs table iter, err := txn.Get("jobs", "id") @@ -1837,12 +1851,12 @@ func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { // JobsByNamespace returns an iterator over all the jobs for the given namespace func (s *StateStore) JobsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.jobsByNamespaceImpl(ws, namespace, txn) } // jobsByNamespaceImpl returns an iterator over all the jobs for the given namespace -func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *memdb.Txn) (memdb.ResultIterator, error) { +func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *txn) (memdb.ResultIterator, error) { // Walk the entire jobs table iter, err := txn.Get("jobs", "id_prefix", namespace, "") if err != nil { @@ -1856,7 +1870,7 @@ func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, tx // JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs. 
func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("jobs", "periodic", periodic) if err != nil { @@ -1871,7 +1885,7 @@ func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.Res // JobsByScheduler returns an iterator over all the jobs with the specific // scheduler type. func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Return an iterator for jobs with the specific type. iter, err := txn.Get("jobs", "type", schedulerType) @@ -1887,7 +1901,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m // JobsByGC returns an iterator over all jobs eligible or uneligible for garbage // collection. func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("jobs", "gc", gc) if err != nil { @@ -1901,7 +1915,7 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, // JobSummary returns a job summary object which matches a specific id. 
func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID) if err != nil { @@ -1921,7 +1935,7 @@ func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) // JobSummaries walks the entire job summary table and returns all the job // summary objects func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("job_summary", "id") if err != nil { @@ -1935,7 +1949,7 @@ func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, erro // JobSummaryByPrefix is used to look up Job Summary by id prefix func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("job_summary", "id_prefix", namespace, id) if err != nil { @@ -1949,7 +1963,7 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) // CSIVolumeRegister adds a volume to the server store, failing if it already exists func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, v := range volumes { @@ -1999,7 +2013,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum // CSIVolumes returns the unfiltered list of all volumes func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() iter, err := txn.Get("csi_volumes", "id") @@ -2015,7 +2029,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) // CSIVolumeByID is used to lookup a single volume. 
Returns a copy of the volume // because its plugins are denormalized to provide accurate Health. func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", namespace, id) if err != nil { @@ -2033,7 +2047,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // CSIVolumes looks up csi_volumes by pluginID func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "plugin_id", pluginID) if err != nil { @@ -2055,7 +2069,7 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID // CSIVolumesByIDPrefix supports search func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID) if err != nil { @@ -2094,7 +2108,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb // Lookup the raw CSIVolumes to match the other list interfaces iter := NewSliceIterator() - txn := s.db.Txn(false) + txn := s.db.ReadTxn() for id, namespace := range ids { raw, err := txn.First("csi_volumes", "id", namespace, id) if err != nil { @@ -2108,7 +2122,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb // CSIVolumesByNamespace looks up the entire csi_volumes table func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "id_prefix", namespace, "") if err != nil { @@ -2121,7 +2135,7 @@ func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) 
// CSIVolumeClaim updates the volume's claim count and allocation list func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *structs.CSIVolumeClaim) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() ws := memdb.NewWatchSet() @@ -2189,7 +2203,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s // CSIVolumeDeregister removes the volume from the server func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, id := range ids { @@ -2261,7 +2275,7 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs return nil, nil } // Lookup CSIPlugin, the health records, and calculate volume health - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() plug, err := s.CSIPluginByID(ws, vol.PluginID) @@ -2339,7 +2353,7 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol // CSIPlugins returns the unfiltered list of all plugin health status func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() iter, err := txn.Get("csi_plugins", "id") @@ -2354,7 +2368,7 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) // CSIPluginsByIDPrefix supports search func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_plugins", "id_prefix", pluginID) if err != nil { @@ -2368,7 +2382,7 @@ func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (m // CSIPluginByID returns the one named CSIPlugin func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer 
txn.Abort() raw, err := txn.First("csi_plugins", "id_prefix", id) @@ -2418,7 +2432,7 @@ func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPl // is currently no raft message for this, as it's intended to support // testing use cases. func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() existing, err := txn.First("csi_plugins", "id", plug.ID) @@ -2444,7 +2458,7 @@ func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) erro // DeleteCSIPlugin deletes the plugin if it's not in use. func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() ws := memdb.NewWatchSet() @@ -2475,7 +2489,7 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error { // UpsertPeriodicLaunch is used to register a launch or update it. func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if the job already exists @@ -2507,7 +2521,7 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic // DeletePeriodicLaunch is used to delete the periodic launch func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn) @@ -2543,7 +2557,7 @@ func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID stri // PeriodicLaunchByID is used to lookup a periodic launch by the periodic job // ID. 
func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id) if err != nil { @@ -2560,7 +2574,7 @@ func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) // PeriodicLaunches returns an iterator over all the periodic launches func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("periodic_launch", "id") @@ -2575,7 +2589,7 @@ func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, // UpsertEvals is used to upsert a set of evaluations func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() err := s.UpsertEvalsTxn(index, evals, txn) @@ -2611,7 +2625,7 @@ func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, t } // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction -func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { +func (s *StateStore) nestedUpsertEval(txn *txn, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation existing, err := txn.First("evals", "id", eval.ID) if err != nil { @@ -2702,7 +2716,7 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct // updateEvalModifyIndex is used to update the modify index of an evaluation that has been // through a scheduler pass. This is done as part of plan apply. It ensures that when a subsequent // scheduler workers process a re-queued evaluation it sees any partial updates from the plan apply. 
-func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID string) error { +func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string) error { // Lookup the evaluation existing, err := txn.First("evals", "id", evalID) if err != nil { @@ -2728,7 +2742,7 @@ func (s *StateStore) updateEvalModifyIndex(txn *memdb.Txn, index uint64, evalID // DeleteEval is used to delete an evaluation func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() jobs := make(map[structs.NamespacedID]string, len(evals)) @@ -2784,7 +2798,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e // EvalByID is used to lookup an eval by its ID func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("evals", "id", id) if err != nil { @@ -2802,7 +2816,7 @@ func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation // EvalsByIDPrefix is used to lookup evaluations by prefix in a particular // namespace func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over all evals by the id prefix iter, err := txn.Get("evals", "id_prefix", id) @@ -2832,7 +2846,7 @@ func evalNamespaceFilter(namespace string) func(interface{}) bool { // EvalsByJob returns all the evaluations by job id func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the node allocations iter, err := txn.Get("evals", "job_prefix", namespace, jobID) @@ -2863,7 +2877,7 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]* // Evals returns an iterator 
over all the evaluations func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("evals", "id") @@ -2879,7 +2893,7 @@ func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) { // EvalsByNamespace returns an iterator over all the evaluations in the given // namespace func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("evals", "namespace", namespace) @@ -2898,7 +2912,7 @@ func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memd // the desired state comes from the schedulers, while the actual state comes // from clients. func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allocation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Handle each of the updated allocations @@ -2918,7 +2932,7 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo } // nestedUpdateAllocFromClient is used to nest an update of an allocation with client status -func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, alloc *structs.Allocation) error { +func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *structs.Allocation) error { // Look for existing alloc existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -3007,7 +3021,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a // UpsertAllocs is used to evict a set of allocations and allocate new ones at // the same time. 
func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.upsertAllocsImpl(index, allocs, txn); err != nil { return err @@ -3018,7 +3032,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er // upsertAllocs is the actual implementation of UpsertAllocs so that it may be // used with an existing transaction. -func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *memdb.Txn) error { +func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *txn) error { // Handle the allocations jobs := make(map[structs.NamespacedID]string, 1) for _, alloc := range allocs { @@ -3142,7 +3156,7 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Handle each of the updated allocations @@ -3170,7 +3184,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[str // nestedUpdateAllocDesiredTransition is used to nest an update of an // allocations desired transition func (s *StateStore) nestedUpdateAllocDesiredTransition( - txn *memdb.Txn, index uint64, allocID string, + txn *txn, index uint64, allocID string, transition *structs.DesiredTransition) error { // Look for existing alloc @@ -3204,7 +3218,7 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( // AllocByID is used to lookup an allocation by its ID func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("allocs", "id", id) if err != nil { @@ -3221,7 +3235,7 @@ func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) 
(*structs.Allocatio // AllocsByIDPrefix is used to lookup allocs by prefix func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("allocs", "id_prefix", id) if err != nil { @@ -3250,7 +3264,7 @@ func allocNamespaceFilter(namespace string) func(interface{}) bool { // AllocsByIDPrefix is used to lookup allocs by prefix func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[string]bool, prefix string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() var iter memdb.ResultIterator var err error @@ -3282,7 +3296,7 @@ func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[st // AllocsByNode returns all the allocations by node func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the node allocations, using only the // node prefix which ignores the terminal status @@ -3306,7 +3320,7 @@ func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Al // AllocsByNode returns all the allocations by node and terminal status func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the node allocations iter, err := txn.Get("allocs", "node", node, terminal) @@ -3329,7 +3343,7 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin // AllocsByJob returns allocations by job id func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, anyCreateIndex bool) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get the job var job *structs.Job @@ -3370,7 +3384,7 @@ func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, 
any // AllocsByEval returns all the allocations by eval id func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the eval allocations iter, err := txn.Get("allocs", "eval", evalID) @@ -3393,7 +3407,7 @@ func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs. // AllocsByDeployment returns all the allocations by deployment id func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the deployments allocations iter, err := txn.Get("allocs", "deployment", deploymentID) @@ -3416,7 +3430,7 @@ func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) // Allocs returns an iterator over all the evaluations func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("allocs", "id") @@ -3432,13 +3446,13 @@ func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) { // AllocsByNamespace returns an iterator over all the allocations in the // namespace func (s *StateStore) AllocsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.allocsByNamespaceImpl(ws, txn, namespace) } // allocsByNamespaceImpl returns an iterator over all the allocations in the // namespace -func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *memdb.Txn, namespace string) (memdb.ResultIterator, error) { +func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *txn, namespace string) (memdb.ResultIterator, error) { // Walk the entire table iter, err := txn.Get("allocs", "namespace", namespace) if err != nil { @@ -3452,7 +3466,7 @@ func (s *StateStore) allocsByNamespaceImpl(ws 
memdb.WatchSet, txn *memdb.Txn, na // UpsertVaultAccessors is used to register a set of Vault Accessors func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.VaultAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, accessor := range accessors { @@ -3475,7 +3489,7 @@ func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.Vaul // DeleteVaultAccessors is used to delete a set of Vault Accessors func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.VaultAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Lookup the accessor @@ -3496,7 +3510,7 @@ func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.Vau // VaultAccessor returns the given Vault accessor func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs.VaultAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("vault_accessors", "id", accessor) if err != nil { @@ -3514,7 +3528,7 @@ func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs // VaultAccessors returns an iterator of Vault accessors. 
func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("vault_accessors", "id") if err != nil { @@ -3528,7 +3542,7 @@ func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, er // VaultAccessorsByAlloc returns all the Vault accessors by alloc id func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.VaultAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the accessors iter, err := txn.Get("vault_accessors", "alloc_id", allocID) @@ -3551,7 +3565,7 @@ func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([ // VaultAccessorsByNode returns all the Vault accessors by node id func (s *StateStore) VaultAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.VaultAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the accessors iter, err := txn.Get("vault_accessors", "node_id", nodeID) @@ -3583,7 +3597,7 @@ const siTokenAccessorTable = "si_token_accessors" // UpsertSITokenAccessors is used to register a set of Service Identity token accessors. func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, accessor := range accessors { @@ -3607,7 +3621,7 @@ func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.S // DeleteSITokenAccessors is used to delete a set of Service Identity token accessors. func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Lookup each accessor @@ -3629,7 +3643,7 @@ func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.S // SITokenAccessor returns the given Service Identity token accessor. 
func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*structs.SITokenAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() watchCh, existing, err := txn.FirstWatch(siTokenAccessorTable, "id", accessorID) @@ -3648,7 +3662,7 @@ func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*str // SITokenAccessors returns an iterator of Service Identity token accessors. func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() iter, err := txn.Get(siTokenAccessorTable, "id") @@ -3663,7 +3677,7 @@ func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator, // SITokenAccessorsByAlloc returns all the Service Identity token accessors by alloc ID. func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.SITokenAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() // Get an iterator over the accessors @@ -3684,7 +3698,7 @@ func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string) // SITokenAccessorsByNode returns all the Service Identity token accessors by node ID. 
func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.SITokenAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() // Get an iterator over the accessors @@ -3706,7 +3720,7 @@ func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([ // UpdateDeploymentStatus is used to make deployment status updates and // potentially make a evaluation func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.DeploymentStatusUpdateRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil { @@ -3732,7 +3746,7 @@ func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.Deploymen } // updateDeploymentStatusImpl is used to make deployment status updates -func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *memdb.Txn) error { +func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *txn) error { // Retrieve deployment ws := memdb.NewWatchSet() deployment, err := s.deploymentByIDImpl(ws, u.DeploymentID, txn) @@ -3773,7 +3787,7 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym // UpdateJobStability updates the stability of the given job and version to the // desired status. 
func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, jobVersion uint64, stable bool) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil { @@ -3785,7 +3799,7 @@ func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, j } // updateJobStabilityImpl updates the stability of the given job and version -func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error { +func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *txn) error { // Get the job that is referenced job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn) if err != nil { @@ -3810,7 +3824,7 @@ func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID strin // UpdateDeploymentPromotion is used to promote canaries in a deployment and // potentially make a evaluation func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyDeploymentPromoteRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Retrieve deployment and ensure it is not terminal and is active @@ -3953,7 +3967,7 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD // UpdateDeploymentAllocHealth is used to update the health of allocations as // part of the deployment and potentially make a evaluation func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.ApplyDeploymentAllocHealthRequest) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Retrieve deployment and ensure it is not terminal and is active @@ -4074,7 +4088,7 @@ func (s *StateStore) LatestIndex() (uint64, error) { // Index finds the matching index value func (s *StateStore) Index(name string) (uint64, 
error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Lookup the first matching index out, err := txn.First("index", "id", name) @@ -4087,22 +4101,9 @@ func (s *StateStore) Index(name string) (uint64, error) { return out.(*IndexEntry).Value, nil } -// RemoveIndex is a helper method to remove an index for testing purposes -func (s *StateStore) RemoveIndex(name string) error { - txn := s.db.Txn(true) - defer txn.Abort() - - if _, err := txn.DeleteAll("index", "id", name); err != nil { - return err - } - - txn.Commit() - return nil -} - // Indexes returns an iterator over all the indexes func (s *StateStore) Indexes() (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire nodes table iter, err := txn.Get("index", "id") @@ -4115,7 +4116,7 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { // ReconcileJobSummaries re-creates summaries for all jobs present in the state // store func (s *StateStore) ReconcileJobSummaries(index uint64) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Get all the jobs @@ -4274,7 +4275,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { // setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. // It takes a map of job IDs to an optional forceStatus string. It returns an // error if the job doesn't exist or setJobStatus fails. -func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn, +func (s *StateStore) setJobStatuses(index uint64, txn *txn, jobs map[structs.NamespacedID]string, evalDelete bool) error { for tuple, forceStatus := range jobs { @@ -4300,7 +4301,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn, // called because an evaluation is being deleted (potentially because of garbage // collection). If forceStatus is non-empty, the job's status will be set to the // passed status. 
-func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, +func (s *StateStore) setJobStatus(index uint64, txn *txn, job *structs.Job, evalDelete bool, forceStatus string) error { // Capture the current status so we can check if there is a change @@ -4399,7 +4400,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, return nil } -func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) { +func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) { // System, Periodic and Parameterized jobs are running until explicitly // stopped if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() { @@ -4456,7 +4457,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // updateSummaryWithJob creates or updates job summaries when new jobs are // upserted or existing ones are updated func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, - txn *memdb.Txn) error { + txn *txn) error { // Update the job summary summaryRaw, err := txn.First("job_summary", "id", job.Namespace, job.ID) @@ -4511,7 +4512,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // updateJobScalingPolicies upserts any scaling policies contained in the job and removes // any previous scaling policies that were removed from the job -func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error { ws := memdb.NewWatchSet() @@ -4557,7 +4558,7 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx } // updateJobCSIPlugins runs on job update, and indexes the job in the plugin -func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev 
*structs.Job, txn *txn) error { ws := memdb.NewWatchSet() plugIns := make(map[string]*structs.CSIPlugin) @@ -4623,7 +4624,7 @@ func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, t // updateDeploymentWithAlloc is used to update the deployment state associated // with the given allocation. The passed alloc may be updated if the deployment // status has changed to capture the modify index at which it has changed. -func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *memdb.Txn) error { +func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *txn) error { // Nothing to do if the allocation is not associated with a deployment if alloc.DeploymentID == "" { return nil @@ -4730,7 +4731,7 @@ func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *st // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, - existingAlloc *structs.Allocation, txn *memdb.Txn) error { + existingAlloc *structs.Allocation, txn *txn) error { // We don't have to update the summary if the job is missing if alloc.Job == nil { @@ -4849,7 +4850,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat // updatePluginWithAlloc updates the CSI plugins for an alloc when the // allocation is updated or inserted with a terminal server status. 
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation, - txn *memdb.Txn) error { + txn *txn) error { if !alloc.ServerTerminalStatus() { return nil } @@ -4886,7 +4887,7 @@ func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocati // updatePluginWithJobSummary updates the CSI plugins for a job when the // job summary is updated by an alloc func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.JobSummary, alloc *structs.Allocation, - txn *memdb.Txn) error { + txn *txn) error { ws := memdb.NewWatchSet() tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) @@ -4920,7 +4921,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J // UpsertACLPolicies is used to create or update a set of ACL policies func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, policy := range policies { @@ -4962,7 +4963,7 @@ func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPoli // DeleteACLPolicies deletes the policies with the given names func (s *StateStore) DeleteACLPolicies(index uint64, names []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Delete the policy @@ -4980,7 +4981,7 @@ func (s *StateStore) DeleteACLPolicies(index uint64, names []string) error { // ACLPolicyByName is used to lookup a policy by name func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.ACLPolicy, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("acl_policy", "id", name) if err != nil { @@ -4996,7 +4997,7 @@ func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.A // ACLPolicyByNamePrefix is used to lookup policies by prefix func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { - 
txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("acl_policy", "id_prefix", prefix) if err != nil { @@ -5009,7 +5010,7 @@ func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (me // ACLPolicies returns an iterator over all the acl policies func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("acl_policy", "id") @@ -5022,7 +5023,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error // UpsertACLTokens is used to create or update a set of ACL tokens func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() for _, token := range tokens { @@ -5069,7 +5070,7 @@ func (s *StateStore) UpsertACLTokens(index uint64, tokens []*structs.ACLToken) e // DeleteACLTokens deletes the tokens with the given accessor ids func (s *StateStore) DeleteACLTokens(index uint64, ids []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Delete the tokens @@ -5091,7 +5092,7 @@ func (s *StateStore) ACLTokenByAccessorID(ws memdb.WatchSet, id string) (*struct return nil, fmt.Errorf("acl token lookup failed: missing accessor id") } - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("acl_token", "id", id) if err != nil { @@ -5111,7 +5112,7 @@ func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*st return nil, fmt.Errorf("acl token lookup failed: missing secret id") } - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("acl_token", "secret", secretID) if err != nil { @@ -5127,7 +5128,7 @@ func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*st // ACLTokenByAccessorIDPrefix is used to lookup tokens by prefix func (s *StateStore) 
ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("acl_token", "id_prefix", prefix) if err != nil { @@ -5139,7 +5140,7 @@ func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string // ACLTokens returns an iterator over all the tokens func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("acl_token", "id") @@ -5152,7 +5153,7 @@ func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) // ACLTokensByGlobal returns an iterator over all the tokens filtered by global value func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("acl_token", "global", globalVal) @@ -5165,7 +5166,7 @@ func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb // CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Lookup the bootstrap sentinel out, err := txn.First("index", "id", "acl_token_bootstrap") @@ -5184,7 +5185,7 @@ func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) { // BootstrapACLToken is used to create an initial ACL token func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs.ACLToken) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() // Check if we have already done a bootstrap @@ -5222,7 +5223,7 @@ func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs // SchedulerConfig is used to get the current Scheduler configuration. 
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { - tx := s.db.Txn(false) + tx := s.db.ReadTxn() defer tx.Abort() // Get the scheduler config @@ -5240,18 +5241,18 @@ func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, } // SchedulerSetConfig is used to set the current Scheduler configuration. -func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error { - tx := s.db.Txn(true) +func (s *StateStore) SchedulerSetConfig(index uint64, config *structs.SchedulerConfiguration) error { + tx := s.db.WriteTxn(index) defer tx.Abort() - s.schedulerSetConfigTxn(idx, tx, config) + s.schedulerSetConfigTxn(index, tx, config) tx.Commit() return nil } func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadata, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() // Get the cluster metadata @@ -5269,7 +5270,7 @@ func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadat } func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.setClusterMetadata(txn, meta); err != nil { @@ -5283,8 +5284,8 @@ func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetad // WithWriteTransaction executes the passed function within a write transaction, // and returns its result. If the invocation returns no error, the transaction // is committed; otherwise, it's aborted. -func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error { - tx := s.db.Txn(true) +func (s *StateStore) WithWriteTransaction(index uint64, fn func(Txn) error) error { + tx := s.db.WriteTxn(index) defer tx.Abort() err := fn(tx) @@ -5297,8 +5298,8 @@ func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error { // SchedulerCASConfig is used to update the scheduler configuration with a // given Raft index. 
If the CAS index specified is not equal to the last observed index // for the config, then the call is a noop. -func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) { - tx := s.db.Txn(true) +func (s *StateStore) SchedulerCASConfig(index, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) { + tx := s.db.WriteTxn(index) defer tx.Abort() // Check for an existing config @@ -5315,13 +5316,13 @@ func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.Schedu return false, nil } - s.schedulerSetConfigTxn(idx, tx, config) + s.schedulerSetConfigTxn(index, tx, config) tx.Commit() return true, nil } -func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error { +func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *txn, config *structs.SchedulerConfiguration) error { // Check for an existing config existing, err := tx.First("scheduler_config", "id") if err != nil { @@ -5342,7 +5343,7 @@ func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *st return nil } -func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMetadata) error { +func (s *StateStore) setClusterMetadata(txn *txn, meta *structs.ClusterMetadata) error { // Check for an existing config, if it exists, sanity check the cluster ID matches existing, err := txn.First("cluster_meta", "id") if err != nil { @@ -5367,7 +5368,7 @@ func (s *StateStore) setClusterMetadata(txn *memdb.Txn, meta *structs.ClusterMet // UpsertScalingPolicy is used to insert a new scaling policy. 
func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*structs.ScalingPolicy) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() if err := s.UpsertScalingPoliciesTxn(index, scalingPolicies, txn); err != nil { @@ -5380,7 +5381,7 @@ func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*stru // upsertScalingPolicy is used to insert a new scaling policy. func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*structs.ScalingPolicy, - txn *memdb.Txn) error { + txn *txn) error { hadUpdates := false @@ -5427,7 +5428,7 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s } func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error { - txn := s.db.Txn(true) + txn := s.db.WriteTxn(index) defer txn.Abort() err := s.DeleteScalingPoliciesTxn(index, ids, txn) @@ -5439,7 +5440,7 @@ func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error { } // DeleteScalingPolicies is used to delete a set of scaling policies by ID -func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *memdb.Txn) error { +func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *txn) error { if len(ids) == 0 { return nil } @@ -5469,7 +5470,7 @@ func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *m // ScalingPolicies returns an iterator over all the scaling policies func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire scaling_policy table iter, err := txn.Get("scaling_policy", "id") @@ -5483,7 +5484,7 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e } func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := 
txn.Get("scaling_policy", "target_prefix", namespace) if err != nil { @@ -5491,7 +5492,6 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str } ws.Add(iter.WatchCh()) - filter := func(raw interface{}) bool { d, ok := raw.(*structs.ScalingPolicy) if !ok { @@ -5507,12 +5507,12 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str } func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn) } func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string, - txn *memdb.Txn) (memdb.ResultIterator, error) { + txn *txn) (memdb.ResultIterator, error) { iter, err := txn.Get("scaling_policy", "target_prefix", namespace, jobID) if err != nil { @@ -5536,7 +5536,7 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID } func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("scaling_policy", "id", id) if err != nil { @@ -5553,7 +5553,7 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, target map[string]string) (*structs.ScalingPolicy, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // currently, only scaling policy type is against a task group namespace := target[structs.ScalingTargetNamespace] @@ -5665,7 +5665,7 @@ func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string { // restoring state by only using a single large transaction // instead of thousands of sub transactions type StateRestore struct { - txn *memdb.Txn + txn *txn } // Abort is used to abort the restore operation diff --git a/nomad/state/state_store_oss.go 
b/nomad/state/state_store_oss.go index 3ef4e600b..487f84213 100644 --- a/nomad/state/state_store_oss.go +++ b/nomad/state/state_store_oss.go @@ -3,7 +3,6 @@ package state import ( - memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" ) @@ -14,13 +13,13 @@ func (s *StateStore) enterpriseInit() error { } // namespaceExists returns whether a namespace exists -func (s *StateStore) namespaceExists(txn *memdb.Txn, namespace string) (bool, error) { +func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { return namespace == structs.DefaultNamespace, nil } // updateEntWithAlloc is used to update Nomad Enterprise objects when an allocation is // added/modified/deleted -func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *memdb.Txn) error { +func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error { return nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 4458bcaa9..dea9fc4b0 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1903,7 +1903,7 @@ func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) { // Actually delete const deletionIndex = uint64(10001) - err = state.WithWriteTransaction(func(txn Txn) error { + err = state.WithWriteTransaction(deletionIndex, func(txn Txn) error { for i, job := range jobs { err := state.DeleteJobTxn(deletionIndex, job.Namespace, job.ID, txn) require.NoError(t, err, "failed at %d %e", i, err) @@ -6053,19 +6053,20 @@ func TestStateStore_RestoreAlloc(t *testing.T) { func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { t.Parallel() + index := uint64(0) state := testStateStore(t) - txn := state.db.Txn(true) + txn := state.db.WriteTxn(index) // Create and insert a mock job. 
job := mock.Job() job.Status = "" - job.ModifyIndex = 0 + job.ModifyIndex = index if err := txn.Insert("jobs", job); err != nil { t.Fatalf("job insert failed: %v", err) } exp := "foobar" - index := uint64(1000) + index = uint64(1000) if err := state.setJobStatus(index, txn, job, false, exp); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } @@ -6088,8 +6089,9 @@ func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { func TestStateStore_SetJobStatus_NoOp(t *testing.T) { t.Parallel() + index := uint64(0) state := testStateStore(t) - txn := state.db.Txn(true) + txn := state.db.WriteTxn(index) // Create and insert a mock job that should be pending. job := mock.Job() @@ -6099,7 +6101,7 @@ func TestStateStore_SetJobStatus_NoOp(t *testing.T) { t.Fatalf("job insert failed: %v", err) } - index := uint64(1000) + index = uint64(1000) if err := state.setJobStatus(index, txn, job, false, ""); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } @@ -6119,7 +6121,7 @@ func TestStateStore_SetJobStatus(t *testing.T) { t.Parallel() state := testStateStore(t) - txn := state.db.Txn(true) + txn := state.db.WriteTxn(uint64(0)) // Create and insert a mock job that should be pending but has an incorrect // status. 
@@ -6155,7 +6157,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) { job := mock.Job() state := testStateStore(t) - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6171,7 +6173,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { job := mock.PeriodicJob() state := testStateStore(t) - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6187,7 +6189,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { job := mock.Job() state := testStateStore(t) - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6221,7 +6223,7 @@ func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6247,7 +6249,7 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6264,7 +6266,7 @@ func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) { state := testStateStore(t) job := mock.PeriodicJob() - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6293,7 +6295,7 @@ func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) { job := mock.Job() job.ParameterizedJob = &structs.ParameterizedJobConfig{} - txn := state.db.Txn(false) + txn := 
state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6329,7 +6331,7 @@ func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6357,7 +6359,7 @@ func TestStateStore_SetJobStatus_SystemJob(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -8694,7 +8696,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { require.ElementsMatch([]string{policy2.ID}, policiesInNS2) } - func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { t.Parallel()