Add EvictCallbackFn to handle removing entries from go-memdb when they

are removed from the event buffer.

Wire up event buffer size config, use pointers for structs.Events
instead of copying.
This commit is contained in:
Drew Bailey
2020-10-06 16:21:58 -04:00
parent 1288b18b27
commit 39ef3263ca
33 changed files with 669 additions and 278 deletions

View File

@@ -6,7 +6,7 @@ import (
"fmt"
)
// Ebvents is a set of events for a corresponding index. Events returned for the
// Events is a set of events for a corresponding index. Events returned for the
// index depend on which topics are subscribed to when a request is made.
type Events struct {
Index uint64
@@ -78,9 +78,9 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
// Decode next newline delimited json of events
var events Events
if err := dec.Decode(&events); err != nil {
// set error and fallthrough to
// select eventsCh
events = Events{Err: err}
eventsCh <- &events
return
}
if events.IsHeartbeat() {
continue

View File

@@ -243,8 +243,11 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.UpgradeVersion != "" {
conf.UpgradeVersion = agentConfig.Server.UpgradeVersion
}
if agentConfig.Server.EnableEventPublisher {
conf.EnableEventPublisher = agentConfig.Server.EnableEventPublisher
if agentConfig.Server.EnableEventPublisher != nil {
conf.EnableEventPublisher = *agentConfig.Server.EnableEventPublisher
}
if agentConfig.Server.EventBufferSize > 0 {
conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize)
}
if agentConfig.Autopilot != nil {
if agentConfig.Autopilot.CleanupDeadServers != nil {

View File

@@ -94,7 +94,6 @@ func (c *Command) readConfig() *Config {
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.ServerJoin.StartJoin), "join", "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.ServerJoin.RetryJoin), "retry-join", "")
flags.IntVar(&cmdConfig.Server.ServerJoin.RetryMaxAttempts, "retry-max", 0, "")
flags.BoolVar(&cmdConfig.Server.EnableEventPublisher, "event-publisher", false, "")
flags.Var((flaghelper.FuncDurationVar)(func(d time.Duration) error {
cmdConfig.Server.ServerJoin.RetryInterval = d
return nil
@@ -597,7 +596,6 @@ func (c *Command) AutocompleteFlags() complete.Flags {
"-vault-tls-server-name": complete.PredictAnything,
"-acl-enabled": complete.PredictNothing,
"-acl-replication-token": complete.PredictAnything,
"-event-publisher": complete.PredictNothing,
}
}
@@ -1280,9 +1278,6 @@ Server Options:
-rejoin
Ignore a previous leave and attempts to rejoin the cluster.
-event-publisher
Whether to enable or disable the servers event publisher.
Client Options:
-client

View File

@@ -486,7 +486,12 @@ type ServerConfig struct {
// EnableEventPublisher configures whether this server's state store
// will generate events for its event stream.
EnableEventPublisher bool `hcl:"enable_event_publisher"`
EnableEventPublisher *bool `hcl:"enable_event_publisher"`
// EventBufferSize configure the amount of events to be held in memory.
// If EnableEventPublisher is set to true, the minimum allowable value
// for the EventBufferSize is 1.
EventBufferSize int `hcl:"event_buffer_size"`
// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
@@ -883,7 +888,8 @@ func DefaultConfig() *Config {
},
Server: &ServerConfig{
Enabled: false,
EnableEventPublisher: true,
EnableEventPublisher: helper.BoolToPtr(true),
EventBufferSize: 100,
DurableEventCount: 100,
StartJoin: []string{},
ServerJoin: &ServerJoin{
@@ -1409,8 +1415,12 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin)
}
if b.EnableEventPublisher {
result.EnableEventPublisher = true
if b.EnableEventPublisher != nil {
result.EnableEventPublisher = b.EnableEventPublisher
}
if b.EventBufferSize != 0 {
result.EventBufferSize = b.EventBufferSize
}
if b.DurableEventCount != 0 {

View File

@@ -122,7 +122,8 @@ var basicConfig = &Config{
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventPublisher: true,
EnableEventPublisher: helper.BoolToPtr(false),
EventBufferSize: 200,
DurableEventCount: 100,
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},

View File

@@ -138,6 +138,7 @@ func TestConfig_Merge(t *testing.T) {
MaxHeartbeatsPerSecond: 30.0,
RedundancyZone: "foo",
UpgradeVersion: "foo",
EnableEventPublisher: helper.BoolToPtr(false),
},
ACL: &ACLConfig{
Enabled: true,
@@ -328,6 +329,7 @@ func TestConfig_Merge(t *testing.T) {
NonVotingServer: true,
RedundancyZone: "bar",
UpgradeVersion: "bar",
EnableEventPublisher: helper.BoolToPtr(true),
},
ACL: &ACLConfig{
Enabled: true,
@@ -1163,3 +1165,41 @@ func TestTelemetry_Parse(t *testing.T) {
require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter)
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
}
func TestEventPublisher_Parse(t *testing.T) {
require := require.New(t)
{
a := &ServerConfig{
EnableEventPublisher: helper.BoolToPtr(false),
}
b := DefaultConfig().Server
b.EnableEventPublisher = nil
result := a.Merge(b)
require.Equal(false, *result.EnableEventPublisher)
}
{
a := &ServerConfig{
EnableEventPublisher: helper.BoolToPtr(true),
}
b := DefaultConfig().Server
b.EnableEventPublisher = nil
result := a.Merge(b)
require.Equal(true, *result.EnableEventPublisher)
}
{
a := &ServerConfig{
EnableEventPublisher: helper.BoolToPtr(false),
}
b := DefaultConfig().Server
b.EnableEventPublisher = helper.BoolToPtr(true)
result := a.Merge(b)
require.Equal(true, *result.EnableEventPublisher)
}
}

View File

@@ -74,7 +74,7 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
output := ioutils.NewWriteFlusher(resp)
// create an error channel to handle errors
errCh := make(chan HTTPCodedError, 2)
errCh := make(chan HTTPCodedError, 1)
go func() {
defer cancel()
@@ -124,9 +124,7 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
cancel()
codedErr := <-errCh
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error())) {
if codedErr != nil && strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error()) {
codedErr = nil
}

View File

@@ -39,7 +39,7 @@ func TestEventStream(t *testing.T) {
pub, err := s.Agent.server.State().EventPublisher()
require.NoError(t, err)
pub.Publish(structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}})
pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}})
testutil.WaitForResult(func() (bool, error) {
got := resp.Body.String()
@@ -56,7 +56,6 @@ func TestEventStream(t *testing.T) {
// wait for response to close to prevent race between subscription
// shutdown and server shutdown returning subscription closed by server err
// resp.Close()
cancel()
select {
case err := <-respErrCh:

View File

@@ -130,7 +130,8 @@ server {
upgrade_version = "0.8.0"
encrypt = "abc"
raft_multiplier = 4
enable_event_publisher = true
enable_event_publisher = false
event_buffer_size = 200
durable_event_count = 100
server_join {

View File

@@ -261,7 +261,8 @@
"data_dir": "/tmp/data",
"deployment_gc_threshold": "12h",
"enabled": true,
"enable_event_publisher": true,
"enable_event_publisher": false,
"event_buffer_size": 200,
"durable_event_count": 100,
"enabled_schedulers": [
"test"

View File

@@ -78,10 +78,13 @@ type Config struct {
// in the absence of ACLs
EnableDebug bool
// EnableEventPublisher is used to enable or disable the state stores
// EnableEventPublisher is used to enable or disable state store
// event publishing
EnableEventPublisher bool
// EventBufferSize is the amount of events to hold in memory.
EventBufferSize int64
// LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr.
LogOutput io.Writer

View File

@@ -38,7 +38,7 @@ func (m *mockBackend) nextIndex() uint64 {
func (m *mockBackend) UpdateAllocDesiredTransition(u *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) {
m.Called(u)
i := m.nextIndex()
return i, m.state.UpdateAllocsDesiredTransitions(i, u.Allocs, u.Evals)
return i, m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, i, u.Allocs, u.Evals)
}
// matchUpdateAllocDesiredTransitions is used to match an upsert request

View File

@@ -55,14 +55,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}
// TODO(drew) handle streams without ACLS
reqToken := args.AuthToken
if reqToken == "" {
// generate a random request token
reqToken = uuid.Generate()
// authToken is passed to the subscribe request so the event stream
// can handle closing a subscription if the authToken expires.
// If ACLs are disabled, a random token is generated and it will
// never be closed due to expiry.
authToken := args.AuthToken
if authToken == "" {
authToken = uuid.Generate()
}
subReq := &stream.SubscribeRequest{
Token: reqToken,
Token: authToken,
Topics: args.Topics,
Index: uint64(args.Index),
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
@@ -28,7 +29,7 @@ func TestEventStream(t *testing.T) {
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": []string{"*"}},
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
@@ -47,7 +48,7 @@ func TestEventStream(t *testing.T) {
// invoke handler
go handler(p2)
// send request
// decode request responses
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
@@ -68,8 +69,9 @@ func TestEventStream(t *testing.T) {
require.NoError(t, err)
node := mock.Node()
publisher.Publish(structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// Send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
@@ -160,14 +162,14 @@ func TestEventStream_StreamErr(t *testing.T) {
require.NoError(t, err)
node := mock.Node()
publisher.Publish(structs.Events{uint64(1), []structs.Event{{Topic: "test", Payload: node}}})
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// stop the publisher to force an error on subscription side
s1.State().StopEventPublisher()
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(5 * time.Second)
OUTER:
@@ -178,8 +180,10 @@ OUTER:
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// close the publishers subscriptions forcing an error
// after an initial event is received
publisher.CloseAll()
if msg.Error == nil {
// race between error and receiving an event
// continue trying for error
continue
}
@@ -249,7 +253,7 @@ func TestEventStream_RegionForward(t *testing.T) {
require.NoError(t, err)
node := mock.Node()
publisher.Publish(structs.Events{uint64(1), []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
@@ -290,6 +294,114 @@ OUTER:
}
}
// TODO(drew) acl test
func TestEventStream_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// start server
s, root, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
cases := []struct {
Name string
Token string
ExpectedErr string
}{
{
Name: "no token",
Token: "",
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "root token",
Token: root.SecretID,
ExpectedErr: "subscription closed by server",
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s.Region(),
AuthToken: tc.Token,
},
}
handler, err := s.StreamingRpcHandler("Event.Stream")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
publisher, err := s.State().EventPublisher()
require.NoError(err)
// publish some events
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
require.Fail("timeout waiting for response")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// force error by closing all subscriptions
publisher.CloseAll()
if msg.Error == nil {
continue
}
if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
break OUTER
} else {
require.Fail("Unexpected error", msg.Error)
}
}
}
})
}
}

View File

@@ -100,8 +100,9 @@ type nomadFSM struct {
// state in a way that can be accessed concurrently with operations
// that may modify the live state.
type nomadSnapshot struct {
snap *state.StateSnapshot
timetable *TimeTable
snap *state.StateSnapshot
timetable *TimeTable
durableEventCount int
}
// snapshotHeader is the first entry in our snapshot
@@ -129,6 +130,8 @@ type FSMConfig struct {
EnableEventPublisher bool
EventBufferSize int64
// Durable count specifies the amount of events generated by the state store
// to save to disk during snapshot generation. The most recent events
// limited to count will be saved.
@@ -142,6 +145,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) {
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventPublisher,
EventBufferSize: config.EventBufferSize,
DurableEventCount: config.DurableEventCount,
}
state, err := state.NewStateStore(sconfig)
@@ -258,21 +262,16 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
// TODO(drew) higher priority event
return n.applyBatchDeregisterJob(msgType, buf[1:], log.Index)
case structs.AllocUpdateDesiredTransitionRequestType:
// TODO(drew) higher priority event
return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
return n.applyAllocUpdateDesiredTransition(msgType, buf[1:], log.Index)
case structs.NodeUpdateEligibilityRequestType:
// TODO(drew) higher priority event
return n.applyNodeEligibilityUpdate(buf[1:], log.Index)
return n.applyNodeEligibilityUpdate(msgType, buf[1:], log.Index)
case structs.BatchNodeUpdateDrainRequestType:
// TODO(drew) higher priority event
return n.applyBatchDrainUpdate(buf[1:], log.Index)
return n.applyBatchDrainUpdate(msgType, buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
case structs.NodeBatchDeregisterRequestType:
// TODO(drew) higher priority event
return n.applyDeregisterNodeBatch(buf[1:], log.Index)
case structs.ClusterMetadataRequestType:
return n.applyClusterMetadata(buf[1:], log.Index)
@@ -437,21 +436,21 @@ func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, ind
return nil
}
func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyBatchDrainUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_node_drain_update"}, time.Now())
var req structs.BatchNodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
if err := n.state.BatchUpdateNodeDrain(msgType, index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
n.logger.Error("BatchUpdateNodeDrain failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyNodeEligibilityUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_eligibility_update"}, time.Now())
var req structs.NodeUpdateEligibilityRequest
if err := structs.Decode(buf, &req); err != nil {
@@ -465,7 +464,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
return err
}
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeEligibility(msgType, index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeEligibility failed", "error", err)
return err
}
@@ -857,14 +856,14 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt
// applyAllocUpdateDesiredTransition is used to update the desired transitions
// of a set of allocations.
func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyAllocUpdateDesiredTransition(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transition"}, time.Now())
var req structs.AllocUpdateDesiredTransitionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateAllocsDesiredTransitions(index, req.Allocs, req.Evals); err != nil {
if err := n.state.UpdateAllocsDesiredTransitions(msgType, index, req.Allocs, req.Evals); err != nil {
n.logger.Error("UpdateAllocsDesiredTransitions failed", "error", err)
return err
}
@@ -1262,8 +1261,9 @@ func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
}
ns := &nomadSnapshot{
snap: snap,
timetable: n.timetable,
snap: snap,
timetable: n.timetable,
durableEventCount: n.config.DurableEventCount,
}
return ns, nil
}
@@ -1276,6 +1276,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
Logger: n.config.Logger,
Region: n.config.Region,
EnablePublisher: n.config.EnableEventPublisher,
EventBufferSize: n.config.EventBufferSize,
DurableEventCount: n.config.DurableEventCount,
}
newState, err := state.NewStateStore(config)
@@ -1589,7 +1590,7 @@ func rehydratePublisherFromState(s *state.StateStore) error {
break
}
e := raw.(*structs.Events)
pub.Publish(*e)
pub.Publish(e)
}
return nil
}
@@ -2376,11 +2377,8 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink,
}
func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error {
var durableCount int
if s.snap.Config() != nil && s.snap.Config().DurableEventCount == 0 {
if s.durableEventCount == 0 {
return nil
} else {
durableCount = s.snap.Config().DurableEventCount
}
events, err := s.snap.LatestEventsReverse(nil)
@@ -2409,7 +2407,7 @@ func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Enc
count += eventCount
// Only write to sink until durableCount has been reached
if count >= durableCount {
if count >= s.durableEventCount {
return nil
}
}

View File

@@ -2,7 +2,6 @@ package nomad
import (
"bytes"
"errors"
"fmt"
"reflect"
"strings"
@@ -57,6 +56,7 @@ func testFSM(t *testing.T) *nomadFSM {
Logger: logger,
Region: "global",
EnableEventPublisher: true,
EventBufferSize: 100,
}
fsm, err := NewFSM(fsmConfig)
if err != nil {
@@ -3207,11 +3207,10 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
// Add some state
fsm := testFSM(t)
fsm.config.EnableEventPublisher = true
// DurableEventCount = 4 each mock events wrapper contains 2 events
fsm.config.DurableEventCount = 4
state := fsm.State()
cfg := state.Config()
// DurableEventCount = 4 each mock events wrapper contains 2 events
cfg.DurableEventCount = 4
e1 := mock.Events(1000)
e2 := mock.Events(1001)
@@ -3249,11 +3248,13 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
pub, err := state2.EventPublisher()
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
if pub.Len() == 2 {
plen := pub.Len()
if plen == 4 {
return true, nil
}
return false, errors.New("expected publisher to be populated")
return false, fmt.Errorf("expected publisher to have len 2 got: %d", plen)
}, func(err error) {
require.Fail(t, err.Error())
})
@@ -3261,9 +3262,11 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) {
func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
fsm.config.EnableEventPublisher = false
// Enable event publisher with durable event count of zero
fsm.config.EnableEventPublisher = true
fsm.config.DurableEventCount = 0
state := fsm.State()
e1 := mock.Events(1000)

View File

@@ -1917,7 +1917,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) {
if time.Since(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}
assert.EqualValues(200, resp3.Index)
assert.EqualValues(200, int(resp3.Index))
if assert.Len(resp3.Allocs, 1) {
assert.EqualValues(100, resp3.Allocs[alloc1.ID])
}

View File

@@ -1217,6 +1217,7 @@ func (s *Server) setupRaft() error {
Logger: s.logger,
Region: s.Region(),
EnableEventPublisher: s.config.EnableEventPublisher,
EventBufferSize: s.config.EventBufferSize,
}
var err error
s.fsm, err = NewFSM(fsmConfig)

View File

@@ -6,14 +6,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
var events []structs.Event
for _, change := range changes.Changes {
switch change.Table {
case "deployment":
after, ok := change.After.(*structs.Deployment)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Deployment")
return nil, fmt.Errorf("transaction change was not a Deployment")
}
event := structs.Event{
@@ -29,7 +29,7 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Even
case "evals":
after, ok := change.After.(*structs.Evaluation)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation")
return nil, fmt.Errorf("transaction change was not an Evaluation")
}
event := structs.Event{
@@ -45,7 +45,7 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Even
case "allocs":
after, ok := change.After.(*structs.Allocation)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not an Allocation")
return nil, fmt.Errorf("transaction change was not an Allocation")
}
before := change.Before
var msg string
@@ -69,5 +69,5 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Even
}
}
return structs.Events{Index: changes.Index, Events: events}, nil
return &structs.Events{Index: changes.Index, Events: events}, nil
}

View File

@@ -6,7 +6,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes Changes) (structs.Events, error) {
func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes Changes) (*structs.Events, error) {
var events []structs.Event
var eventType string
@@ -24,7 +24,7 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes
case "deployment":
after, ok := change.After.(*structs.Deployment)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Deployment")
return nil, fmt.Errorf("transaction change was not a Deployment")
}
event := structs.Event{
@@ -42,7 +42,7 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes
case "jobs":
after, ok := change.After.(*structs.Job)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Job")
return nil, fmt.Errorf("transaction change was not a Job")
}
event := structs.Event{
@@ -61,7 +61,7 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes
case "evals":
after, ok := change.After.(*structs.Evaluation)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation")
return nil, fmt.Errorf("transaction change was not an Evaluation")
}
event := structs.Event{
@@ -79,5 +79,5 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes
}
}
return structs.Events{Index: changes.Index, Events: events}, nil
return &structs.Events{Index: changes.Index, Events: events}, nil
}

View File

@@ -17,21 +17,22 @@ const (
TopicNode structs.Topic = "Node"
// TODO(drew) Node Events use TopicNode + Type
TypeNodeRegistration = "NodeRegistration"
TypeNodeDeregistration = "NodeDeregistration"
TypeNodeDrain = "NodeDrain"
TypeNodeEvent = "NodeEvent"
TypeNodeRegistration = "NodeRegistration"
TypeNodeDeregistration = "NodeDeregistration"
TypeNodeEligibilityUpdate = "NodeEligibility"
TypeNodeDrain = "NodeDrain"
TypeNodeEvent = "NodeEvent"
TypeDeploymentUpdate = "DeploymentStatusUpdate"
TypeDeploymentPromotion = "DeploymentPromotion"
TypeDeploymentAllocHealth = "DeploymentAllocHealth"
TypeAllocCreated = "AllocCreated"
TypeAllocUpdated = "AllocUpdated"
TypeEvalUpdated = "EvalUpdated"
TypeJobRegistered = "JobRegistered"
TypeDeploymentUpdate = "DeploymentStatusUpdate"
TypeDeploymentPromotion = "DeploymentPromotion"
TypeDeploymentAllocHealth = "DeploymentAllocHealth"
TypeAllocCreated = "AllocCreated"
TypeAllocUpdated = "AllocUpdated"
TypeAllocUpdateDesiredStatus = "AllocUpdateDesiredStatus"
TypeEvalUpdated = "EvalUpdated"
TypeJobRegistered = "JobRegistered"
TypeJobDeregistered = "JobDeregistered"
TypeJobBatchDeregistered = "JobBatchDeregistered"
)
type JobEvent struct {
@@ -72,9 +73,13 @@ type JobDrainDetails struct {
AllocDetails map[string]NodeDrainAllocDetails
}
func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
var eventType string
switch changes.MsgType {
case structs.NodeRegisterRequestType:
eventType = TypeNodeRegistration
case structs.UpsertNodeEventsType:
eventType = TypeNodeEvent
case structs.EvalUpdateRequestType:
eventType = TypeEvalUpdated
case structs.AllocClientUpdateRequestType:
@@ -85,15 +90,31 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
eventType = TypeAllocUpdated
case structs.NodeUpdateStatusRequestType:
eventType = TypeNodeEvent
case structs.JobDeregisterRequestType:
eventType = TypeJobDeregistered
case structs.JobBatchDeregisterRequestType:
eventType = TypeJobBatchDeregistered
case structs.AllocUpdateDesiredTransitionRequestType:
eventType = TypeAllocUpdateDesiredStatus
case structs.NodeUpdateEligibilityRequestType:
eventType = TypeNodeDrain
case structs.BatchNodeUpdateDrainRequestType:
eventType = TypeNodeDrain
default:
// unknown request type
return nil, nil
}
var events []structs.Event
for _, change := range changes.Changes {
switch change.Table {
case "evals":
if change.Deleted() {
return nil, nil
}
after, ok := change.After.(*structs.Evaluation)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation")
return nil, fmt.Errorf("transaction change was not an Evaluation")
}
event := structs.Event{
@@ -109,26 +130,36 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
events = append(events, event)
case "allocs":
if change.Deleted() {
return nil, nil
}
after, ok := change.After.(*structs.Allocation)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not an Allocation")
return nil, fmt.Errorf("transaction change was not an Allocation")
}
alloc := after.Copy()
// remove job info to help keep size of alloc event down
alloc.Job = nil
event := structs.Event{
Topic: TopicAlloc,
Type: eventType,
Index: changes.Index,
Key: after.ID,
Payload: &AllocEvent{
Alloc: after,
Alloc: alloc,
},
}
events = append(events, event)
case "jobs":
if change.Deleted() {
return nil, nil
}
after, ok := change.After.(*structs.Job)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not an Allocation")
return nil, fmt.Errorf("transaction change was not an Allocation")
}
event := structs.Event{
@@ -143,9 +174,12 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
events = append(events, event)
case "nodes":
if change.Deleted() {
return nil, nil
}
after, ok := change.After.(*structs.Node)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Node")
return nil, fmt.Errorf("transaction change was not a Node")
}
event := structs.Event{
@@ -161,5 +195,5 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, erro
}
}
return structs.Events{Index: changes.Index, Events: events}, nil
return &structs.Events{Index: changes.Index, Events: events}, nil
}

View File

@@ -8,14 +8,14 @@ import (
// NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set
// of transaction changes.
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
var events []structs.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Node")
return nil, fmt.Errorf("transaction change was not a Node")
}
event := structs.Event{
@@ -30,19 +30,19 @@ func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events,
events = append(events, event)
}
}
return structs.Events{Index: changes.Index, Events: events}, nil
return &structs.Events{Index: changes.Index, Events: events}, nil
}
// NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set
// of transaction changes.
func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
var events []structs.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Node")
return nil, fmt.Errorf("transaction change was not a Node")
}
event := structs.Event{
@@ -57,50 +57,23 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events
events = append(events, event)
}
}
return structs.Events{Index: changes.Index, Events: events}, nil
return &structs.Events{Index: changes.Index, Events: events}, nil
}
// NodeEventFromChanges generates a NodeDeregistrationEvent from a set
// of transaction changes.
func NodeEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
var events []structs.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Node")
}
event := structs.Event{
Topic: TopicNode,
Type: TypeNodeEvent,
Index: changes.Index,
Key: after.ID,
Payload: &NodeEvent{
Node: after,
},
}
events = append(events, event)
}
}
return structs.Events{Index: changes.Index, Events: events}, nil
}
func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
var events []structs.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return structs.Events{}, fmt.Errorf("transaction change was not a Node")
return nil, fmt.Errorf("transaction change was not a Node")
}
// retrieve allocations currently on node
allocs, err := allocsByNodeTxn(tx, nil, after.ID)
if err != nil {
return structs.Events{}, fmt.Errorf("retrieving allocations for node drain event: %w", err)
return nil, fmt.Errorf("retrieving allocations for node drain event: %w", err)
}
// build job/alloc details for node drain
@@ -132,5 +105,5 @@ func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, err
events = append(events, event)
}
}
return structs.Events{Index: changes.Index, Events: events}, nil
return &structs.Events{Index: changes.Index, Events: events}, nil
}

