Merge pull request #4084 from hashicorp/f-drain-timestamps

drain: add timestamps to cli output
Michael Schurter
2018-03-30 15:07:43 -07:00
committed by GitHub
9 changed files with 145 additions and 99 deletions


@@ -85,25 +85,52 @@ func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q
return &resp, nil
}
// MonitorMsgLevels represents the severity log level of a MonitorMessage.
type MonitorMsgLevel int
const (
MonitorMsgLevelNormal MonitorMsgLevel = 0
MonitorMsgLevelInfo MonitorMsgLevel = 1
MonitorMsgLevelWarn MonitorMsgLevel = 2
MonitorMsgLevelError MonitorMsgLevel = 3
)
// MonitorMessage contains a message and log level.
type MonitorMessage struct {
Level MonitorMsgLevel
Message string
}
// Messagef formats a new MonitorMessage.
func Messagef(lvl MonitorMsgLevel, msg string, args ...interface{}) *MonitorMessage {
return &MonitorMessage{
Level: lvl,
Message: fmt.Sprintf(msg, args...),
}
}
func (m *MonitorMessage) String() string {
return m.Message
}
// MonitorDrain emits drain related events on the returned string channel. The
// channel will be closed when all allocations on the draining node have
// stopped or the context is canceled.
func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan string {
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan *MonitorMessage {
outCh := make(chan *MonitorMessage, 8)
nodeCh := make(chan *MonitorMessage, 1)
allocCh := make(chan *MonitorMessage, 8)
// Multiplex node and alloc chans onto outCh. This goroutine closes
// outCh when other chans have been closed or context canceled.
multiplexCtx, cancel := context.WithCancel(ctx)
go n.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
go n.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
// Monitor node for updates
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh, errCh)
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh)
// Monitor allocs on node for updates
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh, errCh)
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh)
return outCh
}
@@ -112,13 +139,14 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i
// Closes out chan when either the context is canceled, both update chans are
// closed, or an error occurs.
func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
outCh chan<- string, errCh, nodeCh, allocCh <-chan string) {
outCh chan<- *MonitorMessage, nodeCh, allocCh <-chan *MonitorMessage) {
defer cancel()
defer close(outCh)
nodeOk := true
allocOk := true
var msg string
var msg *MonitorMessage
for {
// If both chans have been closed, close the output chan
if !nodeOk && !allocOk {
@@ -138,19 +166,11 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
allocCh = nil
}
case errMsg := <-errCh:
// Error occurred, exit after sending
select {
case outCh <- errMsg:
case <-ctx.Done():
}
return
case <-ctx.Done():
return
}
if msg == "" {
if msg == nil {
continue
}
@@ -159,33 +179,28 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
case <-ctx.Done():
return
}
// Abort on error messages
if msg.Level == MonitorMsgLevelError {
return
}
}
}
// monitorDrainNode emits node updates on nodeCh and closes the channel when
// the node has finished draining.
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh, errCh chan<- string) {
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh chan<- *MonitorMessage) {
defer close(nodeCh)
var lastStrategy *DrainStrategy
q := QueryOptions{
AllowStale: true,
WaitIndex: index,
}
for {
q := QueryOptions{
AllowStale: true,
WaitIndex: index,
}
node, meta, err := n.Info(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring node: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
if node.DrainStrategy == nil {
msg := fmt.Sprintf("Node %q drain complete", nodeID)
msg := Messagef(MonitorMsgLevelError, "Error monitoring node: %v", err)
select {
case nodeCh <- msg:
case <-ctx.Done():
@@ -193,9 +208,26 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6
return
}
if node.DrainStrategy == nil {
msg := Messagef(MonitorMsgLevelInfo, "Node %q drain complete", nodeID)
select {
case nodeCh <- msg:
case <-ctx.Done():
}
return
}
if node.Status == structs.NodeStatusDown {
msg := Messagef(MonitorMsgLevelWarn, "Node %q down", nodeID)
select {
case nodeCh <- msg:
case <-ctx.Done():
}
}
// DrainStrategy changed
if lastStrategy != nil && !node.DrainStrategy.Equal(lastStrategy) {
msg := fmt.Sprintf("Node %q drain updated: %s", nodeID, node.DrainStrategy)
msg := Messagef(MonitorMsgLevelInfo, "Node %q drain updated: %s", nodeID, node.DrainStrategy)
select {
case nodeCh <- msg:
case <-ctx.Done():
@@ -206,45 +238,31 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6
lastStrategy = node.DrainStrategy
// Drain still ongoing, update index and block for updates
index = meta.LastIndex
q.WaitIndex = meta.LastIndex
}
}
// monitorDrainAllocs emits alloc updates on allocCh and closes the channel
// when the node has finished draining.
func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh, errCh chan<- string) {
func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh chan<- *MonitorMessage) {
defer close(allocCh)
// Build initial alloc state
q := QueryOptions{AllowStale: true}
allocs, meta, err := n.Allocations(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring allocations: %v", err)
select {
case errCh <- msg:
case <-ctx.Done():
}
return
}
initial := make(map[string]*Allocation, len(allocs))
for _, a := range allocs {
initial[a.ID] = a
}
initial := make(map[string]*Allocation, 4)
for {
q.WaitIndex = meta.LastIndex
allocs, meta, err = n.Allocations(nodeID, &q)
allocs, meta, err := n.Allocations(nodeID, &q)
if err != nil {
msg := fmt.Sprintf("Error monitoring allocations: %v", err)
msg := Messagef(MonitorMsgLevelError, "Error monitoring allocations: %v", err)
select {
case errCh <- msg:
case allocCh <- msg:
case <-ctx.Done():
}
return
}
q.WaitIndex = meta.LastIndex
runningAllocs := 0
for _, a := range allocs {
// Get previous version of alloc
@@ -269,17 +287,15 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys
case migrating && !orig.DesiredTransition.ShouldMigrate():
// Alloc was marked for migration
msg = "marked for migration"
case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop:
// Alloc has already been marked for migration and is now being stopped
msg = "draining"
case a.NextAllocation != "" && orig.NextAllocation == "":
// Alloc has been replaced by another allocation
msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation)
}
if msg != "" {
select {
case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg):
case allocCh <- Messagef(MonitorMsgLevelNormal, "Alloc %q %s", a.ID, msg):
case <-ctx.Done():
return
}
@@ -303,7 +319,7 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys
// Exit if all allocs are terminal
if runningAllocs == 0 {
msg := fmt.Sprintf("All allocations on node %q have stopped.", nodeID)
msg := Messagef(MonitorMsgLevelInfo, "All allocations on node %q have stopped.", nodeID)
select {
case allocCh <- msg:
case <-ctx.Done():

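For orientation, a minimal consumer of the reworked monitoring API might look like the following sketch. This is an illustration rather than code from this change: it assumes an already-configured api.Client and a node that is currently draining, and the function name is made up.

package example

import (
	"context"
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

// watchDrain streams drain progress for a node and prints each
// MonitorMessage according to its level. index should be a recent node
// modify index, for example the index returned by the drain update.
func watchDrain(client *api.Client, nodeID string, index uint64) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// MonitorDrain closes the channel when the node finishes draining,
	// after an error-level message, or when ctx is canceled. The final
	// argument ignores system job allocations.
	for msg := range client.Nodes().MonitorDrain(ctx, nodeID, index, true) {
		switch msg.Level {
		case api.MonitorMsgLevelError:
			log.Printf("error: %s", msg)
		case api.MonitorMsgLevelWarn:
			log.Printf("warning: %s", msg)
		default:
			fmt.Println(msg)
		}
	}
}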

@@ -389,29 +389,28 @@ func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) {
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
outCh := make(chan *MonitorMessage, 8)
nodeCh := make(chan *MonitorMessage, 1)
allocCh := make(chan *MonitorMessage, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
}()
// Fake an alloc update
msg := "alloc update"
msg := Messagef(0, "alloc update")
allocCh <- msg
require.Equal(msg, <-outCh)
// Fake a node update
msg = "node update"
msg = Messagef(0, "node update")
nodeCh <- msg
require.Equal(msg, <-outCh)
// Fake an error that should shut everything down
msg = "fake error"
errCh <- msg
msg = Messagef(MonitorMsgLevelError, "fake error")
nodeCh <- msg
require.Equal(msg, <-outCh)
_, ok := <-exitedCh
@@ -442,18 +441,17 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) {
// don't need to use a full Client
var nodeClient *Nodes
outCh := make(chan string, 8)
errCh := make(chan string, 1)
nodeCh := make(chan string, 1)
allocCh := make(chan string, 8)
outCh := make(chan *MonitorMessage, 8)
nodeCh := make(chan *MonitorMessage, 1)
allocCh := make(chan *MonitorMessage, 8)
exitedCh := make(chan struct{})
go func() {
defer close(exitedCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh)
nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
}()
// Fake a node updating and finishing
msg := "node update"
msg := Messagef(MonitorMsgLevelInfo, "node update")
nodeCh <- msg
close(nodeCh)
require.Equal(msg, <-outCh)
@@ -474,7 +472,7 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) {
}
// Fake an alloc update coming in after the node monitor has finished
msg = "alloc update"
msg = Messagef(0, "alloc update")
allocCh <- msg
require.Equal(msg, <-outCh)

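A note on the Messagef(0, ...) calls in the tests above: MonitorMsgLevelNormal is the zero value of MonitorMsgLevel, so passing 0 produces a normal-level message. A tiny standalone illustration (the alloc ID here is made up):

package example

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func messagefExample() {
	msg := api.Messagef(api.MonitorMsgLevelNormal, "Alloc %q %s", "0b5ec0d4", "draining")
	fmt.Println(msg.Level == api.MonitorMsgLevelNormal) // true: Normal is the zero value
	fmt.Println(msg)                                     // String() yields just the formatted text
}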

@@ -478,9 +478,9 @@ func (c *Command) Run(args []string) int {
// Log config files
if len(config.Files) > 0 {
c.Ui.Info(fmt.Sprintf("Loaded configuration from %s", strings.Join(config.Files, ", ")))
c.Ui.Output(fmt.Sprintf("Loaded configuration from %s", strings.Join(config.Files, ", ")))
} else {
c.Ui.Info("No configuration files loaded")
c.Ui.Output("No configuration files loaded")
}
// Initialize the telemetry
@@ -529,7 +529,7 @@ func (c *Command) Run(args []string) int {
padding := 18
c.Ui.Output("Nomad agent configuration:\n")
for _, k := range infoKeys {
c.Ui.Info(fmt.Sprintf(
c.Ui.Output(fmt.Sprintf(
"%s%s: %s",
strings.Repeat(" ", padding-len(k)),
strings.Title(k),
@@ -831,7 +831,7 @@ func (c *Command) startupJoin(config *Config) error {
return err
}
c.Ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
c.Ui.Output(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
return nil
}


@@ -48,7 +48,7 @@ func (c *DeprecatedCommand) warn() {
// Commands returns the mapping of CLI commands for Nomad. The meta
// parameter lets you set meta options for all commands.
func Commands(metaPtr *Meta) map[string]cli.CommandFactory {
func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
if metaPtr == nil {
metaPtr = new(Meta)
}
@@ -156,7 +156,7 @@ func Commands(metaPtr *Meta) map[string]cli.CommandFactory {
"agent": func() (cli.Command, error) {
return &agent.Command{
Version: version.GetVersion(),
Ui: meta.Ui,
Ui: agentUi,
ShutdownCh: make(chan struct{}),
}, nil
},


@@ -271,16 +271,30 @@ func (c *NodeDrainCommand) Run(args []string) int {
return 1
}
if enable {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID))
} else {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy unset", node.ID))
if !enable || detach {
if enable {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID))
} else {
c.Ui.Output(fmt.Sprintf("Node %q drain strategy unset", node.ID))
}
}
if enable && !detach {
now := time.Now()
c.Ui.Info(fmt.Sprintf("%s: Ctrl-C to stop monitoring: will not cancel the node drain", formatTime(now)))
c.Ui.Output(fmt.Sprintf("%s: Node %q drain strategy set", formatTime(now), node.ID))
outCh := client.Nodes().MonitorDrain(context.Background(), node.ID, meta.LastIndex, ignoreSystem)
for msg := range outCh {
c.Ui.Output(msg)
switch msg.Level {
case api.MonitorMsgLevelInfo:
c.Ui.Info(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg))
case api.MonitorMsgLevelWarn:
c.Ui.Warn(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg))
case api.MonitorMsgLevelError:
c.Ui.Error(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg))
default:
c.Ui.Output(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg))
}
}
}

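The formatTime helper used above is not defined in this diff; presumably it is an existing helper available to the command package. A stand-in with an assumed RFC 3339-style layout, only to illustrate the timestamp-prefix pattern applied to each monitor message:

package example

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

// formatTimeStandIn uses an assumed layout; the real helper may differ.
func formatTimeStandIn(t time.Time) string {
	return t.Format("2006-01-02T15:04:05Z07:00")
}

// printMonitorMsg mirrors the CLI loop above with the stand-in formatter.
func printMonitorMsg(msg *api.MonitorMessage) {
	fmt.Printf("%s: %s\n", formatTimeStandIn(time.Now()), msg)
}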

@@ -1,6 +1,7 @@
package command
import (
"bytes"
"fmt"
"strings"
"testing"
@@ -205,15 +206,20 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
t.Fatalf("err: %v", err)
})
ui := new(cli.MockUi)
outBuf := bytes.NewBuffer(nil)
ui := &cli.BasicUi{
Reader: bytes.NewReader(nil),
Writer: outBuf,
ErrorWriter: outBuf,
}
cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}}
args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"}
t.Logf("Running: %v", args)
if code := cmd.Run(args); code != 0 {
t.Fatalf("expected exit 0, got: %d\n%s", code, ui.OutputWriter.String())
t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String())
}
out := ui.OutputWriter.String()
out := outBuf.String()
t.Logf("Output:\n%s", out)
require.Contains(out, "drain complete")

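The switch from cli.MockUi to a cli.BasicUi writing into one shared buffer is presumably because the drain monitor now emits through Info, Warn, and Error as well as Output, and a single buffer keeps all of that output together for the Contains assertion. A minimal illustration using github.com/mitchellh/cli:

package example

import (
	"bytes"
	"fmt"

	"github.com/mitchellh/cli"
)

func sharedBufferUi() {
	outBuf := bytes.NewBuffer(nil)
	ui := &cli.BasicUi{
		Reader:      bytes.NewReader(nil),
		Writer:      outBuf, // Output and Info write here
		ErrorWriter: outBuf, // Warn and Error write here as well
	}
	ui.Output("output line")
	ui.Warn("warn line")
	fmt.Print(outBuf.String()) // both lines, in call order
}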

@@ -150,6 +150,10 @@ func (c *NodeEligibilityCommand) Run(args []string) int {
return 1
}
c.Ui.Output(fmt.Sprintf("Node %q scheduling eligibility set", node.ID))
if enable {
c.Ui.Output(fmt.Sprintf("Node %q scheduling eligibility set: eligible for scheduling", node.ID))
} else {
c.Ui.Output(fmt.Sprintf("Node %q scheduling eligibility set: ineligible for scheduling", node.ID))
}
return 0
}


@@ -114,7 +114,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int {
}
if listKeys {
c.Ui.Info("Gathering installed encryption keys...")
c.Ui.Output("Gathering installed encryption keys...")
r, err := client.Agent().ListKeys()
if err != nil {
c.Ui.Error(fmt.Sprintf("error: %s", err))
@@ -125,7 +125,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int {
}
if installKey != "" {
c.Ui.Info("Installing new gossip encryption key...")
c.Ui.Output("Installing new gossip encryption key...")
_, err := client.Agent().InstallKey(installKey)
if err != nil {
c.Ui.Error(fmt.Sprintf("error: %s", err))
@@ -135,7 +135,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int {
}
if useKey != "" {
c.Ui.Info("Changing primary gossip encryption key...")
c.Ui.Output("Changing primary gossip encryption key...")
_, err := client.Agent().UseKey(useKey)
if err != nil {
c.Ui.Error(fmt.Sprintf("error: %s", err))
@@ -145,7 +145,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int {
}
if removeKey != "" {
c.Ui.Info("Removing gossip encryption key...")
c.Ui.Output("Removing gossip encryption key...")
_, err := client.Agent().RemoveKey(removeKey)
if err != nil {
c.Ui.Error(fmt.Sprintf("error: %s", err))

main.go

@@ -92,16 +92,24 @@ func RunCustom(args []string) int {
ErrorWriter: os.Stderr,
}
// The Nomad agent never outputs color
agentUi := &cli.BasicUi{
Reader: os.Stdin,
Writer: os.Stdout,
ErrorWriter: os.Stderr,
}
// Only use colored UI if stdout is a tty, and not disabled
if isTerminal && color {
metaPtr.Ui = &cli.ColoredUi{
ErrorColor: cli.UiColorRed,
WarnColor: cli.UiColorYellow,
InfoColor: cli.UiColorGreen,
Ui: metaPtr.Ui,
}
}
commands := command.Commands(metaPtr)
commands := command.Commands(metaPtr, agentUi)
cli := &cli.CLI{
Name: "nomad",
Version: version.GetVersion().FullVersionNumber(true),