mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 09:55:44 +03:00
Merge pull request #12454 from hashicorp/f-rename-service-event-stream
events: add service API logic and rename topic to service from serviceregistration
This commit is contained in:
@@ -16,6 +16,7 @@ const (
|
||||
TopicAllocation Topic = "Allocation"
|
||||
TopicJob Topic = "Job"
|
||||
TopicNode Topic = "Node"
|
||||
TopicService Topic = "Service"
|
||||
TopicAll Topic = "*"
|
||||
)
|
||||
|
||||
@@ -91,12 +92,23 @@ func (e *Event) Node() (*Node, error) {
|
||||
return out.Node, nil
|
||||
}
|
||||
|
||||
// Service returns a ServiceRegistration struct from a given event payload. If
|
||||
// the Event Topic is Service this will return a valid ServiceRegistration.
|
||||
func (e *Event) Service() (*ServiceRegistration, error) {
|
||||
out, err := e.decodePayload()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out.Service, nil
|
||||
}
|
||||
|
||||
type eventPayload struct {
|
||||
Allocation *Allocation `mapstructure:"Allocation"`
|
||||
Deployment *Deployment `mapstructure:"Deployment"`
|
||||
Evaluation *Evaluation `mapstructure:"Evaluation"`
|
||||
Job *Job `mapstructure:"Job"`
|
||||
Node *Node `mapstructure:"Node"`
|
||||
Allocation *Allocation `mapstructure:"Allocation"`
|
||||
Deployment *Deployment `mapstructure:"Deployment"`
|
||||
Evaluation *Evaluation `mapstructure:"Evaluation"`
|
||||
Job *Job `mapstructure:"Job"`
|
||||
Node *Node `mapstructure:"Node"`
|
||||
Service *ServiceRegistration `mapstructure:"Service"`
|
||||
}
|
||||
|
||||
func (e *Event) decodePayload() (*eventPayload, error) {
|
||||
|
||||
@@ -258,6 +258,18 @@ func TestEventStream_PayloadValueHelpers(t *testing.T) {
|
||||
}, n)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "service",
|
||||
input: []byte(`{"Topic": "Service", "Payload": {"Service":{"ID":"some-service-id","Namespace":"some-service-namespace-id","Datacenter":"us-east-1a"}}}`),
|
||||
expectFn: func(t *testing.T, event Event) {
|
||||
require.Equal(t, TopicService, event.Topic)
|
||||
a, err := event.Service()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "us-east-1a", a.Datacenter)
|
||||
require.Equal(t, "some-service-id", a.ID)
|
||||
require.Equal(t, "some-service-namespace-id", a.Namespace)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
||||
@@ -97,7 +97,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicServiceRegistration,
|
||||
Topic: structs.TopicService,
|
||||
Key: before.ID,
|
||||
FilterKeys: []string{
|
||||
before.JobID,
|
||||
@@ -224,7 +224,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicServiceRegistration,
|
||||
Topic: structs.TopicService,
|
||||
Key: after.ID,
|
||||
FilterKeys: []string{
|
||||
after.JobID,
|
||||
|
||||
@@ -976,7 +976,7 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {
|
||||
|
||||
// Check the event, and it's payload are what we are expecting.
|
||||
require.Len(t, receivedChange.Events, 1)
|
||||
require.Equal(t, structs.TopicServiceRegistration, receivedChange.Events[0].Topic)
|
||||
require.Equal(t, structs.TopicService, receivedChange.Events[0].Topic)
|
||||
require.Equal(t, structs.TypeServiceRegistration, receivedChange.Events[0].Type)
|
||||
require.Equal(t, uint64(10), receivedChange.Events[0].Index)
|
||||
|
||||
@@ -994,7 +994,7 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {
|
||||
|
||||
// Check the event, and it's payload are what we are expecting.
|
||||
require.Len(t, receivedDeleteChange.Events, 1)
|
||||
require.Equal(t, structs.TopicServiceRegistration, receivedDeleteChange.Events[0].Topic)
|
||||
require.Equal(t, structs.TopicService, receivedDeleteChange.Events[0].Topic)
|
||||
require.Equal(t, structs.TypeServiceDeregistration, receivedDeleteChange.Events[0].Type)
|
||||
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)
|
||||
|
||||
|
||||
@@ -299,7 +299,8 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
|
||||
case structs.TopicDeployment,
|
||||
structs.TopicEvaluation,
|
||||
structs.TopicAllocation,
|
||||
structs.TopicJob:
|
||||
structs.TopicJob,
|
||||
structs.TopicService:
|
||||
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -16,15 +16,15 @@ type EventStreamWrapper struct {
|
||||
type Topic string
|
||||
|
||||
const (
|
||||
TopicDeployment Topic = "Deployment"
|
||||
TopicEvaluation Topic = "Evaluation"
|
||||
TopicAllocation Topic = "Allocation"
|
||||
TopicJob Topic = "Job"
|
||||
TopicNode Topic = "Node"
|
||||
TopicACLPolicy Topic = "ACLPolicy"
|
||||
TopicACLToken Topic = "ACLToken"
|
||||
TopicServiceRegistration Topic = "ServiceRegistration"
|
||||
TopicAll Topic = "*"
|
||||
TopicDeployment Topic = "Deployment"
|
||||
TopicEvaluation Topic = "Evaluation"
|
||||
TopicAllocation Topic = "Allocation"
|
||||
TopicJob Topic = "Job"
|
||||
TopicNode Topic = "Node"
|
||||
TopicACLPolicy Topic = "ACLPolicy"
|
||||
TopicACLToken Topic = "ACLToken"
|
||||
TopicService Topic = "Service"
|
||||
TopicAll Topic = "*"
|
||||
|
||||
TypeNodeRegistration = "NodeRegistration"
|
||||
TypeNodeDeregistration = "NodeDeregistration"
|
||||
|
||||
Reference in New Issue
Block a user