diff --git a/nomad/state/deployment_events_test.go b/nomad/state/deployment_events_test.go index 6786b6d5d..0a38890d6 100644 --- a/nomad/state/deployment_events_test.go +++ b/nomad/state/deployment_events_test.go @@ -52,7 +52,7 @@ func TestDeploymentEventFromChanges(t *testing.T) { require.Equal(t, uint64(100), got.Index) require.Equal(t, d.ID, got.Key) - de := got.Payload.(*DeploymentEvent) + de := got.Payload.(*structs.DeploymentEvent) require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status) require.Contains(t, got.FilterKeys, j.ID) diff --git a/nomad/state/events.go b/nomad/state/events.go index 7563d1db0..916ad9c43 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -5,80 +5,25 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -const ( - TypeNodeRegistration = "NodeRegistration" - TypeNodeDeregistration = "NodeDeregistration" - TypeNodeEligibilityUpdate = "NodeEligibility" - TypeNodeDrain = "NodeDrain" - TypeNodeEvent = "NodeEvent" - TypeDeploymentUpdate = "DeploymentStatusUpdate" - TypeDeploymentPromotion = "DeploymentPromotion" - TypeDeploymentAllocHealth = "DeploymentAllocHealth" - TypeAllocCreated = "AllocCreated" - TypeAllocUpdated = "AllocUpdated" - TypeAllocUpdateDesiredStatus = "AllocUpdateDesiredStatus" - TypeEvalUpdated = "EvalUpdated" - TypeJobRegistered = "JobRegistered" - TypeJobDeregistered = "JobDeregistered" - TypeJobBatchDeregistered = "JobBatchDeregistered" - TypePlanResult = "PlanResult" -) - -// JobEvent holds a newly updated Job. -type JobEvent struct { - Job *structs.Job -} - -// EvalEvent holds a newly updated Eval. -type EvalEvent struct { - Eval *structs.Evaluation -} - -// AllocEvent holds a newly updated Allocation. The -// Allocs embedded Job has been removed to reduce size. -type AllocEvent struct { - Alloc *structs.Allocation -} - -// DeploymentEvent holds a newly updated Deployment. -type DeploymentEvent struct { - Deployment *structs.Deployment -} - -// NodeEvent holds a newly updated Node -type NodeEvent struct { - Node *structs.Node -} - -type NodeDrainAllocDetails struct { - ID string - Migrate *structs.MigrateStrategy -} - -type JobDrainDetails struct { - Type string - AllocDetails map[string]NodeDrainAllocDetails -} - var MsgTypeEvents = map[structs.MessageType]string{ - structs.NodeRegisterRequestType: TypeNodeRegistration, - structs.NodeDeregisterRequestType: TypeNodeDeregistration, - structs.UpsertNodeEventsType: TypeNodeEvent, - structs.EvalUpdateRequestType: TypeEvalUpdated, - structs.AllocClientUpdateRequestType: TypeAllocUpdated, - structs.JobRegisterRequestType: TypeJobRegistered, - structs.AllocUpdateRequestType: TypeAllocUpdated, - structs.NodeUpdateStatusRequestType: TypeNodeEvent, - structs.JobDeregisterRequestType: TypeJobDeregistered, - structs.JobBatchDeregisterRequestType: TypeJobBatchDeregistered, - structs.AllocUpdateDesiredTransitionRequestType: TypeAllocUpdateDesiredStatus, - structs.NodeUpdateEligibilityRequestType: TypeNodeDrain, - structs.NodeUpdateDrainRequestType: TypeNodeDrain, - structs.BatchNodeUpdateDrainRequestType: TypeNodeDrain, - structs.DeploymentStatusUpdateRequestType: TypeDeploymentUpdate, - structs.DeploymentPromoteRequestType: TypeDeploymentPromotion, - structs.DeploymentAllocHealthRequestType: TypeDeploymentAllocHealth, - structs.ApplyPlanResultsRequestType: TypePlanResult, + structs.NodeRegisterRequestType: structs.TypeNodeRegistration, + structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration, + structs.UpsertNodeEventsType: structs.TypeNodeEvent, + structs.EvalUpdateRequestType: structs.TypeEvalUpdated, + structs.AllocClientUpdateRequestType: structs.TypeAllocUpdated, + structs.JobRegisterRequestType: structs.TypeJobRegistered, + structs.AllocUpdateRequestType: structs.TypeAllocUpdated, + structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent, + structs.JobDeregisterRequestType: structs.TypeJobDeregistered, + structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered, + structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocUpdateDesiredStatus, + structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain, + structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain, + structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain, + structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate, + structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion, + structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth, + structs.ApplyPlanResultsRequestType: structs.TypePlanResult, } func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events { @@ -106,7 +51,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicNode, Key: before.ID, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: before, }, }, true @@ -125,7 +70,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { after.DeploymentID, }, Namespace: after.Namespace, - Payload: &EvalEvent{ + Payload: &structs.EvalEvent{ Eval: after, }, }, true @@ -146,7 +91,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Key: after.ID, FilterKeys: filterKeys, Namespace: after.Namespace, - Payload: &AllocEvent{ + Payload: &structs.AllocEvent{ Alloc: alloc, }, }, true @@ -156,7 +101,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Topic: structs.TopicJob, Key: after.ID, Namespace: after.Namespace, - Payload: &JobEvent{ + Payload: &structs.JobEvent{ Job: after, }, }, true @@ -165,7 +110,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicNode, Key: after.ID, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: after, }, }, true @@ -176,7 +121,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Key: after.ID, Namespace: after.Namespace, FilterKeys: []string{after.JobID}, - Payload: &DeploymentEvent{ + Payload: &structs.DeploymentEvent{ Deployment: after, }, }, true diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 056ecfff5..58cb8a0c6 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -53,7 +53,7 @@ func TestEventsFromChanges_DeploymentUpdate(t *testing.T) { require.Equal(t, uint64(100), got.Index) require.Equal(t, d.ID, got.Key) - de := got.Payload.(*DeploymentEvent) + de := got.Payload.(*structs.DeploymentEvent) require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status) require.Contains(t, got.FilterKeys, j.ID) } @@ -130,9 +130,9 @@ func TestEventsFromChanges_DeploymentPromotion(t *testing.T) { require.Equal(t, uint64(100), got.Index) require.Equal(t, d.ID, got.Key) - de := got.Payload.(*DeploymentEvent) + de := got.Payload.(*structs.DeploymentEvent) require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status) - require.Equal(t, TypeDeploymentPromotion, got.Type) + require.Equal(t, structs.TypeDeploymentPromotion, got.Type) } func TestEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) { @@ -218,14 +218,14 @@ func TestEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) { require.Len(t, allocEvents, 2) for _, e := range allocEvents { require.Equal(t, 100, int(e.Index)) - require.Equal(t, TypeDeploymentAllocHealth, e.Type) + require.Equal(t, structs.TypeDeploymentAllocHealth, e.Type) require.Equal(t, structs.TopicAlloc, e.Topic) } require.Len(t, deploymentEvent, 1) for _, e := range deploymentEvent { require.Equal(t, 100, int(e.Index)) - require.Equal(t, TypeDeploymentAllocHealth, e.Type) + require.Equal(t, structs.TypeDeploymentAllocHealth, e.Type) require.Equal(t, structs.TopicDeployment, e.Topic) require.Equal(t, d.ID, e.Key) } @@ -265,8 +265,8 @@ func TestEventsFromChanges_UpsertNodeEventsType(t *testing.T) { for _, e := range events { require.Equal(t, structs.TopicNode, e.Topic) - require.Equal(t, TypeNodeEvent, e.Type) - event := e.Payload.(*NodeEvent) + require.Equal(t, structs.TypeNodeEvent, e.Type) + event := e.Payload.(*structs.NodeStreamEvent) require.Equal(t, "update", event.Node.Events[len(event.Node.Events)-1].Message) } @@ -297,8 +297,8 @@ func TestEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) { e := events[0] require.Equal(t, structs.TopicNode, e.Topic) - require.Equal(t, TypeNodeEvent, e.Type) - event := e.Payload.(*NodeEvent) + require.Equal(t, structs.TypeNodeEvent, e.Type) + event := e.Payload.(*structs.NodeStreamEvent) require.Equal(t, "down", event.Node.Events[len(event.Node.Events)-1].Message) require.Equal(t, structs.NodeStatusDown, event.Node.Status) } @@ -330,10 +330,10 @@ func TestEventsFromChanges_EvalUpdateRequestType(t *testing.T) { e := events[0] require.Equal(t, structs.TopicEval, e.Topic) - require.Equal(t, TypeEvalUpdated, e.Type) + require.Equal(t, structs.TypeEvalUpdated, e.Type) require.Contains(t, e.FilterKeys, e2.JobID) require.Contains(t, e.FilterKeys, e2.DeploymentID) - event := e.Payload.(*EvalEvent) + event := e.Payload.(*structs.EvalEvent) require.Equal(t, "blocked", event.Eval.Status) } @@ -390,7 +390,7 @@ func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) { } else if e.Topic == structs.TopicDeployment { deploys = append(deploys, e) } - require.Equal(t, TypePlanResult, e.Type) + require.Equal(t, structs.TypePlanResult, e.Type) } require.Len(t, allocs, 2) require.Len(t, evals, 1) @@ -446,9 +446,9 @@ func TestEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T) { for _, e := range events { require.Equal(t, 100, int(e.Index)) - require.Equal(t, TypeNodeDrain, e.Type) + require.Equal(t, structs.TypeNodeDrain, e.Type) require.Equal(t, structs.TopicNode, e.Topic) - ne := e.Payload.(*NodeEvent) + ne := e.Payload.(*structs.NodeStreamEvent) require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message) } } @@ -485,9 +485,9 @@ func TestEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T) { for _, e := range events { require.Equal(t, 100, int(e.Index)) - require.Equal(t, TypeNodeDrain, e.Type) + require.Equal(t, structs.TypeNodeDrain, e.Type) require.Equal(t, structs.TopicNode, e.Topic) - ne := e.Payload.(*NodeEvent) + ne := e.Payload.(*structs.NodeStreamEvent) require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message) require.Equal(t, structs.NodeSchedulingIneligible, ne.Node.SchedulingEligibility) } @@ -540,7 +540,7 @@ func TestEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *testing.T) require.Fail(t, "unexpected event type") } - require.Equal(t, TypeAllocUpdateDesiredStatus, e.Type) + require.Equal(t, structs.TypeAllocUpdateDesiredStatus, e.Type) } require.Len(t, allocs, 1) @@ -614,14 +614,14 @@ func TestEventsFromChanges_WithNodeDeregistration(t *testing.T) { event := actual.Events[0] - require.Equal(t, TypeNodeDeregistration, event.Type) + require.Equal(t, structs.TypeNodeDeregistration, event.Type) require.Equal(t, uint64(1), event.Index) require.Equal(t, structs.TopicNode, event.Topic) require.Equal(t, "some-id", event.Key) require.Len(t, event.FilterKeys, 0) - nodeEvent, ok := event.Payload.(*NodeEvent) + nodeEvent, ok := event.Payload.(*structs.NodeStreamEvent) require.True(t, ok) require.Equal(t, *before, *nodeEvent.Node) } @@ -644,10 +644,10 @@ func TestNodeEventsFromChanges(t *testing.T) { }, WantEvents: []structs.Event{{ Topic: structs.TopicNode, - Type: TypeNodeRegistration, + Type: structs.TypeNodeRegistration, Key: testNodeID(), Index: 100, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: testNode(), }, }}, @@ -661,10 +661,10 @@ func TestNodeEventsFromChanges(t *testing.T) { }, WantEvents: []structs.Event{{ Topic: structs.TopicNode, - Type: TypeNodeRegistration, + Type: structs.TypeNodeRegistration, Key: testNodeID(), Index: 100, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: testNode(nodeNotReady), }, }}, @@ -681,10 +681,10 @@ func TestNodeEventsFromChanges(t *testing.T) { }, WantEvents: []structs.Event{{ Topic: structs.TopicNode, - Type: TypeNodeDeregistration, + Type: structs.TypeNodeDeregistration, Key: testNodeID(), Index: 100, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: testNode(), }, }}, @@ -703,19 +703,19 @@ func TestNodeEventsFromChanges(t *testing.T) { WantEvents: []structs.Event{ { Topic: structs.TopicNode, - Type: TypeNodeDeregistration, + Type: structs.TypeNodeDeregistration, Key: testNodeID(), Index: 100, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: testNode(), }, }, { Topic: structs.TopicNode, - Type: TypeNodeDeregistration, + Type: structs.TypeNodeDeregistration, Key: testNodeIDTwo(), Index: 100, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: testNode(nodeIDTwo), }, }, @@ -754,19 +754,19 @@ func TestNodeEventsFromChanges(t *testing.T) { WantEvents: []structs.Event{ { Topic: structs.TopicNode, - Type: TypeNodeEvent, + Type: structs.TypeNodeEvent, Key: testNodeID(), Index: 100, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: testNode(), }, }, { Topic: structs.TopicNode, - Type: TypeNodeEvent, + Type: structs.TypeNodeEvent, Key: testNodeIDTwo(), Index: 100, - Payload: &NodeEvent{ + Payload: &structs.NodeStreamEvent{ Node: testNode(nodeIDTwo), }, }, @@ -857,10 +857,10 @@ func TestNodeDrainEventFromChanges(t *testing.T) { require.Len(t, got.Events, 1) require.Equal(t, structs.TopicNode, got.Events[0].Topic) - require.Equal(t, TypeNodeDrain, got.Events[0].Type) + require.Equal(t, structs.TypeNodeDrain, got.Events[0].Type) require.Equal(t, uint64(100), got.Events[0].Index) - nodeEvent, ok := got.Events[0].Payload.(*NodeEvent) + nodeEvent, ok := got.Events[0].Payload.(*structs.NodeStreamEvent) require.True(t, ok) require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility) @@ -870,8 +870,8 @@ func TestNodeDrainEventFromChanges(t *testing.T) { func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() - wantPayload := want.Payload.(*NodeEvent) - gotPayload := got.Payload.(*NodeEvent) + wantPayload := want.Payload.(*structs.NodeStreamEvent) + gotPayload := got.Payload.(*structs.NodeStreamEvent) // Check payload equality for the fields that we can easily control require.Equal(t, wantPayload.Node.Status, gotPayload.Node.Status) @@ -882,15 +882,15 @@ func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { func requireNodeDeregistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() - wantPayload := want.Payload.(*NodeEvent) - gotPayload := got.Payload.(*NodeEvent) + wantPayload := want.Payload.(*structs.NodeStreamEvent) + gotPayload := got.Payload.(*structs.NodeStreamEvent) require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID) require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events) } func requireNodeEventEqual(t *testing.T, want, got structs.Event) { - gotPayload := got.Payload.(*NodeEvent) + gotPayload := got.Payload.(*structs.NodeStreamEvent) require.Len(t, gotPayload.Node.Events, 3) } diff --git a/nomad/structs/event.go b/nomad/structs/event.go index cb506ee38..5e7e1ff77 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -31,6 +31,23 @@ const ( TopicJob Topic = "Job" TopicNode Topic = "Node" TopicAll Topic = "*" + + TypeNodeRegistration = "NodeRegistration" + TypeNodeDeregistration = "NodeDeregistration" + TypeNodeEligibilityUpdate = "NodeEligibility" + TypeNodeDrain = "NodeDrain" + TypeNodeEvent = "NodeStreamEvent" + TypeDeploymentUpdate = "DeploymentStatusUpdate" + TypeDeploymentPromotion = "DeploymentPromotion" + TypeDeploymentAllocHealth = "DeploymentAllocHealth" + TypeAllocCreated = "AllocCreated" + TypeAllocUpdated = "AllocUpdated" + TypeAllocUpdateDesiredStatus = "AllocUpdateDesiredStatus" + TypeEvalUpdated = "EvalUpdated" + TypeJobRegistered = "JobRegistered" + TypeJobDeregistered = "JobDeregistered" + TypeJobBatchDeregistered = "JobBatchDeregistered" + TypePlanResult = "PlanResult" ) // Event represents a change in Nomads state. @@ -167,3 +184,29 @@ func (e *EventSink) EqualSubscriptionValues(old *EventSink) bool { e.Type == old.Type && reflect.DeepEqual(e.Topics, old.Topics) } + +// JobEvent holds a newly updated Job. +type JobEvent struct { + Job *Job +} + +// EvalEvent holds a newly updated Eval. +type EvalEvent struct { + Eval *Evaluation +} + +// AllocEvent holds a newly updated Allocation. The +// Allocs embedded Job has been removed to reduce size. +type AllocEvent struct { + Alloc *Allocation +} + +// DeploymentEvent holds a newly updated Deployment. +type DeploymentEvent struct { + Deployment *Deployment +} + +// NodeStreamEvent holds a newly updated Node +type NodeStreamEvent struct { + Node *Node +}