events: add state objects and logic for service registrations.

This commit is contained in:
James Rasell
2022-02-28 10:44:58 +01:00
parent 16033234c3
commit 12265ee9d1
5 changed files with 129 additions and 30 deletions

View File

@@ -51,6 +51,9 @@ var msgTypeNames = map[structs.MessageType]string{
structs.OneTimeTokenUpsertRequestType: "OneTimeTokenUpsertRequestType",
structs.OneTimeTokenDeleteRequestType: "OneTimeTokenDeleteRequestType",
structs.OneTimeTokenExpireRequestType: "OneTimeTokenExpireRequestType",
structs.ServiceRegistrationUpsertRequestType: "ServiceRegistrationUpsertRequestType",
structs.ServiceRegistrationDeleteByIDRequestType: "ServiceRegistrationDeleteByIDRequestType",
structs.ServiceRegistrationDeleteByNodeIDRequestType: "ServiceRegistrationDeleteByNodeIDRequestType",
structs.NamespaceUpsertRequestType: "NamespaceUpsertRequestType",
structs.NamespaceDeleteRequestType: "NamespaceDeleteRequestType",
}

View File

@@ -6,28 +6,31 @@ import (
)
var MsgTypeEvents = map[structs.MessageType]string{
structs.NodeRegisterRequestType: structs.TypeNodeRegistration,
structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration,
structs.UpsertNodeEventsType: structs.TypeNodeEvent,
structs.EvalUpdateRequestType: structs.TypeEvalUpdated,
structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated,
structs.JobRegisterRequestType: structs.TypeJobRegistered,
structs.AllocUpdateRequestType: structs.TypeAllocationUpdated,
structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent,
structs.JobDeregisterRequestType: structs.TypeJobDeregistered,
structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered,
structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocationUpdateDesiredStatus,
structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain,
structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate,
structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion,
structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth,
structs.ApplyPlanResultsRequestType: structs.TypePlanResult,
structs.ACLTokenDeleteRequestType: structs.TypeACLTokenDeleted,
structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted,
structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
structs.NodeRegisterRequestType: structs.TypeNodeRegistration,
structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration,
structs.UpsertNodeEventsType: structs.TypeNodeEvent,
structs.EvalUpdateRequestType: structs.TypeEvalUpdated,
structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated,
structs.JobRegisterRequestType: structs.TypeJobRegistered,
structs.AllocUpdateRequestType: structs.TypeAllocationUpdated,
structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent,
structs.JobDeregisterRequestType: structs.TypeJobDeregistered,
structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered,
structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocationUpdateDesiredStatus,
structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain,
structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate,
structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion,
structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth,
structs.ApplyPlanResultsRequestType: structs.TypePlanResult,
structs.ACLTokenDeleteRequestType: structs.TypeACLTokenDeleted,
structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted,
structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
}
func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
@@ -88,6 +91,23 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Node: before,
},
}, true
case TableServiceRegistrations:
before, ok := change.Before.(*structs.ServiceRegistration)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicServiceRegistration,
Key: before.ID,
FilterKeys: []string{
before.JobID,
before.ServiceName,
},
Namespace: before.Namespace,
Payload: &structs.ServiceRegistrationStreamEvent{
Service: before,
},
}, true
}
return structs.Event{}, false
}
@@ -198,6 +218,23 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Deployment: after,
},
}, true
case TableServiceRegistrations:
after, ok := change.After.(*structs.ServiceRegistration)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicServiceRegistration,
Key: after.ID,
FilterKeys: []string{
after.JobID,
after.ServiceName,
},
Namespace: after.Namespace,
Payload: &structs.ServiceRegistrationStreamEvent{
Service: after,
},
}, true
}
return structs.Event{}, false

View File

@@ -952,6 +952,53 @@ func TestNodeDrainEventFromChanges(t *testing.T) {
require.Equal(t, strat, nodeEvent.Node.DrainStrategy)
}
func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {
t.Parallel()
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer testState.StopEventBroker()
// Generate test service registration.
service := mock.ServiceRegistrations()[0]
// Upsert a service registration.
writeTxn := testState.db.WriteTxn(10)
updated, err := testState.upsertServiceRegistrationTxn(10, writeTxn, service)
require.True(t, updated)
require.NoError(t, err)
writeTxn.Txn.Commit()
// Pull the events from the stream.
registerChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ServiceRegistrationUpsertRequestType}
receivedChange := eventsFromChanges(writeTxn, registerChange)
// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedChange.Events, 1)
require.Equal(t, structs.TopicServiceRegistration, receivedChange.Events[0].Topic)
require.Equal(t, structs.TypeServiceRegistration, receivedChange.Events[0].Type)
require.Equal(t, uint64(10), receivedChange.Events[0].Index)
eventPayload := receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent)
require.Equal(t, service, eventPayload.Service)
// Delete the previously upserted service registration.
deleteTxn := testState.db.WriteTxn(20)
require.NoError(t, testState.deleteServiceRegistrationByIDTxn(uint64(20), deleteTxn, service.Namespace, service.ID))
writeTxn.Txn.Commit()
// Pull the events from the stream.
deregisterChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ServiceRegistrationDeleteByIDRequestType}
receivedDeleteChange := eventsFromChanges(deleteTxn, deregisterChange)
// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedDeleteChange.Events, 1)
require.Equal(t, structs.TopicServiceRegistration, receivedDeleteChange.Events[0].Topic)
require.Equal(t, structs.TypeServiceDeregistration, receivedDeleteChange.Events[0].Type)
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)
eventPayload = receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent)
require.Equal(t, service, eventPayload.Service)
}
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

View File

@@ -16,14 +16,15 @@ type EventStreamWrapper struct {
type Topic string
const (
TopicDeployment Topic = "Deployment"
TopicEvaluation Topic = "Evaluation"
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicAll Topic = "*"
TopicDeployment Topic = "Deployment"
TopicEvaluation Topic = "Evaluation"
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicServiceRegistration Topic = "ServiceRegistration"
TopicAll Topic = "*"
TypeNodeRegistration = "NodeRegistration"
TypeNodeDeregistration = "NodeDeregistration"
@@ -45,6 +46,8 @@ const (
TypeACLTokenUpserted = "ACLTokenUpserted"
TypeACLPolicyDeleted = "ACLPolicyDeleted"
TypeACLPolicyUpserted = "ACLPolicyUpserted"
TypeServiceRegistration = "ServiceRegistration"
TypeServiceDeregistration = "ServiceDeregistration"
)
// Event represents a change in Nomads state.
@@ -123,6 +126,12 @@ type ACLTokenEvent struct {
secretID string
}
// ServiceRegistrationStreamEvent holds a newly updated or deleted service
// registration.
type ServiceRegistrationStreamEvent struct {
Service *ServiceRegistration
}
// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates
// a copy of the passed in ACLToken and empties out the copied tokens SecretID
func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent {

View File

@@ -105,6 +105,9 @@ const (
OneTimeTokenUpsertRequestType MessageType = 44
OneTimeTokenDeleteRequestType MessageType = 45
OneTimeTokenExpireRequestType MessageType = 46
ServiceRegistrationUpsertRequestType MessageType = 47
ServiceRegistrationDeleteByIDRequestType MessageType = 48
ServiceRegistrationDeleteByNodeIDRequestType MessageType = 49
// Namespace types were moved from enterprise and therefore start at 64
NamespaceUpsertRequestType MessageType = 64