diff --git a/command/node_drain.go b/command/node_drain.go
index 9f170c76e..2e5a63088 100644
--- a/command/node_drain.go
+++ b/command/node_drain.go
@@ -410,7 +410,7 @@ func monitorDrain(output func(string), nodeClient *api.Nodes, nodeID string, ind
 	// Loop on alloc messages for a bit longer as we may have gotten the
 	// "node done" first (since the watchers run concurrently the events
 	// may be received out of order)
-	deadline := 250 * time.Millisecond
+	deadline := 500 * time.Millisecond
 	timer := time.NewTimer(deadline)
 	for {
 		select {
diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go
index 46dcad696..f167cf1f5 100644
--- a/nomad/drainer/drainer.go
+++ b/nomad/drainer/drainer.go
@@ -233,18 +233,18 @@ func (n *NodeDrainer) run(ctx context.Context) {
 // marks them for migration.
 func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
 	// Retrieve the set of allocations that will be force stopped.
-	n.l.RLock()
 	var forceStop []*structs.Allocation
+	n.l.RLock()
 	for _, node := range nodes {
 		draining, ok := n.nodes[node]
 		if !ok {
-			n.logger.Printf("[DEBUG] nomad.node_drainer: skipping untracked deadlined node %q", node)
+			n.logger.Printf("[DEBUG] nomad.drain: skipping untracked deadlined node %q", node)
 			continue
 		}
 
-		allocs, err := draining.DeadlineAllocs()
+		allocs, err := draining.RemainingAllocs()
 		if err != nil {
-			n.logger.Printf("[ERR] nomad.node_drainer: failed to retrive allocs on deadlined node %q: %v", node, err)
+			n.logger.Printf("[ERR] nomad.drain: failed to retrieve allocs on deadlined node %q: %v", node, err)
 			continue
 		}
 
@@ -272,9 +272,11 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
 		nodes[alloc.NodeID] = struct{}{}
 	}
 
+	var done []string
+	var remainingAllocs []*structs.Allocation
+
 	// For each node, check if it is now done
 	n.l.RLock()
-	var done []string
 	for node := range nodes {
 		draining, ok := n.nodes[node]
 		if !ok {
@@ -283,7 +285,7 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
 
 		isDone, err := draining.IsDone()
 		if err != nil {
-			n.logger.Printf("[ERR] nomad.drain: checking if node %q is done draining: %v", node, err)
+			n.logger.Printf("[ERR] nomad.drain: error checking if node %q is done draining: %v", node, err)
 			continue
 		}
 
@@ -292,9 +294,27 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
 		}
 
 		done = append(done, node)
+
+		remaining, err := draining.RemainingAllocs()
+		if err != nil {
+			n.logger.Printf("[ERR] nomad.drain: node %q is done draining but encountered an error getting remaining allocs: %v", node, err)
+			continue
+		}
+
+		remainingAllocs = append(remainingAllocs, remaining...)
 	}
 	n.l.RUnlock()
 
+	// Stop any running system jobs on otherwise done nodes
+	if len(remainingAllocs) > 0 {
+		future := structs.NewBatchFuture()
+		n.drainAllocs(future, remainingAllocs)
+		if err := future.Wait(); err != nil {
+			n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done nodes: %v",
+				len(remainingAllocs), err)
+		}
+	}
+
 	// Submit the node transistions in a sharded form to ensure a reasonable
 	// Raft transaction size.
 	for _, nodes := range partitionIds(done) {
diff --git a/nomad/drainer/draining_node.go b/nomad/drainer/draining_node.go
index af5c094b8..b9f7c1148 100644
--- a/nomad/drainer/draining_node.go
+++ b/nomad/drainer/draining_node.go
@@ -47,7 +47,9 @@ func (n *drainingNode) DeadlineTime() (bool, time.Time) {
 	return n.node.DrainStrategy.DeadlineTime()
 }
 
-// IsDone returns if the node is done draining
+// IsDone returns if the node is done draining batch and service allocs. System
+// allocs must be stopped before marking drain complete unless they're being
+// ignored.
 func (n *drainingNode) IsDone() (bool, error) {
 	n.l.RLock()
 	defer n.l.RUnlock()
@@ -57,9 +59,6 @@ func (n *drainingNode) IsDone() (bool, error) {
 		return false, fmt.Errorf("node doesn't have a drain strategy set")
 	}
 
-	// Grab the relevant drain info
-	ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
-
 	// Retrieve the allocs on the node
 	allocs, err := n.state.AllocsByNode(nil, n.node.ID)
 	if err != nil {
@@ -67,8 +66,9 @@ func (n *drainingNode) IsDone() (bool, error) {
 	}
 
 	for _, alloc := range allocs {
-		// Skip system if configured to
-		if alloc.Job.Type == structs.JobTypeSystem && ignoreSystem {
+		// System jobs are only stopped after a node is done draining
+		// everything else, so ignore them here.
+		if alloc.Job.Type == structs.JobTypeSystem {
 			continue
 		}
 
@@ -81,10 +81,9 @@ func (n *drainingNode) IsDone() (bool, error) {
 	return true, nil
 }
 
-// TODO test that we return the right thing given the strategies
-// DeadlineAllocs returns the set of allocations that should be drained given a
-// node is at its deadline
-func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
+// RemainingAllocs returns the set of allocations remaining on a node that
+// still need to be drained.
+func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
 	n.l.RLock()
 	defer n.l.RUnlock()
 
@@ -94,10 +93,6 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
 	}
 
 	// Grab the relevant drain info
-	inf, _ := n.node.DrainStrategy.DeadlineTime()
-	if inf {
-		return nil, nil
-	}
 	ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
 
 	// Retrieve the allocs on the node
@@ -124,7 +119,7 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
 	return drain, nil
 }
 
-// RunningServices returns the set of jobs on the node
+// RunningServices returns the set of service jobs on the node.
 func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
 	n.l.RLock()
 	defer n.l.RUnlock()
diff --git a/nomad/drainer/draining_node_test.go b/nomad/drainer/draining_node_test.go
new file mode 100644
index 000000000..f8b298432
--- /dev/null
+++ b/nomad/drainer/draining_node_test.go
@@ -0,0 +1,223 @@
+package drainer
+
+import (
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/state"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// testDrainingNode creates a *drainingNode with a 1h deadline but no allocs
+func testDrainingNode(t *testing.T) *drainingNode {
+	t.Helper()
+
+	sconfig := &state.StateStoreConfig{
+		LogOutput: testlog.NewWriter(t),
+		Region:    "global",
+	}
+	state, err := state.NewStateStore(sconfig)
+	require.Nil(t, err)
+
+	node := mock.Node()
+	node.DrainStrategy = &structs.DrainStrategy{
+		DrainSpec: structs.DrainSpec{
+			Deadline: time.Hour,
+		},
+		ForceDeadline: time.Now().Add(time.Hour),
+	}
+
+	require.Nil(t, state.UpsertNode(100, node))
+	return NewDrainingNode(node, state)
+}
+
+func assertDrainingNode(t *testing.T, dn *drainingNode, isDone bool, remaining, running int) {
+	t.Helper()
+
+	done, err := dn.IsDone()
+	require.Nil(t, err)
+	assert.Equal(t, isDone, done, "IsDone mismatch")
+
+	allocs, err := dn.RemainingAllocs()
+	require.Nil(t, err)
+	assert.Len(t, allocs, remaining, "RemainingAllocs mismatch")
+
+	jobs, err := dn.RunningServices()
+	require.Nil(t, err)
+	assert.Len(t, jobs, running, "RunningServices mismatch")
+}
+
+func TestDrainingNode_Table(t *testing.T) {
+	cases := []struct {
+		name      string
+		isDone    bool
+		remaining int
+		running   int
+		setup     func(*testing.T, *drainingNode)
+	}{
+		{
+			name:      "Empty",
+			isDone:    true,
+			remaining: 0,
+			running:   0,
+			setup:     func(*testing.T, *drainingNode) {},
+		},
+		{
+			name:      "Batch",
+			isDone:    false,
+			remaining: 1,
+			running:   0,
+			setup: func(t *testing.T, dn *drainingNode) {
+				alloc := mock.BatchAlloc()
+				alloc.NodeID = dn.node.ID
+				require.Nil(t, dn.state.UpsertJob(101, alloc.Job))
+				require.Nil(t, dn.state.UpsertAllocs(102, []*structs.Allocation{alloc}))
+			},
+		},
+		{
+			name:      "Service",
+			isDone:    false,
+			remaining: 1,
+			running:   1,
+			setup: func(t *testing.T, dn *drainingNode) {
+				alloc := mock.Alloc()
+				alloc.NodeID = dn.node.ID
+				require.Nil(t, dn.state.UpsertJob(101, alloc.Job))
+				require.Nil(t, dn.state.UpsertAllocs(102, []*structs.Allocation{alloc}))
+			},
+		},
+		{
+			name:      "System",
+			isDone:    true,
+			remaining: 1,
+			running:   0,
+			setup: func(t *testing.T, dn *drainingNode) {
+				alloc := mock.SystemAlloc()
+				alloc.NodeID = dn.node.ID
+				require.Nil(t, dn.state.UpsertJob(101, alloc.Job))
+				require.Nil(t, dn.state.UpsertAllocs(102, []*structs.Allocation{alloc}))
+			},
+		},
+		{
+			name:      "AllTerminal",
+			isDone:    true,
+			remaining: 0,
+			running:   0,
+			setup: func(t *testing.T, dn *drainingNode) {
+				allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
+				for _, a := range allocs {
+					a.NodeID = dn.node.ID
+					require.Nil(t, dn.state.UpsertJob(101, a.Job))
+				}
+				require.Nil(t, dn.state.UpsertAllocs(102, allocs))
+
+				// StateStore doesn't like inserting new allocs
+				// with a terminal status, so set the status in
+				// a second pass
+				for _, a := range allocs {
+					a.ClientStatus = structs.AllocClientStatusComplete
+				}
+				require.Nil(t, dn.state.UpsertAllocs(103, allocs))
+			},
+		},
+		{
+			name:      "ServiceTerminal",
+			isDone:    false,
+			remaining: 2,
+			running:   0,
+			setup: func(t *testing.T, dn *drainingNode) {
+				allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
+				for _, a := range allocs {
+					a.NodeID = dn.node.ID
+					require.Nil(t, dn.state.UpsertJob(101, a.Job))
+				}
+				require.Nil(t, dn.state.UpsertAllocs(102, allocs))
+
+				// Set only the service job as terminal
+				allocs[0].ClientStatus = structs.AllocClientStatusComplete
+				require.Nil(t, dn.state.UpsertAllocs(103, allocs))
+			},
+		},
+		{
+			name:      "AllTerminalButBatch",
+			isDone:    false,
+			remaining: 1,
+			running:   0,
+			setup: func(t *testing.T, dn *drainingNode) {
+				allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
+				for _, a := range allocs {
+					a.NodeID = dn.node.ID
+					require.Nil(t, dn.state.UpsertJob(101, a.Job))
+				}
+				require.Nil(t, dn.state.UpsertAllocs(102, allocs))
+
+				// Set only the service and system jobs as terminal
+				allocs[0].ClientStatus = structs.AllocClientStatusComplete
+				allocs[2].ClientStatus = structs.AllocClientStatusComplete
+				require.Nil(t, dn.state.UpsertAllocs(103, allocs))
+			},
+		},
+		{
+			name:      "AllTerminalButSystem",
+			isDone:    true,
+			remaining: 1,
+			running:   0,
+			setup: func(t *testing.T, dn *drainingNode) {
+				allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
+				for _, a := range allocs {
+					a.NodeID = dn.node.ID
+					require.Nil(t, dn.state.UpsertJob(101, a.Job))
+				}
+				require.Nil(t, dn.state.UpsertAllocs(102, allocs))
+
+				// Set only the service and batch jobs as terminal
+				allocs[0].ClientStatus = structs.AllocClientStatusComplete
+				allocs[1].ClientStatus = structs.AllocClientStatusComplete
+				require.Nil(t, dn.state.UpsertAllocs(103, allocs))
+			},
+		},
+		{
+			name:      "HalfTerminal",
+			isDone:    false,
+			remaining: 3,
+			running:   1,
+			setup: func(t *testing.T, dn *drainingNode) {
+				allocs := []*structs.Allocation{
+					mock.Alloc(),
+					mock.BatchAlloc(),
+					mock.SystemAlloc(),
+					mock.Alloc(),
+					mock.BatchAlloc(),
+					mock.SystemAlloc(),
+				}
+				for _, a := range allocs {
+					a.NodeID = dn.node.ID
+					require.Nil(t, dn.state.UpsertJob(101, a.Job))
+				}
+				require.Nil(t, dn.state.UpsertAllocs(102, allocs))
+
+				// Set the first service, batch, and system allocs as terminal
+				allocs[0].ClientStatus = structs.AllocClientStatusComplete
+				allocs[1].ClientStatus = structs.AllocClientStatusComplete
+				allocs[2].ClientStatus = structs.AllocClientStatusComplete
+				require.Nil(t, dn.state.UpsertAllocs(103, allocs))
+			},
+		},
+	}
+
+	// Default test drainingNode has no allocs, so it should be done and
+	// have no remaining allocs.
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			dn := testDrainingNode(t)
+			tc.setup(t, dn)
+			assertDrainingNode(t, dn, tc.isDone, tc.remaining, tc.running)
+		})
+	}
+}
diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go
index 97c6cf8b2..8b816ae1f 100644
--- a/nomad/drainer/watch_nodes.go
+++ b/nomad/drainer/watch_nodes.go
@@ -87,6 +87,21 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 	}
 
 	if done {
+		// Node is done draining. Stop remaining system allocs before
+		// marking node as complete.
+		remaining, err := draining.RemainingAllocs()
+		if err != nil {
+			n.logger.Printf("[ERR] nomad.drain: error getting remaining allocs on drained node %q: %v",
+				node.ID, err)
+		} else if len(remaining) > 0 {
+			future := structs.NewBatchFuture()
+			n.drainAllocs(future, remaining)
+			if err := future.Wait(); err != nil {
+				n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done node %q: %v",
+					len(remaining), node.ID, err)
+			}
+		}
+
 		index, err := n.raft.NodesDrainComplete([]string{node.ID})
 		if err != nil {
 			n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index 6b135b636..bf1df8ea1 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -387,6 +387,98 @@ func Alloc() *structs.Allocation {
 	return alloc
 }
 
+func BatchAlloc() *structs.Allocation {
+	alloc := &structs.Allocation{
+		ID:        uuid.Generate(),
+		EvalID:    uuid.Generate(),
+		NodeID:    "12345678-abcd-efab-cdef-123456789abc",
+		Namespace: structs.DefaultNamespace,
+		TaskGroup: "worker",
+		Resources: &structs.Resources{
+			CPU:      500,
+			MemoryMB: 256,
+			DiskMB:   150,
+			Networks: []*structs.NetworkResource{
+				{
+					Device:        "eth0",
+					IP:            "192.168.0.100",
+					ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
+					MBits:         50,
+					DynamicPorts:  []structs.Port{{Label: "http"}},
+				},
+			},
+		},
+		TaskResources: map[string]*structs.Resources{
+			"worker": {
+				CPU:      100,
+				MemoryMB: 100,
+				Networks: []*structs.NetworkResource{
+					{
+						Device: "eth0",
+						IP:     "192.168.0.100",
+						MBits:  50,
+					},
+				},
+			},
+		},
+		SharedResources: &structs.Resources{
+			DiskMB: 150,
+		},
+		Job:           BatchJob(),
+		DesiredStatus: structs.AllocDesiredStatusRun,
+		ClientStatus:  structs.AllocClientStatusPending,
+	}
+	alloc.JobID = alloc.Job.ID
+	return alloc
+}
+
+func SystemAlloc() *structs.Allocation {
+	alloc := &structs.Allocation{
+		ID:        uuid.Generate(),
+		EvalID:    uuid.Generate(),
+		NodeID:    "12345678-abcd-efab-cdef-123456789abc",
+		Namespace: structs.DefaultNamespace,
+		TaskGroup: "web",
+		Resources: &structs.Resources{
+			CPU:      500,
+			MemoryMB: 256,
+			DiskMB:   150,
+			Networks: []*structs.NetworkResource{
+				{
+					Device:        "eth0",
+					IP:            "192.168.0.100",
+					ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
+					MBits:         50,
+					DynamicPorts:  []structs.Port{{Label: "http"}},
+				},
+			},
+		},
+		TaskResources: map[string]*structs.Resources{
+			"web": {
+				CPU:      500,
+				MemoryMB: 256,
+				Networks: []*structs.NetworkResource{
+					{
+						Device:        "eth0",
+						IP:            "192.168.0.100",
+						ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
+						MBits:         50,
+						DynamicPorts:  []structs.Port{{Label: "http", Value: 9876}},
+					},
+				},
+			},
+		},
+		SharedResources: &structs.Resources{
+			DiskMB: 150,
+		},
+		Job:           SystemJob(),
+		DesiredStatus: structs.AllocDesiredStatusRun,
+		ClientStatus:  structs.AllocClientStatusPending,
+	}
+	alloc.JobID = alloc.Job.ID
+	return alloc
+}
+
 func VaultAccessor() *structs.VaultAccessor {
 	return &structs.VaultAccessor{
 		Accessor: uuid.Generate(),