diff --git a/nomad/state/autopilot.go b/nomad/state/autopilot.go index 149baaba6..ebb7e05bb 100644 --- a/nomad/state/autopilot.go +++ b/nomad/state/autopilot.go @@ -27,7 +27,7 @@ func autopilotConfigTableSchema() *memdb.TableSchema { // AutopilotConfig is used to get the current Autopilot configuration. func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { - tx := s.db.Txn(false) + tx := s.db.ReadTxn() defer tx.Abort() // Get the autopilot config diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go deleted file mode 100644 index 6a56b3a22..000000000 --- a/nomad/state/node_events.go +++ /dev/null @@ -1,175 +0,0 @@ -package state - -import ( - memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/nomad/event" - "github.com/hashicorp/nomad/nomad/structs" -) - -// NodeEvent represents a NodeEvent change on a given Node. -type NodeEvent struct { - Message string - NodeID string -} - -// NNodeDrainEvent holds information related to a Node Drain -type NodeDrainEvent struct { - NodeID string - Allocs []string - DrainStrategy structs.DrainStrategy - Message string -} - -func (s *StateStore) NodeEventsFromChanges(tx ReadTxn, changes Changes) ([]event.Event, error) { - var events []event.Event - - var nodeChanges map[string]*memdb.Change - - markNode := func(node string, nodeChange *memdb.Change) { - if nodeChanges == nil { - nodeChanges = make(map[string]*memdb.Change) - } - ch := nodeChanges[node] - if ch == nil { - nodeChanges[node] = nodeChange - } - } - - for _, change := range changes.Changes { - switch change.Table { - case "nodes": - nRaw := change.After - if change.After == nil { - nRaw = change.Before - } - n := nRaw.(*structs.Node) - changeCopy := change - markNode(n.ID, &changeCopy) - } - } - - for node, change := range nodeChanges { - if change != nil && change.Deleted() { - // TODO Node delete event - continue - } - - ne, err := s.statusEventsForNode(tx, node, change) - if err != nil { - return nil, err - } - // Rebuild node node events - events = append(events, ne...) - } - return events, nil -} - -func (s *StateStore) statusEventsForNode(tx ReadTxn, node string, change *memdb.Change) ([]event.Event, error) { - events := []event.Event{} - if change.Created() { - n := change.After.(*structs.Node) - for _, e := range n.Events { - nodeEvent := NodeEvent{Message: e.Message, NodeID: node} - e := event.Event{ - Topic: "NodeEvent", - Key: node, - Payload: nodeEvent, - } - events = append(events, e) - } - } else if change.Updated() { - nbefore := change.Before.(*structs.Node) - nafter := change.After.(*structs.Node) - newEvents := s.newNodeEvents(nbefore.Events, nafter.Events) - for _, e := range newEvents { - if s.isNodeDrainEvent(nbefore, nafter, newEvents) { - allocs, err := s.AllocsByNodeTx(tx, node) - if err != nil { - return []event.Event{}, err - } - var allocIDs []string - for _, a := range allocs { - allocIDs = append(allocIDs, a.ID) - } - - nde := NodeDrainEvent{ - NodeID: node, - DrainStrategy: *nafter.DrainStrategy, - Allocs: allocIDs, - Message: e.Message, - } - e := event.Event{ - Topic: "NodeEvent", - Key: node, - Payload: nde, - } - events = append(events, e) - } else { - - ne := NodeEvent{ - Message: e.Message, - NodeID: node, - } - e := event.Event{ - Topic: "NodeEvent", - Key: node, - Payload: ne, - } - events = append(events, e) - } - } - } - - return events, nil -} - -func (s *StateStore) newNodeEvents(before, after []*structs.NodeEvent) []*structs.NodeEvent { - events := []*structs.NodeEvent{} - if len(before) == len(after) { - return nil - } - - for _, e := range after { - found := false - for _, be := range before { - if e.String() == be.String() { - found = true - break - } - } - if !found { - events = append(events, e) - } - } - return events -} - -func (s *StateStore) isNodeDrainEvent(before, after *structs.Node, newEvents []*structs.NodeEvent) bool { - if before.Drain != after.Drain { - return true - } - - for _, e := range newEvents { - if e.Subsystem == structs.NodeEventSubsystemDrain { - return true - } - } - return false -} - -func (s *StateStore) AllocsByNodeTx(tx ReadTxn, node string) ([]*structs.Allocation, error) { - iter, err := tx.Get("allocs", "node_prefix", node) - if err != nil { - return nil, err - } - - var out []*structs.Allocation - for { - raw := iter.Next() - if raw == nil { - break - } - out = append(out, raw.(*structs.Allocation)) - } - return out, nil -} diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go deleted file mode 100644 index cc6258a1c..000000000 --- a/nomad/state/node_events_test.go +++ /dev/null @@ -1,147 +0,0 @@ -package state - -import ( - "testing" - "time" - - "github.com/hashicorp/nomad/nomad/event" - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/require" -) - -func allocID() string { return "e5bcbac313d6c-e29b-11c4-a5cd-5949157" } - -func mockNode(id string) *structs.Node { - node := mock.Node() - node.ID = id - return node -} - -func TestNodeEventsFromChanges(t *testing.T) { - cases := []struct { - Name string - Setup func(s *StateStore, index uint64) error - Mutate func(s *StateStore, tx *txn) error - WantEvents []event.Event - WantErr bool - }{ - { - Name: "new node registered", - Setup: func(s *StateStore, idx uint64) error { - req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94") - return s.UpsertNode(idx, req) - }, - Mutate: func(s *StateStore, tx *txn) error { - event := &structs.NodeEvent{ - Message: "Node ready foo", - Subsystem: structs.NodeEventSubsystemCluster, - Timestamp: time.Now(), - } - return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event) - }, - WantEvents: []event.Event{ - { - Topic: "NodeEvent", - Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94", - Payload: NodeEvent{ - Message: "Node ready foo", - NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94", - }, - }, - }, - }, - { - Name: "only new events", - Setup: func(s *StateStore, idx uint64) error { - req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94") - require.NoError(t, s.UpsertNode(idx, req)) - event := &structs.NodeEvent{ - Message: "Node foo initializing", - Subsystem: structs.NodeEventSubsystemCluster, - Timestamp: time.Now(), - } - return s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event) - }, - Mutate: func(s *StateStore, tx *txn) error { - event := &structs.NodeEvent{ - Message: "Node foo ready", - Subsystem: structs.NodeEventSubsystemCluster, - Timestamp: time.Now(), - } - return s.updateNodeStatusTxn(tx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusReady, time.Now().UnixNano(), event) - }, - WantEvents: []event.Event{ - { - Topic: "NodeEvent", - Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94", - Payload: NodeEvent{ - Message: "Node foo ready", - NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94", - }, - }, - }, - }, - { - Name: "node drain event", - Setup: func(s *StateStore, idx uint64) error { - - req := mockNode("8218b700-7e26-aac0-06d8-ff3b15f44e94") - require.NoError(t, s.UpsertNode(idx, req)) - event := &structs.NodeEvent{ - Message: "Node foo initializing", - Subsystem: structs.NodeEventSubsystemCluster, - Timestamp: time.Now(), - } - require.NoError(t, s.UpdateNodeStatus(idx, "8218b700-7e26-aac0-06d8-ff3b15f44e94", structs.NodeStatusInit, time.Now().UnixNano(), event)) - alloc := mock.Alloc() - alloc.NodeID = req.ID - alloc.ID = allocID() - return s.UpsertAllocs(idx, []*structs.Allocation{alloc}) - - }, - Mutate: func(s *StateStore, tx *txn) error { - event := &structs.NodeEvent{ - Subsystem: structs.NodeEventSubsystemCluster, - Timestamp: time.Now(), - } - event.SetMessage("Node drain strategy set") - event.SetSubsystem(structs.NodeEventSubsystemDrain) - drain := &structs.DrainStrategy{} - return s.updateNodeDrainImpl(tx, tx.Index, "8218b700-7e26-aac0-06d8-ff3b15f44e94", drain, false, time.Now().UnixNano(), event) - }, - WantEvents: []event.Event{ - { - Topic: "NodeEvent", - Key: "8218b700-7e26-aac0-06d8-ff3b15f44e94", - Payload: NodeDrainEvent{ - Message: "Node drain strategy set", - NodeID: "8218b700-7e26-aac0-06d8-ff3b15f44e94", - Allocs: []string{allocID()}, - }, - }, - }, - }, - } - - for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - s := testStateStore(t) - - if tc.Setup != nil { - require.NoError(t, tc.Setup(s, 10)) - } - - tx := s.db.WriteTxn(100) - require.NoError(t, tc.Mutate(s, tx)) - - got, err := s.NodeEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) - if tc.WantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - require.Equal(t, tc.WantEvents, got) - }) - } -} diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index 91c4affbe..22e32f31b 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -32,6 +32,16 @@ type changeTrackerDB struct { processChanges func(ReadTxn, Changes) ([]event.Event, error) } +func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB { + return &changeTrackerDB{ + db: db, + publisher: event.NewPublisher(), + processChanges: changesFn, + } +} + +type changeProcessor func(ReadTxn, Changes) ([]event.Event, error) + type eventPublisher interface { Publish(events []event.Event) } @@ -42,18 +52,6 @@ type noOpPublisher struct{} func (n *noOpPublisher) Publish(events []event.Event) {} func noOpProcessChanges(ReadTxn, Changes) ([]event.Event, error) { return []event.Event{}, nil } -// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting -// code may use it to create a read-only transaction, but it will panic if called -// with write=true. -// -// Deprecated: use either ReadTxn, or WriteTxn. -func (c *changeTrackerDB) Txn(write bool) *txn { - if write { - panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)") - } - return c.ReadTxn() -} - // ReadTxn returns a read-only transaction which behaves exactly the same as // memdb.Txn // diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 2c8a4af9f..fb905fce0 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -80,11 +80,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { config: config, abandonCh: make(chan struct{}), } - s.db = &changeTrackerDB{ - db: db, - publisher: event.NewPublisher(), - processChanges: processDBChanges, - } + s.db = NewChangeTrackerDB(db, event.NewPublisher(), processDBChanges) // Initialize the state store with required enterprise objects if err := s.enterpriseInit(); err != nil { @@ -109,12 +105,9 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { logger: s.logger, config: s.config, } - // create a changeTrackerDB that doesn't process changes - store.db = &changeTrackerDB{ - db: memDBSnap, - publisher: &noOpPublisher{}, - processChanges: noOpProcessChanges, - } + + store.db = NewChangeTrackerDB(memDBSnap, &noOpPublisher{}, noOpProcessChanges) + snap := &StateSnapshot{ StateStore: store, } @@ -470,7 +463,7 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl } func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("deployment", "id") @@ -483,7 +476,7 @@ func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error } func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("deployment", "namespace", namespace) @@ -496,7 +489,7 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) } func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("deployment", "id_prefix", deploymentID) @@ -525,7 +518,7 @@ func deploymentNamespaceFilter(namespace string) func(interface{}) bool { } func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.deploymentByIDImpl(ws, deploymentID, txn) } @@ -544,7 +537,7 @@ func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, } func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() var job *structs.Job // Read job from state store @@ -587,7 +580,7 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri // LatestDeploymentByJobID returns the latest deployment for the given job. The // latest is determined strictly by CreateIndex. func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the deployments iter, err := txn.Get("deployment", "job", namespace, jobID) @@ -700,7 +693,7 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR // ScalingEvents returns an iterator over all the job scaling events func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire scaling_event table iter, err := txn.Get("scaling_event", "id") @@ -714,7 +707,7 @@ func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, err } func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID) if err != nil { @@ -1325,7 +1318,7 @@ func (s *StateStore) deleteJobFromPlugins(index uint64, txn *txn, job *structs.J // NodeByID is used to lookup a node by ID func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("nodes", "id", nodeID) if err != nil { @@ -1341,7 +1334,7 @@ func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, // NodesByIDPrefix is used to lookup nodes by prefix func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("nodes", "id_prefix", nodeID) if err != nil { @@ -1354,7 +1347,7 @@ func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.Re // NodeBySecretID is used to lookup a node by SecretID func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*structs.Node, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("nodes", "secret_id", secretID) if err != nil { @@ -1370,7 +1363,7 @@ func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*struct // Nodes returns an iterator over all the nodes func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire nodes table iter, err := txn.Get("nodes", "id") @@ -1721,7 +1714,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) // JobByID is used to lookup a job by its ID. JobByID returns the current/latest job // version. func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.JobByIDTxn(ws, namespace, id, txn) } @@ -1742,7 +1735,7 @@ func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn // JobsByIDPrefix is used to lookup a job by prefix func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("jobs", "id_prefix", namespace, id) if err != nil { @@ -1756,7 +1749,7 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me // JobVersionsByID returns all the tracked versions of a job. func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.jobVersionByID(txn, &ws, namespace, id) } @@ -1802,7 +1795,7 @@ func (s *StateStore) jobVersionByID(txn *txn, ws *memdb.WatchSet, namespace, id // JobByIDAndVersion returns the job identified by its ID and Version. The // passed watchset may be nil. func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn) } @@ -1829,7 +1822,7 @@ func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id stri } func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire deployments table iter, err := txn.Get("job_version", "id") @@ -1843,7 +1836,7 @@ func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error // Jobs returns an iterator over all the jobs func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire jobs table iter, err := txn.Get("jobs", "id") @@ -1858,7 +1851,7 @@ func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { // JobsByNamespace returns an iterator over all the jobs for the given namespace func (s *StateStore) JobsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.jobsByNamespaceImpl(ws, namespace, txn) } @@ -1877,7 +1870,7 @@ func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, tx // JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs. func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("jobs", "periodic", periodic) if err != nil { @@ -1892,7 +1885,7 @@ func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.Res // JobsByScheduler returns an iterator over all the jobs with the specific // scheduler type. func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Return an iterator for jobs with the specific type. iter, err := txn.Get("jobs", "type", schedulerType) @@ -1908,7 +1901,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m // JobsByGC returns an iterator over all jobs eligible or uneligible for garbage // collection. func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("jobs", "gc", gc) if err != nil { @@ -1922,7 +1915,7 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, // JobSummary returns a job summary object which matches a specific id. func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID) if err != nil { @@ -1942,7 +1935,7 @@ func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) // JobSummaries walks the entire job summary table and returns all the job // summary objects func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("job_summary", "id") if err != nil { @@ -1956,7 +1949,7 @@ func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, erro // JobSummaryByPrefix is used to look up Job Summary by id prefix func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("job_summary", "id_prefix", namespace, id) if err != nil { @@ -2020,7 +2013,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum // CSIVolumes returns the unfiltered list of all volumes func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() iter, err := txn.Get("csi_volumes", "id") @@ -2036,7 +2029,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) // CSIVolumeByID is used to lookup a single volume. Returns a copy of the volume // because its plugins are denormalized to provide accurate Health. func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", namespace, id) if err != nil { @@ -2054,7 +2047,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // CSIVolumes looks up csi_volumes by pluginID func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "plugin_id", pluginID) if err != nil { @@ -2076,7 +2069,7 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, pluginID // CSIVolumesByIDPrefix supports search func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID) if err != nil { @@ -2115,7 +2108,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb // Lookup the raw CSIVolumes to match the other list interfaces iter := NewSliceIterator() - txn := s.db.Txn(false) + txn := s.db.ReadTxn() for id, namespace := range ids { raw, err := txn.First("csi_volumes", "id", namespace, id) if err != nil { @@ -2129,7 +2122,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, nodeID string) (memdb // CSIVolumesByNamespace looks up the entire csi_volumes table func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "id_prefix", namespace, "") if err != nil { @@ -2282,7 +2275,7 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs return nil, nil } // Lookup CSIPlugin, the health records, and calculate volume health - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() plug, err := s.CSIPluginByID(ws, vol.PluginID) @@ -2360,7 +2353,7 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol // CSIPlugins returns the unfiltered list of all plugin health status func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() iter, err := txn.Get("csi_plugins", "id") @@ -2375,7 +2368,7 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) // CSIPluginsByIDPrefix supports search func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("csi_plugins", "id_prefix", pluginID) if err != nil { @@ -2389,7 +2382,7 @@ func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (m // CSIPluginByID returns the one named CSIPlugin func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() raw, err := txn.First("csi_plugins", "id_prefix", id) @@ -2564,7 +2557,7 @@ func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID stri // PeriodicLaunchByID is used to lookup a periodic launch by the periodic job // ID. func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id) if err != nil { @@ -2581,7 +2574,7 @@ func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) // PeriodicLaunches returns an iterator over all the periodic launches func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("periodic_launch", "id") @@ -2805,7 +2798,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e // EvalByID is used to lookup an eval by its ID func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("evals", "id", id) if err != nil { @@ -2823,7 +2816,7 @@ func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation // EvalsByIDPrefix is used to lookup evaluations by prefix in a particular // namespace func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over all evals by the id prefix iter, err := txn.Get("evals", "id_prefix", id) @@ -2853,7 +2846,7 @@ func evalNamespaceFilter(namespace string) func(interface{}) bool { // EvalsByJob returns all the evaluations by job id func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the node allocations iter, err := txn.Get("evals", "job_prefix", namespace, jobID) @@ -2884,7 +2877,7 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]* // Evals returns an iterator over all the evaluations func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("evals", "id") @@ -2900,7 +2893,7 @@ func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) { // EvalsByNamespace returns an iterator over all the evaluations in the given // namespace func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("evals", "namespace", namespace) @@ -3225,7 +3218,7 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( // AllocByID is used to lookup an allocation by its ID func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("allocs", "id", id) if err != nil { @@ -3242,7 +3235,7 @@ func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocatio // AllocsByIDPrefix is used to lookup allocs by prefix func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("allocs", "id_prefix", id) if err != nil { @@ -3271,7 +3264,7 @@ func allocNamespaceFilter(namespace string) func(interface{}) bool { // AllocsByIDPrefix is used to lookup allocs by prefix func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[string]bool, prefix string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() var iter memdb.ResultIterator var err error @@ -3303,7 +3296,7 @@ func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[st // AllocsByNode returns all the allocations by node func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the node allocations, using only the // node prefix which ignores the terminal status @@ -3327,7 +3320,7 @@ func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Al // AllocsByNode returns all the allocations by node and terminal status func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the node allocations iter, err := txn.Get("allocs", "node", node, terminal) @@ -3350,7 +3343,7 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin // AllocsByJob returns allocations by job id func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, anyCreateIndex bool) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get the job var job *structs.Job @@ -3391,7 +3384,7 @@ func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, any // AllocsByEval returns all the allocations by eval id func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the eval allocations iter, err := txn.Get("allocs", "eval", evalID) @@ -3414,7 +3407,7 @@ func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs. // AllocsByDeployment returns all the allocations by deployment id func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) ([]*structs.Allocation, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the deployments allocations iter, err := txn.Get("allocs", "deployment", deploymentID) @@ -3437,7 +3430,7 @@ func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) // Allocs returns an iterator over all the evaluations func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("allocs", "id") @@ -3453,7 +3446,7 @@ func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) { // AllocsByNamespace returns an iterator over all the allocations in the // namespace func (s *StateStore) AllocsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.allocsByNamespaceImpl(ws, txn, namespace) } @@ -3517,7 +3510,7 @@ func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.Vau // VaultAccessor returns the given Vault accessor func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs.VaultAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("vault_accessors", "id", accessor) if err != nil { @@ -3535,7 +3528,7 @@ func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs // VaultAccessors returns an iterator of Vault accessors. func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("vault_accessors", "id") if err != nil { @@ -3549,7 +3542,7 @@ func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, er // VaultAccessorsByAlloc returns all the Vault accessors by alloc id func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.VaultAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the accessors iter, err := txn.Get("vault_accessors", "alloc_id", allocID) @@ -3572,7 +3565,7 @@ func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([ // VaultAccessorsByNode returns all the Vault accessors by node id func (s *StateStore) VaultAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.VaultAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Get an iterator over the accessors iter, err := txn.Get("vault_accessors", "node_id", nodeID) @@ -3650,7 +3643,7 @@ func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.S // SITokenAccessor returns the given Service Identity token accessor. func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*structs.SITokenAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() watchCh, existing, err := txn.FirstWatch(siTokenAccessorTable, "id", accessorID) @@ -3669,7 +3662,7 @@ func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*str // SITokenAccessors returns an iterator of Service Identity token accessors. func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() iter, err := txn.Get(siTokenAccessorTable, "id") @@ -3684,7 +3677,7 @@ func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator, // SITokenAccessorsByAlloc returns all the Service Identity token accessors by alloc ID. func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.SITokenAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() // Get an iterator over the accessors @@ -3705,7 +3698,7 @@ func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string) // SITokenAccessorsByNode returns all the Service Identity token accessors by node ID. func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.SITokenAccessor, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() // Get an iterator over the accessors @@ -4095,7 +4088,7 @@ func (s *StateStore) LatestIndex() (uint64, error) { // Index finds the matching index value func (s *StateStore) Index(name string) (uint64, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Lookup the first matching index out, err := txn.First("index", "id", name) @@ -4110,7 +4103,7 @@ func (s *StateStore) Index(name string) (uint64, error) { // Indexes returns an iterator over all the indexes func (s *StateStore) Indexes() (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire nodes table iter, err := txn.Get("index", "id") @@ -4988,7 +4981,7 @@ func (s *StateStore) DeleteACLPolicies(index uint64, names []string) error { // ACLPolicyByName is used to lookup a policy by name func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.ACLPolicy, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("acl_policy", "id", name) if err != nil { @@ -5004,7 +4997,7 @@ func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.A // ACLPolicyByNamePrefix is used to lookup policies by prefix func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("acl_policy", "id_prefix", prefix) if err != nil { @@ -5017,7 +5010,7 @@ func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (me // ACLPolicies returns an iterator over all the acl policies func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("acl_policy", "id") @@ -5099,7 +5092,7 @@ func (s *StateStore) ACLTokenByAccessorID(ws memdb.WatchSet, id string) (*struct return nil, fmt.Errorf("acl token lookup failed: missing accessor id") } - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("acl_token", "id", id) if err != nil { @@ -5119,7 +5112,7 @@ func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*st return nil, fmt.Errorf("acl token lookup failed: missing secret id") } - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("acl_token", "secret", secretID) if err != nil { @@ -5135,7 +5128,7 @@ func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*st // ACLTokenByAccessorIDPrefix is used to lookup tokens by prefix func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("acl_token", "id_prefix", prefix) if err != nil { @@ -5147,7 +5140,7 @@ func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string // ACLTokens returns an iterator over all the tokens func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("acl_token", "id") @@ -5160,7 +5153,7 @@ func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) // ACLTokensByGlobal returns an iterator over all the tokens filtered by global value func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire table iter, err := txn.Get("acl_token", "global", globalVal) @@ -5173,7 +5166,7 @@ func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb // CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Lookup the bootstrap sentinel out, err := txn.First("index", "id", "acl_token_bootstrap") @@ -5230,7 +5223,7 @@ func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs // SchedulerConfig is used to get the current Scheduler configuration. func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { - tx := s.db.Txn(false) + tx := s.db.ReadTxn() defer tx.Abort() // Get the scheduler config @@ -5259,7 +5252,7 @@ func (s *StateStore) SchedulerSetConfig(index uint64, config *structs.SchedulerC } func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadata, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() defer txn.Abort() // Get the cluster metadata @@ -5477,7 +5470,7 @@ func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *t // ScalingPolicies returns an iterator over all the scaling policies func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // Walk the entire scaling_policy table iter, err := txn.Get("scaling_policy", "id") @@ -5491,7 +5484,7 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e } func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() iter, err := txn.Get("scaling_policy", "target_prefix", namespace) if err != nil { @@ -5514,7 +5507,7 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str } func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() return s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn) } @@ -5543,7 +5536,7 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID } func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() watchCh, existing, err := txn.FirstWatch("scaling_policy", "id", id) if err != nil { @@ -5560,7 +5553,7 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, target map[string]string) (*structs.ScalingPolicy, error) { - txn := s.db.Txn(false) + txn := s.db.ReadTxn() // currently, only scaling policy type is against a task group namespace := target[structs.ScalingTargetNamespace] diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index cb3d94c61..dea9fc4b0 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6157,7 +6157,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) { job := mock.Job() state := testStateStore(t) - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6173,7 +6173,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { job := mock.PeriodicJob() state := testStateStore(t) - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6189,7 +6189,7 @@ func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { job := mock.Job() state := testStateStore(t) - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6223,7 +6223,7 @@ func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6249,7 +6249,7 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6266,7 +6266,7 @@ func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) { state := testStateStore(t) job := mock.PeriodicJob() - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6295,7 +6295,7 @@ func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) { job := mock.Job() job.ParameterizedJob = &structs.ParameterizedJobConfig{} - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, false) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6331,7 +6331,7 @@ func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -6359,7 +6359,7 @@ func TestStateStore_SetJobStatus_SystemJob(t *testing.T) { t.Fatalf("err: %v", err) } - txn := state.db.Txn(false) + txn := state.db.ReadTxn() status, err := state.getJobStatus(txn, job, true) if err != nil { t.Fatalf("getJobStatus() failed: %v", err) @@ -8696,7 +8696,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { require.ElementsMatch([]string{policy2.ID}, policiesInNS2) } - func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { t.Parallel()