From db3bfb77b3baf5fd266f4e9ad08812cd271e60c1 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Tue, 1 Dec 2020 15:14:05 -0500 Subject: [PATCH] Events switch on memdb change table instead of type to prevent duplicates (#9486) * prevent duplicate job events when a job is updated, the job_version table is updated with a structs.Job, this caused there to be multiple job events since we are switching off the change type and not the table * test length * add table value to tests --- nomad/state/events.go | 69 +++++++++++++++++++++++++++++--------- nomad/state/events_test.go | 33 ++++++++++++++++++ 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/nomad/state/events.go b/nomad/state/events.go index 0888a480a..b49a67321 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -50,8 +50,12 @@ func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events { func eventFromChange(change memdb.Change) (structs.Event, bool) { if change.Deleted() { - switch before := change.Before.(type) { - case *structs.ACLToken: + switch change.Table { + case "acl_token": + before, ok := change.Before.(*structs.ACLToken) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicACLToken, Key: before.AccessorID, @@ -59,7 +63,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { ACLToken: before, }, }, true - case *structs.ACLPolicy: + case "acl_policy": + before, ok := change.Before.(*structs.ACLPolicy) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicACLPolicy, Key: before.Name, @@ -67,7 +75,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { ACLPolicy: before, }, }, true - case *structs.Node: + case "nodes": + before, ok := change.Before.(*structs.Node) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicNode, Key: before.ID, @@ -76,28 +88,39 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { }, }, true } - return structs.Event{}, false } - switch after := change.After.(type) { - case *structs.ACLToken: + switch change.Table { + case "acl_token": + after, ok := change.After.(*structs.ACLToken) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicACLToken, Key: after.AccessorID, - Payload: &structs.ACLTokenEvent{ + Payload: structs.ACLTokenEvent{ ACLToken: after, }, }, true - case *structs.ACLPolicy: + case "acl_policy": + after, ok := change.After.(*structs.ACLPolicy) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicACLPolicy, Key: after.Name, - Payload: &structs.ACLPolicyEvent{ + Payload: structs.ACLPolicyEvent{ ACLPolicy: after, }, }, true - case *structs.Evaluation: + case "evals": + after, ok := change.After.(*structs.Evaluation) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicEval, Key: after.ID, @@ -110,7 +133,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Eval: after, }, }, true - case *structs.Allocation: + case "allocs": + after, ok := change.After.(*structs.Allocation) + if !ok { + return structs.Event{}, false + } alloc := after.Copy() filterKeys := []string{ @@ -130,7 +157,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Alloc: alloc, }, }, true - case *structs.Job: + case "jobs": + after, ok := change.After.(*structs.Job) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicJob, Key: after.ID, @@ -139,7 +170,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Job: after, }, }, true - case *structs.Node: + case "nodes": + after, ok := change.After.(*structs.Node) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicNode, Key: after.ID, @@ -147,7 +182,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Node: after, }, }, true - case *structs.Deployment: + case "deployment": + after, ok := change.After.(*structs.Deployment) + if !ok { + return structs.Event{}, false + } return structs.Event{ Topic: structs.TopicDeployment, Key: after.ID, diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 58cb8a0c6..3b347f9b6 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -12,6 +12,36 @@ import ( "github.com/stretchr/testify/require" ) +// TestEventFromChange_SingleEventPerTable ensures that only a single event is +// created per table per memdb.Change +func TestEventFromChange_SingleEventPerTable(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + changes := Changes{ + Index: 100, + MsgType: structs.JobRegisterRequestType, + Changes: memdb.Changes{ + { + Table: "job_version", + Before: mock.Job(), + After: mock.Job(), + }, + { + Table: "jobs", + Before: mock.Job(), + After: mock.Job(), + }, + }, + } + + out := eventsFromChanges(s.db.ReadTxn(), changes) + require.Len(t, out.Events, 1) + require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered) + +} + func TestEventsFromChanges_DeploymentUpdate(t *testing.T) { t.Parallel() s := TestStateStoreCfg(t, TestStateStorePublisher(t)) @@ -571,10 +601,12 @@ func TestEventsFromChanges_WithDeletion(t *testing.T) { Index: uint64(1), Changes: memdb.Changes{ { + Table: "jobs", Before: &structs.Job{}, After: &structs.Job{}, }, { + Table: "jobs", Before: &structs.Job{}, After: nil, // deleted }, @@ -600,6 +632,7 @@ func TestEventsFromChanges_WithNodeDeregistration(t *testing.T) { Index: uint64(1), Changes: memdb.Changes{ { + Table: "nodes", Before: before, After: nil, // deleted },