From 478209807e10fcb2c6592e62d36bce2334aeb74b Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Wed, 28 Feb 2018 20:59:41 -0800
Subject: [PATCH] refactor main drainloop into 2 more methods

---
 nomad/drainer/drain.go | 437 +++++++++++++++++++++--------------------
 1 file changed, 228 insertions(+), 209 deletions(-)

diff --git a/nomad/drainer/drain.go b/nomad/drainer/drain.go
index 4f2b73556..8db56ac7d 100644
--- a/nomad/drainer/drain.go
+++ b/nomad/drainer/drain.go
@@ -29,21 +29,28 @@ type runningJob struct {
 	allocs []*structs.Allocation
 }
 
-// drainingAlloc contains a conservative deadline an alloc has to be healthy by
-// before it should stopped being watched and replaced.
-type drainingAlloc struct {
-	// LastModified+MigrateStrategy.HealthyDeadline
-	deadline time.Time
+// collectResult is the state collected by scanning for drain eligible allocs
+type collectResult struct {
+	// drainableSvcs contains all service jobs and allocs that are
+	// potentially drainable meaning they have at least one allocation on a
+	// draining node.
+	drainableSvcs map[jobKey]*runningJob
 
-	// Task Group key
-	tgKey string
-}
+	// drainNow contains all batch and system jobs that should be
+	// immediately drained due to a deadline or in the case of system jobs:
+	// all other allocs on the node have completed draining.
+	drainNow map[jobKey]*runningJob
 
-func newDrainingAlloc(a *structs.Allocation, deadline time.Time) drainingAlloc {
-	return drainingAlloc{
-		deadline: deadline,
-		tgKey:    makeTaskGroupKey(a),
-	}
+	// upPerTG is a count of running allocs per task group for the
+	// migration mark phase to use when considering how many allocs can be
+	// migrated for a given group.
+	upPerTG map[string]int
+
+	// doneNodes need no coordinating to finish their drain. Either all
+	// allocs have drained, the node is being force drained, or the drain
+	// deadline was hit. Any remaining allocs will be migrated via
+	// drainNow.
+	doneNodes map[string]*structs.Node
 }
 
 // makeTaskGroupKey returns a unique key for an allocation's task group
@@ -107,10 +114,15 @@ type nodeDrainerState struct {
 // should be called when a server establishes leadership and SetEnabled(false)
 // called when leadership is lost.
 type NodeDrainer struct {
+	// enabledCh is used by SetEnabled to signal Run when to start/stop the
+	// nodeDrainer goroutine
 	enabledCh chan nodeDrainerState
 
+	// raft is a shim around the raft messages necessary for draining
 	raft RaftApplier
 
+	// shutdownCh is closed when the Server is shutting down and the
+	// NodeDrainer should permanently exit
 	shutdownCh <-chan struct{}
 
 	logger *log.Logger
@@ -249,9 +261,6 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 			return
 		}
 
-		// Tracks nodes that are done draining
-		doneNodes := map[string]*structs.Node{}
-
 		// Capture state (statestore and time) to do consistent comparisons
 		snapshot, err := state.Snapshot()
 		if err != nil {
@@ -260,206 +269,20 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 		}
 		now := time.Now()
 
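The collectResult maps above, and the collection code below, are keyed by jobKey (namespace plus job ID) and by a per-task-group key built by makeTaskGroupKey, whose body is outside this hunk. The following is a minimal illustrative sketch of that keying scheme only; the field names and key format are assumptions for illustration, not this patch's actual definitions.

    // Illustrative sketch: approximates the job and task-group keys used by
    // upPerTG and the runningJob maps. The real jobKey and makeTaskGroupKey
    // are defined elsewhere in drain.go and may differ.
    package main

    import "fmt"

    type jobKey struct {
        namespace string
        jobID     string
    }

    // taskGroupKey builds a single map key for a task group; the exact
    // format used by makeTaskGroupKey is an assumption here.
    func taskGroupKey(ns, jobID, tg string) string {
        return fmt.Sprintf("%s-%s-%s", ns, jobID, tg)
    }

    func main() {
        upPerTG := map[string]int{}
        upPerTG[taskGroupKey("default", "web", "frontend")]++ // one running alloc counted
        fmt.Println(jobKey{"default", "web"}, upPerTG)
    }
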
-		// job key -> {job, allocs}
-		// Collect all allocs for all jobs with at least one
-		// non-terminal alloc on a draining node.
-		// Invariants:
-		//  - Only service jobs
-		//  - No entries with 0 allocs
-		//TODO could this be a helper method on prevAllocWatcher
-		drainableSvcs := map[jobKey]*runningJob{}
-
-		// drainNow are allocs for batch or system jobs that should be
-		// drained due to a node deadline being reached
-		drainNow := map[jobKey]*runningJob{}
-
-		// track number of "up" allocs per task group (not terminal and
-		// have a deployment status)
-		upPerTG := map[string]int{}
-		// Collect all drainable jobs
-		for nodeID, node := range nodes {
-			allocs, err := snapshot.AllocsByNode(nil, nodeID)
-			if err != nil {
-				//FIXME
-				panic(err)
-			}
-
-			// drainableSys are allocs for system jobs that should be
-			// drained if there are no other allocs left
-			drainableSys := map[jobKey]*runningJob{}
-
-			// track number of allocs left on this node to be drained
-			allocsLeft := false
-			inf, deadline := node.DrainStrategy.DeadlineTime()
-			deadlineReached := !inf && deadline.Before(now)
-			for _, alloc := range allocs {
-				// Don't need to consider drained allocs
-				if alloc.TerminalStatus() {
-					continue
-				}
-
-				jobkey := jobKey{alloc.Namespace, alloc.JobID}
-
-				// job does not found yet
-				job, err := snapshot.JobByID(nil, alloc.Namespace, alloc.JobID)
-				if err != nil {
-					//FIXME
-					panic(err)
-				}
-
-				// IgnoreSystemJobs if specified in the node's DrainStrategy
-				if node.DrainStrategy.IgnoreSystemJobs && job.Type == structs.JobTypeSystem {
-					continue
-				}
-
-				// When the node deadline is reached all batch
-				// and service jobs will be drained
-				if deadlineReached && job.Type != structs.JobTypeService {
-					n.logger.Printf("[TRACE] nomad.drain: draining alloc %s due to node %s reaching drain deadline", alloc.ID, node.ID)
-					if j, ok := drainNow[jobkey]; ok {
-						j.allocs = append(j.allocs, alloc)
-					} else {
-						// First alloc for this job, create entry
-						drainNow[jobkey] = &runningJob{
-							job:    job,
-							allocs: []*structs.Allocation{alloc},
-						}
-					}
-					continue
-				}
-
-				// If deadline hasn't been reached, system jobs
-				// may still be drained if there are no other
-				// allocs left
-				if !deadlineReached && job.Type == structs.JobTypeSystem {
-					n.logger.Printf("[TRACE] nomad.drain: system alloc %s will be drained if no other allocs on node %s", alloc.ID, node.ID)
-					if j, ok := drainableSys[jobkey]; ok {
-						j.allocs = append(j.allocs, alloc)
-					} else {
-						// First alloc for this job, create entry
-						drainableSys[jobkey] = &runningJob{
-							job:    job,
-							allocs: []*structs.Allocation{alloc},
-						}
-					}
-					continue
-				}
-
-				// This alloc is still running on a draining
-				// node, so treat the node as having allocs
-				// remaining
-				allocsLeft = true
-
-				jobAllocs, err := snapshot.AllocsByJob(nil, alloc.Namespace, alloc.JobID, true)
-				if err != nil {
-					//FIXME
-					panic(err)
-				}
-
-				// Count the number of down (terminal or nil deployment status) per task group
-				if job.Type == structs.JobTypeService {
-					num := 0
-					for _, a := range jobAllocs {
-						if !a.TerminalStatus() && a.DeploymentStatus != nil {
-							// Not terminal and health updated, count it as up!
-							upPerTG[makeTaskGroupKey(a)]++
-							num++
-						}
-					}
-					n.logger.Printf("[TRACE] nomad.drain: job %s has %d allocs running", job.Name, num)
-				}
-
-				drainableSvcs[jobkey] = &runningJob{
-					job:    job,
-					allocs: jobAllocs,
-				}
-
-				jobWatcher.watch(jobkey, nodeID)
-			}
-
-			// if node has no allocs or has hit its deadline, it's done draining!
-			if !allocsLeft || deadlineReached {
-				n.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
-				jobWatcher.nodeDone(nodeID)
-				doneNodes[nodeID] = node
-
-				// Add all system jobs on this node to the drainNow slice
-				for k, sysj := range drainableSys {
-					if j, ok := drainNow[k]; ok {
-						// Job already has at least one alloc draining, append this one
-						j.allocs = append(j.allocs, sysj.allocs...)
-					} else {
-						// First draining alloc for this job, add the entry
-						drainNow[k] = sysj
-					}
-				}
-			}
-		}
+		result, err := n.collectDrainable(nodes, snapshot, jobWatcher, now)
+		if err != nil {
+			//FIXME
+			panic(err)
 		}
 
 		// stoplist are the allocations to migrate and their jobs to emit
 		// evaluations for. Initialized with allocations that should be
 		// immediately drained regardless of MaxParallel
-		stoplist := newStopAllocs(drainNow)
+		stoplist := newStopAllocs(result.drainNow)
 
 		// build drain list considering deadline & max_parallel
-		for _, drainingJob := range drainableSvcs {
-			for _, alloc := range drainingJob.allocs {
-				// Already draining/dead allocs don't need to be drained
-				if alloc.TerminalStatus() {
-					continue
-				}
-
-				node, ok := nodes[alloc.NodeID]
-				if !ok {
-					// Alloc's node is not draining so not elligible for draining!
-					continue
-				}
-
-				tgKey := makeTaskGroupKey(alloc)
-
-				if inf, d := node.DrainStrategy.DeadlineTime(); !inf && d.Before(now) {
-					n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
-					// Alloc's Node has reached its deadline
-					stoplist.add(drainingJob.job, alloc)
-					upPerTG[tgKey]--
-
-					continue
-				}
-
-				// Stop allocs with count=1, max_parallel==0, or draining
 0 {
 			if err := n.applyMigrations(stoplist); err != nil {
@@ -469,7 +292,7 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 			}
 
 			// Unset drain for nodes done draining
-			for nodeID, node := range doneNodes {
+			for nodeID, node := range result.doneNodes {
 				if err := n.raft.NodeDrainComplete(nodeID); err != nil {
 					n.logger.Printf("[ERR] nomad.drain: failed to unset drain for: %v", err)
 					//FIXME
@@ -481,6 +304,202 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 	}
 }
 
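With this refactor each drain pass reduces to collect, mark, apply, and complete, handled by the methods added below. The sketch that follows illustrates that flow only; it is not code from this patch, the types are simplified stand-ins, error handling is elided, and the markMigrations call site is not visible in the hunk above, so its placement here is an assumption.

    // Simplified stand-ins for the drain pass structure; not the real drainer types.
    package main

    import "fmt"

    type passResult struct {
        drainNow  []string // allocs to stop immediately (deadline or system jobs)
        drainable []string // service allocs considered for migration
        doneNodes []string // nodes whose drain can be marked complete
    }

    // collect scans nodes and returns what this pass has to work with.
    func collect() passResult {
        return passResult{
            drainNow:  []string{"batch-alloc-1"},
            drainable: []string{"svc-alloc-1"},
            doneNodes: []string{"node-1"},
        }
    }

    // mark appends migratable service allocs to the stop list; the real method
    // also honors per-task-group limits before stopping anything.
    func mark(stoplist, drainable []string) []string {
        return append(stoplist, drainable...)
    }

    func main() {
        res := collect()
        stoplist := mark(res.drainNow, res.drainable) // drainNow seeds the stop list
        fmt.Println("apply migrations for:", stoplist)
        for _, node := range res.doneNodes {
            fmt.Println("drain complete for:", node)
        }
    }
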
+// collectDrainable scans all nodes and allocs on draining nodes and builds a
+// structure of eligible allocs to drain.
+func (n *NodeDrainer) collectDrainable(nodes map[string]*structs.Node, state *state.StateSnapshot,
+	jobWatcher *jobWatcher, now time.Time) (*collectResult, error) {
+
+	svcs := map[jobKey]*runningJob{}
+	drainNow := map[jobKey]*runningJob{}
+	upPerTG := map[string]int{}
+	doneNodes := map[string]*structs.Node{}
+
+	for nodeID, node := range nodes {
+		allocs, err := state.AllocsByNode(nil, nodeID)
+		if err != nil {
+			return nil, err
+		}
+
+		// drainableSys are allocs for system jobs that should be
+		// drained if there are no other allocs left
+		drainableSys := map[jobKey]*runningJob{}
+
+		// track number of allocs left on this node to be drained
+		allocsLeft := false
+		inf, deadline := node.DrainStrategy.DeadlineTime()
+		deadlineReached := !inf && deadline.Before(now)
+		for _, alloc := range allocs {
+			// Don't need to consider drained allocs
+			if alloc.TerminalStatus() {
+				continue
+			}
+
+			jobkey := jobKey{alloc.Namespace, alloc.JobID}
+
+			// job not fetched yet
+			job, err := state.JobByID(nil, alloc.Namespace, alloc.JobID)
+			if err != nil {
+				return nil, err
+			}
+
+			// IgnoreSystemJobs if specified in the node's DrainStrategy
+			if node.DrainStrategy.IgnoreSystemJobs && job.Type == structs.JobTypeSystem {
+				continue
+			}
+
+			// When the node deadline is reached all batch
+			// and system jobs will be drained
+			if deadlineReached && job.Type != structs.JobTypeService {
+				n.logger.Printf("[TRACE] nomad.drain: draining alloc %s due to node %s reaching drain deadline", alloc.ID, node.ID)
+				if j, ok := drainNow[jobkey]; ok {
+					j.allocs = append(j.allocs, alloc)
+				} else {
+					// First alloc for this job, create entry
+					drainNow[jobkey] = &runningJob{
+						job:    job,
+						allocs: []*structs.Allocation{alloc},
+					}
+				}
+				continue
+			}
+
+			// If deadline hasn't been reached, system jobs
+			// may still be drained if there are no other
+			// allocs left
+			if !deadlineReached && job.Type == structs.JobTypeSystem {
+				n.logger.Printf("[TRACE] nomad.drain: system alloc %s will be drained if no other allocs on node %s", alloc.ID, node.ID)
+				if j, ok := drainableSys[jobkey]; ok {
+					j.allocs = append(j.allocs, alloc)
+				} else {
+					// First alloc for this job, create entry
+					drainableSys[jobkey] = &runningJob{
+						job:    job,
+						allocs: []*structs.Allocation{alloc},
+					}
+				}
+				continue
+			}
+
+			// This alloc is still running on a draining
+			// node, so treat the node as having allocs
+			// remaining
+			allocsLeft = true
+
+			jobAllocs, err := state.AllocsByJob(nil, alloc.Namespace, alloc.JobID, true)
+			if err != nil {
+				return nil, err
+			}
+
+			// Count the number of up (non-terminal with a deployment
+			// status) allocs per task group
+			if job.Type == structs.JobTypeService {
+				num := 0
+				for _, a := range jobAllocs {
+					if !a.TerminalStatus() && a.DeploymentStatus != nil {
+						// Not terminal and health updated, count it as up!
+						upPerTG[makeTaskGroupKey(a)]++
+						num++
+					}
+				}
+				n.logger.Printf("[TRACE] nomad.drain: job %s has %d allocs running", job.Name, num)
+			}
+
+			svcs[jobkey] = &runningJob{
+				job:    job,
+				allocs: jobAllocs,
+			}
+
+			jobWatcher.watch(jobkey, nodeID)
+		}
+
+		// if node has no allocs or has hit its deadline, it's done draining!
+		if !allocsLeft || deadlineReached {
+			n.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
+			jobWatcher.nodeDone(nodeID)
+			doneNodes[nodeID] = node
+
+			// Add all system jobs on this node to the drainNow slice
+			for k, sysj := range drainableSys {
+				if j, ok := drainNow[k]; ok {
+					// Job already has at least one alloc draining, append this one
+					j.allocs = append(j.allocs, sysj.allocs...)
+				} else {
+					// First draining alloc for this job, add the entry
+					drainNow[k] = sysj
+				}
+			}
+		}
+	}
+
+	result := &collectResult{
+		drainableSvcs: svcs,
+		drainNow:      drainNow,
+		upPerTG:       upPerTG,
+		doneNodes:     doneNodes,
+	}
+	return result, nil
+}
+
+// markMigrations marks services to be drained for migration in the stoplist.
+func (n *NodeDrainer) markMigrations(stoplist *stopAllocs, upPerTG map[string]int, drainable map[jobKey]*runningJob, nodes map[string]*structs.Node, now time.Time) {
+	for _, drainingJob := range drainable {
+		for _, alloc := range drainingJob.allocs {
+			// Already draining/dead allocs don't need to be drained
+			if alloc.TerminalStatus() {
+				continue
+			}
+
+			node, ok := nodes[alloc.NodeID]
+			if !ok {
+				// Alloc's node is not draining so not eligible for draining!
+				continue
+			}
+
+			tgKey := makeTaskGroupKey(alloc)
+
+			if inf, d := node.DrainStrategy.DeadlineTime(); !inf && d.Before(now) {
+				n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
+				// Alloc's Node has reached its deadline
+				stoplist.add(drainingJob.job, alloc)
+				upPerTG[tgKey]--
+
+				continue
+			}
+
+			// Stop allocs with count=1, max_parallel==0, or draining
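The comment above is cut off in this excerpt; it appears to describe a per-task-group gate on how many allocs may be stopped at once (a count of one, a max_parallel of zero, or fewer allocs currently draining than max_parallel). A rough illustrative sketch of that kind of check follows; the function, names, and exact conditions are assumptions for illustration, not this patch's actual logic.

    // Illustrative sketch of a per-task-group migration gate; not the patch's code.
    package main

    import "fmt"

    // shouldStop reports whether one more alloc in a task group may be stopped
    // for migration. The three conditions mirror the truncated comment above,
    // but their exact semantics here are assumptions.
    func shouldStop(count, maxParallel, draining int) bool {
        switch {
        case count == 1:
            return true // singleton groups cannot keep capacity anyway
        case maxParallel == 0:
            return true // no limit configured in this sketch
        case draining < maxParallel:
            return true // still below the concurrency limit
        default:
            return false
        }
    }

    func main() {
        fmt.Println(shouldStop(3, 2, 1)) // true: one draining, limit two
        fmt.Println(shouldStop(3, 2, 2)) // false: limit reached
    }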