More style and readablity fixes from review

This commit is contained in:
Preetha Appan
2018-10-17 23:49:37 -05:00
parent 191b8626d2
commit 21432d69dc
8 changed files with 98 additions and 104 deletions

View File

@@ -248,27 +248,27 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
// 1) Preempted allocations in plan results are updated
// 2) Evals are inserted for preempted jobs
func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil
require := require.New(t)
// Insert job
err := state.UpsertJob(999, job)
require.Nil(err)
eval := mock.Eval()
eval.JobID = job.ID
require.NoError(err)
// Create an eval
eval := mock.Eval()
eval.JobID = job.ID
err = state.UpsertEvals(1, []*structs.Evaluation{eval})
require.Nil(err)
require.NoError(err)
// Insert alloc that'll be preempted in the plan
// Insert alloc that will be preempted in the plan
preemptedAlloc := mock.Alloc()
err = state.UpsertAllocs(2, []*structs.Allocation{preemptedAlloc})
require.Nil(err)
require.NoError(err)
minimalPreemptedAlloc := &structs.Allocation{
ID: preemptedAlloc.ID,
@@ -277,6 +277,7 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
DesiredDescription: fmt.Sprintf("Preempted by allocation %v", alloc.ID),
}
// Create eval for preempted job
eval2 := mock.Eval()
eval2.JobID = preemptedAlloc.JobID
@@ -292,32 +293,33 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
}
err = state.UpsertPlanResults(1000, &res)
require.Nil(err)
require.NoError(err)
ws := memdb.NewWatchSet()
// Verify alloc and eval created by plan
out, err := state.AllocByID(ws, alloc.ID)
require.Nil(err)
require.NoError(err)
require.Equal(alloc, out)
index, err := state.Index("allocs")
require.Nil(err)
require.NoError(err)
require.EqualValues(1000, index)
evalOut, err := state.EvalByID(ws, eval.ID)
require.Nil(err)
require.NoError(err)
require.NotNil(evalOut)
require.EqualValues(1000, evalOut.ModifyIndex)
// Verify preempted alloc and eval for preempted job
// Verify preempted alloc
preempted, err := state.AllocByID(ws, preemptedAlloc.ID)
require.Nil(err)
require.NoError(err)
require.Equal(preempted.DesiredStatus, structs.AllocDesiredStatusEvict)
require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by allocation %v", alloc.ID))
// Verify eval for preempted job
preemptedJobEval, err := state.EvalByID(ws, eval2.ID)
require.Nil(err)
require.NoError(err)
require.NotNil(preemptedJobEval)
require.EqualValues(1000, preemptedJobEval.ModifyIndex)

View File

