From 39ef3263cab436531fa1c586b98f4246fa83294c Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Tue, 6 Oct 2020 16:21:58 -0400 Subject: [PATCH] Add EvictCallbackFn to handle removing entries from go-memdb when they are removed from the event buffer. Wire up event buffer size config, use pointers for structs.Events instead of copying. --- api/event.go | 6 +- command/agent/agent.go | 7 +- command/agent/command.go | 5 - command/agent/config.go | 18 +++- command/agent/config_parse_test.go | 3 +- command/agent/config_test.go | 40 +++++++ command/agent/event_endpoint.go | 6 +- command/agent/event_endpoint_test.go | 3 +- command/agent/testdata/basic.hcl | 3 +- command/agent/testdata/basic.json | 3 +- nomad/config.go | 5 +- nomad/deploymentwatcher/testutil_test.go | 2 +- nomad/event_endpoint.go | 14 +-- nomad/event_endpoint_test.go | 130 +++++++++++++++++++++-- nomad/fsm.go | 46 ++++---- nomad/fsm_test.go | 19 ++-- nomad/node_endpoint_test.go | 2 +- nomad/server.go | 1 + nomad/state/apply_plan_events.go | 10 +- nomad/state/deployment_events.go | 10 +- nomad/state/events.go | 76 +++++++++---- nomad/state/node_events.go | 47 ++------ nomad/state/node_events_test.go | 3 +- nomad/state/state_changes.go | 56 ++++++---- nomad/state/state_store.go | 51 +++++++-- nomad/state/state_store_events_test.go | 125 ++++++++++++++++++++++ nomad/state/state_store_test.go | 12 +-- nomad/state/testing.go | 7 +- nomad/stream/event_buffer.go | 100 +++++++++-------- nomad/stream/event_buffer_test.go | 81 ++++++++++---- nomad/stream/event_publisher.go | 43 +++----- nomad/stream/event_publisher_test.go | 7 +- nomad/stream/subscription.go | 6 +- 33 files changed, 669 insertions(+), 278 deletions(-) create mode 100644 nomad/state/state_store_events_test.go diff --git a/api/event.go b/api/event.go index dce3c265f..2bf08fba5 100644 --- a/api/event.go +++ b/api/event.go @@ -6,7 +6,7 @@ import ( "fmt" ) -// Ebvents is a set of events for a corresponding index. Events returned for the +// Events is a set of events for a corresponding index. Events returned for the // index depend on which topics are subscribed to when a request is made. 
type Events struct {
 	Index uint64
@@ -78,9 +78,9 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
 			// Decode next newline delimited json of events
 			var events Events
 			if err := dec.Decode(&events); err != nil {
+				// set error and fall through to
+				// the eventsCh select
 				events = Events{Err: err}
-				eventsCh <- &events
-				return
 			}
 			if events.IsHeartbeat() {
 				continue
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 16f3c20d5..54de24470 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -243,8 +243,11 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
 	if agentConfig.Server.UpgradeVersion != "" {
 		conf.UpgradeVersion = agentConfig.Server.UpgradeVersion
 	}
-	if agentConfig.Server.EnableEventPublisher {
-		conf.EnableEventPublisher = agentConfig.Server.EnableEventPublisher
+	if agentConfig.Server.EnableEventPublisher != nil {
+		conf.EnableEventPublisher = *agentConfig.Server.EnableEventPublisher
+	}
+	if agentConfig.Server.EventBufferSize > 0 {
+		conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize)
 	}
 	if agentConfig.Autopilot != nil {
 		if agentConfig.Autopilot.CleanupDeadServers != nil {
diff --git a/command/agent/command.go b/command/agent/command.go
index 71beabbee..0ed3ff338 100644
--- a/command/agent/command.go
+++ b/command/agent/command.go
@@ -94,7 +94,6 @@ func (c *Command) readConfig() *Config {
 	flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.ServerJoin.StartJoin), "join", "")
 	flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.ServerJoin.RetryJoin), "retry-join", "")
 	flags.IntVar(&cmdConfig.Server.ServerJoin.RetryMaxAttempts, "retry-max", 0, "")
-	flags.BoolVar(&cmdConfig.Server.EnableEventPublisher, "event-publisher", false, "")
 	flags.Var((flaghelper.FuncDurationVar)(func(d time.Duration) error {
 		cmdConfig.Server.ServerJoin.RetryInterval = d
 		return nil
@@ -597,7 +596,6 @@ func (c *Command) AutocompleteFlags() complete.Flags {
 		"-vault-tls-server-name":   complete.PredictAnything,
 		"-acl-enabled":             complete.PredictNothing,
 		"-acl-replication-token":   complete.PredictAnything,
-		"-event-publisher":         complete.PredictNothing,
 	}
 }
@@ -1280,9 +1278,6 @@ Server Options:
   -rejoin
     Ignore a previous leave and attempts to rejoin the cluster.
 
-  -event-publisher
-    Whether to enable or disable the servers event publisher.
-
 Client Options:
 
   -client
diff --git a/command/agent/config.go b/command/agent/config.go
index 3ba23ecee..9665fcaa9 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -486,7 +486,12 @@ type ServerConfig struct {
 
 	// EnableEventPublisher configures whether this server's state store
 	// will generate events for its event stream.
-	EnableEventPublisher bool `hcl:"enable_event_publisher"`
+	EnableEventPublisher *bool `hcl:"enable_event_publisher"`
+
+	// EventBufferSize configures the amount of events to be held in memory.
+	// If EnableEventPublisher is set to true, the minimum allowable value
+	// for the EventBufferSize is 1.
+	EventBufferSize int `hcl:"event_buffer_size"`
 
 	// DurableEventCount specifies the amount of events to persist during snapshot generation.
 	// A count of 0 signals that no events should be persisted.
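
The switch above from bool to *bool for EnableEventPublisher is what gives Merge three-state semantics: nil means "not set", while a non-nil pointer carries an explicit true or false. With a plain bool, an explicit false in a higher-precedence config could never override a default of true. A minimal, self-contained sketch of that merge pattern (illustrative only; mergeBool and boolToPtr are hypothetical stand-ins, not part of this patch):

package main

import "fmt"

// mergeBool mirrors the pointer-based merge used for
// ServerConfig.EnableEventPublisher: nil means "unset", so the override
// applies only when the field was actually present in the config.
func mergeBool(base, override *bool) *bool {
	if override != nil {
		return override // honored even when *override is false
	}
	return base
}

func boolToPtr(b bool) *bool { return &b }

func main() {
	def := boolToPtr(true) // default: publisher enabled

	// An explicit false now survives the merge; a plain bool would lose it.
	fmt.Println(*mergeBool(def, boolToPtr(false))) // false

	// An omitted field falls back to the default.
	fmt.Println(*mergeBool(def, nil)) // true
}

This is the same behavior the TestEventPublisher_Parse cases below exercise against DefaultConfig.
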
@@ -883,7 +888,8 @@ func DefaultConfig() *Config { }, Server: &ServerConfig{ Enabled: false, - EnableEventPublisher: true, + EnableEventPublisher: helper.BoolToPtr(true), + EventBufferSize: 100, DurableEventCount: 100, StartJoin: []string{}, ServerJoin: &ServerJoin{ @@ -1409,8 +1415,12 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin) } - if b.EnableEventPublisher { - result.EnableEventPublisher = true + if b.EnableEventPublisher != nil { + result.EnableEventPublisher = b.EnableEventPublisher + } + + if b.EventBufferSize != 0 { + result.EventBufferSize = b.EventBufferSize } if b.DurableEventCount != 0 { diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 9cce0f52e..8999fa281 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -122,7 +122,8 @@ var basicConfig = &Config{ RedundancyZone: "foo", UpgradeVersion: "0.8.0", EncryptKey: "abc", - EnableEventPublisher: true, + EnableEventPublisher: helper.BoolToPtr(false), + EventBufferSize: 200, DurableEventCount: 100, ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 32115d32d..59aa42e05 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -138,6 +138,7 @@ func TestConfig_Merge(t *testing.T) { MaxHeartbeatsPerSecond: 30.0, RedundancyZone: "foo", UpgradeVersion: "foo", + EnableEventPublisher: helper.BoolToPtr(false), }, ACL: &ACLConfig{ Enabled: true, @@ -328,6 +329,7 @@ func TestConfig_Merge(t *testing.T) { NonVotingServer: true, RedundancyZone: "bar", UpgradeVersion: "bar", + EnableEventPublisher: helper.BoolToPtr(true), }, ACL: &ACLConfig{ Enabled: true, @@ -1163,3 +1165,41 @@ func TestTelemetry_Parse(t *testing.T) { require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter) require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics) } + +func TestEventPublisher_Parse(t *testing.T) { + + require := require.New(t) + + { + a := &ServerConfig{ + EnableEventPublisher: helper.BoolToPtr(false), + } + b := DefaultConfig().Server + b.EnableEventPublisher = nil + + result := a.Merge(b) + require.Equal(false, *result.EnableEventPublisher) + } + + { + a := &ServerConfig{ + EnableEventPublisher: helper.BoolToPtr(true), + } + b := DefaultConfig().Server + b.EnableEventPublisher = nil + + result := a.Merge(b) + require.Equal(true, *result.EnableEventPublisher) + } + + { + a := &ServerConfig{ + EnableEventPublisher: helper.BoolToPtr(false), + } + b := DefaultConfig().Server + b.EnableEventPublisher = helper.BoolToPtr(true) + + result := a.Merge(b) + require.Equal(true, *result.EnableEventPublisher) + } +} diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go index 828fbb01b..af46620ac 100644 --- a/command/agent/event_endpoint.go +++ b/command/agent/event_endpoint.go @@ -74,7 +74,7 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i output := ioutils.NewWriteFlusher(resp) // create an error channel to handle errors - errCh := make(chan HTTPCodedError, 2) + errCh := make(chan HTTPCodedError, 1) go func() { defer cancel() @@ -124,9 +124,7 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i cancel() codedErr := <-errCh - if codedErr != nil && - (codedErr == io.EOF || - strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error())) { + if codedErr != nil && 
strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error()) { codedErr = nil } diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index 915029efc..546dd6516 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -39,7 +39,7 @@ func TestEventStream(t *testing.T) { pub, err := s.Agent.server.State().EventPublisher() require.NoError(t, err) - pub.Publish(structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}}) + pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}}) testutil.WaitForResult(func() (bool, error) { got := resp.Body.String() @@ -56,7 +56,6 @@ func TestEventStream(t *testing.T) { // wait for response to close to prevent race between subscription // shutdown and server shutdown returning subscription closed by server err - // resp.Close() cancel() select { case err := <-respErrCh: diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index da08118db..be35093e9 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -130,7 +130,8 @@ server { upgrade_version = "0.8.0" encrypt = "abc" raft_multiplier = 4 - enable_event_publisher = true + enable_event_publisher = false + event_buffer_size = 200 durable_event_count = 100 server_join { diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 2d671dd1a..d9db2d5d0 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -261,7 +261,8 @@ "data_dir": "/tmp/data", "deployment_gc_threshold": "12h", "enabled": true, - "enable_event_publisher": true, + "enable_event_publisher": false, + "event_buffer_size": 200, "durable_event_count": 100, "enabled_schedulers": [ "test" diff --git a/nomad/config.go b/nomad/config.go index 081d83e02..0a2272be8 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -78,10 +78,13 @@ type Config struct { // in the absence of ACLs EnableDebug bool - // EnableEventPublisher is used to enable or disable the state stores + // EnableEventPublisher is used to enable or disable state store // event publishing EnableEventPublisher bool + // EventBufferSize is the amount of events to hold in memory. + EventBufferSize int64 + // LogOutput is the location to write logs to. If this is not set, // logs will go to stderr. 
LogOutput io.Writer diff --git a/nomad/deploymentwatcher/testutil_test.go b/nomad/deploymentwatcher/testutil_test.go index a3e21d7df..87cd492fe 100644 --- a/nomad/deploymentwatcher/testutil_test.go +++ b/nomad/deploymentwatcher/testutil_test.go @@ -38,7 +38,7 @@ func (m *mockBackend) nextIndex() uint64 { func (m *mockBackend) UpdateAllocDesiredTransition(u *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) { m.Called(u) i := m.nextIndex() - return i, m.state.UpdateAllocsDesiredTransitions(i, u.Allocs, u.Evals) + return i, m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, i, u.Allocs, u.Evals) } // matchUpdateAllocDesiredTransitions is used to match an upsert request diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 6c84d3783..202fcf99b 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -55,14 +55,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) { return } - // TODO(drew) handle streams without ACLS - reqToken := args.AuthToken - if reqToken == "" { - // generate a random request token - reqToken = uuid.Generate() + // authToken is passed to the subscribe request so the event stream + // can handle closing a subscription if the authToken expires. + // If ACLs are disabled, a random token is generated and it will + // never be closed due to expiry. + authToken := args.AuthToken + if authToken == "" { + authToken = uuid.Generate() } subReq := &stream.SubscribeRequest{ - Token: reqToken, + Token: authToken, Topics: args.Topics, Index: uint64(args.Index), } diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index b05aef2b8..573084ff3 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" @@ -28,7 +29,7 @@ func TestEventStream(t *testing.T) { // Create request for all topics and keys req := structs.EventStreamRequest{ - Topics: map[structs.Topic][]string{"*": []string{"*"}}, + Topics: map[structs.Topic][]string{"*": {"*"}}, QueryOptions: structs.QueryOptions{ Region: s1.Region(), }, @@ -47,7 +48,7 @@ func TestEventStream(t *testing.T) { // invoke handler go handler(p2) - // send request + // decode request responses go func() { decoder := codec.NewDecoder(p1, structs.MsgpackHandle) for { @@ -68,8 +69,9 @@ func TestEventStream(t *testing.T) { require.NoError(t, err) node := mock.Node() - publisher.Publish(structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}}) + publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}}) + // Send request encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(t, encoder.Encode(req)) @@ -160,14 +162,14 @@ func TestEventStream_StreamErr(t *testing.T) { require.NoError(t, err) node := mock.Node() - publisher.Publish(structs.Events{uint64(1), []structs.Event{{Topic: "test", Payload: node}}}) // send req encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(t, encoder.Encode(req)) - // stop the publisher to force an error on subscription side - s1.State().StopEventPublisher() + // publish some events + publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}}) + publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}}) 
 	timeout := time.After(5 * time.Second)
 
OUTER:
@@ -178,8 +180,10 @@ OUTER:
 		case err := <-errCh:
 			t.Fatal(err)
 		case msg := <-streamMsg:
+			// close the publisher's subscriptions, forcing an error
+			// after an initial event is received
+			publisher.CloseAll()
 			if msg.Error == nil {
-				// race between error and receiving an event
 				// continue trying for error
 				continue
 			}
@@ -249,7 +253,7 @@ func TestEventStream_RegionForward(t *testing.T) {
 	require.NoError(t, err)
 
 	node := mock.Node()
-	publisher.Publish(structs.Events{uint64(1), []structs.Event{{Topic: "test", Payload: node}}})
+	publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
 
 	// send req
 	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
@@ -290,6 +294,114 @@ OUTER:
 	}
 }
 
-// TODO(drew) acl test
 func TestEventStream_ACL(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	// start server
+	s, root, cleanupS := TestACLServer(t, nil)
+	defer cleanupS()
+	testutil.WaitForLeader(t, s.RPC)
+
+	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
+	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
+
+	cases := []struct {
+		Name        string
+		Token       string
+		ExpectedErr string
+	}{
+		{
+			Name:        "no token",
+			Token:       "",
+			ExpectedErr: structs.ErrPermissionDenied.Error(),
+		},
+		{
+			Name:        "bad token",
+			Token:       tokenBad.SecretID,
+			ExpectedErr: structs.ErrPermissionDenied.Error(),
+		},
+		{
+			Name:        "root token",
+			Token:       root.SecretID,
+			ExpectedErr: "subscription closed by server",
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.Name, func(t *testing.T) {
+			// Create request for all topics and keys
+			req := structs.EventStreamRequest{
+				Topics: map[structs.Topic][]string{"*": {"*"}},
+				QueryOptions: structs.QueryOptions{
+					Region:    s.Region(),
+					AuthToken: tc.Token,
+				},
+			}
+
+			handler, err := s.StreamingRpcHandler("Event.Stream")
+			require.Nil(err)
+
+			// create pipe
+			p1, p2 := net.Pipe()
+			defer p1.Close()
+			defer p2.Close()
+
+			errCh := make(chan error)
+			streamMsg := make(chan *structs.EventStreamWrapper)
+
+			go handler(p2)
+
+			// Start decoder
+			go func() {
+				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
+				for {
+					var msg structs.EventStreamWrapper
+					if err := decoder.Decode(&msg); err != nil {
+						if err == io.EOF || strings.Contains(err.Error(), "closed") {
+							return
+						}
+						errCh <- fmt.Errorf("error decoding: %w", err)
+					}
+
+					streamMsg <- &msg
+				}
+			}()
+
+			// send request
+			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
+			require.Nil(encoder.Encode(req))
+
+			publisher, err := s.State().EventPublisher()
+			require.NoError(err)
+
+			// publish some events
+			node := mock.Node()
+			publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
+			publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
+
+			timeout := time.After(5 * time.Second)
+		OUTER:
+			for {
+				select {
+				case <-timeout:
+					require.Fail("timeout waiting for response")
+				case err := <-errCh:
+					t.Fatal(err)
+				case msg := <-streamMsg:
+					// force error by closing all subscriptions
+					publisher.CloseAll()
+					if msg.Error == nil {
+						continue
+					}
+
+					if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
+						break OUTER
+					} else {
+						require.Fail("Unexpected error", msg.Error)
+					}
+				}
+			}
+		})
+	}
 }
diff --git a/nomad/fsm.go b/nomad/fsm.go
index 5bab15b5e..ead6d43e0 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -100,8 +100,9 @@ type nomadFSM struct {
 //
state in a way that can be accessed concurrently with operations // that may modify the live state. type nomadSnapshot struct { - snap *state.StateSnapshot - timetable *TimeTable + snap *state.StateSnapshot + timetable *TimeTable + durableEventCount int } // snapshotHeader is the first entry in our snapshot @@ -129,6 +130,8 @@ type FSMConfig struct { EnableEventPublisher bool + EventBufferSize int64 + // Durable count specifies the amount of events generated by the state store // to save to disk during snapshot generation. The most recent events // limited to count will be saved. @@ -142,6 +145,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { Logger: config.Logger, Region: config.Region, EnablePublisher: config.EnableEventPublisher, + EventBufferSize: config.EventBufferSize, DurableEventCount: config.DurableEventCount, } state, err := state.NewStateStore(sconfig) @@ -258,21 +262,16 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.UpsertNodeEventsType: return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index) case structs.JobBatchDeregisterRequestType: - // TODO(drew) higher priority event return n.applyBatchDeregisterJob(msgType, buf[1:], log.Index) case structs.AllocUpdateDesiredTransitionRequestType: - // TODO(drew) higher priority event - return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index) + return n.applyAllocUpdateDesiredTransition(msgType, buf[1:], log.Index) case structs.NodeUpdateEligibilityRequestType: - // TODO(drew) higher priority event - return n.applyNodeEligibilityUpdate(buf[1:], log.Index) + return n.applyNodeEligibilityUpdate(msgType, buf[1:], log.Index) case structs.BatchNodeUpdateDrainRequestType: - // TODO(drew) higher priority event - return n.applyBatchDrainUpdate(buf[1:], log.Index) + return n.applyBatchDrainUpdate(msgType, buf[1:], log.Index) case structs.SchedulerConfigRequestType: return n.applySchedulerConfigUpdate(buf[1:], log.Index) case structs.NodeBatchDeregisterRequestType: - // TODO(drew) higher priority event return n.applyDeregisterNodeBatch(buf[1:], log.Index) case structs.ClusterMetadataRequestType: return n.applyClusterMetadata(buf[1:], log.Index) @@ -437,21 +436,21 @@ func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, ind return nil } -func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyBatchDrainUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_node_drain_update"}, time.Now()) var req structs.BatchNodeUpdateDrainRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil { + if err := n.state.BatchUpdateNodeDrain(msgType, index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil { n.logger.Error("BatchUpdateNodeDrain failed", "error", err) return err } return nil } -func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyNodeEligibilityUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "node_eligibility_update"}, time.Now()) var req structs.NodeUpdateEligibilityRequest if err := structs.Decode(buf, &req); err != nil { @@ -465,7 +464,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac return err } - if err := 
n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil { + if err := n.state.UpdateNodeEligibility(msgType, index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeEligibility failed", "error", err) return err } @@ -857,14 +856,14 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt // applyAllocUpdateDesiredTransition is used to update the desired transitions // of a set of allocations. -func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyAllocUpdateDesiredTransition(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transition"}, time.Now()) var req structs.AllocUpdateDesiredTransitionRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpdateAllocsDesiredTransitions(index, req.Allocs, req.Evals); err != nil { + if err := n.state.UpdateAllocsDesiredTransitions(msgType, index, req.Allocs, req.Evals); err != nil { n.logger.Error("UpdateAllocsDesiredTransitions failed", "error", err) return err } @@ -1262,8 +1261,9 @@ func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { } ns := &nomadSnapshot{ - snap: snap, - timetable: n.timetable, + snap: snap, + timetable: n.timetable, + durableEventCount: n.config.DurableEventCount, } return ns, nil } @@ -1276,6 +1276,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { Logger: n.config.Logger, Region: n.config.Region, EnablePublisher: n.config.EnableEventPublisher, + EventBufferSize: n.config.EventBufferSize, DurableEventCount: n.config.DurableEventCount, } newState, err := state.NewStateStore(config) @@ -1589,7 +1590,7 @@ func rehydratePublisherFromState(s *state.StateStore) error { break } e := raw.(*structs.Events) - pub.Publish(*e) + pub.Publish(e) } return nil } @@ -2376,11 +2377,8 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink, } func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error { - var durableCount int - if s.snap.Config() != nil && s.snap.Config().DurableEventCount == 0 { + if s.durableEventCount == 0 { return nil - } else { - durableCount = s.snap.Config().DurableEventCount } events, err := s.snap.LatestEventsReverse(nil) @@ -2409,7 +2407,7 @@ func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Enc count += eventCount // Only write to sink until durableCount has been reached - if count >= durableCount { + if count >= s.durableEventCount { return nil } } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index d832b797b..b1f8d83dd 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,7 +2,6 @@ package nomad import ( "bytes" - "errors" "fmt" "reflect" "strings" @@ -57,6 +56,7 @@ func testFSM(t *testing.T) *nomadFSM { Logger: logger, Region: "global", EnableEventPublisher: true, + EventBufferSize: 100, } fsm, err := NewFSM(fsmConfig) if err != nil { @@ -3207,11 +3207,10 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) { // Add some state fsm := testFSM(t) fsm.config.EnableEventPublisher = true + // DurableEventCount = 4 each mock events wrapper contains 2 events + fsm.config.DurableEventCount = 4 state := fsm.State() - cfg := state.Config() - // DurableEventCount = 4 each mock events wrapper contains 2 events - cfg.DurableEventCount = 4 e1 := 
mock.Events(1000)
 	e2 := mock.Events(1001)
@@ -3249,11 +3248,13 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
 	pub, err := state2.EventPublisher()
 	require.NoError(t, err)
+
 	testutil.WaitForResult(func() (bool, error) {
-		if pub.Len() == 2 {
+		plen := pub.Len()
+		if plen == 4 {
 			return true, nil
 		}
-		return false, errors.New("expected publisher to be populated")
+		return false, fmt.Errorf("expected publisher to have len 4, got: %d", plen)
 	}, func(err error) {
 		require.Fail(t, err.Error())
 	})
 }
@@ -3261,9 +3262,11 @@ func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) {
 	t.Parallel()
-	// Add some state
 	fsm := testFSM(t)
-	fsm.config.EnableEventPublisher = false
+	// Enable event publisher with durable event count of zero
+	fsm.config.EnableEventPublisher = true
+	fsm.config.DurableEventCount = 0
+
 	state := fsm.State()
 
 	e1 := mock.Events(1000)
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 23baba2e4..0f685dd67 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -1917,7 +1917,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) {
 	if time.Since(start) < 100*time.Millisecond {
 		t.Fatalf("too fast")
 	}
-	assert.EqualValues(200, resp3.Index)
+	assert.EqualValues(200, int(resp3.Index))
 	if assert.Len(resp3.Allocs, 1) {
 		assert.EqualValues(100, resp3.Allocs[alloc1.ID])
 	}
diff --git a/nomad/server.go b/nomad/server.go
index dea6e29c1..29757dd3c 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -1217,6 +1217,7 @@ func (s *Server) setupRaft() error {
 		Logger:               s.logger,
 		Region:               s.Region(),
 		EnableEventPublisher: s.config.EnableEventPublisher,
+		EventBufferSize:      s.config.EventBufferSize,
 	}
 	var err error
 	s.fsm, err = NewFSM(fsmConfig)
diff --git a/nomad/state/apply_plan_events.go b/nomad/state/apply_plan_events.go
index 1132dafd7..9bf4b813d 100644
--- a/nomad/state/apply_plan_events.go
+++ b/nomad/state/apply_plan_events.go
@@ -6,14 +6,14 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
+func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
 	var events []structs.Event
 	for _, change := range changes.Changes {
 		switch change.Table {
 		case "deployment":
 			after, ok := change.After.(*structs.Deployment)
 			if !ok {
-				return structs.Events{}, fmt.Errorf("transaction change was not a Deployment")
+				return nil, fmt.Errorf("transaction change was not a Deployment")
 			}
 
 			event := structs.Event{
@@ -29,7 +29,7 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Even
 		case "evals":
 			after, ok := change.After.(*structs.Evaluation)
 			if !ok {
-				return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation")
+				return nil, fmt.Errorf("transaction change was not an Evaluation")
 			}
 
 			event := structs.Event{
@@ -45,7 +45,7 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Even
 		case "allocs":
 			after, ok := change.After.(*structs.Allocation)
 			if !ok {
-				return structs.Events{}, fmt.Errorf("transaction change was not an Allocation")
+				return nil, fmt.Errorf("transaction change was not an Allocation")
 			}
 			before := change.Before
 			var msg string
@@ -69,5 +69,5 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Even
 		}
 	}
 
-	return structs.Events{Index: changes.Index, Events: events}, nil
+	return &structs.Events{Index: changes.Index, Events: events}, nil
} diff --git a/nomad/state/deployment_events.go b/nomad/state/deployment_events.go index 3f11d351d..01c51992c 100644 --- a/nomad/state/deployment_events.go +++ b/nomad/state/deployment_events.go @@ -6,7 +6,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes Changes) (structs.Events, error) { +func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes Changes) (*structs.Events, error) { var events []structs.Event var eventType string @@ -24,7 +24,7 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes case "deployment": after, ok := change.After.(*structs.Deployment) if !ok { - return structs.Events{}, fmt.Errorf("transaction change was not a Deployment") + return nil, fmt.Errorf("transaction change was not a Deployment") } event := structs.Event{ @@ -42,7 +42,7 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes case "jobs": after, ok := change.After.(*structs.Job) if !ok { - return structs.Events{}, fmt.Errorf("transaction change was not a Job") + return nil, fmt.Errorf("transaction change was not a Job") } event := structs.Event{ @@ -61,7 +61,7 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes case "evals": after, ok := change.After.(*structs.Evaluation) if !ok { - return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation") + return nil, fmt.Errorf("transaction change was not an Evaluation") } event := structs.Event{ @@ -79,5 +79,5 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes } } - return structs.Events{Index: changes.Index, Events: events}, nil + return &structs.Events{Index: changes.Index, Events: events}, nil } diff --git a/nomad/state/events.go b/nomad/state/events.go index 52f9450cc..62060137d 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -17,21 +17,22 @@ const ( TopicNode structs.Topic = "Node" // TODO(drew) Node Events use TopicNode + Type - TypeNodeRegistration = "NodeRegistration" - TypeNodeDeregistration = "NodeDeregistration" - TypeNodeDrain = "NodeDrain" - TypeNodeEvent = "NodeEvent" + TypeNodeRegistration = "NodeRegistration" + TypeNodeDeregistration = "NodeDeregistration" + TypeNodeEligibilityUpdate = "NodeEligibility" + TypeNodeDrain = "NodeDrain" + TypeNodeEvent = "NodeEvent" - TypeDeploymentUpdate = "DeploymentStatusUpdate" - TypeDeploymentPromotion = "DeploymentPromotion" - TypeDeploymentAllocHealth = "DeploymentAllocHealth" - - TypeAllocCreated = "AllocCreated" - TypeAllocUpdated = "AllocUpdated" - - TypeEvalUpdated = "EvalUpdated" - - TypeJobRegistered = "JobRegistered" + TypeDeploymentUpdate = "DeploymentStatusUpdate" + TypeDeploymentPromotion = "DeploymentPromotion" + TypeDeploymentAllocHealth = "DeploymentAllocHealth" + TypeAllocCreated = "AllocCreated" + TypeAllocUpdated = "AllocUpdated" + TypeAllocUpdateDesiredStatus = "AllocUpdateDesiredStatus" + TypeEvalUpdated = "EvalUpdated" + TypeJobRegistered = "JobRegistered" + TypeJobDeregistered = "JobDeregistered" + TypeJobBatchDeregistered = "JobBatchDeregistered" ) type JobEvent struct { @@ -72,9 +73,13 @@ type JobDrainDetails struct { AllocDetails map[string]NodeDrainAllocDetails } -func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { +func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { var eventType string switch changes.MsgType { + case structs.NodeRegisterRequestType: 
+		eventType = TypeNodeRegistration
+	case structs.UpsertNodeEventsType:
+		eventType = TypeNodeEvent
 	case structs.EvalUpdateRequestType:
 		eventType = TypeEvalUpdated
 	case structs.AllocClientUpdateRequestType:
@@ -85,15 +90,31 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
 		eventType = TypeAllocUpdated
 	case structs.NodeUpdateStatusRequestType:
 		eventType = TypeNodeEvent
+	case structs.JobDeregisterRequestType:
+		eventType = TypeJobDeregistered
+	case structs.JobBatchDeregisterRequestType:
+		eventType = TypeJobBatchDeregistered
+	case structs.AllocUpdateDesiredTransitionRequestType:
+		eventType = TypeAllocUpdateDesiredStatus
+	case structs.NodeUpdateEligibilityRequestType:
+		eventType = TypeNodeDrain
+	case structs.BatchNodeUpdateDrainRequestType:
+		eventType = TypeNodeDrain
+	default:
+		// unknown request type
+		return nil, nil
 	}
 
 	var events []structs.Event
 	for _, change := range changes.Changes {
 		switch change.Table {
 		case "evals":
+			if change.Deleted() {
+				return nil, nil
+			}
 			after, ok := change.After.(*structs.Evaluation)
 			if !ok {
-				return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation")
+				return nil, fmt.Errorf("transaction change was not an Evaluation")
 			}
 
 			event := structs.Event{
@@ -109,26 +130,36 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
 			events = append(events, event)
 
 		case "allocs":
+			if change.Deleted() {
+				return nil, nil
+			}
 			after, ok := change.After.(*structs.Allocation)
 			if !ok {
-				return structs.Events{}, fmt.Errorf("transaction change was not an Allocation")
+				return nil, fmt.Errorf("transaction change was not an Allocation")
 			}
 
+			alloc := after.Copy()
+			// remove job info to help keep size of alloc event down
+			alloc.Job = nil
+
 			event := structs.Event{
 				Topic: TopicAlloc,
 				Type:  eventType,
 				Index: changes.Index,
 				Key:   after.ID,
 				Payload: &AllocEvent{
-					Alloc: after,
+					Alloc: alloc,
 				},
 			}
 
 			events = append(events, event)
 		case "jobs":
+			if change.Deleted() {
+				return nil, nil
+			}
 			after, ok := change.After.(*structs.Job)
 			if !ok {
-				return structs.Events{}, fmt.Errorf("transaction change was not an Allocation")
+				return nil, fmt.Errorf("transaction change was not a Job")
 			}
 
 			event := structs.Event{
@@ -143,9 +174,12 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
 			events = append(events, event)
 		case "nodes":
+			if change.Deleted() {
+				return nil, nil
+			}
 			after, ok := change.After.(*structs.Node)
 			if !ok {
-				return structs.Events{}, fmt.Errorf("transaction change was not a Node")
+				return nil, fmt.Errorf("transaction change was not a Node")
 			}
 
 			event := structs.Event{
@@ -161,5 +195,5 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
 		}
 	}
 
-	return structs.Events{Index: changes.Index, Events: events}, nil
+	return &structs.Events{Index: changes.Index, Events: events}, nil
 }
diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go
index e709f3cf3..1c85ec572 100644
--- a/nomad/state/node_events.go
+++ b/nomad/state/node_events.go
@@ -8,14 +8,14 @@ import (
 
 // NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set
 // of transaction changes.
-func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { +func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "nodes": after, ok := change.After.(*structs.Node) if !ok { - return structs.Events{}, fmt.Errorf("transaction change was not a Node") + return nil, fmt.Errorf("transaction change was not a Node") } event := structs.Event{ @@ -30,19 +30,19 @@ func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, events = append(events, event) } } - return structs.Events{Index: changes.Index, Events: events}, nil + return &structs.Events{Index: changes.Index, Events: events}, nil } // NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set // of transaction changes. -func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { +func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "nodes": before, ok := change.Before.(*structs.Node) if !ok { - return structs.Events{}, fmt.Errorf("transaction change was not a Node") + return nil, fmt.Errorf("transaction change was not a Node") } event := structs.Event{ @@ -57,50 +57,23 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events events = append(events, event) } } - return structs.Events{Index: changes.Index, Events: events}, nil + return &structs.Events{Index: changes.Index, Events: events}, nil } -// NodeEventFromChanges generates a NodeDeregistrationEvent from a set -// of transaction changes. -func NodeEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { +func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "nodes": after, ok := change.After.(*structs.Node) if !ok { - return structs.Events{}, fmt.Errorf("transaction change was not a Node") - } - - event := structs.Event{ - Topic: TopicNode, - Type: TypeNodeEvent, - Index: changes.Index, - Key: after.ID, - Payload: &NodeEvent{ - Node: after, - }, - } - events = append(events, event) - } - } - return structs.Events{Index: changes.Index, Events: events}, nil -} - -func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { - var events []structs.Event - for _, change := range changes.Changes { - switch change.Table { - case "nodes": - after, ok := change.After.(*structs.Node) - if !ok { - return structs.Events{}, fmt.Errorf("transaction change was not a Node") + return nil, fmt.Errorf("transaction change was not a Node") } // retrieve allocations currently on node allocs, err := allocsByNodeTxn(tx, nil, after.ID) if err != nil { - return structs.Events{}, fmt.Errorf("retrieving allocations for node drain event: %w", err) + return nil, fmt.Errorf("retrieving allocations for node drain event: %w", err) } // build job/alloc details for node drain @@ -132,5 +105,5 @@ func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, err events = append(events, event) } } - return structs.Events{Index: changes.Index, Events: events}, nil + return &structs.Events{Index: changes.Index, Events: events}, nil } diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go index bb24657b1..86874814c 100644 --- 
a/nomad/state/node_events_test.go +++ b/nomad/state/node_events_test.go @@ -175,7 +175,7 @@ func TestNodeEventsFromChanges(t *testing.T) { setupTx.Txn.Commit() } - tx := s.db.WriteTxn(100) + tx := s.db.WriteTxnMsgT(tc.MsgType, 100) require.NoError(t, tc.Mutate(s, tx)) changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType} @@ -186,6 +186,7 @@ func TestNodeEventsFromChanges(t *testing.T) { return } require.NoError(t, err) + require.NotNil(t, got) require.Equal(t, len(tc.WantEvents), len(got.Events)) for idx, g := range got.Events { diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index 6e4903f02..4e7248515 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -8,10 +8,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -const ( - CtxMsgType = "type" -) - // ReadTxn is implemented by memdb.Txn to perform read operations. type ReadTxn interface { Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) @@ -34,10 +30,9 @@ type Changes struct { // sent to the EventPublisher which will create and emit change events. type changeTrackerDB struct { db *memdb.MemDB - durableEvents bool - durableCount int + durableCount int64 publisher *stream.EventPublisher - processChanges func(ReadTxn, Changes) (structs.Events, error) + processChanges func(ReadTxn, Changes) (*structs.Events, error) } // ChangeConfig @@ -54,13 +49,13 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, chang db: db, publisher: publisher, processChanges: changesFn, - durableCount: cfg.DurableEventCount, + durableCount: int64(cfg.DurableEventCount), } } -type changeProcessor func(ReadTxn, Changes) (structs.Events, error) +type changeProcessor func(ReadTxn, Changes) (*structs.Events, error) -func noOpProcessChanges(ReadTxn, Changes) (structs.Events, error) { return structs.Events{}, nil } +func noOpProcessChanges(ReadTxn, Changes) (*structs.Events, error) { return nil, nil } // ReadTxn returns a read-only transaction which behaves exactly the same as // memdb.Txn @@ -92,27 +87,31 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { } func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) *txn { + persistChanges := c.durableCount > 0 + t := &txn{ msgType: msgType, Txn: c.db.Txn(true), Index: idx, publish: c.publish, - persistChanges: c.durableEvents, + persistChanges: persistChanges, } t.Txn.TrackChanges() return t } -func (c *changeTrackerDB) publish(changes Changes) (structs.Events, error) { +func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) { readOnlyTx := c.db.Txn(false) defer readOnlyTx.Abort() events, err := c.processChanges(readOnlyTx, changes) if err != nil { - return structs.Events{}, fmt.Errorf("failed generating events from changes: %v", err) + return nil, fmt.Errorf("failed generating events from changes: %v", err) } - c.publisher.Publish(events) + if events != nil { + c.publisher.Publish(events) + } return events, nil } @@ -147,7 +146,7 @@ type txn struct { // Index is stored so that it may be passed along to any subscribers as part // of a change event. 
Index uint64 - publish func(changes Changes) (structs.Events, error) + publish func(changes Changes) (*structs.Events, error) } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -171,7 +170,7 @@ func (tx *txn) Commit() error { return err } - if tx.persistChanges { + if tx.persistChanges && events != nil { // persist events after processing changes err := tx.Txn.Insert("events", events) if err != nil { @@ -191,13 +190,13 @@ func (tx *txn) MsgType() structs.MessageType { return tx.msgType } -func processDBChanges(tx ReadTxn, changes Changes) (structs.Events, error) { +func processDBChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { switch changes.MsgType { case structs.IgnoreUnknownTypeFlag: // unknown event type - return structs.Events{}, nil + return nil, nil case structs.NodeRegisterRequestType: - return NodeRegisterEventFromChanges(tx, changes) + return GenericEventsFromChanges(tx, changes) case structs.NodeUpdateStatusRequestType: // TODO(drew) test return GenericEventsFromChanges(tx, changes) @@ -206,7 +205,7 @@ func processDBChanges(tx ReadTxn, changes Changes) (structs.Events, error) { case structs.NodeUpdateDrainRequestType: return NodeDrainEventFromChanges(tx, changes) case structs.UpsertNodeEventsType: - return NodeEventFromChanges(tx, changes) + return GenericEventsFromChanges(tx, changes) case structs.DeploymentStatusUpdateRequestType: return DeploymentEventFromChanges(changes.MsgType, tx, changes) case structs.DeploymentPromoteRequestType: @@ -225,6 +224,21 @@ func processDBChanges(tx ReadTxn, changes Changes) (structs.Events, error) { case structs.AllocUpdateRequestType: // TODO(drew) test return GenericEventsFromChanges(tx, changes) + // case structs.JobDeregisterRequestType: + // TODO(drew) test / handle delete + // return GenericEventsFromChanges(tx, changes) + // case structs.JobBatchDeregisterRequestType: + // TODO(drew) test & handle delete + // return GenericEventsFromChanges(tx, changes) + case structs.AllocUpdateDesiredTransitionRequestType: + // TODO(drew) drain + return GenericEventsFromChanges(tx, changes) + case structs.NodeUpdateEligibilityRequestType: + // TODO(drew) test, drain + return GenericEventsFromChanges(tx, changes) + case structs.BatchNodeUpdateDrainRequestType: + // TODO(drew) test, drain + return GenericEventsFromChanges(tx, changes) } - return structs.Events{}, nil + return nil, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 4484c0f1b..0a2bf582f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -49,8 +49,11 @@ type StateStoreConfig struct { // EnablePublisher is used to enable or disable the event publisher EnablePublisher bool - // DurableEventCount is the amount of events to persist during the snapshot - // process. 
+ // EventBufferSize configures the amount of events to hold in memory + EventBufferSize int64 + + // DurableEventCount is used to determine if events from transaction changes + // should be saved in go-memdb DurableEventCount int } @@ -96,12 +99,15 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { if config.EnablePublisher { cfg := &ChangeConfig{ - DurableEventCount: 1000, + DurableEventCount: config.DurableEventCount, } + + // Create new event publisher using provided config publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ EventBufferTTL: 1 * time.Hour, - EventBufferSize: 250, + EventBufferSize: config.EventBufferSize, Logger: config.Logger, + OnEvict: s.eventPublisherEvict, }) s.db = NewChangeTrackerDB(db, publisher, processDBChanges, cfg) } else { @@ -123,6 +129,30 @@ func (s *StateStore) EventPublisher() (*stream.EventPublisher, error) { return s.db.publisher, nil } +// eventPublisherEvict is used as a callback to delete an evicted events +// entry from go-memdb. +func (s *StateStore) eventPublisherEvict(events *structs.Events) { + if err := s.deleteEvent(events); err != nil { + if err == memdb.ErrNotFound { + s.logger.Info("Evicted event was not found in go-memdb table", "event index", events.Index) + } else { + s.logger.Error("Error deleting event from events table", "error", err) + } + } +} + +func (s *StateStore) deleteEvent(events *structs.Events) error { + txn := s.db.db.Txn(true) + defer txn.Abort() + + if err := txn.Delete("events", events); err != nil { + return err + } + + txn.Commit() + return nil +} + // Config returns the state store configuration. func (s *StateStore) Config() *StateStoreConfig { return s.config @@ -948,7 +978,7 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update } // BatchUpdateNodeDrain is used to update the drain of a node set of nodes -func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { +func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { txn := s.db.WriteTxn(index) defer txn.Abort() for node, update := range updates { @@ -1030,9 +1060,9 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, } // UpdateNodeEligibility is used to update the scheduling eligibility of a node -func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { +func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() // Lookup the node @@ -2919,8 +2949,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("setting job status failed: %v", err) } - txn.Commit() - return nil + return txn.Commit() } // EvalByID is used to lookup an eval by its ID @@ -3291,10 +3320,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation // UpdateAllocsDesiredTransitions is used to update a set of allocations // desired transitions. 
-func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition,
+func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, index uint64, allocs map[string]*structs.DesiredTransition,
 	evals []*structs.Evaluation) error {
 
-	txn := s.db.WriteTxn(index)
+	txn := s.db.WriteTxnMsgT(msgType, index)
 	defer txn.Abort()
 
 	// Handle each of the updated allocations
diff --git a/nomad/state/state_store_events_test.go b/nomad/state/state_store_events_test.go
new file mode 100644
index 000000000..08a5b88b7
--- /dev/null
+++ b/nomad/state/state_store_events_test.go
@@ -0,0 +1,125 @@
+package state
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+// TestStateStore_Events_OnEvict tests that events in the state store's
+// event publisher and go-memdb are evicted together when the event buffer
+// size reaches its max.
+func TestStateStore_Events_OnEvict(t *testing.T) {
+	t.Parallel()
+
+	cfg := &StateStoreConfig{
+		Logger:            testlog.HCLogger(t),
+		Region:            "global",
+		EnablePublisher:   true,
+		EventBufferSize:   10,
+		DurableEventCount: 10,
+	}
+	s := TestStateStoreCfg(t, cfg)
+
+	_, err := s.EventPublisher()
+	require.NoError(t, err)
+
+	// publish 12 single-event wrappers, forcing 2 evictions
+	for i := 1; i < 13; i++ {
+		require.NoError(t,
+			s.UpsertNodeMsgType(structs.NodeRegisterRequestType, uint64(i), mock.Node()),
+		)
+	}
+
+	get := func() []*structs.Events {
+		var out []*structs.Events
+		iter, err := s.Events(nil)
+		require.NoError(t, err)
+		for {
+			raw := iter.Next()
+			if raw == nil {
+				break
+			}
+			e := raw.(*structs.Events)
+
+			out = append(out, e)
+		}
+		return out
+	}
+
+	// event publisher is async so wait for it to prune
+	testutil.WaitForResult(func() (bool, error) {
+		out := get()
+		if len(out) != 10 {
+			return false, errors.New("Expected event count to be pruned to 10")
+		}
+		return true, nil
+	}, func(err error) {
+		require.Fail(t, err.Error())
+		t.Fatalf("err: %s", err)
+	})
+
+	out := get()
+	require.Equal(t, 3, int(out[0].Index))
+
+}
+
+// TestStateStore_Events_OnEvict_Missing tests behavior when the event publisher
+// evicts an event and there is no corresponding go-memdb entry due to durability
+// settings.
+func TestStateStore_Events_OnEvict_Missing(t *testing.T) {
+	t.Parallel()
+
+	cfg := &StateStoreConfig{
+		Logger:            testlog.HCLogger(t),
+		Region:            "global",
+		EnablePublisher:   true,
+		EventBufferSize:   10,
+		DurableEventCount: 0,
+	}
+	s := TestStateStoreCfg(t, cfg)
+
+	_, err := s.EventPublisher()
+	require.NoError(t, err)
+
+	getEvents := func() []*structs.Events {
+		var out []*structs.Events
+		iter, err := s.Events(nil)
+		require.NoError(t, err)
+		for {
+			raw := iter.Next()
+			if raw == nil {
+				break
+			}
+			e := raw.(*structs.Events)
+
+			out = append(out, e)
+		}
+		return out
+	}
+
+	// Publish 12 events to fill the buffer and force evictions
+	for i := 1; i < 13; i++ {
+		require.NoError(t,
+			s.UpsertNodeMsgType(structs.NodeRegisterRequestType, uint64(i), mock.Node()),
+		)
+	}
+
+	// with durability disabled, no events should ever be written to go-memdb
+	testutil.WaitForResult(func() (bool, error) {
+		out := getEvents()
+		if len(out) != 0 {
+			return false, fmt.Errorf("Expected event count to be %d, got: %d", 0, len(out))
+		}
+		return true, nil
+	}, func(err error) {
+		require.Fail(t, err.Error())
+		t.Fatalf("err: %s", err)
+	})
+}
diff --git
a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a6ed60a44..1c0f345f5 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -956,7 +956,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { n2.ID: event, } - require.Nil(state.BatchUpdateNodeDrain(1002, 7, update, events)) + require.Nil(state.BatchUpdateNodeDrain(structs.MsgTypeTestSetup, 1002, 7, update, events)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -1190,7 +1190,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { Subsystem: structs.NodeEventSubsystemCluster, Timestamp: time.Now(), } - require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, 7, event)) + require.Nil(state.UpdateNodeEligibility(structs.MsgTypeTestSetup, 1001, node.ID, expectedEligibility, 7, event)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -1216,7 +1216,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, 7, nil)) // Try to set the node to eligible - err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, 9, nil) + err = state.UpdateNodeEligibility(structs.MsgTypeTestSetup, 1003, node.ID, structs.NodeSchedulingEligible, 9, nil) require.NotNil(err) require.Contains(err.Error(), "while it is draining") } @@ -5203,7 +5203,7 @@ func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) { evals := []*structs.Evaluation{eval} m := map[string]*structs.DesiredTransition{alloc.ID: t1} - require.Nil(state.UpdateAllocsDesiredTransitions(1001, m, evals)) + require.Nil(state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, 1001, m, evals)) ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc.ID) @@ -5223,7 +5223,7 @@ func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) { require.NotNil(eout) m = map[string]*structs.DesiredTransition{alloc.ID: t2} - require.Nil(state.UpdateAllocsDesiredTransitions(1002, m, evals)) + require.Nil(state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, 1002, m, evals)) ws = memdb.NewWatchSet() out, err = state.AllocByID(ws, alloc.ID) @@ -5239,7 +5239,7 @@ func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) { // Try with a bogus alloc id m = map[string]*structs.DesiredTransition{uuid.Generate(): t2} - require.Nil(state.UpdateAllocsDesiredTransitions(1003, m, evals)) + require.Nil(state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, 1003, m, evals)) } func TestStateStore_JobSummary(t *testing.T) { diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 19421cf42..6aa9039f4 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -26,10 +26,9 @@ func TestStateStore(t testing.T) *StateStore { func TestStateStorePublisher(t testing.T) *StateStoreConfig { return &StateStoreConfig{ - Logger: testlog.HCLogger(t), - Region: "global", - EnablePublisher: true, - DurableEventCount: 100, + Logger: testlog.HCLogger(t), + Region: "global", + EnablePublisher: true, } } func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore { diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go index fda04fa4f..42c9dc3a4 100644 --- a/nomad/stream/event_buffer.go +++ b/nomad/stream/event_buffer.go @@ -10,6 +10,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +type EvictCallbackFn func(events *structs.Events) + // eventBuffer is a single-writer, multiple-reader, fixed length concurrent // buffer of events that have been published. 
The buffer is
 // the head and tail of an atomically updated single-linked list. Atomic
@@ -28,8 +30,8 @@ import (
 // goroutines or deliver to O(N) separate channels.
 //
 // Because eventBuffer is a linked list with atomically updated pointers, readers don't
-// have to take a lock and can consume at their own pace. Slow readers can eventually
-// append
+// have to take a lock and can consume at their own pace. Slow readers will eventually
+// be forced to reconnect to the latest head by being notified via a bufferItem's droppedCh.
 //
 // A new buffer is constructed with a sentinel "empty" bufferItem that has a nil
 // Events array. This enables subscribers to start watching for the next update
@@ -39,10 +41,9 @@ import (
 // initialized with an empty bufferItem so can not be used to wait for the first
 // published event. Call newEventBuffer to construct a new buffer.
 //
-// Calls to Append or AppendBuffer that mutate the head must be externally
+// Calls to Append or prune that mutate the head must be externally
 // synchronized. This allows systems that already serialize writes to append
-// without lock overhead (e.g. a snapshot goroutine appending thousands of
-// events).
+// without lock overhead.
 type eventBuffer struct {
 	size *int64
@@ -51,18 +52,20 @@ type eventBuffer struct {
 
 	maxSize    int64
 	maxItemTTL time.Duration
+	onEvict    EvictCallbackFn
 }
 
 // newEventBuffer creates an eventBuffer ready for use.
-func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
+func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackFn) *eventBuffer {
 	zero := int64(0)
 	b := &eventBuffer{
 		maxSize:    size,
 		size:       &zero,
 		maxItemTTL: maxItemTTL,
+		onEvict:    onEvict,
 	}
 
-	item := newBufferItem(structs.Events{Index: 0, Events: nil})
+	item := newBufferItem(&structs.Events{Index: 0, Events: nil})
 
 	b.head.Store(item)
 	b.tail.Store(item)
@@ -74,8 +77,8 @@ func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
 // watchers. After calling append, the caller must not make any further
 // mutations to the events as they may have been exposed to subscribers in other
 // goroutines. Append only supports a single concurrent caller and must be
-// externally synchronized with other Append, AppendBuffer or AppendErr calls.
-func (b *eventBuffer) Append(events structs.Events) {
+// externally synchronized with other Append or prune calls.
+func (b *eventBuffer) Append(events *structs.Events) {
 	b.appendItem(newBufferItem(events))
 }
 
@@ -88,11 +91,10 @@ func (b *eventBuffer) appendItem(item *bufferItem) {
 	b.tail.Store(item)
 
 	// Increment the buffer size
-	size := atomic.AddInt64(b.size, 1)
+	atomic.AddInt64(b.size, int64(len(item.Events.Events)))
 
-	// Check if we need to advance the head to keep the list
-	// constrained to max size
-	if size > b.maxSize {
+	// Advance the head until we are under the allowable size
+	for atomic.LoadInt64(b.size) > b.maxSize {
 		b.advanceHead()
 	}
 
@@ -101,18 +103,43 @@
 
 }
 
+func newSentinelItem() *bufferItem {
+	return newBufferItem(&structs.Events{Index: 0, Events: nil})
+}
+
 // advanceHead drops the current Head buffer item and notifies readers
 // that the item should be discarded by closing droppedCh.
 // Slow readers will prevent the old head from being GC'd until they
 // discard it.
 func (b *eventBuffer) advanceHead() {
 	old := b.Head()
 
+	next := old.link.next.Load()
+	// if the next item is nil, replace it with a sentinel value
+	if next == nil {
+		next = newSentinelItem()
+	}
+
+	// notify readers that old is being dropped
 	close(old.link.droppedCh)
 
-	b.head.Store(next)
-	atomic.AddInt64(b.size, -1)
+	// store the next value as the new head
+	b.head.Store(next)
+
+	// If the old head is equal to the tail,
+	// update the tail value as well
+	if old == b.Tail() {
+		b.tail.Store(next)
+	}
+
+	// update the number of events in the buffer
+	rmCount := len(old.Events.Events)
+	atomic.AddInt64(b.size, -int64(rmCount))
+
+	// Call the evict callback if the item isn't a sentinel value
+	if b.onEvict != nil && old.Events.Index != 0 {
+		b.onEvict(old.Events)
+	}
 }
 
 // Head returns the current head of the buffer. It will always exist but it may
@@ -137,10 +164,10 @@ func (b *eventBuffer) Tail() *bufferItem {
 // index as well as the offset between the requested index and returned one.
 func (b *eventBuffer) StartAtClosest(index uint64) (*bufferItem, int) {
 	item := b.Head()
-	if index < item.Index {
-		return item, int(item.Index) - int(index)
+	if index < item.Events.Index {
+		return item, int(item.Events.Index) - int(index)
 	}
-	if item.Index == index {
+	if item.Events.Index == index {
 		return item, 0
 	}
@@ -148,12 +175,12 @@
 		prev := item
 		item = item.NextNoBlock()
 		if item == nil {
-			return prev, int(index) - int(prev.Index)
+			return prev, int(index) - int(prev.Events.Index)
 		}
-		if index < item.Index {
-			return item, int(item.Index) - int(index)
+		if index < item.Events.Index {
+			return item, int(item.Events.Index) - int(index)
 		}
-		if index == item.Index {
+		if index == item.Events.Index {
 			return item, 0
 		}
 	}
@@ -168,13 +195,14 @@ func (b *eventBuffer) Len() int {
 // is no longer expired. It should be externally synchronized as it mutates
 // the buffer of items.
 func (b *eventBuffer) prune() {
+	now := time.Now()
 	for {
 		head := b.Head()
 		if b.Len() == 0 {
 			return
 		}
 
-		if time.Since(head.createdAt) > b.maxItemTTL {
+		if now.Sub(head.createdAt) > b.maxItemTTL {
 			b.advanceHead()
 		} else {
 			return
@@ -202,9 +230,7 @@ type bufferItem struct {
 	// should check and skip nil Events at any point in the buffer. It will also
 	// be nil if the producer appends an Error event because they can't complete
 	// the request to populate the buffer. Err will be non-nil in this case.
-	Events []structs.Event
-
-	Index uint64
+	Events *structs.Events
 
 	// Err is non-nil if the producer can't complete their task and terminates the
 	// buffer. Subscribers should return the error to clients and cease attempting
@@ -239,14 +265,13 @@ type bufferLink struct {
 // newBufferItem returns a blank buffer item with a link and chan ready to have
 // the fields set and be appended to a buffer.
-func newBufferItem(events structs.Events) *bufferItem {
+func newBufferItem(events *structs.Events) *bufferItem {
 	return &bufferItem{
 		link: &bufferLink{
 			ch:        make(chan struct{}),
 			droppedCh: make(chan struct{}),
 		},
-		Events: events.Events,
-		Index:  events.Index,
+		Events:    events,
 		createdAt: time.Now(),
 	}
 }
@@ -258,13 +283,15 @@ func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*buf
 	// state change (chan nil) as that's not threadsafe but detecting close is.
 	select {
 	case <-ctx.Done():
-		return nil, fmt.Errorf("waiting for next event: %w", ctx.Err())
+		return nil, ctx.Err()
 	case <-forceClose:
 		return nil, fmt.Errorf("subscription closed")
 	case <-i.link.ch:
 	}
 
 	// Check if the reader is too slow and the event buffer as discarded the event
+	// This must happen after the above select to prevent a random selection
+	// between link.ch and droppedCh
 	select {
 	case <-i.link.droppedCh:
 		return nil, fmt.Errorf("event dropped from buffer")
@@ -293,16 +320,3 @@ func (i *bufferItem) NextNoBlock() *bufferItem {
 	}
 	return nextRaw.(*bufferItem)
 }
-
-// NextLink returns either the next item in the buffer if there is one, or
-// an empty item (that will be ignored by subscribers) that has a pointer to
-// the same link as this bufferItem (but none of the bufferItem content).
-// When the link.ch is closed, subscriptions will be notified of the next item.
-func (i *bufferItem) NextLink() *bufferItem {
-	next := i.NextNoBlock()
-	if next == nil {
-		// Return an empty item that can be followed to the next item published.
-		return &bufferItem{link: i.link}
-	}
-	return next
-}
diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go
index 5b1b6e623..84f0b8524 100644
--- a/nomad/stream/event_buffer_test.go
+++ b/nomad/stream/event_buffer_test.go
@@ -3,11 +3,12 @@ package stream
 import (
 	"context"
 	"fmt"
-	"github.com/hashicorp/nomad/nomad/structs"
 	"math/rand"
 	"testing"
 	"time"
 
+	"github.com/hashicorp/nomad/nomad/structs"
+
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -16,7 +17,7 @@ func TestEventBufferFuzz(t *testing.T) {
 	nReaders := 1000
 	nMessages := 1000
 
-	b := newEventBuffer(1000, DefaultTTL)
+	b := newEventBuffer(1000, DefaultTTL, nil)
 
 	// Start a write goroutine that will publish 10000 messages with sequential
 	// indexes and some jitter in timing (to allow clients to "catch up" and block
@@ -35,7 +36,7 @@ func TestEventBufferFuzz(t *testing.T) {
 			e := structs.Event{
 				Index: uint64(i), // Indexes should be contiguous
 			}
-			b.Append(structs.Events{Index: uint64(i), Events: []structs.Event{e}})
+			b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
 			// Sleep sometimes for a while to let some subscribers catch up
 			wait := time.Duration(z.Uint64()) * time.Millisecond
 			time.Sleep(wait)
@@ -61,9 +62,9 @@ func TestEventBufferFuzz(t *testing.T) {
 						expect, err)
 					return
 				}
-				if item.Events[0].Index != expect {
+				if item.Events.Events[0].Index != expect {
 					errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
-						expect, item.Events[0].Index)
+						expect, item.Events.Events[0].Index)
 					return
 				}
 				expect++
@@ -84,14 +85,13 @@ func TestEventBufferFuzz(t *testing.T) {
 }
 
 func TestEventBuffer_Slow_Reader(t *testing.T) {
-
-	b := newEventBuffer(10, DefaultTTL)
+	b := newEventBuffer(10, DefaultTTL, nil)
 
 	for i := 0; i < 10; i++ {
 		e := structs.Event{
 			Index: uint64(i), // Indexes should be contiguous
 		}
-		b.Append(structs.Events{uint64(i), []structs.Event{e}})
+		b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
 	}
 
 	head := b.Head()
 
 	for i := 10; i < 15; i++ {
 		e := structs.Event{
 			Index: uint64(i), // Indexes should be contiguous
 		}
-		b.Append(structs.Events{uint64(i), []structs.Event{e}})
+		b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
 	}
 
 	// Ensure the slow reader errors to handle dropped events and
 	ev, err := head.Next(ctx, make(chan struct{}))
 	require.Error(t, err)
 	require.Nil(t, ev)
 
 	newHead := b.Head()
-	require.Equal(t, 4, int(newHead.Index))
+	require.Equal(t, 5, int(newHead.Events.Index))
 }
 
 func TestEventBuffer_Size(t *testing.T) {
-	b := newEventBuffer(100, DefaultTTL)
+	b := newEventBuffer(100, DefaultTTL, nil)
 
 	for i := 0; i < 10; i++ {
 		e := structs.Event{
 			Index: uint64(i), // Indexes should be contiguous
 		}
-		b.Append(structs.Events{uint64(i), []structs.Event{e}})
+		b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
 	}
 
 	require.Equal(t, 10, b.Len())
@@ -130,26 +130,46 @@
 // are past their TTL, the event buffer should prune down to the last message
 // and hold onto the last item.
 func TestEventBuffer_Prune_AllOld(t *testing.T) {
-	b := newEventBuffer(100, 1*time.Second)
+	b := newEventBuffer(100, 1*time.Second, nil)
 
 	for i := 0; i < 10; i++ {
 		e := structs.Event{
 			Index: uint64(i), // Indexes should be contiguous
 		}
-		b.Append(structs.Events{uint64(i), []structs.Event{e}})
+		b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
 	}
 
 	require.Equal(t, 10, int(b.Len()))
 
 	time.Sleep(1 * time.Second)
 
+	// prune old messages, which will bring the event buffer down
+	// to a single sentinel value
 	b.prune()
 
-	require.Equal(t, 9, int(b.Head().Index))
+	// head and tail are now a sentinel value
+	head := b.Head()
+	tail := b.Tail()
+	require.Equal(t, 0, int(head.Events.Index))
 	require.Equal(t, 0, b.Len())
+	require.Equal(t, head, tail)
+
+	e := structs.Event{
+		Index: uint64(100),
+	}
+	b.Append(&structs.Events{Index: uint64(100), Events: []structs.Event{e}})
+
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
+	defer cancel()
+
+	next, err := head.Next(ctx, make(chan struct{}))
+	require.NoError(t, err)
+	require.NotNil(t, next)
+	require.Equal(t, uint64(100), next.Events.Index)
+
 }
 
-func TestStartAt_CurrentIdx_Past_Start(t *testing.T) {
+func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) {
 	cases := []struct {
 		desc string
 		req  uint64
@@ -183,20 +203,43 @@ func TestStartAt_CurrentIdx_Past_Start(t *testing.T) {
 	}
 
 	// buffer starts at index 11 goes to 100
-	b := newEventBuffer(100, 1*time.Hour)
+	b := newEventBuffer(100, 1*time.Hour, nil)
 
 	for i := 11; i <= 100; i++ {
 		e := structs.Event{
 			Index: uint64(i), // Indexes should be contiguous
 		}
-		b.Append(structs.Events{uint64(i), []structs.Event{e}})
+		b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
 	}
 
 	for _, tc := range cases {
 		t.Run(tc.desc, func(t *testing.T) {
 			got, offset := b.StartAtClosest(tc.req)
-			require.Equal(t, int(tc.expected), int(got.Index))
+			require.Equal(t, int(tc.expected), int(got.Events.Index))
 			require.Equal(t, tc.offset, offset)
 		})
 	}
 }
+
+func TestEventBuffer_OnEvict(t *testing.T) {
+	called := make(chan struct{})
+	testOnEvict := func(events *structs.Events) {
+		close(called)
+	}
+	b := newEventBuffer(2, DefaultTTL, testOnEvict)
+
+	// start at 1 since new event buffer is built with a starting sentinel value
+	for i := 1; i < 4; i++ {
+		e := structs.Event{
+			Index: uint64(i), // Indexes should be contiguous
+		}
+		b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
+	}
+
+	select {
+	case <-called:
+		// testOnEvict called
+	case <-time.After(100 * time.Millisecond):
+		require.Fail(t, "expected testOnEvict to be called")
+	}
+}
diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go
index 45b8ef856..f28131ddd 100644
--- a/nomad/stream/event_publisher.go
+++ b/nomad/stream/event_publisher.go
@@ -18,6 +18,7 @@ type EventPublisherCfg struct {
 	EventBufferSize int64
 	EventBufferTTL  time.Duration
 	Logger          hclog.Logger
+	OnEvict         EvictCallbackFn
 }
 
 type EventPublisher struct {
@@ -27,10 +28,6 @@ type EventPublisher struct {
 	// eventBuf stores a configurable amount of events in memory
 	eventBuf *eventBuffer
 
-	// pruneTick is the duration to periodically prune events from the event
-	// buffer. Defaults to 5s
-	pruneTick time.Duration
-
 	logger hclog.Logger
 
 	subscriptions *subscriptions
@@ -38,7 +35,7 @@
 	// publishCh is used to send messages from an active txn to a goroutine which
 	// publishes events, so that publishing can happen asynchronously from
 	// the Commit call in the FSM hot path.
-	publishCh chan structs.Events
+	publishCh chan *structs.Events
 }
 
 type subscriptions struct {
@@ -63,19 +60,22 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish
 		cfg.Logger = hclog.NewNullLogger()
 	}
 
-	buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL)
+	// Default the event buffer size if it was not configured
+	if cfg.EventBufferSize == 0 {
+		cfg.EventBufferSize = 100
+	}
+
+	buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL, cfg.OnEvict)
 	e := &EventPublisher{
 		logger:    cfg.Logger.Named("event_publisher"),
 		eventBuf:  buffer,
-		publishCh: make(chan structs.Events, 64),
+		publishCh: make(chan *structs.Events, 64),
 		subscriptions: &subscriptions{
 			byToken: make(map[string]map[*SubscribeRequest]*Subscription),
 		},
-		pruneTick: 5 * time.Second,
 	}
 
 	go e.handleUpdates(ctx)
-	go e.periodicPrune(ctx)
 
 	return e
 }
@@ -85,7 +85,7 @@ func (e *EventPublisher) Len() int {
 }
 
 // Publish events to all subscribers of the event Topic.
-func (e *EventPublisher) Publish(events structs.Events) {
+func (e *EventPublisher) Publish(events *structs.Events) {
 	if len(events.Events) > 0 {
 		e.publishCh <- events
 	}
@@ -104,11 +104,11 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
 		head = e.eventBuf.Head()
 	}
 	if offset > 0 {
-		e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Index))
+		e.logger.Warn("requested index no longer in buffer", "requested", int(req.Index), "closest", int(head.Events.Index))
 	}
 
 	// Empty head so that calling Next on sub
-	start := newBufferItem(structs.Events{Index: req.Index})
+	start := newBufferItem(&structs.Events{Index: req.Index})
 
 	start.link.next.Store(head)
 	close(start.link.ch)
@@ -118,6 +118,10 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
 	return sub, nil
 }
 
+func (e *EventPublisher) CloseAll() {
+	e.subscriptions.closeAll()
+}
+
 func (e *EventPublisher) handleUpdates(ctx context.Context) {
 	for {
 		select {
@@ -130,21 +134,8 @@
 		}
 	}
 }
 
-func (e *EventPublisher) periodicPrune(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-time.After(e.pruneTick):
-			e.lock.Lock()
-			e.eventBuf.prune()
-			e.lock.Unlock()
-		}
-	}
-}
-
 // sendEvents sends the given events to the publishers event buffer.
-func (e *EventPublisher) sendEvents(update structs.Events) {
+func (e *EventPublisher) sendEvents(update *structs.Events) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go
index 1cdd0f347..af92d0595 100644
--- a/nomad/stream/event_publisher_test.go
+++ b/nomad/stream/event_publisher_test.go
@@ -2,10 +2,11 @@ package stream
 
 import (
 	"context"
-	"github.com/hashicorp/nomad/nomad/structs"
 	"testing"
 	"time"
 
+	"github.com/hashicorp/nomad/nomad/structs"
+
 	"github.com/stretchr/testify/require"
 )
 
@@ -32,7 +33,7 @@ func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) {
 		Key:     "sub-key",
 		Payload: "sample payload",
 	}}
-	publisher.Publish(structs.Events{Index: 1, Events: events})
+	publisher.Publish(&structs.Events{Index: 1, Events: events})
 
 	// Subscriber should see the published event
 	result := nextResult(t, eventCh)
@@ -50,7 +51,7 @@ func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) {
 		Key:     "sub-key",
 		Payload: "sample payload 2",
 	}}
-	publisher.Publish(structs.Events{Index: 2, Events: events})
+	publisher.Publish(&structs.Events{Index: 2, Events: events})
 
 	result = nextResult(t, eventCh)
 	require.NoError(t, result.Err)
diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go
index 555be5834..0d2b8fe39 100644
--- a/nomad/stream/subscription.go
+++ b/nomad/stream/subscription.go
@@ -76,11 +76,11 @@ func (s *Subscription) Next(ctx context.Context) (structs.Events, error) {
 		}
 		s.currentItem = next
 
-		events := filter(s.req, next.Events)
+		events := filter(s.req, next.Events.Events)
 		if len(events) == 0 {
 			continue
 		}
 
-		return structs.Events{Index: next.Index, Events: events}, nil
+		return structs.Events{Index: next.Events.Index, Events: events}, nil
 	}
 }
 
@@ -96,7 +96,7 @@ func (s *Subscription) NextNoBlock() ([]structs.Event, error) {
 		}
 		s.currentItem = next
 
-		events := filter(s.req, next.Events)
+		events := filter(s.req, next.Events.Events)
 		if len(events) == 0 {
 			continue
 		}
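
For reviewers, a minimal sketch of how the pieces in this patch compose. EventPublisherCfg, NewEventPublisher, Publish, and EvictCallbackFn are taken from the diffs above; the deleteDurableEvents helper and the "Node" topic are illustrative placeholders only, standing in for the go-memdb deletion the state store performs when the buffer evicts a batch:

package main

import (
	"context"
	"time"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/nomad/stream"
	"github.com/hashicorp/nomad/nomad/structs"
)

// deleteDurableEvents is a hypothetical stand-in for the state store's evict
// handler; in this patch the real callback removes the evicted batch from
// go-memdb, keyed by its index.
func deleteDurableEvents(index uint64) {
	// e.g. delete the stored events entry whose Index equals index
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
		EventBufferSize: 100, // a zero value is defaulted to 100 by NewEventPublisher
		EventBufferTTL:  1 * time.Hour,
		Logger:          hclog.Default(),
		// OnEvict fires once per batch dropped by advanceHead; sentinel
		// items (Index 0) never reach the callback.
		OnEvict: func(evicted *structs.Events) {
			deleteDurableEvents(evicted.Index)
		},
	})

	// Events are now passed by pointer, so the FSM hot path no longer copies
	// each batch into the buffer.
	publisher.Publish(&structs.Events{
		Index:  1,
		Events: []structs.Event{{Topic: "Node", Index: 1}},
	})
}

Note the size-accounting change this sketch relies on: Append now grows the buffer's size counter by len(item.Events.Events) and advanceHead shrinks it by the same amount, so EventBufferSize bounds the number of buffered events rather than the number of appended batches, and eviction (with its callback) triggers as soon as the total crosses that bound.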