View File

@@ -175,7 +175,7 @@ func TestNodeEventsFromChanges(t *testing.T) {
setupTx.Txn.Commit()
}
tx := s.db.WriteTxn(100)
tx := s.db.WriteTxnMsgT(tc.MsgType, 100)
require.NoError(t, tc.Mutate(s, tx))
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
@@ -186,6 +186,7 @@ func TestNodeEventsFromChanges(t *testing.T) {
return
}
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, len(tc.WantEvents), len(got.Events))
for idx, g := range got.Events {

View File

@@ -8,10 +8,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
CtxMsgType = "type"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
@@ -34,10 +30,9 @@ type Changes struct {
// sent to the EventPublisher which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
durableEvents bool
durableCount int
durableCount int64
publisher *stream.EventPublisher
processChanges func(ReadTxn, Changes) (structs.Events, error)
processChanges func(ReadTxn, Changes) (*structs.Events, error)
}
// ChangeConfig
@@ -54,13 +49,13 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, chang
db: db,
publisher: publisher,
processChanges: changesFn,
durableCount: cfg.DurableEventCount,
durableCount: int64(cfg.DurableEventCount),
}
}
type changeProcessor func(ReadTxn, Changes) (structs.Events, error)
type changeProcessor func(ReadTxn, Changes) (*structs.Events, error)
func noOpProcessChanges(ReadTxn, Changes) (structs.Events, error) { return structs.Events{}, nil }
func noOpProcessChanges(ReadTxn, Changes) (*structs.Events, error) { return nil, nil }
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
@@ -92,27 +87,31 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
}
func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) *txn {
persistChanges := c.durableCount > 0
t := &txn{
msgType: msgType,
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
persistChanges: c.durableEvents,
persistChanges: persistChanges,
}
t.Txn.TrackChanges()
return t
}
func (c *changeTrackerDB) publish(changes Changes) (structs.Events, error) {
func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
events, err := c.processChanges(readOnlyTx, changes)
if err != nil {
return structs.Events{}, fmt.Errorf("failed generating events from changes: %v", err)
return nil, fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.Publish(events)
if events != nil {
c.publisher.Publish(events)
}
return events, nil
}
@@ -147,7 +146,7 @@ type txn struct {
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
publish func(changes Changes) (structs.Events, error)
publish func(changes Changes) (*structs.Events, error)
}
// Commit first pushes changes to EventPublisher, then calls Commit on the
@@ -171,7 +170,7 @@ func (tx *txn) Commit() error {
return err
}
if tx.persistChanges {
if tx.persistChanges && events != nil {
// persist events after processing changes
err := tx.Txn.Insert("events", events)
if err != nil {
@@ -191,13 +190,13 @@ func (tx *txn) MsgType() structs.MessageType {
return tx.msgType
}
func processDBChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
func processDBChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
switch changes.MsgType {
case structs.IgnoreUnknownTypeFlag:
// unknown event type
return structs.Events{}, nil
return nil, nil
case structs.NodeRegisterRequestType:
return NodeRegisterEventFromChanges(tx, changes)
return GenericEventsFromChanges(tx, changes)
case structs.NodeUpdateStatusRequestType:
// TODO(drew) test
return GenericEventsFromChanges(tx, changes)
@@ -206,7 +205,7 @@ func processDBChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
case structs.NodeUpdateDrainRequestType:
return NodeDrainEventFromChanges(tx, changes)
case structs.UpsertNodeEventsType:
return NodeEventFromChanges(tx, changes)
return GenericEventsFromChanges(tx, changes)
case structs.DeploymentStatusUpdateRequestType:
return DeploymentEventFromChanges(changes.MsgType, tx, changes)
case structs.DeploymentPromoteRequestType:
@@ -225,6 +224,21 @@ func processDBChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
case structs.AllocUpdateRequestType:
// TODO(drew) test
return GenericEventsFromChanges(tx, changes)
// case structs.JobDeregisterRequestType:
// TODO(drew) test / handle delete
// return GenericEventsFromChanges(tx, changes)
// case structs.JobBatchDeregisterRequestType:
// TODO(drew) test & handle delete
// return GenericEventsFromChanges(tx, changes)
case structs.AllocUpdateDesiredTransitionRequestType:
// TODO(drew) drain
return GenericEventsFromChanges(tx, changes)
case structs.NodeUpdateEligibilityRequestType:
// TODO(drew) test, drain
return GenericEventsFromChanges(tx, changes)
case structs.BatchNodeUpdateDrainRequestType:
// TODO(drew) test, drain
return GenericEventsFromChanges(tx, changes)
}
return structs.Events{}, nil
return nil, nil
}

View File

@@ -49,8 +49,11 @@ type StateStoreConfig struct {
// EnablePublisher is used to enable or disable the event publisher
EnablePublisher bool
// DurableEventCount is the amount of events to persist during the snapshot
// process.
// EventBufferSize configures the amount of events to hold in memory
EventBufferSize int64
// DurableEventCount is used to determine if events from transaction changes
// should be saved in go-memdb
DurableEventCount int
}
@@ -96,12 +99,15 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
if config.EnablePublisher {
cfg := &ChangeConfig{
DurableEventCount: 1000,
DurableEventCount: config.DurableEventCount,
}
// Create new event publisher using provided config
publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
EventBufferTTL: 1 * time.Hour,
EventBufferSize: 250,
EventBufferSize: config.EventBufferSize,
Logger: config.Logger,
OnEvict: s.eventPublisherEvict,
})
s.db = NewChangeTrackerDB(db, publisher, processDBChanges, cfg)
} else {
@@ -123,6 +129,30 @@ func (s *StateStore) EventPublisher() (*stream.EventPublisher, error) {
return s.db.publisher, nil
}
// eventPublisherEvict is used as a callback to delete an evicted events
// entry from go-memdb.
func (s *StateStore) eventPublisherEvict(events *structs.Events) {
if err := s.deleteEvent(events); err != nil {
if err == memdb.ErrNotFound {
s.logger.Info("Evicted event was not found in go-memdb table", "event index", events.Index)
} else {
s.logger.Error("Error deleting event from events table", "error", err)
}
}
}
func (s *StateStore) deleteEvent(events *structs.Events) error {
txn := s.db.db.Txn(true)
defer txn.Abort()
if err := txn.Delete("events", events); err != nil {
return err
}
txn.Commit()
return nil
}
// Config returns the state store configuration.
func (s *StateStore) Config() *StateStoreConfig {
return s.config
@@ -948,7 +978,7 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
}
// BatchUpdateNodeDrain is used to update the drain of a node set of nodes
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
for node, update := range updates {
@@ -1030,9 +1060,9 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,
}
// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Lookup the node
@@ -2919,8 +2949,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("setting job status failed: %v", err)
}
txn.Commit()
return nil
return txn.Commit()
}
// EvalByID is used to lookup an eval by its ID
@@ -3291,10 +3320,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
// UpdateAllocsDesiredTransitions is used to update a set of allocations
// desired transitions.
func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition,
func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, index uint64, allocs map[string]*structs.DesiredTransition,
evals []*structs.Evaluation) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Handle each of the updated allocations

