From 8217ebf11e2b7585bef61b142550fa9bd3dfcb0e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 9 Mar 2018 16:25:46 -0800 Subject: [PATCH] drainer: RegisterJob -> RegisterJobs Test job watcher --- nomad/drainer/drainer.go | 1 - nomad/drainer/watch_jobs.go | 38 +- nomad/drainer/watch_jobs_test.go | 796 ++++++++++++++++++++----------- nomad/drainer/watch_nodes.go | 4 +- nomad/structs/structs.go | 4 +- 5 files changed, 546 insertions(+), 297 deletions(-) diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index 98c52479a..46dcad696 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -332,7 +332,6 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er } n.batcher.Unlock() - // Wait for the future if err := future.Wait(); err != nil { return 0, err } diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go index 714bac2b7..61a615646 100644 --- a/nomad/drainer/watch_jobs.go +++ b/nomad/drainer/watch_jobs.go @@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest { // DrainingJobWatcher is the interface for watching a job drain type DrainingJobWatcher interface { // RegisterJob is used to start watching a draining job - RegisterJob(job structs.JobNs) + RegisterJobs(job []structs.JobNs) // Drain is used to emit allocations that should be drained. Drain() <-chan *DrainRequest @@ -90,21 +90,28 @@ func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *st } // RegisterJob marks the given job as draining and adds it to being watched. 
-func (w *drainingJobWatcher) RegisterJob(job structs.JobNs) { +func (w *drainingJobWatcher) RegisterJobs(jobs []structs.JobNs) { w.l.Lock() defer w.l.Unlock() - if _, ok := w.jobs[job]; ok { - return + updated := false + for _, jns := range jobs { + if _, ok := w.jobs[jns]; ok { + continue + } + + // Add the job and cancel the context + w.logger.Printf("[TRACE] nomad.drain.job_watcher: registering job %v", jns) + w.jobs[jns] = struct{}{} + updated = true } - // Add the job and cancel the context - w.logger.Printf("[TRACE] nomad.drain.job_watcher: registering job %v", job) - w.jobs[job] = struct{}{} - w.queryCancel() + if updated { + w.queryCancel() - // Create a new query context - w.queryCtx, w.queryCancel = context.WithCancel(w.ctx) + // Create a new query context + w.queryCtx, w.queryCancel = context.WithCancel(w.ctx) + } } // Drain returns the channel that emits allocations to drain. @@ -160,7 +167,6 @@ func (w *drainingJobWatcher) watch() { } } - // update index for next run lastHandled := waitIndex waitIndex = index @@ -184,7 +190,7 @@ func (w *drainingJobWatcher) watch() { // Lookup the job job, err := w.state.JobByID(nil, jns.Namespace, jns.ID) - if err != nil { + if err != nil || job == nil { w.logger.Printf("[WARN] nomad.drain.job_watcher: failed to lookup job %v: %v", jns, err) continue } @@ -268,7 +274,8 @@ type jobResult struct { done bool } -// newJobResult returns an initialized jobResult +// newJobResult returns a jobResult with done=true. It is the responsibility of +// callers to set done=false when a remaining drainable alloc is found. 
func newJobResult() *jobResult { return &jobResult{ done: true, @@ -390,10 +397,13 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup, numToDrain := healthy - thresholdCount numToDrain = helper.IntMin(len(drainable), numToDrain) if numToDrain <= 0 { - fmt.Printf("------- Not draining any allocs\n") + fmt.Printf("------- Not draining any allocs: drainable:%d healthy:%d thresholdCount:%d\n", + len(drainable), healthy, thresholdCount) return nil } + fmt.Printf("------- DRAINing allocs: n: %d drainable:%d healthy:%d thresholdCount:%d\n", + numToDrain, len(drainable), healthy, thresholdCount) result.drain = append(result.drain, drainable[0:numToDrain]...) return nil } diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go index 3db5ea0ac..078e5316e 100644 --- a/nomad/drainer/watch_jobs_test.go +++ b/nomad/drainer/watch_jobs_test.go @@ -2,7 +2,6 @@ package drainer import ( "context" - "fmt" "testing" "time" @@ -11,309 +10,552 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/time/rate" ) -func testDrainingJobWatcher(t *testing.T) (*drainingJobWatcher, *state.StateStore) { - t.Helper() - - state := state.TestStateStore(t) - limiter := rate.NewLimiter(100.0, 100) - logger := testlog.Logger(t) - w := NewDrainingJobWatcher(context.Background(), limiter, state, logger) - return w, state -} - -func TestDrainingJobWatcher_Interface(t *testing.T) { - t.Parallel() - require := require.New(t) - w, _ := testDrainingJobWatcher(t) - require.Implements((*DrainingJobWatcher)(nil), w) -} - -// DrainingJobWatcher tests: -// TODO Test that several jobs allocation changes get batched -// TODO Test that jobs are deregistered when they have no more to migrate -// TODO Test that the watcher gets triggered on alloc changes -// TODO Test that the watcher cancels its query 
when a new job is registered - -func TestHandleTaskGroup_AllDone(t *testing.T) { - t.Parallel() - require := require.New(t) +func testNodes(t *testing.T, state *state.StateStore) (drainingNode, runningNode *structs.Node) { + n1 := mock.Node() + n1.Name = "draining" + n1.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: time.Minute, + }, + ForceDeadline: time.Now().Add(time.Minute), + } + require.Nil(t, state.UpsertNode(100, n1)) // Create a non-draining node - state := state.TestStateStore(t) - n := mock.Node() - require.Nil(state.UpsertNode(100, n)) - - job := mock.Job() - require.Nil(state.UpsertJob(101, job)) - - // Create 10 running allocs on the healthy node - var allocs []*structs.Allocation - for i := 0; i < 10; i++ { - a := mock.Alloc() - a.Job = job - a.TaskGroup = job.TaskGroups[0].Name - a.NodeID = n.ID - a.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(false), - } - allocs = append(allocs, a) - } - require.Nil(state.UpsertAllocs(102, allocs)) - - snap, err := state.Snapshot() - require.Nil(err) - - res := &jobResult{} - require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res)) - require.Empty(res.drain) - require.Empty(res.migrated) - require.True(res.done) + n2 := mock.Node() + n2.Name = "running" + require.Nil(t, state.UpsertNode(101, n2)) + return n1, n2 } -func TestHandleTaskGroup_AllOnDrainingNodes(t *testing.T) { +func testDrainingJobWatcher(t *testing.T, state *state.StateStore) (*drainingJobWatcher, context.CancelFunc) { + t.Helper() + + limiter := rate.NewLimiter(100.0, 100) + logger := testlog.Logger(t) + ctx, cancel := context.WithCancel(context.Background()) + w := NewDrainingJobWatcher(ctx, limiter, state, logger) + return w, cancel +} + +// TestDrainingJobWatcher_Interface is a compile-time assertion that we +// implement the intended interface. 
+func TestDrainingJobWatcher_Interface(t *testing.T) { + w, cancel := testDrainingJobWatcher(t, state.TestStateStore(t)) + cancel() + var _ DrainingJobWatcher = w +} + +// TestDrainingJobWatcher_DrainJobs asserts DrainingJobWatcher batches +// allocation changes from multiple jobs. +func TestDrainingJobWatcher_DrainJobs(t *testing.T) { t.Parallel() require := require.New(t) - // The loop value sets the max parallel for the drain strategy - for i := 1; i < 8; i++ { - // Create a draining node - state := state.TestStateStore(t) - n := mock.Node() - n.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: 5 * time.Minute, - }, - ForceDeadline: time.Now().Add(1 * time.Minute), - } - require.Nil(state.UpsertNode(100, n)) + state := state.TestStateStore(t) + jobWatcher, cancelWatcher := testDrainingJobWatcher(t, state) + defer cancelWatcher() + drainingNode, runningNode := testNodes(t, state) + var index uint64 = 101 + count := 8 + + newAlloc := func(node *structs.Node, job *structs.Job) *structs.Allocation { + a := mock.Alloc() + a.JobID = job.ID + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = node.ID + return a + } + + // 2 jobs with count 8, max parallel 3 + jnss := make([]structs.JobNs, 2) + jobs := make([]*structs.Job, 2) + for i := 0; i < 2; i++ { job := mock.Job() - job.TaskGroups[0].Migrate.MaxParallel = i - require.Nil(state.UpsertJob(101, job)) + jobs[i] = job + jnss[i] = structs.NewJobNs(job.Namespace, job.ID) + job.TaskGroups[0].Migrate.MaxParallel = 3 + job.TaskGroups[0].Count = count + require.Nil(state.UpsertJob(index, job)) + index++ - // Create 10 running allocs on the draining node var allocs []*structs.Allocation - for i := 0; i < 10; i++ { - a := mock.Alloc() - a.Job = job - a.TaskGroup = job.TaskGroups[0].Name - a.NodeID = n.ID + for i := 0; i < count; i++ { + a := newAlloc(drainingNode, job) a.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(false), + Healthy: 
helper.BoolToPtr(true), } allocs = append(allocs, a) } - require.Nil(state.UpsertAllocs(102, allocs)) - snap, err := state.Snapshot() - require.Nil(err) + require.Nil(state.UpsertAllocs(index, allocs)) + index++ - res := &jobResult{} - require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res)) - require.Len(res.drain, i) - require.Empty(res.migrated) - require.False(res.done) + } + + // Only register jobs with watcher after creating all data models as + // once the watcher starts we need to track the index carefully for + // updating the batch future + jobWatcher.RegisterJobs(jnss) + + // assertOps asserts how many allocs should be drained and migrated. + // The drains and migrations - if any - are returned. + assertOps := func(drained, migrated int) (drains *DrainRequest, migrations []*structs.Allocation) { + t.Helper() + var drainsChecked, migrationsChecked bool + for { + select { + case drains = <-jobWatcher.Drain(): + ids := make([]string, len(drains.Allocs)) + for i, a := range drains.Allocs { + ids[i] = a.JobID[:6] + ":" + a.ID[:6] + } + t.Logf("draining %d allocs: %v", len(ids), ids) + require.False(drainsChecked, "drains already received") + drainsChecked = true + require.Lenf(drains.Allocs, drained, + "expected %d drains but found %d", drained, len(drains.Allocs)) + case migrations = <-jobWatcher.Migrated(): + ids := make([]string, len(migrations)) + for i, a := range migrations { + ids[i] = a.JobID[:6] + ":" + a.ID[:6] + } + t.Logf("migrating %d allocs: %v", len(ids), ids) + require.False(migrationsChecked, "migrations already received") + migrationsChecked = true + require.Lenf(migrations, migrated, + "expected %d migrations but found %d", migrated, len(migrations)) + case <-time.After(10 * time.Millisecond): + if !drainsChecked && drained > 0 { + t.Fatalf("expected %d drains but none happened", drained) + } + if !migrationsChecked && migrated > 0 { + t.Fatalf("expected %d migrations but none happened", migrated) + } + return drains, 
migrations + } + } + } + + // Expect a first batch of MaxParallel allocs from each job + drains, _ := assertOps(6, 0) + + // Fake migrating the drained allocs by starting new ones and stopping + // the old ones + drainedAllocs := make([]*structs.Allocation, len(drains.Allocs)) + for i, a := range drains.Allocs { + a.DesiredTransition.Migrate = helper.BoolToPtr(true) + + // create a copy so we can reuse this slice + drainedAllocs[i] = a.Copy() + } + require.Nil(state.UpsertAllocs(index, drainedAllocs)) + drains.Resp.Respond(index, nil) + index++ + + // Just setting ShouldMigrate should not cause any further drains + assertOps(0, 0) + + // Proceed our fake migration along by creating new allocs and stopping + // old ones + replacements := make([]*structs.Allocation, len(drainedAllocs)) + updates := make([]*structs.Allocation, 0, len(drainedAllocs)*2) + for i, a := range drainedAllocs { + // Stop drained allocs + a.DesiredTransition.Migrate = nil + a.DesiredStatus = structs.AllocDesiredStatusStop + + // Create a replacement + replacement := mock.Alloc() + replacement.JobID = a.Job.ID + replacement.Job = a.Job + replacement.TaskGroup = a.TaskGroup + replacement.NodeID = runningNode.ID + // start in pending state with no health status + + updates = append(updates, a, replacement) + replacements[i] = replacement.Copy() + } + require.Nil(state.UpsertAllocs(index, updates)) + index++ + + // The drained allocs stopping cause migrations but no new drains + // because the replacements have not started + assertOps(0, 6) + + // Finally kickoff further drain activity by "starting" replacements + for _, a := range replacements { + a.ClientStatus = structs.AllocClientStatusRunning + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + } + require.Nil(state.UpsertAllocs(index, replacements)) + index++ + + require.NotEmpty(jobWatcher.drainingJobs()) + + // 6 new drains + drains, _ = assertOps(6, 0) + + // Fake migrations once more to finish 
the drain + drainedAllocs = make([]*structs.Allocation, len(drains.Allocs)) + for i, a := range drains.Allocs { + a.DesiredTransition.Migrate = helper.BoolToPtr(true) + + // create a copy so we can reuse this slice + drainedAllocs[i] = a.Copy() + } + require.Nil(state.UpsertAllocs(index, drainedAllocs)) + drains.Resp.Respond(index, nil) + index++ + + assertOps(0, 0) + + replacements = make([]*structs.Allocation, len(drainedAllocs)) + updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2) + for i, a := range drainedAllocs { + a.DesiredTransition.Migrate = nil + a.DesiredStatus = structs.AllocDesiredStatusStop + + replacement := newAlloc(runningNode, a.Job) + updates = append(updates, a, replacement) + replacements[i] = replacement.Copy() + } + require.Nil(state.UpsertAllocs(index, updates)) + index++ + + assertOps(0, 6) + + for _, a := range replacements { + a.ClientStatus = structs.AllocClientStatusRunning + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + } + require.Nil(state.UpsertAllocs(index, replacements)) + index++ + + require.NotEmpty(jobWatcher.drainingJobs()) + + // Final 4 new drains + drains, _ = assertOps(4, 0) + + // Fake migrations once more to finish the drain + drainedAllocs = make([]*structs.Allocation, len(drains.Allocs)) + for i, a := range drains.Allocs { + a.DesiredTransition.Migrate = helper.BoolToPtr(true) + + // create a copy so we can reuse this slice + drainedAllocs[i] = a.Copy() + } + require.Nil(state.UpsertAllocs(index, drainedAllocs)) + drains.Resp.Respond(index, nil) + index++ + + assertOps(0, 0) + + replacements = make([]*structs.Allocation, len(drainedAllocs)) + updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2) + for i, a := range drainedAllocs { + a.DesiredTransition.Migrate = nil + a.DesiredStatus = structs.AllocDesiredStatusStop + + replacement := newAlloc(runningNode, a.Job) + updates = append(updates, a, replacement) + replacements[i] = replacement.Copy() + } + 
require.Nil(state.UpsertAllocs(index, updates)) + index++ + + assertOps(0, 4) + + for _, a := range replacements { + a.ClientStatus = structs.AllocClientStatusRunning + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + } + require.Nil(state.UpsertAllocs(index, replacements)) + index++ + + // No jobs should be left! + require.Empty(jobWatcher.drainingJobs()) +} + +// DrainingJobWatcher tests: +// TODO Test that the watcher cancels its query when a new job is registered + +// handleTaskGroupTestCase is the test case struct for TestHandleTaskGroup +// +// Two nodes will be initialized: one draining and one running. +type handleTaskGroupTestCase struct { + // Name of test + Name string + + // Expectations + ExpectedDrained int + ExpectedMigrated int + ExpectedDone bool + + // Count overrides the default count of 10 if set + Count int + + // MaxParallel overrides the default max_parallel of 1 if set + MaxParallel int + + // AddAlloc will be called 10 times to create test allocs + // + // Allocs default to be healthy on the draining node + AddAlloc func(i int, a *structs.Allocation, drainingID, runningID string) +} + +func TestHandeTaskGroup_Table(t *testing.T) { + cases := []handleTaskGroupTestCase{ + { + // All allocs on draining node + Name: "AllDraining", + ExpectedDrained: 1, + ExpectedMigrated: 0, + ExpectedDone: false, + }, + { + // All allocs on non-draining node + Name: "AllNonDraining", + ExpectedDrained: 0, + ExpectedMigrated: 0, + ExpectedDone: true, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + a.NodeID = runningID + }, + }, + { + // Some allocs on non-draining node but not healthy + Name: "SomeNonDrainingUnhealthy", + ExpectedDrained: 0, + ExpectedMigrated: 0, + ExpectedDone: false, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + if i%2 == 0 { + a.NodeID = runningID + a.DeploymentStatus = nil + } + }, + }, + { + // One draining, other allocs on 
non-draining node and healthy + Name: "OneDraining", + ExpectedDrained: 1, + ExpectedMigrated: 0, + ExpectedDone: false, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + if i != 0 { + a.NodeID = runningID + } + }, + }, + { + // One already draining, other allocs on non-draining node and healthy + Name: "OneAlreadyDraining", + ExpectedDrained: 0, + ExpectedMigrated: 0, + ExpectedDone: false, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + if i == 0 { + a.DesiredTransition.Migrate = helper.BoolToPtr(true) + return + } + a.NodeID = runningID + }, + }, + { + // One already drained, other allocs on non-draining node and healthy + Name: "OneAlreadyDrained", + ExpectedDrained: 0, + ExpectedMigrated: 1, + ExpectedDone: true, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + if i == 0 { + a.DesiredStatus = structs.AllocDesiredStatusStop + return + } + a.NodeID = runningID + }, + }, + { + // All allocs are terminal, nothing to be drained + Name: "AllMigrating", + ExpectedDrained: 0, + ExpectedMigrated: 10, + ExpectedDone: true, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + a.DesiredStatus = structs.AllocDesiredStatusStop + }, + }, + { + // All allocs may be drained at once + Name: "AllAtOnce", + ExpectedDrained: 10, + ExpectedMigrated: 0, + ExpectedDone: false, + MaxParallel: 10, + }, + { + // Drain 2 + Name: "Drain2", + ExpectedDrained: 2, + ExpectedMigrated: 0, + ExpectedDone: false, + MaxParallel: 2, + }, + { + // One on new node, one drained, and one draining + ExpectedDrained: 1, + ExpectedMigrated: 1, + MaxParallel: 2, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + switch i { + case 0: + // One alloc on running node + a.NodeID = runningID + case 1: + // One alloc already migrated + a.DesiredStatus = structs.AllocDesiredStatusStop + } + }, + }, + { + // 8 on new node, one drained, and one draining + 
ExpectedDrained: 1, + ExpectedMigrated: 1, + MaxParallel: 2, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + switch i { + case 0, 1, 2, 3, 4, 5, 6, 7: + a.NodeID = runningID + case 8: + a.DesiredStatus = structs.AllocDesiredStatusStop + } + }, + }, + { + // 5 on new node, two drained, and three draining + ExpectedDrained: 3, + ExpectedMigrated: 2, + MaxParallel: 5, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + switch i { + case 0, 1, 2, 3, 4: + a.NodeID = runningID + case 8, 9: + a.DesiredStatus = structs.AllocDesiredStatusStop + } + }, + }, + { + // Not all on new node have health set + Name: "PendingHealth", + ExpectedDrained: 1, + ExpectedMigrated: 1, + MaxParallel: 3, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + switch i { + case 0: + // Deployment status UNset for 1 on new node + a.NodeID = runningID + a.DeploymentStatus = nil + case 1, 2, 3, 4: + // Deployment status set for 4 on new node + a.NodeID = runningID + case 9: + a.DesiredStatus = structs.AllocDesiredStatusStop + } + }, + }, + { + // 5 max parallel - 1 migrating - 2 with unset health = 2 drainable + Name: "PendingHealthHigherMax", + ExpectedDrained: 2, + ExpectedMigrated: 1, + MaxParallel: 5, + AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + switch i { + case 0, 1: + // Deployment status UNset for 2 on new node + a.NodeID = runningID + a.DeploymentStatus = nil + case 2, 3, 4: + // Deployment status set for 3 on new node + a.NodeID = runningID + case 9: + a.DesiredStatus = structs.AllocDesiredStatusStop + } + }, + }, + } + + for _, testCase := range cases { + t.Run(testCase.Name, func(t *testing.T) { + testHandleTaskGroup(t, testCase) + }) } } -func TestHandleTaskGroup_MixedHealth(t *testing.T) { - cases := []struct { - maxParallel int - drainingNodeAllocs int - healthSet int - healthUnset int - expectedDrain int - expectedMigrated int - expectedDone bool - }{ - { - 
maxParallel: 2, - drainingNodeAllocs: 10, - healthSet: 0, - healthUnset: 0, - expectedDrain: 2, - expectedMigrated: 0, - expectedDone: false, - }, - { - maxParallel: 2, - drainingNodeAllocs: 9, - healthSet: 0, - healthUnset: 0, - expectedDrain: 1, - expectedMigrated: 1, - expectedDone: false, - }, - { - maxParallel: 5, - drainingNodeAllocs: 9, - healthSet: 0, - healthUnset: 0, - expectedDrain: 4, - expectedMigrated: 1, - expectedDone: false, - }, - { - maxParallel: 2, - drainingNodeAllocs: 5, - healthSet: 2, - healthUnset: 0, - expectedDrain: 0, - expectedMigrated: 5, - expectedDone: false, - }, - { - maxParallel: 2, - drainingNodeAllocs: 5, - healthSet: 3, - healthUnset: 0, - expectedDrain: 0, - expectedMigrated: 5, - expectedDone: false, - }, - { - maxParallel: 2, - drainingNodeAllocs: 5, - healthSet: 4, - healthUnset: 0, - expectedDrain: 1, - expectedMigrated: 5, - expectedDone: false, - }, - { - maxParallel: 2, - drainingNodeAllocs: 5, - healthSet: 4, - healthUnset: 1, - expectedDrain: 1, - expectedMigrated: 5, - expectedDone: false, - }, - { - maxParallel: 1, - drainingNodeAllocs: 5, - healthSet: 4, - healthUnset: 1, - expectedDrain: 0, - expectedMigrated: 5, - expectedDone: false, - }, - { - maxParallel: 3, - drainingNodeAllocs: 5, - healthSet: 3, - healthUnset: 0, - expectedDrain: 1, - expectedMigrated: 5, - expectedDone: false, - }, - { - maxParallel: 3, - drainingNodeAllocs: 0, - healthSet: 10, - healthUnset: 0, - expectedDrain: 0, - expectedMigrated: 10, - expectedDone: true, - }, - { - // Is the case where deadline is hit and all 10 are just marked - // stopped. We should detect the job as done. 
- maxParallel: 3, - drainingNodeAllocs: 0, - healthSet: 0, - healthUnset: 0, - expectedDrain: 0, - expectedMigrated: 10, - expectedDone: true, - }, +func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) { + t.Parallel() + require := require.New(t) + assert := assert.New(t) + + // Create nodes + state := state.TestStateStore(t) + drainingNode, runningNode := testNodes(t, state) + + job := mock.Job() + job.TaskGroups[0].Count = 10 + if tc.Count > 0 { + job.TaskGroups[0].Count = tc.Count + } + if tc.MaxParallel > 0 { + job.TaskGroups[0].Migrate.MaxParallel = tc.MaxParallel + } + require.Nil(state.UpsertJob(102, job)) + + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + a := mock.Alloc() + a.JobID = job.ID + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + + // Default to being healthy on the draining node + a.NodeID = drainingNode.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + if tc.AddAlloc != nil { + tc.AddAlloc(i, a, drainingNode.ID, runningNode.ID) + } + allocs = append(allocs, a) } - for cnum, c := range cases { - t.Run(fmt.Sprintf("%d", cnum), func(t *testing.T) { - require := require.New(t) + require.Nil(state.UpsertAllocs(103, allocs)) + snap, err := state.Snapshot() + require.Nil(err) - // Create a draining node - state := state.TestStateStore(t) - - drainingNode := mock.Node() - drainingNode.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: 5 * time.Minute, - }, - ForceDeadline: time.Now().Add(1 * time.Minute), - } - require.Nil(state.UpsertNode(100, drainingNode)) - - healthyNode := mock.Node() - require.Nil(state.UpsertNode(101, healthyNode)) - - job := mock.Job() - job.TaskGroups[0].Migrate.MaxParallel = c.maxParallel - require.Nil(state.UpsertJob(101, job)) - - // Create running allocs on the draining node with health set - var allocs []*structs.Allocation - for i := 0; i < c.drainingNodeAllocs; i++ { - a := mock.Alloc() - a.Job = job - 
a.TaskGroup = job.TaskGroups[0].Name - a.NodeID = drainingNode.ID - a.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(false), - } - allocs = append(allocs, a) - } - - // Create stopped allocs on the draining node - for i := 10 - c.drainingNodeAllocs; i > 0; i-- { - a := mock.Alloc() - a.Job = job - a.TaskGroup = job.TaskGroups[0].Name - a.NodeID = drainingNode.ID - a.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(false), - } - a.DesiredStatus = structs.AllocDesiredStatusStop - allocs = append(allocs, a) - } - - // Create allocs on the healthy node with health set - for i := 0; i < c.healthSet; i++ { - a := mock.Alloc() - a.Job = job - a.TaskGroup = job.TaskGroups[0].Name - a.NodeID = healthyNode.ID - a.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(false), - } - allocs = append(allocs, a) - } - - // Create allocs on the healthy node with health not set - for i := 0; i < c.healthUnset; i++ { - a := mock.Alloc() - a.Job = job - a.TaskGroup = job.TaskGroups[0].Name - a.NodeID = healthyNode.ID - allocs = append(allocs, a) - } - require.Nil(state.UpsertAllocs(103, allocs)) - - snap, err := state.Snapshot() - require.Nil(err) - - res := &jobResult{} - require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res)) - require.Len(res.drain, c.expectedDrain) - require.Len(res.migrated, c.expectedMigrated) - require.Equal(c.expectedDone, res.done) - }) - } + res := newJobResult() + require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 102, res)) + assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d", + tc.ExpectedDrained, len(res.drain)) + assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d", + tc.ExpectedMigrated, len(res.migrated)) + assert.Equal(tc.ExpectedDone, res.done) } func TestHandleTaskGroup_Migrations(t *testing.T) { diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go index 
ed99fb693..97c6cf8b2 100644 --- a/nomad/drainer/watch_nodes.go +++ b/nomad/drainer/watch_nodes.go @@ -74,9 +74,7 @@ func (n *NodeDrainer) Update(node *structs.Node) { return } n.logger.Printf("[TRACE] nomad.drain: node %q has %d services on it", node.ID, len(jobs)) - for _, job := range jobs { - n.jobWatcher.RegisterJob(job) - } + n.jobWatcher.RegisterJobs(jobs) // TODO Test at this layer as well that a node drain on a node without // allocs immediately gets unmarked as draining diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 72f2c0a31..fa16284ab 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1794,13 +1794,13 @@ func (n *NetworkResource) PortLabels() map[string]int { // JobNs is a Job.ID and Namespace tuple type JobNs struct { - ID, Namespace string + Namespace, ID string } func NewJobNs(namespace, id string) JobNs { return JobNs{ - ID: id, Namespace: namespace, + ID: id, } }