diff --git a/nomad/drainer/drain.go b/nomad/drainer/drain.go
index e52a735aa..4f2b73556 100644
--- a/nomad/drainer/drain.go
+++ b/nomad/drainer/drain.go
@@ -51,7 +51,8 @@ func makeTaskGroupKey(a *structs.Allocation) string {
 	return strings.Join([]string{a.Namespace, a.JobID, a.TaskGroup}, "-")
 }
 
-// stopAllocs tracks allocs to drain by a unique TG key
+// stopAllocs tracks allocs to drain by a unique TG key along with their jobs,
+// as we need to emit an evaluation for each allocation's job
 type stopAllocs struct {
 	allocBatch map[string]*structs.DesiredTransition
 
@@ -59,6 +60,25 @@ type stopAllocs struct {
 	jobBatch map[jobKey]*structs.Job
 }
 
+// newStopAllocs creates a list of allocs to migrate from an initial list of
+// running jobs+allocs that need immediate draining.
+func newStopAllocs(initial map[jobKey]*runningJob) *stopAllocs {
+	s := &stopAllocs{
+		allocBatch: make(map[string]*structs.DesiredTransition),
+		jobBatch:   make(map[jobKey]*structs.Job),
+	}
+
+	// Add initial allocs
+	for _, drainingJob := range initial {
+		for _, a := range drainingJob.allocs {
+			s.add(drainingJob.job, a)
+		}
+	}
+	return s
+}
+
+// add an allocation to be migrated. Its job must also be specified in order to
+// emit an evaluation.
 func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
 	// Add the desired migration transition to the batch
 	s.allocBatch[a.ID] = &structs.DesiredTransition{
@@ -203,11 +223,6 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 	go jobWatcher.run(ctx)
 
 	for {
-		//TODO this method of async node updates means we could make
-		//migration decisions on out of date information. the worst
-		//possible outcome of this is that an allocation could be
-		//stopped on a node that recently had its drain cancelled which
-		//doesn't seem like that bad of a pathological case
 		n.logger.Printf("[TRACE] nomad.drain: LOOP next deadline: %s (%s)", nextDeadline, time.Until(nextDeadline))
 		select {
 		case nodes = <-nodeWatcher.nodesCh:
@@ -383,18 +398,9 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 		}
 
 		// stoplist are the allocations to migrate and their jobs to emit
-		// evaluations for
-		stoplist := &stopAllocs{
-			allocBatch: make(map[string]*structs.DesiredTransition),
-			jobBatch:   make(map[jobKey]*structs.Job),
-		}
-
-		// Immediately drain all allocs in drainNow
-		for _, drainingJob := range drainNow {
-			for _, a := range drainingJob.allocs {
-				stoplist.add(drainingJob.job, a)
-			}
-		}
+		// evaluations for. Initialized with allocations that should be
+		// immediately drained regardless of MaxParallel
+		stoplist := newStopAllocs(drainNow)
 
 		// build drain list considering deadline & max_parallel
 		for _, drainingJob := range drainableSvcs {
@@ -456,29 +462,7 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 		}
 
 		if len(stoplist.allocBatch) > 0 {
-			n.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
-
-			for id, _ := range stoplist.allocBatch {
-				n.logger.Printf("[TRACE] nomad.drain: migrating alloc %s", id[:6])
-			}
-
-			// Reevaluate affected jobs
-			evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch))
-			for _, job := range stoplist.jobBatch {
-				evals = append(evals, &structs.Evaluation{
-					ID:             uuid.Generate(),
-					Namespace:      job.Namespace,
-					Priority:       job.Priority,
-					Type:           job.Type,
-					TriggeredBy:    structs.EvalTriggerNodeDrain,
-					JobID:          job.ID,
-					JobModifyIndex: job.ModifyIndex,
-					Status:         structs.EvalStatusPending,
-				})
-			}
-
-			// Commit this update via Raft
-			if err := n.raft.AllocUpdateDesiredTransition(stoplist.allocBatch, evals); err != nil {
+			if err := n.applyMigrations(stoplist); err != nil {
 				//FIXME
 				panic(err)
 			}
@@ -497,6 +481,33 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 	}
 }
 
+// applyMigrations applies the specified allocation migrations along with their
+// evaluations to raft.
+func (n *NodeDrainer) applyMigrations(stoplist *stopAllocs) error {
+	n.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
+
+	for id := range stoplist.allocBatch {
+		n.logger.Printf("[TRACE] nomad.drain: migrating alloc %s", id[:6])
+	}
+	// Reevaluate affected jobs
+	evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch))
+	for _, job := range stoplist.jobBatch {
+		evals = append(evals, &structs.Evaluation{
+			ID:             uuid.Generate(),
+			Namespace:      job.Namespace,
+			Priority:       job.Priority,
+			Type:           job.Type,
+			TriggeredBy:    structs.EvalTriggerNodeDrain,
+			JobID:          job.ID,
+			JobModifyIndex: job.ModifyIndex,
+			Status:         structs.EvalStatusPending,
+		})
+	}
+
+	// Commit this update via Raft
+	return n.raft.AllocUpdateDesiredTransition(stoplist.allocBatch, evals)
+}
+
 // initDrainer initializes the node drainer state and returns a list of
 // draining nodes as well as allocs that are draining that should be watched
 // for a replacement.
diff --git a/nomad/drainer/drain_test.go b/nomad/drainer/drain_test.go
index 8361a5659..f92f2503e 100644
--- a/nomad/drainer/drain_test.go
+++ b/nomad/drainer/drain_test.go
@@ -271,6 +271,8 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
 		t.Logf("job: %s node: %s alloc: %s desired_status: %s desired_transition: %s actual: %s replaces: %s",
 			alloc.Job.Name, alloc.NodeID[:6], alloc.ID[:6], alloc.DesiredStatus, pretty.Sprint(alloc.DesiredTransition.Migrate), alloc.ClientStatus, alloc.PreviousAllocation)
 	}
+
+	t.Logf("==> PASS")
 }
 
 // TestNodeDrainer_SystemDrain asserts system jobs are drained
@@ -484,4 +486,6 @@ func TestNodeDrainer_SystemDrain(t *testing.T) {
 		t.Logf("job: %s node: %s alloc: %s desired_status: %s desired_transition: %s actual: %s replaces: %s",
 			alloc.Job.Name, alloc.NodeID[:6], alloc.ID[:6], alloc.DesiredStatus, pretty.Sprint(alloc.DesiredTransition.Migrate), alloc.ClientStatus, alloc.PreviousAllocation)
 	}
+
+	t.Logf("==> PASS")
 }
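Reviewer note: the refactor above is mechanical, but the reason `stopAllocs` carries a `jobBatch` alongside its `allocBatch` is easy to miss. Below is a minimal, self-contained Go sketch of that batching pattern; the `job`, `alloc`, and `stopBatch` types are simplified stand-ins invented for illustration, not Nomad's real structs. The point it demonstrates: allocs accumulate per ID while their jobs are deduplicated by a (namespace, job ID) key, which is why `applyMigrations` emits exactly one evaluation per affected job no matter how many of its allocs are migrated.

```go
package main

import "fmt"

// The types below are illustrative stand-ins for this sketch, not Nomad's
// actual structs; field names and shapes are assumptions.
type job struct {
	Namespace, ID string
}

type alloc struct {
	ID  string
	Job *job
}

// jobKey plays the same role as the diff's jobKey: a unique
// (namespace, job ID) identity.
type jobKey struct {
	namespace, jobID string
}

// stopBatch mirrors the stopAllocs shape: allocs to migrate keyed by alloc
// ID, plus a deduplicated set of jobs to emit evaluations for.
type stopBatch struct {
	allocBatch map[string]bool
	jobBatch   map[jobKey]*job
}

func newStopBatch() *stopBatch {
	return &stopBatch{
		allocBatch: make(map[string]bool),
		jobBatch:   make(map[jobKey]*job),
	}
}

// add records one alloc for migration; its job lands in jobBatch at most
// once, so each affected job gets exactly one evaluation.
func (b *stopBatch) add(a *alloc) {
	b.allocBatch[a.ID] = true
	b.jobBatch[jobKey{a.Job.Namespace, a.Job.ID}] = a.Job
}

func main() {
	web := &job{Namespace: "default", ID: "web"}
	b := newStopBatch()
	b.add(&alloc{ID: "alloc-1", Job: web})
	b.add(&alloc{ID: "alloc-2", Job: web})

	// Two allocs are stopped but only one evaluation is emitted for the
	// job, matching the batching behavior applyMigrations relies on.
	fmt.Printf("allocs to stop: %d, evals to emit: %d\n",
		len(b.allocBatch), len(b.jobBatch))
}
```

Running the sketch prints `allocs to stop: 2, evals to emit: 1`, mirroring the one-evaluation-per-job behavior the new `applyMigrations` helper commits to Raft.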