events: Use single eventsFromChanges func (#9281)

This commit is contained in:
Kris Hicks
2020-11-05 13:06:52 -08:00
committed by GitHub
parent b11d067584
commit f2d2669f97
6 changed files with 378 additions and 409 deletions

View File

@@ -62,6 +62,7 @@ type JobDrainDetails struct {
var MsgTypeEvents = map[structs.MessageType]string{
structs.NodeRegisterRequestType: TypeNodeRegistration,
structs.NodeDeregisterRequestType: TypeNodeDeregistration,
structs.UpsertNodeEventsType: TypeNodeEvent,
structs.EvalUpdateRequestType: TypeEvalUpdated,
structs.AllocClientUpdateRequestType: TypeAllocUpdated,
@@ -80,20 +81,14 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ApplyPlanResultsRequestType: TypePlanResult,
}
// GenericEventsFromChanges returns a set of events for a given set of
// transaction changes. It currently ignores Delete operations.
func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
eventType, ok := MsgTypeEvents[changes.MsgType]
if !ok {
return nil, nil
return nil
}
var events []structs.Event
for _, change := range changes.Changes {
if change.Deleted() {
continue
}
if event, ok := eventFromChange(change); ok {
event.Type = eventType
event.Index = changes.Index
@@ -101,10 +96,25 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
}
}
return &structs.Events{Index: changes.Index, Events: events}, nil
return &structs.Events{Index: changes.Index, Events: events}
}
func eventFromChange(change memdb.Change) (structs.Event, bool) {
if change.Deleted() {
switch before := change.Before.(type) {
case *structs.Node:
return structs.Event{
Topic: structs.TopicNode,
Key: before.ID,
Payload: &NodeEvent{
Node: before,
},
}, true
}
return structs.Event{}, false
}
switch after := change.After.(type) {
case *structs.Evaluation:
return structs.Event{

View File

@@ -12,12 +12,7 @@ import (
"github.com/stretchr/testify/require"
)
// structs.AllocClientUpdateRequestType:
// structs.AllocUpdateRequestType
// JobDeregisterRequestType
// jobregisterrequesttype
func TestGenericEventsFromChanges_DeploymentUpdate(t *testing.T) {
func TestEventsFromChanges_DeploymentUpdate(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -63,7 +58,7 @@ func TestGenericEventsFromChanges_DeploymentUpdate(t *testing.T) {
require.Contains(t, got.FilterKeys, j.ID)
}
func TestGenericEventsFromChanges_DeploymentPromotion(t *testing.T) {
func TestEventsFromChanges_DeploymentPromotion(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -140,7 +135,7 @@ func TestGenericEventsFromChanges_DeploymentPromotion(t *testing.T) {
require.Equal(t, TypeDeploymentPromotion, got.Type)
}
func TestGenericEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) {
func TestEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -236,7 +231,7 @@ func TestGenericEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T)
}
}
func TestGenericEventsFromChanges_UpsertNodeEventsType(t *testing.T) {
func TestEventsFromChanges_UpsertNodeEventsType(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -277,7 +272,7 @@ func TestGenericEventsFromChanges_UpsertNodeEventsType(t *testing.T) {
}
func TestGenericEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) {
func TestEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -308,7 +303,7 @@ func TestGenericEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) {
require.Equal(t, structs.NodeStatusDown, event.Node.Status)
}
func TestGenericEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
func TestEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -342,7 +337,7 @@ func TestGenericEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
require.Equal(t, "blocked", event.Eval.Status)
}
func TestGenericEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -403,7 +398,7 @@ func TestGenericEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
require.Len(t, deploys, 1)
}
func TestGenericEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T) {
func TestEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -458,7 +453,7 @@ func TestGenericEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T)
}
}
func TestGenericEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T) {
func TestEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -498,7 +493,7 @@ func TestGenericEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T)
}
}
func TestGenericEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *testing.T) {
func TestEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
@@ -552,24 +547,24 @@ func TestGenericEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *tes
require.Len(t, evalEvents, 1)
}
func TestGenericEventsFromChanges_JobBatchDeregisterRequestType(t *testing.T) {
func TestEventsFromChanges_JobBatchDeregisterRequestType(t *testing.T) {
// TODO Job batch deregister logic mostly occurs in the FSM
t.SkipNow()
}
func TestGenericEventsFromChanges_AllocClientUpdateRequestType(t *testing.T) {
func TestEventsFromChanges_AllocClientUpdateRequestType(t *testing.T) {
t.SkipNow()
}
func TestGenericEventsFromChanges_AllocUpdateRequestType(t *testing.T) {
func TestEventsFromChanges_AllocUpdateRequestType(t *testing.T) {
t.SkipNow()
}
func TestGenericEventsFromChanges_JobDeregisterRequestType(t *testing.T) {
func TestEventsFromChanges_JobDeregisterRequestType(t *testing.T) {
t.SkipNow()
}
func TestGenericEventsFromChanges_WithDeletion(t *testing.T) {
func TestEventsFromChanges_WithDeletion(t *testing.T) {
t.Parallel()
changes := Changes{
@@ -587,9 +582,345 @@ func TestGenericEventsFromChanges_WithDeletion(t *testing.T) {
MsgType: structs.JobDeregisterRequestType,
}
event, err := GenericEventsFromChanges(nil, changes)
require.NoError(t, err)
event := eventsFromChanges(nil, changes)
require.NotNil(t, event)
require.Len(t, event.Events, 1)
}
func TestEventsFromChanges_WithNodeDeregistration(t *testing.T) {
t.Parallel()
before := &structs.Node{
ID: "some-id",
Datacenter: "some-datacenter",
}
changes := Changes{
Index: uint64(1),
Changes: memdb.Changes{
{
Before: before,
After: nil, // deleted
},
},
MsgType: structs.NodeDeregisterRequestType,
}
actual := eventsFromChanges(nil, changes)
require.NotNil(t, actual)
require.Len(t, actual.Events, 1)
event := actual.Events[0]
require.Equal(t, TypeNodeDeregistration, event.Type)
require.Equal(t, uint64(1), event.Index)
require.Equal(t, structs.TopicNode, event.Topic)
require.Equal(t, "some-id", event.Key)
require.Len(t, event.FilterKeys, 0)
nodeEvent, ok := event.Payload.(*NodeEvent)
require.True(t, ok)
require.Equal(t, *before, *nodeEvent.Node)
}
func TestNodeEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []structs.Event
WantTopic structs.Topic
}{
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: structs.TopicNode,
Name: "node registered",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
}},
},
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: structs.TopicNode,
Name: "node registered initializing",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady))
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeNotReady),
},
}},
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: structs.TopicNode,
Name: "node deregistered",
Setup: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()})
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
}},
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: structs.TopicNode,
Name: "batch node deregistered",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()})
},
WantEvents: []structs.Event{
{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
},
{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeIDTwo),
},
},
},
},
{
MsgType: structs.UpsertNodeEventsType,
WantTopic: structs.TopicNode,
Name: "batch node events upserted",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
eventFn := func(id string) []*structs.NodeEvent {
return []*structs.NodeEvent{
{
Message: "test event one",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
{
Message: "test event two",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
}
}
require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx))
return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx)
},
WantEvents: []structs.Event{
{
Topic: structs.TopicNode,
Type: TypeNodeEvent,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
},
{
Topic: structs.TopicNode,
Type: TypeNodeEvent,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeIDTwo),
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
if tc.Setup != nil {
// Bypass publish mechanism for setup
setupTx := s.db.WriteTxn(10)
require.NoError(t, tc.Setup(s, setupTx))
setupTx.Txn.Commit()
}
tx := s.db.WriteTxnMsgT(tc.MsgType, 100)
require.NoError(t, tc.Mutate(s, tx))
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
got := eventsFromChanges(tx, changes)
require.NotNil(t, got)
require.Equal(t, len(tc.WantEvents), len(got.Events))
for idx, g := range got.Events {
// assert equality of shared fields
want := tc.WantEvents[idx]
require.Equal(t, want.Index, g.Index)
require.Equal(t, want.Key, g.Key)
require.Equal(t, want.Topic, g.Topic)
switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.UpsertNodeEventsType:
requireNodeEventEqual(t, tc.WantEvents[idx], g)
default:
require.Fail(t, "unhandled message type")
}
}
})
}
}
func TestNodeDrainEventFromChanges(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
// setup
setupTx := s.db.WriteTxn(10)
node := mock.Node()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc1.NodeID = node.ID
alloc2.NodeID = node.ID
require.NoError(t, upsertNodeTxn(setupTx, 10, node))
require.NoError(t, s.upsertAllocsImpl(100, []*structs.Allocation{alloc1, alloc2}, setupTx))
setupTx.Txn.Commit()
// changes
tx := s.db.WriteTxn(100)
strat := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Minute,
IgnoreSystemJobs: false,
},
StartedAt: time.Now(),
}
markEligible := false
updatedAt := time.Now()
event := &structs.NodeEvent{}
require.NoError(t, s.updateNodeDrainImpl(tx, 100, node.ID, strat, markEligible, updatedAt.UnixNano(), event))
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: structs.NodeUpdateDrainRequestType}
got := eventsFromChanges(tx, changes)
require.Len(t, got.Events, 1)
require.Equal(t, structs.TopicNode, got.Events[0].Topic)
require.Equal(t, TypeNodeDrain, got.Events[0].Type)
require.Equal(t, uint64(100), got.Events[0].Index)
nodeEvent, ok := got.Events[0].Payload.(*NodeEvent)
require.True(t, ok)
require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility)
require.Equal(t, strat, nodeEvent.Node.DrainStrategy)
}
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()
wantPayload := want.Payload.(*NodeEvent)
gotPayload := got.Payload.(*NodeEvent)
// Check payload equality for the fields that we can easily control
require.Equal(t, wantPayload.Node.Status, gotPayload.Node.Status)
require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID)
require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events)
}
func requireNodeDeregistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()
wantPayload := want.Payload.(*NodeEvent)
gotPayload := got.Payload.(*NodeEvent)
require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID)
require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events)
}
func requireNodeEventEqual(t *testing.T, want, got structs.Event) {
gotPayload := got.Payload.(*NodeEvent)
require.Len(t, gotPayload.Node.Events, 3)
}
type nodeOpts func(n *structs.Node)
func nodeNotReady(n *structs.Node) {
n.Status = structs.NodeStatusInit
}
func nodeIDTwo(n *structs.Node) {
n.ID = testNodeIDTwo()
}
func testNode(opts ...nodeOpts) *structs.Node {
n := mock.Node()
n.ID = testNodeID()
n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee"
for _, opt := range opts {
opt(n)
}
return n
}
func testNodeID() string {
return "9d5741c1-3899-498a-98dd-eb3c05665863"
}
func testNodeIDTwo() string {
return "694ff31d-8c59-4030-ac83-e15692560c8d"
}

