Events/event source node (#8918)

* Node Register/Deregister event sourcing

example upsert node with context

fill in writetxnwithctx

ctx passing to handle event type creation, wip test

node deregistration event

drop Node from registration event

* node batch deregistration
This commit is contained in:
Drew Bailey
2020-09-23 10:52:04 -04:00
parent 26263091fd
commit 8bd15b534f
6 changed files with 423 additions and 12 deletions

View File

@@ -1,6 +1,7 @@
package nomad
import (
"context"
"fmt"
"io"
"reflect"
@@ -195,9 +196,9 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
switch msgType {
case structs.NodeRegisterRequestType:
return n.applyUpsertNode(buf[1:], log.Index)
return n.applyUpsertNode(msgType, buf[1:], log.Index)
case structs.NodeDeregisterRequestType:
return n.applyDeregisterNode(buf[1:], log.Index)
return n.applyDeregisterNode(msgType, buf[1:], log.Index)
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
@@ -310,17 +311,19 @@ func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
return nil
}
func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)
// Handle upgrade paths
req.Node.Canonicalize()
if err := n.state.UpsertNode(index, req.Node); err != nil {
if err := n.state.UpsertNodeCtx(ctx, index, req.Node); err != nil {
n.logger.Error("UpsertNode failed", "error", err)
return err
}
@@ -334,14 +337,16 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
return nil
}
func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeregisterNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now())
var req structs.NodeDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)
if err := n.state.DeleteNodeCtx(ctx, index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}

View File

@@ -0,0 +1,75 @@
package state
import (
"fmt"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
TopicNodeRegistration = "NodeRegistration"
TopicNodeDeregistration = "NodeDeregistration"
)
type NodeRegistrationEvent struct {
Event *structs.NodeEvent
NodeStatus string
}
type NodeDeregistrationEvent struct {
NodeID string
}
// NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set
// of transaction changes.
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}
event := stream.Event{
Topic: TopicNodeRegistration,
Index: changes.Index,
Key: after.ID,
Payload: &NodeRegistrationEvent{
Event: after.Events[len(after.Events)-1],
NodeStatus: after.Status,
},
}
events = append(events, event)
}
}
return events, nil
}
// NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set
// of transaction changes.
func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}
event := stream.Event{
Topic: TopicNodeDeregistration,
Index: changes.Index,
Key: before.ID,
Payload: &NodeDeregistrationEvent{
NodeID: before.ID,
},
}
events = append(events, event)
}
}
return events, nil
}

View File

@@ -0,0 +1,211 @@
package state
import (
"testing"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestNodeRegisterEventFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []stream.Event
WantErr bool
WantTopic string
}{
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusReady,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered initializing",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady))
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusInit,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: TopicNodeDeregistration,
Name: "node deregistered",
Setup: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()})
},
WantEvents: []stream.Event{{
Topic: TopicNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeID(),
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: TopicNodeDeregistration,
Name: "batch node deregistered",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()})
},
WantEvents: []stream.Event{
{
Topic: TopicNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeID(),
},
},
{
Topic: TopicNodeDeregistration,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeIDTwo(),
},
},
},
WantErr: false,
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventPublisher()
if tc.Setup != nil {
// Bypass publish mechanism for setup
setupTx := s.db.WriteTxn(10)
require.NoError(t, tc.Setup(s, setupTx))
setupTx.Txn.Commit()
}
tx := s.db.WriteTxn(100)
require.NoError(t, tc.Mutate(s, tx))
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
got, err := processDBChanges(tx, changes)
if tc.WantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, len(tc.WantEvents), len(got))
for idx, g := range got {
switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
}
}
})
}
}
func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()
require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)
wantPayload := want.Payload.(*NodeRegistrationEvent)
gotPayload := got.Payload.(*NodeRegistrationEvent)
// Check payload equality for the fields that we can easily control
require.Equal(t, wantPayload.NodeStatus, gotPayload.NodeStatus)
require.Equal(t, wantPayload.Event.Message, gotPayload.Event.Message)
require.Equal(t, wantPayload.Event.Subsystem, gotPayload.Event.Subsystem)
}
func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()
require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)
wantPayload := want.Payload.(*NodeDeregistrationEvent)
gotPayload := got.Payload.(*NodeDeregistrationEvent)
require.Equal(t, wantPayload, gotPayload)
}
type nodeOpts func(n *structs.Node)
func nodeNotReady(n *structs.Node) {
n.Status = structs.NodeStatusInit
}
func nodeIDTwo(n *structs.Node) {
n.ID = testNodeIDTwo()
}
func testNode(opts ...nodeOpts) *structs.Node {
n := mock.Node()
n.ID = testNodeID()
n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee"
for _, opt := range opts {
opt(n)
}
return n
}
func testNodeID() string {
return "9d5741c1-3899-498a-98dd-eb3c05665863"
}
func testNodeIDTwo() string {
return "694ff31d-8c59-4030-ac83-e15692560c8d"
}

View File