@@ -120,6 +120,7 @@ type AutopilotConfig struct {
ModifyIndex uint64
}
// SchedulerConfiguration is the config for controlling scheduler behavior
type SchedulerConfiguration struct {
// PreemptionConfig specifies whether to enable eviction of lower
// priority jobs to place higher priority jobs.

View File

@@ -1885,27 +1885,6 @@ func (r *Resources) Add(delta *Resources) error {
return nil
}
// Subtract removes the resources of the delta to this, potentially
// returning an error if not possible.
func (r *Resources) Subtract(delta *Resources) error {
if delta == nil {
return nil
}
r.CPU -= delta.CPU
r.MemoryMB -= delta.MemoryMB
r.DiskMB -= delta.DiskMB
r.IOPS -= delta.IOPS
for _, n := range delta.Networks {
// Find the matching interface by IP or CIDR
idx := r.NetIndex(n)
if idx != -1 {
r.Networks[idx].MBits -= delta.Networks[idx].MBits
}
}
return nil
}
func (r *Resources) GoString() string {
return fmt.Sprintf("*%#v", *r)
}
@@ -8224,10 +8203,15 @@ func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preempting
// TaskResources are needed by the plan applier to check if allocations fit
// after removing preempted allocations
newAlloc.TaskResources = alloc.TaskResources
if alloc.AllocatedResources != nil {
newAlloc.AllocatedResources = alloc.AllocatedResources
} else {
newAlloc.TaskResources = alloc.TaskResources
newAlloc.SharedResources = alloc.SharedResources
}
node := alloc.NodeID
// Append this alloc to slice for this node
node := alloc.NodeID
existing := p.NodePreemptions[node]
p.NodePreemptions[node] = append(existing, newAlloc)
}
@@ -8263,14 +8247,6 @@ func (p *Plan) IsNoOp() bool {
len(p.DeploymentUpdates) == 0
}
// PreemptedAllocs is used to store information about a set of allocations
// for the same job that get preempted as part of placing allocations for the
// job in the plan.
// Preempted allocs represents a map from jobid to allocations
// to be preempted
type PreemptedAllocs map[*NamespacedID][]*Allocation
// PlanResult is the result of a plan submitted to the leader.
type PlanResult struct {
// NodeUpdate contains all the updates that were committed.
@@ -8341,7 +8317,7 @@ type DesiredUpdates struct {
InPlaceUpdate uint64
DestructiveUpdate uint64
Canary uint64
Evict uint64
Preemptions uint64
}
func (d *DesiredUpdates) GoString() string {

View File

@@ -1869,56 +1869,73 @@ func TestResource_Add_Network(t *testing.T) {
}
}
func TestResource_Subtract(t *testing.T) {
r1 := &Resources{
CPU: 2000,
MemoryMB: 2048,
DiskMB: 10000,
IOPS: 100,
Networks: []*NetworkResource{
{
CIDR: "10.0.0.0/8",
MBits: 100,
ReservedPorts: []Port{{"ssh", 22}},
func TestComparableResources_Subtract(t *testing.T) {
r1 := &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: 2000,
},
Memory: AllocatedMemoryResources{
MemoryMB: 2048,
},
Networks: []*NetworkResource{
{
CIDR: "10.0.0.0/8",
MBits: 100,
ReservedPorts: []Port{{"ssh", 22}},
},
},
},
}
r2 := &Resources{
CPU: 1000,
MemoryMB: 1024,
DiskMB: 5000,
IOPS: 50,
Networks: []*NetworkResource{
{
IP: "10.0.0.1",
MBits: 20,
ReservedPorts: []Port{{"web", 80}},
},
Shared: AllocatedSharedResources{
DiskMB: 10000,
},
}
err := r1.Subtract(r2)
if err != nil {
t.Fatalf("Err: %v", err)
}
expect := &Resources{
CPU: 1000,
MemoryMB: 1024,
DiskMB: 5000,
IOPS: 50,
Networks: []*NetworkResource{
{
CIDR: "10.0.0.0/8",
MBits: 80,
ReservedPorts: []Port{{"ssh", 22}},
r2 := &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
Networks: []*NetworkResource{
{
CIDR: "10.0.0.0/8",
MBits: 20,
ReservedPorts: []Port{{"ssh", 22}},
},
},
},
Shared: AllocatedSharedResources{
DiskMB: 5000,
},
}
r1.Subtract(r2)
expect := &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
Networks: []*NetworkResource{
{
CIDR: "10.0.0.0/8",
MBits: 80,
ReservedPorts: []Port{{"ssh", 22}},
},
},
},
Shared: AllocatedSharedResources{
DiskMB: 5000,
},
}
if !reflect.DeepEqual(expect.Networks, r1.Networks) {
t.Fatalf("bad: %#v %#v", expect, r1)
}
require := require.New(t)
require.Equal(expect, r1)
}
func TestEncodeDecode(t *testing.T) {

View File

@@ -130,8 +130,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs,
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp,
structs.EvalTriggerPreemption:
structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)

View File

@@ -234,17 +234,17 @@ OUTER:
}
// Look for preemptible allocations to satisfy the network resource for this task
preemptedAllocsForTaskNetwork := preemptForNetworkResourceAsk(iter.priority, proposed, ask, netIdx, currentPreemptions)
if preemptedAllocsForTaskNetwork == nil {
netPreemptions := preemptForNetworkResourceAsk(iter.priority, proposed, ask, netIdx, currentPreemptions)
if netPreemptions == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("unable to meet network resource %v after preemption", ask))
netIdx.Release()
continue OUTER
}
allocsToPreempt = append(allocsToPreempt, preemptedAllocsForTaskNetwork...)
allocsToPreempt = append(allocsToPreempt, netPreemptions...)
// First subtract out preempted allocations
proposed = structs.RemoveAllocs(proposed, preemptedAllocsForTaskNetwork)
proposed = structs.RemoveAllocs(proposed, netPreemptions)
// Reset the network index and try the offer again
netIdx.Release()
@@ -254,8 +254,7 @@ OUTER:
offer, err = netIdx.AssignNetwork(ask)
if offer == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("unexecpted error, unable to create offer after preempting:%v", err))
iter.ctx.Logger().Error(fmt.Sprintf("unexpected error, unable to create offer after preempting:%v", err))
netIdx.Release()
continue OUTER
}
@@ -275,6 +274,9 @@ OUTER:
total.Tasks[task.Name] = taskResources
}
// Store current set of running allocs before adding resources for the task group
current := proposed
// Add the resources we are trying to fit
proposed = append(proposed, &structs.Allocation{AllocatedResources: total})
@@ -290,10 +292,9 @@ OUTER:
// If eviction is enabled and the node doesn't fit the alloc, check if
// any allocs can be preempted
// Remove the last element containing the current placement from proposed allocs
current := proposed[:len(proposed)-1]
preemptForTaskGroup := findPreemptibleAllocationsForTaskGroup(iter.priority, current, total, option.Node, currentPreemptions)
allocsToPreempt = append(allocsToPreempt, preemptForTaskGroup...)
// If we were unable to find preempted allocs to meet these requirements
// mark as exhausted and continue
if len(preemptForTaskGroup) == 0 {
@@ -304,6 +305,7 @@ OUTER:
if len(allocsToPreempt) > 0 {
option.PreemptedAllocs = allocsToPreempt
}
// Score the fit normally otherwise
fitness := structs.ScoreFit(option.Node, util)
normalizedFit := fitness / binPackingMaxFitScore

View File

@@ -347,7 +347,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
alloc.PreviousAllocation = missing.Alloc.ID
}
// If this placement involves preemption, set DesiredState to stop for those allocations
// If this placement involves preemption, set DesiredState to evict for those allocations
if option.PreemptedAllocs != nil {
var preemptedAllocIDs []string
for _, stop := range option.PreemptedAllocs {
@@ -358,7 +358,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub())
if s.plan.Annotations.DesiredTGUpdates != nil {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
desired.Evict += 1
desired.Preemptions += 1
}
}
}

View File

@@ -121,9 +121,6 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
var preemptedAllocs []*structs.Allocation
for _, preemptions := range result.NodePreemptions {
for _, alloc := range preemptions {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
alloc.ModifyTime = now
preemptedAllocs = append(preemptedAllocs, alloc)
}