diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go index 5684d9c14..d68563969 100644 --- a/command/agent/event_endpoint.go +++ b/command/agent/event_endpoint.go @@ -13,7 +13,6 @@ import ( "github.com/docker/docker/pkg/ioutils" "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) @@ -134,12 +133,12 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i return nil, codedErr } -func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) { +func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) { raw, ok := query["topic"] if !ok { return allTopics(), nil } - topics := make(map[stream.Topic][]string) + topics := make(map[structs.Topic][]string) for _, topic := range raw { k, v, err := parseTopic(topic) @@ -147,10 +146,10 @@ func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) { return nil, fmt.Errorf("error parsing topics: %w", err) } - if topics[stream.Topic(k)] == nil { - topics[stream.Topic(k)] = []string{v} + if topics[structs.Topic(k)] == nil { + topics[structs.Topic(k)] = []string{v} } else { - topics[stream.Topic(k)] = append(topics[stream.Topic(k)], v) + topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v) } } return topics, nil @@ -164,6 +163,6 @@ func parseTopic(topic string) (string, string, error) { return parts[0], parts[1], nil } -func allTopics() map[stream.Topic][]string { - return map[stream.Topic][]string{"*": {"*"}} +func allTopics() map[structs.Topic][]string { + return map[structs.Topic][]string{"*": {"*"}} } diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index e97b4a30d..af86cce7c 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -3,6 +3,7 @@ package agent import ( "context" "fmt" + "github.com/hashicorp/nomad/nomad/structs" "net/http" "net/http/httptest" "net/url" @@ -10,7 +11,6 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,7 +38,7 @@ func TestEventStream(t *testing.T) { pub, err := s.Agent.server.State().EventPublisher() require.NoError(t, err) - pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}}) + pub.Publish(structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}}) testutil.WaitForResult(func() (bool, error) { got := resp.Body.String() @@ -72,20 +72,20 @@ func TestEventStream_QueryParse(t *testing.T) { cases := []struct { desc string query string - want map[stream.Topic][]string + want map[structs.Topic][]string wantErr bool }{ { desc: "all topics and keys specified", query: "?topic=*:*", - want: map[stream.Topic][]string{ + want: map[structs.Topic][]string{ "*": {"*"}, }, }, { desc: "all topics and keys inferred", query: "", - want: map[stream.Topic][]string{ + want: map[structs.Topic][]string{ "*": {"*"}, }, }, @@ -102,14 +102,14 @@ func TestEventStream_QueryParse(t *testing.T) { { desc: "single topic and key", query: "?topic=NodeDrain:*", - want: map[stream.Topic][]string{ + want: map[structs.Topic][]string{ "NodeDrain": {"*"}, }, }, { desc: "single topic multiple keys", query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", - want: map[stream.Topic][]string{ + want: map[structs.Topic][]string{ "NodeDrain": { "*", "3caace09-f1f4-4d23-b37a-9ab5eb75069d", @@ -119,7 +119,7 @@ func TestEventStream_QueryParse(t *testing.T) { { desc: "multiple topics", query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", - want: map[stream.Topic][]string{ + want: map[structs.Topic][]string{ "NodeDrain": { "3caace09-f1f4-4d23-b37a-9ab5eb75069d", }, diff --git a/go.mod b/go.mod index 0995d5df8..d399bf5f9 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ replace ( github.com/godbus/dbus => github.com/godbus/dbus v5.0.1+incompatible github.com/golang/protobuf => github.com/golang/protobuf v1.3.4 github.com/hashicorp/go-discover => github.com/hashicorp/go-discover v0.0.0-20200812215701-c4b85f6ed31f + github.com/hashicorp/go-memdb => /home/drew/work/go/go-memdb github.com/hashicorp/nomad/api => ./api github.com/kr/pty => github.com/kr/pty v1.1.5 github.com/shirou/gopsutil => github.com/hashicorp/gopsutil v2.18.13-0.20200531184148-5aca383d4f9d+incompatible @@ -62,7 +63,7 @@ require ( github.com/hashicorp/go-envparse v0.0.0-20180119215841-310ca1881b22 github.com/hashicorp/go-getter v1.3.1-0.20190822194507-f5101da01173 github.com/hashicorp/go-hclog v0.12.0 - github.com/hashicorp/go-immutable-radix v1.2.0 + github.com/hashicorp/go-immutable-radix v1.3.0 github.com/hashicorp/go-memdb v1.2.1 github.com/hashicorp/go-msgpack v1.1.5 github.com/hashicorp/go-multierror v1.1.0 diff --git a/go.sum b/go.sum index 795748067..ed55baff1 100644 --- a/go.sum +++ b/go.sum @@ -392,9 +392,8 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-immutable-radix v1.1.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8= github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= -github.com/hashicorp/go-memdb v1.0.3/go.mod h1:LWQ8R70vPrS4OEY9k28D2z8/Zzyu34NVzeRibGAzHO0= -github.com/hashicorp/go-memdb v1.2.1 h1:wI9btDjYUOJJHTCnRlAG/TkRyD/ij7meJMrLK9X31Cc= -github.com/hashicorp/go-memdb v1.2.1/go.mod h1:OSvLJ662Jim8hMM+gWGyhktyWk2xPCnWMc7DWIqtkGA= +github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE= +github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs= diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 781adb5ad..1e1c94081 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -83,7 +83,7 @@ func (e *Event) stream(conn io.ReadWriteCloser) { } defer subscription.Unsubscribe() - ndJsonCh := make(chan *stream.NDJson) + ndJsonCh := make(chan *structs.NDJson) errCh := make(chan error) jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second) diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 793c198a4..b05aef2b8 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -28,7 +28,7 @@ func TestEventStream(t *testing.T) { // Create request for all topics and keys req := structs.EventStreamRequest{ - Topics: map[stream.Topic][]string{"*": []string{"*"}}, + Topics: map[structs.Topic][]string{"*": []string{"*"}}, QueryOptions: structs.QueryOptions{ Region: s1.Region(), }, @@ -68,7 +68,7 @@ func TestEventStream(t *testing.T) { require.NoError(t, err) node := mock.Node() - publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}}) + publisher.Publish(structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}}) encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(t, encoder.Encode(req)) @@ -91,7 +91,7 @@ OUTER: continue } - var event stream.Events + var event structs.Events err = json.Unmarshal(msg.Event.Data, &event) require.NoError(t, err) @@ -123,7 +123,7 @@ func TestEventStream_StreamErr(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) req := structs.EventStreamRequest{ - Topics: map[stream.Topic][]string{"*": {"*"}}, + Topics: map[structs.Topic][]string{"*": {"*"}}, QueryOptions: structs.QueryOptions{ Region: s1.Region(), }, @@ -160,7 +160,7 @@ func TestEventStream_StreamErr(t *testing.T) { require.NoError(t, err) node := mock.Node() - publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}}) + publisher.Publish(structs.Events{uint64(1), []structs.Event{{Topic: "test", Payload: node}}}) // send req encoder := codec.NewEncoder(p1, structs.MsgpackHandle) @@ -210,7 +210,7 @@ func TestEventStream_RegionForward(t *testing.T) { // Create request targed for region foo req := structs.EventStreamRequest{ - Topics: map[stream.Topic][]string{"*": {"*"}}, + Topics: map[structs.Topic][]string{"*": {"*"}}, QueryOptions: structs.QueryOptions{ Region: "foo", }, @@ -249,7 +249,7 @@ func TestEventStream_RegionForward(t *testing.T) { require.NoError(t, err) node := mock.Node() - publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}}) + publisher.Publish(structs.Events{uint64(1), []structs.Event{{Topic: "test", Payload: node}}}) // send req encoder := codec.NewEncoder(p1, structs.MsgpackHandle) @@ -272,7 +272,7 @@ OUTER: continue } - var event stream.Events + var event structs.Events err = json.Unmarshal(msg.Event.Data, &event) require.NoError(t, err) diff --git a/nomad/fsm.go b/nomad/fsm.go index cf6d06a36..ce5caaafd 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -52,6 +52,7 @@ const ( CSIPluginSnapshot CSIVolumeSnapshot ScalingEventsSnapshot + EventSnapshot ) // LogApplier is the definition of a function that can apply a Raft log @@ -136,6 +137,8 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { Logger: config.Logger, Region: config.Region, EnablePublisher: config.EnableEventPublisher, + // TODO(drew) plumb cfg + EnableDurability: true, } state, err := state.NewStateStore(sconfig) if err != nil { @@ -1266,8 +1269,11 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // Create a new state store config := &state.StateStoreConfig{ - Logger: n.config.Logger, - Region: n.config.Region, + Logger: n.config.Logger, + Region: n.config.Region, + EnablePublisher: n.config.EnableEventPublisher, + // TODO(drew) plumb cfg + EnableDurability: true, } newState, err := state.NewStateStore(config) if err != nil { @@ -1511,7 +1517,14 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := restore.CSIVolumeRestore(plugin); err != nil { return err } - + case EventSnapshot: + event := new(structs.Events) + if err := dec.Decode(event); err != nil { + return err + } + if err := restore.EventRestore(event); err != nil { + return err + } default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -1546,6 +1559,24 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // blocking queries won't see any changes and need to be woken up. stateOld.Abandon() + // Rehydrate the new state store's event publisher with the events + // persisted in the snapshot + // pub, err := n.state.EventPublisher() + // if err != nil { + // n.logger.Warn("Snapshot Restore: new state event publisher not configured") + // } + // events, err := n.state.Events(nil) + // if err != nil { + // n.logger.Warn("Snapshot Restore: unable to retrieve current events") + // } + // for { + // raw := events.Next() + // if raw == nil { + // break + // } + // e := raw.(*structs.Events) + // pub.Publish(e) + // } return nil } @@ -1823,6 +1854,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistEvents(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -2326,6 +2361,47 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error { + var durableCount int + if s.snap.Config() != nil && !s.snap.Config().EnableDurability { + return nil + } else { + durableCount = s.snap.Config().DurableCount + } + + events, err := s.snap.LatestEventsReverse(nil) + if err != nil { + return err + } + + count := 0 + for { + // Get the next item + raw := events.Next() + if raw == nil { + break + } + + // Prepare the request struct + event := raw.(*structs.Events) + + eventCount := len(event.Events) + + // Write out a volume snapshot + sink.Write([]byte{byte(EventSnapshot)}) + if err := encoder.Encode(event); err != nil { + return err + } + count += eventCount + + // Only write to sink until durableCount has been reached + if count >= durableCount { + return nil + } + } + return nil +} + // Release is a no-op, as we just need to GC the pointer // to the state store snapshot. There is nothing to explicitly // cleanup. diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 1a0857178..81e583568 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3199,3 +3199,73 @@ func TestFSM_ClusterMetadata(t *testing.T) { r.Equal(clusterID, storedMetadata.ClusterID) r.Equal(now, storedMetadata.CreateTime) } + +func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) { + t.Parallel() + // Add some state + fsm := testFSM(t) + state := fsm.State() + cfg := state.Config() + cfg.EnableDurability = true + // DurableCount = 4 each mock events wrapper contains 2 events + cfg.DurableCount = 4 + + e1 := mock.Events(1000) + e2 := mock.Events(1001) + e3 := mock.Events(1002) + + require.NoError(t, state.UpsertEvents(1000, e1)) + require.NoError(t, state.UpsertEvents(1001, e2)) + require.NoError(t, state.UpsertEvents(1002, e3)) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + + // latest events iterator is newest to oldest + iter, err := state2.LatestEventsReverse(nil) + require.NoError(t, err) + + raw3 := iter.Next() + require.NotNil(t, raw3) + + out3, ok := raw3.(*structs.Events) + require.True(t, ok) + require.Equal(t, e3.Index, out3.Index) + + raw2 := iter.Next() + require.NotNil(t, raw2) + + out2, ok := raw2.(*structs.Events) + require.True(t, ok) + require.Equal(t, e2.Index, out2.Index) + + // Durable count was 4 so e1 events should be excluded + raw1 := iter.Next() + require.Nil(t, raw1) +} + +func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) { + t.Parallel() + // Add some state + fsm := testFSM(t) + state := fsm.State() + cfg := state.Config() + cfg.EnableDurability = false + + e1 := mock.Events(1000) + e2 := mock.Events(1001) + + require.NoError(t, state.UpsertEvents(1000, e1)) + require.NoError(t, state.UpsertEvents(1001, e2)) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + // ws := memdb.NewWatchSet() + out, err := state2.LatestEventsReverse(nil) + require.NoError(t, err) + + raw := out.Next() + require.Nil(t, raw) +} diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 338b0f1b5..765ca0730 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1471,3 +1471,25 @@ func CSIVolume(plugin *structs.CSIPlugin) *structs.CSIVolume { NodesExpected: len(plugin.Nodes), } } + +func Events(index uint64) *structs.Events { + return &structs.Events{ + Index: index, + Events: []structs.Event{ + { + Index: index, + Topic: "Node", + Type: "update", + Key: uuid.Generate(), + Payload: Node(), + }, + { + Index: index, + Topic: "Eval", + Type: "update", + Key: uuid.Generate(), + Payload: Eval(), + }, + }, + } +} diff --git a/nomad/state/apply_plan_events.go b/nomad/state/apply_plan_events.go index 470c48790..1132dafd7 100644 --- a/nomad/state/apply_plan_events.go +++ b/nomad/state/apply_plan_events.go @@ -3,21 +3,20 @@ package state import ( "fmt" - "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) -func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { - var events []stream.Event +func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { + var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "deployment": after, ok := change.After.(*structs.Deployment) if !ok { - return nil, fmt.Errorf("transaction change was not a Deployment") + return structs.Events{}, fmt.Errorf("transaction change was not a Deployment") } - event := stream.Event{ + event := structs.Event{ Topic: TopicDeployment, Type: TypeDeploymentUpdate, Index: changes.Index, @@ -30,10 +29,10 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Eve case "evals": after, ok := change.After.(*structs.Evaluation) if !ok { - return nil, fmt.Errorf("transaction change was not an Evaluation") + return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation") } - event := stream.Event{ + event := structs.Event{ Topic: TopicEval, Index: changes.Index, Key: after.ID, @@ -46,7 +45,7 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Eve case "allocs": after, ok := change.After.(*structs.Allocation) if !ok { - return nil, fmt.Errorf("transaction change was not an Allocation") + return structs.Events{}, fmt.Errorf("transaction change was not an Allocation") } before := change.Before var msg string @@ -56,7 +55,7 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Eve msg = TypeAllocUpdated } - event := stream.Event{ + event := structs.Event{ Topic: TopicAlloc, Type: msg, Index: changes.Index, @@ -70,5 +69,5 @@ func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Eve } } - return events, nil + return structs.Events{Index: changes.Index, Events: events}, nil } diff --git a/nomad/state/deployment_events.go b/nomad/state/deployment_events.go index 5f9838f28..3f11d351d 100644 --- a/nomad/state/deployment_events.go +++ b/nomad/state/deployment_events.go @@ -3,12 +3,11 @@ package state import ( "fmt" - "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) -func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes Changes) ([]stream.Event, error) { - var events []stream.Event +func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes Changes) (structs.Events, error) { + var events []structs.Event var eventType string switch msgType { @@ -25,10 +24,10 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes case "deployment": after, ok := change.After.(*structs.Deployment) if !ok { - return nil, fmt.Errorf("transaction change was not a Deployment") + return structs.Events{}, fmt.Errorf("transaction change was not a Deployment") } - event := stream.Event{ + event := structs.Event{ Topic: TopicDeployment, Type: eventType, Index: changes.Index, @@ -43,10 +42,10 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes case "jobs": after, ok := change.After.(*structs.Job) if !ok { - return nil, fmt.Errorf("transaction change was not a Job") + return structs.Events{}, fmt.Errorf("transaction change was not a Job") } - event := stream.Event{ + event := structs.Event{ Topic: TopicJob, Type: eventType, Index: changes.Index, @@ -62,10 +61,10 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes case "evals": after, ok := change.After.(*structs.Evaluation) if !ok { - return nil, fmt.Errorf("transaction change was not an Evaluation") + return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation") } - event := stream.Event{ + event := structs.Event{ Topic: TopicEval, Type: eventType, Index: changes.Index, @@ -80,5 +79,5 @@ func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes } } - return events, nil + return structs.Events{Index: changes.Index, Events: events}, nil } diff --git a/nomad/state/deployment_event_test.go b/nomad/state/deployment_events_test.go similarity index 97% rename from nomad/state/deployment_event_test.go rename to nomad/state/deployment_events_test.go index 14fb3482e..d85c9d395 100644 --- a/nomad/state/deployment_event_test.go +++ b/nomad/state/deployment_events_test.go @@ -135,7 +135,7 @@ func TestDeploymentEventFromChanges_Promotion(t *testing.T) { require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status) } -func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, timeout time.Duration) []stream.Event { +func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, timeout time.Duration) []structs.Event { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -162,12 +162,12 @@ func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, tim } } -func EventsForIndex(t *testing.T, s *StateStore, index uint64) []stream.Event { +func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event { pub, err := s.EventPublisher() require.NoError(t, err) sub, err := pub.Subscribe(&stream.SubscribeRequest{ - Topics: map[stream.Topic][]string{ + Topics: map[structs.Topic][]string{ "*": []string{"*"}, }, Index: index, @@ -176,7 +176,7 @@ func EventsForIndex(t *testing.T, s *StateStore, index uint64) []stream.Event { require.NoError(t, err) - var events []stream.Event + var events []structs.Event for { e, err := sub.NextNoBlock() require.NoError(t, err) diff --git a/nomad/state/events.go b/nomad/state/events.go index 7e0d95daa..52f9450cc 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -3,19 +3,18 @@ package state import ( "fmt" - "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) const ( - TopicDeployment stream.Topic = "Deployment" - TopicEval stream.Topic = "Eval" - TopicAlloc stream.Topic = "Alloc" - TopicJob stream.Topic = "Job" + TopicDeployment structs.Topic = "Deployment" + TopicEval structs.Topic = "Eval" + TopicAlloc structs.Topic = "Alloc" + TopicJob structs.Topic = "Job" // TopicNodeRegistration stream.Topic = "NodeRegistration" // TopicNodeDeregistration stream.Topic = "NodeDeregistration" // TopicNodeDrain stream.Topic = "NodeDrain" - TopicNode stream.Topic = "Node" + TopicNode structs.Topic = "Node" // TODO(drew) Node Events use TopicNode + Type TypeNodeRegistration = "NodeRegistration" @@ -73,7 +72,7 @@ type JobDrainDetails struct { AllocDetails map[string]NodeDrainAllocDetails } -func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { +func GenericEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { var eventType string switch changes.MsgType { case structs.EvalUpdateRequestType: @@ -88,16 +87,16 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, erro eventType = TypeNodeEvent } - var events []stream.Event + var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "evals": after, ok := change.After.(*structs.Evaluation) if !ok { - return nil, fmt.Errorf("transaction change was not an Evaluation") + return structs.Events{}, fmt.Errorf("transaction change was not an Evaluation") } - event := stream.Event{ + event := structs.Event{ Topic: TopicEval, Type: eventType, Index: changes.Index, @@ -112,10 +111,10 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, erro case "allocs": after, ok := change.After.(*structs.Allocation) if !ok { - return nil, fmt.Errorf("transaction change was not an Allocation") + return structs.Events{}, fmt.Errorf("transaction change was not an Allocation") } - event := stream.Event{ + event := structs.Event{ Topic: TopicAlloc, Type: eventType, Index: changes.Index, @@ -129,10 +128,10 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, erro case "jobs": after, ok := change.After.(*structs.Job) if !ok { - return nil, fmt.Errorf("transaction change was not an Allocation") + return structs.Events{}, fmt.Errorf("transaction change was not an Allocation") } - event := stream.Event{ + event := structs.Event{ Topic: TopicAlloc, Type: eventType, Index: changes.Index, @@ -146,10 +145,10 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, erro case "nodes": after, ok := change.After.(*structs.Node) if !ok { - return nil, fmt.Errorf("transaction change was not a Node") + return structs.Events{}, fmt.Errorf("transaction change was not a Node") } - event := stream.Event{ + event := structs.Event{ Topic: TopicNode, Type: eventType, Index: changes.Index, @@ -162,5 +161,5 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, erro } } - return events, nil + return structs.Events{Index: changes.Index, Events: events}, nil } diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go index a9e55aeb1..e709f3cf3 100644 --- a/nomad/state/node_events.go +++ b/nomad/state/node_events.go @@ -3,23 +3,22 @@ package state import ( "fmt" - "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) // NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set // of transaction changes. -func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { - var events []stream.Event +func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { + var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "nodes": after, ok := change.After.(*structs.Node) if !ok { - return nil, fmt.Errorf("transaction change was not a Node") + return structs.Events{}, fmt.Errorf("transaction change was not a Node") } - event := stream.Event{ + event := structs.Event{ Topic: TopicNode, Type: TypeNodeRegistration, Index: changes.Index, @@ -31,22 +30,22 @@ func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, events = append(events, event) } } - return events, nil + return structs.Events{Index: changes.Index, Events: events}, nil } // NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set // of transaction changes. -func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { - var events []stream.Event +func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { + var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "nodes": before, ok := change.Before.(*structs.Node) if !ok { - return nil, fmt.Errorf("transaction change was not a Node") + return structs.Events{}, fmt.Errorf("transaction change was not a Node") } - event := stream.Event{ + event := structs.Event{ Topic: TopicNode, Type: TypeNodeDeregistration, Index: changes.Index, @@ -58,22 +57,22 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event events = append(events, event) } } - return events, nil + return structs.Events{Index: changes.Index, Events: events}, nil } // NodeEventFromChanges generates a NodeDeregistrationEvent from a set // of transaction changes. -func NodeEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { - var events []stream.Event +func NodeEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { + var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "nodes": after, ok := change.After.(*structs.Node) if !ok { - return nil, fmt.Errorf("transaction change was not a Node") + return structs.Events{}, fmt.Errorf("transaction change was not a Node") } - event := stream.Event{ + event := structs.Event{ Topic: TopicNode, Type: TypeNodeEvent, Index: changes.Index, @@ -85,23 +84,23 @@ func NodeEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { events = append(events, event) } } - return events, nil + return structs.Events{Index: changes.Index, Events: events}, nil } -func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { - var events []stream.Event +func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) { + var events []structs.Event for _, change := range changes.Changes { switch change.Table { case "nodes": after, ok := change.After.(*structs.Node) if !ok { - return nil, fmt.Errorf("transaction change was not a Node") + return structs.Events{}, fmt.Errorf("transaction change was not a Node") } // retrieve allocations currently on node allocs, err := allocsByNodeTxn(tx, nil, after.ID) if err != nil { - return nil, fmt.Errorf("retrieving allocations for node drain event: %w", err) + return structs.Events{}, fmt.Errorf("retrieving allocations for node drain event: %w", err) } // build job/alloc details for node drain @@ -120,7 +119,7 @@ func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, err } } - event := stream.Event{ + event := structs.Event{ Topic: TopicNode, Type: TypeNodeDrain, Index: changes.Index, @@ -133,5 +132,5 @@ func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, err events = append(events, event) } } - return events, nil + return structs.Events{Index: changes.Index, Events: events}, nil } diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go index 78918db23..e32e338ec 100644 --- a/nomad/state/node_events_test.go +++ b/nomad/state/node_events_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -16,9 +15,9 @@ func TestNodeEventsFromChanges(t *testing.T) { MsgType structs.MessageType Setup func(s *StateStore, tx *txn) error Mutate func(s *StateStore, tx *txn) error - WantEvents []stream.Event + WantEvents []structs.Event WantErr bool - WantTopic stream.Topic + WantTopic structs.Topic }{ { MsgType: structs.NodeRegisterRequestType, @@ -27,7 +26,7 @@ func TestNodeEventsFromChanges(t *testing.T) { Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode()) }, - WantEvents: []stream.Event{{ + WantEvents: []structs.Event{{ Topic: TopicNode, Type: TypeNodeRegistration, Key: testNodeID(), @@ -45,7 +44,7 @@ func TestNodeEventsFromChanges(t *testing.T) { Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady)) }, - WantEvents: []stream.Event{{ + WantEvents: []structs.Event{{ Topic: TopicNode, Type: TypeNodeRegistration, Key: testNodeID(), @@ -66,7 +65,7 @@ func TestNodeEventsFromChanges(t *testing.T) { Mutate: func(s *StateStore, tx *txn) error { return deleteNodeTxn(tx, tx.Index, []string{testNodeID()}) }, - WantEvents: []stream.Event{{ + WantEvents: []structs.Event{{ Topic: TopicNode, Type: TypeNodeDeregistration, Key: testNodeID(), @@ -88,7 +87,7 @@ func TestNodeEventsFromChanges(t *testing.T) { Mutate: func(s *StateStore, tx *txn) error { return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()}) }, - WantEvents: []stream.Event{ + WantEvents: []structs.Event{ { Topic: TopicNode, Type: TypeNodeDeregistration, @@ -140,7 +139,7 @@ func TestNodeEventsFromChanges(t *testing.T) { require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx)) return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx) }, - WantEvents: []stream.Event{ + WantEvents: []structs.Event{ { Topic: TopicNode, Type: TypeNodeEvent, @@ -188,8 +187,8 @@ func TestNodeEventsFromChanges(t *testing.T) { } require.NoError(t, err) - require.Equal(t, len(tc.WantEvents), len(got)) - for idx, g := range got { + require.Equal(t, len(tc.WantEvents), len(got.Events)) + for idx, g := range got.Events { // assert equality of shared fields want := tc.WantEvents[idx] @@ -251,18 +250,18 @@ func TestNodeDrainEventFromChanges(t *testing.T) { require.Len(t, got, 1) - require.Equal(t, TopicNode, got[0].Topic) - require.Equal(t, TypeNodeDrain, got[0].Type) - require.Equal(t, uint64(100), got[0].Index) + require.Equal(t, TopicNode, got.Events[0].Topic) + require.Equal(t, TypeNodeDrain, got.Events[0].Type) + require.Equal(t, uint64(100), got.Events[0].Index) - nodeEvent, ok := got[0].Payload.(*NodeDrainEvent) + nodeEvent, ok := got.Events[0].Payload.(*NodeDrainEvent) require.True(t, ok) require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility) require.Equal(t, strat, nodeEvent.Node.DrainStrategy) } -func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) { +func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() wantPayload := want.Payload.(*NodeEvent) @@ -274,7 +273,7 @@ func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) { require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events) } -func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) { +func requireNodeDeregistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() wantPayload := want.Payload.(*NodeEvent) @@ -284,7 +283,7 @@ func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) { require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events) } -func requireNodeEventEqual(t *testing.T, want, got stream.Event) { +func requireNodeEventEqual(t *testing.T, want, got structs.Event) { gotPayload := got.Payload.(*NodeEvent) require.Len(t, gotPayload.Node.Events, 3) diff --git a/nomad/state/schema_test.go b/nomad/state/schema_test.go index 267c6f874..fd2f04b4c 100644 --- a/nomad/state/schema_test.go +++ b/nomad/state/schema_test.go @@ -1,11 +1,11 @@ package state import ( + "github.com/hashicorp/nomad/nomad/structs" "testing" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/stream" "github.com/stretchr/testify/require" ) @@ -178,26 +178,26 @@ func TestEventTableUintIndex(t *testing.T) { return num } - insertEvents := func(e *stream.Events) { + insertEvents := func(e *structs.Events) { txn := db.Txn(true) err := txn.Insert(eventsTable, e) require.NoError(err) txn.Commit() } - get := func(idx uint64) *stream.Events { + get := func(idx uint64) *structs.Events { txn := db.Txn(false) defer txn.Abort() record, err := txn.First("events", "id", idx) require.NoError(err) - s, ok := record.(*stream.Events) + s, ok := record.(*structs.Events) require.True(ok) return s } - firstEvent := &stream.Events{Index: 10, Events: []stream.Event{{Index: 10}, {Index: 10}}} - secondEvent := &stream.Events{Index: 11, Events: []stream.Event{{Index: 11}, {Index: 11}}} - thirdEvent := &stream.Events{Index: 202, Events: []stream.Event{{Index: 202}, {Index: 202}}} + firstEvent := &structs.Events{Index: 10, Events: []structs.Event{{Index: 10}, {Index: 10}}} + secondEvent := &structs.Events{Index: 11, Events: []structs.Event{{Index: 11}, {Index: 11}}} + thirdEvent := &structs.Events{Index: 202, Events: []structs.Event{{Index: 202}, {Index: 202}}} insertEvents(firstEvent) insertEvents(secondEvent) insertEvents(thirdEvent) diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index 117eebc29..a8c48b1ff 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -34,21 +34,35 @@ type Changes struct { // sent to the EventPublisher which will create and emit change events. type changeTrackerDB struct { db *memdb.MemDB + durableEvents bool + durableCount int publisher *stream.EventPublisher - processChanges func(ReadTxn, Changes) ([]stream.Event, error) + processChanges func(ReadTxn, Changes) (structs.Events, error) } -func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor) *changeTrackerDB { +// ChangeConfig +type ChangeConfig struct { + DurableEvents bool + DurableCount int +} + +func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor, cfg *ChangeConfig) *changeTrackerDB { + if cfg == nil { + cfg = &ChangeConfig{} + } + return &changeTrackerDB{ db: db, publisher: publisher, processChanges: changesFn, + durableEvents: cfg.DurableEvents, + durableCount: cfg.DurableCount, } } -type changeProcessor func(ReadTxn, Changes) ([]stream.Event, error) +type changeProcessor func(ReadTxn, Changes) (structs.Events, error) -func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error) { return []stream.Event{}, nil } +func noOpProcessChanges(ReadTxn, Changes) (structs.Events, error) { return structs.Events{}, nil } // ReadTxn returns a read-only transaction which behaves exactly the same as // memdb.Txn @@ -81,25 +95,27 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) *txn { t := &txn{ - msgType: msgType, - Txn: c.db.Txn(true), - Index: idx, - publish: c.publish, + msgType: msgType, + Txn: c.db.Txn(true), + Index: idx, + publish: c.publish, + persistChanges: c.durableEvents, } t.Txn.TrackChanges() return t } -func (c *changeTrackerDB) publish(changes Changes) error { +func (c *changeTrackerDB) publish(changes Changes) (structs.Events, error) { readOnlyTx := c.db.Txn(false) defer readOnlyTx.Abort() events, err := c.processChanges(readOnlyTx, changes) if err != nil { - return fmt.Errorf("failed generating events from changes: %v", err) + return structs.Events{}, fmt.Errorf("failed generating events from changes: %v", err) } - c.publisher.Publish(changes.Index, events) - return nil + + c.publisher.Publish(events) + return events, nil } // WriteTxnRestore returns a wrapped RW transaction that does NOT have change @@ -125,13 +141,15 @@ type txn struct { // msgType is used to inform event sourcing which type of event to create msgType structs.MessageType + persistChanges bool + *memdb.Txn // Index in raft where the write is occurring. The value is zero for a // read-only, or WriteTxnRestore transaction. // Index is stored so that it may be passed along to any subscribers as part // of a change event. Index uint64 - publish func(changes Changes) error + publish func(changes Changes) (structs.Events, error) } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -150,9 +168,18 @@ func (tx *txn) Commit() error { Changes: tx.Txn.Changes(), MsgType: tx.MsgType(), } - if err := tx.publish(changes); err != nil { + events, err := tx.publish(changes) + if err != nil { return err } + + if tx.persistChanges { + // persist events after processing changes + err := tx.Txn.Insert("events", events) + if err != nil { + return err + } + } } tx.Txn.Commit() @@ -166,11 +193,11 @@ func (tx *txn) MsgType() structs.MessageType { return tx.msgType } -func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { +func processDBChanges(tx ReadTxn, changes Changes) (structs.Events, error) { switch changes.MsgType { case structs.IgnoreUnknownTypeFlag: // unknown event type - return []stream.Event{}, nil + return structs.Events{}, nil case structs.NodeRegisterRequestType: return NodeRegisterEventFromChanges(tx, changes) case structs.NodeUpdateStatusRequestType: @@ -201,5 +228,5 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { // TODO(drew) test return GenericEventsFromChanges(tx, changes) } - return []stream.Event{}, nil + return structs.Events{}, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a2271676f..18efdfd36 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -47,6 +47,9 @@ type StateStoreConfig struct { Region string EnablePublisher bool + + EnableDurability bool + DurableCount int } // The StateStore is responsible for maintaining all the Nomad @@ -90,14 +93,18 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { } if config.EnablePublisher { + cfg := &ChangeConfig{ + DurableEvents: config.EnableDurability, + DurableCount: 1000, + } publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ EventBufferTTL: 1 * time.Hour, EventBufferSize: 250, Logger: config.Logger, }) - s.db = NewChangeTrackerDB(db, publisher, processDBChanges) + s.db = NewChangeTrackerDB(db, publisher, processDBChanges, cfg) } else { - s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges) + s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges, nil) } // Initialize the state store with required enterprise objects @@ -132,7 +139,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { } // Create a new change tracker DB that does not publish or track changes - store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges) + store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges, nil) snap := &StateSnapshot{ StateStore: store, @@ -5752,6 +5759,49 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[ return nil, nil } +// LatestEventsReverse returns the unfiltered list of all volumes +func (s *StateStore) LatestEventsReverse(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + defer txn.Abort() + + iter, err := txn.GetReverse("events", "id") + if err != nil { + return nil, fmt.Errorf("events lookup failed: %v", err) + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + +// Events returns the unfiltered list of all volumes +func (s *StateStore) Events(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + defer txn.Abort() + + iter, err := txn.Get("events", "id") + if err != nil { + return nil, fmt.Errorf("events lookup failed: %v", err) + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + +// UpsertEvents is used to insert events. It should only be used for testing. +// Normal use events are inserted to go-memdb during transaction commit +func (s *StateStore) UpsertEvents(index uint64, events *structs.Events) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + if err := txn.Insert("events", events); err != nil { + return err + } + txn.Commit() + return nil +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore @@ -5999,6 +6049,13 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error { return nil } +func (r *StateRestore) EventRestore(events *structs.Events) error { + if err := r.txn.Insert("events", events); err != nil { + return fmt.Errorf("events insert failed: %v", err) + } + return nil +} + func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error { if err := r.txn.Insert("scaling_event", jobEvents); err != nil { return fmt.Errorf("scaling event insert failed: %v", err) diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 6aa9039f4..86acaf9e7 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -26,9 +26,10 @@ func TestStateStore(t testing.T) *StateStore { func TestStateStorePublisher(t testing.T) *StateStoreConfig { return &StateStoreConfig{ - Logger: testlog.HCLogger(t), - Region: "global", - EnablePublisher: true, + Logger: testlog.HCLogger(t), + Region: "global", + EnablePublisher: true, + EnableDurability: true, } } func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore { diff --git a/nomad/stream/event.go b/nomad/stream/event.go deleted file mode 100644 index 33679723e..000000000 --- a/nomad/stream/event.go +++ /dev/null @@ -1,21 +0,0 @@ -package stream - -const ( - AllKeys = "*" -) - -type Topic string - -type Event struct { - Topic Topic - Type string - Key string - FilterKeys []string - Index uint64 - Payload interface{} -} - -type Events struct { - Index uint64 - Events []Event -} diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go index 145776225..fda04fa4f 100644 --- a/nomad/stream/event_buffer.go +++ b/nomad/stream/event_buffer.go @@ -6,6 +6,8 @@ import ( "fmt" "sync/atomic" "time" + + "github.com/hashicorp/nomad/nomad/structs" ) // eventBuffer is a single-writer, multiple-reader, fixed length concurrent @@ -60,7 +62,7 @@ func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer { maxItemTTL: maxItemTTL, } - item := newBufferItem(0, nil) + item := newBufferItem(structs.Events{Index: 0, Events: nil}) b.head.Store(item) b.tail.Store(item) @@ -73,8 +75,8 @@ func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer { // mutations to the events as they may have been exposed to subscribers in other // goroutines. Append only supports a single concurrent caller and must be // externally synchronized with other Append, AppendBuffer or AppendErr calls. -func (b *eventBuffer) Append(index uint64, events []Event) { - b.appendItem(newBufferItem(index, events)) +func (b *eventBuffer) Append(events structs.Events) { + b.appendItem(newBufferItem(events)) } func (b *eventBuffer) appendItem(item *bufferItem) { @@ -200,7 +202,7 @@ type bufferItem struct { // should check and skip nil Events at any point in the buffer. It will also // be nil if the producer appends an Error event because they can't complete // the request to populate the buffer. Err will be non-nil in this case. - Events []Event + Events []structs.Event Index uint64 @@ -237,14 +239,14 @@ type bufferLink struct { // newBufferItem returns a blank buffer item with a link and chan ready to have // the fields set and be appended to a buffer. -func newBufferItem(index uint64, events []Event) *bufferItem { +func newBufferItem(events structs.Events) *bufferItem { return &bufferItem{ link: &bufferLink{ ch: make(chan struct{}), droppedCh: make(chan struct{}), }, - Events: events, - Index: index, + Events: events.Events, + Index: events.Index, createdAt: time.Now(), } } diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go index 15a96a4e0..5b1b6e623 100644 --- a/nomad/stream/event_buffer_test.go +++ b/nomad/stream/event_buffer_test.go @@ -3,6 +3,7 @@ package stream import ( "context" "fmt" + "github.com/hashicorp/nomad/nomad/structs" "math/rand" "testing" "time" @@ -31,10 +32,10 @@ func TestEventBufferFuzz(t *testing.T) { for i := 0; i < nMessages; i++ { // Event content is arbitrary and not valid for our use of buffers in // streaming - here we only care about the semantics of the buffer. - e := Event{ + e := structs.Event{ Index: uint64(i), // Indexes should be contiguous } - b.Append(uint64(i), []Event{e}) + b.Append(structs.Events{Index: uint64(i), Events: []structs.Event{e}}) // Sleep sometimes for a while to let some subscribers catch up wait := time.Duration(z.Uint64()) * time.Millisecond time.Sleep(wait) @@ -87,19 +88,19 @@ func TestEventBuffer_Slow_Reader(t *testing.T) { b := newEventBuffer(10, DefaultTTL) for i := 0; i < 10; i++ { - e := Event{ + e := structs.Event{ Index: uint64(i), // Indexes should be contiguous } - b.Append(uint64(i), []Event{e}) + b.Append(structs.Events{uint64(i), []structs.Event{e}}) } head := b.Head() for i := 10; i < 15; i++ { - e := Event{ + e := structs.Event{ Index: uint64(i), // Indexes should be contiguous } - b.Append(uint64(i), []Event{e}) + b.Append(structs.Events{uint64(i), []structs.Event{e}}) } // Ensure the slow reader errors to handle dropped events and @@ -116,10 +117,10 @@ func TestEventBuffer_Size(t *testing.T) { b := newEventBuffer(100, DefaultTTL) for i := 0; i < 10; i++ { - e := Event{ + e := structs.Event{ Index: uint64(i), // Indexes should be contiguous } - b.Append(uint64(i), []Event{e}) + b.Append(structs.Events{uint64(i), []structs.Event{e}}) } require.Equal(t, 10, b.Len()) @@ -132,10 +133,10 @@ func TestEventBuffer_Prune_AllOld(t *testing.T) { b := newEventBuffer(100, 1*time.Second) for i := 0; i < 10; i++ { - e := Event{ + e := structs.Event{ Index: uint64(i), // Indexes should be contiguous } - b.Append(uint64(i), []Event{e}) + b.Append(structs.Events{uint64(i), []structs.Event{e}}) } require.Equal(t, 10, int(b.Len())) @@ -185,10 +186,10 @@ func TestStartAt_CurrentIdx_Past_Start(t *testing.T) { b := newEventBuffer(100, 1*time.Hour) for i := 11; i <= 100; i++ { - e := Event{ + e := structs.Event{ Index: uint64(i), // Indexes should be contiguous } - b.Append(uint64(i), []Event{e}) + b.Append(structs.Events{uint64(i), []structs.Event{e}}) } for _, tc := range cases { diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go index 0a6bb79cd..16e7f0a73 100644 --- a/nomad/stream/event_publisher.go +++ b/nomad/stream/event_publisher.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/go-hclog" ) @@ -36,7 +38,7 @@ type EventPublisher struct { // publishCh is used to send messages from an active txn to a goroutine which // publishes events, so that publishing can happen asynchronously from // the Commit call in the FSM hot path. - publishCh chan changeEvents + publishCh chan structs.Events } type subscriptions struct { @@ -65,7 +67,7 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish e := &EventPublisher{ logger: cfg.Logger.Named("event_publisher"), eventBuf: buffer, - publishCh: make(chan changeEvents, 64), + publishCh: make(chan structs.Events, 64), subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, @@ -79,9 +81,9 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish } // Publish events to all subscribers of the event Topic. -func (e *EventPublisher) Publish(index uint64, events []Event) { - if len(events) > 0 { - e.publishCh <- changeEvents{index: index, events: events} +func (e *EventPublisher) Publish(events structs.Events) { + if len(events.Events) > 0 { + e.publishCh <- events } } @@ -102,7 +104,7 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) } // Empty head so that calling Next on sub - start := newBufferItem(req.Index, []Event{}) + start := newBufferItem(structs.Events{Index: req.Index}) start.link.next.Store(head) close(start.link.ch) @@ -137,17 +139,12 @@ func (e *EventPublisher) periodicPrune(ctx context.Context) { } } -type changeEvents struct { - index uint64 - events []Event -} - // sendEvents sends the given events to the publishers event buffer. -func (e *EventPublisher) sendEvents(update changeEvents) { +func (e *EventPublisher) sendEvents(update structs.Events) { e.lock.Lock() defer e.lock.Unlock() - e.eventBuf.Append(update.index, update.events) + e.eventBuf.Append(update) } func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go index 691574a65..1cdd0f347 100644 --- a/nomad/stream/event_publisher_test.go +++ b/nomad/stream/event_publisher_test.go @@ -2,6 +2,7 @@ package stream import ( "context" + "github.com/hashicorp/nomad/nomad/structs" "testing" "time" @@ -10,7 +11,7 @@ import ( func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { subscription := &SubscribeRequest{ - Topics: map[Topic][]string{ + Topics: map[structs.Topic][]string{ "Test": []string{"sub-key"}, }, } @@ -25,35 +26,35 @@ func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { // Now subscriber should block waiting for updates assertNoResult(t, eventCh) - events := []Event{{ + events := []structs.Event{{ Index: 1, Topic: "Test", Key: "sub-key", Payload: "sample payload", }} - publisher.Publish(1, events) + publisher.Publish(structs.Events{Index: 1, Events: events}) // Subscriber should see the published event result := nextResult(t, eventCh) require.NoError(t, result.Err) - expected := []Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}} + expected := []structs.Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}} require.Equal(t, expected, result.Events) // Now subscriber should block waiting for updates assertNoResult(t, eventCh) // Publish a second event - events = []Event{{ + events = []structs.Event{{ Index: 2, Topic: "Test", Key: "sub-key", Payload: "sample payload 2", }} - publisher.Publish(2, events) + publisher.Publish(structs.Events{Index: 2, Events: events}) result = nextResult(t, eventCh) require.NoError(t, result.Err) - expected = []Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}} + expected = []structs.Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}} require.Equal(t, expected, result.Events) } @@ -98,7 +99,7 @@ func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextR } type subNextResult struct { - Events []Event + Events []structs.Event Err error } diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index cb5195e54..cd3befc7e 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -7,43 +7,40 @@ import ( "fmt" "sync" "time" + + "github.com/hashicorp/nomad/nomad/structs" ) var ( // NDJsonHeartbeat is the NDJson to send as a heartbeat // Avoids creating many heartbeat instances - NDJsonHeartbeat = &NDJson{Data: []byte("{}\n")} + NDJsonHeartbeat = &structs.NDJson{Data: []byte("{}\n")} ) // NDJsonStream is used to send new line delimited JSON and heartbeats // to a destination (out channel) type NDJsonStream struct { - out chan<- *NDJson + out chan<- *structs.NDJson // heartbeat is the interval to send heartbeat messages to keep a connection // open. heartbeat *time.Ticker - publishCh chan NDJson + publishCh chan structs.NDJson exitCh chan struct{} l sync.Mutex running bool } -// NNDJson is a wrapper for a Newline Delimited JSON object -type NDJson struct { - Data []byte -} - // NewNNewNDJsonStream creates a new NDJson stream that will output NDJson structs // to the passed output channel -func NewNDJsonStream(out chan<- *NDJson, heartbeat time.Duration) *NDJsonStream { +func NewNDJsonStream(out chan<- *structs.NDJson, heartbeat time.Duration) *NDJsonStream { return &NDJsonStream{ out: out, heartbeat: time.NewTicker(heartbeat), exitCh: make(chan struct{}), - publishCh: make(chan NDJson), + publishCh: make(chan structs.NDJson), } } @@ -97,18 +94,10 @@ func (n *NDJsonStream) Send(obj interface{}) error { } select { - case n.publishCh <- NDJson{Data: buf.Bytes()}: + case n.publishCh <- structs.NDJson{Data: buf.Bytes()}: case <-n.exitCh: return fmt.Errorf("stream is no longer running") } return nil } - -func (j *NDJson) Copy() *NDJson { - n := new(NDJson) - *n = *j - n.Data = make([]byte, len(j.Data)) - copy(n.Data, j.Data) - return n -} diff --git a/nomad/stream/ndjson_test.go b/nomad/stream/ndjson_test.go index 8e807938f..589cde1a7 100644 --- a/nomad/stream/ndjson_test.go +++ b/nomad/stream/ndjson_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -19,7 +20,7 @@ func TestNDJson(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *NDJson) + out := make(chan *structs.NDJson) s := NewNDJsonStream(out, 1*time.Second) s.Run(ctx) @@ -45,7 +46,7 @@ func TestNDJson_Send_After_Stop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *NDJson) + out := make(chan *structs.NDJson) s := NewNDJsonStream(out, 1*time.Second) s.Run(ctx) @@ -62,7 +63,7 @@ func TestNDJson_HeartBeat(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *NDJson) + out := make(chan *structs.NDJson) s := NewNDJsonStream(out, 10*time.Millisecond) s.Run(ctx) diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index bae3d091a..555be5834 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -4,9 +4,12 @@ import ( "context" "errors" "sync/atomic" + + "github.com/hashicorp/nomad/nomad/structs" ) const ( + AllKeys = "*" // subscriptionStateOpen is the default state of a subscription. An open // subscription may receive new events. subscriptionStateOpen uint32 = 0 @@ -46,7 +49,7 @@ type SubscribeRequest struct { Token string Index uint64 - Topics map[Topic][]string + Topics map[structs.Topic][]string } func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription { @@ -58,18 +61,18 @@ func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Sub } } -func (s *Subscription) Next(ctx context.Context) (Events, error) { +func (s *Subscription) Next(ctx context.Context) (structs.Events, error) { if atomic.LoadUint32(&s.state) == subscriptionStateClosed { - return Events{}, ErrSubscriptionClosed + return structs.Events{}, ErrSubscriptionClosed } for { next, err := s.currentItem.Next(ctx, s.forceClosed) switch { case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed: - return Events{}, ErrSubscriptionClosed + return structs.Events{}, ErrSubscriptionClosed case err != nil: - return Events{}, err + return structs.Events{}, err } s.currentItem = next @@ -77,11 +80,11 @@ func (s *Subscription) Next(ctx context.Context) (Events, error) { if len(events) == 0 { continue } - return Events{Index: next.Index, Events: events}, nil + return structs.Events{Index: next.Index, Events: events}, nil } } -func (s *Subscription) NextNoBlock() ([]Event, error) { +func (s *Subscription) NextNoBlock() ([]structs.Event, error) { if atomic.LoadUint32(&s.state) == subscriptionStateClosed { return nil, ErrSubscriptionClosed } @@ -113,7 +116,7 @@ func (s *Subscription) Unsubscribe() { } // filter events to only those that match a subscriptions topic/keys -func filter(req *SubscribeRequest, events []Event) []Event { +func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { if len(events) == 0 { return events } @@ -145,7 +148,7 @@ func filter(req *SubscribeRequest, events []Event) []Event { } // Return filtered events - result := make([]Event, 0, count) + result := make([]structs.Event, 0, count) for _, e := range events { _, allTopics := req.Topics[AllKeys] if _, ok := req.Topics[e.Topic]; ok || allTopics { diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go index 8dd841bd2..659a7f1a2 100644 --- a/nomad/stream/subscription_test.go +++ b/nomad/stream/subscription_test.go @@ -1,6 +1,7 @@ package stream import ( + "github.com/hashicorp/nomad/nomad/structs" "testing" "github.com/stretchr/testify/require" @@ -11,11 +12,11 @@ func TestSubscription(t *testing.T) { } func TestFilter_AllTopics(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + events := make([]structs.Event, 0, 5) + events = append(events, structs.Event{Topic: "Test", Key: "One"}, structs.Event{Topic: "Test", Key: "Two"}) req := &SubscribeRequest{ - Topics: map[Topic][]string{ + Topics: map[structs.Topic][]string{ "*": []string{"*"}, }, } @@ -27,11 +28,11 @@ func TestFilter_AllTopics(t *testing.T) { } func TestFilter_AllKeys(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + events := make([]structs.Event, 0, 5) + events = append(events, structs.Event{Topic: "Test", Key: "One"}, structs.Event{Topic: "Test", Key: "Two"}) req := &SubscribeRequest{ - Topics: map[Topic][]string{ + Topics: map[structs.Topic][]string{ "Test": []string{"*"}, }, } @@ -43,49 +44,49 @@ func TestFilter_AllKeys(t *testing.T) { } func TestFilter_PartialMatch_Topic(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}, Event{Topic: "Exclude", Key: "Two"}) + events := make([]structs.Event, 0, 5) + events = append(events, structs.Event{Topic: "Test", Key: "One"}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Exclude", Key: "Two"}) req := &SubscribeRequest{ - Topics: map[Topic][]string{ + Topics: map[structs.Topic][]string{ "Test": []string{"*"}, }, } actual := filter(req, events) - expected := []Event{{Topic: "Test", Key: "One"}, {Topic: "Test", Key: "Two"}} + expected := []structs.Event{{Topic: "Test", Key: "One"}, {Topic: "Test", Key: "Two"}} require.Equal(t, expected, actual) require.Equal(t, cap(actual), 2) } func TestFilter_PartialMatch_Key(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + events := make([]structs.Event, 0, 5) + events = append(events, structs.Event{Topic: "Test", Key: "One"}, structs.Event{Topic: "Test", Key: "Two"}) req := &SubscribeRequest{ - Topics: map[Topic][]string{ + Topics: map[structs.Topic][]string{ "Test": []string{"One"}, }, } actual := filter(req, events) - expected := []Event{{Topic: "Test", Key: "One"}} + expected := []structs.Event{{Topic: "Test", Key: "One"}} require.Equal(t, expected, actual) require.Equal(t, cap(actual), 1) } func TestFilter_NoMatch(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + events := make([]structs.Event, 0, 5) + events = append(events, structs.Event{Topic: "Test", Key: "One"}, structs.Event{Topic: "Test", Key: "Two"}) req := &SubscribeRequest{ - Topics: map[Topic][]string{ + Topics: map[structs.Topic][]string{ "NodeEvents": []string{"*"}, "Test": []string{"Highly-Specific-Key"}, }, } actual := filter(req, events) - var expected []Event + var expected []structs.Event require.Equal(t, expected, actual) require.Equal(t, cap(actual), 0) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c00aa1d49..4192a27b0 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -39,7 +39,6 @@ import ( "github.com/hashicorp/nomad/helper/constraints/semver" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/kheap" - "github.com/hashicorp/nomad/nomad/stream" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) @@ -10712,7 +10711,7 @@ type ACLTokenUpsertResponse struct { // EEventStreamRequest is used to stream events from a servers // EventPublisher type EventStreamRequest struct { - Topics map[stream.Topic][]string + Topics map[Topic][]string Index int QueryOptions @@ -10720,7 +10719,7 @@ type EventStreamRequest struct { type EventStreamWrapper struct { Error *RpcError - Event *stream.NDJson + Event *NDJson } // RpcError is used for serializing errors with a potential error code @@ -10739,3 +10738,32 @@ func NewRpcError(err error, code *int64) *RpcError { func (r *RpcError) Error() string { return r.Message } + +type Topic string + +type Event struct { + Topic Topic + Type string + Key string + FilterKeys []string + Index uint64 + Payload interface{} +} + +type Events struct { + Index uint64 + Events []Event +} + +// NNDJson is a wrapper for a Newline Delimited JSON object +type NDJson struct { + Data []byte +} + +func (j *NDJson) Copy() *NDJson { + n := new(NDJson) + *n = *j + n.Data = make([]byte, len(j.Data)) + copy(n.Data, j.Data) + return n +} diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 1294604d2..b0aa7be3b 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1,7 +1,6 @@ package scheduler import ( - "context" "fmt" "reflect" "sort" @@ -3108,7 +3107,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { newAlloc.ClientStatus = structs.AllocClientStatusRunning running = append(running, newAlloc) } - require.NoError(t, h.State.UpdateAllocsFromClient(context.Background(), h.NextIndex(), running)) + require.NoError(t, h.State.UpdateAllocsFromClient(structs.MsgTypeTestSetup, h.NextIndex(), running)) // Mark some of the allocations as complete var complete []*structs.Allocation @@ -3127,7 +3126,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { newAlloc.ClientStatus = structs.AllocClientStatusComplete complete = append(complete, newAlloc) } - require.NoError(t, h.State.UpdateAllocsFromClient(context.Background(), h.NextIndex(), complete)) + require.NoError(t, h.State.UpdateAllocsFromClient(structs.MsgTypeTestSetup, h.NextIndex(), complete)) // Create a mock evaluation to deal with the node update eval := &structs.Evaluation{ diff --git a/vendor/github.com/hashicorp/go-immutable-radix/CHANGELOG.md b/vendor/github.com/hashicorp/go-immutable-radix/CHANGELOG.md index a967ae456..6331af921 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/CHANGELOG.md +++ b/vendor/github.com/hashicorp/go-immutable-radix/CHANGELOG.md @@ -1,4 +1,16 @@ -# UNRELEASED +# 1.3.0 (September 17th, 2020) + +FEATURES + +* Add reverse tree traversal [[GH-30](https://github.com/hashicorp/go-immutable-radix/pull/30)] + +# 1.2.0 (March 18th, 2020) + +FEATURES + +* Adds a `Clone` method to `Txn` allowing transactions to be split either into two independently mutable trees. [[GH-26](https://github.com/hashicorp/go-immutable-radix/pull/26)] + +# 1.1.0 (May 22nd, 2019) FEATURES diff --git a/vendor/github.com/hashicorp/go-immutable-radix/iter.go b/vendor/github.com/hashicorp/go-immutable-radix/iter.go index 1ecaf831c..cd16d3bea 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/iter.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/iter.go @@ -155,7 +155,7 @@ func (i *Iterator) Next() ([]byte, interface{}, bool) { // Initialize our stack if needed if i.stack == nil && i.node != nil { i.stack = []edges{ - edges{ + { edge{node: i.node}, }, } diff --git a/vendor/github.com/hashicorp/go-immutable-radix/node.go b/vendor/github.com/hashicorp/go-immutable-radix/node.go index 3ab904edc..359854808 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/node.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/node.go @@ -211,6 +211,12 @@ func (n *Node) Iterator() *Iterator { return &Iterator{node: n} } +// ReverseIterator is used to return an iterator at +// the given node to walk the tree backwards +func (n *Node) ReverseIterator() *ReverseIterator { + return NewReverseIterator(n) +} + // rawIterator is used to return a raw iterator at the given node to walk the // tree. func (n *Node) rawIterator() *rawIterator { @@ -224,6 +230,11 @@ func (n *Node) Walk(fn WalkFn) { recursiveWalk(n, fn) } +// WalkBackwards is used to walk the tree in reverse order +func (n *Node) WalkBackwards(fn WalkFn) { + reverseRecursiveWalk(n, fn) +} + // WalkPrefix is used to walk the tree under a prefix func (n *Node) WalkPrefix(prefix []byte, fn WalkFn) { search := prefix @@ -302,3 +313,22 @@ func recursiveWalk(n *Node, fn WalkFn) bool { } return false } + +// reverseRecursiveWalk is used to do a reverse pre-order +// walk of a node recursively. Returns true if the walk +// should be aborted +func reverseRecursiveWalk(n *Node, fn WalkFn) bool { + // Visit the leaf values if any + if n.leaf != nil && fn(n.leaf.key, n.leaf.val) { + return true + } + + // Recurse on the children in reverse order + for i := len(n.edges) - 1; i >= 0; i-- { + e := n.edges[i] + if reverseRecursiveWalk(e.node, fn) { + return true + } + } + return false +} diff --git a/vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go b/vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go index 04814c132..3c6a22525 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go @@ -41,7 +41,7 @@ func (i *rawIterator) Next() { // Initialize our stack if needed. if i.stack == nil && i.node != nil { i.stack = []rawStackEntry{ - rawStackEntry{ + { edges: edges{ edge{node: i.node}, }, diff --git a/vendor/github.com/hashicorp/go-immutable-radix/reverse_iter.go b/vendor/github.com/hashicorp/go-immutable-radix/reverse_iter.go new file mode 100644 index 000000000..762471bc3 --- /dev/null +++ b/vendor/github.com/hashicorp/go-immutable-radix/reverse_iter.go @@ -0,0 +1,177 @@ +package iradix + +import ( + "bytes" +) + +// ReverseIterator is used to iterate over a set of nodes +// in reverse in-order +type ReverseIterator struct { + i *Iterator +} + +// NewReverseIterator returns a new ReverseIterator at a node +func NewReverseIterator(n *Node) *ReverseIterator { + return &ReverseIterator{ + i: &Iterator{node: n}, + } +} + +// SeekPrefixWatch is used to seek the iterator to a given prefix +// and returns the watch channel of the finest granularity +func (ri *ReverseIterator) SeekPrefixWatch(prefix []byte) (watch <-chan struct{}) { + return ri.i.SeekPrefixWatch(prefix) +} + +// SeekPrefix is used to seek the iterator to a given prefix +func (ri *ReverseIterator) SeekPrefix(prefix []byte) { + ri.i.SeekPrefixWatch(prefix) +} + +func (ri *ReverseIterator) recurseMax(n *Node) *Node { + // Traverse to the maximum child + if n.leaf != nil { + return n + } + if len(n.edges) > 0 { + // Add all the other edges to the stack (the max node will be added as + // we recurse) + m := len(n.edges) + ri.i.stack = append(ri.i.stack, n.edges[:m-1]) + return ri.recurseMax(n.edges[m-1].node) + } + // Shouldn't be possible + return nil +} + +// SeekReverseLowerBound is used to seek the iterator to the largest key that is +// lower or equal to the given key. There is no watch variant as it's hard to +// predict based on the radix structure which node(s) changes might affect the +// result. +func (ri *ReverseIterator) SeekReverseLowerBound(key []byte) { + // Wipe the stack. Unlike Prefix iteration, we need to build the stack as we + // go because we need only a subset of edges of many nodes in the path to the + // leaf with the lower bound. + ri.i.stack = []edges{} + n := ri.i.node + search := key + + found := func(n *Node) { + ri.i.node = n + ri.i.stack = append(ri.i.stack, edges{edge{node: n}}) + } + + for { + // Compare current prefix with the search key's same-length prefix. + var prefixCmp int + if len(n.prefix) < len(search) { + prefixCmp = bytes.Compare(n.prefix, search[0:len(n.prefix)]) + } else { + prefixCmp = bytes.Compare(n.prefix, search) + } + + if prefixCmp < 0 { + // Prefix is smaller than search prefix, that means there is no lower bound. + // But we are looking in reverse, so the reverse lower bound will be the + // largest leaf under this subtree, since it is the value that would come + // right before the current search prefix if it were in the tree. So we need + // to follow the maximum path in this subtree to find it. + n = ri.recurseMax(n) + if n != nil { + found(n) + } + return + } + + if prefixCmp > 0 { + // Prefix is larger than search prefix, that means there is no reverse lower + // bound since nothing comes before our current search prefix. + ri.i.node = nil + return + } + + // Prefix is equal, we are still heading for an exact match. If this is a + // leaf we're done. + if n.leaf != nil { + if bytes.Compare(n.leaf.key, key) < 0 { + ri.i.node = nil + return + } + found(n) + return + } + + // Consume the search prefix + if len(n.prefix) > len(search) { + search = []byte{} + } else { + search = search[len(n.prefix):] + } + + // Otherwise, take the lower bound next edge. + idx, lbNode := n.getLowerBoundEdge(search[0]) + + // From here, we need to update the stack with all values lower than + // the lower bound edge. Since getLowerBoundEdge() returns -1 when the + // search prefix is larger than all edges, we need to place idx at the + // last edge index so they can all be place in the stack, since they + // come before our search prefix. + if idx == -1 { + idx = len(n.edges) + } + + // Create stack edges for the all strictly lower edges in this node. + if len(n.edges[:idx]) > 0 { + ri.i.stack = append(ri.i.stack, n.edges[:idx]) + } + + // Exit if there's not lower bound edge. The stack will have the + // previous nodes already. + if lbNode == nil { + ri.i.node = nil + return + } + + ri.i.node = lbNode + // Recurse + n = lbNode + } +} + +// Previous returns the previous node in reverse order +func (ri *ReverseIterator) Previous() ([]byte, interface{}, bool) { + // Initialize our stack if needed + if ri.i.stack == nil && ri.i.node != nil { + ri.i.stack = []edges{ + { + edge{node: ri.i.node}, + }, + } + } + + for len(ri.i.stack) > 0 { + // Inspect the last element of the stack + n := len(ri.i.stack) + last := ri.i.stack[n-1] + m := len(last) + elem := last[m-1].node + + // Update the stack + if m > 1 { + ri.i.stack[n-1] = last[:m-1] + } else { + ri.i.stack = ri.i.stack[:n-1] + } + + // Push the edges onto the frontier + if len(elem.edges) > 0 { + ri.i.stack = append(ri.i.stack, elem.edges) + } + + // Return the leaf values if any + if elem.leaf != nil { + return elem.leaf.key, elem.leaf.val, true + } + } + return nil, nil, false +} diff --git a/vendor/github.com/hashicorp/go-memdb/go.mod b/vendor/github.com/hashicorp/go-memdb/go.mod index 4b37934cb..242f5fac2 100644 --- a/vendor/github.com/hashicorp/go-memdb/go.mod +++ b/vendor/github.com/hashicorp/go-memdb/go.mod @@ -3,6 +3,6 @@ module github.com/hashicorp/go-memdb go 1.12 require ( - github.com/hashicorp/go-immutable-radix v1.2.0 + github.com/hashicorp/go-immutable-radix v1.3.0 github.com/hashicorp/golang-lru v0.5.4 // indirect ) diff --git a/vendor/github.com/hashicorp/go-memdb/go.sum b/vendor/github.com/hashicorp/go-memdb/go.sum index 8d330e75a..eaff521ce 100644 --- a/vendor/github.com/hashicorp/go-memdb/go.sum +++ b/vendor/github.com/hashicorp/go-memdb/go.sum @@ -1,5 +1,5 @@ -github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8= -github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE= +github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= diff --git a/vendor/github.com/hashicorp/go-memdb/txn.go b/vendor/github.com/hashicorp/go-memdb/txn.go index 646684593..68734e37c 100644 --- a/vendor/github.com/hashicorp/go-memdb/txn.go +++ b/vendor/github.com/hashicorp/go-memdb/txn.go @@ -536,6 +536,34 @@ func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan str return watch, value, nil } +// LastWatch is used to return the last matching object for +// the given constraints on the index along with the watch channel +func (txn *Txn) LastWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) { + // Get the index value + indexSchema, val, err := txn.getIndexValue(table, index, args...) + if err != nil { + return nil, nil, err + } + + // Get the index itself + indexTxn := txn.readableIndex(table, indexSchema.Name) + + // Do an exact lookup + if indexSchema.Unique && val != nil && indexSchema.Name == index { + watch, obj, ok := indexTxn.GetWatch(val) + if !ok { + return watch, nil, nil + } + return watch, obj, nil + } + + // Handle non-unique index by using an iterator and getting the last value + iter := indexTxn.Root().ReverseIterator() + watch := iter.SeekPrefixWatch(val) + _, value, _ := iter.Previous() + return watch, value, nil +} + // First is used to return the first matching object for // the given constraints on the index func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) { @@ -543,6 +571,13 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er return val, err } +// Last is used to return the last matching object for +// the given constraints on the index +func (txn *Txn) Last(table, index string, args ...interface{}) (interface{}, error) { + _, val, err := txn.LastWatch(table, index, args...) + return val, err +} + // LongestPrefix is used to fetch the longest prefix match for the given // constraints on the index. Note that this will not work with the memdb // StringFieldIndex because it adds null terminators which prevent the @@ -654,6 +689,26 @@ func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, e return iter, nil } +// GetReverse is used to construct a Reverse ResultIterator over all the +// rows that match the given constraints of an index. +// The returned ResultIterator's Next() will return the next Previous value +func (txn *Txn) GetReverse(table, index string, args ...interface{}) (ResultIterator, error) { + indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...) + if err != nil { + return nil, err + } + + // Seek the iterator to the appropriate sub-set + watchCh := indexIter.SeekPrefixWatch(val) + + // Create an iterator + iter := &radixReverseIterator{ + iter: indexIter, + watchCh: watchCh, + } + return iter, nil +} + // LowerBound is used to construct a ResultIterator over all the the range of // rows that have an index value greater than or equal to the provide args. // Calling this then iterating until the rows are larger than required allows @@ -676,6 +731,29 @@ func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIter return iter, nil } +// ReverseLowerBound is used to construct a Reverse ResultIterator over all the +// the range of rows that have an index value less than or equal to the +// provide args. Calling this then iterating until the rows are lower than +// required allows range scans within an index. It is not possible to watch the +// resulting iterator since the radix tree doesn't efficiently allow watching +// on lower bound changes. The WatchCh returned will be nill and so will block +// forever. +func (txn *Txn) ReverseLowerBound(table, index string, args ...interface{}) (ResultIterator, error) { + indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...) + if err != nil { + return nil, err + } + + // Seek the iterator to the appropriate sub-set + indexIter.SeekReverseLowerBound(val) + + // Create an iterator + iter := &radixReverseIterator{ + iter: indexIter, + } + return iter, nil +} + // objectID is a tuple of table name and the raw internal id byte slice // converted to a string. It's only converted to a string to make it comparable // so this struct can be used as a map index. @@ -777,6 +855,22 @@ func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*ira return indexIter, val, nil } +func (txn *Txn) getIndexIteratorReverse(table, index string, args ...interface{}) (*iradix.ReverseIterator, []byte, error) { + // Get the index value to scan + indexSchema, val, err := txn.getIndexValue(table, index, args...) + if err != nil { + return nil, nil, err + } + + // Get the index itself + indexTxn := txn.readableIndex(table, indexSchema.Name) + indexRoot := indexTxn.Root() + + // Get an interator over the index + indexIter := indexRoot.ReverseIterator() + return indexIter, val, nil +} + // Defer is used to push a new arbitrary function onto a stack which // gets called when a transaction is committed and finished. Deferred // functions are called in LIFO order, and only invoked at the end of @@ -805,6 +899,23 @@ func (r *radixIterator) Next() interface{} { return value } +type radixReverseIterator struct { + iter *iradix.ReverseIterator + watchCh <-chan struct{} +} + +func (r *radixReverseIterator) Next() interface{} { + _, value, ok := r.iter.Previous() + if !ok { + return nil + } + return value +} + +func (r *radixReverseIterator) WatchCh() <-chan struct{} { + return r.watchCh +} + // Snapshot creates a snapshot of the current state of the transaction. // Returns a new read-only transaction or nil if the transaction is already // aborted or committed. diff --git a/vendor/github.com/hashicorp/nomad/api/event.go b/vendor/github.com/hashicorp/nomad/api/event.go new file mode 100644 index 000000000..b52f4eb8e --- /dev/null +++ b/vendor/github.com/hashicorp/nomad/api/event.go @@ -0,0 +1,104 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" +) + +// Events is a set of events for a corresponding index. Events returned for the +// index depend on which topics are subscribed to when a request is made. +type Events struct { + Index uint64 + Events []Event +} + +// Topic is an event Topic +type Topic string + +// Event holds information related to an event that occurred in Nomad. +// The Payload is a hydrated object related to the Topic +type Event struct { + Topic Topic + Type string + Key string + FilterKeys []string + Index uint64 + Payload interface{} +} + +// IsHeartBeat specifies if the event is an empty heartbeat used to +// keep a connection alive. +func (e *Events) IsHeartBeat() bool { + return e.Index == 0 && len(e.Events) == 0 +} + +// EventStream is used to stream events from Nomad +type EventStream struct { + client *Client +} + +// EventStream returns a handle to the Events endpoint +func (c *Client) EventStream() *EventStream { + return &EventStream{client: c} +} + +// Stream establishes a new subscription to Nomad's event stream and streams +// results back to the returned channel. +func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, <-chan error) { + + errCh := make(chan error, 1) + + r, err := e.client.newRequest("GET", "/v1/event/stream") + if err != nil { + errCh <- err + return nil, errCh + } + r.setQueryOptions(q) + + // Build topic query params + for topic, keys := range topics { + for _, k := range keys { + r.params.Add("topic", fmt.Sprintf("%s:%s", topic, k)) + } + } + + _, resp, err := requireOK(e.client.doRequest(r)) + + if err != nil { + errCh <- err + return nil, errCh + } + + eventsCh := make(chan *Events, 10) + go func() { + defer resp.Body.Close() + + dec := json.NewDecoder(resp.Body) + + for { + select { + case <-ctx.Done(): + close(eventsCh) + return + default: + } + + // Decode next newline delimited json of events + var events Events + if err := dec.Decode(&events); err != nil { + close(eventsCh) + errCh <- err + return + } + if events.IsHeartBeat() { + continue + } + + eventsCh <- &events + + } + }() + + return eventsCh, errCh +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 218ee4eb7..5af8e5004 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -408,10 +408,10 @@ github.com/hashicorp/go-getter/helper/url # github.com/hashicorp/go-hclog v0.12.0 ## explicit github.com/hashicorp/go-hclog -# github.com/hashicorp/go-immutable-radix v1.2.0 +# github.com/hashicorp/go-immutable-radix v1.3.0 ## explicit github.com/hashicorp/go-immutable-radix -# github.com/hashicorp/go-memdb v1.2.1 +# github.com/hashicorp/go-memdb v1.2.1 => /home/drew/work/go/go-memdb ## explicit github.com/hashicorp/go-memdb # github.com/hashicorp/go-msgpack v1.1.5 @@ -1019,6 +1019,7 @@ honnef.co/go/tools/version # github.com/godbus/dbus => github.com/godbus/dbus v5.0.1+incompatible # github.com/golang/protobuf => github.com/golang/protobuf v1.3.4 # github.com/hashicorp/go-discover => github.com/hashicorp/go-discover v0.0.0-20200812215701-c4b85f6ed31f +# github.com/hashicorp/go-memdb => /home/drew/work/go/go-memdb # github.com/hashicorp/nomad/api => ./api # github.com/kr/pty => github.com/kr/pty v1.1.5 # github.com/shirou/gopsutil => github.com/hashicorp/gopsutil v2.18.13-0.20200531184148-5aca383d4f9d+incompatible