From cde4823f8b286cfbc6bb68c8654fa4cc8a08d916 Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Fri, 30 Oct 2020 10:07:38 -0700 Subject: [PATCH] Update subscription filter func (#9232) This adds support for specifying a global topic match for a specific key. --- nomad/stream/subscription.go | 88 ++++++++++++++----------------- nomad/stream/subscription_test.go | 60 +++++++++++++++++---- 2 files changed, 89 insertions(+), 59 deletions(-) diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index 70b133a63..341adeb2d 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -124,68 +124,60 @@ func (s *Subscription) Unsubscribe() { // filter events to only those that match a subscriptions topic/keys/namespace func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { if len(events) == 0 { - return events - } - - var count int - for _, e := range events { - _, allTopics := req.Topics[structs.TopicAll] - if _, ok := req.Topics[e.Topic]; ok || allTopics { - var keys []string - if allTopics { - keys = req.Topics[structs.TopicAll] - } else { - keys = req.Topics[e.Topic] - } - if req.Namespace != "" && e.Namespace != "" && e.Namespace != req.Namespace { - continue - } - for _, k := range keys { - if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) { - count++ - } - } - } - } - - // Only allocate a new slice if some events need to be filtered out - switch count { - case 0: return nil - case len(events): + } + + allTopicKeys := req.Topics[structs.TopicAll] + + if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) { return events } - // Return filtered events - result := make([]structs.Event, 0, count) - for _, e := range events { - _, allTopics := req.Topics[structs.TopicAll] - if _, ok := req.Topics[e.Topic]; ok || allTopics { - var keys []string - if allTopics { - keys = req.Topics[structs.TopicAll] - } else { - keys = req.Topics[e.Topic] - } - // filter out non matching namespaces - if req.Namespace != "" && e.Namespace != "" && e.Namespace != req.Namespace { + var result []structs.Event + + for _, event := range events { + if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace { + continue + } + + // *[*] always matches + if len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) { + result = append(result, event) + continue + } + + keys := allTopicKeys + + if topicKeys, ok := req.Topics[event.Topic]; ok { + keys = append(keys, topicKeys...) + } + + if len(keys) == 1 && keys[0] == string(structs.TopicAll) { + result = append(result, event) + continue + } + + for _, key := range keys { + if eventMatchesKey(event, key) { + result = append(result, event) continue } - for _, k := range keys { - if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) { - result = append(result, e) - } - } } } + return result } -func filterKeyContains(filterKeys []string, key string) bool { - for _, fk := range filterKeys { +func eventMatchesKey(event structs.Event, key string) bool { + if event.Key == key { + return true + } + + for _, fk := range event.FilterKeys { if fk == key { return true } } + return false } diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go index c54bc4fa5..0fc4e3af6 100644 --- a/nomad/stream/subscription_test.go +++ b/nomad/stream/subscription_test.go @@ -19,9 +19,6 @@ func TestFilter_AllTopics(t *testing.T) { } actual := filter(req, events) require.Equal(t, events, actual) - - // ensure new array was not allocated - require.Equal(t, cap(actual), 5) } func TestFilter_AllKeys(t *testing.T) { @@ -35,9 +32,6 @@ func TestFilter_AllKeys(t *testing.T) { } actual := filter(req, events) require.Equal(t, events, actual) - - // ensure new array was not allocated - require.Equal(t, cap(actual), 5) } func TestFilter_PartialMatch_Topic(t *testing.T) { @@ -53,7 +47,51 @@ func TestFilter_PartialMatch_Topic(t *testing.T) { expected := []structs.Event{{Topic: "Test", Key: "One"}, {Topic: "Test", Key: "Two"}} require.Equal(t, expected, actual) - require.Equal(t, cap(actual), 2) + require.Equal(t, 2, cap(actual)) +} + +func TestFilter_Match_TopicAll_SpecificKey(t *testing.T) { + events := []structs.Event{ + {Topic: "Match", Key: "Two"}, + {Topic: "NoMatch", Key: "One"}, + {Topic: "OtherMatch", Key: "Two"}, + } + + req := &SubscribeRequest{ + Topics: map[structs.Topic][]string{ + "*": {"Two"}, + }, + } + + actual := filter(req, events) + expected := []structs.Event{ + {Topic: "Match", Key: "Two"}, + {Topic: "OtherMatch", Key: "Two"}, + } + require.Equal(t, expected, actual) +} + +func TestFilter_Match_TopicAll_SpecificKey_Plus(t *testing.T) { + events := []structs.Event{ + {Topic: "FirstTwo", Key: "Two"}, + {Topic: "Test", Key: "One"}, + {Topic: "SecondTwo", Key: "Two"}, + } + + req := &SubscribeRequest{ + Topics: map[structs.Topic][]string{ + "*": {"Two"}, + "Test": {"One"}, + }, + } + + actual := filter(req, events) + expected := []structs.Event{ + {Topic: "FirstTwo", Key: "Two"}, + {Topic: "Test", Key: "One"}, + {Topic: "SecondTwo", Key: "Two"}, + } + require.Equal(t, expected, actual) } func TestFilter_PartialMatch_Key(t *testing.T) { @@ -69,7 +107,7 @@ func TestFilter_PartialMatch_Key(t *testing.T) { expected := []structs.Event{{Topic: "Test", Key: "One"}} require.Equal(t, expected, actual) - require.Equal(t, cap(actual), 1) + require.Equal(t, 1, cap(actual)) } func TestFilter_NoMatch(t *testing.T) { @@ -86,7 +124,7 @@ func TestFilter_NoMatch(t *testing.T) { var expected []structs.Event require.Equal(t, expected, actual) - require.Equal(t, cap(actual), 0) + require.Equal(t, 0, cap(actual)) } func TestFilter_Namespace(t *testing.T) { @@ -106,7 +144,7 @@ func TestFilter_Namespace(t *testing.T) { } require.Equal(t, expected, actual) - require.Equal(t, cap(actual), 2) + require.Equal(t, 2, cap(actual)) } func TestFilter_FilterKeys(t *testing.T) { @@ -125,5 +163,5 @@ func TestFilter_FilterKeys(t *testing.T) { } require.Equal(t, expected, actual) - require.Equal(t, cap(actual), 1) + require.Equal(t, 1, cap(actual)) }