@@ -1,10 +1,16 @@
package state
import (
"context"
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
CtxMsgType = "type"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
@@ -21,6 +27,7 @@ type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
MsgType structs.MessageType
}
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
@@ -81,6 +88,18 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
return t
}
// WriteTxnCtx is identical to WriteTxn but takes a ctx used for event sourcing
func (c *changeTrackerDB) WriteTxnCtx(ctx context.Context, idx uint64) *txn {
t := &txn{
ctx: ctx,
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
}
t.Txn.TrackChanges()
return t
}
func (c *changeTrackerDB) publish(changes Changes) error {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
@@ -113,6 +132,9 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn {
// error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed.
type txn struct {
// ctx is used to hold message type information from an FSM request
ctx context.Context
*memdb.Txn
// Index in raft where the write is occurring. The value is zero for a
// read-only, or WriteTxnRestore transaction.
@@ -136,6 +158,7 @@ func (tx *txn) Commit() error {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
MsgType: tx.MsgType(),
}
if err := tx.publish(changes); err != nil {
return err
@@ -146,7 +169,35 @@ func (tx *txn) Commit() error {
return nil
}
// MsgType returns a MessageType from the txn's context.
// If the context is empty or the value isn't set IgnoreUnknownTypeFlag will
// be returned to signal that the MsgType is unknown.
func (tx *txn) MsgType() structs.MessageType {
if tx.ctx == nil {
return structs.IgnoreUnknownTypeFlag
}
raw := tx.ctx.Value(CtxMsgType)
if raw == nil {
return structs.IgnoreUnknownTypeFlag
}
msgType, ok := raw.(structs.MessageType)
if !ok {
return structs.IgnoreUnknownTypeFlag
}
return msgType
}
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: add handlers here.
switch changes.MsgType {
case structs.IgnoreUnknownTypeFlag:
// unknown event type
return []stream.Event{}, nil
case structs.NodeRegisterRequestType:
return NodeRegisterEventFromChanges(tx, changes)
case structs.NodeDeregisterRequestType:
return NodeDeregisterEventFromChanges(tx, changes)
}
return []stream.Event{}, nil
}

View File

@@ -206,10 +206,16 @@ func (s *StateStore) AbandonCh() <-chan struct{} {
// Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic.
func (s *StateStore) Abandon() {
s.stopEventPublisher()
s.StopEventPublisher()
close(s.abandonCh)
}
// StopStopEventPublisher calls the cancel func for the state stores event
// publisher. It should be called during server shutdown.
func (s *StateStore) StopEventPublisher() {
s.stopEventPublisher()
}
// QueryFn is the definition of a function that can be used to implement a basic
// blocking query against the state store.
type QueryFn func(memdb.WatchSet, *StateStore) (resp interface{}, index uint64, err error)
@@ -740,6 +746,21 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri
return nil, 0, nil
}
// UpsertNodeCtx is used to register a node or update a node definition
// This is assumed to be triggered by the client, so we retain the value
// of drain/eligibility which is set by the scheduler.
func (s *StateStore) UpsertNodeCtx(ctx context.Context, index uint64, node *structs.Node) error {
txn := s.db.WriteTxnCtx(ctx, index)
defer txn.Abort()
err := upsertNodeTxn(txn, index, node)
if err != nil {
return nil
}
txn.Commit()
return nil
}
// UpsertNode is used to register a node or update a node definition
// This is assumed to be triggered by the client, so we retain the value
// of drain/eligibility which is set by the scheduler.
@@ -747,6 +768,15 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
err := upsertNodeTxn(txn, index, node)
if err != nil {
return nil
}
txn.Commit()
return nil
}
func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
// Check if the node already exists
existing, err := txn.First("nodes", "id", node.ID)
if err != nil {
@@ -795,19 +825,40 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
return fmt.Errorf("csi plugin update failed: %v", err)
}
return nil
}
// DeleteNode deregisters a batch of nodes
func (s *StateStore) DeleteNodeCtx(ctx context.Context, index uint64, nodes []string) error {
txn := s.db.WriteTxnCtx(ctx, index)
defer txn.Abort()
err := deleteNodeTxn(txn, index, nodes)
if err != nil {
return nil
}
txn.Commit()
return nil
}
// DeleteNode deregisters a batch of nodes
func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
err := deleteNodeTxn(txn, index, nodes)
if err != nil {
return nil
}
txn.Commit()
return nil
}
func deleteNodeTxn(txn *txn, index uint64, nodes []string) error {
if len(nodes) == 0 {
return fmt.Errorf("node ids missing")
}
txn := s.db.WriteTxn(index)
defer txn.Abort()
for _, nodeID := range nodes {
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@@ -832,7 +883,6 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}

View File

@@ -24,6 +24,25 @@ func TestStateStore(t testing.T) *StateStore {
return state
}
func TestStateStorePublisher(t testing.T) *StateStoreConfig {
return &StateStoreConfig{
Logger: testlog.HCLogger(t),
Region: "global",
EnablePublisher: true,
}
}
func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore {
state, err := NewStateStore(cfg)
if err != nil {
t.Fatalf("err: %v", err)
}
if state == nil {
t.Fatalf("missing state")
}
return state
}
// CreateTestCSIPlugin is a helper that generates the node + fingerprint results necessary
// to create a CSIPlugin by directly inserting into the state store. The plugin requires a
// controller.