diff --git a/nomad/fsm.go b/nomad/fsm.go index 26dba94ed..77a47a6c3 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1513,10 +1513,11 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { // Create a new state store config := &state.StateStoreConfig{ - Logger: n.config.Logger, - Region: n.config.Region, - EnablePublisher: n.config.EnableEventBroker, - EventBufferSize: n.config.EventBufferSize, + Logger: n.config.Logger, + Region: n.config.Region, + EnablePublisher: n.config.EnableEventBroker, + EventBufferSize: n.config.EventBufferSize, + JobTrackedVersions: n.config.JobTrackedVersions, } newState, err := state.NewStateStore(config) if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 63db0b092..bc21105ae 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -105,6 +105,13 @@ type StateStoreConfig struct { JobTrackedVersions int } +func (c *StateStoreConfig) Validate() error { + if c.JobTrackedVersions <= 0 { + return fmt.Errorf("JobTrackedVersions must be positive; got: %d", c.JobTrackedVersions) + } + return nil +} + // The StateStore is responsible for maintaining all the Nomad // state. It is manipulated by the FSM which maintains consistency // through the use of Raft. The goals of the StateStore are to provide @@ -139,6 +146,10 @@ func (a *streamACLDelegate) TokenProvider() stream.ACLTokenProvider { // NewStateStore is used to create a new state store func NewStateStore(config *StateStoreConfig) (*StateStore, error) { + if err := config.Validate(); err != nil { + return nil, err + } + // Create the MemDB db, err := memdb.NewMemDB(stateStoreSchema()) if err != nil { @@ -2031,6 +2042,11 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *txn) // upsertJobVersion inserts a job into its historic version table and limits the // number of job versions that are tracked. func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) error { + // JobTrackedVersions really must not be zero here + if err := s.config.Validate(); err != nil { + return err + } + // Insert the job if err := txn.Insert("job_version", job); err != nil { return fmt.Errorf("failed to insert job into job_version table: %v", err) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 52778d092..9f67e78c6 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -29,6 +29,17 @@ func testStateStore(t *testing.T) *StateStore { return TestStateStore(t) } +func TestStateStore_InvalidConfig(t *testing.T) { + config := &StateStoreConfig{ + // default zero value, but explicit because it causes validation failure + JobTrackedVersions: 0, + } + store, err := NewStateStore(config) + must.Nil(t, store) + must.Error(t, err) + must.ErrorContains(t, err, "JobTrackedVersions must be positive") +} + func TestStateStore_Blocking_Error(t *testing.T) { ci.Parallel(t) diff --git a/nomad/state/testing.go b/nomad/state/testing.go index dcad49098..cb955ffa4 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -32,11 +32,13 @@ func TestStateStore(t testing.TB) *StateStore { func TestStateStorePublisher(t testing.TB) *StateStoreConfig { return &StateStoreConfig{ - Logger: testlog.HCLogger(t), - Region: "global", - EnablePublisher: true, + Logger: testlog.HCLogger(t), + Region: "global", + EnablePublisher: true, + JobTrackedVersions: structs.JobDefaultTrackedVersions, } } + func TestStateStoreCfg(t testing.TB, cfg *StateStoreConfig) *StateStore { state, err := NewStateStore(cfg) if err != nil {