From 85baf8f5ae214b2908c7720c14368ceff362e584 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Tue, 5 Apr 2022 08:25:22 +0100 Subject: [PATCH 1/2] events: fixup service events and rename topic to service. --- nomad/state/events.go | 4 ++-- nomad/state/events_test.go | 4 ++-- nomad/stream/event_broker.go | 3 ++- nomad/structs/event.go | 18 +++++++++--------- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/nomad/state/events.go b/nomad/state/events.go index 863bf0c6c..fc448babd 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -97,7 +97,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } return structs.Event{ - Topic: structs.TopicServiceRegistration, + Topic: structs.TopicService, Key: before.ID, FilterKeys: []string{ before.JobID, @@ -224,7 +224,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } return structs.Event{ - Topic: structs.TopicServiceRegistration, + Topic: structs.TopicService, Key: after.ID, FilterKeys: []string{ after.JobID, diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 7e25a6194..1af2cc6cc 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -976,7 +976,7 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) { // Check the event, and it's payload are what we are expecting. require.Len(t, receivedChange.Events, 1) - require.Equal(t, structs.TopicServiceRegistration, receivedChange.Events[0].Topic) + require.Equal(t, structs.TopicService, receivedChange.Events[0].Topic) require.Equal(t, structs.TypeServiceRegistration, receivedChange.Events[0].Type) require.Equal(t, uint64(10), receivedChange.Events[0].Index) @@ -994,7 +994,7 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) { // Check the event, and it's payload are what we are expecting. require.Len(t, receivedDeleteChange.Events, 1) - require.Equal(t, structs.TopicServiceRegistration, receivedDeleteChange.Events[0].Topic) + require.Equal(t, structs.TopicService, receivedDeleteChange.Events[0].Topic) require.Equal(t, structs.TypeServiceDeregistration, receivedDeleteChange.Events[0].Type) require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index) diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 793a7867a..e619968e0 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -299,7 +299,8 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool { case structs.TopicDeployment, structs.TopicEvaluation, structs.TopicAllocation, - structs.TopicJob: + structs.TopicJob, + structs.TopicService: if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok { return false } diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 542f3b47c..5b55a4612 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -16,15 +16,15 @@ type EventStreamWrapper struct { type Topic string const ( - TopicDeployment Topic = "Deployment" - TopicEvaluation Topic = "Evaluation" - TopicAllocation Topic = "Allocation" - TopicJob Topic = "Job" - TopicNode Topic = "Node" - TopicACLPolicy Topic = "ACLPolicy" - TopicACLToken Topic = "ACLToken" - TopicServiceRegistration Topic = "ServiceRegistration" - TopicAll Topic = "*" + TopicDeployment Topic = "Deployment" + TopicEvaluation Topic = "Evaluation" + TopicAllocation Topic = "Allocation" + TopicJob Topic = "Job" + TopicNode Topic = "Node" + TopicACLPolicy Topic = "ACLPolicy" + TopicACLToken Topic = "ACLToken" + TopicService Topic = "Service" + TopicAll Topic = "*" TypeNodeRegistration = "NodeRegistration" TypeNodeDeregistration = "NodeDeregistration" From cebe704572b4325e04de44e70e2e6f83d6754440 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Tue, 5 Apr 2022 08:26:02 +0100 Subject: [PATCH 2/2] events: add API helpers for service events stream topics. --- api/event_stream.go | 22 +++++++++++++++++----- api/event_stream_test.go | 12 ++++++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/api/event_stream.go b/api/event_stream.go index 6c6a853ca..10825b5c8 100644 --- a/api/event_stream.go +++ b/api/event_stream.go @@ -16,6 +16,7 @@ const ( TopicAllocation Topic = "Allocation" TopicJob Topic = "Job" TopicNode Topic = "Node" + TopicService Topic = "Service" TopicAll Topic = "*" ) @@ -91,12 +92,23 @@ func (e *Event) Node() (*Node, error) { return out.Node, nil } +// Service returns a ServiceRegistration struct from a given event payload. If +// the Event Topic is Service this will return a valid ServiceRegistration. +func (e *Event) Service() (*ServiceRegistration, error) { + out, err := e.decodePayload() + if err != nil { + return nil, err + } + return out.Service, nil +} + type eventPayload struct { - Allocation *Allocation `mapstructure:"Allocation"` - Deployment *Deployment `mapstructure:"Deployment"` - Evaluation *Evaluation `mapstructure:"Evaluation"` - Job *Job `mapstructure:"Job"` - Node *Node `mapstructure:"Node"` + Allocation *Allocation `mapstructure:"Allocation"` + Deployment *Deployment `mapstructure:"Deployment"` + Evaluation *Evaluation `mapstructure:"Evaluation"` + Job *Job `mapstructure:"Job"` + Node *Node `mapstructure:"Node"` + Service *ServiceRegistration `mapstructure:"Service"` } func (e *Event) decodePayload() (*eventPayload, error) { diff --git a/api/event_stream_test.go b/api/event_stream_test.go index d3e29fbe1..d0f55f91f 100644 --- a/api/event_stream_test.go +++ b/api/event_stream_test.go @@ -258,6 +258,18 @@ func TestEventStream_PayloadValueHelpers(t *testing.T) { }, n) }, }, + { + desc: "service", + input: []byte(`{"Topic": "Service", "Payload": {"Service":{"ID":"some-service-id","Namespace":"some-service-namespace-id","Datacenter":"us-east-1a"}}}`), + expectFn: func(t *testing.T, event Event) { + require.Equal(t, TopicService, event.Topic) + a, err := event.Service() + require.NoError(t, err) + require.Equal(t, "us-east-1a", a.Datacenter) + require.Equal(t, "some-service-id", a.ID) + require.Equal(t, "some-service-namespace-id", a.Namespace) + }, + }, } for _, tc := range testCases {