Events/mv structs (#9430)

* move structs to structs/event.go to avoid import cycle
This commit is contained in:
Drew Bailey
2020-11-23 14:01:10 -05:00
committed by GitHub
parent 9591b2b29d
commit ddb7e8299d
4 changed files with 108 additions and 120 deletions

View File

@@ -52,7 +52,7 @@ func TestDeploymentEventFromChanges(t *testing.T) {
require.Equal(t, uint64(100), got.Index)
require.Equal(t, d.ID, got.Key)
de := got.Payload.(*DeploymentEvent)
de := got.Payload.(*structs.DeploymentEvent)
require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status)
require.Contains(t, got.FilterKeys, j.ID)

View File

@@ -5,80 +5,25 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
TypeNodeRegistration = "NodeRegistration"
TypeNodeDeregistration = "NodeDeregistration"
TypeNodeEligibilityUpdate = "NodeEligibility"
TypeNodeDrain = "NodeDrain"
TypeNodeEvent = "NodeEvent"
TypeDeploymentUpdate = "DeploymentStatusUpdate"
TypeDeploymentPromotion = "DeploymentPromotion"
TypeDeploymentAllocHealth = "DeploymentAllocHealth"
TypeAllocCreated = "AllocCreated"
TypeAllocUpdated = "AllocUpdated"
TypeAllocUpdateDesiredStatus = "AllocUpdateDesiredStatus"
TypeEvalUpdated = "EvalUpdated"
TypeJobRegistered = "JobRegistered"
TypeJobDeregistered = "JobDeregistered"
TypeJobBatchDeregistered = "JobBatchDeregistered"
TypePlanResult = "PlanResult"
)
// JobEvent holds a newly updated Job.
type JobEvent struct {
Job *structs.Job
}
// EvalEvent holds a newly updated Eval.
type EvalEvent struct {
Eval *structs.Evaluation
}
// AllocEvent holds a newly updated Allocation. The
// Allocs embedded Job has been removed to reduce size.
type AllocEvent struct {
Alloc *structs.Allocation
}
// DeploymentEvent holds a newly updated Deployment.
type DeploymentEvent struct {
Deployment *structs.Deployment
}
// NodeEvent holds a newly updated Node
type NodeEvent struct {
Node *structs.Node
}
type NodeDrainAllocDetails struct {
ID string
Migrate *structs.MigrateStrategy
}
type JobDrainDetails struct {
Type string
AllocDetails map[string]NodeDrainAllocDetails
}
var MsgTypeEvents = map[structs.MessageType]string{
structs.NodeRegisterRequestType: TypeNodeRegistration,
structs.NodeDeregisterRequestType: TypeNodeDeregistration,
structs.UpsertNodeEventsType: TypeNodeEvent,
structs.EvalUpdateRequestType: TypeEvalUpdated,
structs.AllocClientUpdateRequestType: TypeAllocUpdated,
structs.JobRegisterRequestType: TypeJobRegistered,
structs.AllocUpdateRequestType: TypeAllocUpdated,
structs.NodeUpdateStatusRequestType: TypeNodeEvent,
structs.JobDeregisterRequestType: TypeJobDeregistered,
structs.JobBatchDeregisterRequestType: TypeJobBatchDeregistered,
structs.AllocUpdateDesiredTransitionRequestType: TypeAllocUpdateDesiredStatus,
structs.NodeUpdateEligibilityRequestType: TypeNodeDrain,
structs.NodeUpdateDrainRequestType: TypeNodeDrain,
structs.BatchNodeUpdateDrainRequestType: TypeNodeDrain,
structs.DeploymentStatusUpdateRequestType: TypeDeploymentUpdate,
structs.DeploymentPromoteRequestType: TypeDeploymentPromotion,
structs.DeploymentAllocHealthRequestType: TypeDeploymentAllocHealth,
structs.ApplyPlanResultsRequestType: TypePlanResult,
structs.NodeRegisterRequestType: structs.TypeNodeRegistration,
structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration,
structs.UpsertNodeEventsType: structs.TypeNodeEvent,
structs.EvalUpdateRequestType: structs.TypeEvalUpdated,
structs.AllocClientUpdateRequestType: structs.TypeAllocUpdated,
structs.JobRegisterRequestType: structs.TypeJobRegistered,
structs.AllocUpdateRequestType: structs.TypeAllocUpdated,
structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent,
structs.JobDeregisterRequestType: structs.TypeJobDeregistered,
structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered,
structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocUpdateDesiredStatus,
structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain,
structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate,
structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion,
structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth,
structs.ApplyPlanResultsRequestType: structs.TypePlanResult,
}
func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
@@ -106,7 +51,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicNode,
Key: before.ID,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: before,
},
}, true
@@ -125,7 +70,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
after.DeploymentID,
},
Namespace: after.Namespace,
Payload: &EvalEvent{
Payload: &structs.EvalEvent{
Eval: after,
},
}, true
@@ -146,7 +91,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Key: after.ID,
FilterKeys: filterKeys,
Namespace: after.Namespace,
Payload: &AllocEvent{
Payload: &structs.AllocEvent{
Alloc: alloc,
},
}, true
@@ -156,7 +101,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Topic: structs.TopicJob,
Key: after.ID,
Namespace: after.Namespace,
Payload: &JobEvent{
Payload: &structs.JobEvent{
Job: after,
},
}, true
@@ -165,7 +110,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicNode,
Key: after.ID,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: after,
},
}, true
@@ -176,7 +121,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Key: after.ID,
Namespace: after.Namespace,
FilterKeys: []string{after.JobID},
Payload: &DeploymentEvent{
Payload: &structs.DeploymentEvent{
Deployment: after,
},
}, true

