From 0628fee7a888742f5359a4b505df09733c0495ff Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 24 Aug 2016 13:51:15 -0500 Subject: [PATCH 01/13] Implemented job spec parsing for sticky volumes --- api/tasks.go | 7 +++++ jobspec/parse.go | 50 ++++++++++++++++++++++++++++++--- jobspec/parse_test.go | 24 ++++++++++------ jobspec/test-fixtures/basic.hcl | 5 ++++ nomad/structs/structs.go | 31 ++++++++++++++++++++ 5 files changed, 105 insertions(+), 12 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index ab8886724..fff9d79bf 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -82,6 +82,12 @@ type Service struct { Checks []ServiceCheck } +// LocalDisk is an ephemeral disk object +type LocalDisk struct { + Sticky bool + DiskMB int `mapstructure:"disk"` +} + // TaskGroup is the unit of scheduling. type TaskGroup struct { Name string @@ -89,6 +95,7 @@ type TaskGroup struct { Constraints []*Constraint Tasks []*Task RestartPolicy *RestartPolicy + LocalDisk *LocalDisk Meta map[string]string } diff --git a/jobspec/parse.go b/jobspec/parse.go index b2d8eb8f7..37fa461e4 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -190,9 +190,10 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2) for i, t := range tasks { result.TaskGroups[i] = &structs.TaskGroup{ - Name: t.Name, - Count: 1, - Tasks: []*structs.Task{t}, + Name: t.Name, + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), + Tasks: []*structs.Task{t}, } } } @@ -240,6 +241,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { "restart", "meta", "task", + "local_disk", } if err := checkHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -253,6 +255,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { delete(m, "meta") delete(m, "task") delete(m, "restart") + delete(m, "local_disk") // Default count to 1 if not specified if _, ok := m["count"]; !ok { @@ -280,6 +283,14 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { } } + // Parse local disk + g.LocalDisk = structs.DefaultLocalDisk() + if o := listVal.Filter("local_disk"); len(o.Items) > 0 { + if err := parseLocalDisk(&g.LocalDisk, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s', local_disk ->", n)) + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -417,6 +428,38 @@ func parseConstraints(result *[]*structs.Constraint, list *ast.ObjectList) error return nil } +func parseLocalDisk(result **structs.LocalDisk, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'local_disk' block allowed") + } + + // Get our local_disk object + obj := list.Items[0] + + // Check for invalid keys + valid := []string{ + "sticky", + "disk", + } + if err := checkHCLKeys(obj.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, obj.Val); err != nil { + return err + } + + var localDisk structs.LocalDisk + if err := mapstructure.WeakDecode(m, &localDisk); err != nil { + return err + } + *result = &localDisk + + return nil +} + // parseBool takes an interface value and tries to convert it to a boolean and // returns an error if the type can't be converted. 
func parseBool(value interface{}) (bool, error) { @@ -835,7 +878,6 @@ func parseResources(result *structs.Resources, list *ast.ObjectList) error { // Check for invalid keys valid := []string{ "cpu", - "disk", "iops", "memory", "network", diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 59ccfc0b2..e8f8821c7 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -49,8 +49,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "outside", - Count: 1, + Name: "outside", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "outside", @@ -87,6 +88,10 @@ func TestParse(t *testing.T) { Delay: 15 * time.Second, Mode: "delay", }, + LocalDisk: &structs.LocalDisk{ + Sticky: true, + DiskMB: 150, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "binstore", @@ -313,8 +318,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "bar", - Count: 1, + Name: "bar", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "bar", @@ -356,8 +362,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "binsl", - Count: 1, + Name: "binsl", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "binstore", @@ -406,8 +413,9 @@ func TestParse(t *testing.T) { Region: "global", TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "group", - Count: 1, + Name: "group", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "task", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index ed52a6a29..4371b7d7a 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -42,6 +42,11 @@ job "binstore-storagelocker" { mode = "delay" } + local_disk { + sticky = true + disk = 150 + } + task "binstore" { driver = "docker" user = "bob" diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 058ae16d3..8576aea50 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1541,6 +1541,9 @@ type TaskGroup struct { // Tasks are the collection of tasks that this task group needs to run Tasks []*Task + // LocalDisk is the disk resources that the task group requests + LocalDisk *LocalDisk + // Meta is used to associate arbitrary metadata with this // task group. This is opaque to Nomad. 
Meta map[string]string @@ -1613,6 +1616,14 @@ func (tg *TaskGroup) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name)) } + if tg.LocalDisk != nil { + if err := tg.LocalDisk.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } else { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a local disk object", tg.Name)) + } + // Check for duplicate tasks tasks := make(map[string]int) for idx, task := range tg.Tasks { @@ -2552,6 +2563,26 @@ func (c *Constraint) Validate() error { return mErr.ErrorOrNil() } +// LocalDisk is an ephemeral disk object +type LocalDisk struct { + Sticky bool + DiskMB int `mapstructure:"disk"` +} + +// DefaultLocalDisk returns a LocalDisk with default configurations +func DefaultLocalDisk() *LocalDisk { + return &LocalDisk{ + DiskMB: 300, + } +} + +func (d *LocalDisk) Validate() error { + if d.DiskMB < 10 { + return fmt.Errorf("minimum DiskMB value is 10; got %d", d.DiskMB) + } + return nil +} + // Vault stores the set of premissions a task needs access to from Vault. type Vault struct { // Policies is the set of policies that the task needs access to From 7da66e169c31c282a8aa02cc6b58dcfb5ec99604 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 12:27:19 -0500 Subject: [PATCH 02/13] Making the scheduler use LocalDisk instead of Resources.DiskMB --- nomad/mock/mock.go | 5 +- scheduler/foo | 243 +++++++++++++++++++++++++++++++++++++++++ scheduler/rank.go | 32 ++++-- scheduler/rank_test.go | 70 +++++++----- scheduler/stack.go | 4 +- scheduler/util.go | 2 +- scheduler/util_test.go | 1 + 7 files changed, 315 insertions(+), 42 deletions(-) create mode 100644 scheduler/foo diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 937d09783..11f6b74ef 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -79,6 +79,9 @@ func Job() *structs.Job { &structs.TaskGroup{ Name: "web", Count: 10, + LocalDisk: &structs.LocalDisk{ + DiskMB: 150, + }, RestartPolicy: &structs.RestartPolicy{ Attempts: 3, Interval: 10 * time.Minute, @@ -120,7 +123,6 @@ func Job() *structs.Job { Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 150, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ MBits: 50, @@ -178,6 +180,7 @@ func SystemJob() *structs.Job { Delay: 1 * time.Minute, Mode: structs.RestartPolicyModeDelay, }, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "web", diff --git a/scheduler/foo b/scheduler/foo new file mode 100644 index 000000000..f3b79fc0e --- /dev/null +++ b/scheduler/foo @@ -0,0 +1,243 @@ +=== RUN TestAnnotateTaskGroup_Updates +--- PASS: TestAnnotateTaskGroup_Updates (0.00s) +=== RUN TestAnnotateCountChange_NonEdited +--- PASS: TestAnnotateCountChange_NonEdited (0.00s) +=== RUN TestAnnotateCountChange +--- PASS: TestAnnotateCountChange (0.00s) +=== RUN TestAnnotateTask_NonEdited +--- PASS: TestAnnotateTask_NonEdited (0.00s) +=== RUN TestAnnotateTask +--- PASS: TestAnnotateTask (0.00s) +=== RUN TestEvalContext_ProposedAlloc +--- PASS: TestEvalContext_ProposedAlloc (0.00s) +=== RUN TestEvalEligibility_JobStatus +false +--- PASS: TestEvalEligibility_JobStatus (0.00s) +=== RUN TestEvalEligibility_TaskGroupStatus +--- PASS: TestEvalEligibility_TaskGroupStatus (0.00s) +=== RUN TestEvalEligibility_SetJob +--- PASS: TestEvalEligibility_SetJob (0.00s) +=== RUN TestEvalEligibility_GetClasses +--- PASS: TestEvalEligibility_GetClasses (0.00s) +=== RUN TestStaticIterator_Reset +--- PASS: 
TestStaticIterator_Reset (0.00s) +=== RUN TestStaticIterator_SetNodes +--- PASS: TestStaticIterator_SetNodes (0.00s) +=== RUN TestRandomIterator +--- PASS: TestRandomIterator (0.00s) +=== RUN TestDriverChecker +--- PASS: TestDriverChecker (0.00s) +=== RUN TestConstraintChecker +--- PASS: TestConstraintChecker (0.00s) +=== RUN TestResolveConstraintTarget +--- PASS: TestResolveConstraintTarget (0.00s) +=== RUN TestCheckConstraint +--- PASS: TestCheckConstraint (0.00s) +=== RUN TestCheckLexicalOrder +--- PASS: TestCheckLexicalOrder (0.00s) +=== RUN TestCheckVersionConstraint +--- PASS: TestCheckVersionConstraint (0.00s) +=== RUN TestCheckRegexpConstraint +--- PASS: TestCheckRegexpConstraint (0.00s) +=== RUN TestProposedAllocConstraint_JobDistinctHosts +--- PASS: TestProposedAllocConstraint_JobDistinctHosts (0.00s) +=== RUN TestProposedAllocConstraint_JobDistinctHosts_Infeasible +--- PASS: TestProposedAllocConstraint_JobDistinctHosts_Infeasible (0.00s) +=== RUN TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount +--- PASS: TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount (0.00s) +=== RUN TestProposedAllocConstraint_TaskGroupDistinctHosts +--- PASS: TestProposedAllocConstraint_TaskGroupDistinctHosts (0.00s) +=== RUN TestFeasibilityWrapper_JobIneligible +--- PASS: TestFeasibilityWrapper_JobIneligible (0.00s) +=== RUN TestFeasibilityWrapper_JobEscapes +--- PASS: TestFeasibilityWrapper_JobEscapes (0.00s) +=== RUN TestFeasibilityWrapper_JobAndTg_Eligible +--- PASS: TestFeasibilityWrapper_JobAndTg_Eligible (0.00s) +=== RUN TestFeasibilityWrapper_JobEligible_TgIneligible +--- PASS: TestFeasibilityWrapper_JobEligible_TgIneligible (0.00s) +=== RUN TestFeasibilityWrapper_JobEligible_TgEscaped +--- PASS: TestFeasibilityWrapper_JobEligible_TgEscaped (0.00s) +=== RUN TestServiceSched_JobRegister +--- PASS: TestServiceSched_JobRegister (0.00s) +=== RUN TestServiceSched_JobRegister_Annotate +--- PASS: TestServiceSched_JobRegister_Annotate (0.00s) +=== RUN TestServiceSched_JobRegister_CountZero +--- PASS: TestServiceSched_JobRegister_CountZero (0.00s) +=== RUN TestServiceSched_JobRegister_AllocFail +--- PASS: TestServiceSched_JobRegister_AllocFail (0.00s) +=== RUN TestServiceSched_JobRegister_CreateBlockedEval +--- PASS: TestServiceSched_JobRegister_CreateBlockedEval (0.00s) +=== RUN TestServiceSched_JobRegister_FeasibleAndInfeasibleTG +--- PASS: TestServiceSched_JobRegister_FeasibleAndInfeasibleTG (0.00s) +=== RUN TestServiceSched_EvaluateMaxPlanEval +--- PASS: TestServiceSched_EvaluateMaxPlanEval (0.00s) +=== RUN TestServiceSched_Plan_Partial_Progress +--- PASS: TestServiceSched_Plan_Partial_Progress (0.00s) +=== RUN TestServiceSched_EvaluateBlockedEval +--- PASS: TestServiceSched_EvaluateBlockedEval (0.00s) +=== RUN TestServiceSched_EvaluateBlockedEval_Finished +--- PASS: TestServiceSched_EvaluateBlockedEval_Finished (0.00s) +=== RUN TestServiceSched_JobModify +--- PASS: TestServiceSched_JobModify (0.00s) +=== RUN TestServiceSched_JobModify_IncrCount_NodeLimit +--- PASS: TestServiceSched_JobModify_IncrCount_NodeLimit (0.00s) +=== RUN TestServiceSched_JobModify_CountZero +--- PASS: TestServiceSched_JobModify_CountZero (0.00s) +=== RUN TestServiceSched_JobModify_Rolling +--- PASS: TestServiceSched_JobModify_Rolling (0.00s) +=== RUN TestServiceSched_JobModify_InPlace +--- PASS: TestServiceSched_JobModify_InPlace (0.00s) +=== RUN TestServiceSched_JobDeregister +--- PASS: TestServiceSched_JobDeregister (0.00s) +=== RUN TestServiceSched_NodeDown +--- PASS: TestServiceSched_NodeDown (0.00s) 
+=== RUN TestServiceSched_NodeUpdate +--- PASS: TestServiceSched_NodeUpdate (0.00s) +=== RUN TestServiceSched_NodeDrain +--- PASS: TestServiceSched_NodeDrain (0.00s) +=== RUN TestServiceSched_NodeDrain_Down +--- PASS: TestServiceSched_NodeDrain_Down (0.00s) +=== RUN TestServiceSched_NodeDrain_Queued_Allocations +--- PASS: TestServiceSched_NodeDrain_Queued_Allocations (0.00s) +=== RUN TestServiceSched_NodeDrain_UpdateStrategy +--- PASS: TestServiceSched_NodeDrain_UpdateStrategy (0.00s) +=== RUN TestServiceSched_RetryLimit +--- PASS: TestServiceSched_RetryLimit (0.00s) +=== RUN TestBatchSched_Run_CompleteAlloc +--- PASS: TestBatchSched_Run_CompleteAlloc (0.00s) +=== RUN TestBatchSched_Run_DrainedAlloc +--- PASS: TestBatchSched_Run_DrainedAlloc (0.00s) +=== RUN TestBatchSched_Run_FailedAlloc +--- PASS: TestBatchSched_Run_FailedAlloc (0.00s) +=== RUN TestBatchSched_Run_FailedAllocQueuedAllocations +--- PASS: TestBatchSched_Run_FailedAllocQueuedAllocations (0.00s) +=== RUN TestBatchSched_ReRun_SuccessfullyFinishedAlloc +--- PASS: TestBatchSched_ReRun_SuccessfullyFinishedAlloc (0.00s) +=== RUN TestGenericSched_FilterCompleteAllocs +--- PASS: TestGenericSched_FilterCompleteAllocs (0.00s) +=== RUN TestGenericSched_ChainedAlloc +--- PASS: TestGenericSched_ChainedAlloc (0.00s) +=== RUN TestFeasibleRankIterator +--- PASS: TestFeasibleRankIterator (0.00s) +=== RUN TestBinPackIterator_NoExistingAlloc +--- FAIL: TestBinPackIterator_NoExistingAlloc (0.00s) + rank_test.go:88: Bad: [] +=== RUN TestBinPackIterator_PlannedAlloc +--- FAIL: TestBinPackIterator_PlannedAlloc (0.00s) + rank_test.go:167: Bad: []*scheduler.RankedNode(nil) +=== RUN TestBinPackIterator_ExistingAlloc +--- FAIL: TestBinPackIterator_ExistingAlloc (0.00s) + rank_test.go:252: Bad: []*scheduler.RankedNode(nil) +=== RUN TestBinPackIterator_ExistingAlloc_PlannedEvict +--- FAIL: TestBinPackIterator_ExistingAlloc_PlannedEvict (0.00s) + rank_test.go:341: Bad: []*scheduler.RankedNode(nil) +=== RUN TestJobAntiAffinity_PlannedAlloc +--- PASS: TestJobAntiAffinity_PlannedAlloc (0.00s) +=== RUN TestLimitIterator +--- PASS: TestLimitIterator (0.00s) +=== RUN TestMaxScoreIterator +--- PASS: TestMaxScoreIterator (0.00s) +=== RUN TestServiceStack_SetNodes +--- PASS: TestServiceStack_SetNodes (0.00s) +=== RUN TestServiceStack_SetJob +--- PASS: TestServiceStack_SetJob (0.00s) +=== RUN TestServiceStack_Select_Size +--- PASS: TestServiceStack_Select_Size (0.00s) +=== RUN TestServiceStack_Select_MetricsReset +--- PASS: TestServiceStack_Select_MetricsReset (0.00s) +=== RUN TestServiceStack_Select_DriverFilter +--- PASS: TestServiceStack_Select_DriverFilter (0.00s) +=== RUN TestServiceStack_Select_ConstraintFilter +--- PASS: TestServiceStack_Select_ConstraintFilter (0.00s) +=== RUN TestServiceStack_Select_BinPack_Overflow +--- PASS: TestServiceStack_Select_BinPack_Overflow (0.00s) +=== RUN TestSystemStack_SetNodes +--- PASS: TestSystemStack_SetNodes (0.00s) +=== RUN TestSystemStack_SetJob +--- PASS: TestSystemStack_SetJob (0.00s) +=== RUN TestSystemStack_Select_Size +--- PASS: TestSystemStack_Select_Size (0.00s) +=== RUN TestSystemStack_Select_MetricsReset +--- PASS: TestSystemStack_Select_MetricsReset (0.00s) +=== RUN TestSystemStack_Select_DriverFilter +--- PASS: TestSystemStack_Select_DriverFilter (0.00s) +=== RUN TestSystemStack_Select_ConstraintFilter +--- PASS: TestSystemStack_Select_ConstraintFilter (0.00s) +=== RUN TestSystemStack_Select_BinPack_Overflow +--- PASS: TestSystemStack_Select_BinPack_Overflow (0.00s) +=== RUN TestSystemSched_JobRegister +--- 
PASS: TestSystemSched_JobRegister (0.00s) +=== RUN TestSystemSched_ExhaustResources +--- PASS: TestSystemSched_ExhaustResources (0.00s) +=== RUN TestSystemSched_JobRegister_Annotate +--- PASS: TestSystemSched_JobRegister_Annotate (0.00s) +=== RUN TestSystemSched_JobRegister_AddNode +--- PASS: TestSystemSched_JobRegister_AddNode (0.00s) +=== RUN TestSystemSched_JobRegister_AllocFail +--- PASS: TestSystemSched_JobRegister_AllocFail (0.00s) +=== RUN TestSystemSched_JobModify +--- PASS: TestSystemSched_JobModify (0.00s) +=== RUN TestSystemSched_JobModify_Rolling +--- PASS: TestSystemSched_JobModify_Rolling (0.00s) +=== RUN TestSystemSched_JobModify_InPlace +--- PASS: TestSystemSched_JobModify_InPlace (0.00s) +=== RUN TestSystemSched_JobDeregister +--- PASS: TestSystemSched_JobDeregister (0.00s) +=== RUN TestSystemSched_NodeDown +--- PASS: TestSystemSched_NodeDown (0.00s) +=== RUN TestSystemSched_NodeDrain_Down +--- PASS: TestSystemSched_NodeDrain_Down (0.00s) +=== RUN TestSystemSched_NodeDrain +--- PASS: TestSystemSched_NodeDrain (0.00s) +=== RUN TestSystemSched_NodeUpdate +--- PASS: TestSystemSched_NodeUpdate (0.00s) +=== RUN TestSystemSched_RetryLimit +--- PASS: TestSystemSched_RetryLimit (0.00s) +=== RUN TestSystemSched_Queued_With_Constraints +--- PASS: TestSystemSched_Queued_With_Constraints (0.00s) +=== RUN TestSystemSched_ChainedAlloc +--- PASS: TestSystemSched_ChainedAlloc (0.00s) +=== RUN TestMaterializeTaskGroups +--- PASS: TestMaterializeTaskGroups (0.00s) +=== RUN TestDiffAllocs +--- PASS: TestDiffAllocs (0.00s) +=== RUN TestDiffSystemAllocs +--- PASS: TestDiffSystemAllocs (0.00s) +=== RUN TestReadyNodesInDCs +--- PASS: TestReadyNodesInDCs (0.00s) +=== RUN TestRetryMax +--- PASS: TestRetryMax (0.00s) +=== RUN TestTaintedNodes +--- PASS: TestTaintedNodes (0.00s) +=== RUN TestShuffleNodes +--- PASS: TestShuffleNodes (0.00s) +=== RUN TestTasksUpdated +--- PASS: TestTasksUpdated (0.00s) +=== RUN TestEvictAndPlace_LimitLessThanAllocs +--- PASS: TestEvictAndPlace_LimitLessThanAllocs (0.00s) +=== RUN TestEvictAndPlace_LimitEqualToAllocs +--- PASS: TestEvictAndPlace_LimitEqualToAllocs (0.00s) +=== RUN TestSetStatus +--- PASS: TestSetStatus (0.00s) +=== RUN TestInplaceUpdate_ChangedTaskGroup +--- PASS: TestInplaceUpdate_ChangedTaskGroup (0.00s) +=== RUN TestInplaceUpdate_NoMatch +--- PASS: TestInplaceUpdate_NoMatch (0.00s) +=== RUN TestInplaceUpdate_Success +--- PASS: TestInplaceUpdate_Success (0.00s) +=== RUN TestEvictAndPlace_LimitGreaterThanAllocs +--- PASS: TestEvictAndPlace_LimitGreaterThanAllocs (0.00s) +=== RUN TestTaskGroupConstraints +--- FAIL: TestTaskGroupConstraints (0.00s) + util_test.go:886: taskGroupConstraints(&{web 10 [ bar] [0xc4201624d0 0xc420162580] 0xc4202c6650 map[]}) returned &{1000 512 300 0 []}; want &{1000 512 0 0 []} +=== RUN TestProgressMade +--- PASS: TestProgressMade (0.00s) +=== RUN TestDesiredUpdates +--- PASS: TestDesiredUpdates (0.00s) +=== RUN TestUtil_AdjustQueuedAllocations +--- PASS: TestUtil_AdjustQueuedAllocations (0.00s) +=== RUN TestUtil_UpdateNonTerminalAllocsToLost +--- PASS: TestUtil_UpdateNonTerminalAllocsToLost (0.00s) +FAIL +exit status 1 +FAIL github.com/hashicorp/nomad/scheduler 0.070s diff --git a/scheduler/rank.go b/scheduler/rank.go index b3610078c..ddf63312b 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -10,9 +10,10 @@ import ( // along with a node when iterating. This state can be modified as // various rank methods are applied. 
type RankedNode struct { - Node *structs.Node - Score float64 - TaskResources map[string]*structs.Resources + Node *structs.Node + Score float64 + TaskResources map[string]*structs.Resources + AllocResources *structs.Resources // Allocs is used to cache the proposed allocations on the // node. This can be shared between iterators that require it. @@ -44,6 +45,10 @@ func (r *RankedNode) SetTaskResources(task *structs.Task, r.TaskResources[task.Name] = resource } +func (r *RankedNode) SetAllocResources(resources *structs.Resources) { + r.AllocResources = resources +} + // RankFeasibleIterator is used to iteratively yield nodes along // with ranking metadata. The iterators may manage some state for // performance optimizations. @@ -131,11 +136,11 @@ func (iter *StaticRankIterator) Reset() { // BinPackIterator is a RankIterator that scores potential options // based on a bin-packing algorithm. type BinPackIterator struct { - ctx Context - source RankIterator - evict bool - priority int - tasks []*structs.Task + ctx Context + source RankIterator + evict bool + priority int + taskGroup *structs.TaskGroup } // NewBinPackIterator returns a BinPackIterator which tries to fit tasks @@ -154,8 +159,8 @@ func (iter *BinPackIterator) SetPriority(p int) { iter.priority = p } -func (iter *BinPackIterator) SetTasks(tasks []*structs.Task) { - iter.tasks = tasks +func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) { + iter.taskGroup = taskGroup } func (iter *BinPackIterator) Next() *RankedNode { @@ -182,8 +187,10 @@ OUTER: netIdx.AddAllocs(proposed) // Assign the resources for each task - total := new(structs.Resources) - for _, task := range iter.tasks { + total := &structs.Resources{ + DiskMB: iter.taskGroup.LocalDisk.DiskMB, + } + for _, task := range iter.taskGroup.Tasks { taskResources := task.Resources.Copy() // Check if we need a network resource @@ -210,6 +217,7 @@ OUTER: // Accumulate the total resource requirement total.Add(taskResources) } + option.AllocResources = total // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{Resources: total}) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index f33cd5521..071df4983 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -68,16 +68,20 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) { } static := NewStaticRankIterator(ctx, nodes) - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } - binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out := collectRanked(binp) if len(out) != 2 { @@ -142,16 +146,21 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) { }, } - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out := collectRanked(binp) if len(out) != 1 { @@ -223,16 +232,20 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { noErr(t, state.UpsertJobSummary(999, 
mock.JobSummary(alloc2.JobID))) noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } - binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out := collectRanked(binp) if len(out) != 1 { @@ -307,16 +320,21 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { plan := ctx.Plan() plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1} - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out := collectRanked(binp) if len(out) != 2 { diff --git a/scheduler/stack.go b/scheduler/stack.go index 98681980e..bea8c91e6 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -154,7 +154,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.proposedAllocConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) - s.binPack.SetTasks(tg.Tasks) + s.binPack.SetTaskGroup(tg) // Find the node with the max score option := s.maxScore.Next() @@ -242,7 +242,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) - s.binPack.SetTasks(tg.Tasks) + s.binPack.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) // Get the next option that satisfies the constraints. diff --git a/scheduler/util.go b/scheduler/util.go index accc13a13..0629686d5 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -540,7 +540,7 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple { c := tgConstrainTuple{ constraints: make([]*structs.Constraint, 0, len(tg.Constraints)), drivers: make(map[string]struct{}), - size: new(structs.Resources), + size: &structs.Resources{DiskMB: tg.LocalDisk.DiskMB}, } c.constraints = append(c.constraints, tg.Constraints...) 
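To make the resource-accounting change in this patch easier to follow: the group-level local_disk ask is seeded into the aggregate Resources object once, and the per-task CPU/memory/IOPS/network asks are still summed on top of it (see the BinPackIterator and taskGroupConstraints hunks above). The following is a simplified, standalone sketch of that aggregation; groupResources is a hypothetical helper written only for illustration, not a function in this diff, and it reuses only types and methods that appear in the patch.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// groupResources mirrors what BinPackIterator.Next and taskGroupConstraints
// now do: start from the task group's LocalDisk ask, then add each task's
// CPU, memory, network, and IOPS on top of it.
func groupResources(tg *structs.TaskGroup) *structs.Resources {
	total := &structs.Resources{DiskMB: tg.LocalDisk.DiskMB}
	for _, task := range tg.Tasks {
		total.Add(task.Resources)
	}
	return total
}

func main() {
	tg := &structs.TaskGroup{
		LocalDisk: &structs.LocalDisk{DiskMB: 150},
		Tasks: []*structs.Task{
			{Name: "web", Resources: &structs.Resources{CPU: 500, MemoryMB: 256}},
		},
	}
	// Prints an aggregate ask of 500 CPU, 256 MB memory, and 150 MB disk.
	fmt.Printf("%+v\n", groupResources(tg))
}
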
diff --git a/scheduler/util_test.go b/scheduler/util_test.go index e57db59b2..a63858613 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -846,6 +846,7 @@ func TestTaskGroupConstraints(t *testing.T) { Name: "web", Count: 10, Constraints: []*structs.Constraint{constr}, + LocalDisk: &structs.LocalDisk{}, Tasks: []*structs.Task{ &structs.Task{ Driver: "exec", From c9407d4e9fb37ff7126499665cc29cd53e13e58d Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 13:00:20 -0500 Subject: [PATCH 03/13] Added an upgrade path for existing jobs with no local disk --- nomad/state/state_store.go | 39 ++++++++++++ nomad/state/state_store_test.go | 108 ++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index cee5935f3..47d304c19 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -357,6 +357,25 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("unable to create job summary: %v", err) } + // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB + // from task resources + for i, tg := range job.TaskGroups { + var diskMB int + for j, task := range tg.Tasks { + if task.Resources != nil { + resources := task.Resources + diskMB += resources.DiskMB + resources.DiskMB = 0 + task.Resources = resources + tg.Tasks[j] = task + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + job.TaskGroups[i] = tg + } + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -1690,6 +1709,26 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error { func (r *StateRestore) JobRestore(job *structs.Job) error { r.items.Add(watch.Item{Table: "jobs"}) r.items.Add(watch.Item{Job: job.ID}) + + // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB + // from task resources + for i, tg := range job.TaskGroups { + var diskMB int + for j, task := range tg.Tasks { + if task.Resources != nil { + resources := task.Resources + diskMB += resources.DiskMB + resources.DiskMB = 0 + task.Resources = resources + tg.Tasks[j] = task + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + job.TaskGroups[i] = tg + } + if err := r.txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f272b9fb1..2312abee2 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -433,6 +433,69 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { notify.verify(t) } +// This test ensures that UpsertJob creates the LocalDisk is a job doesn't have +// one and clear out the task's disk resource asks +// COMPAT 0.4.1 -> 0.5 +func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Set the LocalDisk to nil and set the tasks's DiskMB to 150 + job.TaskGroups[0].LocalDisk = nil + job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 + + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) + + err := state.UpsertJob(1000, job) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Expect the state store to create the LocalDisk and clear out Tasks's + // DiskMB + expected := job.Copy() + expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{ + 
DiskMB: 150, + } + expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0 + + if !reflect.DeepEqual(expected, out) { + t.Fatalf("bad: %#v %#v", expected, out) + } + + index, err := state.Index("jobs") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1000 { + t.Fatalf("bad: %d", index) + } + + summary, err := state.JobSummaryByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if summary == nil { + t.Fatalf("nil summary") + } + if summary.JobID != job.ID { + t.Fatalf("bad summary id: %v", summary.JobID) + } + _, ok := summary.Summary["web"] + if !ok { + t.Fatalf("nil summary for task group") + } + notify.verify(t) +} + func TestStateStore_DeleteJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() @@ -813,6 +876,51 @@ func TestStateStore_RestoreJob(t *testing.T) { notify.verify(t) } +// This test ensures that the state restore creates the LocalDisk for a job if +// it doesn't have one +// COMPAT 0.4.1 -> 0.5 +func TestStateStore_Jobs_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Set LocalDisk to nil and set the DiskMB to 150 + job.TaskGroups[0].LocalDisk = nil + job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 + + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) + + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + + err = restore.JobRestore(job) + if err != nil { + t.Fatalf("err: %v", err) + } + restore.Commit() + + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Expect job to have local disk and clear out the task's disk resource ask + expected := job.Copy() + expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{ + DiskMB: 150, + } + expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0 + if !reflect.DeepEqual(out, expected) { + t.Fatalf("Bad: %#v %#v", out, job) + } + + notify.verify(t) +} + func TestStateStore_UpsertPeriodicLaunch(t *testing.T) { state := testStateStore(t) job := mock.Job() From 1adfb15f7eac1b7cdd50801b8338cb3963330154 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 13:53:09 -0500 Subject: [PATCH 04/13] Fixed some tests --- nomad/state/state_store.go | 6 + nomad/structs/structs.go | 20 +-- nomad/structs/structs_test.go | 37 +++--- scheduler/foo | 243 ---------------------------------- scheduler/system_sched.go | 1 + 5 files changed, 41 insertions(+), 266 deletions(-) delete mode 100644 scheduler/foo diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 47d304c19..b8f5cf0cb 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -360,6 +360,9 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB // from task resources for i, tg := range job.TaskGroups { + if tg.LocalDisk != nil { + continue + } var diskMB int for j, task := range tg.Tasks { if task.Resources != nil { @@ -1713,6 +1716,9 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB // from task resources for i, tg := range job.TaskGroups { + if tg.LocalDisk != nil { + continue + } var diskMB int for j, task := range tg.Tasks { if task.Resources != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8576aea50..230a343f7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -823,9 +823,6 @@ func (r *Resources) MeetsMinResources() error { if 
r.MemoryMB < 10 { mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum MemoryMB value is 10; got %d", r.MemoryMB)) } - if r.DiskMB < 10 { - mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum DiskMB value is 10; got %d", r.DiskMB)) - } if r.IOPS < 0 { mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum IOPS value is 0; got %d", r.IOPS)) } @@ -1638,7 +1635,7 @@ func (tg *TaskGroup) Validate() error { // Validate the tasks for _, task := range tg.Tasks { - if err := task.Validate(); err != nil { + if err := task.Validate(tg.LocalDisk); err != nil { outer := fmt.Errorf("Task %s validation failed: %s", task.Name, err) mErr.Errors = append(mErr.Errors, outer) } @@ -2036,7 +2033,7 @@ func (t *Task) FindHostAndPortFor(portLabel string) (string, int) { } // Validate is used to sanity check a task -func (t *Task) Validate() error { +func (t *Task) Validate(localDisk *LocalDisk) error { var mErr multierror.Error if t.Name == "" { mErr.Errors = append(mErr.Errors, errors.New("Missing task name")) @@ -2060,6 +2057,13 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } + // Esnure the task isn't asking for disk resources + if t.Resources != nil { + if t.Resources.DiskMB > 0 { + mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level.")) + } + } + // Validate the log config if t.LogConfig == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) @@ -2079,12 +2083,12 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } - if t.LogConfig != nil && t.Resources != nil { + if t.LogConfig != nil && localDisk != nil { logUsage := (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB) - if t.Resources.DiskMB <= logUsage { + if localDisk.DiskMB <= logUsage { mErr.Errors = append(mErr.Errors, fmt.Errorf("log storage (%d MB) must be less than requested disk capacity (%d MB)", - logUsage, t.Resources.DiskMB)) + logUsage, localDisk.DiskMB)) } } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index db9857571..21de9343e 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -115,8 +115,9 @@ func testJob() *Job { }, TaskGroups: []*TaskGroup{ &TaskGroup{ - Name: "web", - Count: 10, + Name: "web", + Count: 10, + LocalDisk: DefaultLocalDisk(), RestartPolicy: &RestartPolicy{ Mode: RestartPolicyModeFail, Attempts: 3, @@ -345,20 +346,24 @@ func TestTaskGroup_Validate(t *testing.T) { err = tg.Validate() mErr = err.(*multierror.Error) - if !strings.Contains(mErr.Errors[0].Error(), "2 redefines 'web' from task 1") { + if !strings.Contains(mErr.Errors[0].Error(), "should have a local disk object") { t.Fatalf("err: %s", err) } - if !strings.Contains(mErr.Errors[1].Error(), "Task 3 missing name") { + if !strings.Contains(mErr.Errors[1].Error(), "2 redefines 'web' from task 1") { t.Fatalf("err: %s", err) } - if !strings.Contains(mErr.Errors[2].Error(), "Task web validation failed") { + if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") { + t.Fatalf("err: %s", err) + } + if !strings.Contains(mErr.Errors[3].Error(), "Task web validation failed") { t.Fatalf("err: %s", err) } } func TestTask_Validate(t *testing.T) { task := &Task{} - err := task.Validate() + localDisk := DefaultLocalDisk() + err := task.Validate(localDisk) mErr := err.(*multierror.Error) if !strings.Contains(mErr.Errors[0].Error(), "task name") { t.Fatalf("err: %s", err) @@ -371,7 +376,7 @@ func TestTask_Validate(t *testing.T) { } task = &Task{Name: 
"web/foo"} - err = task.Validate() + err = task.Validate(localDisk) mErr = err.(*multierror.Error) if !strings.Contains(mErr.Errors[0].Error(), "slashes") { t.Fatalf("err: %s", err) @@ -382,13 +387,13 @@ func TestTask_Validate(t *testing.T) { Driver: "docker", Resources: &Resources{ CPU: 100, - DiskMB: 200, MemoryMB: 100, IOPS: 10, }, LogConfig: DefaultLogConfig(), } - err = task.Validate() + localDisk.DiskMB = 200 + err = task.Validate(localDisk) if err != nil { t.Fatalf("err: %s", err) } @@ -416,18 +421,20 @@ func TestTask_Validate_Services(t *testing.T) { Name: "service-name", } + localDisk := DefaultLocalDisk() task := &Task{ Name: "web", Driver: "docker", Resources: &Resources{ CPU: 100, - DiskMB: 200, MemoryMB: 100, IOPS: 10, }, Services: []*Service{s1, s2}, } - err := task.Validate() + localDisk.DiskMB = 200 + + err := task.Validate(localDisk) if err == nil { t.Fatal("expected an error") } @@ -494,12 +501,12 @@ func TestTask_Validate_Service_Check(t *testing.T) { func TestTask_Validate_LogConfig(t *testing.T) { task := &Task{ LogConfig: DefaultLogConfig(), - Resources: &Resources{ - DiskMB: 1, - }, + } + localDisk := &LocalDisk{ + DiskMB: 1, } - err := task.Validate() + err := task.Validate(localDisk) mErr := err.(*multierror.Error) if !strings.Contains(mErr.Errors[3].Error(), "log storage") { t.Fatalf("err: %s", err) diff --git a/scheduler/foo b/scheduler/foo deleted file mode 100644 index f3b79fc0e..000000000 --- a/scheduler/foo +++ /dev/null @@ -1,243 +0,0 @@ -=== RUN TestAnnotateTaskGroup_Updates ---- PASS: TestAnnotateTaskGroup_Updates (0.00s) -=== RUN TestAnnotateCountChange_NonEdited ---- PASS: TestAnnotateCountChange_NonEdited (0.00s) -=== RUN TestAnnotateCountChange ---- PASS: TestAnnotateCountChange (0.00s) -=== RUN TestAnnotateTask_NonEdited ---- PASS: TestAnnotateTask_NonEdited (0.00s) -=== RUN TestAnnotateTask ---- PASS: TestAnnotateTask (0.00s) -=== RUN TestEvalContext_ProposedAlloc ---- PASS: TestEvalContext_ProposedAlloc (0.00s) -=== RUN TestEvalEligibility_JobStatus -false ---- PASS: TestEvalEligibility_JobStatus (0.00s) -=== RUN TestEvalEligibility_TaskGroupStatus ---- PASS: TestEvalEligibility_TaskGroupStatus (0.00s) -=== RUN TestEvalEligibility_SetJob ---- PASS: TestEvalEligibility_SetJob (0.00s) -=== RUN TestEvalEligibility_GetClasses ---- PASS: TestEvalEligibility_GetClasses (0.00s) -=== RUN TestStaticIterator_Reset ---- PASS: TestStaticIterator_Reset (0.00s) -=== RUN TestStaticIterator_SetNodes ---- PASS: TestStaticIterator_SetNodes (0.00s) -=== RUN TestRandomIterator ---- PASS: TestRandomIterator (0.00s) -=== RUN TestDriverChecker ---- PASS: TestDriverChecker (0.00s) -=== RUN TestConstraintChecker ---- PASS: TestConstraintChecker (0.00s) -=== RUN TestResolveConstraintTarget ---- PASS: TestResolveConstraintTarget (0.00s) -=== RUN TestCheckConstraint ---- PASS: TestCheckConstraint (0.00s) -=== RUN TestCheckLexicalOrder ---- PASS: TestCheckLexicalOrder (0.00s) -=== RUN TestCheckVersionConstraint ---- PASS: TestCheckVersionConstraint (0.00s) -=== RUN TestCheckRegexpConstraint ---- PASS: TestCheckRegexpConstraint (0.00s) -=== RUN TestProposedAllocConstraint_JobDistinctHosts ---- PASS: TestProposedAllocConstraint_JobDistinctHosts (0.00s) -=== RUN TestProposedAllocConstraint_JobDistinctHosts_Infeasible ---- PASS: TestProposedAllocConstraint_JobDistinctHosts_Infeasible (0.00s) -=== RUN TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount ---- PASS: TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount (0.00s) -=== RUN 
TestProposedAllocConstraint_TaskGroupDistinctHosts ---- PASS: TestProposedAllocConstraint_TaskGroupDistinctHosts (0.00s) -=== RUN TestFeasibilityWrapper_JobIneligible ---- PASS: TestFeasibilityWrapper_JobIneligible (0.00s) -=== RUN TestFeasibilityWrapper_JobEscapes ---- PASS: TestFeasibilityWrapper_JobEscapes (0.00s) -=== RUN TestFeasibilityWrapper_JobAndTg_Eligible ---- PASS: TestFeasibilityWrapper_JobAndTg_Eligible (0.00s) -=== RUN TestFeasibilityWrapper_JobEligible_TgIneligible ---- PASS: TestFeasibilityWrapper_JobEligible_TgIneligible (0.00s) -=== RUN TestFeasibilityWrapper_JobEligible_TgEscaped ---- PASS: TestFeasibilityWrapper_JobEligible_TgEscaped (0.00s) -=== RUN TestServiceSched_JobRegister ---- PASS: TestServiceSched_JobRegister (0.00s) -=== RUN TestServiceSched_JobRegister_Annotate ---- PASS: TestServiceSched_JobRegister_Annotate (0.00s) -=== RUN TestServiceSched_JobRegister_CountZero ---- PASS: TestServiceSched_JobRegister_CountZero (0.00s) -=== RUN TestServiceSched_JobRegister_AllocFail ---- PASS: TestServiceSched_JobRegister_AllocFail (0.00s) -=== RUN TestServiceSched_JobRegister_CreateBlockedEval ---- PASS: TestServiceSched_JobRegister_CreateBlockedEval (0.00s) -=== RUN TestServiceSched_JobRegister_FeasibleAndInfeasibleTG ---- PASS: TestServiceSched_JobRegister_FeasibleAndInfeasibleTG (0.00s) -=== RUN TestServiceSched_EvaluateMaxPlanEval ---- PASS: TestServiceSched_EvaluateMaxPlanEval (0.00s) -=== RUN TestServiceSched_Plan_Partial_Progress ---- PASS: TestServiceSched_Plan_Partial_Progress (0.00s) -=== RUN TestServiceSched_EvaluateBlockedEval ---- PASS: TestServiceSched_EvaluateBlockedEval (0.00s) -=== RUN TestServiceSched_EvaluateBlockedEval_Finished ---- PASS: TestServiceSched_EvaluateBlockedEval_Finished (0.00s) -=== RUN TestServiceSched_JobModify ---- PASS: TestServiceSched_JobModify (0.00s) -=== RUN TestServiceSched_JobModify_IncrCount_NodeLimit ---- PASS: TestServiceSched_JobModify_IncrCount_NodeLimit (0.00s) -=== RUN TestServiceSched_JobModify_CountZero ---- PASS: TestServiceSched_JobModify_CountZero (0.00s) -=== RUN TestServiceSched_JobModify_Rolling ---- PASS: TestServiceSched_JobModify_Rolling (0.00s) -=== RUN TestServiceSched_JobModify_InPlace ---- PASS: TestServiceSched_JobModify_InPlace (0.00s) -=== RUN TestServiceSched_JobDeregister ---- PASS: TestServiceSched_JobDeregister (0.00s) -=== RUN TestServiceSched_NodeDown ---- PASS: TestServiceSched_NodeDown (0.00s) -=== RUN TestServiceSched_NodeUpdate ---- PASS: TestServiceSched_NodeUpdate (0.00s) -=== RUN TestServiceSched_NodeDrain ---- PASS: TestServiceSched_NodeDrain (0.00s) -=== RUN TestServiceSched_NodeDrain_Down ---- PASS: TestServiceSched_NodeDrain_Down (0.00s) -=== RUN TestServiceSched_NodeDrain_Queued_Allocations ---- PASS: TestServiceSched_NodeDrain_Queued_Allocations (0.00s) -=== RUN TestServiceSched_NodeDrain_UpdateStrategy ---- PASS: TestServiceSched_NodeDrain_UpdateStrategy (0.00s) -=== RUN TestServiceSched_RetryLimit ---- PASS: TestServiceSched_RetryLimit (0.00s) -=== RUN TestBatchSched_Run_CompleteAlloc ---- PASS: TestBatchSched_Run_CompleteAlloc (0.00s) -=== RUN TestBatchSched_Run_DrainedAlloc ---- PASS: TestBatchSched_Run_DrainedAlloc (0.00s) -=== RUN TestBatchSched_Run_FailedAlloc ---- PASS: TestBatchSched_Run_FailedAlloc (0.00s) -=== RUN TestBatchSched_Run_FailedAllocQueuedAllocations ---- PASS: TestBatchSched_Run_FailedAllocQueuedAllocations (0.00s) -=== RUN TestBatchSched_ReRun_SuccessfullyFinishedAlloc ---- PASS: TestBatchSched_ReRun_SuccessfullyFinishedAlloc (0.00s) -=== RUN 
TestGenericSched_FilterCompleteAllocs ---- PASS: TestGenericSched_FilterCompleteAllocs (0.00s) -=== RUN TestGenericSched_ChainedAlloc ---- PASS: TestGenericSched_ChainedAlloc (0.00s) -=== RUN TestFeasibleRankIterator ---- PASS: TestFeasibleRankIterator (0.00s) -=== RUN TestBinPackIterator_NoExistingAlloc ---- FAIL: TestBinPackIterator_NoExistingAlloc (0.00s) - rank_test.go:88: Bad: [] -=== RUN TestBinPackIterator_PlannedAlloc ---- FAIL: TestBinPackIterator_PlannedAlloc (0.00s) - rank_test.go:167: Bad: []*scheduler.RankedNode(nil) -=== RUN TestBinPackIterator_ExistingAlloc ---- FAIL: TestBinPackIterator_ExistingAlloc (0.00s) - rank_test.go:252: Bad: []*scheduler.RankedNode(nil) -=== RUN TestBinPackIterator_ExistingAlloc_PlannedEvict ---- FAIL: TestBinPackIterator_ExistingAlloc_PlannedEvict (0.00s) - rank_test.go:341: Bad: []*scheduler.RankedNode(nil) -=== RUN TestJobAntiAffinity_PlannedAlloc ---- PASS: TestJobAntiAffinity_PlannedAlloc (0.00s) -=== RUN TestLimitIterator ---- PASS: TestLimitIterator (0.00s) -=== RUN TestMaxScoreIterator ---- PASS: TestMaxScoreIterator (0.00s) -=== RUN TestServiceStack_SetNodes ---- PASS: TestServiceStack_SetNodes (0.00s) -=== RUN TestServiceStack_SetJob ---- PASS: TestServiceStack_SetJob (0.00s) -=== RUN TestServiceStack_Select_Size ---- PASS: TestServiceStack_Select_Size (0.00s) -=== RUN TestServiceStack_Select_MetricsReset ---- PASS: TestServiceStack_Select_MetricsReset (0.00s) -=== RUN TestServiceStack_Select_DriverFilter ---- PASS: TestServiceStack_Select_DriverFilter (0.00s) -=== RUN TestServiceStack_Select_ConstraintFilter ---- PASS: TestServiceStack_Select_ConstraintFilter (0.00s) -=== RUN TestServiceStack_Select_BinPack_Overflow ---- PASS: TestServiceStack_Select_BinPack_Overflow (0.00s) -=== RUN TestSystemStack_SetNodes ---- PASS: TestSystemStack_SetNodes (0.00s) -=== RUN TestSystemStack_SetJob ---- PASS: TestSystemStack_SetJob (0.00s) -=== RUN TestSystemStack_Select_Size ---- PASS: TestSystemStack_Select_Size (0.00s) -=== RUN TestSystemStack_Select_MetricsReset ---- PASS: TestSystemStack_Select_MetricsReset (0.00s) -=== RUN TestSystemStack_Select_DriverFilter ---- PASS: TestSystemStack_Select_DriverFilter (0.00s) -=== RUN TestSystemStack_Select_ConstraintFilter ---- PASS: TestSystemStack_Select_ConstraintFilter (0.00s) -=== RUN TestSystemStack_Select_BinPack_Overflow ---- PASS: TestSystemStack_Select_BinPack_Overflow (0.00s) -=== RUN TestSystemSched_JobRegister ---- PASS: TestSystemSched_JobRegister (0.00s) -=== RUN TestSystemSched_ExhaustResources ---- PASS: TestSystemSched_ExhaustResources (0.00s) -=== RUN TestSystemSched_JobRegister_Annotate ---- PASS: TestSystemSched_JobRegister_Annotate (0.00s) -=== RUN TestSystemSched_JobRegister_AddNode ---- PASS: TestSystemSched_JobRegister_AddNode (0.00s) -=== RUN TestSystemSched_JobRegister_AllocFail ---- PASS: TestSystemSched_JobRegister_AllocFail (0.00s) -=== RUN TestSystemSched_JobModify ---- PASS: TestSystemSched_JobModify (0.00s) -=== RUN TestSystemSched_JobModify_Rolling ---- PASS: TestSystemSched_JobModify_Rolling (0.00s) -=== RUN TestSystemSched_JobModify_InPlace ---- PASS: TestSystemSched_JobModify_InPlace (0.00s) -=== RUN TestSystemSched_JobDeregister ---- PASS: TestSystemSched_JobDeregister (0.00s) -=== RUN TestSystemSched_NodeDown ---- PASS: TestSystemSched_NodeDown (0.00s) -=== RUN TestSystemSched_NodeDrain_Down ---- PASS: TestSystemSched_NodeDrain_Down (0.00s) -=== RUN TestSystemSched_NodeDrain ---- PASS: TestSystemSched_NodeDrain (0.00s) -=== RUN TestSystemSched_NodeUpdate ---- PASS: 
TestSystemSched_NodeUpdate (0.00s) -=== RUN TestSystemSched_RetryLimit ---- PASS: TestSystemSched_RetryLimit (0.00s) -=== RUN TestSystemSched_Queued_With_Constraints ---- PASS: TestSystemSched_Queued_With_Constraints (0.00s) -=== RUN TestSystemSched_ChainedAlloc ---- PASS: TestSystemSched_ChainedAlloc (0.00s) -=== RUN TestMaterializeTaskGroups ---- PASS: TestMaterializeTaskGroups (0.00s) -=== RUN TestDiffAllocs ---- PASS: TestDiffAllocs (0.00s) -=== RUN TestDiffSystemAllocs ---- PASS: TestDiffSystemAllocs (0.00s) -=== RUN TestReadyNodesInDCs ---- PASS: TestReadyNodesInDCs (0.00s) -=== RUN TestRetryMax ---- PASS: TestRetryMax (0.00s) -=== RUN TestTaintedNodes ---- PASS: TestTaintedNodes (0.00s) -=== RUN TestShuffleNodes ---- PASS: TestShuffleNodes (0.00s) -=== RUN TestTasksUpdated ---- PASS: TestTasksUpdated (0.00s) -=== RUN TestEvictAndPlace_LimitLessThanAllocs ---- PASS: TestEvictAndPlace_LimitLessThanAllocs (0.00s) -=== RUN TestEvictAndPlace_LimitEqualToAllocs ---- PASS: TestEvictAndPlace_LimitEqualToAllocs (0.00s) -=== RUN TestSetStatus ---- PASS: TestSetStatus (0.00s) -=== RUN TestInplaceUpdate_ChangedTaskGroup ---- PASS: TestInplaceUpdate_ChangedTaskGroup (0.00s) -=== RUN TestInplaceUpdate_NoMatch ---- PASS: TestInplaceUpdate_NoMatch (0.00s) -=== RUN TestInplaceUpdate_Success ---- PASS: TestInplaceUpdate_Success (0.00s) -=== RUN TestEvictAndPlace_LimitGreaterThanAllocs ---- PASS: TestEvictAndPlace_LimitGreaterThanAllocs (0.00s) -=== RUN TestTaskGroupConstraints ---- FAIL: TestTaskGroupConstraints (0.00s) - util_test.go:886: taskGroupConstraints(&{web 10 [ bar] [0xc4201624d0 0xc420162580] 0xc4202c6650 map[]}) returned &{1000 512 300 0 []}; want &{1000 512 0 0 []} -=== RUN TestProgressMade ---- PASS: TestProgressMade (0.00s) -=== RUN TestDesiredUpdates ---- PASS: TestDesiredUpdates (0.00s) -=== RUN TestUtil_AdjustQueuedAllocations ---- PASS: TestUtil_AdjustQueuedAllocations (0.00s) -=== RUN TestUtil_UpdateNonTerminalAllocsToLost ---- PASS: TestUtil_UpdateNonTerminalAllocsToLost (0.00s) -FAIL -exit status 1 -FAIL github.com/hashicorp/nomad/scheduler 0.070s diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index df1badf36..afb6da80f 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -317,6 +317,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, TaskResources: option.TaskResources, + Resources: option.AllocResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, } From 0319d5fe64ec09cea3551b1954ba5f9ffbbabc6c Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 15:26:28 -0500 Subject: [PATCH 05/13] Added scheduler tests to ensure disk constraints are honored --- nomad/state/state_store_test.go | 29 -------------- nomad/structs/structs.go | 5 ++- scheduler/generic_sched.go | 1 + scheduler/generic_sched_test.go | 69 ++++++++++++++++++++++++++++++++- scheduler/system_sched_test.go | 62 +++++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 32 deletions(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 2312abee2..ddd31353f 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -444,11 +444,6 @@ func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) { job.TaskGroups[0].LocalDisk = nil job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 - notify := setupNotifyTest( - state, - watch.Item{Table: "jobs"}, - watch.Item{Job: job.ID}) - err := 
state.UpsertJob(1000, job) if err != nil { t.Fatalf("err: %v", err) @@ -470,30 +465,6 @@ func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) { if !reflect.DeepEqual(expected, out) { t.Fatalf("bad: %#v %#v", expected, out) } - - index, err := state.Index("jobs") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1000 { - t.Fatalf("bad: %d", index) - } - - summary, err := state.JobSummaryByID(job.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if summary == nil { - t.Fatalf("nil summary") - } - if summary.JobID != job.ID { - t.Fatalf("bad summary id: %v", summary.JobID) - } - _, ok := summary.Summary["web"] - if !ok { - t.Fatalf("nil summary for task group") - } - notify.verify(t) } func TestStateStore_DeleteJob_Job(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 230a343f7..4ba0b5c21 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -772,7 +772,6 @@ func DefaultResources() *Resources { return &Resources{ CPU: 100, MemoryMB: 10, - DiskMB: 300, IOPS: 0, } } @@ -2569,7 +2568,10 @@ func (c *Constraint) Validate() error { // LocalDisk is an ephemeral disk object type LocalDisk struct { + // Sticky indicates whether the allocation is sticky to a node Sticky bool + + // DiskMB is the size of the local disk DiskMB int `mapstructure:"disk"` } @@ -2580,6 +2582,7 @@ func DefaultLocalDisk() *LocalDisk { } } +// Validate validates LocalDisk func (d *LocalDisk) Validate() error { if d.DiskMB < 10 { return fmt.Errorf("minimum DiskMB value is 10; got %d", d.DiskMB) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 48b1c65b4..613fdd313 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -452,6 +452,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { TaskGroup: missing.TaskGroup.Name, Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, + Resources: option.AllocResources, TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index a5b37c988..03fe89b11 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -50,8 +50,8 @@ func TestServiceSched_JobRegister(t *testing.T) { } // Ensure the eval has no spawned blocked eval - if len(h.Evals) != 1 { - t.Fatalf("bad: %#v", h.Evals) + if len(h.CreateEvals) != 0 { + t.Fatalf("bad: %#v", h.CreateEvals) if h.Evals[0].BlockedEval != "" { t.Fatalf("bad: %#v", h.Evals[0]) } @@ -91,6 +91,71 @@ func TestServiceSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job with count 2 and disk as 60GB so that only one allocation + // can fit + job := mock.Job() + job.TaskGroups[0].Count = 2 + job.TaskGroups[0].LocalDisk.DiskMB = 88 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := 
h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval has a blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan allocated only one allocation + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure only one allocation was placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 3ad4fd852..043fbe854 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -80,6 +80,68 @@ func TestSystemSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_JobRegister_LocalDiskConstraint(t *testing.T) { + h := NewHarness(t) + + // Create a nodes + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job + job := mock.SystemJob() + job.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create another job with a lot of disk resource ask so that it doesn't fit + // the node + job1 := mock.SystemJob() + job1.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job1)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + if err := h.Process(NewSystemScheduler, eval); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + // Create a new harness to test the scheduling result for the second job + h1 := NewHarnessWithState(t, h.State) + // Create a mock evaluation to register the job + eval1 := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job1.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job1.ID, + } + + // Process the evaluation + if err := h1.Process(NewSystemScheduler, eval1); err != nil { + t.Fatalf("err: %v", err) + } + + out, err = h1.State.AllocsByJob(job1.ID) + noErr(t, err) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } +} + func TestSystemSched_ExhaustResources(t *testing.T) { h := NewHarness(t) From 183294644ea89a8304f804ad0f96e67db0552529 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 22:10:25 -0500 Subject: [PATCH 06/13] Fixed api tests --- api/tasks.go | 6 ++++++ api/util_test.go | 10 +++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index fff9d79bf..6b37356ce 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -128,6 +128,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup { return g } +// RequireDisk adds a local disk to the task group +func (g *TaskGroup) RequireDisk(disk *LocalDisk) *TaskGroup { + g.LocalDisk = disk + return g +} + // LogConfig provides configuration for log rotation type LogConfig struct { 
MaxFiles int diff --git a/api/util_test.go b/api/util_test.go index c91ae0738..ab2cefe80 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -1,8 +1,6 @@ package api -import ( - "testing" -) +import "testing" func assertQueryMeta(t *testing.T, qm *QueryMeta) { if qm.LastIndex == 0 { @@ -25,7 +23,6 @@ func testJob() *Job { Require(&Resources{ CPU: 100, MemoryMB: 256, - DiskMB: 25, IOPS: 10, }). SetLogConfig(&LogConfig{ @@ -34,7 +31,10 @@ func testJob() *Job { }) group := NewTaskGroup("group1", 1). - AddTask(task) + AddTask(task). + RequireDisk(&LocalDisk{ + DiskMB: 25, + }) job := NewBatchJob("job1", "redis", "region1", 1). AddDatacenter("dc1"). From efb0d3de8a305981cb007b17c0df88d2c656844b Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 23:05:21 -0500 Subject: [PATCH 07/13] Fixed some more tests --- command/plan_test.go | 2 -- command/run_test.go | 3 --- command/util_test.go | 6 ++++-- command/validate_test.go | 2 -- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/command/plan_test.go b/command/plan_test.go index 79793301e..b56f0a5cd 100644 --- a/command/plan_test.go +++ b/command/plan_test.go @@ -85,7 +85,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -125,7 +124,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } diff --git a/command/run_test.go b/command/run_test.go index 840925e45..f8a5212be 100644 --- a/command/run_test.go +++ b/command/run_test.go @@ -32,7 +32,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -121,7 +120,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -171,7 +169,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } diff --git a/command/util_test.go b/command/util_test.go index 00fdd339d..3a0479cf6 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -45,7 +45,6 @@ func testJob(jobID string) *api.Job { SetConfig("exit_code", 0). Require(&api.Resources{ MemoryMB: 256, - DiskMB: 20, CPU: 100, }). SetLogConfig(&api.LogConfig{ @@ -54,7 +53,10 @@ func testJob(jobID string) *api.Job { }) group := api.NewTaskGroup("group1", 1). - AddTask(task) + AddTask(task). + RequireDisk(&api.LocalDisk{ + DiskMB: 20, + }) job := api.NewBatchJob(jobID, jobID, "region1", 1). AddDatacenter("dc1"). 
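The API-level test fixes above all make the same move: DiskMB comes off api.Resources and the disk ask is attached to the group through the new RequireDisk builder. A minimal, self-contained sketch of that builder pattern (illustrative only; the task and group names and the 150 MB figure are placeholders, not part of the patch):

package example

import "github.com/hashicorp/nomad/api"

// exampleGroup builds a task group that requests 150 MB of sticky local disk
// at the group level instead of setting DiskMB on the task's resources.
func exampleGroup() *api.TaskGroup {
	task := api.NewTask("cache", "exec").
		Require(&api.Resources{
			CPU:      100,
			MemoryMB: 256,
			// No DiskMB here any more; disk is requested on the group.
		})

	return api.NewTaskGroup("cache", 1).
		AddTask(task).
		RequireDisk(&api.LocalDisk{
			Sticky: true,
			DiskMB: 150,
		})
}

Requesting disk per group rather than per task matches how the allocation directory is shared by all tasks in a group, which is why the scheduler-side changes later in this series account for DiskMB once per allocation.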
diff --git a/command/validate_test.go b/command/validate_test.go index ff9f42b5e..11be97511 100644 --- a/command/validate_test.go +++ b/command/validate_test.go @@ -32,7 +32,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -126,7 +125,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } From dba0d80ee34b8b6a9986222c18e9b4b5f7951bf0 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 26 Aug 2016 01:51:19 -0500 Subject: [PATCH 08/13] Fixed more tests --- jobspec/parse_test.go | 3 --- nomad/structs/structs_test.go | 1 - 2 files changed, 4 deletions(-) diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index e8f8821c7..d429551dc 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -128,7 +128,6 @@ func TestParse(t *testing.T) { Resources: &structs.Resources{ CPU: 500, MemoryMB: 128, - DiskMB: 300, IOPS: 0, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ @@ -173,7 +172,6 @@ func TestParse(t *testing.T) { Resources: &structs.Resources{ CPU: 500, MemoryMB: 128, - DiskMB: 300, IOPS: 30, }, Constraints: []*structs.Constraint{ @@ -372,7 +370,6 @@ func TestParse(t *testing.T) { Resources: &structs.Resources{ CPU: 100, MemoryMB: 10, - DiskMB: 300, IOPS: 0, }, LogConfig: &structs.LogConfig{ diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 21de9343e..214788529 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -148,7 +148,6 @@ func testJob() *Job { Resources: &Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 20, Networks: []*NetworkResource{ &NetworkResource{ MBits: 50, From 54fe53742d5634cf073e2d519e6f0a16b34bbb0c Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 26 Aug 2016 14:24:47 -0500 Subject: [PATCH 09/13] Added copy method to LocalDisk --- jobspec/test-fixtures/multi-resource.hcl | 6 ++++++ nomad/state/state_store.go | 18 ++++++------------ nomad/structs/structs.go | 13 ++++++++++++- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/jobspec/test-fixtures/multi-resource.hcl b/jobspec/test-fixtures/multi-resource.hcl index 3058ea485..5e3197487 100644 --- a/jobspec/test-fixtures/multi-resource.hcl +++ b/jobspec/test-fixtures/multi-resource.hcl @@ -1,5 +1,11 @@ job "binstore-storagelocker" { group "binsl" { + local_disk { + disk = 500 + } + local_disk { + disk = 100 + } count = 5 task "binstore" { driver = "docker" diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b8f5cf0cb..83d4f369e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -364,13 +364,10 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { continue } var diskMB int - for j, task := range tg.Tasks { + for _, task := range tg.Tasks { if task.Resources != nil { - resources := task.Resources - diskMB += resources.DiskMB - resources.DiskMB = 0 - task.Resources = resources - tg.Tasks[j] = task + diskMB += task.Resources.DiskMB + task.Resources.DiskMB = 0 } } tg.LocalDisk = &structs.LocalDisk{ @@ -1720,13 +1717,10 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { continue } var diskMB int - for j, task := range tg.Tasks { + for _, task := range tg.Tasks { if task.Resources != nil { - resources := task.Resources - diskMB += resources.DiskMB - resources.DiskMB = 0 - task.Resources = resources - tg.Tasks[j] = task + diskMB += task.Resources.DiskMB + task.Resources.DiskMB = 0 } } tg.LocalDisk = &structs.LocalDisk{ diff --git a/nomad/structs/structs.go 
b/nomad/structs/structs.go index 4ba0b5c21..b448cc419 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1564,6 +1564,10 @@ func (tg *TaskGroup) Copy() *TaskGroup { } ntg.Meta = CopyMapStringString(ntg.Meta) + + if tg.LocalDisk != nil { + ntg.LocalDisk = tg.LocalDisk.Copy() + } return ntg } @@ -2056,7 +2060,7 @@ func (t *Task) Validate(localDisk *LocalDisk) error { mErr.Errors = append(mErr.Errors, err) } - // Esnure the task isn't asking for disk resources + // Ensure the task isn't asking for disk resources if t.Resources != nil { if t.Resources.DiskMB > 0 { mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level.")) @@ -2590,6 +2594,13 @@ func (d *LocalDisk) Validate() error { return nil } +// Copy copies the LocalDisk struct and returns a new one +func (d *LocalDisk) Copy() *LocalDisk { + ld := new(LocalDisk) + *ld = *d + return ld +} + // Vault stores the set of premissions a task needs access to from Vault. type Vault struct { // Policies is the set of policies that the task needs access to From c9d35e4050efa9dc9e30a3d089b1a2795cccd5fd Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 26 Aug 2016 17:28:29 -0500 Subject: [PATCH 10/13] Changing implementation of AllocsFit --- nomad/plan_apply.go | 10 +++++++++- nomad/structs/funcs.go | 6 ++++++ scheduler/context.go | 7 +++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index c094f16e8..fae7a4455 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -355,7 +355,15 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri proposed = structs.RemoveAllocs(existingAlloc, remove) proposed = append(proposed, plan.NodeAllocation[nodeID]...) + // Copy the allocs and set the job + proposedAllocs := make([]*structs.Allocation, len(proposed)) + for i, alloc := range proposed { + newAlloc := alloc.Copy() + newAlloc.Job = plan.Job + proposedAllocs[i] = newAlloc + } + // Check if these allocations fit - fit, _, _, err := structs.AllocsFit(node, proposed, nil) + fit, _, _, err := structs.AllocsFit(node, proposedAllocs, nil) return fit, err } diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 37df72c33..174b92979 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -63,6 +63,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st return false, "", nil, err } } else if alloc.TaskResources != nil { + + // Adding the disk resource ask for the allocation to the used + // resources + if taskGroup := alloc.Job.LookupTaskGroup(alloc.TaskGroup); taskGroup != nil { + used.DiskMB += taskGroup.LocalDisk.DiskMB + } // Allocations within the plan have the combined resources stripped // to save space, so sum up the individual task resources. 
for _, taskResource := range alloc.TaskResources { diff --git a/scheduler/context.go b/scheduler/context.go index 7c9c4bc25..6b3325711 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -133,6 +133,13 @@ func (e *EvalContext) ProposedAllocs(nodeID string) ([]*structs.Allocation, erro // Materialize the proposed slice proposed = make([]*structs.Allocation, 0, len(proposedIDs)) for _, alloc := range proposedIDs { + if alloc.Job == nil { + job, err := e.state.JobByID(alloc.JobID) + if err != nil { + return nil, fmt.Errorf("error retreiving job %q: %v", alloc.JobID, err) + } + alloc.Job = job + } proposed = append(proposed, alloc) } From 8e76a3d52abc8defbc49e84ddb0d80d400bdf88f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 26 Aug 2016 20:08:03 -0700 Subject: [PATCH 11/13] Ensuring resources are re-calculated properly in fsm --- nomad/fsm.go | 6 ++++++ nomad/fsm_test.go | 1 + nomad/state/state_store.go | 6 ++---- scheduler/generic_sched.go | 1 - scheduler/rank.go | 12 +++--------- scheduler/system_sched.go | 1 - 6 files changed, 12 insertions(+), 15 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 318ad8bf3..5ca89fc45 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -397,6 +397,12 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { for _, task := range alloc.TaskResources { alloc.Resources.Add(task) } + + taskGroup := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if taskGroup == nil { + return fmt.Errorf("unable to find task group %q in job %q", alloc.TaskGroup, alloc.Job) + } + alloc.Resources.DiskMB = taskGroup.LocalDisk.DiskMB } if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 805e365c4..d903371c2 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -644,6 +644,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { alloc.AllocModifyIndex = out.AllocModifyIndex // Resources should be recomputed + resources.DiskMB = alloc.Job.TaskGroups[0].LocalDisk.DiskMB alloc.Resources = resources if !reflect.DeepEqual(alloc, out) { t.Fatalf("bad: %#v %#v", alloc, out) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 83d4f369e..04626467c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -359,7 +359,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB // from task resources - for i, tg := range job.TaskGroups { + for _, tg := range job.TaskGroups { if tg.LocalDisk != nil { continue } @@ -373,7 +373,6 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { tg.LocalDisk = &structs.LocalDisk{ DiskMB: diskMB, } - job.TaskGroups[i] = tg } // Insert the job @@ -1712,7 +1711,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB // from task resources - for i, tg := range job.TaskGroups { + for _, tg := range job.TaskGroups { if tg.LocalDisk != nil { continue } @@ -1726,7 +1725,6 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { tg.LocalDisk = &structs.LocalDisk{ DiskMB: diskMB, } - job.TaskGroups[i] = tg } if err := r.txn.Insert("jobs", job); err != nil { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 613fdd313..48b1c65b4 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -452,7 +452,6 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) 
error { TaskGroup: missing.TaskGroup.Name, Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, - Resources: option.AllocResources, TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/scheduler/rank.go b/scheduler/rank.go index ddf63312b..9834e9689 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -10,10 +10,9 @@ import ( // along with a node when iterating. This state can be modified as // various rank methods are applied. type RankedNode struct { - Node *structs.Node - Score float64 - TaskResources map[string]*structs.Resources - AllocResources *structs.Resources + Node *structs.Node + Score float64 + TaskResources map[string]*structs.Resources // Allocs is used to cache the proposed allocations on the // node. This can be shared between iterators that require it. @@ -45,10 +44,6 @@ func (r *RankedNode) SetTaskResources(task *structs.Task, r.TaskResources[task.Name] = resource } -func (r *RankedNode) SetAllocResources(resources *structs.Resources) { - r.AllocResources = resources -} - // RankFeasibleIterator is used to iteratively yield nodes along // with ranking metadata. The iterators may manage some state for // performance optimizations. @@ -217,7 +212,6 @@ OUTER: // Accumulate the total resource requirement total.Add(taskResources) } - option.AllocResources = total // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{Resources: total}) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index afb6da80f..df1badf36 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -317,7 +317,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, TaskResources: option.TaskResources, - Resources: option.AllocResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, } From 3e675222c75600470b927fbddf31decd4c1239bb Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 26 Aug 2016 20:38:50 -0700 Subject: [PATCH 12/13] Added LocalDisk to diff --- nomad/structs/diff.go | 6 ++ nomad/structs/diff_test.go | 145 ++++++++++++++++++++++++++++++++++++ nomad/structs/funcs_test.go | 8 ++ 3 files changed, 159 insertions(+) diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index eabc3ec4b..132215ae2 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -223,6 +223,12 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er diff.Objects = append(diff.Objects, rDiff) } + // LocalDisk diff + diskDiff := primitiveObjectDiff(tg.LocalDisk, other.LocalDisk, nil, "LocalDisk", contextual) + if diskDiff != nil { + diff.Objects = append(diff.Objects, diskDiff) + } + // Tasks diff tasks, err := taskDiffs(tg.Tasks, other.Tasks, contextual) if err != nil { diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index eec7979b2..edd73a913 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -1276,6 +1276,151 @@ func TestTaskGroupDiff(t *testing.T) { }, }, }, + { + // LocalDisk added + Old: &TaskGroup{}, + New: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 100, + }, + }, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "DiskMB", + Old: "", + New: "100", + }, + { + Type: DiffTypeAdded, + Name: "Sticky", + Old: 
"", + New: "true", + }, + }, + }, + }, + }, + }, + { + // LocalDisk deleted + Old: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 100, + }, + }, + New: &TaskGroup{}, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeDeleted, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeDeleted, + Name: "DiskMB", + Old: "100", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "Sticky", + Old: "true", + New: "", + }, + }, + }, + }, + }, + }, + { + // LocalDisk edited + Old: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 150, + }, + }, + New: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: false, + DiskMB: 90, + }, + }, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "DiskMB", + Old: "150", + New: "90", + }, + + { + Type: DiffTypeEdited, + Name: "Sticky", + Old: "true", + New: "false", + }, + }, + }, + }, + }, + }, + { + // LocalDisk edited with context + Contextual: true, + Old: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: false, + DiskMB: 100, + }, + }, + New: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 90, + }, + }, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "DiskMB", + Old: "100", + New: "90", + }, + + { + Type: DiffTypeEdited, + Name: "Sticky", + Old: "false", + New: "true", + }, + }, + }, + }, + }, + }, { // Tasks edited Old: &TaskGroup{ diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index be34a8308..ad5d044e0 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -61,6 +61,14 @@ func TestAllocsFit_PortsOvercommitted(t *testing.T) { } a1 := &Allocation{ + Job: &Job{ + TaskGroups: []*TaskGroup{ + { + Name: "web", + LocalDisk: DefaultLocalDisk(), + }, + }, + }, TaskResources: map[string]*Resources{ "web": &Resources{ Networks: []*NetworkResource{ From e331186ee0650dfb36cad16121b8069f541ab2af Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 29 Aug 2016 12:49:52 -0700 Subject: [PATCH 13/13] Introducing shared resources in alloc --- nomad/fsm.go | 15 ++++++++++----- nomad/fsm_test.go | 29 +++++++++++++++++++++++++++++ nomad/mock/mock.go | 6 ++++-- nomad/plan_apply.go | 10 +--------- nomad/state/state_store.go | 9 +++++++++ nomad/structs/funcs.go | 6 +++--- nomad/structs/structs.go | 5 +++++ scheduler/context.go | 7 ------- scheduler/generic_sched.go | 4 ++++ scheduler/system_sched.go | 4 ++++ 10 files changed, 69 insertions(+), 26 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 5ca89fc45..c0783e18c 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -390,6 +390,14 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { // denormalized prior to being inserted into MemDB. 
for _, alloc := range req.Alloc { if alloc.Resources != nil { + // COMPAT 0.4.1 -> 0.5 + // Set the shared resources for allocations which don't have them + if alloc.SharedResources == nil { + alloc.SharedResources = &structs.Resources{ + DiskMB: alloc.Resources.DiskMB, + } + } + continue } @@ -398,11 +406,8 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { alloc.Resources.Add(task) } - taskGroup := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if taskGroup == nil { - return fmt.Errorf("unable to find task group %q in job %q", alloc.TaskGroup, alloc.Job) - } - alloc.Resources.DiskMB = taskGroup.LocalDisk.DiskMB + // Add the shared resources + alloc.Resources.Add(alloc.SharedResources) } if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index d903371c2..1c3bd230d 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -934,6 +934,35 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) { } } +func TestFSM_SnapshotRestore_Allocs_NoSharedResources(t *testing.T) { + // Add some state + fsm := testFSM(t) + state := fsm.State() + alloc1 := mock.Alloc() + alloc2 := mock.Alloc() + alloc1.SharedResources = nil + alloc2.SharedResources = nil + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) + state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) + state.UpsertAllocs(1001, []*structs.Allocation{alloc2}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out1, _ := state2.AllocByID(alloc1.ID) + out2, _ := state2.AllocByID(alloc2.ID) + alloc1.SharedResources = &structs.Resources{DiskMB: 150} + alloc2.SharedResources = &structs.Resources{DiskMB: 150} + + if !reflect.DeepEqual(alloc1, out1) { + t.Fatalf("bad: \n%#v\n%#v", out1, alloc1) + } + if !reflect.DeepEqual(alloc2, out2) { + t.Fatalf("bad: \n%#v\n%#v", out2, alloc2) + } +} + func TestFSM_SnapshotRestore_Indexes(t *testing.T) { // Add some state fsm := testFSM(t) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 11f6b74ef..4d3c129f2 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -258,7 +258,7 @@ func Alloc() *structs.Allocation { Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 10, + DiskMB: 150, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ Device: "eth0", @@ -273,7 +273,6 @@ func Alloc() *structs.Allocation { "web": &structs.Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 10, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ Device: "eth0", @@ -285,6 +284,9 @@ func Alloc() *structs.Allocation { }, }, }, + SharedResources: &structs.Resources{ + DiskMB: 150, + }, Job: Job(), DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index fae7a4455..c094f16e8 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -355,15 +355,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri proposed = structs.RemoveAllocs(existingAlloc, remove) proposed = append(proposed, plan.NodeAllocation[nodeID]...) 
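The hunk below reverts the alloc-copy workaround from the previous commit: with the disk ask now stored on each Allocation as SharedResources, AllocsFit can charge disk against the node without needing the Job pointer. The per-allocation accounting reduces to something like the hypothetical helper below (chargeAlloc is illustrative only; it paraphrases the funcs.go change later in this patch rather than quoting it):

// chargeAlloc is a hypothetical helper showing how an allocation's resources
// are accumulated once SharedResources carries the group-level disk ask.
func chargeAlloc(used *structs.Resources, alloc *structs.Allocation) {
	// Disk is counted once per allocation via the shared resources...
	used.Add(alloc.SharedResources)
	// ...while CPU, memory, and network are still summed per task.
	for _, taskResource := range alloc.TaskResources {
		used.Add(taskResource)
	}
}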
- // Copy the allocs and set the job - proposedAllocs := make([]*structs.Allocation, len(proposed)) - for i, alloc := range proposed { - newAlloc := alloc.Copy() - newAlloc.Job = plan.Job - proposedAllocs[i] = newAlloc - } - // Check if these allocations fit - fit, _, _, err := structs.AllocsFit(node, proposedAllocs, nil) + fit, _, _, err := structs.AllocsFit(node, proposed, nil) return fit, err } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 04626467c..a5a09d3ae 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1750,6 +1750,15 @@ func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { r.items.Add(watch.Item{AllocEval: alloc.EvalID}) r.items.Add(watch.Item{AllocJob: alloc.JobID}) r.items.Add(watch.Item{AllocNode: alloc.NodeID}) + + //COMPAT 0.4.1 -> 0.5 + // Set the shared resources if it's not present + if alloc.SharedResources == nil { + alloc.SharedResources = &structs.Resources{ + DiskMB: alloc.Resources.DiskMB, + } + } + if err := r.txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 174b92979..08da1bd85 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -64,10 +64,10 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st } } else if alloc.TaskResources != nil { - // Adding the disk resource ask for the allocation to the used + // Adding the shared resource asks for the allocation to the used // resources - if taskGroup := alloc.Job.LookupTaskGroup(alloc.TaskGroup); taskGroup != nil { - used.DiskMB += taskGroup.LocalDisk.DiskMB + if err := used.Add(alloc.SharedResources); err != nil { + return false, "", nil, err } // Allocations within the plan have the combined resources stripped // to save space, so sum up the individual task resources. diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b448cc419..791bdcedd 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2672,6 +2672,10 @@ type Allocation struct { // of this allocation of the task group. Resources *Resources + // SharedResources are the resources that are shared by all the tasks in an + // allocation + SharedResources *Resources + // TaskResources is the set of resources allocated to each // task. These should sum to the total Resources. 
TaskResources map[string]*Resources @@ -2719,6 +2723,7 @@ func (a *Allocation) Copy() *Allocation { na.Job = na.Job.Copy() na.Resources = na.Resources.Copy() + na.SharedResources = na.SharedResources.Copy() if a.TaskResources != nil { tr := make(map[string]*Resources, len(na.TaskResources)) diff --git a/scheduler/context.go b/scheduler/context.go index 6b3325711..7c9c4bc25 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -133,13 +133,6 @@ func (e *EvalContext) ProposedAllocs(nodeID string) ([]*structs.Allocation, erro // Materialize the proposed slice proposed = make([]*structs.Allocation, 0, len(proposedIDs)) for _, alloc := range proposedIDs { - if alloc.Job == nil { - job, err := e.state.JobByID(alloc.JobID) - if err != nil { - return nil, fmt.Errorf("error retreiving job %q: %v", alloc.JobID, err) - } - alloc.Job = job - } proposed = append(proposed, alloc) } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 48b1c65b4..40590c2bb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -455,6 +455,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + + SharedResources: &structs.Resources{ + DiskMB: missing.TaskGroup.LocalDisk.DiskMB, + }, } // If the new allocation is replacing an older allocation then we diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index df1badf36..098560e35 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -319,6 +319,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + + SharedResources: &structs.Resources{ + DiskMB: missing.TaskGroup.LocalDisk.DiskMB, + }, } // If the new allocation is replacing an older allocation then we