diff --git a/api/nodes.go b/api/nodes.go index 77e61624a..4be661af0 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -85,25 +85,52 @@ func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q return &resp, nil } +// MonitorMsgLevels represents the severity log level of a MonitorMessage. +type MonitorMsgLevel int + +const ( + MonitorMsgLevelNormal MonitorMsgLevel = 0 + MonitorMsgLevelInfo MonitorMsgLevel = 1 + MonitorMsgLevelWarn MonitorMsgLevel = 2 + MonitorMsgLevelError MonitorMsgLevel = 3 +) + +// MonitorMessage contains a message and log level. +type MonitorMessage struct { + Level MonitorMsgLevel + Message string +} + +// Messagef formats a new MonitorMessage. +func Messagef(lvl MonitorMsgLevel, msg string, args ...interface{}) *MonitorMessage { + return &MonitorMessage{ + Level: lvl, + Message: fmt.Sprintf(msg, args...), + } +} + +func (m *MonitorMessage) String() string { + return m.Message +} + // MonitorDrain emits drain related events on the returned string channel. The // channel will be closed when all allocations on the draining node have // stopped or the context is canceled. -func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan string { - outCh := make(chan string, 8) - errCh := make(chan string, 1) - nodeCh := make(chan string, 1) - allocCh := make(chan string, 8) +func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan *MonitorMessage { + outCh := make(chan *MonitorMessage, 8) + nodeCh := make(chan *MonitorMessage, 1) + allocCh := make(chan *MonitorMessage, 8) // Multiplex node and alloc chans onto outCh. This goroutine closes // outCh when other chans have been closed or context canceled. multiplexCtx, cancel := context.WithCancel(ctx) - go n.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + go n.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh) // Monitor node for updates - go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh, errCh) + go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh) // Monitor allocs on node for updates - go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh, errCh) + go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh) return outCh } @@ -112,13 +139,14 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i // Closes out chan when either the context is canceled, both update chans are // closed, or an error occurs. func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), - outCh chan<- string, errCh, nodeCh, allocCh <-chan string) { + outCh chan<- *MonitorMessage, nodeCh, allocCh <-chan *MonitorMessage) { defer cancel() defer close(outCh) + nodeOk := true allocOk := true - var msg string + var msg *MonitorMessage for { // If both chans have been closed, close the output chan if !nodeOk && !allocOk { @@ -138,19 +166,11 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), allocCh = nil } - case errMsg := <-errCh: - // Error occurred, exit after sending - select { - case outCh <- errMsg: - case <-ctx.Done(): - } - return - case <-ctx.Done(): return } - if msg == "" { + if msg == nil { continue } @@ -159,33 +179,28 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), case <-ctx.Done(): return } + + // Abort on error messages + if msg.Level == MonitorMsgLevelError { + return + } } } // monitorDrainNode emits node updates on nodeCh and closes the channel when // the node has finished draining. -func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh, errCh chan<- string) { +func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh chan<- *MonitorMessage) { defer close(nodeCh) var lastStrategy *DrainStrategy - + q := QueryOptions{ + AllowStale: true, + WaitIndex: index, + } for { - q := QueryOptions{ - AllowStale: true, - WaitIndex: index, - } node, meta, err := n.Info(nodeID, &q) if err != nil { - msg := fmt.Sprintf("Error monitoring node: %v", err) - select { - case errCh <- msg: - case <-ctx.Done(): - } - return - } - - if node.DrainStrategy == nil { - msg := fmt.Sprintf("Node %q drain complete", nodeID) + msg := Messagef(MonitorMsgLevelError, "Error monitoring node: %v", err) select { case nodeCh <- msg: case <-ctx.Done(): @@ -193,9 +208,26 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6 return } + if node.DrainStrategy == nil { + msg := Messagef(MonitorMsgLevelInfo, "Node %q drain complete", nodeID) + select { + case nodeCh <- msg: + case <-ctx.Done(): + } + return + } + + if node.Status == structs.NodeStatusDown { + msg := Messagef(MonitorMsgLevelWarn, "Node %q down", nodeID) + select { + case nodeCh <- msg: + case <-ctx.Done(): + } + } + // DrainStrategy changed if lastStrategy != nil && !node.DrainStrategy.Equal(lastStrategy) { - msg := fmt.Sprintf("Node %q drain updated: %s", nodeID, node.DrainStrategy) + msg := Messagef(MonitorMsgLevelInfo, "Node %q drain updated: %s", nodeID, node.DrainStrategy) select { case nodeCh <- msg: case <-ctx.Done(): @@ -206,45 +238,31 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6 lastStrategy = node.DrainStrategy // Drain still ongoing, update index and block for updates - index = meta.LastIndex + q.WaitIndex = meta.LastIndex } } // monitorDrainAllocs emits alloc updates on allocCh and closes the channel // when the node has finished draining. -func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh, errCh chan<- string) { +func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh chan<- *MonitorMessage) { defer close(allocCh) - // Build initial alloc state q := QueryOptions{AllowStale: true} - allocs, meta, err := n.Allocations(nodeID, &q) - if err != nil { - msg := fmt.Sprintf("Error monitoring allocations: %v", err) - select { - case errCh <- msg: - case <-ctx.Done(): - } - return - } - - initial := make(map[string]*Allocation, len(allocs)) - for _, a := range allocs { - initial[a.ID] = a - } + initial := make(map[string]*Allocation, 4) for { - q.WaitIndex = meta.LastIndex - - allocs, meta, err = n.Allocations(nodeID, &q) + allocs, meta, err := n.Allocations(nodeID, &q) if err != nil { - msg := fmt.Sprintf("Error monitoring allocations: %v", err) + msg := Messagef(MonitorMsgLevelError, "Error monitoring allocations: %v", err) select { - case errCh <- msg: + case allocCh <- msg: case <-ctx.Done(): } return } + q.WaitIndex = meta.LastIndex + runningAllocs := 0 for _, a := range allocs { // Get previous version of alloc @@ -269,17 +287,15 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys case migrating && !orig.DesiredTransition.ShouldMigrate(): // Alloc was marked for migration msg = "marked for migration" + case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop: // Alloc has already been marked for migration and is now being stopped msg = "draining" - case a.NextAllocation != "" && orig.NextAllocation == "": - // Alloc has been replaced by another allocation - msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation) } if msg != "" { select { - case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg): + case allocCh <- Messagef(MonitorMsgLevelNormal, "Alloc %q %s", a.ID, msg): case <-ctx.Done(): return } @@ -303,7 +319,7 @@ func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys // Exit if all allocs are terminal if runningAllocs == 0 { - msg := fmt.Sprintf("All allocations on node %q have stopped.", nodeID) + msg := Messagef(MonitorMsgLevelInfo, "All allocations on node %q have stopped.", nodeID) select { case allocCh <- msg: case <-ctx.Done(): diff --git a/api/nodes_test.go b/api/nodes_test.go index e00329d0f..7d439f044 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -389,29 +389,28 @@ func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) { // don't need to use a full Client var nodeClient *Nodes - outCh := make(chan string, 8) - errCh := make(chan string, 1) - nodeCh := make(chan string, 1) - allocCh := make(chan string, 8) + outCh := make(chan *MonitorMessage, 8) + nodeCh := make(chan *MonitorMessage, 1) + allocCh := make(chan *MonitorMessage, 8) exitedCh := make(chan struct{}) go func() { defer close(exitedCh) - nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh) }() // Fake an alloc update - msg := "alloc update" + msg := Messagef(0, "alloc update") allocCh <- msg require.Equal(msg, <-outCh) // Fake a node update - msg = "node update" + msg = Messagef(0, "node update") nodeCh <- msg require.Equal(msg, <-outCh) // Fake an error that should shut everything down - msg = "fake error" - errCh <- msg + msg = Messagef(MonitorMsgLevelError, "fake error") + nodeCh <- msg require.Equal(msg, <-outCh) _, ok := <-exitedCh @@ -442,18 +441,17 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) { // don't need to use a full Client var nodeClient *Nodes - outCh := make(chan string, 8) - errCh := make(chan string, 1) - nodeCh := make(chan string, 1) - allocCh := make(chan string, 8) + outCh := make(chan *MonitorMessage, 8) + nodeCh := make(chan *MonitorMessage, 1) + allocCh := make(chan *MonitorMessage, 8) exitedCh := make(chan struct{}) go func() { defer close(exitedCh) - nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh) }() // Fake a node updating and finishing - msg := "node update" + msg := Messagef(MonitorMsgLevelInfo, "node update") nodeCh <- msg close(nodeCh) require.Equal(msg, <-outCh) @@ -474,7 +472,7 @@ func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) { } // Fake an alloc update coming in after the node monitor has finished - msg = "alloc update" + msg = Messagef(0, "alloc update") allocCh <- msg require.Equal(msg, <-outCh) diff --git a/command/agent/command.go b/command/agent/command.go index fb5c489d7..59926777c 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -478,9 +478,9 @@ func (c *Command) Run(args []string) int { // Log config files if len(config.Files) > 0 { - c.Ui.Info(fmt.Sprintf("Loaded configuration from %s", strings.Join(config.Files, ", "))) + c.Ui.Output(fmt.Sprintf("Loaded configuration from %s", strings.Join(config.Files, ", "))) } else { - c.Ui.Info("No configuration files loaded") + c.Ui.Output("No configuration files loaded") } // Initialize the telemetry @@ -529,7 +529,7 @@ func (c *Command) Run(args []string) int { padding := 18 c.Ui.Output("Nomad agent configuration:\n") for _, k := range infoKeys { - c.Ui.Info(fmt.Sprintf( + c.Ui.Output(fmt.Sprintf( "%s%s: %s", strings.Repeat(" ", padding-len(k)), strings.Title(k), @@ -831,7 +831,7 @@ func (c *Command) startupJoin(config *Config) error { return err } - c.Ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n)) + c.Ui.Output(fmt.Sprintf("Join completed. Synced with %d initial agents", n)) return nil } diff --git a/command/commands.go b/command/commands.go index 63a050ad3..6eeba401a 100644 --- a/command/commands.go +++ b/command/commands.go @@ -48,7 +48,7 @@ func (c *DeprecatedCommand) warn() { // Commands returns the mapping of CLI commands for Nomad. The meta // parameter lets you set meta options for all commands. -func Commands(metaPtr *Meta) map[string]cli.CommandFactory { +func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { if metaPtr == nil { metaPtr = new(Meta) } @@ -156,7 +156,7 @@ func Commands(metaPtr *Meta) map[string]cli.CommandFactory { "agent": func() (cli.Command, error) { return &agent.Command{ Version: version.GetVersion(), - Ui: meta.Ui, + Ui: agentUi, ShutdownCh: make(chan struct{}), }, nil }, diff --git a/command/node_drain.go b/command/node_drain.go index 978c5daa0..0c0063f3b 100644 --- a/command/node_drain.go +++ b/command/node_drain.go @@ -271,16 +271,30 @@ func (c *NodeDrainCommand) Run(args []string) int { return 1 } - if enable { - c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID)) - } else { - c.Ui.Output(fmt.Sprintf("Node %q drain strategy unset", node.ID)) + if !enable || detach { + if enable { + c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID)) + } else { + c.Ui.Output(fmt.Sprintf("Node %q drain strategy unset", node.ID)) + } } if enable && !detach { + now := time.Now() + c.Ui.Info(fmt.Sprintf("%s: Ctrl-C to stop monitoring: will not cancel the node drain", formatTime(now))) + c.Ui.Output(fmt.Sprintf("%s: Node %q drain strategy set", formatTime(now), node.ID)) outCh := client.Nodes().MonitorDrain(context.Background(), node.ID, meta.LastIndex, ignoreSystem) for msg := range outCh { - c.Ui.Output(msg) + switch msg.Level { + case api.MonitorMsgLevelInfo: + c.Ui.Info(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg)) + case api.MonitorMsgLevelWarn: + c.Ui.Warn(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg)) + case api.MonitorMsgLevelError: + c.Ui.Error(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg)) + default: + c.Ui.Output(fmt.Sprintf("%s: %s", formatTime(time.Now()), msg)) + } } } diff --git a/command/node_drain_test.go b/command/node_drain_test.go index fceec6488..41cb69acf 100644 --- a/command/node_drain_test.go +++ b/command/node_drain_test.go @@ -1,6 +1,7 @@ package command import ( + "bytes" "fmt" "strings" "testing" @@ -205,15 +206,20 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { t.Fatalf("err: %v", err) }) - ui := new(cli.MockUi) + outBuf := bytes.NewBuffer(nil) + ui := &cli.BasicUi{ + Reader: bytes.NewReader(nil), + Writer: outBuf, + ErrorWriter: outBuf, + } cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}} args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"} t.Logf("Running: %v", args) if code := cmd.Run(args); code != 0 { - t.Fatalf("expected exit 0, got: %d\n%s", code, ui.OutputWriter.String()) + t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String()) } - out := ui.OutputWriter.String() + out := outBuf.String() t.Logf("Output:\n%s", out) require.Contains(out, "drain complete") diff --git a/command/node_eligibility.go b/command/node_eligibility.go index a3fe5f802..64a5d9533 100644 --- a/command/node_eligibility.go +++ b/command/node_eligibility.go @@ -150,6 +150,10 @@ func (c *NodeEligibilityCommand) Run(args []string) int { return 1 } - c.Ui.Output(fmt.Sprintf("Node %q scheduling eligibility set", node.ID)) + if enable { + c.Ui.Output(fmt.Sprintf("Node %q scheduling eligibility set: eligible for scheduling", node.ID)) + } else { + c.Ui.Output(fmt.Sprintf("Node %q scheduling eligibility set: ineligible for scheduling", node.ID)) + } return 0 } diff --git a/command/operator_keyring.go b/command/operator_keyring.go index a74285870..5ef828178 100644 --- a/command/operator_keyring.go +++ b/command/operator_keyring.go @@ -114,7 +114,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int { } if listKeys { - c.Ui.Info("Gathering installed encryption keys...") + c.Ui.Output("Gathering installed encryption keys...") r, err := client.Agent().ListKeys() if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) @@ -125,7 +125,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int { } if installKey != "" { - c.Ui.Info("Installing new gossip encryption key...") + c.Ui.Output("Installing new gossip encryption key...") _, err := client.Agent().InstallKey(installKey) if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) @@ -135,7 +135,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int { } if useKey != "" { - c.Ui.Info("Changing primary gossip encryption key...") + c.Ui.Output("Changing primary gossip encryption key...") _, err := client.Agent().UseKey(useKey) if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) @@ -145,7 +145,7 @@ func (c *OperatorKeyringCommand) Run(args []string) int { } if removeKey != "" { - c.Ui.Info("Removing gossip encryption key...") + c.Ui.Output("Removing gossip encryption key...") _, err := client.Agent().RemoveKey(removeKey) if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) diff --git a/main.go b/main.go index b563a7f02..d74bef167 100644 --- a/main.go +++ b/main.go @@ -92,16 +92,24 @@ func RunCustom(args []string) int { ErrorWriter: os.Stderr, } + // The Nomad agent never outputs color + agentUi := &cli.BasicUi{ + Reader: os.Stdin, + Writer: os.Stdout, + ErrorWriter: os.Stderr, + } + // Only use colored UI if stdout is a tty, and not disabled if isTerminal && color { metaPtr.Ui = &cli.ColoredUi{ ErrorColor: cli.UiColorRed, WarnColor: cli.UiColorYellow, + InfoColor: cli.UiColorGreen, Ui: metaPtr.Ui, } } - commands := command.Commands(metaPtr) + commands := command.Commands(metaPtr, agentUi) cli := &cli.CLI{ Name: "nomad", Version: version.GetVersion().FullVersionNumber(true),