Drain complete

This commit is contained in:
Alex Dadgar
2018-05-10 17:22:06 -07:00
parent 200e4b3cab
commit bfdde41f95
5 changed files with 82 additions and 7 deletions

View File

@@ -30,13 +30,21 @@ const (
// NodeDeadlineCoalesceWindow is the duration in which deadlining nodes will
// be coalesced together
NodeDeadlineCoalesceWindow = 5 * time.Second
// NodeDrainEventComplete is used to indicate that the node drain is
// finished.
NodeDrainEventComplete = "Node drain complete"
// NodeDrainEventDetailDeadlined is the key to use when the drain is
// complete because of a deadline. The acceptable values are "true" and "false".
NodeDrainEventDetailDeadlined = "deadline_reached"
)
// RaftApplier contains methods for applying the raft requests required by the
// NodeDrainer.
type RaftApplier interface {
	// AllocUpdateDesiredTransition batches desired-transition updates for the
	// given allocations and creates the accompanying evaluations in a single
	// Raft transaction, returning the resulting Raft index.
	AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error)

	// NodesDrainComplete marks the given nodes as done draining. The optional
	// event, when non-nil, is emitted on each node to record why the drain
	// finished (e.g. deadline reached). It returns the resulting Raft index.
	NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error)
}
// NodeTracker is the interface to notify an object that is tracking draining
@@ -254,10 +262,16 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
n.l.RUnlock()
n.batchDrainAllocs(forceStop)
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete).
AddDetail(NodeDrainEventDetailDeadlined, "true")
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}
@@ -324,10 +338,15 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
}
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}

View File

@@ -102,7 +102,12 @@ func (n *NodeDrainer) Update(node *structs.Node) {
}
}
index, err := n.raft.NodesDrainComplete([]string{node.ID})
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)
index, err := n.raft.NodesDrainComplete([]string{node.ID}, event)
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
} else {

View File

@@ -97,7 +97,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
require.Equal(n, tracked[n.ID])
// Change the node to be not draining and wait for it to be untracked
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil))
testutil.WaitForResult(func() (bool, error) {
return len(m.Events) == 2, nil
}, func(err error) {
@@ -175,7 +175,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
// Change the node to have a new spec
s2 := n.DrainStrategy.Copy()
s2.Deadline += time.Hour
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil))
// Wait for it to be updated
testutil.WaitForResult(func() (bool, error) {

View File

@@ -12,6 +12,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/drainer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -212,6 +213,12 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}
func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
@@ -300,6 +307,13 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}
func TestDrainer_DrainEmptyNode(t *testing.T) {
@@ -343,6 +357,12 @@ func TestDrainer_DrainEmptyNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}
func TestDrainer_AllTypes_Deadline(t *testing.T) {
@@ -500,6 +520,13 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) {
}
}
require.True(serviceMax < batchMax)
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}
// Test that drain is unset when batch jobs naturally finish
@@ -659,6 +686,12 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}
func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
@@ -824,6 +857,13 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}
// Test that transitions to force drain work.
@@ -962,6 +1002,13 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 4)
require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message)
require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined)
})
}
}

View File

@@ -8,15 +8,19 @@ type drainerShim struct {
s *Server
}
func (d drainerShim) NodesDrainComplete(nodes []string) (uint64, error) {
func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) {
args := &structs.BatchNodeUpdateDrainRequest{
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
update := &structs.DrainUpdate{}
for _, node := range nodes {
args.Updates[node] = update
if event != nil {
args.NodeEvents[node] = event
}
}
resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)