diff --git a/scheduler/util.go b/scheduler/util.go index 37a5b729d..1046215a2 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -356,6 +356,21 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { return true } + // Check Affinities + if affinitiesUpdated(jobA, jobB, taskGroup) { + return true + } + + // Check Constraints + if constraintsUpdated(jobA, jobB, taskGroup) { + return true + } + + // Check Spreads + if spreadsUpdated(jobA, jobB, taskGroup) { + return true + } + // Check each task for _, at := range a.Tasks { bt := b.LookupTask(at.Name) @@ -442,6 +457,93 @@ func networkPortMap(n *structs.NetworkResource) map[string]int { return m } +func affinitiesUpdated(jobA, jobB *structs.Job, taskGroup string) bool { + var aAffinities []*structs.Affinity + var bAffinities []*structs.Affinity + + tgA := jobA.LookupTaskGroup(taskGroup) + tgB := jobB.LookupTaskGroup(taskGroup) + + // Append jobA job and task group level affinities + aAffinities = append(aAffinities, jobA.Affinities...) + aAffinities = append(aAffinities, tgA.Affinities...) + + // Append jobB job and task group level affinities + bAffinities = append(bAffinities, jobB.Affinities...) + bAffinities = append(bAffinities, tgB.Affinities...) + + // append task affinities + for _, task := range tgA.Tasks { + aAffinities = append(aAffinities, task.Affinities...) + } + + for _, task := range tgB.Tasks { + bAffinities = append(bAffinities, task.Affinities...) + } + + // Check for equality + if len(aAffinities) != len(bAffinities) { + return true + } + + return !reflect.DeepEqual(aAffinities, bAffinities) +} + +func constraintsUpdated(jobA, jobB *structs.Job, taskGroup string) bool { + var aConstraints []*structs.Constraint + var bConstraints []*structs.Constraint + + tgA := jobA.LookupTaskGroup(taskGroup) + tgB := jobB.LookupTaskGroup(taskGroup) + + // Append jobA job and task group level constraints + aConstraints = append(aConstraints, jobA.Constraints...) + aConstraints = append(aConstraints, tgA.Constraints...) + + // Append jobB job and task group level constraints + bConstraints = append(bConstraints, jobB.Constraints...) + bConstraints = append(bConstraints, tgB.Constraints...) + + // Append task constraints + for _, task := range tgA.Tasks { + aConstraints = append(aConstraints, task.Constraints...) + } + + for _, task := range tgB.Tasks { + bConstraints = append(bConstraints, task.Constraints...) + } + + // Check for equality + if len(aConstraints) != len(bConstraints) { + return true + } + + return !reflect.DeepEqual(aConstraints, bConstraints) +} + +func spreadsUpdated(jobA, jobB *structs.Job, taskGroup string) bool { + var aSpreads []*structs.Spread + var bSpreads []*structs.Spread + + tgA := jobA.LookupTaskGroup(taskGroup) + tgB := jobB.LookupTaskGroup(taskGroup) + + // append jobA and task group level spreads + aSpreads = append(aSpreads, jobA.Spreads...) + aSpreads = append(aSpreads, tgA.Spreads...) + + // append jobB and task group level spreads + bSpreads = append(bSpreads, jobB.Spreads...) + bSpreads = append(bSpreads, tgB.Spreads...) + + // Check for equality + if len(aSpreads) != len(bSpreads) { + return true + } + + return !reflect.DeepEqual(aSpreads, bSpreads) +} + // setStatus is used to update the status of the evaluation func setStatus(logger log.Logger, planner Planner, eval, nextEval, spawnedBlocked *structs.Evaluation, diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 4983e592b..f6d6d56ca 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -384,6 +384,208 @@ func TestShuffleNodes(t *testing.T) { require.False(t, reflect.DeepEqual(nodes, orig)) } +func TestTaskUpdatedAffinity(t *testing.T) { + j1 := mock.Job() + j2 := mock.Job() + name := j1.TaskGroups[0].Name + + require.False(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Affinity + j2.TaskGroups[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + require.True(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Task Affinity + j3 := mock.Job() + j3.TaskGroups[0].Tasks[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + require.True(t, tasksUpdated(j1, j3, name)) + + j4 := mock.Job() + j4.TaskGroups[0].Tasks[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + require.True(t, tasksUpdated(j1, j4, name)) + + // check different level of same constraint + j5 := mock.Job() + j5.Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + j6 := mock.Job() + j6.Affinities = make([]*structs.Affinity, 0) + j6.TaskGroups[0].Affinities = []*structs.Affinity{ + { + LTarget: "node.datacenter", + RTarget: "dc1", + Operand: "=", + Weight: 100, + }, + } + + require.False(t, tasksUpdated(j5, j6, name)) +} + +func TestTaskUpdated_Constraint(t *testing.T) { + j1 := mock.Job() + j1.Constraints = make([]*structs.Constraint, 0) + + j2 := mock.Job() + j2.Constraints = make([]*structs.Constraint, 0) + + name := j1.TaskGroups[0].Name + require.False(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Constraint + j2.TaskGroups[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + // TaskGroup Task Constraint + j3 := mock.Job() + j3.Constraints = make([]*structs.Constraint, 0) + + j3.TaskGroups[0].Tasks[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + require.True(t, tasksUpdated(j1, j3, name)) + + j4 := mock.Job() + j4.Constraints = make([]*structs.Constraint, 0) + + j4.TaskGroups[0].Tasks[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + require.True(t, tasksUpdated(j1, j4, name)) + + // check different level of same constraint + j5 := mock.Job() + j5.Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + j6 := mock.Job() + j6.Constraints = make([]*structs.Constraint, 0) + j6.TaskGroups[0].Constraints = []*structs.Constraint{ + { + LTarget: "kernel", + RTarget: "linux", + Operand: "=", + }, + } + + require.False(t, tasksUpdated(j5, j6, name)) +} + +func TestTaskUpdatedSpread(t *testing.T) { + j1 := mock.Job() + j2 := mock.Job() + name := j1.TaskGroups[0].Name + + require.False(t, tasksUpdated(j1, j2, name)) + + // TaskGroup Spread + j2.TaskGroups[0].Spreads = []*structs.Spread{ + { + Attribute: "node.datacenter", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 50, + }, + { + Value: "r2", + Percent: 50, + }, + }, + }, + } + require.True(t, tasksUpdated(j1, j2, name)) + + // check different level of same constraint + j5 := mock.Job() + j5.Spreads = []*structs.Spread{ + { + Attribute: "node.datacenter", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 50, + }, + { + Value: "r2", + Percent: 50, + }, + }, + }, + } + + j6 := mock.Job() + j6.TaskGroups[0].Spreads = []*structs.Spread{ + { + Attribute: "node.datacenter", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 50, + }, + { + Value: "r2", + Percent: 50, + }, + }, + }, + } + + require.False(t, tasksUpdated(j5, j6, name)) +} func TestTasksUpdated(t *testing.T) { j1 := mock.Job() j2 := mock.Job()