cli: add color to drain output

This commit is contained in:
Michael Schurter
2018-03-30 11:07:40 -07:00
parent 4d22c08b09
commit 55b98ee299
4 changed files with 91 additions and 56 deletions

View File

@@ -85,25 +85,51 @@ func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q
return &resp, nil
}
// MonitorMsgLevel represents the severity log level of a MonitorMessage.
type MonitorMsgLevel int

const (
	// Severity levels, in increasing order; Error terminates monitoring.
	MonitorMsgLevelInfo MonitorMsgLevel = iota
	MonitorMsgLevelWarn
	MonitorMsgLevelError
)

// MonitorMessage contains a message and log level.
type MonitorMessage struct {
	// Level is the severity of the message.
	Level MonitorMsgLevel

	// Message is the human-readable text of the event.
	Message string
}

// Messagef formats a new MonitorMessage at the given severity level using
// fmt.Sprintf semantics for msg and args.
func Messagef(lvl MonitorMsgLevel, msg string, args ...interface{}) *MonitorMessage {
	return &MonitorMessage{
		Level:   lvl,
		Message: fmt.Sprintf(msg, args...),
	}
}

// String returns the message text, implementing fmt.Stringer.
func (m *MonitorMessage) String() string {
	return m.Message
}
// MonitorDrain emits drain related events on the returned string channel. The
// channel will be closed when all allocations on the draining node have
// stopped or the context is canceled.
func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan string {
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan *MonitorMessage {
outCh := make(chan *MonitorMessage, 8)
nodeCh := make(chan *MonitorMessage, 1)
allocCh := make(chan *MonitorMessage, 8)
// Multiplex node and alloc chans onto outCh. This goroutine closes
// outCh when other chans have been closed or context canceled.
multiplexCtx, cancel := context.WithCancel(ctx)
go n.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
go n.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
// Monitor node for updates
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh, errCh)
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh)
// Monitor allocs on node for updates
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh, errCh)
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh)
return outCh
}
@@ -112,13 +138,14 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i
// Closes out chan when either the context is canceled, both update chans are
// closed, or an error occurs.
func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
outCh chan<- string, errCh, nodeCh, allocCh <-chan string) {
outCh chan<- *MonitorMessage, nodeCh, allocCh <-chan *MonitorMessage) {
defer cancel()
defer close(outCh)
nodeOk := true
allocOk := true
var msg string
var msg *MonitorMessage
for {
// If both chans have been closed, close the output chan
if !nodeOk && !allocOk {
@@ -138,19 +165,11 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
allocCh = nil
}
case errMsg := <-errCh:
// Error occurred, exit after sending
select {
case outCh <- errMsg:
case <-ctx.Done():
}
return
case <-ctx.Done():
return
}
if msg == "" {
if msg == nil {
continue
}
@@ -159,33 +178,37 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
case <-ctx.Done():
return
}
// Abort on error messages
if msg.Level == MonitorMsgLevelError {
return
}
}
}
// monitorDrainNode emits node updates on nodeCh and closes the channel when
// the node has finished draining.
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh, errCh chan<- string) {
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh chan<- *MonitorMessage) {
defer close(nodeCh)
var lastStrategy *DrainStrategy
q := QueryOptions{
AllowStale: true,
WaitIndex: index,
}
for {
q := QueryOptions{
AllowStale: true,
WaitIndex: index,
}
node, meta, err := n.Info(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring node: %v", err)
msg := Messagef(MonitorMsgLevelError, "Error monitoring node: %v", err)
select {
case errCh <- msg:
case nodeCh <- msg:
case <-ctx.Done():
}
return
}
if node.DrainStrategy == nil {
msg := fmt.Sprintf("Node %q drain complete", nodeID)
msg := Messagef(MonitorMsgLevelWarn, "Node %q drain complete", nodeID)
select {
case nodeCh <- msg:
case <-ctx.Done():
@@ -195,7 +218,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6
// DrainStrategy changed
if lastStrategy != nil && !node.DrainStrategy.Equal(lastStrategy) {
msg := fmt.Sprintf("Node %q drain updated: %s", nodeID, node.DrainStrategy)
msg := Messagef(MonitorMsgLevelInfo, "Node %q drain updated: %s", nodeID, node.DrainStrategy)
select {
case nodeCh <- msg:
case <-ctx.Done():
@@ -212,7 +235,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6
// monitorDrainAllocs emits alloc updates on allocCh and closes the channel
// when the node has finished draining.
func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh, errCh chan<- string) {
func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh chan<- *MonitorMessage) {
defer close(allocCh)
q := QueryOptions{AllowStale: true}
@@ -221,9 +244,9 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys
for {
allocs, meta, err := n.Allocations(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring allocations: %v", err)
msg := Messagef(MonitorMsgLevelError, "Error monitoring allocations: %v", err)
select {
case errCh <- msg:
case allocCh <- msg:
case <-ctx.Done():
}
return
@@ -265,7 +288,7 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys
if msg != "" {
select {
case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg):
case allocCh <- Messagef(MonitorMsgLevelInfo, "Alloc %q %s", a.ID, msg):
case <-ctx.Done():
return
}
@@ -289,7 +312,7 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys
// Exit if all allocs are terminal
if runningAllocs == 0 {
msg := fmt.Sprintf("All allocations on node %q have stopped.", nodeID)
msg := Messagef(MonitorMsgLevelWarn, "All allocations on node %q have stopped.", nodeID)
select {
case allocCh <- msg:
case <-ctx.Done():

View File

@@ -389,29 +389,28 @@ func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) {
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
outCh := make(chan *MonitorMessage, 8)
nodeCh := make(chan *MonitorMessage, 1)
allocCh := make(chan *MonitorMessage, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
}()
// Fake an alloc update
msg := "alloc update"
msg := Messagef(0, "alloc update")
allocCh <- msg
require.Equal(msg, <-outCh)
// Fake a node update
msg = "node update"
msg = Messagef(0, "node update")
nodeCh <- msg
require.Equal(msg, <-outCh)
// Fake an error that should shut everything down
msg = "fake error"
errCh <- msg
msg = Messagef(MonitorMsgLevelError, "fake error")
nodeCh <- msg
require.Equal(msg, <-outCh)
_, ok := <-exitedCh
@@ -442,18 +441,17 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) {
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
outCh := make(chan *MonitorMessage, 8)
nodeCh := make(chan *MonitorMessage, 1)
allocCh := make(chan *MonitorMessage, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
}()
// Fake a node updating and finishing
msg := "node update"
msg := Messagef(MonitorMsgLevelInfo, "node update")
nodeCh <- msg
close(nodeCh)
require.Equal(msg, <-outCh)
@@ -474,7 +472,7 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) {
}
// Fake an alloc update coming in after the node monitor has finished
msg = "alloc update"
msg = Messagef(0, "alloc update")
allocCh <- msg
require.Equal(msg, <-outCh)

View File

@@ -280,11 +280,19 @@ func (c *NodeDrainCommand) Run(args []string) int {
}
if enable && !detach {
c.Ui.Output("(Ctrl-C to stop monitoring: will not cancel the node drain)")
c.Ui.Output(fmt.Sprintf("%s Node %q drain strategy set", formatTime(time.Now()), node.ID))
now := time.Now()
c.Ui.Warn(fmt.Sprintf("%s: Ctrl-C to stop monitoring: will not cancel the node drain", formatTime(now)))
c.Ui.Output(fmt.Sprintf("%s: Node %q drain strategy set", formatTime(now), node.ID))
outCh := client.Nodes().MonitorDrain(context.Background(), node.ID, meta.LastIndex, ignoreSystem)
for msg := range outCh {
c.Ui.Output(fmt.Sprintf("%s %s", formatTime(time.Now()), msg))
switch msg.Level {
case api.MonitorMsgLevelWarn:
c.Ui.Warn(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg))
case api.MonitorMsgLevelError:
c.Ui.Error(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg))
default:
c.Ui.Output(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg))
}
}
}

View File

@@ -1,6 +1,7 @@
package command
import (
"bytes"
"fmt"
"strings"
"testing"
@@ -205,15 +206,20 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
t.Fatalf("err: %v", err)
})
ui := new(cli.MockUi)
outBuf := bytes.NewBuffer(nil)
ui := &cli.BasicUi{
Reader: bytes.NewReader(nil),
Writer: outBuf,
ErrorWriter: outBuf,
}
cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}}
args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"}
t.Logf("Running: %v", args)
if code := cmd.Run(args); code != 0 {
t.Fatalf("expected exit 0, got: %d\n%s", code, ui.OutputWriter.String())
t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String())
}
out := ui.OutputWriter.String()
out := outBuf.String()
t.Logf("Output:\n%s", out)
require.Contains(out, "drain complete")