diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a750a8fa2..3fa78d14e 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -248,27 +248,27 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { // 1) Preempted allocations in plan results are updated // 2) Evals are inserted for preempted jobs func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { + require := require.New(t) + state := testStateStore(t) alloc := mock.Alloc() job := alloc.Job alloc.Job = nil - require := require.New(t) - + // Insert job err := state.UpsertJob(999, job) - require.Nil(err) - - eval := mock.Eval() - eval.JobID = job.ID + require.NoError(err) // Create an eval + eval := mock.Eval() + eval.JobID = job.ID err = state.UpsertEvals(1, []*structs.Evaluation{eval}) - require.Nil(err) + require.NoError(err) - // Insert alloc that'll be preempted in the plan + // Insert alloc that will be preempted in the plan preemptedAlloc := mock.Alloc() err = state.UpsertAllocs(2, []*structs.Allocation{preemptedAlloc}) - require.Nil(err) + require.NoError(err) minimalPreemptedAlloc := &structs.Allocation{ ID: preemptedAlloc.ID, @@ -277,6 +277,7 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { DesiredDescription: fmt.Sprintf("Preempted by allocation %v", alloc.ID), } + // Create eval for preempted job eval2 := mock.Eval() eval2.JobID = preemptedAlloc.JobID @@ -292,32 +293,33 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { } err = state.UpsertPlanResults(1000, &res) - require.Nil(err) + require.NoError(err) ws := memdb.NewWatchSet() // Verify alloc and eval created by plan out, err := state.AllocByID(ws, alloc.ID) - require.Nil(err) + require.NoError(err) require.Equal(alloc, out) index, err := state.Index("allocs") - require.Nil(err) + require.NoError(err) require.EqualValues(1000, index) evalOut, err := state.EvalByID(ws, eval.ID) - require.Nil(err) + require.NoError(err) require.NotNil(evalOut) require.EqualValues(1000, evalOut.ModifyIndex) - // Verify preempted alloc and eval for preempted job + // Verify preempted alloc preempted, err := state.AllocByID(ws, preemptedAlloc.ID) - require.Nil(err) + require.NoError(err) require.Equal(preempted.DesiredStatus, structs.AllocDesiredStatusEvict) require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by allocation %v", alloc.ID)) + // Verify eval for preempted job preemptedJobEval, err := state.EvalByID(ws, eval2.ID) - require.Nil(err) + require.NoError(err) require.NotNil(preemptedJobEval) require.EqualValues(1000, preemptedJobEval.ModifyIndex) diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index f6143a053..97809726e 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -120,6 +120,7 @@ type AutopilotConfig struct { ModifyIndex uint64 } +// SchedulerConfiguration is the config for controlling scheduler behavior type SchedulerConfiguration struct { // PreemptionConfig specifies whether to enable eviction of lower // priority jobs to place higher priority jobs. diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 0040a6802..4e7269410 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1885,27 +1885,6 @@ func (r *Resources) Add(delta *Resources) error { return nil } -// Subtract removes the resources of the delta to this, potentially -// returning an error if not possible. -func (r *Resources) Subtract(delta *Resources) error { - if delta == nil { - return nil - } - r.CPU -= delta.CPU - r.MemoryMB -= delta.MemoryMB - r.DiskMB -= delta.DiskMB - r.IOPS -= delta.IOPS - - for _, n := range delta.Networks { - // Find the matching interface by IP or CIDR - idx := r.NetIndex(n) - if idx != -1 { - r.Networks[idx].MBits -= delta.Networks[idx].MBits - } - } - return nil -} - func (r *Resources) GoString() string { return fmt.Sprintf("*%#v", *r) } @@ -8224,10 +8203,15 @@ func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preempting // TaskResources are needed by the plan applier to check if allocations fit // after removing preempted allocations - newAlloc.TaskResources = alloc.TaskResources + if alloc.AllocatedResources != nil { + newAlloc.AllocatedResources = alloc.AllocatedResources + } else { + newAlloc.TaskResources = alloc.TaskResources + newAlloc.SharedResources = alloc.SharedResources + } - node := alloc.NodeID // Append this alloc to slice for this node + node := alloc.NodeID existing := p.NodePreemptions[node] p.NodePreemptions[node] = append(existing, newAlloc) } @@ -8263,14 +8247,6 @@ func (p *Plan) IsNoOp() bool { len(p.DeploymentUpdates) == 0 } -// PreemptedAllocs is used to store information about a set of allocations -// for the same job that get preempted as part of placing allocations for the -// job in the plan. - -// Preempted allocs represents a map from jobid to allocations -// to be preempted -type PreemptedAllocs map[*NamespacedID][]*Allocation - // PlanResult is the result of a plan submitted to the leader. type PlanResult struct { // NodeUpdate contains all the updates that were committed. @@ -8341,7 +8317,7 @@ type DesiredUpdates struct { InPlaceUpdate uint64 DestructiveUpdate uint64 Canary uint64 - Evict uint64 + Preemptions uint64 } func (d *DesiredUpdates) GoString() string { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index dd1c25342..a2338300e 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1869,56 +1869,73 @@ func TestResource_Add_Network(t *testing.T) { } } -func TestResource_Subtract(t *testing.T) { - r1 := &Resources{ - CPU: 2000, - MemoryMB: 2048, - DiskMB: 10000, - IOPS: 100, - Networks: []*NetworkResource{ - { - CIDR: "10.0.0.0/8", - MBits: 100, - ReservedPorts: []Port{{"ssh", 22}}, +func TestComparableResources_Subtract(t *testing.T) { + r1 := &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 2000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 2048, + }, + Networks: []*NetworkResource{ + { + CIDR: "10.0.0.0/8", + MBits: 100, + ReservedPorts: []Port{{"ssh", 22}}, + }, }, }, - } - r2 := &Resources{ - CPU: 1000, - MemoryMB: 1024, - DiskMB: 5000, - IOPS: 50, - Networks: []*NetworkResource{ - { - IP: "10.0.0.1", - MBits: 20, - ReservedPorts: []Port{{"web", 80}}, - }, + Shared: AllocatedSharedResources{ + DiskMB: 10000, }, } - err := r1.Subtract(r2) - if err != nil { - t.Fatalf("Err: %v", err) - } - - expect := &Resources{ - CPU: 1000, - MemoryMB: 1024, - DiskMB: 5000, - IOPS: 50, - Networks: []*NetworkResource{ - { - CIDR: "10.0.0.0/8", - MBits: 80, - ReservedPorts: []Port{{"ssh", 22}}, + r2 := &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 1000, }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*NetworkResource{ + { + CIDR: "10.0.0.0/8", + MBits: 20, + ReservedPorts: []Port{{"ssh", 22}}, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 5000, + }, + } + r1.Subtract(r2) + + expect := &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*NetworkResource{ + { + CIDR: "10.0.0.0/8", + MBits: 80, + ReservedPorts: []Port{{"ssh", 22}}, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 5000, }, } - if !reflect.DeepEqual(expect.Networks, r1.Networks) { - t.Fatalf("bad: %#v %#v", expect, r1) - } + require := require.New(t) + require.Equal(expect, r1) } func TestEncodeDecode(t *testing.T) { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 541678fd5..93f982ede 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -130,8 +130,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, - structs.EvalTriggerFailedFollowUp, - structs.EvalTriggerPreemption: + structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) diff --git a/scheduler/rank.go b/scheduler/rank.go index 7e8ba45e5..5286fe302 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -234,17 +234,17 @@ OUTER: } // Look for preemptible allocations to satisfy the network resource for this task - preemptedAllocsForTaskNetwork := preemptForNetworkResourceAsk(iter.priority, proposed, ask, netIdx, currentPreemptions) - if preemptedAllocsForTaskNetwork == nil { + netPreemptions := preemptForNetworkResourceAsk(iter.priority, proposed, ask, netIdx, currentPreemptions) + if netPreemptions == nil { iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("unable to meet network resource %v after preemption", ask)) netIdx.Release() continue OUTER } - allocsToPreempt = append(allocsToPreempt, preemptedAllocsForTaskNetwork...) + allocsToPreempt = append(allocsToPreempt, netPreemptions...) // First subtract out preempted allocations - proposed = structs.RemoveAllocs(proposed, preemptedAllocsForTaskNetwork) + proposed = structs.RemoveAllocs(proposed, netPreemptions) // Reset the network index and try the offer again netIdx.Release() @@ -254,8 +254,7 @@ OUTER: offer, err = netIdx.AssignNetwork(ask) if offer == nil { - iter.ctx.Metrics().ExhaustedNode(option.Node, - fmt.Sprintf("unexecpted error, unable to create offer after preempting:%v", err)) + iter.ctx.Logger().Error(fmt.Sprintf("unexpected error, unable to create offer after preempting:%v", err)) netIdx.Release() continue OUTER } @@ -275,6 +274,9 @@ OUTER: total.Tasks[task.Name] = taskResources } + // Store current set of running allocs before adding resources for the task group + current := proposed + // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{AllocatedResources: total}) @@ -290,10 +292,9 @@ OUTER: // If eviction is enabled and the node doesn't fit the alloc, check if // any allocs can be preempted - // Remove the last element containing the current placement from proposed allocs - current := proposed[:len(proposed)-1] preemptForTaskGroup := findPreemptibleAllocationsForTaskGroup(iter.priority, current, total, option.Node, currentPreemptions) allocsToPreempt = append(allocsToPreempt, preemptForTaskGroup...) + // If we were unable to find preempted allocs to meet these requirements // mark as exhausted and continue if len(preemptForTaskGroup) == 0 { @@ -304,6 +305,7 @@ OUTER: if len(allocsToPreempt) > 0 { option.PreemptedAllocs = allocsToPreempt } + // Score the fit normally otherwise fitness := structs.ScoreFit(option.Node, util) normalizedFit := fitness / binPackingMaxFitScore diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 912440aef..e3f4015fb 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -347,7 +347,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.PreviousAllocation = missing.Alloc.ID } - // If this placement involves preemption, set DesiredState to stop for those allocations + // If this placement involves preemption, set DesiredState to evict for those allocations if option.PreemptedAllocs != nil { var preemptedAllocIDs []string for _, stop := range option.PreemptedAllocs { @@ -358,7 +358,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub()) if s.plan.Annotations.DesiredTGUpdates != nil { desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name] - desired.Evict += 1 + desired.Preemptions += 1 } } } diff --git a/scheduler/testing.go b/scheduler/testing.go index 0a527ede9..d701436c5 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -121,9 +121,6 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er var preemptedAllocs []*structs.Allocation for _, preemptions := range result.NodePreemptions { for _, alloc := range preemptions { - if alloc.CreateTime == 0 { - alloc.CreateTime = now - } alloc.ModifyTime = now preemptedAllocs = append(preemptedAllocs, alloc) }