mirror of
https://github.com/kemko/nomad.git
synced 2026-01-09 03:45:41 +03:00
Update subscription filter func (#9232)
This adds support for specifying a global topic match for a specific key.
This commit is contained in:
@@ -124,68 +124,60 @@ func (s *Subscription) Unsubscribe() {
|
||||
// filter events to only those that match a subscriptions topic/keys/namespace
|
||||
func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {
|
||||
if len(events) == 0 {
|
||||
return events
|
||||
}
|
||||
|
||||
var count int
|
||||
for _, e := range events {
|
||||
_, allTopics := req.Topics[structs.TopicAll]
|
||||
if _, ok := req.Topics[e.Topic]; ok || allTopics {
|
||||
var keys []string
|
||||
if allTopics {
|
||||
keys = req.Topics[structs.TopicAll]
|
||||
} else {
|
||||
keys = req.Topics[e.Topic]
|
||||
}
|
||||
if req.Namespace != "" && e.Namespace != "" && e.Namespace != req.Namespace {
|
||||
continue
|
||||
}
|
||||
for _, k := range keys {
|
||||
if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Only allocate a new slice if some events need to be filtered out
|
||||
switch count {
|
||||
case 0:
|
||||
return nil
|
||||
case len(events):
|
||||
}
|
||||
|
||||
allTopicKeys := req.Topics[structs.TopicAll]
|
||||
|
||||
if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
|
||||
return events
|
||||
}
|
||||
|
||||
// Return filtered events
|
||||
result := make([]structs.Event, 0, count)
|
||||
for _, e := range events {
|
||||
_, allTopics := req.Topics[structs.TopicAll]
|
||||
if _, ok := req.Topics[e.Topic]; ok || allTopics {
|
||||
var keys []string
|
||||
if allTopics {
|
||||
keys = req.Topics[structs.TopicAll]
|
||||
} else {
|
||||
keys = req.Topics[e.Topic]
|
||||
}
|
||||
// filter out non matching namespaces
|
||||
if req.Namespace != "" && e.Namespace != "" && e.Namespace != req.Namespace {
|
||||
var result []structs.Event
|
||||
|
||||
for _, event := range events {
|
||||
if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace {
|
||||
continue
|
||||
}
|
||||
|
||||
// *[*] always matches
|
||||
if len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
|
||||
result = append(result, event)
|
||||
continue
|
||||
}
|
||||
|
||||
keys := allTopicKeys
|
||||
|
||||
if topicKeys, ok := req.Topics[event.Topic]; ok {
|
||||
keys = append(keys, topicKeys...)
|
||||
}
|
||||
|
||||
if len(keys) == 1 && keys[0] == string(structs.TopicAll) {
|
||||
result = append(result, event)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if eventMatchesKey(event, key) {
|
||||
result = append(result, event)
|
||||
continue
|
||||
}
|
||||
for _, k := range keys {
|
||||
if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) {
|
||||
result = append(result, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func filterKeyContains(filterKeys []string, key string) bool {
|
||||
for _, fk := range filterKeys {
|
||||
func eventMatchesKey(event structs.Event, key string) bool {
|
||||
if event.Key == key {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, fk := range event.FilterKeys {
|
||||
if fk == key {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -19,9 +19,6 @@ func TestFilter_AllTopics(t *testing.T) {
|
||||
}
|
||||
actual := filter(req, events)
|
||||
require.Equal(t, events, actual)
|
||||
|
||||
// ensure new array was not allocated
|
||||
require.Equal(t, cap(actual), 5)
|
||||
}
|
||||
|
||||
func TestFilter_AllKeys(t *testing.T) {
|
||||
@@ -35,9 +32,6 @@ func TestFilter_AllKeys(t *testing.T) {
|
||||
}
|
||||
actual := filter(req, events)
|
||||
require.Equal(t, events, actual)
|
||||
|
||||
// ensure new array was not allocated
|
||||
require.Equal(t, cap(actual), 5)
|
||||
}
|
||||
|
||||
func TestFilter_PartialMatch_Topic(t *testing.T) {
|
||||
@@ -53,7 +47,51 @@ func TestFilter_PartialMatch_Topic(t *testing.T) {
|
||||
expected := []structs.Event{{Topic: "Test", Key: "One"}, {Topic: "Test", Key: "Two"}}
|
||||
require.Equal(t, expected, actual)
|
||||
|
||||
require.Equal(t, cap(actual), 2)
|
||||
require.Equal(t, 2, cap(actual))
|
||||
}
|
||||
|
||||
func TestFilter_Match_TopicAll_SpecificKey(t *testing.T) {
|
||||
events := []structs.Event{
|
||||
{Topic: "Match", Key: "Two"},
|
||||
{Topic: "NoMatch", Key: "One"},
|
||||
{Topic: "OtherMatch", Key: "Two"},
|
||||
}
|
||||
|
||||
req := &SubscribeRequest{
|
||||
Topics: map[structs.Topic][]string{
|
||||
"*": {"Two"},
|
||||
},
|
||||
}
|
||||
|
||||
actual := filter(req, events)
|
||||
expected := []structs.Event{
|
||||
{Topic: "Match", Key: "Two"},
|
||||
{Topic: "OtherMatch", Key: "Two"},
|
||||
}
|
||||
require.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestFilter_Match_TopicAll_SpecificKey_Plus(t *testing.T) {
|
||||
events := []structs.Event{
|
||||
{Topic: "FirstTwo", Key: "Two"},
|
||||
{Topic: "Test", Key: "One"},
|
||||
{Topic: "SecondTwo", Key: "Two"},
|
||||
}
|
||||
|
||||
req := &SubscribeRequest{
|
||||
Topics: map[structs.Topic][]string{
|
||||
"*": {"Two"},
|
||||
"Test": {"One"},
|
||||
},
|
||||
}
|
||||
|
||||
actual := filter(req, events)
|
||||
expected := []structs.Event{
|
||||
{Topic: "FirstTwo", Key: "Two"},
|
||||
{Topic: "Test", Key: "One"},
|
||||
{Topic: "SecondTwo", Key: "Two"},
|
||||
}
|
||||
require.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestFilter_PartialMatch_Key(t *testing.T) {
|
||||
@@ -69,7 +107,7 @@ func TestFilter_PartialMatch_Key(t *testing.T) {
|
||||
expected := []structs.Event{{Topic: "Test", Key: "One"}}
|
||||
require.Equal(t, expected, actual)
|
||||
|
||||
require.Equal(t, cap(actual), 1)
|
||||
require.Equal(t, 1, cap(actual))
|
||||
}
|
||||
|
||||
func TestFilter_NoMatch(t *testing.T) {
|
||||
@@ -86,7 +124,7 @@ func TestFilter_NoMatch(t *testing.T) {
|
||||
var expected []structs.Event
|
||||
require.Equal(t, expected, actual)
|
||||
|
||||
require.Equal(t, cap(actual), 0)
|
||||
require.Equal(t, 0, cap(actual))
|
||||
}
|
||||
|
||||
func TestFilter_Namespace(t *testing.T) {
|
||||
@@ -106,7 +144,7 @@ func TestFilter_Namespace(t *testing.T) {
|
||||
}
|
||||
require.Equal(t, expected, actual)
|
||||
|
||||
require.Equal(t, cap(actual), 2)
|
||||
require.Equal(t, 2, cap(actual))
|
||||
}
|
||||
|
||||
func TestFilter_FilterKeys(t *testing.T) {
|
||||
@@ -125,5 +163,5 @@ func TestFilter_FilterKeys(t *testing.T) {
|
||||
}
|
||||
require.Equal(t, expected, actual)
|
||||
|
||||
require.Equal(t, cap(actual), 1)
|
||||
require.Equal(t, 1, cap(actual))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user