Added the chained alloc for system scheduler

This commit is contained in:
Diptanu Choudhury
2016-08-16 10:49:45 -07:00
parent a6b88d7a50
commit bcab2eb523
4 changed files with 98 additions and 0 deletions

View File

@@ -184,6 +184,7 @@ func SystemJob() *structs.Job {
Config: map[string]interface{}{
"command": "/bin/date",
},
Env: map[string]string{},
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,

View File

@@ -2482,6 +2482,9 @@ type Allocation struct {
// TaskStates stores the state of each task,
TaskStates map[string]*TaskState
// PreviousAllocation is the allocation that this allocation is replacing
PreviousAllocation string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64

View File

@@ -321,6 +321,12 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
ClientStatus: structs.AllocClientStatusPending,
}
// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if missing.Alloc != nil {
alloc.PreviousAllocation = missing.Alloc.ID
}
s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map

View File

@@ -2,6 +2,7 @@ package scheduler
import (
"reflect"
"sort"
"testing"
"time"
@@ -1008,3 +1009,90 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) {
t.Fatalf("bad queued allocations: %#v", h.Evals[0].QueuedAllocations)
}
}
func TestSystemSched_ChainedAlloc(t *testing.T) {
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a job
job := mock.SystemJob()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
var allocIDs []string
for _, allocList := range h.Plans[0].NodeAllocation {
for _, alloc := range allocList {
allocIDs = append(allocIDs, alloc.ID)
}
}
sort.Strings(allocIDs)
// Create a new harness to invoke the scheduler again
h1 := NewHarnessWithState(t, h.State)
job1 := mock.SystemJob()
job1.ID = job.ID
job1.TaskGroups[0].Tasks[0].Env["foo"] = "bar"
noErr(t, h1.State.UpsertJob(h1.NextIndex(), job1))
// Insert two more nodes
for i := 0; i < 2; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a mock evaluation to update the job
eval1 := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
}
// Process the evaluation
if err := h1.Process(NewSystemScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
plan := h1.Plans[0]
// Collect all the chained allocation ids and the new allocations which
// don't have any chained allocations
var prevAllocs []string
var newAllocs []string
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
if alloc.PreviousAllocation == "" {
newAllocs = append(newAllocs, alloc.ID)
continue
}
prevAllocs = append(prevAllocs, alloc.PreviousAllocation)
}
}
sort.Strings(prevAllocs)
// Ensure that the new allocations has their corresponging original
// allocation ids
if !reflect.DeepEqual(prevAllocs, allocIDs) {
t.Fatalf("expected: %v, actual: %v", len(allocIDs), len(prevAllocs))
}
// Ensuring two new allocations don't have any chained allocations
if len(newAllocs) != 2 {
t.Fatalf("expected: %v, actual: %v", 2, len(newAllocs))
}
}