drainer: RegisterJob -> RegisterJobs

Test job watcher
Michael Schurter
2018-03-09 16:25:46 -08:00
parent 0a1f1d2c56
commit 8217ebf11e
5 changed files with 546 additions and 297 deletions

View File

@@ -332,7 +332,6 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er
}
n.batcher.Unlock()
// Wait for the future
if err := future.Wait(); err != nil {
return 0, err
}
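
The Wait above blocks on a batch future: many allocs are coalesced into a single raft apply, and every caller waits on the same response. A minimal, self-contained sketch of that pattern (the type and method names are illustrative, not Nomad's actual implementation):

package main

import "fmt"

// batchFuture sketches the future batchDrainAllocs waits on: the batcher
// responds once with the raft index (or an error) and all waiters unblock.
type batchFuture struct {
	doneCh chan struct{}
	index  uint64
	err    error
}

func newBatchFuture() *batchFuture {
	return &batchFuture{doneCh: make(chan struct{})}
}

// Respond records the result and unblocks every Wait.
func (f *batchFuture) Respond(index uint64, err error) {
	f.index, f.err = index, err
	close(f.doneCh)
}

// Wait blocks until Respond is called.
func (f *batchFuture) Wait() error {
	<-f.doneCh
	return f.err
}

func main() {
	f := newBatchFuture()
	go f.Respond(200, nil) // pretend the batched apply committed at index 200
	if err := f.Wait(); err != nil {
		fmt.Println("drain failed:", err)
	}
	fmt.Println("drained at index", f.index)
}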

View File

@@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest {
// DrainingJobWatcher is the interface for watching a job drain
type DrainingJobWatcher interface {
// RegisterJobs is used to start watching draining jobs
RegisterJob(job structs.JobNs)
RegisterJobs(jobs []structs.JobNs)
// Drain is used to emit allocations that should be drained.
Drain() <-chan *DrainRequest
@@ -90,21 +90,28 @@ func NewDrainingJobWatcher(ctx context.Context, limiter *rate.Limiter, state *st
}
// RegisterJobs marks the given jobs as draining and adds them to the watch list.
func (w *drainingJobWatcher) RegisterJob(job structs.JobNs) {
func (w *drainingJobWatcher) RegisterJobs(jobs []structs.JobNs) {
w.l.Lock()
defer w.l.Unlock()
if _, ok := w.jobs[job]; ok {
return
updated := false
for _, jns := range jobs {
if _, ok := w.jobs[jns]; ok {
continue
}
// Add the job
w.logger.Printf("[TRACE] nomad.drain.job_watcher: registering job %v", jns)
w.jobs[jns] = struct{}{}
updated = true
}
// Add the job and cancel the context
w.logger.Printf("[TRACE] nomad.drain.job_watcher: registering job %v", job)
w.jobs[job] = struct{}{}
w.queryCancel()
if updated {
w.queryCancel()
// Create a new query context
w.queryCtx, w.queryCancel = context.WithCancel(w.ctx)
// Create a new query context
w.queryCtx, w.queryCancel = context.WithCancel(w.ctx)
}
}
// Drain returns the channel that emits allocations to drain.
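
The updated flag above exists because the watcher is normally parked in a blocking query; cancelling queryCtx is what wakes it so newly registered jobs are picked up, and a fresh context is then created for the next query. A condensed, runnable sketch of that cancel-and-recreate pattern (blockingQuery is a stand-in, not the state store's API):

package main

import (
	"context"
	"fmt"
	"time"
)

// blockingQuery stands in for the long state-store query the watch loop
// runs; it returns early if its context is cancelled.
func blockingQuery(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Second): // pretend min-index wait
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	root := context.Background()
	queryCtx, queryCancel := context.WithCancel(root)

	done := make(chan error, 1)
	go func() { done <- blockingQuery(queryCtx) }()

	// RegisterJobs added a new job, so interrupt the in-flight query...
	queryCancel()
	fmt.Println("query returned:", <-done) // context canceled

	// ...and create a new query context, as in the "if updated" block above.
	queryCtx, queryCancel = context.WithCancel(root)
	defer queryCancel()
	_ = queryCtx
}
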
@@ -160,7 +167,6 @@ func (w *drainingJobWatcher) watch() {
}
}
// update index for next run
lastHandled := waitIndex
waitIndex = index
@@ -184,7 +190,7 @@ func (w *drainingJobWatcher) watch() {
// Lookup the job
job, err := w.state.JobByID(nil, jns.Namespace, jns.ID)
if err != nil {
if err != nil || job == nil {
w.logger.Printf("[WARN] nomad.drain.job_watcher: failed to lookup job %v: %v", jns, err)
continue
}
@@ -268,7 +274,8 @@ type jobResult struct {
done bool
}
// newJobResult returns an initialized jobResult
// newJobResult returns a jobResult with done=true. It is the responsibility of
// callers to set done=false when a remaining drainable alloc is found.
func newJobResult() *jobResult {
return &jobResult{
done: true,
@@ -390,10 +397,13 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
numToDrain := healthy - thresholdCount
numToDrain = helper.IntMin(len(drainable), numToDrain)
if numToDrain <= 0 {
fmt.Printf("------- Not draining any allocs\n")
fmt.Printf("------- Not draining any allocs: drainable:%d healthy:%d thresholdCount:%d\n",
len(drainable), healthy, thresholdCount)
return nil
}
fmt.Printf("------- DRAINing allocs: n: %d drainable:%d healthy:%d thresholdCount:%d\n",
numToDrain, len(drainable), healthy, thresholdCount)
result.drain = append(result.drain, drainable[0:numToDrain]...)
return nil
}
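
Concretely, the count drained per pass is healthy minus thresholdCount, clipped to the drainable set. With the numbers the new test below uses (count 8, max_parallel 3, all allocs healthy), and assuming thresholdCount is count minus max_parallel as in the surrounding handleTaskGroup code, each pass drains 3 allocs per job:

package main

import "fmt"

func main() {
	// Numbers from TestDrainingJobWatcher_DrainJobs below.
	count, maxParallel := 8, 3
	healthy, drainable := 8, 8

	// Assumption: thresholdCount is the healthy floor the group must
	// keep, i.e. count - max_parallel, per the surrounding code.
	thresholdCount := count - maxParallel // 5

	numToDrain := healthy - thresholdCount // 3
	if numToDrain > drainable {
		numToDrain = drainable // mirrors helper.IntMin in the diff
	}
	fmt.Println(numToDrain) // 3 per job, hence the batches of 6 the test expects
}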

View File

@@ -2,7 +2,6 @@ package drainer
import (
"context"
"fmt"
"testing"
"time"
@@ -11,309 +10,552 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
func testDrainingJobWatcher(t *testing.T) (*drainingJobWatcher, *state.StateStore) {
t.Helper()
state := state.TestStateStore(t)
limiter := rate.NewLimiter(100.0, 100)
logger := testlog.Logger(t)
w := NewDrainingJobWatcher(context.Background(), limiter, state, logger)
return w, state
}
func TestDrainingJobWatcher_Interface(t *testing.T) {
t.Parallel()
require := require.New(t)
w, _ := testDrainingJobWatcher(t)
require.Implements((*DrainingJobWatcher)(nil), w)
}
// DrainingJobWatcher tests:
// TODO Test that several jobs allocation changes get batched
// TODO Test that jobs are deregistered when they have no more to migrate
// TODO Test that the watcher gets triggered on alloc changes
// TODO Test that the watcher cancels its query when a new job is registered
func TestHandleTaskGroup_AllDone(t *testing.T) {
t.Parallel()
require := require.New(t)
func testNodes(t *testing.T, state *state.StateStore) (drainingNode, runningNode *structs.Node) {
n1 := mock.Node()
n1.Name = "draining"
n1.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: time.Minute,
},
ForceDeadline: time.Now().Add(time.Minute),
}
require.Nil(t, state.UpsertNode(100, n1))
// Create a non-draining node
state := state.TestStateStore(t)
n := mock.Node()
require.Nil(state.UpsertNode(100, n))
job := mock.Job()
require.Nil(state.UpsertJob(101, job))
// Create 10 running allocs on the healthy node
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
allocs = append(allocs, a)
}
require.Nil(state.UpsertAllocs(102, allocs))
snap, err := state.Snapshot()
require.Nil(err)
res := &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
n2 := mock.Node()
n2.Name = "running"
require.Nil(t, state.UpsertNode(101, n2))
return n1, n2
}
func TestHandleTaskGroup_AllOnDrainingNodes(t *testing.T) {
func testDrainingJobWatcher(t *testing.T, state *state.StateStore) (*drainingJobWatcher, context.CancelFunc) {
t.Helper()
limiter := rate.NewLimiter(100.0, 100)
logger := testlog.Logger(t)
ctx, cancel := context.WithCancel(context.Background())
w := NewDrainingJobWatcher(ctx, limiter, state, logger)
return w, cancel
}
// TestDrainingJobWatcher_Interface is a compile-time assertion that we
// implement the intended interface.
func TestDrainingJobWatcher_Interface(t *testing.T) {
w, cancel := testDrainingJobWatcher(t, state.TestStateStore(t))
cancel()
var _ DrainingJobWatcher = w
}
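
The var _ DrainingJobWatcher = w line is Go's compile-time interface check. The same assertion is commonly written at package scope, with no test needed; in this file it would read:

// Package-scope form of the same check: it costs nothing at runtime and
// breaks the build if *drainingJobWatcher stops satisfying the interface.
var _ DrainingJobWatcher = (*drainingJobWatcher)(nil)
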
// TestDrainingJobWatcher_DrainJobs asserts DrainingJobWatcher batches
// allocation changes from multiple jobs.
func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
t.Parallel()
require := require.New(t)
// The loop value sets the max parallel for the drain strategy
for i := 1; i < 8; i++ {
// Create a draining node
state := state.TestStateStore(t)
n := mock.Node()
n.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, n))
state := state.TestStateStore(t)
jobWatcher, cancelWatcher := testDrainingJobWatcher(t, state)
defer cancelWatcher()
drainingNode, runningNode := testNodes(t, state)
var index uint64 = 101
count := 8
newAlloc := func(node *structs.Node, job *structs.Job) *structs.Allocation {
a := mock.Alloc()
a.JobID = job.ID
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = node.ID
return a
}
// 2 jobs with count 8, max parallel 3
jnss := make([]structs.JobNs, 2)
jobs := make([]*structs.Job, 2)
for i := 0; i < 2; i++ {
job := mock.Job()
job.TaskGroups[0].Migrate.MaxParallel = i
require.Nil(state.UpsertJob(101, job))
jobs[i] = job
jnss[i] = structs.NewJobNs(job.Namespace, job.ID)
job.TaskGroups[0].Migrate.MaxParallel = 3
job.TaskGroups[0].Count = count
require.Nil(state.UpsertJob(index, job))
index++
// Create 10 running allocs on the draining node
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
for i := 0; i < count; i++ {
a := newAlloc(drainingNode, job)
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
Healthy: helper.BoolToPtr(true),
}
allocs = append(allocs, a)
}
require.Nil(state.UpsertAllocs(102, allocs))
snap, err := state.Snapshot()
require.Nil(err)
require.Nil(state.UpsertAllocs(index, allocs))
index++
res := &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Len(res.drain, i)
require.Empty(res.migrated)
require.False(res.done)
}
// Only register jobs with the watcher after creating all data models;
// once the watcher starts, we need to track the index carefully when
// updating the batch future
jobWatcher.RegisterJobs(jnss)
// assertOps asserts how many allocs should be drained and migrated.
// The drains and migrations - if any - are returned.
assertOps := func(drained, migrated int) (drains *DrainRequest, migrations []*structs.Allocation) {
t.Helper()
var drainsChecked, migrationsChecked bool
for {
select {
case drains = <-jobWatcher.Drain():
ids := make([]string, len(drains.Allocs))
for i, a := range drains.Allocs {
ids[i] = a.JobID[:6] + ":" + a.ID[:6]
}
t.Logf("draining %d allocs: %v", len(ids), ids)
require.False(drainsChecked, "drains already received")
drainsChecked = true
require.Lenf(drains.Allocs, drained,
"expected %d drains but found %d", drained, len(drains.Allocs))
case migrations = <-jobWatcher.Migrated():
ids := make([]string, len(migrations))
for i, a := range migrations {
ids[i] = a.JobID[:6] + ":" + a.ID[:6]
}
t.Logf("migrating %d allocs: %v", len(ids), ids)
require.False(migrationsChecked, "migrations already received")
migrationsChecked = true
require.Lenf(migrations, migrated,
"expected %d migrations but found %d", migrated, len(migrations))
case <-time.After(10 * time.Millisecond):
if !drainsChecked && drained > 0 {
t.Fatalf("expected %d drains but none happened", drained)
}
if !migrationsChecked && migrated > 0 {
t.Fatalf("expected %d migrations but none happened", migrated)
}
return drains, migrations
}
}
}
// Expect a first batch of MaxParallel allocs from each job
drains, _ := assertOps(6, 0)
// Fake migrating the drained allocs by starting new ones and stopping
// the old ones
drainedAllocs := make([]*structs.Allocation, len(drains.Allocs))
for i, a := range drains.Allocs {
a.DesiredTransition.Migrate = helper.BoolToPtr(true)
// create a copy so we can reuse this slice
drainedAllocs[i] = a.Copy()
}
require.Nil(state.UpsertAllocs(index, drainedAllocs))
drains.Resp.Respond(index, nil)
index++
// Just setting ShouldMigrate should not cause any further drains
assertOps(0, 0)
// Move our fake migration along by creating new allocs and stopping
// old ones
replacements := make([]*structs.Allocation, len(drainedAllocs))
updates := make([]*structs.Allocation, 0, len(drainedAllocs)*2)
for i, a := range drainedAllocs {
// Stop drained allocs
a.DesiredTransition.Migrate = nil
a.DesiredStatus = structs.AllocDesiredStatusStop
// Create a replacement
replacement := mock.Alloc()
replacement.JobID = a.Job.ID
replacement.Job = a.Job
replacement.TaskGroup = a.TaskGroup
replacement.NodeID = runningNode.ID
// start in pending state with no health status
updates = append(updates, a, replacement)
replacements[i] = replacement.Copy()
}
require.Nil(state.UpsertAllocs(index, updates))
index++
// Stopping the drained allocs causes migrations but no new drains
// because the replacements have not started
assertOps(0, 6)
// Finally kick off further drain activity by "starting" replacements
for _, a := range replacements {
a.ClientStatus = structs.AllocClientStatusRunning
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
}
require.Nil(state.UpsertAllocs(index, replacements))
index++
require.NotEmpty(jobWatcher.drainingJobs())
// 6 new drains
drains, _ = assertOps(6, 0)
// Fake migrations once more to finish the drain
drainedAllocs = make([]*structs.Allocation, len(drains.Allocs))
for i, a := range drains.Allocs {
a.DesiredTransition.Migrate = helper.BoolToPtr(true)
// create a copy so we can reuse this slice
drainedAllocs[i] = a.Copy()
}
require.Nil(state.UpsertAllocs(index, drainedAllocs))
drains.Resp.Respond(index, nil)
index++
assertOps(0, 0)
replacements = make([]*structs.Allocation, len(drainedAllocs))
updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2)
for i, a := range drainedAllocs {
a.DesiredTransition.Migrate = nil
a.DesiredStatus = structs.AllocDesiredStatusStop
replacement := newAlloc(runningNode, a.Job)
updates = append(updates, a, replacement)
replacements[i] = replacement.Copy()
}
require.Nil(state.UpsertAllocs(index, updates))
index++
assertOps(0, 6)
for _, a := range replacements {
a.ClientStatus = structs.AllocClientStatusRunning
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
}
require.Nil(state.UpsertAllocs(index, replacements))
index++
require.NotEmpty(jobWatcher.drainingJobs())
// Final 4 new drains
drains, _ = assertOps(4, 0)
// Fake migrations once more to finish the drain
drainedAllocs = make([]*structs.Allocation, len(drains.Allocs))
for i, a := range drains.Allocs {
a.DesiredTransition.Migrate = helper.BoolToPtr(true)
// create a copy so we can reuse this slice
drainedAllocs[i] = a.Copy()
}
require.Nil(state.UpsertAllocs(index, drainedAllocs))
drains.Resp.Respond(index, nil)
index++
assertOps(0, 0)
replacements = make([]*structs.Allocation, len(drainedAllocs))
updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2)
for i, a := range drainedAllocs {
a.DesiredTransition.Migrate = nil
a.DesiredStatus = structs.AllocDesiredStatusStop
replacement := newAlloc(runningNode, a.Job)
updates = append(updates, a, replacement)
replacements[i] = replacement.Copy()
}
require.Nil(state.UpsertAllocs(index, updates))
index++
assertOps(0, 4)
for _, a := range replacements {
a.ClientStatus = structs.AllocClientStatusRunning
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
}
require.Nil(state.UpsertAllocs(index, replacements))
index++
// No jobs should be left!
require.Empty(jobWatcher.drainingJobs())
}
// DrainingJobWatcher tests:
// TODO Test that the watcher cancels its query when a new job is registered
// handleTaskGroupTestCase is the test case struct for TestHandleTaskGroup_Table
//
// Two nodes will be initialized: one draining and one running.
type handleTaskGroupTestCase struct {
// Name of test
Name string
// Expectations
ExpectedDrained int
ExpectedMigrated int
ExpectedDone bool
// Count overrides the default count of 10 if set
Count int
// MaxParallel overrides the default max_parallel of 1 if set
MaxParallel int
// AddAlloc will be called 10 times to create test allocs
//
// Allocs default to healthy on the draining node
AddAlloc func(i int, a *structs.Allocation, drainingID, runningID string)
}
func TestHandleTaskGroup_Table(t *testing.T) {
cases := []handleTaskGroupTestCase{
{
// All allocs on draining node
Name: "AllDraining",
ExpectedDrained: 1,
ExpectedMigrated: 0,
ExpectedDone: false,
},
{
// All allocs on non-draining node
Name: "AllNonDraining",
ExpectedDrained: 0,
ExpectedMigrated: 0,
ExpectedDone: true,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
a.NodeID = runningID
},
},
{
// Some allocs on non-draining node but not healthy
Name: "SomeNonDrainingUnhealthy",
ExpectedDrained: 0,
ExpectedMigrated: 0,
ExpectedDone: false,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
if i%2 == 0 {
a.NodeID = runningID
a.DeploymentStatus = nil
}
},
},
{
// One draining, other allocs on non-draining node and healthy
Name: "OneDraining",
ExpectedDrained: 1,
ExpectedMigrated: 0,
ExpectedDone: false,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
if i != 0 {
a.NodeID = runningID
}
},
},
{
// One already draining, other allocs on non-draining node and healthy
Name: "OneAlreadyDraining",
ExpectedDrained: 0,
ExpectedMigrated: 0,
ExpectedDone: false,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
if i == 0 {
a.DesiredTransition.Migrate = helper.BoolToPtr(true)
return
}
a.NodeID = runningID
},
},
{
// One already drained, other allocs on non-draining node and healthy
Name: "OneAlreadyDrained",
ExpectedDrained: 0,
ExpectedMigrated: 1,
ExpectedDone: true,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
if i == 0 {
a.DesiredStatus = structs.AllocDesiredStatusStop
return
}
a.NodeID = runningID
},
},
{
// All allocs are terminal, nothing to be drained
Name: "AllMigrating",
ExpectedDrained: 0,
ExpectedMigrated: 10,
ExpectedDone: true,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
a.DesiredStatus = structs.AllocDesiredStatusStop
},
},
{
// All allocs may be drained at once
Name: "AllAtOnce",
ExpectedDrained: 10,
ExpectedMigrated: 0,
ExpectedDone: false,
MaxParallel: 10,
},
{
// Drain 2
Name: "Drain2",
ExpectedDrained: 2,
ExpectedMigrated: 0,
ExpectedDone: false,
MaxParallel: 2,
},
{
// One on new node, one drained, and one draining
Name: "OneOnNewNode",
ExpectedDrained: 1,
ExpectedMigrated: 1,
MaxParallel: 2,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
switch i {
case 0:
// One alloc on running node
a.NodeID = runningID
case 1:
// One alloc already migrated
a.DesiredStatus = structs.AllocDesiredStatusStop
}
},
},
{
// 8 on new node, one drained, and one draining
Name: "EightOnNewNode",
ExpectedDrained: 1,
ExpectedMigrated: 1,
MaxParallel: 2,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
switch i {
case 0, 1, 2, 3, 4, 5, 6, 7:
a.NodeID = runningID
case 8:
a.DesiredStatus = structs.AllocDesiredStatusStop
}
},
},
{
// 5 on new node, two drained, and three draining
Name: "FiveOnNewNode",
ExpectedDrained: 3,
ExpectedMigrated: 2,
MaxParallel: 5,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
switch i {
case 0, 1, 2, 3, 4:
a.NodeID = runningID
case 8, 9:
a.DesiredStatus = structs.AllocDesiredStatusStop
}
},
},
{
// Not all on new node have health set
Name: "PendingHealth",
ExpectedDrained: 1,
ExpectedMigrated: 1,
MaxParallel: 3,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
switch i {
case 0:
// Deployment status UNset for 1 on new node
a.NodeID = runningID
a.DeploymentStatus = nil
case 1, 2, 3, 4:
// Deployment status set for 4 on new node
a.NodeID = runningID
case 9:
a.DesiredStatus = structs.AllocDesiredStatusStop
}
},
},
{
// 5 max parallel - 1 migrating - 2 with unset health = 2 drainable
Name: "PendingHealthHigherMax",
ExpectedDrained: 2,
ExpectedMigrated: 1,
MaxParallel: 5,
AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
switch i {
case 0, 1:
// Deployment status UNset for 2 on new node
a.NodeID = runningID
a.DeploymentStatus = nil
case 2, 3, 4:
// Deployment status set for 3 on new node
a.NodeID = runningID
case 9:
a.DesiredStatus = structs.AllocDesiredStatusStop
}
},
},
}
for _, testCase := range cases {
t.Run(testCase.Name, func(t *testing.T) {
testHandleTaskGroup(t, testCase)
})
}
}
func TestHandleTaskGroup_MixedHealth(t *testing.T) {
cases := []struct {
maxParallel int
drainingNodeAllocs int
healthSet int
healthUnset int
expectedDrain int
expectedMigrated int
expectedDone bool
}{
{
maxParallel: 2,
drainingNodeAllocs: 10,
healthSet: 0,
healthUnset: 0,
expectedDrain: 2,
expectedMigrated: 0,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 9,
healthSet: 0,
healthUnset: 0,
expectedDrain: 1,
expectedMigrated: 1,
expectedDone: false,
},
{
maxParallel: 5,
drainingNodeAllocs: 9,
healthSet: 0,
healthUnset: 0,
expectedDrain: 4,
expectedMigrated: 1,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 2,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 3,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 4,
healthUnset: 0,
expectedDrain: 1,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 2,
drainingNodeAllocs: 5,
healthSet: 4,
healthUnset: 1,
expectedDrain: 1,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 1,
drainingNodeAllocs: 5,
healthSet: 4,
healthUnset: 1,
expectedDrain: 0,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 3,
drainingNodeAllocs: 5,
healthSet: 3,
healthUnset: 0,
expectedDrain: 1,
expectedMigrated: 5,
expectedDone: false,
},
{
maxParallel: 3,
drainingNodeAllocs: 0,
healthSet: 10,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 10,
expectedDone: true,
},
{
// This is the case where the deadline is hit and all 10 are just marked
// stopped. We should detect the job as done.
maxParallel: 3,
drainingNodeAllocs: 0,
healthSet: 0,
healthUnset: 0,
expectedDrain: 0,
expectedMigrated: 10,
expectedDone: true,
},
func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
// Create nodes
state := state.TestStateStore(t)
drainingNode, runningNode := testNodes(t, state)
job := mock.Job()
job.TaskGroups[0].Count = 10
if tc.Count > 0 {
job.TaskGroups[0].Count = tc.Count
}
if tc.MaxParallel > 0 {
job.TaskGroups[0].Migrate.MaxParallel = tc.MaxParallel
}
require.Nil(state.UpsertJob(102, job))
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.JobID = job.ID
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
// Default to being healthy on the draining node
a.NodeID = drainingNode.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
if tc.AddAlloc != nil {
tc.AddAlloc(i, a, drainingNode.ID, runningNode.ID)
}
allocs = append(allocs, a)
}
for cnum, c := range cases {
t.Run(fmt.Sprintf("%d", cnum), func(t *testing.T) {
require := require.New(t)
require.Nil(state.UpsertAllocs(103, allocs))
snap, err := state.Snapshot()
require.Nil(err)
// Create a draining node
state := state.TestStateStore(t)
drainingNode := mock.Node()
drainingNode.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, drainingNode))
healthyNode := mock.Node()
require.Nil(state.UpsertNode(101, healthyNode))
job := mock.Job()
job.TaskGroups[0].Migrate.MaxParallel = c.maxParallel
require.Nil(state.UpsertJob(101, job))
// Create running allocs on the draining node with health set
var allocs []*structs.Allocation
for i := 0; i < c.drainingNodeAllocs; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = drainingNode.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
allocs = append(allocs, a)
}
// Create stopped allocs on the draining node
for i := 10 - c.drainingNodeAllocs; i > 0; i-- {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = drainingNode.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
a.DesiredStatus = structs.AllocDesiredStatusStop
allocs = append(allocs, a)
}
// Create allocs on the healthy node with health set
for i := 0; i < c.healthSet; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = healthyNode.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
allocs = append(allocs, a)
}
// Create allocs on the healthy node with health not set
for i := 0; i < c.healthUnset; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = healthyNode.ID
allocs = append(allocs, a)
}
require.Nil(state.UpsertAllocs(103, allocs))
snap, err := state.Snapshot()
require.Nil(err)
res := &jobResult{}
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
require.Len(res.drain, c.expectedDrain)
require.Len(res.migrated, c.expectedMigrated)
require.Equal(c.expectedDone, res.done)
})
}
res := newJobResult()
require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 102, res))
assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d",
tc.ExpectedDrained, len(res.drain))
assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d",
tc.ExpectedMigrated, len(res.migrated))
assert.Equal(tc.ExpectedDone, res.done)
}
func TestHandleTaskGroup_Migrations(t *testing.T) {

View File

@@ -74,9 +74,7 @@ func (n *NodeDrainer) Update(node *structs.Node) {
return
}
n.logger.Printf("[TRACE] nomad.drain: node %q has %d services on it", node.ID, len(jobs))
for _, job := range jobs {
n.jobWatcher.RegisterJob(job)
}
n.jobWatcher.RegisterJobs(jobs)
// TODO Test at this layer as well that a node drain on a node without
// allocs immediately gets unmarked as draining

View File

@@ -1794,13 +1794,13 @@ func (n *NetworkResource) PortLabels() map[string]int {
// JobNs is a Job.ID and Namespace tuple
type JobNs struct {
ID, Namespace string
Namespace, ID string
}
func NewJobNs(namespace, id string) JobNs {
return JobNs{
ID: id,
Namespace: namespace,
ID: id,
}
}
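
The field reorder above is cosmetic; what matters is that JobNs has only comparable fields, so values can key the watcher's w.jobs set directly. A self-contained illustration (the map and values here are hypothetical, mirroring how RegisterJobs deduplicates):

package main

import "fmt"

// JobNs mirrors the struct above: two strings make it comparable, so it
// can be used as a map key without any hashing helper.
type JobNs struct {
	Namespace, ID string
}

func main() {
	jobs := map[JobNs]struct{}{}
	jns := JobNs{Namespace: "default", ID: "example"}
	jobs[jns] = struct{}{}
	if _, ok := jobs[jns]; ok {
		fmt.Println("already registered; RegisterJobs skips it")
	}
}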