drainer: refactor newStopAllocs, applyMigrations

Michael Schurter
2018-02-28 16:42:29 -08:00
parent 3ca9cdfadc
commit 1f73cd5d42
2 changed files with 56 additions and 41 deletions

View File

@@ -51,7 +51,8 @@ func makeTaskGroupKey(a *structs.Allocation) string {
return strings.Join([]string{a.Namespace, a.JobID, a.TaskGroup}, "-")
}
// stopAllocs tracks allocs to drain by a unique TG key
// stopAllocs tracks allocs to drain by a unique TG key along with their jobs,
// as we need to emit an evaluation for each allocation's job.
type stopAllocs struct {
allocBatch map[string]*structs.DesiredTransition
@@ -59,6 +60,25 @@ type stopAllocs struct {
jobBatch map[jobKey]*structs.Job
}
// newStopAllocs creates the set of allocs to migrate, seeded from an initial
// map of running jobs+allocs that need immediate draining.
func newStopAllocs(initial map[jobKey]*runningJob) *stopAllocs {
s := &stopAllocs{
allocBatch: make(map[string]*structs.DesiredTransition),
jobBatch: make(map[jobKey]*structs.Job),
}
// Add initial allocs
for _, drainingJob := range initial {
for _, a := range drainingJob.allocs {
s.add(drainingJob.job, a)
}
}
return s
}
// add an allocation to be migrated. Its job must also be specified in order to
// emit an evaluation.
func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
// Add the desired migration transition to the batch
s.allocBatch[a.ID] = &structs.DesiredTransition{
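The hunk cuts off inside add. For context, a minimal sketch of how the method presumably completes, assuming structs.DesiredTransition exposes a Migrate *bool field, a bool-pointer helper is available, and jobKey is keyed by namespace and job ID (field layout assumed; none of this is shown in the hunk):

func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
	// Mark the allocation for migration in the batch
	s.allocBatch[a.ID] = &structs.DesiredTransition{
		Migrate: helper.BoolToPtr(true), // assumes a *bool Migrate field and a bool-pointer helper
	}

	// Record the job so an evaluation can be emitted for it later
	s.jobBatch[jobKey{a.Namespace, a.JobID}] = j
}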
@@ -203,11 +223,6 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
go jobWatcher.run(ctx)
for {
//TODO this method of async node updates means we could make
//migration decisions on out-of-date information. The worst
//possible outcome of this is that an allocation could be
//stopped on a node that recently had its drain cancelled,
//which doesn't seem like that bad of a pathological case.
n.logger.Printf("[TRACE] nomad.drain: LOOP next deadline: %s (%s)", nextDeadline, time.Until(nextDeadline))
select {
case nodes = <-nodeWatcher.nodesCh:
@@ -383,18 +398,9 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
}
// stoplist holds the allocations to migrate and their jobs to emit
// evaluations for
stoplist := &stopAllocs{
allocBatch: make(map[string]*structs.DesiredTransition),
jobBatch: make(map[jobKey]*structs.Job),
}
// Immediately drain all allocs in drainNow
for _, drainingJob := range drainNow {
for _, a := range drainingJob.allocs {
stoplist.add(drainingJob.job, a)
}
}
// evaluations for. Initialized with allocations that should be
// immediately drained regardless of MaxParallel.
stoplist := newStopAllocs(drainNow)
// build drain list considering deadline & max_parallel
for _, drainingJob := range drainableSvcs {
@@ -456,29 +462,7 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
}
if len(stoplist.allocBatch) > 0 {
n.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
for id, _ := range stoplist.allocBatch {
n.logger.Printf("[TRACE] nomad.drain: migrating alloc %s", id[:6])
}
// Reevaluate affected jobs
evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch))
for _, job := range stoplist.jobBatch {
evals = append(evals, &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: job.ID,
JobModifyIndex: job.ModifyIndex,
Status: structs.EvalStatusPending,
})
}
// Commit this update via Raft
if err := n.raft.AllocUpdateDesiredTransition(stoplist.allocBatch, evals); err != nil {
if err := n.applyMigrations(stoplist); err != nil {
//FIXME
panic(err)
}
@@ -497,6 +481,33 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
}
}
// applyMigrations applies the specified allocation migrations, along with
// their corresponding evaluations, to Raft.
func (n *NodeDrainer) applyMigrations(stoplist *stopAllocs) error {
n.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
for id := range stoplist.allocBatch {
n.logger.Printf("[TRACE] nomad.drain: migrating alloc %s", id[:6])
}
// Reevaluate affected jobs
evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch))
for _, job := range stoplist.jobBatch {
evals = append(evals, &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: job.ID,
JobModifyIndex: job.ModifyIndex,
Status: structs.EvalStatusPending,
})
}
// Commit this update via Raft
return n.raft.AllocUpdateDesiredTransition(stoplist.allocBatch, evals)
}
// initDrainer initializes the node drainer state and returns a list of
// draining nodes as well as draining allocs that should be watched for a
// replacement.
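Taken together, the refactor leaves the caller in nodeDrainer looking roughly like the condensed sketch below (assembled from the hunks above; the deadline/max_parallel pass and error handling are elided):

// Seed the stoplist with allocs that must be drained immediately
stoplist := newStopAllocs(drainNow)

// ... add further allocs from drainableSvcs, subject to deadline & max_parallel ...

if len(stoplist.allocBatch) > 0 {
	// Emit one evaluation per affected job and commit the migrations via Raft
	if err := n.applyMigrations(stoplist); err != nil {
		// FIXME: still panics on Raft errors, per the existing TODO
		panic(err)
	}
}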

View File

@@ -271,6 +271,8 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
t.Logf("job: %s node: %s alloc: %s desired_status: %s desired_transition: %s actual: %s replaces: %s",
alloc.Job.Name, alloc.NodeID[:6], alloc.ID[:6], alloc.DesiredStatus, pretty.Sprint(alloc.DesiredTransition.Migrate), alloc.ClientStatus, alloc.PreviousAllocation)
}
t.Logf("==> PASS")
}
// TestNodeDrainer_SystemDrain asserts system jobs are drained
@@ -484,4 +486,6 @@ func TestNodeDrainer_SystemDrain(t *testing.T) {
t.Logf("job: %s node: %s alloc: %s desired_status: %s desired_transition: %s actual: %s replaces: %s",
alloc.Job.Name, alloc.NodeID[:6], alloc.ID[:6], alloc.DesiredStatus, pretty.Sprint(alloc.DesiredTransition.Migrate), alloc.ClientStatus, alloc.PreviousAllocation)
}
t.Logf("==> PASS")
}
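The tests above only exercise full drains end to end. A focused unit test for newStopAllocs could look roughly like the sketch below; it assumes the mock package's Job/Alloc helpers and guesses at jobKey's field layout (namespace, then job ID), neither of which is shown in this diff:

func TestNewStopAllocs_SeedsBatches(t *testing.T) {
	job := mock.Job()
	alloc := mock.Alloc()
	alloc.Namespace = job.Namespace
	alloc.JobID = job.ID

	// jobKey's fields aren't visible in this diff; {namespace, jobID} ordering is assumed
	initial := map[jobKey]*runningJob{
		{job.Namespace, job.ID}: {job: job, allocs: []*structs.Allocation{alloc}},
	}

	s := newStopAllocs(initial)

	if len(s.allocBatch) != 1 || len(s.jobBatch) != 1 {
		t.Fatalf("expected 1 alloc and 1 job in batches, got %d and %d",
			len(s.allocBatch), len(s.jobBatch))
	}
	if _, ok := s.allocBatch[alloc.ID]; !ok {
		t.Fatalf("expected alloc %s in allocBatch", alloc.ID)
	}
}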