diff --git a/api/nodes.go b/api/nodes.go index 728705db0..77e61624a 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "sort" "time" @@ -84,6 +85,234 @@ func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q return &resp, nil } +// MonitorDrain emits drain related events on the returned string channel. The +// channel will be closed when all allocations on the draining node have +// stopped or the context is canceled. +func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, ignoreSys bool) <-chan string { + outCh := make(chan string, 8) + errCh := make(chan string, 1) + nodeCh := make(chan string, 1) + allocCh := make(chan string, 8) + + // Multiplex node and alloc chans onto outCh. This goroutine closes + // outCh when other chans have been closed or context canceled. + multiplexCtx, cancel := context.WithCancel(ctx) + go n.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + + // Monitor node for updates + go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh, errCh) + + // Monitor allocs on node for updates + go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh, errCh) + + return outCh +} + +// monitorDrainMultiplex multiplexes node and alloc updates onto the out chan. +// Closes out chan when either the context is canceled, both update chans are +// closed, or an error occurs. +func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), + outCh chan<- string, errCh, nodeCh, allocCh <-chan string) { + + defer cancel() + defer close(outCh) + nodeOk := true + allocOk := true + var msg string + for { + // If both chans have been closed, close the output chan + if !nodeOk && !allocOk { + return + } + + select { + case msg, nodeOk = <-nodeCh: + if !nodeOk { + // nil chan to prevent further recvs + nodeCh = nil + } + + case msg, allocOk = <-allocCh: + if !allocOk { + // nil chan to prevent further recvs + allocCh = nil + } + + case errMsg := <-errCh: + // Error occurred, exit after sending + select { + case outCh <- errMsg: + case <-ctx.Done(): + } + return + + case <-ctx.Done(): + return + } + + if msg == "" { + continue + } + + select { + case outCh <- msg: + case <-ctx.Done(): + return + } + } +} + +// monitorDrainNode emits node updates on nodeCh and closes the channel when +// the node has finished draining. +func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh, errCh chan<- string) { + defer close(nodeCh) + + var lastStrategy *DrainStrategy + + for { + q := QueryOptions{ + AllowStale: true, + WaitIndex: index, + } + node, meta, err := n.Info(nodeID, &q) + if err != nil { + msg := fmt.Sprintf("Error monitoring node: %v", err) + select { + case errCh <- msg: + case <-ctx.Done(): + } + return + } + + if node.DrainStrategy == nil { + msg := fmt.Sprintf("Node %q drain complete", nodeID) + select { + case nodeCh <- msg: + case <-ctx.Done(): + } + return + } + + // DrainStrategy changed + if lastStrategy != nil && !node.DrainStrategy.Equal(lastStrategy) { + msg := fmt.Sprintf("Node %q drain updated: %s", nodeID, node.DrainStrategy) + select { + case nodeCh <- msg: + case <-ctx.Done(): + return + } + } + + lastStrategy = node.DrainStrategy + + // Drain still ongoing, update index and block for updates + index = meta.LastIndex + } +} + +// monitorDrainAllocs emits alloc updates on allocCh and closes the channel +// when the node has finished draining. +func (n *Nodes) monitorDrainAllocs(ctx context.Context, nodeID string, ignoreSys bool, allocCh, errCh chan<- string) { + defer close(allocCh) + + // Build initial alloc state + q := QueryOptions{AllowStale: true} + allocs, meta, err := n.Allocations(nodeID, &q) + if err != nil { + msg := fmt.Sprintf("Error monitoring allocations: %v", err) + select { + case errCh <- msg: + case <-ctx.Done(): + } + return + } + + initial := make(map[string]*Allocation, len(allocs)) + for _, a := range allocs { + initial[a.ID] = a + } + + for { + q.WaitIndex = meta.LastIndex + + allocs, meta, err = n.Allocations(nodeID, &q) + if err != nil { + msg := fmt.Sprintf("Error monitoring allocations: %v", err) + select { + case errCh <- msg: + case <-ctx.Done(): + } + return + } + + runningAllocs := 0 + for _, a := range allocs { + // Get previous version of alloc + orig, existing := initial[a.ID] + + // Update local alloc state + initial[a.ID] = a + + migrating := a.DesiredTransition.ShouldMigrate() + + var msg string + switch { + case !existing: + // Should only be possible if response + // from initial Allocations call was + // stale. No need to output + + case orig.ClientStatus != a.ClientStatus: + // Alloc status has changed; output + msg = fmt.Sprintf("status %s -> %s", orig.ClientStatus, a.ClientStatus) + + case migrating && !orig.DesiredTransition.ShouldMigrate(): + // Alloc was marked for migration + msg = "marked for migration" + case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop: + // Alloc has already been marked for migration and is now being stopped + msg = "draining" + case a.NextAllocation != "" && orig.NextAllocation == "": + // Alloc has been replaced by another allocation + msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation) + } + + if msg != "" { + select { + case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg): + case <-ctx.Done(): + return + } + } + + // Ignore malformed allocs + if a.Job == nil || a.Job.Type == nil { + continue + } + + // Track how many allocs are still running + if ignoreSys && a.Job.Type != nil && *a.Job.Type == structs.JobTypeSystem { + continue + } + + switch a.ClientStatus { + case structs.AllocClientStatusPending, structs.AllocClientStatusRunning: + runningAllocs++ + } + } + + // Exit if all allocs are terminal + if runningAllocs == 0 { + msg := fmt.Sprintf("All allocations on node %q have stopped.", nodeID) + select { + case allocCh <- msg: + case <-ctx.Done(): + } + return + } + } +} + // NodeUpdateEligibilityRequest is used to update the drain specification for a node. type NodeUpdateEligibilityRequest struct { // NodeID is the node to update the drain specification for. @@ -220,6 +449,32 @@ type DrainSpec struct { IgnoreSystemJobs bool } +func (d *DrainStrategy) Equal(o *DrainStrategy) bool { + if d == nil || o == nil { + return d == o + } + + if d.ForceDeadline != o.ForceDeadline { + return false + } + if d.Deadline != o.Deadline { + return false + } + if d.IgnoreSystemJobs != o.IgnoreSystemJobs { + return false + } + + return true +} + +// String returns a human readable version of the drain strategy. +func (d *DrainStrategy) String() string { + if d.IgnoreSystemJobs { + return fmt.Sprintf("drain ignoring system jobs and deadline at %s", d.ForceDeadline) + } + return fmt.Sprintf("drain with deadline at %s", d.ForceDeadline) +} + const ( NodeEventSubsystemDrain = "Drain" NodeEventSubsystemDriver = "Driver" diff --git a/api/nodes_test.go b/api/nodes_test.go index 36c13ae1a..e00329d0f 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "reflect" "sort" @@ -375,3 +376,160 @@ func TestNodes_GcAlloc(t *testing.T) { require.NotNil(err) require.True(structs.IsErrUnknownAllocation(err)) } + +// Unittest monitorDrainMultiplex when an error occurs +func TestNodes_MonitorDrain_Multiplex_Bad(t *testing.T) { + t.Parallel() + require := require.New(t) + + ctx := context.Background() + multiplexCtx, cancel := context.WithCancel(ctx) + + // monitorDrainMultiplex doesn't require anything on *Nodes, so we + // don't need to use a full Client + var nodeClient *Nodes + + outCh := make(chan string, 8) + errCh := make(chan string, 1) + nodeCh := make(chan string, 1) + allocCh := make(chan string, 8) + exitedCh := make(chan struct{}) + go func() { + defer close(exitedCh) + nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + }() + + // Fake an alloc update + msg := "alloc update" + allocCh <- msg + require.Equal(msg, <-outCh) + + // Fake a node update + msg = "node update" + nodeCh <- msg + require.Equal(msg, <-outCh) + + // Fake an error that should shut everything down + msg = "fake error" + errCh <- msg + require.Equal(msg, <-outCh) + + _, ok := <-exitedCh + require.False(ok) + + _, ok = <-outCh + require.False(ok) + + // Exiting should also cancel the context that would be passed to the + // node & alloc watchers + select { + case <-multiplexCtx.Done(): + case <-time.After(100 * time.Millisecond): + t.Fatalf("context wasn't canceled") + } + +} + +// Unittest monitorDrainMultiplex when drain finishes +func TestNodes_MonitorDrain_Multiplex_Good(t *testing.T) { + t.Parallel() + require := require.New(t) + + ctx := context.Background() + multiplexCtx, cancel := context.WithCancel(ctx) + + // monitorDrainMultiplex doesn't require anything on *Nodes, so we + // don't need to use a full Client + var nodeClient *Nodes + + outCh := make(chan string, 8) + errCh := make(chan string, 1) + nodeCh := make(chan string, 1) + allocCh := make(chan string, 8) + exitedCh := make(chan struct{}) + go func() { + defer close(exitedCh) + nodeClient.monitorDrainMultiplex(ctx, cancel, outCh, errCh, nodeCh, allocCh) + }() + + // Fake a node updating and finishing + msg := "node update" + nodeCh <- msg + close(nodeCh) + require.Equal(msg, <-outCh) + + // Nothing else should have exited yet + select { + case msg, ok := <-outCh: + if ok { + t.Fatalf("unexpected output: %q", msg) + } + t.Fatalf("out channel closed unexpectedly") + case <-exitedCh: + t.Fatalf("multiplexer exited unexpectedly") + case <-multiplexCtx.Done(): + t.Fatalf("multiplexer context canceled unexpectedly") + case <-time.After(10 * time.Millisecond): + t.Logf("multiplexer still running as expected") + } + + // Fake an alloc update coming in after the node monitor has finished + msg = "alloc update" + allocCh <- msg + require.Equal(msg, <-outCh) + + // Closing the allocCh should cause everything to exit + close(allocCh) + + _, ok := <-exitedCh + require.False(ok) + + _, ok = <-outCh + require.False(ok) + + // Exiting should also cancel the context that would be passed to the + // node & alloc watchers + select { + case <-multiplexCtx.Done(): + case <-time.After(100 * time.Millisecond): + t.Fatalf("context wasn't canceled") + } + +} + +func TestNodes_DrainStrategy_Equal(t *testing.T) { + t.Parallel() + require := require.New(t) + + // nil + var d *DrainStrategy + require.True(d.Equal(nil)) + + o := &DrainStrategy{} + require.False(d.Equal(o)) + require.False(o.Equal(d)) + + d = &DrainStrategy{} + require.True(d.Equal(o)) + + // ForceDeadline + d.ForceDeadline = time.Now() + require.False(d.Equal(o)) + + o.ForceDeadline = d.ForceDeadline + require.True(d.Equal(o)) + + // Deadline + d.Deadline = 1 + require.False(d.Equal(o)) + + o.Deadline = 1 + require.True(d.Equal(o)) + + // IgnoreSystemJobs + d.IgnoreSystemJobs = true + require.False(d.Equal(o)) + + o.IgnoreSystemJobs = true + require.True(d.Equal(o)) +} diff --git a/command/node_drain.go b/command/node_drain.go index 2e5a63088..978c5daa0 100644 --- a/command/node_drain.go +++ b/command/node_drain.go @@ -1,13 +1,13 @@ package command import ( + "context" "fmt" "strings" "time" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" - "github.com/hashicorp/nomad/nomad/structs" "github.com/posener/complete" ) @@ -271,160 +271,18 @@ func (c *NodeDrainCommand) Run(args []string) int { return 1 } - c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID)) + if enable { + c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID)) + } else { + c.Ui.Output(fmt.Sprintf("Node %q drain strategy unset", node.ID)) + } if enable && !detach { - if err := monitorDrain(c.Ui.Output, client.Nodes(), node.ID, meta.LastIndex); err != nil { - c.Ui.Error(fmt.Sprintf("Error monitoring drain: %v", err)) - return 1 + outCh := client.Nodes().MonitorDrain(context.Background(), node.ID, meta.LastIndex, ignoreSystem) + for msg := range outCh { + c.Ui.Output(msg) } - - c.Ui.Output(fmt.Sprintf("Node %q drain complete", nodeID)) } return 0 } - -// monitorDrain monitors the node being drained and exits when the node has -// finished draining. -func monitorDrain(output func(string), nodeClient *api.Nodes, nodeID string, index uint64) error { - doneCh := make(chan struct{}) - defer close(doneCh) - - // Errors from either goroutine are sent here - errCh := make(chan error, 1) - - // Monitor node changes and close chan when drain is complete - nodeCh := make(chan struct{}) - go func() { - for { - q := api.QueryOptions{ - AllowStale: true, - WaitIndex: index, - } - node, meta, err := nodeClient.Info(nodeID, &q) - if err != nil { - select { - case errCh <- err: - case <-doneCh: - } - return - } - - if node.DrainStrategy == nil { - close(nodeCh) - return - } - - // Drain still ongoing - index = meta.LastIndex - } - }() - - // Monitor alloc changes - allocCh := make(chan string, 1) - go func() { - allocs, meta, err := nodeClient.Allocations(nodeID, nil) - if err != nil { - select { - case errCh <- err: - case <-doneCh: - } - return - } - - initial := make(map[string]*api.Allocation, len(allocs)) - for _, a := range allocs { - initial[a.ID] = a - } - - for { - q := api.QueryOptions{ - AllowStale: true, - WaitIndex: meta.LastIndex, - } - - allocs, meta, err = nodeClient.Allocations(nodeID, &q) - if err != nil { - select { - case errCh <- err: - case <-doneCh: - } - return - } - - for _, a := range allocs { - // Get previous version of alloc - orig, ok := initial[a.ID] - - // Update local alloc state - initial[a.ID] = a - - migrating := a.DesiredTransition.ShouldMigrate() - - msg := "" - switch { - case !ok: - // Should only be possible if response - // from initial Allocations call was - // stale. No need to output - - case orig.ClientStatus != a.ClientStatus: - // Alloc status has changed; output - msg = fmt.Sprintf("status %s -> %s", orig.ClientStatus, a.ClientStatus) - - case migrating && !orig.DesiredTransition.ShouldMigrate(): - // Alloc was marked for migration - msg = "marked for migration" - case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop: - // Alloc has already been marked for migration and is now being stopped - msg = "draining" - case a.NextAllocation != "" && orig.NextAllocation == "": - // Alloc has been replaced by another allocation - msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation) - } - - if msg != "" { - select { - case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg): - case <-doneCh: - return - } - } - } - } - }() - - done := false - for !done { - select { - case err := <-errCh: - return err - case <-nodeCh: - done = true - case msg := <-allocCh: - output(msg) - } - } - - // Loop on alloc messages for a bit longer as we may have gotten the - // "node done" first (since the watchers run concurrently the events - // may be received out of order) - deadline := 500 * time.Millisecond - timer := time.NewTimer(deadline) - for { - select { - case err := <-errCh: - return err - case msg := <-allocCh: - output(msg) - if !timer.Stop() { - <-timer.C - } - timer.Reset(deadline) - case <-timer.C: - // No events within deadline, exit - return nil - } - } -} diff --git a/command/node_drain_test.go b/command/node_drain_test.go index 01c8b1253..fceec6488 100644 --- a/command/node_drain_test.go +++ b/command/node_drain_test.go @@ -119,16 +119,17 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { t.Fatalf("err: %s", err) }) - // Register a job to create an alloc to drain - count := 3 + // Register a service job to create allocs to drain + serviceCount := 3 job := &api.Job{ ID: helper.StringToPtr("mock_service"), Name: helper.StringToPtr("mock_service"), Datacenters: []string{"dc1"}, + Type: helper.StringToPtr("service"), TaskGroups: []*api.TaskGroup{ { Name: helper.StringToPtr("mock_group"), - Count: &count, + Count: &serviceCount, Migrate: &api.MigrateStrategy{ MaxParallel: helper.IntToPtr(1), HealthCheck: helper.StringToPtr("task_states"), @@ -142,6 +143,10 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { Config: map[string]interface{}{ "run_for": "10m", }, + Resources: &api.Resources{ + CPU: helper.IntToPtr(50), + MemoryMB: helper.IntToPtr(50), + }, }, }, }, @@ -151,14 +156,44 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { _, _, err := client.Jobs().Register(job, nil) require.Nil(err) + // Register a system job to ensure it is ignored during draining + sysjob := &api.Job{ + ID: helper.StringToPtr("mock_system"), + Name: helper.StringToPtr("mock_system"), + Datacenters: []string{"dc1"}, + Type: helper.StringToPtr("system"), + TaskGroups: []*api.TaskGroup{ + { + Name: helper.StringToPtr("mock_sysgroup"), + Count: helper.IntToPtr(1), + Tasks: []*api.Task{ + { + Name: "mock_systask", + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": "10m", + }, + Resources: &api.Resources{ + CPU: helper.IntToPtr(50), + MemoryMB: helper.IntToPtr(50), + }, + }, + }, + }, + }, + } + + _, _, err = client.Jobs().Register(sysjob, nil) + require.Nil(err) + var allocs []*api.Allocation testutil.WaitForResult(func() (bool, error) { allocs, _, err = client.Nodes().Allocations(nodeID, nil) if err != nil { return false, err } - if len(allocs) != count { - return false, fmt.Errorf("number of allocs %d != count (%d)", len(allocs), count) + if len(allocs) != serviceCount+1 { + return false, fmt.Errorf("number of allocs %d != count (%d)", len(allocs), serviceCount+1) } for _, a := range allocs { if a.ClientStatus != "running" { @@ -172,10 +207,10 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { ui := new(cli.MockUi) cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}} - args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s"} + args := []string{"-address=" + url, "-self", "-enable", "-deadline", "1s", "-ignore-system"} t.Logf("Running: %v", args) if code := cmd.Run(args); code != 0 { - t.Fatalf("expected exit 0, got: %d", code) + t.Fatalf("expected exit 0, got: %d\n%s", code, ui.OutputWriter.String()) } out := ui.OutputWriter.String() @@ -183,9 +218,19 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { require.Contains(out, "drain complete") for _, a := range allocs { + if *a.Job.Type == "system" { + if strings.Contains(out, a.ID) { + t.Fatalf("output should not contain system alloc %q", a.ID) + } + continue + } require.Contains(out, fmt.Sprintf("Alloc %q marked for migration", a.ID)) require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID)) } + expected := fmt.Sprintf("All allocations on node %q have stopped.\n", nodeID) + if !strings.HasSuffix(out, expected) { + t.Fatalf("expected output to end with:\n%s", expected) + } } func TestNodeDrainCommand_Fails(t *testing.T) {