refactor main drainloop into 2 more methods

Michael Schurter
2018-02-28 20:59:41 -08:00
parent 1f73cd5d42
commit 478209807e


@@ -29,21 +29,28 @@ type runningJob struct {
allocs []*structs.Allocation
}
// drainingAlloc contains a conservative deadline an alloc has to be healthy by
// before it should stop being watched and be replaced.
type drainingAlloc struct {
// LastModified+MigrateStrategy.HealthyDeadline
deadline time.Time
// collectResult is the state collected by scanning for drain-eligible allocs
type collectResult struct {
// drainableSvcs contains all service jobs and allocs that are
// potentially drainable, meaning they have at least one allocation on a
// draining node.
drainableSvcs map[jobKey]*runningJob
// Task Group key
tgKey string
}
// drainNow contains all batch and system jobs that should be
// immediately drained, either due to a deadline or, for system jobs,
// because all other allocs on the node have completed draining.
drainNow map[jobKey]*runningJob
func newDrainingAlloc(a *structs.Allocation, deadline time.Time) drainingAlloc {
return drainingAlloc{
deadline: deadline,
tgKey: makeTaskGroupKey(a),
}
// upPerTG is a count of running allocs per task group for the
// migration mark phase to use when considering how many allocs can be
// migrated for a given group.
upPerTG map[string]int
// doneNodes are nodes that need no further coordination to finish their
// drain: either all allocs have drained, the node is being force
// drained, or the drain deadline was hit. Any remaining allocs will be
// migrated via drainNow.
doneNodes map[string]*structs.Node
}
// makeTaskGroupKey returns a unique key for an allocation's task group
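The maps above are keyed by jobKey (an allocation's namespace plus job ID) and by the task group key from makeTaskGroupKey, whose body is not shown in this diff. A minimal runnable sketch of such composite keys, assuming a simple string concatenation that may differ from the real implementation:

package main

import "fmt"

// jobKey mirrors the {namespace, jobID} pair used to index the maps above.
type jobKey struct {
	ns    string
	jobID string
}

// taskGroupKey sketches a makeTaskGroupKey-style key: one string that
// uniquely identifies an allocation's task group. The field order and
// separator here are assumptions, not this commit's implementation.
func taskGroupKey(ns, jobID, tg string) string {
	return fmt.Sprintf("%s-%s-%s", ns, jobID, tg)
}

func main() {
	k := jobKey{ns: "default", jobID: "web"}
	fmt.Println(k, taskGroupKey(k.ns, k.jobID, "frontend"))
}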
@@ -107,10 +114,15 @@ type nodeDrainerState struct {
// should be called when a server establishes leadership and SetEnabled(false)
// called when leadership is lost.
type NodeDrainer struct {
// enabledCh is used by SetEnabled to signal Run when to start/stop the
// nodeDrainer goroutine
enabledCh chan nodeDrainerState
// raft is a shim around the raft messages necessary for draining
raft RaftApplier
// shutdownCh is closed when the Server is shutting down, at which point
// the NodeDrainer should permanently exit
shutdownCh <-chan struct{}
logger *log.Logger
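Taken together, enabledCh and shutdownCh suggest a conventional start/stop loop. A minimal runnable sketch, simplified to a bool channel instead of nodeDrainerState (run and drainLoop are hypothetical stand-ins, not this commit's Run method):

package main

import (
	"context"
	"fmt"
	"time"
)

// drainLoop stands in for the real nodeDrainer work loop; it just waits
// for cancellation.
func drainLoop(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("drainer stopped")
}

func run(enabledCh <-chan bool, shutdownCh <-chan struct{}) {
	var cancel context.CancelFunc
	stop := func() {
		if cancel != nil {
			cancel()
			cancel = nil
		}
	}
	for {
		select {
		case enabled := <-enabledCh:
			stop() // always stop any old goroutine before (re)starting
			if enabled {
				var ctx context.Context
				ctx, cancel = context.WithCancel(context.Background())
				go drainLoop(ctx)
			}
		case <-shutdownCh:
			stop()
			return // permanently exit when the server shuts down
		}
	}
}

func main() {
	enabled := make(chan bool)
	shutdown := make(chan struct{})
	go run(enabled, shutdown)
	enabled <- true // leadership gained
	time.Sleep(10 * time.Millisecond)
	close(shutdown) // server shutting down
	time.Sleep(10 * time.Millisecond)
}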
@@ -249,9 +261,6 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
return
}
// Tracks nodes that are done draining
doneNodes := map[string]*structs.Node{}
// Capture state (statestore and time) to do consistent comparisons
snapshot, err := state.Snapshot()
if err != nil {
@@ -260,206 +269,20 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
}
now := time.Now()
// job key -> {job, allocs}
// Collect all allocs for all jobs with at least one
// non-terminal alloc on a draining node.
// Invariants:
// - Only service jobs
// - No entries with 0 allocs
//TODO could this be a helper method on prevAllocWatcher
drainableSvcs := map[jobKey]*runningJob{}
// drainNow are allocs for batch or system jobs that should be
// drained due to a node deadline being reached
drainNow := map[jobKey]*runningJob{}
// track number of "up" allocs per task group (not terminal and
// having a deployment status)
upPerTG := map[string]int{}
// Collect all drainable jobs
for nodeID, node := range nodes {
allocs, err := snapshot.AllocsByNode(nil, nodeID)
if err != nil {
//FIXME
panic(err)
}
// drainableSys are allocs for system jobs that should be
// drained if there are no other allocs left
drainableSys := map[jobKey]*runningJob{}
// track number of allocs left on this node to be drained
allocsLeft := false
inf, deadline := node.DrainStrategy.DeadlineTime()
deadlineReached := !inf && deadline.Before(now)
for _, alloc := range allocs {
// Don't need to consider drained allocs
if alloc.TerminalStatus() {
continue
}
jobkey := jobKey{alloc.Namespace, alloc.JobID}
// Look up the alloc's job
job, err := snapshot.JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
//FIXME
panic(err)
}
// Skip system jobs if IgnoreSystemJobs is set in the node's DrainStrategy
if node.DrainStrategy.IgnoreSystemJobs && job.Type == structs.JobTypeSystem {
continue
}
// When the node deadline is reached all batch
// and service jobs will be drained
if deadlineReached && job.Type != structs.JobTypeService {
n.logger.Printf("[TRACE] nomad.drain: draining alloc %s due to node %s reaching drain deadline", alloc.ID, node.ID)
if j, ok := drainNow[jobkey]; ok {
j.allocs = append(j.allocs, alloc)
} else {
// First alloc for this job, create entry
drainNow[jobkey] = &runningJob{
job: job,
allocs: []*structs.Allocation{alloc},
}
}
continue
}
// If deadline hasn't been reached, system jobs
// may still be drained if there are no other
// allocs left
if !deadlineReached && job.Type == structs.JobTypeSystem {
n.logger.Printf("[TRACE] nomad.drain: system alloc %s will be drained if no other allocs on node %s", alloc.ID, node.ID)
if j, ok := drainableSys[jobkey]; ok {
j.allocs = append(j.allocs, alloc)
} else {
// First alloc for this job, create entry
drainableSys[jobkey] = &runningJob{
job: job,
allocs: []*structs.Allocation{alloc},
}
}
continue
}
// This alloc is still running on a draining
// node, so treat the node as having allocs
// remaining
allocsLeft = true
jobAllocs, err := snapshot.AllocsByJob(nil, alloc.Namespace, alloc.JobID, true)
if err != nil {
//FIXME
panic(err)
}
// Count the number of up (non-terminal with a deployment status) allocs per task group
if job.Type == structs.JobTypeService {
num := 0
for _, a := range jobAllocs {
if !a.TerminalStatus() && a.DeploymentStatus != nil {
// Not terminal and health updated, count it as up!
upPerTG[makeTaskGroupKey(a)]++
num++
}
}
n.logger.Printf("[TRACE] nomad.drain: job %s has %d allocs running", job.Name, num)
}
drainableSvcs[jobkey] = &runningJob{
job: job,
allocs: jobAllocs,
}
jobWatcher.watch(jobkey, nodeID)
}
// if node has no allocs or has hit its deadline, it's done draining!
if !allocsLeft || deadlineReached {
n.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
jobWatcher.nodeDone(nodeID)
doneNodes[nodeID] = node
// Add all system jobs on this node to the drainNow map
for k, sysj := range drainableSys {
if j, ok := drainNow[k]; ok {
// Job already has at least one alloc draining, append this one
j.allocs = append(j.allocs, sysj.allocs...)
} else {
// First draining alloc for this job, add the entry
drainNow[k] = sysj
}
}
}
result, err := n.collectDrainable(nodes, snapshot, jobWatcher, now)
if err != nil {
//FIXME
panic(err)
}
// stoplist contains the allocations to migrate and the jobs to emit
// evaluations for. It is initialized with allocations that should be
// immediately drained regardless of MaxParallel
stoplist := newStopAllocs(drainNow)
stoplist := newStopAllocs(result.drainNow)
// build drain list considering deadline & max_parallel
for _, drainingJob := range drainableSvcs {
for _, alloc := range drainingJob.allocs {
// Already draining/dead allocs don't need to be drained
if alloc.TerminalStatus() {
continue
}
node, ok := nodes[alloc.NodeID]
if !ok {
// Alloc's node is not draining, so the alloc is not eligible for draining
continue
}
tgKey := makeTaskGroupKey(alloc)
if inf, d := node.DrainStrategy.DeadlineTime(); !inf && d.Before(now) {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// Alloc's Node has reached its deadline
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
continue
}
// Stop allocs with count=1, max_parallel==0, or draining<max_parallel
tg := drainingJob.job.LookupTaskGroup(alloc.TaskGroup)
//FIXME tg==nil here?
// Only 1, drain
if tg.Count == 1 {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to count=1", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
// No migrate strategy or a max parallel of 0 means force draining
if tg.Migrate == nil || tg.Migrate.MaxParallel == 0 {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to force drain", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
n.logger.Printf("[TRACE] nomad.drain: considering job %s alloc %s count %d maxp %d up %d",
drainingJob.job.Name, alloc.ID[:6], tg.Count, tg.Migrate.MaxParallel, upPerTG[tgKey])
// Count - MaxParallel = the minimum number of allocations that must be "up"
minUp := (tg.Count - tg.Migrate.MaxParallel)
// If the minimum is less than the current number up, it is safe to stop one.
if minUp < upPerTG[tgKey] {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// More migrations are allowed, add to stoplist
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
}
}
}
n.markMigrations(stoplist, result.upPerTG, result.drainableSvcs, nodes, now)
if len(stoplist.allocBatch) > 0 {
if err := n.applyMigrations(stoplist); err != nil {
@@ -469,7 +292,7 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
}
// Unset drain for nodes done draining
for nodeID, node := range doneNodes {
for nodeID, node := range result.doneNodes {
if err := n.raft.NodeDrainComplete(nodeID); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for: %v", err)
//FIXME
@@ -481,6 +304,202 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
}
}
// collectDrainable scans the allocs on all draining nodes and builds a
// structure of allocs eligible to drain.
func (n *NodeDrainer) collectDrainable(nodes map[string]*structs.Node, state *state.StateSnapshot,
jobWatcher *jobWatcher, now time.Time) (*collectResult, error) {
svcs := map[jobKey]*runningJob{}
drainNow := map[jobKey]*runningJob{}
upPerTG := map[string]int{}
doneNodes := map[string]*structs.Node{}
for nodeID, node := range nodes {
allocs, err := state.AllocsByNode(nil, nodeID)
if err != nil {
return nil, err
}
// drainableSys are allocs for system jobs that should be
// drained if there are no other allocs left
drainableSys := map[jobKey]*runningJob{}
// track number of allocs left on this node to be drained
allocsLeft := false
inf, deadline := node.DrainStrategy.DeadlineTime()
deadlineReached := !inf && deadline.Before(now)
for _, alloc := range allocs {
// Don't need to consider drained allocs
if alloc.TerminalStatus() {
continue
}
jobkey := jobKey{alloc.Namespace, alloc.JobID}
// Look up the alloc's job
job, err := state.JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
return nil, err
}
// Skip system jobs if IgnoreSystemJobs is set in the node's DrainStrategy
if node.DrainStrategy.IgnoreSystemJobs && job.Type == structs.JobTypeSystem {
continue
}
// When the node deadline is reached all batch
// and service jobs will be drained
if deadlineReached && job.Type != structs.JobTypeService {
n.logger.Printf("[TRACE] nomad.drain: draining alloc %s due to node %s reaching drain deadline", alloc.ID, node.ID)
if j, ok := drainNow[jobkey]; ok {
j.allocs = append(j.allocs, alloc)
} else {
// First alloc for this job, create entry
drainNow[jobkey] = &runningJob{
job: job,
allocs: []*structs.Allocation{alloc},
}
}
continue
}
// If deadline hasn't been reached, system jobs
// may still be drained if there are no other
// allocs left
if !deadlineReached && job.Type == structs.JobTypeSystem {
n.logger.Printf("[TRACE] nomad.drain: system alloc %s will be drained if no other allocs on node %s", alloc.ID, node.ID)
if j, ok := drainableSys[jobkey]; ok {
j.allocs = append(j.allocs, alloc)
} else {
// First alloc for this job, create entry
drainableSys[jobkey] = &runningJob{
job: job,
allocs: []*structs.Allocation{alloc},
}
}
continue
}
// This alloc is still running on a draining
// node, so treat the node as having allocs
// remaining
allocsLeft = true
jobAllocs, err := state.AllocsByJob(nil, alloc.Namespace, alloc.JobID, true)
if err != nil {
return nil, err
}
// Count the number of up (non-terminal with a deployment status) allocs per task group
if job.Type == structs.JobTypeService {
num := 0
for _, a := range jobAllocs {
if !a.TerminalStatus() && a.DeploymentStatus != nil {
// Not terminal and health updated, count it as up!
upPerTG[makeTaskGroupKey(a)]++
num++
}
}
n.logger.Printf("[TRACE] nomad.drain: job %s has %d allocs running", job.Name, num)
}
svcs[jobkey] = &runningJob{
job: job,
allocs: jobAllocs,
}
jobWatcher.watch(jobkey, nodeID)
}
// if node has no allocs or has hit its deadline, it's done draining!
if !allocsLeft || deadlineReached {
n.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
jobWatcher.nodeDone(nodeID)
doneNodes[nodeID] = node
// Add all system jobs on this node to the drainNow map
for k, sysj := range drainableSys {
if j, ok := drainNow[k]; ok {
// Job already has at least one alloc draining, append this one
j.allocs = append(j.allocs, sysj.allocs...)
} else {
// First draining alloc for this job, add the entry
drainNow[k] = sysj
}
}
}
}
result := &collectResult{
drainableSvcs: svcs,
drainNow: drainNow,
upPerTG: upPerTG,
doneNodes: doneNodes,
}
return result, nil
}
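The inner loop of collectDrainable is a per-alloc classification. A standalone sketch of its branch order, using a hypothetical drainClass enum (not part of this commit) and plain strings in place of the structs.JobType* constants:

package main

import "fmt"

type drainClass int

const (
	classSkip         drainClass = iota // terminal alloc or ignored system job
	classDrainNow                       // non-service job past the node's drain deadline
	classDrainIfEmpty                   // system job, drained once the node is otherwise empty
	classDrainable                      // falls through to the migration mark phase
)

// classify mirrors the branch order of collectDrainable's inner loop.
func classify(jobType string, terminal, ignoreSystem, deadlineReached bool) drainClass {
	switch {
	case terminal:
		return classSkip
	case ignoreSystem && jobType == "system":
		return classSkip
	case deadlineReached && jobType != "service":
		return classDrainNow
	case !deadlineReached && jobType == "system":
		return classDrainIfEmpty
	default:
		return classDrainable
	}
}

func main() {
	fmt.Println(classify("batch", false, false, true))    // classDrainNow
	fmt.Println(classify("system", false, false, false))  // classDrainIfEmpty
	fmt.Println(classify("service", false, false, false)) // classDrainable
}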
// markMigrations marks service allocs to be drained for migration by adding
// them to the stoplist.
func (n *NodeDrainer) markMigrations(stoplist *stopAllocs, upPerTG map[string]int, drainable map[jobKey]*runningJob, nodes map[string]*structs.Node, now time.Time) {
for _, drainingJob := range drainable {
for _, alloc := range drainingJob.allocs {
// Already draining/dead allocs don't need to be drained
if alloc.TerminalStatus() {
continue
}
node, ok := nodes[alloc.NodeID]
if !ok {
// Alloc's node is not draining, so the alloc is not eligible for draining
continue
}
tgKey := makeTaskGroupKey(alloc)
if inf, d := node.DrainStrategy.DeadlineTime(); !inf && d.Before(now) {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// Alloc's Node has reached its deadline
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
continue
}
// Stop allocs with count=1, max_parallel==0, or draining<max_parallel
tg := drainingJob.job.LookupTaskGroup(alloc.TaskGroup)
//FIXME tg==nil here?
// Only 1, drain
if tg.Count == 1 {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to count=1", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
// No migrate strategy or a max parallel of 0 means force draining
if tg.Migrate == nil || tg.Migrate.MaxParallel == 0 {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to force drain", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
n.logger.Printf("[TRACE] nomad.drain: considering job %s alloc %s count %d maxp %d up %d",
drainingJob.job.Name, alloc.ID[:6], tg.Count, tg.Migrate.MaxParallel, upPerTG[tgKey])
// Count - MaxParallel = the minimum number of allocations that must be "up"
minUp := (tg.Count - tg.Migrate.MaxParallel)
// If the minimum is less than the current number up, it is safe to stop one.
if minUp < upPerTG[tgKey] {
n.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// More migrations are allowed, add to stoplist
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
}
}
}
}
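The max_parallel arithmetic above encodes the core invariant: at least Count - MaxParallel allocs in a task group must remain up at all times. A minimal runnable sketch (canStopOne is hypothetical) walking one group with count=5 and max_parallel=2 through the check:

package main

import "fmt"

// canStopOne reports whether one more alloc in a task group may be
// stopped while keeping at least count-maxParallel allocs up, matching
// the minUp < upPerTG[tgKey] check above.
func canStopOne(count, maxParallel, up int) bool {
	minUp := count - maxParallel
	return minUp < up
}

func main() {
	// count=5, max_parallel=2: at most 2 allocs may be migrating at once.
	up := 5
	for i := 0; i < 5; i++ {
		if canStopOne(5, 2, up) {
			fmt.Printf("stop one (up was %d)\n", up)
			up-- // mirrors upPerTG[tgKey]--
		} else {
			fmt.Printf("hold (up=%d)\n", up)
		}
	}
	// Two allocs are stopped, then the group holds at up=3.
}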
// applyMigrations applies the specified allocation migrations along with their
// evaluations to raft.
func (n *NodeDrainer) applyMigrations(stoplist *stopAllocs) error {