View File

@@ -53,7 +53,7 @@ func TestEventsFromChanges_DeploymentUpdate(t *testing.T) {
require.Equal(t, uint64(100), got.Index)
require.Equal(t, d.ID, got.Key)
de := got.Payload.(*DeploymentEvent)
de := got.Payload.(*structs.DeploymentEvent)
require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status)
require.Contains(t, got.FilterKeys, j.ID)
}
@@ -130,9 +130,9 @@ func TestEventsFromChanges_DeploymentPromotion(t *testing.T) {
require.Equal(t, uint64(100), got.Index)
require.Equal(t, d.ID, got.Key)
de := got.Payload.(*DeploymentEvent)
de := got.Payload.(*structs.DeploymentEvent)
require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status)
require.Equal(t, TypeDeploymentPromotion, got.Type)
require.Equal(t, structs.TypeDeploymentPromotion, got.Type)
}
func TestEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) {
@@ -218,14 +218,14 @@ func TestEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) {
require.Len(t, allocEvents, 2)
for _, e := range allocEvents {
require.Equal(t, 100, int(e.Index))
require.Equal(t, TypeDeploymentAllocHealth, e.Type)
require.Equal(t, structs.TypeDeploymentAllocHealth, e.Type)
require.Equal(t, structs.TopicAlloc, e.Topic)
}
require.Len(t, deploymentEvent, 1)
for _, e := range deploymentEvent {
require.Equal(t, 100, int(e.Index))
require.Equal(t, TypeDeploymentAllocHealth, e.Type)
require.Equal(t, structs.TypeDeploymentAllocHealth, e.Type)
require.Equal(t, structs.TopicDeployment, e.Topic)
require.Equal(t, d.ID, e.Key)
}
@@ -265,8 +265,8 @@ func TestEventsFromChanges_UpsertNodeEventsType(t *testing.T) {
for _, e := range events {
require.Equal(t, structs.TopicNode, e.Topic)
require.Equal(t, TypeNodeEvent, e.Type)
event := e.Payload.(*NodeEvent)
require.Equal(t, structs.TypeNodeEvent, e.Type)
event := e.Payload.(*structs.NodeStreamEvent)
require.Equal(t, "update", event.Node.Events[len(event.Node.Events)-1].Message)
}
@@ -297,8 +297,8 @@ func TestEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) {
e := events[0]
require.Equal(t, structs.TopicNode, e.Topic)
require.Equal(t, TypeNodeEvent, e.Type)
event := e.Payload.(*NodeEvent)
require.Equal(t, structs.TypeNodeEvent, e.Type)
event := e.Payload.(*structs.NodeStreamEvent)
require.Equal(t, "down", event.Node.Events[len(event.Node.Events)-1].Message)
require.Equal(t, structs.NodeStatusDown, event.Node.Status)
}
@@ -330,10 +330,10 @@ func TestEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
e := events[0]
require.Equal(t, structs.TopicEval, e.Topic)
require.Equal(t, TypeEvalUpdated, e.Type)
require.Equal(t, structs.TypeEvalUpdated, e.Type)
require.Contains(t, e.FilterKeys, e2.JobID)
require.Contains(t, e.FilterKeys, e2.DeploymentID)
event := e.Payload.(*EvalEvent)
event := e.Payload.(*structs.EvalEvent)
require.Equal(t, "blocked", event.Eval.Status)
}
@@ -390,7 +390,7 @@ func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
} else if e.Topic == structs.TopicDeployment {
deploys = append(deploys, e)
}
require.Equal(t, TypePlanResult, e.Type)
require.Equal(t, structs.TypePlanResult, e.Type)
}
require.Len(t, allocs, 2)
require.Len(t, evals, 1)
@@ -446,9 +446,9 @@ func TestEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T) {
for _, e := range events {
require.Equal(t, 100, int(e.Index))
require.Equal(t, TypeNodeDrain, e.Type)
require.Equal(t, structs.TypeNodeDrain, e.Type)
require.Equal(t, structs.TopicNode, e.Topic)
ne := e.Payload.(*NodeEvent)
ne := e.Payload.(*structs.NodeStreamEvent)
require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message)
}
}
@@ -485,9 +485,9 @@ func TestEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T) {
for _, e := range events {
require.Equal(t, 100, int(e.Index))
require.Equal(t, TypeNodeDrain, e.Type)
require.Equal(t, structs.TypeNodeDrain, e.Type)
require.Equal(t, structs.TopicNode, e.Topic)
ne := e.Payload.(*NodeEvent)
ne := e.Payload.(*structs.NodeStreamEvent)
require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message)
require.Equal(t, structs.NodeSchedulingIneligible, ne.Node.SchedulingEligibility)
}
@@ -540,7 +540,7 @@ func TestEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *testing.T)
require.Fail(t, "unexpected event type")
}
require.Equal(t, TypeAllocUpdateDesiredStatus, e.Type)
require.Equal(t, structs.TypeAllocUpdateDesiredStatus, e.Type)
}
require.Len(t, allocs, 1)
@@ -614,14 +614,14 @@ func TestEventsFromChanges_WithNodeDeregistration(t *testing.T) {
event := actual.Events[0]
require.Equal(t, TypeNodeDeregistration, event.Type)
require.Equal(t, structs.TypeNodeDeregistration, event.Type)
require.Equal(t, uint64(1), event.Index)
require.Equal(t, structs.TopicNode, event.Topic)
require.Equal(t, "some-id", event.Key)
require.Len(t, event.FilterKeys, 0)
nodeEvent, ok := event.Payload.(*NodeEvent)
nodeEvent, ok := event.Payload.(*structs.NodeStreamEvent)
require.True(t, ok)
require.Equal(t, *before, *nodeEvent.Node)
}
@@ -644,10 +644,10 @@ func TestNodeEventsFromChanges(t *testing.T) {
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeRegistration,
Type: structs.TypeNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: testNode(),
},
}},
@@ -661,10 +661,10 @@ func TestNodeEventsFromChanges(t *testing.T) {
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeRegistration,
Type: structs.TypeNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: testNode(nodeNotReady),
},
}},
@@ -681,10 +681,10 @@ func TestNodeEventsFromChanges(t *testing.T) {
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Type: structs.TypeNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: testNode(),
},
}},
@@ -703,19 +703,19 @@ func TestNodeEventsFromChanges(t *testing.T) {
WantEvents: []structs.Event{
{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Type: structs.TypeNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: testNode(),
},
},
{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Type: structs.TypeNodeDeregistration,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: testNode(nodeIDTwo),
},
},
@@ -754,19 +754,19 @@ func TestNodeEventsFromChanges(t *testing.T) {
WantEvents: []structs.Event{
{
Topic: structs.TopicNode,
Type: TypeNodeEvent,
Type: structs.TypeNodeEvent,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: testNode(),
},
},
{
Topic: structs.TopicNode,
Type: TypeNodeEvent,
Type: structs.TypeNodeEvent,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Payload: &structs.NodeStreamEvent{
Node: testNode(nodeIDTwo),
},
},
@@ -857,10 +857,10 @@ func TestNodeDrainEventFromChanges(t *testing.T) {
require.Len(t, got.Events, 1)
require.Equal(t, structs.TopicNode, got.Events[0].Topic)
require.Equal(t, TypeNodeDrain, got.Events[0].Type)
require.Equal(t, structs.TypeNodeDrain, got.Events[0].Type)
require.Equal(t, uint64(100), got.Events[0].Index)
nodeEvent, ok := got.Events[0].Payload.(*NodeEvent)
nodeEvent, ok := got.Events[0].Payload.(*structs.NodeStreamEvent)
require.True(t, ok)
require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility)
@@ -870,8 +870,8 @@ func TestNodeDrainEventFromChanges(t *testing.T) {
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()
wantPayload := want.Payload.(*NodeEvent)
gotPayload := got.Payload.(*NodeEvent)
wantPayload := want.Payload.(*structs.NodeStreamEvent)
gotPayload := got.Payload.(*structs.NodeStreamEvent)
// Check payload equality for the fields that we can easily control
require.Equal(t, wantPayload.Node.Status, gotPayload.Node.Status)
@@ -882,15 +882,15 @@ func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
func requireNodeDeregistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()
wantPayload := want.Payload.(*NodeEvent)
gotPayload := got.Payload.(*NodeEvent)
wantPayload := want.Payload.(*structs.NodeStreamEvent)
gotPayload := got.Payload.(*structs.NodeStreamEvent)
require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID)
require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events)
}
func requireNodeEventEqual(t *testing.T, want, got structs.Event) {
gotPayload := got.Payload.(*NodeEvent)
gotPayload := got.Payload.(*structs.NodeStreamEvent)
require.Len(t, gotPayload.Node.Events, 3)
}

