diff --git a/nomad/state/events.go b/nomad/state/events.go index 1dee631f1..4c97e180d 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -31,6 +31,8 @@ var MsgTypeEvents = map[structs.MessageType]string{ structs.ACLRolesUpsertRequestType: structs.TypeACLRoleUpserted, structs.ACLAuthMethodsUpsertRequestType: structs.TypeACLAuthMethodUpserted, structs.ACLAuthMethodsDeleteRequestType: structs.TypeACLAuthMethodDeleted, + structs.ACLBindingRulesUpsertRequestType: structs.TypeACLBindingRuleUpserted, + structs.ACLBindingRulesDeleteRequestType: structs.TypeACLBindingRuleDeleted, structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration, structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration, structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration, @@ -105,6 +107,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { AuthMethod: before, }, }, true + case TableACLBindingRules: + before, ok := change.Before.(*structs.ACLBindingRule) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicACLBindingRule, + Key: before.ID, + FilterKeys: []string{before.AuthMethod}, + Payload: &structs.ACLBindingRuleEvent{ + ACLBindingRule: before, + }, + }, true case "nodes": before, ok := change.Before.(*structs.Node) if !ok { @@ -189,6 +204,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { AuthMethod: after, }, }, true + case TableACLBindingRules: + after, ok := change.After.(*structs.ACLBindingRule) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicACLBindingRule, + Key: after.ID, + FilterKeys: []string{after.AuthMethod}, + Payload: &structs.ACLBindingRuleEvent{ + ACLBindingRule: after, + }, + }, true case "evals": after, ok := change.After.(*structs.Evaluation) if !ok { diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 5d3a69f36..1c4f4627d 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -1104,6 +1104,58 @@ func Test_eventsFromChanges_ACLAuthMethod(t *testing.T) { must.Eq(t, authMethod, eventPayload.AuthMethod) } +func Test_eventsFromChanges_ACLBindingRule(t *testing.T) { + ci.Parallel(t) + testState := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer testState.StopEventBroker() + + // Generate a test ACL binding rule. + bindingRule := mock.ACLBindingRule() + + // Upsert the binding rule straight into state. + writeTxn := testState.db.WriteTxn(10) + updated, err := testState.upsertACLBindingRuleTxn(10, writeTxn, bindingRule, true) + must.True(t, updated) + must.NoError(t, err) + writeTxn.Txn.Commit() + + // Pull the events from the stream. + upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLBindingRulesUpsertRequestType} + receivedChange := eventsFromChanges(writeTxn, upsertChange) + must.NotNil(t, receivedChange) + + // Check the event, and its payload are what we are expecting. + must.Len(t, 1, receivedChange.Events) + must.Eq(t, structs.TopicACLBindingRule, receivedChange.Events[0].Topic) + must.Eq(t, bindingRule.ID, receivedChange.Events[0].Key) + must.SliceContainsAll(t, []string{bindingRule.AuthMethod}, receivedChange.Events[0].FilterKeys) + must.Eq(t, structs.TypeACLBindingRuleUpserted, receivedChange.Events[0].Type) + must.Eq(t, 10, receivedChange.Events[0].Index) + + must.Eq(t, bindingRule, receivedChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule) + + // Delete the previously upserted binding rule. + deleteTxn := testState.db.WriteTxn(20) + must.NoError(t, testState.deleteACLBindingRuleTxn(deleteTxn, bindingRule.ID)) + must.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLBindingRules, 20})) + deleteTxn.Txn.Commit() + + // Pull the events from the stream. + deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLBindingRulesDeleteRequestType} + receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange) + must.NotNil(t, receivedDeleteChange) + + // Check the event, and its payload are what we are expecting. + must.Len(t, 1, receivedDeleteChange.Events) + must.Eq(t, structs.TopicACLBindingRule, receivedDeleteChange.Events[0].Topic) + must.Eq(t, bindingRule.ID, receivedDeleteChange.Events[0].Key) + must.SliceContainsAll(t, []string{bindingRule.AuthMethod}, receivedDeleteChange.Events[0].FilterKeys) + must.Eq(t, structs.TypeACLBindingRuleDeleted, receivedDeleteChange.Events[0].Type) + must.Eq(t, uint64(20), receivedDeleteChange.Events[0].Index) + + must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() diff --git a/nomad/structs/event.go b/nomad/structs/event.go index ad6f49e0c..649e0795b 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -16,17 +16,18 @@ type EventStreamWrapper struct { type Topic string const ( - TopicDeployment Topic = "Deployment" - TopicEvaluation Topic = "Evaluation" - TopicAllocation Topic = "Allocation" - TopicJob Topic = "Job" - TopicNode Topic = "Node" - TopicACLPolicy Topic = "ACLPolicy" - TopicACLToken Topic = "ACLToken" - TopicACLRole Topic = "ACLRole" - TopicACLAuthMethod Topic = "ACLAuthMethod" - TopicService Topic = "Service" - TopicAll Topic = "*" + TopicDeployment Topic = "Deployment" + TopicEvaluation Topic = "Evaluation" + TopicAllocation Topic = "Allocation" + TopicJob Topic = "Job" + TopicNode Topic = "Node" + TopicACLPolicy Topic = "ACLPolicy" + TopicACLToken Topic = "ACLToken" + TopicACLRole Topic = "ACLRole" + TopicACLAuthMethod Topic = "ACLAuthMethod" + TopicACLBindingRule Topic = "ACLBindingRule" + TopicService Topic = "Service" + TopicAll Topic = "*" TypeNodeRegistration = "NodeRegistration" TypeNodeDeregistration = "NodeDeregistration" @@ -52,6 +53,8 @@ const ( TypeACLRoleUpserted = "ACLRoleUpserted" TypeACLAuthMethodUpserted = "ACLAuthMethodUpserted" TypeACLAuthMethodDeleted = "ACLAuthMethodDeleted" + TypeACLBindingRuleUpserted = "ACLBindingRuleUpserted" + TypeACLBindingRuleDeleted = "ACLBindingRuleDeleted" TypeServiceRegistration = "ServiceRegistration" TypeServiceDeregistration = "ServiceDeregistration" ) @@ -169,3 +172,9 @@ type ACLRoleStreamEvent struct { type ACLAuthMethodEvent struct { AuthMethod *ACLAuthMethod } + +// ACLBindingRuleEvent holds a newly updated or deleted ACL binding rule to be +// used as an event in the event stream. +type ACLBindingRuleEvent struct { + ACLBindingRule *ACLBindingRule +}