drain: block cli until all allocs stop

Previously, the drain CLI blocked only until the node was marked as having completed its drain operation. While technically correct, this could lead operators (or, more likely, scripts) to shut down drained nodes before all of their allocations had *actually* terminated.

This change makes the CLI block until all allocations on the node have terminated (unless system jobs are being ignored).
Author: Michael Schurter
Date:   2018-03-28 14:01:54 -07:00
Parent: 00b358553d
Commit: 4cefb6f06a
4 changed files with 474 additions and 158 deletions
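For context, a minimal sketch of how a caller might consume the new Nodes.MonitorDrain API added in this commit; the client setup, node ID, and index value are illustrative placeholders, not part of the diff:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to a local Nomad agent using the default configuration.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder values: a real caller would pass the ID of the draining
	// node and the index returned by the UpdateDrain call.
	nodeID := "example-node-id" // hypothetical node ID
	var index uint64            // 0 means the first query returns immediately

	// MonitorDrain emits human-readable drain events and closes the channel
	// once every allocation on the node has stopped (or the context is
	// canceled). The final argument controls whether system jobs are ignored.
	outCh := client.Nodes().MonitorDrain(context.Background(), nodeID, index, false)
	for msg := range outCh {
		fmt.Println(msg)
	}
}

The CLI change in command/node_drain.go below follows the same pattern, driving c.Ui.Output instead of fmt.Println.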

api/nodes.go

@@ -1,6 +1,7 @@
package api
import (
"context"
"fmt"
"sort"
"time"
@@ -84,6 +85,234 @@ func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q
return &resp, nil
}
// MonitorDrain emits drain related events on the returned string channel. The
// channel will be closed when all allocations on the draining node have
// stopped or the context is canceled.
func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan string {
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
// Multiplex node and alloc chans onto outCh. This goroutine closes
// outCh when other chans have been closed or context canceled.
multiplexCtx, cancel := context.WithCancel(ctx)
go n.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
// Monitor node for updates
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh, errCh)
// Monitor allocs on node for updates
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh, errCh)
return outCh
}
// monitorDrainMultiplex multiplexes node and alloc updates onto the out chan.
// Closes out chan when either the context is canceled, both update chans are
// closed, or an error occurs.
func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
outCh chan<- string, errCh, nodeCh, allocCh <-chan string) {
defer cancel()
defer close(outCh)
nodeOk := true
allocOk := true
var msg string
for {
// If both chans have been closed, close the output chan
if !nodeOk && !allocOk {
return
}
select {
case msg, nodeOk = <-nodeCh:
if !nodeOk {
// nil chan to prevent further recvs
nodeCh = nil
}
case msg, allocOk = <-allocCh:
if !allocOk {
// nil chan to prevent further recvs
allocCh = nil
}
case errMsg := <-errCh:
// Error occurred, exit after sending
select {
case outCh <- errMsg:
case <-ctx.Done():
}
return
case <-ctx.Done():
return
}
if msg == "" {
continue
}
select {
case outCh <- msg:
case <-ctx.Done():
return
}
}
}
// monitorDrainNode emits node updates on nodeCh and closes the channel when
// the node has finished draining.
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh, errCh chan<- string) {
defer close(nodeCh)
var lastStrategy *DrainStrategy
for {
q := QueryOptions{
AllowStale: true,
WaitIndex: index,
}
node, meta, err := n.Info(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring node: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
if node.DrainStrategy == nil {
msg := fmt.Sprintf("Node %q drain complete", nodeID)
select {
case nodeCh <- msg:
case <-ctx.Done():
}
return
}
// DrainStrategy changed
if lastStrategy != nil && !node.DrainStrategy.Equal(lastStrategy) {
msg := fmt.Sprintf("Node %q drain updated: %s", nodeID, node.DrainStrategy)
select {
case nodeCh <- msg:
case <-ctx.Done():
return
}
}
lastStrategy = node.DrainStrategy
// Drain still ongoing, update index and block for updates
index = meta.LastIndex
}
}
// monitorDrainAllocs emits alloc updates on allocCh and closes the channel
// when the node has finished draining.
func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh, errCh chan<- string) {
defer close(allocCh)
// Build initial alloc state
q := QueryOptions{AllowStale: true}
allocs, meta, err := n.Allocations(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring allocations: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
initial := make(map[string]*Allocation, len(allocs))
for _, a := range allocs {
initial[a.ID] = a
}
for {
q.WaitIndex = meta.LastIndex
allocs, meta, err = n.Allocations(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring allocations: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
runningAllocs := 0
for _, a := range allocs {
// Get previous version of alloc
orig, existing := initial[a.ID]
// Update local alloc state
initial[a.ID] = a
migrating := a.DesiredTransition.ShouldMigrate()
var msg string
switch {
case !existing:
// Should only be possible if response
// from initial Allocations call was
// stale. No need to output
case orig.ClientStatus != a.ClientStatus:
// Alloc status has changed; output
msg = fmt.Sprintf("status %s -> %s", orig.ClientStatus, a.ClientStatus)
case migrating && !orig.DesiredTransition.ShouldMigrate():
// Alloc was marked for migration
msg = "marked for migration"
case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop:
// Alloc has already been marked for migration and is now being stopped
msg = "draining"
case a.NextAllocation != "" && orig.NextAllocation == "":
// Alloc has been replaced by another allocation
msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation)
}
if msg != "" {
select {
case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg):
case <-ctx.Done():
return
}
}
// Ignore malformed allocs
if a.Job == nil || a.Job.Type == nil {
continue
}
// Track how many allocs are still running
if ignoreSys && a.Job.Type != nil && *a.Job.Type == structs.JobTypeSystem {
continue
}
switch a.ClientStatus {
case structs.AllocClientStatusPending, structs.AllocClientStatusRunning:
runningAllocs++
}
}
// Exit if all allocs are terminal
if runningAllocs == 0 {
msg := fmt.Sprintf("All allocations on node %q have stopped.", nodeID)
select {
case allocCh <- msg:
case <-ctx.Done():
}
return
}
}
}
// NodeUpdateEligibilityRequest is used to update the drain specification for a node.
type NodeUpdateEligibilityRequest struct {
// NodeID is the node to update the drain specification for.
@@ -220,6 +449,32 @@ type DrainSpec struct {
IgnoreSystemJobs bool
}
func (d *DrainStrategy) Equal(o *DrainStrategy) bool {
if d == nil || o == nil {
return d == o
}
if d.ForceDeadline != o.ForceDeadline {
return false
}
if d.Deadline != o.Deadline {
return false
}
if d.IgnoreSystemJobs != o.IgnoreSystemJobs {
return false
}
return true
}
// String returns a human readable version of the drain strategy.
func (d *DrainStrategy) String() string {
if d.IgnoreSystemJobs {
return fmt.Sprintf("drain ignoring system jobs and deadline at %s", d.ForceDeadline)
}
return fmt.Sprintf("drain with deadline at %s", d.ForceDeadline)
}
const (
NodeEventSubsystemDrain = "Drain"
NodeEventSubsystemDriver = "Driver"

api/nodes_test.go

@@ -1,6 +1,7 @@
package api
import (
"context"
"fmt"
"reflect"
"sort"
@@ -375,3 +376,160 @@ func TestNodes_GcAlloc(t *testing.T) {
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
// Unittest monitorDrainMultiplex when an error occurs
func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx := context.Background()
multiplexCtx, cancel := context.WithCancel(ctx)
// monitorDrainMultiplex doesn't require anything on *Nodes, so we
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
}()
// Fake an alloc update
msg := "alloc update"
allocCh <- msg
require.Equal(msg, <-outCh)
// Fake a node update
msg = "node update"
nodeCh <- msg
require.Equal(msg, <-outCh)
// Fake an error that should shut everything down
msg = "fake error"
errCh <- msg
require.Equal(msg, <-outCh)
_, ok := <-exitedCh
require.False(ok)
_, ok = <-outCh
require.False(ok)
// Exiting should also cancel the context that would be passed to the
// node & alloc watchers
select {
case <-multiplexCtx.Done():
case <-time.After(100 * time.Millisecond):
t.Fatalf("context wasn't canceled")
}
}
// Unittest monitorDrainMultiplex when drain finishes
func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx := context.Background()
multiplexCtx, cancel := context.WithCancel(ctx)
// monitorDrainMultiplex doesn't require anything on *Nodes, so we
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
}()
// Fake a node updating and finishing
msg := "node update"
nodeCh <- msg
close(nodeCh)
require.Equal(msg, <-outCh)
// Nothing else should have exited yet
select {
case msg, ok := <-outCh:
if ok {
t.Fatalf("unexpected output: %q", msg)
}
t.Fatalf("out channel closed unexpectedly")
case <-exitedCh:
t.Fatalf("multiplexer exited unexpectedly")
case <-multiplexCtx.Done():
t.Fatalf("multiplexer context canceled unexpectedly")
case <-time.After(10 * time.Millisecond):
t.Logf("multiplexer still running as expected")
}
// Fake an alloc update coming in after the node monitor has finished
msg = "alloc update"
allocCh <- msg
require.Equal(msg, <-outCh)
// Closing the allocCh should cause everything to exit
close(allocCh)
_, ok := <-exitedCh
require.False(ok)
_, ok = <-outCh
require.False(ok)
// Exiting should also cancel the context that would be passed to the
// node & alloc watchers
select {
case <-multiplexCtx.Done():
case <-time.After(100 * time.Millisecond):
t.Fatalf("context wasn't canceled")
}
}
func TestNodes_DrainStrategy_Equal(t *testing.T) {
t.Parallel()
require := require.New(t)
// nil
var d *DrainStrategy
require.True(d.Equal(nil))
o := &DrainStrategy{}
require.False(d.Equal(o))
require.False(o.Equal(d))
d = &DrainStrategy{}
require.True(d.Equal(o))
// ForceDeadline
d.ForceDeadline = time.Now()
require.False(d.Equal(o))
o.ForceDeadline = d.ForceDeadline
require.True(d.Equal(o))
// Deadline
d.Deadline = 1
require.False(d.Equal(o))
o.Deadline = 1
require.True(d.Equal(o))
// IgnoreSystemJobs
d.IgnoreSystemJobs = true
require.False(d.Equal(o))
o.IgnoreSystemJobs = true
require.True(d.Equal(o))
}

command/node_drain.go

@@ -1,13 +1,13 @@
package command
import (
"context"
"fmt"
"strings"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/posener/complete"
)
@@ -271,160 +271,18 @@ func (c *NodeDrainCommand) Run(args []string) int {
return 1
}
c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID))
if enable {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID))
} else {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy unset", node.ID))
}
if enable && !detach {
if err := monitorDrain(c.Ui.Output, client.Nodes(), node.ID, meta.LastIndex); err != nil {
c.Ui.Error(fmt.Sprintf("Error monitoring drain: %v", err))
return 1
outCh := client.Nodes().MonitorDrain(context.Background(), node.ID, meta.LastIndex, ignoreSystem)
for msg := range outCh {
c.Ui.Output(msg)
}
c.Ui.Output(fmt.Sprintf("Node %q drain complete", nodeID))
}
return 0
}
// monitorDrain monitors the node being drained and exits when the node has
// finished draining.
func monitorDrain(output func(string), nodeClient *api.Nodes, nodeID string, index uint64) error {
doneCh := make(chan struct{})
defer close(doneCh)
// Errors from either goroutine are sent here
errCh := make(chan error, 1)
// Monitor node changes and close chan when drain is complete
nodeCh := make(chan struct{})
go func() {
for {
q := api.QueryOptions{
AllowStale: true,
WaitIndex: index,
}
node, meta, err := nodeClient.Info(nodeID, &q)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}
if node.DrainStrategy == nil {
close(nodeCh)
return
}
// Drain still ongoing
index = meta.LastIndex
}
}()
// Monitor alloc changes
allocCh := make(chan string, 1)
go func() {
allocs, meta, err := nodeClient.Allocations(nodeID, nil)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}
initial := make(map[string]*api.Allocation, len(allocs))
for _, a := range allocs {
initial[a.ID] = a
}
for {
q := api.QueryOptions{
AllowStale: true,
WaitIndex: meta.LastIndex,
}
allocs, meta, err = nodeClient.Allocations(nodeID, &q)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}
for _, a := range allocs {
// Get previous version of alloc
orig, ok := initial[a.ID]
// Update local alloc state
initial[a.ID] = a
migrating := a.DesiredTransition.ShouldMigrate()
msg := ""
switch {
case !ok:
// Should only be possible if response
// from initial Allocations call was
// stale. No need to output
case orig.ClientStatus != a.ClientStatus:
// Alloc status has changed; output
msg = fmt.Sprintf("status %s -> %s", orig.ClientStatus, a.ClientStatus)
case migrating && !orig.DesiredTransition.ShouldMigrate():
// Alloc was marked for migration
msg = "marked for migration"
case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop:
// Alloc has already been marked for migration and is now being stopped
msg = "draining"
case a.NextAllocation != "" && orig.NextAllocation == "":
// Alloc has been replaced by another allocation
msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation)
}
if msg != "" {
select {
case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg):
case <-doneCh:
return
}
}
}
}
}()
done := false
for !done {
select {
case err := <-errCh:
return err
case <-nodeCh:
done = true
case msg := <-allocCh:
output(msg)
}
}
// Loop on alloc messages for a bit longer as we may have gotten the
// "node done" first (since the watchers run concurrently the events
// may be received out of order)
deadline := 500 * time.Millisecond
timer := time.NewTimer(deadline)
for {
select {
case err := <-errCh:
return err
case msg := <-allocCh:
output(msg)
if !timer.Stop() {
<-timer.C
}
timer.Reset(deadline)
case <-timer.C:
// No events within deadline, exit
return nil
}
}
}

command/node_drain_test.go

@@ -119,16 +119,17 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
t.Fatalf("err: %s", err)
})
// Register a job to create an alloc to drain
count := 3
// Register a service job to create allocs to drain
serviceCount := 3
job := &api.Job{
ID: helper.StringToPtr("mock_service"),
Name: helper.StringToPtr("mock_service"),
Datacenters: []string{"dc1"},
Type: helper.StringToPtr("service"),
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("mock_group"),
Count: &count,
Count: &serviceCount,
Migrate: &api.MigrateStrategy{
MaxParallel: helper.IntToPtr(1),
HealthCheck: helper.StringToPtr("task_states"),
@@ -142,6 +143,10 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
Config: map[string]interface{}{
"run_for": "10m",
},
Resources: &api.Resources{
CPU: helper.IntToPtr(50),
MemoryMB: helper.IntToPtr(50),
},
},
},
},
@@ -151,14 +156,44 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
_, _, err := client.Jobs().Register(job, nil)
require.Nil(err)
// Register a system job to ensure it is ignored during draining
sysjob := &api.Job{
ID: helper.StringToPtr("mock_system"),
Name: helper.StringToPtr("mock_system"),
Datacenters: []string{"dc1"},
Type: helper.StringToPtr("system"),
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("mock_sysgroup"),
Count: helper.IntToPtr(1),
Tasks: []*api.Task{
{
Name: "mock_systask",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "10m",
},
Resources: &api.Resources{
CPU: helper.IntToPtr(50),
MemoryMB: helper.IntToPtr(50),
},
},
},
},
},
}
_, _, err = client.Jobs().Register(sysjob, nil)
require.Nil(err)
var allocs []*api.Allocation
testutil.WaitForResult(func() (bool, error) {
allocs, _, err = client.Nodes().Allocations(nodeID, nil)
if err != nil {
return false, err
}
if len(allocs) != count {
return false, fmt.Errorf("number of allocs %d != count (%d)", len(allocs), count)
if len(allocs) != serviceCount+1 {
return false, fmt.Errorf("number of allocs %d != count (%d)", len(allocs), serviceCount+1)
}
for _, a := range allocs {
if a.ClientStatus != "running" {
@@ -172,10 +207,10 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
ui := new(cli.MockUi)
cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}}
args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s"}
args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"}
t.Logf("Running: %v", args)
if code := cmd.Run(args); code != 0 {
t.Fatalf("expected exit 0, got: %d", code)
t.Fatalf("expected exit 0, got: %d\n%s", code, ui.OutputWriter.String())
}
out := ui.OutputWriter.String()
@@ -183,9 +218,19 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
require.Contains(out, "drain complete")
for _, a := range allocs {
if *a.Job.Type == "system" {
if strings.Contains(out, a.ID) {
t.Fatalf("output should not contain system alloc %q", a.ID)
}
continue
}
require.Contains(out, fmt.Sprintf("Alloc %q marked for migration", a.ID))
require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID))
}
expected := fmt.Sprintf("All allocations on node %q have stopped.\n", nodeID)
if !strings.HasSuffix(out, expected) {
t.Fatalf("expected output to end with:\n%s", expected)
}
}
func TestNodeDrainCommand_Fails(t *testing.T) {