View File

@@ -31,6 +31,23 @@ const (
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicAll Topic = "*"
TypeNodeRegistration = "NodeRegistration"
TypeNodeDeregistration = "NodeDeregistration"
TypeNodeEligibilityUpdate = "NodeEligibility"
TypeNodeDrain = "NodeDrain"
TypeNodeEvent = "NodeStreamEvent"
TypeDeploymentUpdate = "DeploymentStatusUpdate"
TypeDeploymentPromotion = "DeploymentPromotion"
TypeDeploymentAllocHealth = "DeploymentAllocHealth"
TypeAllocCreated = "AllocCreated"
TypeAllocUpdated = "AllocUpdated"
TypeAllocUpdateDesiredStatus = "AllocUpdateDesiredStatus"
TypeEvalUpdated = "EvalUpdated"
TypeJobRegistered = "JobRegistered"
TypeJobDeregistered = "JobDeregistered"
TypeJobBatchDeregistered = "JobBatchDeregistered"
TypePlanResult = "PlanResult"
)
// Event represents a change in Nomads state.
@@ -167,3 +184,29 @@ func (e *EventSink) EqualSubscriptionValues(old *EventSink) bool {
e.Type == old.Type &&
reflect.DeepEqual(e.Topics, old.Topics)
}
// JobEvent holds a newly updated Job.
type JobEvent struct {
Job *Job
}
// EvalEvent holds a newly updated Eval.
type EvalEvent struct {
Eval *Evaluation
}
// AllocEvent holds a newly updated Allocation. The
// Allocs embedded Job has been removed to reduce size.
type AllocEvent struct {
Alloc *Allocation
}
// DeploymentEvent holds a newly updated Deployment.
type DeploymentEvent struct {
Deployment *Deployment
}
// NodeStreamEvent holds a newly updated Node
type NodeStreamEvent struct {
Node *Node
}