From 0319d5fe64ec09cea3551b1954ba5f9ffbbabc6c Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 15:26:28 -0500 Subject: [PATCH] Added scheduler tests to ensure disk constraints are honored --- nomad/state/state_store_test.go | 29 -------------- nomad/structs/structs.go | 5 ++- scheduler/generic_sched.go | 1 + scheduler/generic_sched_test.go | 69 ++++++++++++++++++++++++++++++++- scheduler/system_sched_test.go | 62 +++++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 32 deletions(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 2312abee2..ddd31353f 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -444,11 +444,6 @@ func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) { job.TaskGroups[0].LocalDisk = nil job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 - notify := setupNotifyTest( - state, - watch.Item{Table: "jobs"}, - watch.Item{Job: job.ID}) - err := state.UpsertJob(1000, job) if err != nil { t.Fatalf("err: %v", err) @@ -470,30 +465,6 @@ func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) { if !reflect.DeepEqual(expected, out) { t.Fatalf("bad: %#v %#v", expected, out) } - - index, err := state.Index("jobs") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1000 { - t.Fatalf("bad: %d", index) - } - - summary, err := state.JobSummaryByID(job.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if summary == nil { - t.Fatalf("nil summary") - } - if summary.JobID != job.ID { - t.Fatalf("bad summary id: %v", summary.JobID) - } - _, ok := summary.Summary["web"] - if !ok { - t.Fatalf("nil summary for task group") - } - notify.verify(t) } func TestStateStore_DeleteJob_Job(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 230a343f7..4ba0b5c21 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -772,7 +772,6 @@ func DefaultResources() *Resources { return &Resources{ CPU: 100, MemoryMB: 10, - DiskMB: 300, IOPS: 0, } } @@ -2569,7 +2568,10 @@ func (c *Constraint) Validate() error { // LocalDisk is an ephemeral disk object type LocalDisk struct { + // Sticky indicates whether the allocation is sticky to a node Sticky bool + + // DiskMB is the size of the local disk DiskMB int `mapstructure:"disk"` } @@ -2580,6 +2582,7 @@ func DefaultLocalDisk() *LocalDisk { } } +// Validate validates LocalDisk func (d *LocalDisk) Validate() error { if d.DiskMB < 10 { return fmt.Errorf("minimum DiskMB value is 10; got %d", d.DiskMB) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 48b1c65b4..613fdd313 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -452,6 +452,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { TaskGroup: missing.TaskGroup.Name, Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, + Resources: option.AllocResources, TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index a5b37c988..03fe89b11 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -50,8 +50,8 @@ func TestServiceSched_JobRegister(t *testing.T) { } // Ensure the eval has no spawned blocked eval - if len(h.Evals) != 1 { - t.Fatalf("bad: %#v", h.Evals) + if len(h.CreateEvals) != 0 { + t.Fatalf("bad: %#v", h.CreateEvals) if h.Evals[0].BlockedEval != "" { t.Fatalf("bad: %#v", h.Evals[0]) } @@ -91,6 +91,71 @@ func TestServiceSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job with count 2 and disk as 60GB so that only one allocation + // can fit + job := mock.Job() + job.TaskGroups[0].Count = 2 + job.TaskGroups[0].LocalDisk.DiskMB = 88 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval has a blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan allocated only one allocation + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure only one allocation was placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 3ad4fd852..043fbe854 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -80,6 +80,68 @@ func TestSystemSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_JobRegister_LocalDiskConstraint(t *testing.T) { + h := NewHarness(t) + + // Create a nodes + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job + job := mock.SystemJob() + job.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create another job with a lot of disk resource ask so that it doesn't fit + // the node + job1 := mock.SystemJob() + job1.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job1)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + if err := h.Process(NewSystemScheduler, eval); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + // Create a new harness to test the scheduling result for the second job + h1 := NewHarnessWithState(t, h.State) + // Create a mock evaluation to register the job + eval1 := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job1.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job1.ID, + } + + // Process the evaluation + if err := h1.Process(NewSystemScheduler, eval1); err != nil { + t.Fatalf("err: %v", err) + } + + out, err = h1.State.AllocsByJob(job1.ID) + noErr(t, err) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } +} + func TestSystemSched_ExhaustResources(t *testing.T) { h := NewHarness(t)