Scheduler combines meta from job > group > task

This commit is contained in:
Alex Dadgar
2016-12-15 17:08:38 -08:00
parent 69b1adc4ae
commit e80cecf58b
4 changed files with 88 additions and 23 deletions

View File

@@ -1289,6 +1289,42 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup {
return nil
}
// CombinedTaskMeta takes a TaskGroup and Task name and returns the combined
// meta data for the task. When joining Job, Group and Task Meta, the precedence
// is by deepest scope (Task > Group > Job).
func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string {
group := j.LookupTaskGroup(groupName)
if group == nil {
return nil
}
task := group.LookupTask(taskName)
if task == nil {
return nil
}
meta := CopyMapStringString(task.Meta)
if meta == nil {
meta = make(map[string]string, len(group.Meta)+len(j.Meta))
}
// Add the group specific meta
for k, v := range group.Meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
// Add the job specific meta
for k, v := range j.Meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
return meta
}
// Stub is used to return a summary of the job
func (j *Job) Stub(summary *JobSummary) *JobListStub {
return &JobListStub{

View File

@@ -1179,6 +1179,7 @@ func TestSystemSched_ChainedAlloc(t *testing.T) {
h1 := NewHarnessWithState(t, h.State)
job1 := mock.SystemJob()
job1.ID = job.ID
job1.TaskGroups[0].Tasks[0].Env = make(map[string]string)
job1.TaskGroups[0].Tasks[0].Env["foo"] = "bar"
noErr(t, h1.State.UpsertJob(h1.NextIndex(), job1))

View File

@@ -328,8 +328,12 @@ func shuffleNodes(nodes []*structs.Node) {
}
// tasksUpdated does a diff between task groups to see if the
// tasks, their drivers, environment variables or config have updated.
func tasksUpdated(a, b *structs.TaskGroup) bool {
// tasks, their drivers, environment variables or config have updated. The
// inputs are the task group name to diff and two jobs to diff.
func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
a := jobA.LookupTaskGroup(taskGroup)
b := jobB.LookupTaskGroup(taskGroup)
// If the number of tasks do not match, clearly there is an update
if len(a.Tasks) != len(b.Tasks) {
return true
@@ -358,9 +362,6 @@ func tasksUpdated(a, b *structs.TaskGroup) bool {
if !reflect.DeepEqual(at.Env, bt.Env) {
return true
}
if !reflect.DeepEqual(at.Meta, bt.Meta) {
return true
}
if !reflect.DeepEqual(at.Artifacts, bt.Artifacts) {
return true
}
@@ -371,6 +372,13 @@ func tasksUpdated(a, b *structs.TaskGroup) bool {
return true
}
// Check the metadata
if !reflect.DeepEqual(
jobA.CombinedTaskMeta(taskGroup, at.Name),
jobB.CombinedTaskMeta(taskGroup, bt.Name)) {
return true
}
// Inspect the network to see if the dynamic ports are different
if len(at.Resources.Networks) != len(bt.Resources.Networks) {
return true
@@ -452,8 +460,13 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// Check if the task drivers or config has changed, requires
// a rolling upgrade since that cannot be done in-place.
existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name)
if tasksUpdated(update.TaskGroup, existing) {
//existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name)
//if tasksUpdated(update.TaskGroup, existing) {
//continue
//}
existing := update.Alloc.Job
if tasksUpdated(job, existing, update.TaskGroup.Name) {
continue
}

View File

@@ -454,50 +454,51 @@ func TestShuffleNodes(t *testing.T) {
func TestTasksUpdated(t *testing.T) {
j1 := mock.Job()
j2 := mock.Job()
name := j1.TaskGroups[0].Name
if tasksUpdated(j1.TaskGroups[0], j2.TaskGroups[0]) {
if tasksUpdated(j1, j2, name) {
t.Fatalf("bad")
}
j2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
if !tasksUpdated(j1.TaskGroups[0], j2.TaskGroups[0]) {
if !tasksUpdated(j1, j2, name) {
t.Fatalf("bad")
}
j3 := mock.Job()
j3.TaskGroups[0].Tasks[0].Name = "foo"
if !tasksUpdated(j1.TaskGroups[0], j3.TaskGroups[0]) {
if !tasksUpdated(j1, j3, name) {
t.Fatalf("bad")
}
j4 := mock.Job()
j4.TaskGroups[0].Tasks[0].Driver = "foo"
if !tasksUpdated(j1.TaskGroups[0], j4.TaskGroups[0]) {
if !tasksUpdated(j1, j4, name) {
t.Fatalf("bad")
}
j5 := mock.Job()
j5.TaskGroups[0].Tasks = append(j5.TaskGroups[0].Tasks,
j5.TaskGroups[0].Tasks[0])
if !tasksUpdated(j1.TaskGroups[0], j5.TaskGroups[0]) {
if !tasksUpdated(j1, j5, name) {
t.Fatalf("bad")
}
j6 := mock.Job()
j6.TaskGroups[0].Tasks[0].Resources.Networks[0].DynamicPorts = []structs.Port{{"http", 0}, {"https", 0}, {"admin", 0}}
if !tasksUpdated(j1.TaskGroups[0], j6.TaskGroups[0]) {
if !tasksUpdated(j1, j6, name) {
t.Fatalf("bad")
}
j7 := mock.Job()
j7.TaskGroups[0].Tasks[0].Env["NEW_ENV"] = "NEW_VALUE"
if !tasksUpdated(j1.TaskGroups[0], j7.TaskGroups[0]) {
if !tasksUpdated(j1, j7, name) {
t.Fatalf("bad")
}
j8 := mock.Job()
j8.TaskGroups[0].Tasks[0].User = "foo"
if !tasksUpdated(j1.TaskGroups[0], j8.TaskGroups[0]) {
if !tasksUpdated(j1, j8, name) {
t.Fatalf("bad")
}
@@ -507,49 +508,63 @@ func TestTasksUpdated(t *testing.T) {
GetterSource: "http://foo.com/bar",
},
}
if !tasksUpdated(j1.TaskGroups[0], j9.TaskGroups[0]) {
if !tasksUpdated(j1, j9, name) {
t.Fatalf("bad")
}
j10 := mock.Job()
j10.TaskGroups[0].Tasks[0].Meta["baz"] = "boom"
if !tasksUpdated(j1.TaskGroups[0], j10.TaskGroups[0]) {
if !tasksUpdated(j1, j10, name) {
t.Fatalf("bad")
}
j11 := mock.Job()
j11.TaskGroups[0].Tasks[0].Resources.CPU = 1337
if !tasksUpdated(j1.TaskGroups[0], j11.TaskGroups[0]) {
if !tasksUpdated(j1, j11, name) {
t.Fatalf("bad")
}
j12 := mock.Job()
j12.TaskGroups[0].Tasks[0].Resources.Networks[0].MBits = 100
if !tasksUpdated(j1.TaskGroups[0], j12.TaskGroups[0]) {
if !tasksUpdated(j1, j12, name) {
t.Fatalf("bad")
}
j13 := mock.Job()
j13.TaskGroups[0].Tasks[0].Resources.Networks[0].DynamicPorts[0].Label = "foobar"
if !tasksUpdated(j1.TaskGroups[0], j13.TaskGroups[0]) {
if !tasksUpdated(j1, j13, name) {
t.Fatalf("bad")
}
j14 := mock.Job()
j14.TaskGroups[0].Tasks[0].Resources.Networks[0].ReservedPorts = []structs.Port{{Label: "foo", Value: 1312}}
if !tasksUpdated(j1.TaskGroups[0], j14.TaskGroups[0]) {
if !tasksUpdated(j1, j14, name) {
t.Fatalf("bad")
}
j15 := mock.Job()
j15.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
if !tasksUpdated(j1.TaskGroups[0], j15.TaskGroups[0]) {
if !tasksUpdated(j1, j15, name) {
t.Fatalf("bad")
}
j16 := mock.Job()
j16.TaskGroups[0].EphemeralDisk.Sticky = true
if !tasksUpdated(j1.TaskGroups[0], j16.TaskGroups[0]) {
if !tasksUpdated(j1, j16, name) {
t.Fatal("bad")
}
// Change group meta
j17 := mock.Job()
j17.TaskGroups[0].Meta["j17_test"] = "roll_baby_roll"
if !tasksUpdated(j1, j17, name) {
t.Fatal("bad")
}
// Change job meta
j18 := mock.Job()
j18.Meta["j18_test"] = "roll_baby_roll"
if !tasksUpdated(j1, j18, name) {
t.Fatal("bad")
}
}