View File

@@ -1,34 +0,0 @@
package state
import (
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
)
// NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set
// of transaction changes.
func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
var events []structs.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}
event := structs.Event{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Index: changes.Index,
Key: before.ID,
Payload: &NodeEvent{
Node: before,
},
}
events = append(events, event)
}
}
return &structs.Events{Index: changes.Index, Events: events}, nil
}

View File

@@ -1,321 +0,0 @@
package state
import (
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestNodeEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []structs.Event
WantErr bool
WantTopic structs.Topic
}{
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: structs.TopicNode,
Name: "node registered",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
}},
WantErr: false,
},
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: structs.TopicNode,
Name: "node registered initializing",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady))
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeNotReady),
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: structs.TopicNode,
Name: "node deregistered",
Setup: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()})
},
WantEvents: []structs.Event{{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: structs.TopicNode,
Name: "batch node deregistered",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()})
},
WantEvents: []structs.Event{
{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
},
{
Topic: structs.TopicNode,
Type: TypeNodeDeregistration,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeIDTwo),
},
},
},
WantErr: false,
},
{
MsgType: structs.UpsertNodeEventsType,
WantTopic: structs.TopicNode,
Name: "batch node events upserted",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
eventFn := func(id string) []*structs.NodeEvent {
return []*structs.NodeEvent{
{
Message: "test event one",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
{
Message: "test event two",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
}
}
require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx))
return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx)
},
WantEvents: []structs.Event{
{
Topic: structs.TopicNode,
Type: TypeNodeEvent,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
},
{
Topic: structs.TopicNode,
Type: TypeNodeEvent,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeIDTwo),
},
},
},
WantErr: false,
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
if tc.Setup != nil {
// Bypass publish mechanism for setup
setupTx := s.db.WriteTxn(10)
require.NoError(t, tc.Setup(s, setupTx))
setupTx.Txn.Commit()
}
tx := s.db.WriteTxnMsgT(tc.MsgType, 100)
require.NoError(t, tc.Mutate(s, tx))
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
got, err := processDBChanges(tx, changes)
if tc.WantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, len(tc.WantEvents), len(got.Events))
for idx, g := range got.Events {
// assert equality of shared fields
want := tc.WantEvents[idx]
require.Equal(t, want.Index, g.Index)
require.Equal(t, want.Key, g.Key)
require.Equal(t, want.Topic, g.Topic)
switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.UpsertNodeEventsType:
requireNodeEventEqual(t, tc.WantEvents[idx], g)
default:
require.Fail(t, "unhandled message type")
}
}
})
}
}
func TestNodeDrainEventFromChanges(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
// setup
setupTx := s.db.WriteTxn(10)
node := mock.Node()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc1.NodeID = node.ID
alloc2.NodeID = node.ID
require.NoError(t, upsertNodeTxn(setupTx, 10, node))
require.NoError(t, s.upsertAllocsImpl(100, []*structs.Allocation{alloc1, alloc2}, setupTx))
setupTx.Txn.Commit()
// changes
tx := s.db.WriteTxn(100)
strat := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Minute,
IgnoreSystemJobs: false,
},
StartedAt: time.Now(),
}
markEligible := false
updatedAt := time.Now()
event := &structs.NodeEvent{}
require.NoError(t, s.updateNodeDrainImpl(tx, 100, node.ID, strat, markEligible, updatedAt.UnixNano(), event))
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: structs.NodeUpdateDrainRequestType}
got, err := processDBChanges(tx, changes)
require.NoError(t, err)
require.Len(t, got.Events, 1)
require.Equal(t, structs.TopicNode, got.Events[0].Topic)
require.Equal(t, TypeNodeDrain, got.Events[0].Type)
require.Equal(t, uint64(100), got.Events[0].Index)
nodeEvent, ok := got.Events[0].Payload.(*NodeEvent)
require.True(t, ok)
require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility)
require.Equal(t, strat, nodeEvent.Node.DrainStrategy)
}
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()
wantPayload := want.Payload.(*NodeEvent)
gotPayload := got.Payload.(*NodeEvent)
// Check payload equality for the fields that we can easily control
require.Equal(t, wantPayload.Node.Status, gotPayload.Node.Status)
require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID)
require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events)
}
func requireNodeDeregistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()
wantPayload := want.Payload.(*NodeEvent)
gotPayload := got.Payload.(*NodeEvent)
require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID)
require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events)
}
func requireNodeEventEqual(t *testing.T, want, got structs.Event) {
gotPayload := got.Payload.(*NodeEvent)
require.Len(t, gotPayload.Node.Events, 3)
}
type nodeOpts func(n *structs.Node)
func nodeNotReady(n *structs.Node) {
n.Status = structs.NodeStatusInit
}
func nodeIDTwo(n *structs.Node) {
n.ID = testNodeIDTwo()
}
func testNode(opts ...nodeOpts) *structs.Node {
n := mock.Node()
n.ID = testNodeID()
n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee"
for _, opt := range opts {
opt(n)
}
return n
}
func testNodeID() string {
return "9d5741c1-3899-498a-98dd-eb3c05665863"
}
func testNodeIDTwo() string {
return "694ff31d-8c59-4030-ac83-e15692560c8d"
}

