event durability count and cfg

This commit is contained in:
Drew Bailey
2020-10-05 19:40:06 -04:00
parent 004f634868
commit 8a3130a356
9 changed files with 41 additions and 28 deletions

View File

@@ -488,6 +488,10 @@ type ServerConfig struct {
// will generate events for its event stream.
EnableEventPublisher bool `hcl:"enable_event_publisher"`
// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
DurableEventCount int `hcl:"durable_event_count"`
// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
@@ -880,6 +884,7 @@ func DefaultConfig() *Config {
Server: &ServerConfig{
Enabled: false,
EnableEventPublisher: true,
DurableEventCount: 100,
StartJoin: []string{},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
@@ -1408,6 +1413,10 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.EnableEventPublisher = true
}
if b.DurableEventCount != 0 {
result.DurableEventCount = b.DurableEventCount
}
if b.DefaultSchedulerConfig != nil {
c := *b.DefaultSchedulerConfig
result.DefaultSchedulerConfig = &c

View File

@@ -123,6 +123,7 @@ var basicConfig = &Config{
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventPublisher: true,
DurableEventCount: 100,
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,

View File

@@ -131,6 +131,7 @@ server {
encrypt = "abc"
raft_multiplier = 4
enable_event_publisher = true
durable_event_count = 100
server_join {
retry_join = ["1.1.1.1", "2.2.2.2"]

View File

@@ -262,6 +262,7 @@
"deployment_gc_threshold": "12h",
"enabled": true,
"enable_event_publisher": true,
"durable_event_count": 100,
"enabled_schedulers": [
"test"
],

View File

@@ -128,17 +128,21 @@ type FSMConfig struct {
Region string
EnableEventPublisher bool
// Durable count specifies the amount of events generated by the state store
// to save to disk during snapshot generation. The most recent events
// limited to count will be saved.
DurableEventCount int
}
// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(config *FSMConfig) (*nomadFSM, error) {
// Create a state store
sconfig := &state.StateStoreConfig{
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventPublisher,
// TODO(drew) plumb cfg
EnableDurability: true,
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventPublisher,
DurableEventCount: config.DurableEventCount,
}
state, err := state.NewStateStore(sconfig)
if err != nil {
@@ -1269,11 +1273,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
// Create a new state store
config := &state.StateStoreConfig{
Logger: n.config.Logger,
Region: n.config.Region,
EnablePublisher: n.config.EnableEventPublisher,
// TODO(drew) plumb cfg
EnableDurability: true,
Logger: n.config.Logger,
Region: n.config.Region,
EnablePublisher: n.config.EnableEventPublisher,
DurableEventCount: n.config.DurableEventCount,
}
newState, err := state.NewStateStore(config)
if err != nil {
@@ -2363,10 +2366,10 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error {
var durableCount int
if s.snap.Config() != nil && !s.snap.Config().EnableDurability {
if s.snap.Config() != nil && s.snap.Config().DurableEventCount == 0 {
return nil
} else {
durableCount = s.snap.Config().DurableCount
durableCount = s.snap.Config().DurableEventCount
}
events, err := s.snap.LatestEventsReverse(nil)

View File

@@ -3206,9 +3206,8 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()
cfg := state.Config()
cfg.EnableDurability = true
// DurableCount = 4 each mock events wrapper contains 2 events
cfg.DurableCount = 4
// DurableEventCount = 4 each mock events wrapper contains 2 events
cfg.DurableEventCount = 4
e1 := mock.Events(1000)
e2 := mock.Events(1001)
@@ -3251,7 +3250,7 @@ func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()
cfg := state.Config()
cfg.EnableDurability = false
cfg.DurableEventCount = 0
e1 := mock.Events(1000)
e2 := mock.Events(1001)

View File

@@ -42,8 +42,7 @@ type changeTrackerDB struct {
// ChangeConfig
type ChangeConfig struct {
DurableEvents bool
DurableCount int
DurableEventCount int
}
func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor, cfg *ChangeConfig) *changeTrackerDB {
@@ -55,8 +54,7 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, chang
db: db,
publisher: publisher,
processChanges: changesFn,
durableEvents: cfg.DurableEvents,
durableCount: cfg.DurableCount,
durableCount: cfg.DurableEventCount,
}
}

View File

@@ -46,10 +46,12 @@ type StateStoreConfig struct {
// Region is the region of the server embedding the state store.
Region string
// EnablePublisher is used to enable or disable the event publisher
EnablePublisher bool
EnableDurability bool
DurableCount int
// DurableEventCount is the amount of events to persist during the snapshot
// process.
DurableEventCount int
}
// The StateStore is responsible for maintaining all the Nomad
@@ -94,8 +96,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
if config.EnablePublisher {
cfg := &ChangeConfig{
DurableEvents: config.EnableDurability,
DurableCount: 1000,
DurableEventCount: 1000,
}
publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
EventBufferTTL: 1 * time.Hour,

View File

@@ -26,10 +26,10 @@ func TestStateStore(t testing.T) *StateStore {
func TestStateStorePublisher(t testing.T) *StateStoreConfig {
return &StateStoreConfig{
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
EnableDurability: true,
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
DurableEventCount: 100,
}
}
func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore {