diff --git a/command/agent/agent.go b/command/agent/agent.go
index 1605fc086..5207451bb 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -252,12 +252,6 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
 		}
 		conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize)
 	}
-	if agentConfig.Server.DurableEventCount != nil {
-		if *agentConfig.Server.DurableEventCount < 0 {
-			return nil, fmt.Errorf("Invalid Config, durable_event_count must be non-negative")
-		}
-		conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount)
-	}
 	if agentConfig.Autopilot != nil {
 		if agentConfig.Autopilot.CleanupDeadServers != nil {
 			conf.AutopilotConfig.CleanupDeadServers = *agentConfig.Autopilot.CleanupDeadServers
diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go
index 5c71a70b3..4f9c39585 100644
--- a/command/agent/agent_test.go
+++ b/command/agent/agent_test.go
@@ -58,7 +58,6 @@ func TestAgent_ServerConfig(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, out.EnableEventBroker)
-	require.Equal(t, int64(100), out.DurableEventCount)
 
 	serfAddr := out.SerfConfig.MemberlistConfig.AdvertiseAddr
 	require.Equal(t, "127.0.0.1", serfAddr)
diff --git a/command/agent/config.go b/command/agent/config.go
index 83a09e5da..a83445e21 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -494,10 +494,6 @@ type ServerConfig struct {
 	// for the EventBufferSize is 1.
 	EventBufferSize *int `hcl:"event_buffer_size"`
 
-	// DurableEventCount specifies the amount of events to persist during snapshot generation.
-	// A count of 0 signals that no events should be persisted.
-	DurableEventCount *int `hcl:"durable_event_count"`
-
 	// ExtraKeysHCL is used by hcl to surface unexpected keys
 	ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
 }
@@ -891,7 +887,6 @@ func DefaultConfig() *Config {
 			Enabled:           false,
 			EnableEventBroker: helper.BoolToPtr(true),
 			EventBufferSize:   helper.IntToPtr(100),
-			DurableEventCount: helper.IntToPtr(100),
 			StartJoin:         []string{},
 			ServerJoin: &ServerJoin{
 				RetryJoin: []string{},
@@ -1424,10 +1419,6 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
 		result.EventBufferSize = b.EventBufferSize
 	}
 
-	if b.DurableEventCount != nil {
-		result.DurableEventCount = b.DurableEventCount
-	}
-
 	if b.DefaultSchedulerConfig != nil {
 		c := *b.DefaultSchedulerConfig
 		result.DefaultSchedulerConfig = &c
diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go
index 771ff1973..b3c5ab87d 100644
--- a/command/agent/config_parse_test.go
+++ b/command/agent/config_parse_test.go
@@ -124,7 +124,6 @@ var basicConfig = &Config{
 		EncryptKey:        "abc",
 		EnableEventBroker: helper.BoolToPtr(false),
 		EventBufferSize:   helper.IntToPtr(200),
-		DurableEventCount: helper.IntToPtr(0),
 		ServerJoin: &ServerJoin{
 			RetryJoin:        []string{"1.1.1.1", "2.2.2.2"},
 			RetryInterval:    time.Duration(15) * time.Second,
@@ -174,15 +173,15 @@ var basicConfig = &Config{
 		},
 	},
 	Telemetry: &Telemetry{
-		StatsiteAddr:               "127.0.0.1:1234",
-		StatsdAddr:                 "127.0.0.1:2345",
-		PrometheusMetrics:          true,
-		DisableHostname:            true,
-		UseNodeName:                false,
-		CollectionInterval:         "3s",
-		collectionInterval:         3 * time.Second,
-		PublishAllocationMetrics:   true,
-		PublishNodeMetrics:         true,
+		StatsiteAddr:             "127.0.0.1:1234",
+		StatsdAddr:               "127.0.0.1:2345",
+		PrometheusMetrics:        true,
+		DisableHostname:          true,
+		UseNodeName:              false,
+		CollectionInterval:       "3s",
+		collectionInterval:       3 * time.Second,
+		PublishAllocationMetrics: true,
+		PublishNodeMetrics:       true,
 	},
 	LeaveOnInt:  true,
 	LeaveOnTerm: true,
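Reviewer note: with `DurableEventCount`/`durable_event_count` removed, the agent's event stream is tuned only by the two `ServerConfig` knobs that remain above. A minimal sketch of the resulting stanza, mirroring the repo's own testdata (the values shown are the defaults from `DefaultConfig()` and are illustrative only):

```hcl
server {
  enabled             = true
  enable_event_broker = true # default: true
  event_buffer_size   = 100  # default: 100; events now live only in memory
}
```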
diff --git a/command/agent/config_test.go b/command/agent/config_test.go
index 4e0aa9346..11681f002 100644
--- a/command/agent/config_test.go
+++ b/command/agent/config_test.go
@@ -140,7 +140,6 @@ func TestConfig_Merge(t *testing.T) {
 			UpgradeVersion:    "foo",
 			EnableEventBroker: helper.BoolToPtr(false),
 			EventBufferSize:   helper.IntToPtr(0),
-			DurableEventCount: helper.IntToPtr(0),
 		},
 		ACL: &ACLConfig{
 			Enabled: true,
@@ -332,7 +331,6 @@ func TestConfig_Merge(t *testing.T) {
 			RedundancyZone:    "bar",
 			UpgradeVersion:    "bar",
 			EnableEventBroker: helper.BoolToPtr(true),
-			DurableEventCount: helper.IntToPtr(100),
 			EventBufferSize:   helper.IntToPtr(100),
 		},
 		ACL: &ACLConfig{
@@ -1177,50 +1175,41 @@ func TestEventBroker_Parse(t *testing.T) {
 		a := &ServerConfig{
 			EnableEventBroker: helper.BoolToPtr(false),
 			EventBufferSize:   helper.IntToPtr(0),
-			DurableEventCount: helper.IntToPtr(0),
 		}
 		b := DefaultConfig().Server
 		b.EnableEventBroker = nil
 		b.EventBufferSize = nil
-		b.DurableEventCount = nil
 
 		result := a.Merge(b)
 		require.Equal(false, *result.EnableEventBroker)
 		require.Equal(0, *result.EventBufferSize)
-		require.Equal(0, *result.DurableEventCount)
 	}
 
 	{
 		a := &ServerConfig{
 			EnableEventBroker: helper.BoolToPtr(true),
 			EventBufferSize:   helper.IntToPtr(5000),
-			DurableEventCount: helper.IntToPtr(200),
 		}
 		b := DefaultConfig().Server
 		b.EnableEventBroker = nil
 		b.EventBufferSize = nil
-		b.DurableEventCount = nil
 
 		result := a.Merge(b)
 		require.Equal(true, *result.EnableEventBroker)
 		require.Equal(5000, *result.EventBufferSize)
-		require.Equal(200, *result.DurableEventCount)
 	}
 
 	{
 		a := &ServerConfig{
 			EnableEventBroker: helper.BoolToPtr(false),
 			EventBufferSize:   helper.IntToPtr(0),
-			DurableEventCount: helper.IntToPtr(0),
 		}
 		b := DefaultConfig().Server
 		b.EnableEventBroker = helper.BoolToPtr(true)
 		b.EventBufferSize = helper.IntToPtr(20000)
-		b.DurableEventCount = helper.IntToPtr(1000)
 
 		result := a.Merge(b)
 		require.Equal(true, *result.EnableEventBroker)
 		require.Equal(20000, *result.EventBufferSize)
-		require.Equal(1000, *result.DurableEventCount)
 	}
 }
diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl
index 66d9f9d0f..74b33e85a 100644
--- a/command/agent/testdata/basic.hcl
+++ b/command/agent/testdata/basic.hcl
@@ -132,7 +132,6 @@ server {
   raft_multiplier     = 4
   enable_event_broker = false
  event_buffer_size   = 200
-  durable_event_count = 0
 
   server_join {
     retry_join = ["1.1.1.1", "2.2.2.2"]
diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json
index c20889fc6..8a988b309 100644
--- a/command/agent/testdata/basic.json
+++ b/command/agent/testdata/basic.json
@@ -263,7 +263,6 @@
         "enabled": true,
         "enable_event_broker": false,
         "event_buffer_size": 200,
-        "durable_event_count": 0,
         "enabled_schedulers": [
           "test"
         ],
diff --git a/nomad/config.go b/nomad/config.go
index 93abdc6f1..08e4f562f 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -85,10 +85,6 @@ type Config struct {
 	// EventBufferSize is the amount of events to hold in memory.
 	EventBufferSize int64
 
-	// DurableEventCount is the amount of events to save to disk when
-	// snapshotting
-	DurableEventCount int64
-
 	// LogOutput is the location to write logs to. If this is not set,
 	// logs will go to stderr.
 	LogOutput io.Writer
@@ -426,7 +422,6 @@ func DefaultConfig() *Config {
 		LicenseConfig:     &LicenseConfig{},
 		EnableEventBroker: true,
 		EventBufferSize:   100,
-		DurableEventCount: 100,
 		AutopilotConfig: &structs.AutopilotConfig{
 			CleanupDeadServers:   true,
 			LastContactThreshold: 200 * time.Millisecond,
diff --git a/nomad/fsm.go b/nomad/fsm.go
index efa86768d..063269a2b 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -8,7 +8,6 @@ import (
 	"time"
 
 	metrics "github.com/armon/go-metrics"
-	"github.com/hashicorp/go-hclog"
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-msgpack/codec"
@@ -53,7 +52,6 @@ const (
 	CSIPluginSnapshot
 	CSIVolumeSnapshot
 	ScalingEventsSnapshot
-	EventSnapshot
 )
 
 // LogApplier is the definition of a function that can apply a Raft log
@@ -101,9 +99,8 @@ type nomadFSM struct {
 // state in a way that can be accessed concurrently with operations
 // that may modify the live state.
 type nomadSnapshot struct {
-	snap              *state.StateSnapshot
-	timetable         *TimeTable
-	durableEventCount int64
+	snap      *state.StateSnapshot
+	timetable *TimeTable
 }
 
 // snapshotHeader is the first entry in our snapshot
@@ -135,22 +132,16 @@ type FSMConfig struct {
 	// EventBufferSize is the amount of messages to hold in memory
 	EventBufferSize int64
-
-	// Durable count specifies the amount of events generated by the state store
-	// to save to disk during snapshot generation. The most recent events
-	// limited to count will be saved.
-	DurableEventCount int64
 }
 
 // NewFSMPath is used to construct a new FSM with a blank state
 func NewFSM(config *FSMConfig) (*nomadFSM, error) {
 	// Create a state store
 	sconfig := &state.StateStoreConfig{
-		Logger:            config.Logger,
-		Region:            config.Region,
-		EnablePublisher:   config.EnableEventBroker,
-		EventBufferSize:   config.EventBufferSize,
-		DurableEventCount: config.DurableEventCount,
+		Logger:          config.Logger,
+		Region:          config.Region,
+		EnablePublisher: config.EnableEventBroker,
+		EventBufferSize: config.EventBufferSize,
 	}
 	state, err := state.NewStateStore(sconfig)
 	if err != nil {
@@ -1265,9 +1256,8 @@ func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
 	}
 
 	ns := &nomadSnapshot{
-		snap:              snap,
-		timetable:         n.timetable,
-		durableEventCount: n.config.DurableEventCount,
+		snap:      snap,
+		timetable: n.timetable,
 	}
 	return ns, nil
 }
@@ -1277,11 +1267,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
 
 	// Create a new state store
 	config := &state.StateStoreConfig{
-		Logger:            n.config.Logger,
-		Region:            n.config.Region,
-		EnablePublisher:   n.config.EnableEventBroker,
-		EventBufferSize:   n.config.EventBufferSize,
-		DurableEventCount: n.config.DurableEventCount,
+		Logger:          n.config.Logger,
+		Region:          n.config.Region,
+		EnablePublisher: n.config.EnableEventBroker,
+		EventBufferSize: n.config.EventBufferSize,
 	}
 	newState, err := state.NewStateStore(config)
 	if err != nil {
@@ -1525,20 +1514,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
 			if err := restore.CSIVolumeRestore(plugin); err != nil {
 				return err
 			}
-		case EventSnapshot:
-			// If the event broker is disabled but the snapshot from potentially
-			// a remote server has events, ignore them
-			if !n.config.EnableEventBroker {
-				return nil
-			}
-
-			event := new(structs.Events)
-			if err := dec.Decode(event); err != nil {
-				return err
-			}
-			if err := restore.EventRestore(event); err != nil {
-				return err
-			}
 		default:
 			// Check if this is an enterprise only object being restored
 			restorer, ok := n.enterpriseRestorers[snapType]
@@ -1575,43 +1550,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
 	// blocking queries won't see any changes and need to be woken up.
 	stateOld.Abandon()
 
-	// Rehydrate the new state store's event publisher with the events
-	// persisted in the snapshot
-	if n.config.EnableEventBroker {
-		n.logger.Debug("Rehydrating event broker events from snapshot")
-		if err := rehydratePublisherFromState(n.state, n.logger); err != nil {
-			n.logger.Error("Error re-hydrating event publisher during restore", "error", err)
-		}
-	}
-
-	return nil
-}
-
-// rehydratePublisherFromState is used during a snapshot restore to
-// add the persisted events from that snapshot that were just added to memdb
-// back into the event publisher
-func rehydratePublisherFromState(s *state.StateStore, l hclog.Logger) error {
-	pub, err := s.EventBroker()
-	if err != nil {
-		return err
-	}
-
-	events, err := s.Events(nil)
-	if err != nil {
-		return err
-	}
-	count := 0
-	for {
-		raw := events.Next()
-		if raw == nil {
-			break
-		}
-		e := raw.(*structs.Events)
-		pub.Publish(e)
-		count++
-	}
-
-	l.Debug("finished hydrating event broker from snapshot", "events", count)
 	return nil
 }
 
@@ -1890,10 +1828,6 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
 		sink.Cancel()
 		return err
 	}
-	if err := s.persistEvents(sink, encoder); err != nil {
-		sink.Cancel()
-		return err
-	}
 	return nil
 }
 
@@ -2397,42 +2331,6 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink,
 	return nil
 }
 
-func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error {
-	if s.durableEventCount == 0 {
-		return nil
-	}
-
-	events, err := s.snap.LatestEventsReverse(nil)
-	if err != nil {
-		return err
-	}
-
-	var count int64
-	for {
-		// Get the next item
-		raw := events.Next()
-		if raw == nil {
-			break
-		}
-
-		// Prepare the request struct
-		event := raw.(*structs.Events)
-
-		// Write out a volume snapshot
-		sink.Write([]byte{byte(EventSnapshot)})
-		if err := encoder.Encode(event); err != nil {
-			return err
-		}
-		count += int64(len(event.Events))
-
-		// Only write to sink until durableCount has been reached
-		if count >= s.durableEventCount {
-			return nil
-		}
-	}
-	return nil
-}
-
 // Release is a no-op, as we just need to GC the pointer
 // to the state store snapshot. There is nothing to explicitly
 // cleanup.
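For orientation, every call site in this diff now builds the same four-field `state.StateStoreConfig`. Below is a minimal, self-contained sketch of that wiring, assembled only from fields visible in this diff; `hclog.Default()` stands in for the server's real logger and the values are illustrative:

```go
package main

import (
	"log"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/nomad/state"
)

func main() {
	// DurableEventCount is gone: events exist only in the broker's
	// in-memory buffer and are no longer persisted to, or restored
	// from, the memdb "events" table.
	store, err := state.NewStateStore(&state.StateStoreConfig{
		Logger:          hclog.Default(),
		Region:          "global",
		EnablePublisher: true, // wire up the in-memory event broker
		EventBufferSize: 100,  // max events held in the ring buffer
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = store
}
```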
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index c8c77fea6..f2f49126a 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -3201,87 +3201,3 @@ func TestFSM_ClusterMetadata(t *testing.T) {
 	r.Equal(clusterID, storedMetadata.ClusterID)
 	r.Equal(now, storedMetadata.CreateTime)
 }
-
-func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
-	t.Parallel()
-	// Add some state
-	fsm := testFSM(t)
-	fsm.config.EnableEventBroker = true
-	// DurableEventCount = 4 each mock events wrapper contains 2 events
-	fsm.config.DurableEventCount = 4
-
-	state := fsm.State()
-
-	e1 := mock.Events(1000)
-	e2 := mock.Events(1001)
-	e3 := mock.Events(1002)
-
-	require.NoError(t, state.UpsertEvents(1000, e1))
-	require.NoError(t, state.UpsertEvents(1001, e2))
-	require.NoError(t, state.UpsertEvents(1002, e3))
-
-	// Verify the contents
-	fsm2 := testSnapshotRestore(t, fsm)
-	state2 := fsm2.State()
-
-	// latest events iterator is newest to oldest
-	iter, err := state2.LatestEventsReverse(nil)
-	require.NoError(t, err)
-
-	raw3 := iter.Next()
-	require.NotNil(t, raw3)
-
-	out3, ok := raw3.(*structs.Events)
-	require.True(t, ok)
-	require.Equal(t, e3.Index, out3.Index)
-
-	raw2 := iter.Next()
-	require.NotNil(t, raw2)
-
-	out2, ok := raw2.(*structs.Events)
-	require.True(t, ok)
-	require.Equal(t, e2.Index, out2.Index)
-
-	// Durable count was 4 so e1 events should be excluded
-	raw1 := iter.Next()
-	require.Nil(t, raw1)
-
-	pub, err := state2.EventBroker()
-	require.NoError(t, err)
-
-	testutil.WaitForResult(func() (bool, error) {
-		plen := pub.Len()
-		if plen == 4 {
-			return true, nil
-		}
-		return false, fmt.Errorf("expected publisher to have len 2 got: %d", plen)
-	}, func(err error) {
-		require.Fail(t, err.Error())
-	})
-}
-
-func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) {
-	t.Parallel()
-	fsm := testFSM(t)
-	// Enable event publisher with durable event count of zero
-	fsm.config.EnableEventBroker = true
-	fsm.config.DurableEventCount = 0
-
-	state := fsm.State()
-
-	e1 := mock.Events(1000)
-	e2 := mock.Events(1001)
-
-	require.NoError(t, state.UpsertEvents(1000, e1))
-	require.NoError(t, state.UpsertEvents(1001, e2))
-
-	// Verify the contents
-	fsm2 := testSnapshotRestore(t, fsm)
-	state2 := fsm2.State()
-	// ws := memdb.NewWatchSet()
-	out, err := state2.LatestEventsReverse(nil)
-	require.NoError(t, err)
-
-	raw := out.Next()
-	require.Nil(t, raw)
-}
diff --git a/nomad/server.go b/nomad/server.go
index 44de55471..fb744d513 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -1217,7 +1217,6 @@ func (s *Server) setupRaft() error {
 			Logger:            s.logger,
 			Region:            s.Region(),
 			EnableEventBroker: s.config.EnableEventBroker,
-			DurableEventCount: s.config.DurableEventCount,
 			EventBufferSize:   s.config.EventBufferSize,
 		}
 		var err error
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index 7f73bb4fc..3d1308859 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -52,7 +52,6 @@ func init() {
 		csiPluginTableSchema,
 		scalingPolicyTableSchema,
 		scalingEventTableSchema,
-		eventTableSchema,
 	}...)
 }
@@ -901,19 +900,3 @@ func scalingEventTableSchema() *memdb.TableSchema {
 		},
 	}
 }
-
-func eventTableSchema() *memdb.TableSchema {
-	return &memdb.TableSchema{
-		Name: "events",
-		Indexes: map[string]*memdb.IndexSchema{
-			"id": {
-				Name:         "id",
-				AllowMissing: true,
-				Unique:       true,
-				Indexer: &memdb.UintFieldIndex{
-					Field: "Index",
-				},
-			},
-		},
-	}
-}
diff --git a/nomad/state/schema_test.go b/nomad/state/schema_test.go
index fd2f04b4c..f5b1b620f 100644
--- a/nomad/state/schema_test.go
+++ b/nomad/state/schema_test.go
@@ -1,7 +1,6 @@
 package state
 
 import (
-	"github.com/hashicorp/nomad/nomad/structs"
 	"testing"
 
 	memdb "github.com/hashicorp/go-memdb"
@@ -145,70 +144,3 @@ func TestState_ScalingPolicyTargetFieldIndex_FromObject(t *testing.T) {
 	require.Error(err)
 	require.Equal("", string(val))
 }
-
-func TestEventTableUintIndex(t *testing.T) {
-
-	require := require.New(t)
-
-	const (
-		eventsTable = "events"
-		uintIDIdx   = "id"
-	)
-
-	db, err := memdb.NewMemDB(&memdb.DBSchema{
-		Tables: map[string]*memdb.TableSchema{
-			eventsTable: eventTableSchema(),
-		},
-	})
-	require.NoError(err)
-
-	// numRecords in table counts all the items in the table, which is expected
-	// to always be 1 since that's the point of the singletonRecord Indexer.
-	numRecordsInTable := func() int {
-		txn := db.Txn(false)
-		defer txn.Abort()
-
-		iter, err := txn.Get(eventsTable, uintIDIdx)
-		require.NoError(err)
-
-		num := 0
-		for item := iter.Next(); item != nil; item = iter.Next() {
-			num++
-		}
-		return num
-	}
-
-	insertEvents := func(e *structs.Events) {
-		txn := db.Txn(true)
-		err := txn.Insert(eventsTable, e)
-		require.NoError(err)
-		txn.Commit()
-	}
-
-	get := func(idx uint64) *structs.Events {
-		txn := db.Txn(false)
-		defer txn.Abort()
-		record, err := txn.First("events", "id", idx)
-		require.NoError(err)
-		s, ok := record.(*structs.Events)
-		require.True(ok)
-		return s
-	}
-
-	firstEvent := &structs.Events{Index: 10, Events: []structs.Event{{Index: 10}, {Index: 10}}}
-	secondEvent := &structs.Events{Index: 11, Events: []structs.Event{{Index: 11}, {Index: 11}}}
-	thirdEvent := &structs.Events{Index: 202, Events: []structs.Event{{Index: 202}, {Index: 202}}}
-	insertEvents(firstEvent)
-	insertEvents(secondEvent)
-	insertEvents(thirdEvent)
-	require.Equal(3, numRecordsInTable())
-
-	gotFirst := get(10)
-	require.Equal(firstEvent, gotFirst)
-
-	gotSecond := get(11)
-	require.Equal(secondEvent, gotSecond)
-
-	gotThird := get(202)
-	require.Equal(thirdEvent, gotThird)
-}
diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go
index 7b8f6420e..8035a32cf 100644
--- a/nomad/state/state_changes.go
+++ b/nomad/state/state_changes.go
@@ -30,17 +30,15 @@ type Changes struct {
 // sent to the EventBroker which will create and emit change events.
 type changeTrackerDB struct {
 	memdb          *memdb.MemDB
-	durableCount   int64
 	publisher      *stream.EventBroker
 	processChanges func(ReadTxn, Changes) (*structs.Events, error)
 }
 
-func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesFn changeProcessor, durableCount int64) *changeTrackerDB {
+func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesFn changeProcessor) *changeTrackerDB {
 	return &changeTrackerDB{
 		memdb:          db,
 		publisher:      publisher,
 		processChanges: changesFn,
-		durableCount:   durableCount,
 	}
 }
 
@@ -79,14 +77,11 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
 }
 
 func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) *txn {
-	persistChanges := c.durableCount > 0
-
 	t := &txn{
-		msgType:        msgType,
-		Txn:            c.memdb.Txn(true),
-		Index:          idx,
-		publish:        c.publish,
-		persistChanges: persistChanges,
+		msgType: msgType,
+		Txn:     c.memdb.Txn(true),
+		Index:   idx,
+		publish: c.publish,
 	}
 	t.Txn.TrackChanges()
 	return t
@@ -130,8 +125,6 @@ type txn struct {
 	// msgType is used to inform event sourcing which type of event to create
 	msgType structs.MessageType
 
-	persistChanges bool
-
 	*memdb.Txn
 	// Index in raft where the write is occurring. The value is zero for a
 	// read-only, or WriteTxnRestore transaction.
@@ -157,18 +150,10 @@ func (tx *txn) Commit() error {
 			Changes: tx.Txn.Changes(),
 			MsgType: tx.MsgType(),
 		}
-		events, err := tx.publish(changes)
+		_, err := tx.publish(changes)
 		if err != nil {
 			return err
 		}
-
-		if tx.persistChanges && events != nil {
-			// persist events after processing changes
-			err := tx.Txn.Insert("events", events)
-			if err != nil {
-				return err
-			}
-		}
 	}
 
 	tx.Txn.Commit()
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 0bdb49782..d5b2f1d3f 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -51,10 +51,6 @@ type StateStoreConfig struct {
 	// EventBufferSize configures the amount of events to hold in memory
 	EventBufferSize int64
-
-	// DurableEventCount is used to determine if events from transaction changes
-	// should be saved in go-memdb
-	DurableEventCount int64
 }
 
 // The StateStore is responsible for maintaining all the Nomad
@@ -102,11 +98,10 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
 		broker := stream.NewEventBroker(ctx, stream.EventBrokerCfg{
 			EventBufferSize: config.EventBufferSize,
 			Logger:          config.Logger,
-			OnEvict:         s.eventBrokerEvict,
 		})
-		s.db = NewChangeTrackerDB(db, broker, processDBChanges, config.DurableEventCount)
+		s.db = NewChangeTrackerDB(db, broker, processDBChanges)
 	} else {
-		s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges, 0)
+		s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges)
 	}
 
 	// Initialize the state store with required enterprise objects
@@ -124,30 +119,6 @@ func (s *StateStore) EventBroker() (*stream.EventBroker, error) {
 	return s.db.publisher, nil
 }
 
-// eventBrokerEvict is used as a callback to delete an evicted events
-// entry from go-memdb.
-func (s *StateStore) eventBrokerEvict(events *structs.Events) {
-	if err := s.deleteEvent(events); err != nil {
-		if err == memdb.ErrNotFound {
-			s.logger.Info("Evicted event was not found in go-memdb table", "event index", events.Index)
-		} else {
-			s.logger.Error("Error deleting event from events table", "error", err)
-		}
-	}
-}
-
-func (s *StateStore) deleteEvent(events *structs.Events) error {
-	txn := s.db.memdb.Txn(true)
-	defer txn.Abort()
-
-	if err := txn.Delete("events", events); err != nil {
-		return err
-	}
-
-	txn.Commit()
-	return nil
-}
-
 // Config returns the state store configuration.
 func (s *StateStore) Config() *StateStoreConfig {
 	return s.config
@@ -165,7 +136,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
 	}
 
 	// Create a new change tracker DB that does not publish or track changes
-	store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges, 0)
+	store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges)
 
 	snap := &StateSnapshot{
 		StateStore: store,
@@ -5667,48 +5638,6 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[
 	return nil, nil
 }
 
-// LatestEventsReverse returns the unfiltered list of all volumes
-func (s *StateStore) LatestEventsReverse(ws memdb.WatchSet) (memdb.ResultIterator, error) {
-	txn := s.db.ReadTxn()
-	defer txn.Abort()
-
-	iter, err := txn.GetReverse("events", "id")
-	if err != nil {
-		return nil, fmt.Errorf("events lookup failed: %v", err)
-	}
-
-	ws.Add(iter.WatchCh())
-
-	return iter, nil
-}
-
-// Events returns the unfiltered list of all volumes
-func (s *StateStore) Events(ws memdb.WatchSet) (memdb.ResultIterator, error) {
-	txn := s.db.ReadTxn()
-	defer txn.Abort()
-
-	iter, err := txn.Get("events", "id")
-	if err != nil {
-		return nil, fmt.Errorf("events lookup failed: %v", err)
-	}
-
-	ws.Add(iter.WatchCh())
-
-	return iter, nil
-}
-
-// UpsertEvents is used to insert events. It should only be used for testing.
-// Normal use events are inserted to go-memdb during transaction commit
-func (s *StateStore) UpsertEvents(index uint64, events *structs.Events) error {
-	txn := s.db.WriteTxn(index)
-	defer txn.Abort()
-
-	if err := txn.Insert("events", events); err != nil {
-		return err
-	}
-	return txn.Commit()
-}
-
 // StateSnapshot is used to provide a point-in-time snapshot
 type StateSnapshot struct {
 	StateStore
@@ -5956,13 +5885,6 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
 	return nil
 }
 
-func (r *StateRestore) EventRestore(events *structs.Events) error {
-	if err := r.txn.Insert("events", events); err != nil {
-		return fmt.Errorf("events insert failed: %v", err)
-	}
-	return nil
-}
-
 func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error {
 	if err := r.txn.Insert("scaling_event", jobEvents); err != nil {
 		return fmt.Errorf("scaling event insert failed: %v", err)
diff --git a/nomad/state/state_store_events_test.go b/nomad/state/state_store_events_test.go
deleted file mode 100644
index 8332ce6d9..000000000
--- a/nomad/state/state_store_events_test.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package state
-
-import (
-	"errors"
-	"fmt"
-	"testing"
-
-	"github.com/hashicorp/nomad/helper/testlog"
-	"github.com/hashicorp/nomad/nomad/mock"
-	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/testutil"
-	"github.com/stretchr/testify/require"
-)
-
-// TestStateStore_Events_OnEvict tests that events in the state stores
-// event publisher and go-memdb are evicted together when the event buffer
-// size reaches its max.
-func TestStateStore_Events_OnEvict(t *testing.T) {
-	t.Parallel()
-
-	cfg := &StateStoreConfig{
-		Logger:            testlog.HCLogger(t),
-		Region:            "global",
-		EnablePublisher:   true,
-		EventBufferSize:   10,
-		DurableEventCount: 10,
-	}
-	s := TestStateStoreCfg(t, cfg)
-
-	_, err := s.EventBroker()
-	require.NoError(t, err)
-
-	// force 3 evictions
-	for i := 1; i < 13; i++ {
-		require.NoError(t,
-			s.UpsertNode(structs.NodeRegisterRequestType, uint64(i), mock.Node()),
-		)
-	}
-
-	get := func() []*structs.Events {
-		var out []*structs.Events
-		iter, err := s.Events(nil)
-		require.NoError(t, err)
-		for {
-			raw := iter.Next()
-			if raw == nil {
-				break
-			}
-			e := raw.(*structs.Events)
-
-			out = append(out, e)
-		}
-		return out
-	}
-
-	// event publisher is async so wait for it to prune
-	testutil.WaitForResult(func() (bool, error) {
-		out := get()
-		if len(out) != 10 {
-			return false, errors.New("Expected event count to be pruned to 10")
-		}
-		return true, nil
-	}, func(err error) {
-		require.Fail(t, err.Error())
-		t.Fatalf("err: %s", err)
-	})
-
-	out := get()
-	require.Equal(t, 3, int(out[0].Index))
-
-}
-
-// TestStateStore_Events_OnEvict_Missing tests behavior when the event publisher
-// evicts an event and there is no corresponding go-memdb entry due to durability
-// settings
-func TestStateStore_Events_OnEvict_Missing(t *testing.T) {
-	t.Parallel()
-
-	cfg := &StateStoreConfig{
-		Logger:            testlog.HCLogger(t),
-		Region:            "global",
-		EnablePublisher:   true,
-		EventBufferSize:   10,
-		DurableEventCount: 0,
-	}
-	s := TestStateStoreCfg(t, cfg)
-
-	_, err := s.EventBroker()
-	require.NoError(t, err)
-
-	getEvents := func() []*structs.Events {
-		var out []*structs.Events
-		iter, err := s.Events(nil)
-		require.NoError(t, err)
-		for {
-			raw := iter.Next()
-			if raw == nil {
-				break
-			}
-			e := raw.(*structs.Events)
-
-			out = append(out, e)
-		}
-		return out
-	}
-
-	// Publish 13 events to fill buffer and force 3 evictions
-	for i := 1; i < 13; i++ {
-		require.NoError(t,
-			s.UpsertNode(structs.NodeRegisterRequestType, uint64(i), mock.Node()),
-		)
-	}
-
-	// event publisher is async so wait for it to prune
-	testutil.WaitForResult(func() (bool, error) {
-		out := getEvents()
-		if len(out) != 0 {
-			return false, fmt.Errorf("Expected event count to be %d, got: %d", 0, len(out))
-		}
-		return true, nil
-	}, func(err error) {
-		require.Fail(t, err.Error())
-		t.Fatalf("err: %s", err)
-	})
-}
diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go
index 419b27f98..7ae10a250 100644
--- a/nomad/stream/event_buffer.go
+++ b/nomad/stream/event_buffer.go
@@ -10,8 +10,6 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-type EvictCallbackFn func(events *structs.Events)
-
 // eventBuffer is a single-writer, multiple-reader, fixed length concurrent
 // buffer of events that have been published. The buffer is
 // the head and tail of an atomically updated single-linked list. Atomic
@@ -51,16 +49,14 @@ type eventBuffer struct {
 	tail atomic.Value
 
 	maxSize int64
-	onEvict EvictCallbackFn
 }
 
 // newEventBuffer creates an eventBuffer ready for use.
-func newEventBuffer(size int64, onEvict EvictCallbackFn) *eventBuffer {
+func newEventBuffer(size int64) *eventBuffer {
 	zero := int64(0)
 	b := &eventBuffer{
 		maxSize: size,
 		size:    &zero,
-		onEvict: onEvict,
 	}
 
 	item := newBufferItem(&structs.Events{Index: 0, Events: nil})
@@ -133,11 +129,6 @@ func (b *eventBuffer) advanceHead() {
 	// update the amount of events we have in the buffer
 	rmCount := len(old.Events.Events)
 	atomic.AddInt64(b.size, -int64(rmCount))
-
-	// Call evict callback if the item isn't a sentinel value
-	if b.onEvict != nil && old.Events.Index != 0 {
-		b.onEvict(old.Events)
-	}
 }
 
 // Head returns the current head of the buffer. It will always exist but it may
diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go
index 45501d123..cf43b5bc0 100644
--- a/nomad/stream/event_buffer_test.go
+++ b/nomad/stream/event_buffer_test.go
@@ -17,7 +17,7 @@ func TestEventBufferFuzz(t *testing.T) {
 	nReaders := 1000
 	nMessages := 1000
 
-	b := newEventBuffer(1000, nil)
+	b := newEventBuffer(1000)
 
 	// Start a write goroutine that will publish 10000 messages with sequential
 	// indexes and some jitter in timing (to allow clients to "catch up" and block
@@ -85,7 +85,7 @@ func TestEventBufferFuzz(t *testing.T) {
 }
 
 func TestEventBuffer_Slow_Reader(t *testing.T) {
-	b := newEventBuffer(10, nil)
+	b := newEventBuffer(10)
 
 	for i := 0; i < 10; i++ {
 		e := structs.Event{
@@ -114,7 +114,7 @@ func TestEventBuffer_Slow_Reader(t *testing.T) {
 }
 
 func TestEventBuffer_Size(t *testing.T) {
-	b := newEventBuffer(100, nil)
+	b := newEventBuffer(100)
 
 	for i := 0; i < 10; i++ {
 		e := structs.Event{
@@ -130,7 +130,7 @@ func TestEventBuffer_Size(t *testing.T) {
 // are removed, the event buffer should advance its head down to the last message
 // and insert a placeholder sentinel value.
 func TestEventBuffer_Emptying_Buffer(t *testing.T) {
-	b := newEventBuffer(10, nil)
+	b := newEventBuffer(10)
 
 	for i := 0; i < 10; i++ {
 		e := structs.Event{
@@ -203,7 +203,7 @@ func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) {
 	}
 
 	// buffer starts at index 11 goes to 100
-	b := newEventBuffer(100, nil)
+	b := newEventBuffer(100)
 
 	for i := 11; i <= 100; i++ {
 		e := structs.Event{
@@ -220,26 +220,3 @@ func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) {
 		})
 	}
 }
-
-func TestEventBuffer_OnEvict(t *testing.T) {
-	called := make(chan struct{})
-	testOnEvict := func(events *structs.Events) {
-		close(called)
-	}
-	b := newEventBuffer(2, testOnEvict)
-
-	// start at 1 since new event buffer is built with a starting sentinel value
-	for i := 1; i < 4; i++ {
-		e := structs.Event{
-			Index: uint64(i), // Indexes should be contiguous
-		}
-		b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
-	}
-
-	select {
-	case <-called:
-		// testOnEvict called
-	case <-time.After(100 * time.Millisecond):
-		require.Fail(t, "expected testOnEvict to be called")
-	}
-}
diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go
index bcf8a8fc5..dd6522035 100644
--- a/nomad/stream/event_publisher.go
+++ b/nomad/stream/event_publisher.go
@@ -19,7 +19,6 @@ const (
 type EventBrokerCfg struct {
 	EventBufferSize int64
 	Logger          hclog.Logger
-	OnEvict         EvictCallbackFn
 }
 
 type EventBroker struct {
@@ -53,7 +52,7 @@ func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker {
 		cfg.EventBufferSize = 100
 	}
 
-	buffer := newEventBuffer(cfg.EventBufferSize, cfg.OnEvict)
+	buffer := newEventBuffer(cfg.EventBufferSize)
 	e := &EventBroker{
 		logger:   cfg.Logger.Named("event_broker"),
 		eventBuf: buffer,
diff --git a/website/pages/docs/configuration/server.mdx b/website/pages/docs/configuration/server.mdx
index efc4b1266..619911b73 100644
--- a/website/pages/docs/configuration/server.mdx
+++ b/website/pages/docs/configuration/server.mdx
@@ -98,14 +98,6 @@ server {
   deployment must be in the terminal state before it is eligible for garbage
   collection. This is specified using a label suffix like "30s" or "1h".
 
-- `durable_event_count` `(int: 100)` - Specifies the number of events to
-  persist during snapshot generation. This provides a given server's event
-  stream durability during restarts by persiting a backlog of events to be
-  repopulated when the server comes back online. Increasing this value provides
-  a greater history of messages when a server comes online. Decreasing this
-  value reduces the history of messages but uses less disk i/o during
-  snapshot creation.
-
 - `csi_volume_claim_gc_threshold` `(string: "1h")` - Specifies the minimum
   age of a CSI volume before it is eligible to have its claims garbage
   collected. This is specified using a label suffix like "30s" or "1h".
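Summary for downstream callers: `EventBrokerCfg` no longer carries an `OnEvict` callback, so when the buffer grows past `EventBufferSize`, old events are simply dropped rather than handed to a deletion hook. A runnable sketch of the post-change broker construction, using only the types and signature visible in this diff (logger and size are illustrative):

```go
package main

import (
	"context"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/nomad/stream"
)

func main() {
	// OnEvict is gone from EventBrokerCfg; eviction is now a pure
	// in-memory truncation of the event buffer.
	broker := stream.NewEventBroker(context.Background(), stream.EventBrokerCfg{
		EventBufferSize: 100,
		Logger:          hclog.Default(),
	})
	_ = broker
}
```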