diff --git a/api/nodes.go b/api/nodes.go index bf7709fea..8612b70e6 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -124,12 +124,12 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i allocCh := make(chan *MonitorMessage, 8) // Multiplex node and alloc chans onto outCh. This goroutine closes - // outCh when other chans have been closed or context canceled. + // outCh when other chans have been closed. multiplexCtx, cancel := context.WithCancel(ctx) go n.monitorDrainMultiplex(multiplexCtx, cancel, outCh, nodeCh, allocCh) // Monitor node for updates - go n.monitorDrainNode(multiplexCtx, cancel, nodeID, index, nodeCh) + go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh) // Monitor allocs on node for updates go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh) @@ -160,12 +160,14 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), if !nodeOk { // nil chan to prevent further recvs nodeCh = nil + continue } case msg, allocOk = <-allocCh: if !allocOk { // nil chan to prevent further recvs allocCh = nil + continue } case <-ctx.Done(): @@ -179,14 +181,6 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), select { case outCh <- msg: case <-ctx.Done(): - - // If we are exiting but we have a message, attempt to send it - // so we don't lose a message but do not block. - select { - case outCh <- msg: - default: - } - return } @@ -199,12 +193,12 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), // monitorDrainNode emits node updates on nodeCh and closes the channel when // the node has finished draining. -func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(), - nodeID string, index uint64, nodeCh chan<- *MonitorMessage) { +func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, + index uint64, nodeCh chan<- *MonitorMessage) { + defer close(nodeCh) var lastStrategy *DrainStrategy - var strategyChanged bool q := QueryOptions{ AllowStale: true, WaitIndex: index, @@ -222,12 +216,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(), if node.DrainStrategy == nil { var msg *MonitorMessage - if strategyChanged { - msg = Messagef(MonitorMsgLevelInfo, "Node %q has marked all allocations for migration", nodeID) - } else { - msg = Messagef(MonitorMsgLevelInfo, "No drain strategy set for node %s", nodeID) - defer cancel() - } + msg = Messagef(MonitorMsgLevelInfo, "Drain complete for node %s", nodeID) select { case nodeCh <- msg: case <-ctx.Done(): @@ -254,7 +243,6 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(), } lastStrategy = node.DrainStrategy - strategyChanged = true // Drain still ongoing, update index and block for updates q.WaitIndex = meta.LastIndex diff --git a/command/node_drain.go b/command/node_drain.go index 040b16ee1..d86fc9f1c 100644 --- a/command/node_drain.go +++ b/command/node_drain.go @@ -242,7 +242,7 @@ func (c *NodeDrainCommand) Run(args []string) int { } // Prefix lookup matched a single node - node, _, err := client.Nodes().Info(nodes[0].ID, nil) + node, meta, err := client.Nodes().Info(nodes[0].ID, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error toggling drain mode: %s", err)) return 1 @@ -250,8 +250,12 @@ func (c *NodeDrainCommand) Run(args []string) int { // If monitoring the drain start the montior and return when done if monitor { + if node.DrainStrategy == nil { + c.Ui.Warn("No drain strategy set") + return 0 + } c.Ui.Info(fmt.Sprintf("%s: Monitoring node %q: Ctrl-C to detach monitoring", formatTime(time.Now()), node.ID)) - c.monitorDrain(client, context.Background(), node, 0, ignoreSystem) + c.monitorDrain(client, context.Background(), node, meta.LastIndex, ignoreSystem) return 0 } @@ -291,7 +295,7 @@ func (c *NodeDrainCommand) Run(args []string) int { } // Toggle node draining - meta, err := client.Nodes().UpdateDrain(node.ID, spec, !keepIneligible, nil) + updateMeta, err := client.Nodes().UpdateDrain(node.ID, spec, !keepIneligible, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error updating drain specification: %s", err)) return 1 @@ -309,7 +313,7 @@ func (c *NodeDrainCommand) Run(args []string) int { now := time.Now() c.Ui.Info(fmt.Sprintf("%s: Ctrl-C to stop monitoring: will not cancel the node drain", formatTime(now))) c.Ui.Output(fmt.Sprintf("%s: Node %q drain strategy set", formatTime(now), node.ID)) - c.monitorDrain(client, context.Background(), node, meta.LastIndex, ignoreSystem) + c.monitorDrain(client, context.Background(), node, updateMeta.LastIndex, ignoreSystem) } return 0 } diff --git a/command/node_drain_test.go b/command/node_drain_test.go index 9156096e0..acdf9775d 100644 --- a/command/node_drain_test.go +++ b/command/node_drain_test.go @@ -217,9 +217,7 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}} args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"} t.Logf("Running: %v", args) - if code := cmd.Run(args); code != 0 { - t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String()) - } + require.Zero(cmd.Run(args)) out := outBuf.String() t.Logf("Output:\n%s", out) @@ -228,7 +226,7 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { // monitor goroutines may start only after some or all the allocs have been // migrated. if !testutil.IsTravis() { - require.Contains(out, "marked all allocations for migration") + require.Contains(out, "Drain complete for node") for _, a := range allocs { if *a.Job.Type == "system" { if strings.Contains(out, a.ID) { @@ -250,9 +248,7 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { outBuf.Reset() args = []string{"-address=" + url, "-self", "-monitor", "-ignore-system"} t.Logf("Running: %v", args) - if code := cmd.Run(args); code != 0 { - t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String()) - } + require.Zero(cmd.Run(args)) out = outBuf.String() t.Logf("Output:\n%s", out) @@ -298,7 +294,6 @@ func TestNodeDrainCommand_Monitor_NoDrainStrategy(t *testing.T) { out := outBuf.String() t.Logf("Output:\n%s", out) - require.Contains(out, "Monitoring node") require.Contains(out, "No drain strategy set") }