View File

@@ -1,8 +1,6 @@
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
@@ -31,7 +29,7 @@ type Changes struct {
type changeTrackerDB struct {
memdb *memdb.MemDB
publisher *stream.EventBroker
processChanges func(ReadTxn, Changes) (*structs.Events, error)
processChanges changeProcessor
}
func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesFn changeProcessor) *changeTrackerDB {
@@ -42,9 +40,9 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesF
}
}
type changeProcessor func(ReadTxn, Changes) (*structs.Events, error)
type changeProcessor func(ReadTxn, Changes) *structs.Events
func noOpProcessChanges(ReadTxn, Changes) (*structs.Events, error) { return nil, nil }
func noOpProcessChanges(ReadTxn, Changes) *structs.Events { return nil }
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
@@ -91,14 +89,11 @@ func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) {
readOnlyTx := c.memdb.Txn(false)
defer readOnlyTx.Abort()
events, err := c.processChanges(readOnlyTx, changes)
if err != nil {
return nil, fmt.Errorf("failed generating events from changes: %v", err)
}
events := c.processChanges(readOnlyTx, changes)
if events != nil {
c.publisher.Publish(events)
}
return events, nil
}
@@ -166,15 +161,3 @@ func (tx *txn) Commit() error {
func (tx *txn) MsgType() structs.MessageType {
return tx.msgType
}
func processDBChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
switch changes.MsgType {
case structs.IgnoreUnknownTypeFlag:
// unknown event type
return nil, nil
case structs.NodeDeregisterRequestType:
return NodeDeregisterEventFromChanges(tx, changes)
default:
return GenericEventsFromChanges(tx, changes)
}
}

View File

@@ -100,7 +100,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
EventBufferSize: config.EventBufferSize,
Logger: config.Logger,
})
s.db = NewChangeTrackerDB(db, broker, processDBChanges)
s.db = NewChangeTrackerDB(db, broker, eventsFromChanges)
} else {
s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges)
}