mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Merge pull request #5045 from hashicorp/b-drivermanager-tests-drain
drain: fix node drain monitoring
This commit is contained in:
28
api/nodes.go
28
api/nodes.go
@@ -124,12 +124,12 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i
|
||||
allocCh := make(chan *MonitorMessage, 8)
|
||||
|
||||
// Multiplex node and alloc chans onto outCh. This goroutine closes
|
||||
// outCh when other chans have been closed or context canceled.
|
||||
// outCh when other chans have been closed.
|
||||
multiplexCtx, cancel := context.WithCancel(ctx)
|
||||
go n.monitorDrainMultiplex(multiplexCtx, cancel, outCh, nodeCh, allocCh)
|
||||
|
||||
// Monitor node for updates
|
||||
go n.monitorDrainNode(multiplexCtx, cancel, nodeID, index, nodeCh)
|
||||
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh)
|
||||
|
||||
// Monitor allocs on node for updates
|
||||
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh)
|
||||
@@ -160,12 +160,14 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
|
||||
if !nodeOk {
|
||||
// nil chan to prevent further recvs
|
||||
nodeCh = nil
|
||||
continue
|
||||
}
|
||||
|
||||
case msg, allocOk = <-allocCh:
|
||||
if !allocOk {
|
||||
// nil chan to prevent further recvs
|
||||
allocCh = nil
|
||||
continue
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
@@ -179,14 +181,6 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
|
||||
select {
|
||||
case outCh <- msg:
|
||||
case <-ctx.Done():
|
||||
|
||||
// If we are exiting but we have a message, attempt to send it
|
||||
// so we don't lose a message but do not block.
|
||||
select {
|
||||
case outCh <- msg:
|
||||
default:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -199,12 +193,12 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
|
||||
|
||||
// monitorDrainNode emits node updates on nodeCh and closes the channel when
|
||||
// the node has finished draining.
|
||||
func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(),
|
||||
nodeID string, index uint64, nodeCh chan<- *MonitorMessage) {
|
||||
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string,
|
||||
index uint64, nodeCh chan<- *MonitorMessage) {
|
||||
|
||||
defer close(nodeCh)
|
||||
|
||||
var lastStrategy *DrainStrategy
|
||||
var strategyChanged bool
|
||||
q := QueryOptions{
|
||||
AllowStale: true,
|
||||
WaitIndex: index,
|
||||
@@ -222,12 +216,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(),
|
||||
|
||||
if node.DrainStrategy == nil {
|
||||
var msg *MonitorMessage
|
||||
if strategyChanged {
|
||||
msg = Messagef(MonitorMsgLevelInfo, "Node %q has marked all allocations for migration", nodeID)
|
||||
} else {
|
||||
msg = Messagef(MonitorMsgLevelInfo, "No drain strategy set for node %s", nodeID)
|
||||
defer cancel()
|
||||
}
|
||||
msg = Messagef(MonitorMsgLevelInfo, "Drain complete for node %s", nodeID)
|
||||
select {
|
||||
case nodeCh <- msg:
|
||||
case <-ctx.Done():
|
||||
@@ -254,7 +243,6 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(),
|
||||
}
|
||||
|
||||
lastStrategy = node.DrainStrategy
|
||||
strategyChanged = true
|
||||
|
||||
// Drain still ongoing, update index and block for updates
|
||||
q.WaitIndex = meta.LastIndex
|
||||
|
||||
@@ -242,7 +242,7 @@ func (c *NodeDrainCommand) Run(args []string) int {
|
||||
}
|
||||
|
||||
// Prefix lookup matched a single node
|
||||
node, _, err := client.Nodes().Info(nodes[0].ID, nil)
|
||||
node, meta, err := client.Nodes().Info(nodes[0].ID, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error toggling drain mode: %s", err))
|
||||
return 1
|
||||
@@ -250,8 +250,12 @@ func (c *NodeDrainCommand) Run(args []string) int {
|
||||
|
||||
// If monitoring the drain start the montior and return when done
|
||||
if monitor {
|
||||
if node.DrainStrategy == nil {
|
||||
c.Ui.Warn("No drain strategy set")
|
||||
return 0
|
||||
}
|
||||
c.Ui.Info(fmt.Sprintf("%s: Monitoring node %q: Ctrl-C to detach monitoring", formatTime(time.Now()), node.ID))
|
||||
c.monitorDrain(client, context.Background(), node, 0, ignoreSystem)
|
||||
c.monitorDrain(client, context.Background(), node, meta.LastIndex, ignoreSystem)
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -291,7 +295,7 @@ func (c *NodeDrainCommand) Run(args []string) int {
|
||||
}
|
||||
|
||||
// Toggle node draining
|
||||
meta, err := client.Nodes().UpdateDrain(node.ID, spec, !keepIneligible, nil)
|
||||
updateMeta, err := client.Nodes().UpdateDrain(node.ID, spec, !keepIneligible, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error updating drain specification: %s", err))
|
||||
return 1
|
||||
@@ -309,7 +313,7 @@ func (c *NodeDrainCommand) Run(args []string) int {
|
||||
now := time.Now()
|
||||
c.Ui.Info(fmt.Sprintf("%s: Ctrl-C to stop monitoring: will not cancel the node drain", formatTime(now)))
|
||||
c.Ui.Output(fmt.Sprintf("%s: Node %q drain strategy set", formatTime(now), node.ID))
|
||||
c.monitorDrain(client, context.Background(), node, meta.LastIndex, ignoreSystem)
|
||||
c.monitorDrain(client, context.Background(), node, updateMeta.LastIndex, ignoreSystem)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -217,9 +217,7 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
|
||||
cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}}
|
||||
args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"}
|
||||
t.Logf("Running: %v", args)
|
||||
if code := cmd.Run(args); code != 0 {
|
||||
t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String())
|
||||
}
|
||||
require.Zero(cmd.Run(args))
|
||||
|
||||
out := outBuf.String()
|
||||
t.Logf("Output:\n%s", out)
|
||||
@@ -228,7 +226,7 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
|
||||
// monitor goroutines may start only after some or all the allocs have been
|
||||
// migrated.
|
||||
if !testutil.IsTravis() {
|
||||
require.Contains(out, "marked all allocations for migration")
|
||||
require.Contains(out, "Drain complete for node")
|
||||
for _, a := range allocs {
|
||||
if *a.Job.Type == "system" {
|
||||
if strings.Contains(out, a.ID) {
|
||||
@@ -250,9 +248,7 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
|
||||
outBuf.Reset()
|
||||
args = []string{"-address=" + url, "-self", "-monitor", "-ignore-system"}
|
||||
t.Logf("Running: %v", args)
|
||||
if code := cmd.Run(args); code != 0 {
|
||||
t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String())
|
||||
}
|
||||
require.Zero(cmd.Run(args))
|
||||
|
||||
out = outBuf.String()
|
||||
t.Logf("Output:\n%s", out)
|
||||
@@ -298,7 +294,6 @@ func TestNodeDrainCommand_Monitor_NoDrainStrategy(t *testing.T) {
|
||||
out := outBuf.String()
|
||||
t.Logf("Output:\n%s", out)
|
||||
|
||||
require.Contains(out, "Monitoring node")
|
||||
require.Contains(out, "No drain strategy set")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user