From 8e76a3d52abc8defbc49e84ddb0d80d400bdf88f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 26 Aug 2016 20:08:03 -0700 Subject: [PATCH] Ensuring resources are re-calculated properly in fsm --- nomad/fsm.go | 6 ++++++ nomad/fsm_test.go | 1 + nomad/state/state_store.go | 6 ++---- scheduler/generic_sched.go | 1 - scheduler/rank.go | 12 +++--------- scheduler/system_sched.go | 1 - 6 files changed, 12 insertions(+), 15 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 318ad8bf3..5ca89fc45 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -397,6 +397,12 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { for _, task := range alloc.TaskResources { alloc.Resources.Add(task) } + + taskGroup := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if taskGroup == nil { + return fmt.Errorf("unable to find task group %q in job %q", alloc.TaskGroup, alloc.Job.ID) + } + alloc.Resources.DiskMB = taskGroup.LocalDisk.DiskMB } if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 805e365c4..d903371c2 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -644,6 +644,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { alloc.AllocModifyIndex = out.AllocModifyIndex // Resources should be recomputed + resources.DiskMB = alloc.Job.TaskGroups[0].LocalDisk.DiskMB alloc.Resources = resources if !reflect.DeepEqual(alloc, out) { t.Fatalf("bad: %#v %#v", alloc, out) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 83d4f369e..04626467c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -359,7 +359,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB // from task resources - for i, tg := range job.TaskGroups { + for _, tg := range job.TaskGroups { if tg.LocalDisk != nil { continue } @@ -373,7 +373,6 @@ func (s *StateStore) 
UpsertJob(index uint64, job *structs.Job) error { tg.LocalDisk = &structs.LocalDisk{ DiskMB: diskMB, } - job.TaskGroups[i] = tg } // Insert the job @@ -1712,7 +1711,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB // from task resources - for i, tg := range job.TaskGroups { + for _, tg := range job.TaskGroups { if tg.LocalDisk != nil { continue } @@ -1726,7 +1725,6 @@ func (r *StateRestore) JobRestore(job *structs.Job) error { tg.LocalDisk = &structs.LocalDisk{ DiskMB: diskMB, } - job.TaskGroups[i] = tg } if err := r.txn.Insert("jobs", job); err != nil { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 613fdd313..48b1c65b4 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -452,7 +452,6 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { TaskGroup: missing.TaskGroup.Name, Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, - Resources: option.AllocResources, TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/scheduler/rank.go b/scheduler/rank.go index ddf63312b..9834e9689 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -10,10 +10,9 @@ import ( // along with a node when iterating. This state can be modified as // various rank methods are applied. type RankedNode struct { - Node *structs.Node - Score float64 - TaskResources map[string]*structs.Resources - AllocResources *structs.Resources + Node *structs.Node + Score float64 + TaskResources map[string]*structs.Resources // Allocs is used to cache the proposed allocations on the // node. This can be shared between iterators that require it. 
@@ -45,10 +44,6 @@ func (r *RankedNode) SetTaskResources(task *structs.Task, r.TaskResources[task.Name] = resource } -func (r *RankedNode) SetAllocResources(resources *structs.Resources) { - r.AllocResources = resources -} - // RankFeasibleIterator is used to iteratively yield nodes along // with ranking metadata. The iterators may manage some state for // performance optimizations. @@ -217,7 +212,6 @@ OUTER: // Accumulate the total resource requirement total.Add(taskResources) } - option.AllocResources = total // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{Resources: total}) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index afb6da80f..df1badf36 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -317,7 +317,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, TaskResources: option.TaskResources, - Resources: option.AllocResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, }