diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index 841445cb3..586b5e5ab 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -30,13 +30,21 @@ const ( // NodeDeadlineCoalesceWindow is the duration in which deadlining nodes will // be coalesced together NodeDeadlineCoalesceWindow = 5 * time.Second + + // NodeDrainEventComplete is used to indicate that the node drain is + // finished. + NodeDrainEventComplete = "Node drain complete" + + // NodeDrainEventDetailDeadlined is the key to use when the drain is + // complete because a deadline was reached. The acceptable values are "true" and "false". + NodeDrainEventDetailDeadlined = "deadline_reached" ) // RaftApplier contains methods for applying the raft requests required by the // NodeDrainer. type RaftApplier interface { AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) - NodesDrainComplete(nodes []string) (uint64, error) + NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) } // NodeTracker is the interface to notify an object that is tracking draining @@ -254,10 +262,16 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) { n.l.RUnlock() n.batchDrainAllocs(forceStop) + // Create the node event + event := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemDrain). + SetMessage(NodeDrainEventComplete). + AddDetail(NodeDrainEventDetailDeadlined, "true") + // Submit the node transitions in a sharded form to ensure a reasonable // Raft transaction size. 
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) { - if _, err := n.raft.NodesDrainComplete(nodes); err != nil { + if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil { n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err) } } @@ -324,10 +338,15 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) { } } + // Create the node event + event := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemDrain). + SetMessage(NodeDrainEventComplete) + // Submit the node transitions in a sharded form to ensure a reasonable // Raft transaction size. for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) { - if _, err := n.raft.NodesDrainComplete(nodes); err != nil { + if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil { n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err) } } diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go index 8862abd23..ac23b1668 100644 --- a/nomad/drainer/watch_nodes.go +++ b/nomad/drainer/watch_nodes.go @@ -102,7 +102,12 @@ func (n *NodeDrainer) Update(node *structs.Node) { } } - index, err := n.raft.NodesDrainComplete([]string{node.ID}) + // Create the node event + event := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemDrain). 
+ SetMessage(NodeDrainEventComplete) + + index, err := n.raft.NodesDrainComplete([]string{node.ID}, event) if err != nil { n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err) } else { diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go index 476c7a39b..74a62e852 100644 --- a/nomad/drainer/watch_nodes_test.go +++ b/nomad/drainer/watch_nodes_test.go @@ -97,7 +97,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) { require.Equal(n, tracked[n.ID]) // Change the node to be not draining and wait for it to be untracked - require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false)) + require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil)) testutil.WaitForResult(func() (bool, error) { return len(m.Events) == 2, nil }, func(err error) { @@ -175,7 +175,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) { // Change the node to have a new spec s2 := n.DrainStrategy.Copy() s2.Deadline += time.Hour - require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false)) + require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil)) // Wait for it to be updated testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 81d8625ca..379aee340 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -12,6 +12,7 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/drainer" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -212,6 +213,12 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) } func 
TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { @@ -300,6 +307,13 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } func TestDrainer_DrainEmptyNode(t *testing.T) { @@ -343,6 +357,12 @@ func TestDrainer_DrainEmptyNode(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) } func TestDrainer_AllTypes_Deadline(t *testing.T) { @@ -500,6 +520,13 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { } } require.True(serviceMax < batchMax) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } // Test that drain is unset when batch jobs naturally finish @@ -659,6 +686,12 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) } func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { @@ -824,6 +857,13 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + 
require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } // Test that transitions to force drain work. @@ -962,6 +1002,13 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 4) + require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message) + require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined) }) } } diff --git a/nomad/drainer_shims.go b/nomad/drainer_shims.go index 0eb8c43a2..c9795d5ac 100644 --- a/nomad/drainer_shims.go +++ b/nomad/drainer_shims.go @@ -8,15 +8,19 @@ type drainerShim struct { s *Server } -func (d drainerShim) NodesDrainComplete(nodes []string) (uint64, error) { +func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) { args := &structs.BatchNodeUpdateDrainRequest{ Updates: make(map[string]*structs.DrainUpdate, len(nodes)), + NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)), WriteRequest: structs.WriteRequest{Region: d.s.config.Region}, } update := &structs.DrainUpdate{} for _, node := range nodes { args.Updates[node] = update + if event != nil { + args.NodeEvents[node] = event + } } resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)