View File

@@ -0,0 +1,125 @@
package state
import (
"errors"
"fmt"
"testing"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
// TestStateStore_Events_OnEvict tests that events in the state stores
// event publisher and go-memdb are evicted together when the event buffer
// size reaches its max.
func TestStateStore_Events_OnEvict(t *testing.T) {
t.Parallel()
cfg := &StateStoreConfig{
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
EventBufferSize: 10,
DurableEventCount: 10,
}
s := TestStateStoreCfg(t, cfg)
_, err := s.EventPublisher()
require.NoError(t, err)
// force 3 evictions
for i := 1; i < 13; i++ {
require.NoError(t,
s.UpsertNodeMsgType(structs.NodeRegisterRequestType, uint64(i), mock.Node()),
)
}
get := func() []*structs.Events {
var out []*structs.Events
iter, err := s.Events(nil)
require.NoError(t, err)
for {
raw := iter.Next()
if raw == nil {
break
}
e := raw.(*structs.Events)
out = append(out, e)
}
return out
}
// event publisher is async so wait for it to prune
testutil.WaitForResult(func() (bool, error) {
out := get()
if len(out) != 10 {
return false, errors.New("Expected event count to be pruned to 10")
}
return true, nil
}, func(err error) {
require.Fail(t, err.Error())
t.Fatalf("err: %s", err)
})
out := get()
require.Equal(t, 3, int(out[0].Index))
}
// TestStateStore_Events_OnEvict_Missing tests behavior when the event publisher
// evicts an event and there is no corresponding go-memdb entry due to durability
// settings
func TestStateStore_Events_OnEvict_Missing(t *testing.T) {
t.Parallel()
cfg := &StateStoreConfig{
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
EventBufferSize: 10,
DurableEventCount: 0,
}
s := TestStateStoreCfg(t, cfg)
_, err := s.EventPublisher()
require.NoError(t, err)
getEvents := func() []*structs.Events {
var out []*structs.Events
iter, err := s.Events(nil)
require.NoError(t, err)
for {
raw := iter.Next()
if raw == nil {
break
}
e := raw.(*structs.Events)
out = append(out, e)
}
return out
}
// Publish 13 events to fill buffer and force 3 evictions
for i := 1; i < 13; i++ {
require.NoError(t,
s.UpsertNodeMsgType(structs.NodeRegisterRequestType, uint64(i), mock.Node()),
)
}
// event publisher is async so wait for it to prune
testutil.WaitForResult(func() (bool, error) {
out := getEvents()
if len(out) != 0 {
return false, fmt.Errorf("Expected event count to be %d, got: %d", 0, len(out))
}
return true, nil
}, func(err error) {
require.Fail(t, err.Error())
t.Fatalf("err: %s", err)
})
}

View File

@@ -956,7 +956,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
n2.ID: event,
}
require.Nil(state.BatchUpdateNodeDrain(1002, 7, update, events))
require.Nil(state.BatchUpdateNodeDrain(structs.MsgTypeTestSetup, 1002, 7, update, events))
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
@@ -1190,7 +1190,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, 7, event))
require.Nil(state.UpdateNodeEligibility(structs.MsgTypeTestSetup, 1001, node.ID, expectedEligibility, 7, event))
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
@@ -1216,7 +1216,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, 7, nil))
// Try to set the node to eligible
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, 9, nil)
err = state.UpdateNodeEligibility(structs.MsgTypeTestSetup, 1003, node.ID, structs.NodeSchedulingEligible, 9, nil)
require.NotNil(err)
require.Contains(err.Error(), "while it is draining")
}
@@ -5203,7 +5203,7 @@ func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) {
evals := []*structs.Evaluation{eval}
m := map[string]*structs.DesiredTransition{alloc.ID: t1}
require.Nil(state.UpdateAllocsDesiredTransitions(1001, m, evals))
require.Nil(state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, 1001, m, evals))
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
@@ -5223,7 +5223,7 @@ func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) {
require.NotNil(eout)
m = map[string]*structs.DesiredTransition{alloc.ID: t2}
require.Nil(state.UpdateAllocsDesiredTransitions(1002, m, evals))
require.Nil(state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, 1002, m, evals))
ws = memdb.NewWatchSet()
out, err = state.AllocByID(ws, alloc.ID)
@@ -5239,7 +5239,7 @@ func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) {
// Try with a bogus alloc id
m = map[string]*structs.DesiredTransition{uuid.Generate(): t2}
require.Nil(state.UpdateAllocsDesiredTransitions(1003, m, evals))
require.Nil(state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, 1003, m, evals))
}
func TestStateStore_JobSummary(t *testing.T) {

View File

@@ -26,10 +26,9 @@ func TestStateStore(t testing.T) *StateStore {
func TestStateStorePublisher(t testing.T) *StateStoreConfig {
return &StateStoreConfig{
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
DurableEventCount: 100,
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
}
}
func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore {

View File

@@ -10,6 +10,8 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
type EvictCallbackFn func(events *structs.Events)
// eventBuffer is a single-writer, multiple-reader, fixed length concurrent
// buffer of events that have been published. The buffer is
// the head and tail of an atomically updated single-linked list. Atomic
@@ -28,8 +30,8 @@ import (
// goroutines or deliver to O(N) separate channels.
//
// Because eventBuffer is a linked list with atomically updated pointers, readers don't
// have to take a lock and can consume at their own pace. Slow readers can eventually
// append
// have to take a lock and can consume at their own pace. Slow readers will eventually
// be forced to reconnect to the lastest head by being notified via a bufferItem's droppedCh.
//
// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil
// Events array. This enables subscribers to start watching for the next update
@@ -39,10 +41,9 @@ import (
// initialized with an empty bufferItem so can not be used to wait for the first
// published event. Call newEventBuffer to construct a new buffer.
//
// Calls to Append or AppendBuffer that mutate the head must be externally
// Calls to Append or purne that mutate the head must be externally
// synchronized. This allows systems that already serialize writes to append
// without lock overhead (e.g. a snapshot goroutine appending thousands of
// events).
// without lock overhead.
type eventBuffer struct {
size *int64
@@ -51,18 +52,20 @@ type eventBuffer struct {
maxSize int64
maxItemTTL time.Duration
onEvict EvictCallbackFn
}
// newEventBuffer creates an eventBuffer ready for use.
func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackFn) *eventBuffer {
zero := int64(0)
b := &eventBuffer{
maxSize: size,
size: &zero,
maxItemTTL: maxItemTTL,
onEvict: onEvict,
}
item := newBufferItem(structs.Events{Index: 0, Events: nil})
item := newBufferItem(&structs.Events{Index: 0, Events: nil})
b.head.Store(item)
b.tail.Store(item)
@@ -74,8 +77,8 @@ func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
// watchers. After calling append, the caller must not make any further
// mutations to the events as they may have been exposed to subscribers in other
// goroutines. Append only supports a single concurrent caller and must be
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
func (b *eventBuffer) Append(events structs.Events) {
// externally synchronized with other Append, or prune calls.
func (b *eventBuffer) Append(events *structs.Events) {
b.appendItem(newBufferItem(events))
}
@@ -88,11 +91,10 @@ func (b *eventBuffer) appendItem(item *bufferItem) {
b.tail.Store(item)
// Increment the buffer size
size := atomic.AddInt64(b.size, 1)
atomic.AddInt64(b.size, int64(len(item.Events.Events)))
// Check if we need to advance the head to keep the list
// constrained to max size
if size > b.maxSize {
// Advance Head until we are under allowable size
for atomic.LoadInt64(b.size) > b.maxSize {
b.advanceHead()
}
@@ -101,18 +103,43 @@ func (b *eventBuffer) appendItem(item *bufferItem) {
}
func newSentinelItem() *bufferItem {
return newBufferItem(&structs.Events{Index: 0, Events: nil})
}
// advanceHead drops the current Head buffer item and notifies readers
// that the item should be discarded by closing droppedCh.
// Slow readers will prevent the old head from being GC'd until they
// discard it.
func (b *eventBuffer) advanceHead() {
old := b.Head()
next := old.link.next.Load()
// if the next item is nil replace it with a sentinel value
if next == nil {
next = newSentinelItem()
}
// notify readers that old is being dropped
close(old.link.droppedCh)
b.head.Store(next)
atomic.AddInt64(b.size, -1)
// store the next value to head
b.head.Store(next)
// If the old head is equal to the tail
// update the tail value as well
if old == b.Tail() {
b.tail.Store(next)
}
// update the amount of events we have in the buffer
rmCount := len(old.Events.Events)
atomic.AddInt64(b.size, -int64(rmCount))
// Call evict callback if the item isn't a sentinel value
if b.onEvict != nil && old.Events.Index != 0 {
b.onEvict(old.Events)
}
}
// Head returns the current head of the buffer. It will always exist but it may
@@ -137,10 +164,10 @@ func (b *eventBuffer) Tail() *bufferItem {
// index as well as the offset between the requested index and returned one.
func (b *eventBuffer) StartAtClosest(index uint64) (*bufferItem, int) {
item := b.Head()
if index < item.Index {
return item, int(item.Index) - int(index)
if index < item.Events.Index {
return item, int(item.Events.Index) - int(index)
}
if item.Index == index {
if item.Events.Index == index {
return item, 0
}
@@ -148,12 +175,12 @@ func (b *eventBuffer) StartAtClosest(index uint64) (*bufferItem, int) {
prev := item
item = item.NextNoBlock()
if item == nil {
return prev, int(index) - int(prev.Index)
return prev, int(index) - int(prev.Events.Index)
}
if index < item.Index {
return item, int(item.Index) - int(index)
if index < item.Events.Index {
return item, int(item.Events.Index) - int(index)
}
if index == item.Index {
if index == item.Events.Index {
return item, 0
}
}
@@ -168,13 +195,14 @@ func (b *eventBuffer) Len() int {
// is no longer expired. It should be externally synchronized as it mutates
// the buffer of items.
func (b *eventBuffer) prune() {
now := time.Now()
for {
head := b.Head()
if b.Len() == 0 {
return
}
if time.Since(head.createdAt) > b.maxItemTTL {
if now.Sub(head.createdAt) > b.maxItemTTL {
b.advanceHead()
} else {
return
@@ -202,9 +230,7 @@ type bufferItem struct {
// should check and skip nil Events at any point in the buffer. It will also
// be nil if the producer appends an Error event because they can't complete
// the request to populate the buffer. Err will be non-nil in this case.
Events []structs.Event
Index uint64
Events *structs.Events
// Err is non-nil if the producer can't complete their task and terminates the
// buffer. Subscribers should return the error to clients and cease attempting
@@ -239,14 +265,13 @@ type bufferLink struct {
// newBufferItem returns a blank buffer item with a link and chan ready to have
// the fields set and be appended to a buffer.
func newBufferItem(events structs.Events) *bufferItem {
func newBufferItem(events *structs.Events) *bufferItem {
return &bufferItem{
link: &bufferLink{
ch: make(chan struct{}),
droppedCh: make(chan struct{}),
},
Events: events.Events,
Index: events.Index,
Events: events,
createdAt: time.Now(),
}
}
@@ -258,13 +283,15 @@ func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*buf
// state change (chan nil) as that's not threadsafe but detecting close is.
select {
case <-ctx.Done():
return nil, fmt.Errorf("waiting for next event: %w", ctx.Err())
return nil, ctx.Err()
case <-forceClose:
return nil, fmt.Errorf("subscription closed")
case <-i.link.ch:
}
// Check if the reader is too slow and the event buffer as discarded the event
// This must happen after the above select to prevent a random selection
// between linkCh and droppedCh
select {
case <-i.link.droppedCh:
return nil, fmt.Errorf("event dropped from buffer")
@@ -293,16 +320,3 @@ func (i *bufferItem) NextNoBlock() *bufferItem {
}
return nextRaw.(*bufferItem)
}
// NextLink returns either the next item in the buffer if there is one, or
// an empty item (that will be ignored by subscribers) that has a pointer to
// the same link as this bufferItem (but none of the bufferItem content).
// When the link.ch is closed, subscriptions will be notified of the next item.
func (i *bufferItem) NextLink() *bufferItem {
next := i.NextNoBlock()
if next == nil {
// Return an empty item that can be followed to the next item published.
return &bufferItem{link: i.link}
}
return next
}

View File

@@ -3,11 +3,12 @@ package stream
import (
"context"
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
"math/rand"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -16,7 +17,7 @@ func TestEventBufferFuzz(t *testing.T) {
nReaders := 1000
nMessages := 1000
b := newEventBuffer(1000, DefaultTTL)
b := newEventBuffer(1000, DefaultTTL, nil)
// Start a write goroutine that will publish 10000 messages with sequential
// indexes and some jitter in timing (to allow clients to "catch up" and block
@@ -35,7 +36,7 @@ func TestEventBufferFuzz(t *testing.T) {
e := structs.Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(structs.Events{Index: uint64(i), Events: []structs.Event{e}})
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
// Sleep sometimes for a while to let some subscribers catch up
wait := time.Duration(z.Uint64()) * time.Millisecond
time.Sleep(wait)
@@ -61,9 +62,9 @@ func TestEventBufferFuzz(t *testing.T) {
expect, err)
return
}
if item.Events[0].Index != expect {
if item.Events.Events[0].Index != expect {
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
expect, item.Events[0].Index)
expect, item.Events.Events[0].Index)
return
}
expect++
@@ -84,14 +85,13 @@ func TestEventBufferFuzz(t *testing.T) {
}
func TestEventBuffer_Slow_Reader(t *testing.T) {
b := newEventBuffer(10, DefaultTTL)
b := newEventBuffer(10, DefaultTTL, nil)
for i := 0; i < 10; i++ {
e := structs.Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(structs.Events{uint64(i), []structs.Event{e}})
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
}
head := b.Head()
@@ -100,7 +100,7 @@ func TestEventBuffer_Slow_Reader(t *testing.T) {
e := structs.Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(structs.Events{uint64(i), []structs.Event{e}})
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
}
// Ensure the slow reader errors to handle dropped events and
@@ -110,17 +110,17 @@ func TestEventBuffer_Slow_Reader(t *testing.T) {
require.Nil(t, ev)
newHead := b.Head()
require.Equal(t, 4, int(newHead.Index))
require.Equal(t, 5, int(newHead.Events.Index))
}
func TestEventBuffer_Size(t *testing.T) {
b := newEventBuffer(100, DefaultTTL)
b := newEventBuffer(100, DefaultTTL, nil)
for i := 0; i < 10; i++ {
e := structs.Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(structs.Events{uint64(i), []structs.Event{e}})
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
}
require.Equal(t, 10, b.Len())
@@ -130,26 +130,46 @@ func TestEventBuffer_Size(t *testing.T) {
// are past their TTL, the event buffer should prune down to the last message
// and hold onto the last item.
func TestEventBuffer_Prune_AllOld(t *testing.T) {
b := newEventBuffer(100, 1*time.Second)
b := newEventBuffer(100, 1*time.Second, nil)
for i := 0; i < 10; i++ {
e := structs.Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(structs.Events{uint64(i), []structs.Event{e}})
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
}
require.Equal(t, 10, int(b.Len()))
time.Sleep(1 * time.Second)
// prune old messages, which will bring the event buffer down
// to a single sentinel value
b.prune()
require.Equal(t, 9, int(b.Head().Index))
// head and tail are now a sentinel value
head := b.Head()
tail := b.Tail()
require.Equal(t, 0, int(head.Events.Index))
require.Equal(t, 0, b.Len())
require.Equal(t, head, tail)
e := structs.Event{
Index: uint64(100),
}
b.Append(&structs.Events{Index: uint64(100), Events: []structs.Event{e}})
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
defer cancel()
next, err := head.Next(ctx, make(chan struct{}))
require.NoError(t, err)
require.NotNil(t, next)
require.Equal(t, uint64(100), next.Events.Index)
}
func TestStartAt_CurrentIdx_Past_Start(t *testing.T) {
func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) {
cases := []struct {
desc string
req uint64
@@ -183,20 +203,43 @@ func TestStartAt_CurrentIdx_Past_Start(t *testing.T) {
}
// buffer starts at index 11 goes to 100
b := newEventBuffer(100, 1*time.Hour)
b := newEventBuffer(100, 1*time.Hour, nil)
for i := 11; i <= 100; i++ {
e := structs.Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(structs.Events{uint64(i), []structs.Event{e}})
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
got, offset := b.StartAtClosest(tc.req)
require.Equal(t, int(tc.expected), int(got.Index))
require.Equal(t, int(tc.expected), int(got.Events.Index))
require.Equal(t, tc.offset, offset)
})
}
}
func TestEventBuffer_OnEvict(t *testing.T) {
called := make(chan struct{})
testOnEvict := func(events *structs.Events) {
close(called)
}
b := newEventBuffer(2, DefaultTTL, testOnEvict)
// start at 1 since new event buffer is built with a starting sentinel value
for i := 1; i < 4; i++ {
e := structs.Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
}
select {
case <-called:
// testOnEvict called
case <-time.After(100 * time.Millisecond):
require.Fail(t, "expected testOnEvict to be called")
}
}

View File

@@ -18,6 +18,7 @@ type EventPublisherCfg struct {
EventBufferSize int64
EventBufferTTL time.Duration
Logger hclog.Logger
OnEvict EvictCallbackFn
}
type EventPublisher struct {
@@ -27,10 +28,6 @@ type EventPublisher struct {
// eventBuf stores a configurable amount of events in memory
eventBuf *eventBuffer
// pruneTick is the duration to periodically prune events from the event
// buffer. Defaults to 5s
pruneTick time.Duration
logger hclog.Logger
subscriptions *subscriptions
@@ -38,7 +35,7 @@ type EventPublisher struct {
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan structs.Events
publishCh chan *structs.Events
}
type subscriptions struct {
@@ -63,19 +60,22 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish
cfg.Logger = hclog.NewNullLogger()
}
buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL)
// Set the event buffer size to a minimum
if cfg.EventBufferSize == 0 {
cfg.EventBufferSize = 100
}
buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL, cfg.OnEvict)
e := &EventPublisher{
logger: cfg.Logger.Named("event_publisher"),
eventBuf: buffer,
publishCh: make(chan structs.Events, 64),
publishCh: make(chan *structs.Events, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
pruneTick: 5 * time.Second,
}
go e.handleUpdates(ctx)
go e.periodicPrune(ctx)
return e
}
@@ -85,7 +85,7 @@ func (e *EventPublisher) Len() int {
}
// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events structs.Events) {
func (e *EventPublisher) Publish(events *structs.Events) {
if len(events.Events) > 0 {
e.publishCh <- events
}
@@ -104,11 +104,11 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
head = e.eventBuf.Head()
}
if offset > 0 {
e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Index))
e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index))
}
// Empty head so that calling Next on sub
start := newBufferItem(structs.Events{Index: req.Index})
start := newBufferItem(&structs.Events{Index: req.Index})
start.link.next.Store(head)
close(start.link.ch)
@@ -118,6 +118,10 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
return sub, nil
}
func (e *EventPublisher) CloseAll() {
e.subscriptions.closeAll()
}
func (e *EventPublisher) handleUpdates(ctx context.Context) {
for {
select {
@@ -130,21 +134,8 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
}
}
func (e *EventPublisher) periodicPrune(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(e.pruneTick):
e.lock.Lock()
e.eventBuf.prune()
e.lock.Unlock()
}
}
}
// sendEvents sends the given events to the publishers event buffer.
func (e *EventPublisher) sendEvents(update structs.Events) {
func (e *EventPublisher) sendEvents(update *structs.Events) {
e.lock.Lock()
defer e.lock.Unlock()

View File

@@ -2,10 +2,11 @@ package stream
import (
"context"
"github.com/hashicorp/nomad/nomad/structs"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
@@ -32,7 +33,7 @@ func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) {
Key: "sub-key",
Payload: "sample payload",
}}
publisher.Publish(structs.Events{Index: 1, Events: events})
publisher.Publish(&structs.Events{Index: 1, Events: events})
// Subscriber should see the published event
result := nextResult(t, eventCh)
@@ -50,7 +51,7 @@ func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) {
Key: "sub-key",
Payload: "sample payload 2",
}}
publisher.Publish(structs.Events{Index: 2, Events: events})
publisher.Publish(&structs.Events{Index: 2, Events: events})
result = nextResult(t, eventCh)
require.NoError(t, result.Err)

View File

@@ -76,11 +76,11 @@ func (s *Subscription) Next(ctx context.Context) (structs.Events, error) {
}
s.currentItem = next
events := filter(s.req, next.Events)
events := filter(s.req, next.Events.Events)
if len(events) == 0 {
continue
}
return structs.Events{Index: next.Index, Events: events}, nil
return structs.Events{Index: next.Events.Index, Events: events}, nil
}
}
@@ -96,7 +96,7 @@ func (s *Subscription) NextNoBlock() ([]structs.Event, error) {
}
s.currentItem = next
events := filter(s.req, next.Events)
events := filter(s.req, next.Events.Events)
if len(events) == 0 {
continue
}