mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
events: add ACL binding rules to core events stream topics. (#15544)
This commit is contained in:
@@ -31,6 +31,8 @@ var MsgTypeEvents = map[structs.MessageType]string{
|
||||
structs.ACLRolesUpsertRequestType: structs.TypeACLRoleUpserted,
|
||||
structs.ACLAuthMethodsUpsertRequestType: structs.TypeACLAuthMethodUpserted,
|
||||
structs.ACLAuthMethodsDeleteRequestType: structs.TypeACLAuthMethodDeleted,
|
||||
structs.ACLBindingRulesUpsertRequestType: structs.TypeACLBindingRuleUpserted,
|
||||
structs.ACLBindingRulesDeleteRequestType: structs.TypeACLBindingRuleDeleted,
|
||||
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
|
||||
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
|
||||
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
|
||||
@@ -105,6 +107,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
AuthMethod: before,
|
||||
},
|
||||
}, true
|
||||
case TableACLBindingRules:
|
||||
before, ok := change.Before.(*structs.ACLBindingRule)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicACLBindingRule,
|
||||
Key: before.ID,
|
||||
FilterKeys: []string{before.AuthMethod},
|
||||
Payload: &structs.ACLBindingRuleEvent{
|
||||
ACLBindingRule: before,
|
||||
},
|
||||
}, true
|
||||
case "nodes":
|
||||
before, ok := change.Before.(*structs.Node)
|
||||
if !ok {
|
||||
@@ -189,6 +204,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
AuthMethod: after,
|
||||
},
|
||||
}, true
|
||||
case TableACLBindingRules:
|
||||
after, ok := change.After.(*structs.ACLBindingRule)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicACLBindingRule,
|
||||
Key: after.ID,
|
||||
FilterKeys: []string{after.AuthMethod},
|
||||
Payload: &structs.ACLBindingRuleEvent{
|
||||
ACLBindingRule: after,
|
||||
},
|
||||
}, true
|
||||
case "evals":
|
||||
after, ok := change.After.(*structs.Evaluation)
|
||||
if !ok {
|
||||
|
||||
@@ -1104,6 +1104,58 @@ func Test_eventsFromChanges_ACLAuthMethod(t *testing.T) {
|
||||
must.Eq(t, authMethod, eventPayload.AuthMethod)
|
||||
}
|
||||
|
||||
func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
||||
defer testState.StopEventBroker()
|
||||
|
||||
// Generate a test ACL binding rule.
|
||||
bindingRule := mock.ACLBindingRule()
|
||||
|
||||
// Upsert the binding rule straight into state.
|
||||
writeTxn := testState.db.WriteTxn(10)
|
||||
updated, err := testState.upsertACLBindingRuleTxn(10, writeTxn, bindingRule, true)
|
||||
must.True(t, updated)
|
||||
must.NoError(t, err)
|
||||
writeTxn.Txn.Commit()
|
||||
|
||||
// Pull the events from the stream.
|
||||
upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLBindingRulesUpsertRequestType}
|
||||
receivedChange := eventsFromChanges(writeTxn, upsertChange)
|
||||
must.NotNil(t, receivedChange)
|
||||
|
||||
// Check the event, and its payload are what we are expecting.
|
||||
must.Len(t, 1, receivedChange.Events)
|
||||
must.Eq(t, structs.TopicACLBindingRule, receivedChange.Events[0].Topic)
|
||||
must.Eq(t, bindingRule.ID, receivedChange.Events[0].Key)
|
||||
must.SliceContainsAll(t, []string{bindingRule.AuthMethod}, receivedChange.Events[0].FilterKeys)
|
||||
must.Eq(t, structs.TypeACLBindingRuleUpserted, receivedChange.Events[0].Type)
|
||||
must.Eq(t, 10, receivedChange.Events[0].Index)
|
||||
|
||||
must.Eq(t, bindingRule, receivedChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
|
||||
|
||||
// Delete the previously upserted binding rule.
|
||||
deleteTxn := testState.db.WriteTxn(20)
|
||||
must.NoError(t, testState.deleteACLBindingRuleTxn(deleteTxn, bindingRule.ID))
|
||||
must.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLBindingRules, 20}))
|
||||
deleteTxn.Txn.Commit()
|
||||
|
||||
// Pull the events from the stream.
|
||||
deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLBindingRulesDeleteRequestType}
|
||||
receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange)
|
||||
must.NotNil(t, receivedDeleteChange)
|
||||
|
||||
// Check the event, and its payload are what we are expecting.
|
||||
must.Len(t, 1, receivedDeleteChange.Events)
|
||||
must.Eq(t, structs.TopicACLBindingRule, receivedDeleteChange.Events[0].Topic)
|
||||
must.Eq(t, bindingRule.ID, receivedDeleteChange.Events[0].Key)
|
||||
must.SliceContainsAll(t, []string{bindingRule.AuthMethod}, receivedDeleteChange.Events[0].FilterKeys)
|
||||
must.Eq(t, structs.TypeACLBindingRuleDeleted, receivedDeleteChange.Events[0].Type)
|
||||
must.Eq(t, uint64(20), receivedDeleteChange.Events[0].Index)
|
||||
|
||||
must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
|
||||
}
|
||||
|
||||
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
|
||||
t.Helper()
|
||||
|
||||
|
||||
@@ -16,17 +16,18 @@ type EventStreamWrapper struct {
|
||||
type Topic string
|
||||
|
||||
const (
|
||||
TopicDeployment Topic = "Deployment"
|
||||
TopicEvaluation Topic = "Evaluation"
|
||||
TopicAllocation Topic = "Allocation"
|
||||
TopicJob Topic = "Job"
|
||||
TopicNode Topic = "Node"
|
||||
TopicACLPolicy Topic = "ACLPolicy"
|
||||
TopicACLToken Topic = "ACLToken"
|
||||
TopicACLRole Topic = "ACLRole"
|
||||
TopicACLAuthMethod Topic = "ACLAuthMethod"
|
||||
TopicService Topic = "Service"
|
||||
TopicAll Topic = "*"
|
||||
TopicDeployment Topic = "Deployment"
|
||||
TopicEvaluation Topic = "Evaluation"
|
||||
TopicAllocation Topic = "Allocation"
|
||||
TopicJob Topic = "Job"
|
||||
TopicNode Topic = "Node"
|
||||
TopicACLPolicy Topic = "ACLPolicy"
|
||||
TopicACLToken Topic = "ACLToken"
|
||||
TopicACLRole Topic = "ACLRole"
|
||||
TopicACLAuthMethod Topic = "ACLAuthMethod"
|
||||
TopicACLBindingRule Topic = "ACLBindingRule"
|
||||
TopicService Topic = "Service"
|
||||
TopicAll Topic = "*"
|
||||
|
||||
TypeNodeRegistration = "NodeRegistration"
|
||||
TypeNodeDeregistration = "NodeDeregistration"
|
||||
@@ -52,6 +53,8 @@ const (
|
||||
TypeACLRoleUpserted = "ACLRoleUpserted"
|
||||
TypeACLAuthMethodUpserted = "ACLAuthMethodUpserted"
|
||||
TypeACLAuthMethodDeleted = "ACLAuthMethodDeleted"
|
||||
TypeACLBindingRuleUpserted = "ACLBindingRuleUpserted"
|
||||
TypeACLBindingRuleDeleted = "ACLBindingRuleDeleted"
|
||||
TypeServiceRegistration = "ServiceRegistration"
|
||||
TypeServiceDeregistration = "ServiceDeregistration"
|
||||
)
|
||||
@@ -169,3 +172,9 @@ type ACLRoleStreamEvent struct {
|
||||
type ACLAuthMethodEvent struct {
|
||||
AuthMethod *ACLAuthMethod
|
||||
}
|
||||
|
||||
// ACLBindingRuleEvent holds a newly updated or deleted ACL binding rule to be
|
||||
// used as an event in the event stream.
|
||||
type ACLBindingRuleEvent struct {
|
||||
ACLBindingRule *ACLBindingRule
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user