From 1607a203a8bf955f74692db4fa3b004ebcc129d9 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 14 Nov 2019 15:34:38 -0500 Subject: [PATCH] Check for changes to affinity and constraints Adds checks for affinity and constraint changes when determining if we should update inplace. refactor to check all levels at once check for spread changes when checking inplace update --- scheduler/util.go | 102 +++++++++++++++++++++ scheduler/util_test.go | 202 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 304 insertions(+) diff --git a/scheduler/util.go b/scheduler/util.go index 37a5b729d..1046215a2 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -356,6 +356,21 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { return true } + // Check Affinities + if affinitiesUpdated(jobA, jobB, taskGroup) { + return true + } + + // Check Constraints + if constraintsUpdated(jobA, jobB, taskGroup) { + return true + } + + // Check Spreads + if spreadsUpdated(jobA, jobB, taskGroup) { + return true + } + // Check each task for _, at := range a.Tasks { bt := b.LookupTask(at.Name) @@ -442,6 +457,93 @@ func networkPortMap(n *structs.NetworkResource) map[string]int { return m } +func affinitiesUpdated(jobA, jobB *structs.Job, taskGroup string) bool { + var aAffinities []*structs.Affinity + var bAffinities []*structs.Affinity + + tgA := jobA.LookupTaskGroup(taskGroup) + tgB := jobB.LookupTaskGroup(taskGroup) + + // Append jobA job and task group level affinities + aAffinities = append(aAffinities, jobA.Affinities...) + aAffinities = append(aAffinities, tgA.Affinities...) + + // Append jobB job and task group level affinities + bAffinities = append(bAffinities, jobB.Affinities...) + bAffinities = append(bAffinities, tgB.Affinities...) + + // append task affinities + for _, task := range tgA.Tasks { + aAffinities = append(aAffinities, task.Affinities...) + } + + for _, task := range tgB.Tasks { + bAffinities = append(bAffinities, task.Affinities...) + } + + // Check for equality + if len(aAffinities) != len(bAffinities) { + return true + } + + return !reflect.DeepEqual(aAffinities, bAffinities) +} + +func constraintsUpdated(jobA, jobB *structs.Job, taskGroup string) bool { + var aConstraints []*structs.Constraint + var bConstraints []*structs.Constraint + + tgA := jobA.LookupTaskGroup(taskGroup) + tgB := jobB.LookupTaskGroup(taskGroup) + + // Append jobA job and task group level constraints + aConstraints = append(aConstraints, jobA.Constraints...) + aConstraints = append(aConstraints, tgA.Constraints...) + + // Append jobB job and task group level constraints + bConstraints = append(bConstraints, jobB.Constraints...) + bConstraints = append(bConstraints, tgB.Constraints...) + + // Append task constraints + for _, task := range tgA.Tasks { + aConstraints = append(aConstraints, task.Constraints...) + } + + for _, task := range tgB.Tasks { + bConstraints = append(bConstraints, task.Constraints...) + } + + // Check for equality + if len(aConstraints) != len(bConstraints) { + return true + } + + return !reflect.DeepEqual(aConstraints, bConstraints) +} + +func spreadsUpdated(jobA, jobB *structs.Job, taskGroup string) bool { + var aSpreads []*structs.Spread + var bSpreads []*structs.Spread + + tgA := jobA.LookupTaskGroup(taskGroup) + tgB := jobB.LookupTaskGroup(taskGroup) + + // append jobA and task group level spreads + aSpreads = append(aSpreads, jobA.Spreads...) + aSpreads = append(aSpreads, tgA.Spreads...) + + // append jobB and task group level spreads + bSpreads = append(bSpreads, jobB.Spreads...) + bSpreads = append(bSpreads, tgB.Spreads...) + + // Check for equality + if len(aSpreads) != len(bSpreads) { + return true + } + + return !reflect.DeepEqual(aSpreads, bSpreads) +} + // setStatus is used to update the status of the evaluation func setStatus(logger log.Logger, planner Planner, eval, nextEval, spawnedBlocked *structs.Evaluation, diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 4983e592b..f6d6d56ca 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -384,6 +384,208 @@ func TestShuffleNodes(t *testing.T) { require.False(t, reflect.DeepEqual(nodes, orig)) } +func TestTaskUpdatedAffinity(t *testing.T) { + j1 := mock.Job() + j2 := mock.Job() + name := j1.TaskGroups[0].Name + + require.False(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Affinity + j2.TaskGroups[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + require.True(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Task Affinity + j3 := mock.Job() + j3.TaskGroups[0].Tasks[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + require.True(t, tasksUpdated(j1, j3, name)) + + j4 := mock.Job() + j4.TaskGroups[0].Tasks[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + require.True(t, tasksUpdated(j1, j4, name)) + + // check different level of same constraint + j5 := mock.Job() + j5.Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + j6 := mock.Job() + j6.Affinities = make([]*structs.Affinity, 0) + j6.TaskGroups[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + require.False(t, tasksUpdated(j5, j6, name)) +} + +func TestTaskUpdated_Constraint(t *testing.T) { + j1 := mock.Job() + j1.Constraints = make([]*structs.Constraint, 0) + + j2 := mock.Job() + j2.Constraints = make([]*structs.Constraint, 0) + + name := j1.TaskGroups[0].Name + require.False(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Constraint + j2.TaskGroups[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + // TaskGroup Task Constraint + j3 := mock.Job() + j3.Constraints = make([]*structs.Constraint, 0) + + j3.TaskGroups[0].Tasks[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + require.True(t, tasksUpdated(j1, j3, name)) + + j4 := mock.Job() + j4.Constraints = make([]*structs.Constraint, 0) + + j4.TaskGroups[0].Tasks[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + require.True(t, tasksUpdated(j1, j4, name)) + + // check different level of same constraint + j5 := mock.Job() + j5.Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + j6 := mock.Job() + j6.Constraints = make([]*structs.Constraint, 0) + j6.TaskGroups[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + require.False(t, tasksUpdated(j5, j6, name)) +} + +func TestTaskUpdatedSpread(t *testing.T) { + j1 := mock.Job() + j2 := mock.Job() + name := j1.TaskGroups[0].Name + + require.False(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Spread + j2.TaskGroups[0].Spreads = []*structs.Spread{ + { + Attribute: "node.datacenter", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 50, + }, + { + Value: "r2", + Percent: 50, + }, + }, + }, + } + require.True(t, tasksUpdated(j1, j2, name)) + + // check different level of same constraint + j5 := mock.Job() + j5.Spreads = []*structs.Spread{ + { + Attribute: "node.datacenter", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 50, + }, + { + Value: "r2", + Percent: 50, + }, + }, + }, + } + + j6 := mock.Job() + j6.TaskGroups[0].Spreads = []*structs.Spread{ + { + Attribute: "node.datacenter", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 50, + }, + { + Value: "r2", + Percent: 50, + }, + }, + }, + } + + require.False(t, tasksUpdated(j5, j6, name)) +} func TestTasksUpdated(t *testing.T) { j1 := mock.Job() j2 := mock.Job()