Ensuring resources are re-calculated properly in fsm

Diptanu Choudhury
2016-08-26 20:08:03 -07:00
parent c9d35e4050
commit 8e76a3d52a
6 changed files with 12 additions and 15 deletions

View File

@@ -397,6 +397,12 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
 		for _, task := range alloc.TaskResources {
 			alloc.Resources.Add(task)
 		}
+
+		taskGroup := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+		if taskGroup == nil {
+			return fmt.Errorf("unable to find task group %q in job %q", alloc.TaskGroup, alloc.Job)
+		}
+		alloc.Resources.DiskMB = taskGroup.LocalDisk.DiskMB
 	}
 
 	if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
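
Note (illustrative, not part of the commit): the hunk above makes the FSM derive an allocation's total resources itself rather than trusting what the scheduler sent. A minimal standalone sketch of that recomputation, using only calls visible in the diff (structs.Resources.Add, Job.LookupTaskGroup); the function name recomputeAllocResources and the assumption that Resources starts out nil are illustrative.

package nomad

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// recomputeAllocResources sums the per-task resources into the allocation's
// shared Resources block, then takes DiskMB from the task group's LocalDisk
// instead of from the (stripped) task resources.
func recomputeAllocResources(alloc *structs.Allocation) error {
	alloc.Resources = new(structs.Resources)
	for _, task := range alloc.TaskResources {
		alloc.Resources.Add(task)
	}

	taskGroup := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	if taskGroup == nil {
		return fmt.Errorf("unable to find task group %q", alloc.TaskGroup)
	}
	alloc.Resources.DiskMB = taskGroup.LocalDisk.DiskMB
	return nil
}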

View File

@@ -644,6 +644,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
 	alloc.AllocModifyIndex = out.AllocModifyIndex
 
 	// Resources should be recomputed
+	resources.DiskMB = alloc.Job.TaskGroups[0].LocalDisk.DiskMB
 	alloc.Resources = resources
 	if !reflect.DeepEqual(alloc, out) {
 		t.Fatalf("bad: %#v %#v", alloc, out)

View File

@@ -359,7 +359,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
 
 	// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
 	// from task resources
-	for i, tg := range job.TaskGroups {
+	for _, tg := range job.TaskGroups {
 		if tg.LocalDisk != nil {
 			continue
 		}
@@ -373,7 +373,6 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
 		tg.LocalDisk = &structs.LocalDisk{
 			DiskMB: diskMB,
 		}
-		job.TaskGroups[i] = tg
 	}
 
 	// Insert the job
// Insert the job
@@ -1712,7 +1711,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
 
 	// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
 	// from task resources
-	for i, tg := range job.TaskGroups {
+	for _, tg := range job.TaskGroups {
 		if tg.LocalDisk != nil {
 			continue
 		}
@@ -1726,7 +1725,6 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
 		tg.LocalDisk = &structs.LocalDisk{
 			DiskMB: diskMB,
 		}
-		job.TaskGroups[i] = tg
 	}
 
 	if err := r.txn.Insert("jobs", job); err != nil {
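
Note (illustrative, not part of the commit): the two job.TaskGroups[i] = tg write-backs removed above were redundant, since job.TaskGroups is a slice of *structs.TaskGroup and mutating tg through the pointer already updates the job. A sketch of the COMPAT 0.4.1 -> 0.5 backfill as it behaves after this change; the helper name backfillLocalDisk and the per-task summation loop are illustrative.

package state

import "github.com/hashicorp/nomad/nomad/structs"

// backfillLocalDisk synthesizes a LocalDisk for task groups created before
// 0.5 by summing the DiskMB each task declared on its resources.
func backfillLocalDisk(job *structs.Job) {
	for _, tg := range job.TaskGroups {
		if tg.LocalDisk != nil {
			continue
		}

		var diskMB int
		for _, task := range tg.Tasks {
			if task.Resources != nil {
				diskMB += task.Resources.DiskMB
			}
		}

		// tg is a pointer, so no indexed write-back into job.TaskGroups
		// is needed here.
		tg.LocalDisk = &structs.LocalDisk{
			DiskMB: diskMB,
		}
	}
}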

View File

@@ -452,7 +452,6 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
 			TaskGroup:     missing.TaskGroup.Name,
 			Metrics:       s.ctx.Metrics(),
 			NodeID:        option.Node.ID,
-			Resources:     option.AllocResources,
 			TaskResources: option.TaskResources,
 			DesiredStatus: structs.AllocDesiredStatusRun,
 			ClientStatus:  structs.AllocClientStatusPending,
View File

@@ -10,10 +10,9 @@ import (
 // along with a node when iterating. This state can be modified as
 // various rank methods are applied.
 type RankedNode struct {
-	Node           *structs.Node
-	Score          float64
-	TaskResources  map[string]*structs.Resources
-	AllocResources *structs.Resources
+	Node          *structs.Node
+	Score         float64
+	TaskResources map[string]*structs.Resources
 
 	// Allocs is used to cache the proposed allocations on the
 	// node. This can be shared between iterators that require it.
@@ -45,10 +44,6 @@ func (r *RankedNode) SetTaskResources(task *structs.Task,
 	r.TaskResources[task.Name] = resource
 }
 
-func (r *RankedNode) SetAllocResources(resources *structs.Resources) {
-	r.AllocResources = resources
-}
-
 // RankFeasibleIterator is used to iteratively yield nodes along
 // with ranking metadata. The iterators may manage some state for
 // performance optimizations.
@@ -217,7 +212,6 @@ OUTER:
 			// Accumulate the total resource requirement
 			total.Add(taskResources)
 		}
-		option.AllocResources = total
 
 		// Add the resources we are trying to fit
 		proposed = append(proposed, &structs.Allocation{Resources: total})
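
Note (illustrative, not part of the commit): the bin packer still sums the per-task resources it has assigned to an option in order to size the proposed allocation it scores against the node, as the unchanged context above shows; what goes away is the copy cached on the RankedNode, since the FSM now recomputes alloc.Resources when the plan is applied. A sketch of that local accumulation; the helper name totalTaskResources is illustrative.

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// totalTaskResources folds a set of per-task resources into a single total,
// the shape appended to the node's proposed allocations during scoring.
func totalTaskResources(taskResources map[string]*structs.Resources) *structs.Resources {
	total := new(structs.Resources)
	for _, tr := range taskResources {
		total.Add(tr)
	}
	return total
}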

View File

@@ -317,7 +317,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			Metrics:       s.ctx.Metrics(),
 			NodeID:        option.Node.ID,
 			TaskResources: option.TaskResources,
-			Resources:     option.AllocResources,
 			DesiredStatus: structs.AllocDesiredStatusRun,
 			ClientStatus:  structs.AllocClientStatusPending,
 		}