From 55b98ee2992ec01ab034eb30df32d59d949aa869 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 30 Mar 2018 11:07:40 -0700 Subject: [PATCH] cli: add color to drain output --- api/nodes.go | 91 ++++++++++++++++++++++++-------------- api/nodes_test.go | 30 ++++++------- command/node_drain.go | 14 ++++-- command/node_drain_test.go | 12 +++-- 4 files changed, 91 insertions(+), 56 deletions(-) diff --git a/api/nodes.go b/api/nodes.go index 4633efe88..d3f8def54 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -85,25 +85,51 @@ func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q return &resp, nil } +// MonitorMsgLevels represents the severity log level of a MonitorMessage. +type MonitorMsgLevel int + +const ( + MonitorMsgLevelInfo MonitorMsgLevel = 0 + MonitorMsgLevelWarn MonitorMsgLevel = 1 + MonitorMsgLevelError MonitorMsgLevel = 2 +) + +// MonitorMessage contains a message and log level. +type MonitorMessage struct { + Level MonitorMsgLevel + Message string +} + +// Messagef formats a new MonitorMessage. +func Messagef(lvl MonitorMsgLevel, msg string, args ...interface{}) *MonitorMessage { + return &MonitorMessage{ + Level: lvl, + Message: fmt.Sprintf(msg, args...), + } +} + +func (m *MonitorMessage) String() string { + return m.Message +} + // MonitorDrain emits drain related events on the returned string channel. The // channel will be closed when all allocations on the draining node have // stopped or the context is canceled. -func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan string { - outCh := make(chan string, 8) - errCh := make(chan string, 1) - nodeCh := make(chan string, 1) - allocCh := make(chan string, 8) +func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan *MonitorMessage { + outCh := make(chan *MonitorMessage, 8) + nodeCh := make(chan *MonitorMessage, 1) + allocCh := make(chan *MonitorMessage, 8) // Multiplex node and alloc chans onto outCh. This goroutine closes // outCh when other chans have been closed or context canceled. multiplexCtx, cancel := context.WithCancel(ctx) - go n.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + go n.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh) // Monitor node for updates - go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh, errCh) + go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh) // Monitor allocs on node for updates - go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh, errCh) + go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh) return outCh } @@ -112,13 +138,14 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i // Closes out chan when either the context is canceled, both update chans are // closed, or an error occurs. func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), - outCh chan<- string, errCh, nodeCh, allocCh <-chan string) { + outCh chan<- *MonitorMessage, nodeCh, allocCh <-chan *MonitorMessage) { defer cancel() defer close(outCh) + nodeOk := true allocOk := true - var msg string + var msg *MonitorMessage for { // If both chans have been closed, close the output chan if !nodeOk && !allocOk { @@ -138,19 +165,11 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), allocCh = nil } - case errMsg := <-errCh: - // Error occurred, exit after sending - select { - case outCh <- errMsg: - case <-ctx.Done(): - } - return - case <-ctx.Done(): return } - if msg == "" { + if msg == nil { continue } @@ -159,33 +178,37 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), case <-ctx.Done(): return } + + // Abort on error messages + if msg.Level == MonitorMsgLevelError { + return + } } } // monitorDrainNode emits node updates on nodeCh and closes the channel when // the node has finished draining. -func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh, errCh chan<- string) { +func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh chan<- *MonitorMessage) { defer close(nodeCh) var lastStrategy *DrainStrategy - + q := QueryOptions{ + AllowStale: true, + WaitIndex: index, + } for { - q := QueryOptions{ - AllowStale: true, - WaitIndex: index, - } node, meta, err := n.Info(nodeID, &q) if err != nil { - msg := fmt.Sprintf("Error monitoring node: %v", err) + msg := Messagef(MonitorMsgLevelError, "Error monitoring node: %v", err) select { - case errCh <- msg: + case nodeCh <- msg: case <-ctx.Done(): } return } if node.DrainStrategy == nil { - msg := fmt.Sprintf("Node %q drain complete", nodeID) + msg := Messagef(MonitorMsgLevelWarn, "Node %q drain complete", nodeID) select { case nodeCh <- msg: case <-ctx.Done(): @@ -195,7 +218,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6 // DrainStrategy changed if lastStrategy != nil && !node.DrainStrategy.Equal(lastStrategy) { - msg := fmt.Sprintf("Node %q drain updated: %s", nodeID, node.DrainStrategy) + msg := Messagef(MonitorMsgLevelInfo, "Node %q drain updated: %s", nodeID, node.DrainStrategy) select { case nodeCh <- msg: case <-ctx.Done(): @@ -212,7 +235,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6 // monitorDrainAllocs emits alloc updates on allocCh and closes the channel // when the node has finished draining. -func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh, errCh chan<- string) { +func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh chan<- *MonitorMessage) { defer close(allocCh) q := QueryOptions{AllowStale: true} @@ -221,9 +244,9 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys for { allocs, meta, err := n.Allocations(nodeID, &q) if err != nil { - msg := fmt.Sprintf("Error monitoring allocations: %v", err) + msg := Messagef(MonitorMsgLevelError, "Error monitoring allocations: %v", err) select { - case errCh <- msg: + case allocCh <- msg: case <-ctx.Done(): } return @@ -265,7 +288,7 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys if msg != "" { select { - case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg): + case allocCh <- Messagef(MonitorMsgLevelInfo, "Alloc %q %s", a.ID, msg): case <-ctx.Done(): return } @@ -289,7 +312,7 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys // Exit if all allocs are terminal if runningAllocs == 0 { - msg := fmt.Sprintf("All allocations on node %q have stopped.", nodeID) + msg := Messagef(MonitorMsgLevelWarn, "All allocations on node %q have stopped.", nodeID) select { case allocCh <- msg: case <-ctx.Done(): diff --git a/api/nodes_test.go b/api/nodes_test.go index e00329d0f..7d439f044 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -389,29 +389,28 @@ func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) { // don't need to use a full Client var nodeClient *Nodes - outCh := make(chan string, 8) - errCh := make(chan string, 1) - nodeCh := make(chan string, 1) - allocCh := make(chan string, 8) + outCh := make(chan *MonitorMessage, 8) + nodeCh := make(chan *MonitorMessage, 1) + allocCh := make(chan *MonitorMessage, 8) exitedCh := make(chan struct{}) go func() { defer close(exitedCh) - nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh) }() // Fake an alloc update - msg := "alloc update" + msg := Messagef(0, "alloc update") allocCh <- msg require.Equal(msg, <-outCh) // Fake a node update - msg = "node update" + msg = Messagef(0, "node update") nodeCh <- msg require.Equal(msg, <-outCh) // Fake an error that should shut everything down - msg = "fake error" - errCh <- msg + msg = Messagef(MonitorMsgLevelError, "fake error") + nodeCh <- msg require.Equal(msg, <-outCh) _, ok := <-exitedCh @@ -442,18 +441,17 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) { // don't need to use a full Client var nodeClient *Nodes - outCh := make(chan string, 8) - errCh := make(chan string, 1) - nodeCh := make(chan string, 1) - allocCh := make(chan string, 8) + outCh := make(chan *MonitorMessage, 8) + nodeCh := make(chan *MonitorMessage, 1) + allocCh := make(chan *MonitorMessage, 8) exitedCh := make(chan struct{}) go func() { defer close(exitedCh) - nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh) }() // Fake a node updating and finishing - msg := "node update" + msg := Messagef(MonitorMsgLevelInfo, "node update") nodeCh <- msg close(nodeCh) require.Equal(msg, <-outCh) @@ -474,7 +472,7 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) { } // Fake an alloc update coming in after the node monitor has finished - msg = "alloc update" + msg = Messagef(0, "alloc update") allocCh <- msg require.Equal(msg, <-outCh) diff --git a/command/node_drain.go b/command/node_drain.go index a27ae5251..b362870df 100644 --- a/command/node_drain.go +++ b/command/node_drain.go @@ -280,11 +280,19 @@ func (c *NodeDrainCommand) Run(args []string) int { } if enable && !detach { - c.Ui.Output("(Ctrl-C to stop monitoring: will not cancel the node drain)") - c.Ui.Output(fmt.Sprintf("%s Node %q drain strategy set", formatTime(time.Now()), node.ID)) + now := time.Now() + c.Ui.Warn(fmt.Sprintf("%s: Ctrl-C to stop monitoring: will not cancel the node drain", formatTime(now))) + c.Ui.Output(fmt.Sprintf("%s: Node %q drain strategy set", formatTime(now), node.ID)) outCh := client.Nodes().MonitorDrain(context.Background(), node.ID, meta.LastIndex, ignoreSystem) for msg := range outCh { - c.Ui.Output(fmt.Sprintf("%s %s", formatTime(time.Now()), msg)) + switch msg.Level { + case api.MonitorMsgLevelWarn: + c.Ui.Warn(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg)) + case api.MonitorMsgLevelError: + c.Ui.Error(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg)) + default: + c.Ui.Output(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg)) + } } } diff --git a/command/node_drain_test.go b/command/node_drain_test.go index fceec6488..41cb69acf 100644 --- a/command/node_drain_test.go +++ b/command/node_drain_test.go @@ -1,6 +1,7 @@ package command import ( + "bytes" "fmt" "strings" "testing" @@ -205,15 +206,20 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { t.Fatalf("err: %v", err) }) - ui := new(cli.MockUi) + outBuf := bytes.NewBuffer(nil) + ui := &cli.BasicUi{ + Reader: bytes.NewReader(nil), + Writer: outBuf, + ErrorWriter: outBuf, + } cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}} args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"} t.Logf("Running: %v", args) if code := cmd.Run(args); code != 0 { - t.Fatalf("expected exit 0, got: %d\n%s", code, ui.OutputWriter.String()) + t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String()) } - out := ui.OutputWriter.String() + out := outBuf.String() t.Logf("Output:\n%s", out) require.Contains(out, "drain complete")