mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
Fix filtering issue and add a test that would catch it
This commit is contained in:
@@ -267,6 +267,9 @@ func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *Distinct
|
||||
|
||||
func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) {
|
||||
iter.tg = tg
|
||||
|
||||
// Check if there is a distinct property
|
||||
iter.hasDistinctPropertyConstraints = len(iter.jobPropertySets) != 0 || len(iter.groupPropertySets[tg.Name]) != 0
|
||||
}
|
||||
|
||||
func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) {
|
||||
@@ -278,7 +281,6 @@ func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) {
|
||||
continue
|
||||
}
|
||||
|
||||
iter.hasDistinctPropertyConstraints = true
|
||||
pset := NewPropertySet(iter.ctx, job)
|
||||
pset.SetJobConstraint(c)
|
||||
iter.jobPropertySets = append(iter.jobPropertySets, pset)
|
||||
@@ -290,7 +292,6 @@ func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) {
|
||||
continue
|
||||
}
|
||||
|
||||
iter.hasDistinctPropertyConstraints = true
|
||||
pset := NewPropertySet(iter.ctx, job)
|
||||
pset.SetTGConstraint(c, tg.Name)
|
||||
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
|
||||
|
||||
@@ -399,6 +399,83 @@ func TestServiceSched_JobRegister_DistinctProperty(t *testing.T) {
|
||||
h.AssertEvalStatus(t, structs.EvalStatusComplete)
|
||||
}
|
||||
|
||||
func TestServiceSched_JobRegister_DistinctProperty_TaskGroup(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
// Create some nodes
|
||||
for i := 0; i < 2; i++ {
|
||||
node := mock.Node()
|
||||
node.Meta["ssd"] = "true"
|
||||
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
|
||||
}
|
||||
|
||||
// Create a job that uses distinct property and has count higher than what is
|
||||
// possible.
|
||||
job := mock.Job()
|
||||
job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy())
|
||||
job.TaskGroups[0].Count = 1
|
||||
job.TaskGroups[0].Constraints = append(job.TaskGroups[0].Constraints,
|
||||
&structs.Constraint{
|
||||
Operand: structs.ConstraintDistinctProperty,
|
||||
LTarget: "${meta.ssd}",
|
||||
})
|
||||
|
||||
job.TaskGroups[1].Name = "tg2"
|
||||
job.TaskGroups[1].Count = 1
|
||||
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
|
||||
|
||||
// Create a mock evaluation to register the job
|
||||
eval := &structs.Evaluation{
|
||||
ID: structs.GenerateUUID(),
|
||||
Priority: job.Priority,
|
||||
TriggeredBy: structs.EvalTriggerJobRegister,
|
||||
JobID: job.ID,
|
||||
}
|
||||
|
||||
// Process the evaluation
|
||||
err := h.Process(NewServiceScheduler, eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure a single plan
|
||||
if len(h.Plans) != 1 {
|
||||
t.Fatalf("bad: %#v", h.Plans)
|
||||
}
|
||||
plan := h.Plans[0]
|
||||
|
||||
// Ensure the plan doesn't have annotations.
|
||||
if plan.Annotations != nil {
|
||||
t.Fatalf("expected no annotations")
|
||||
}
|
||||
|
||||
// Ensure the eval hasn't spawned blocked eval
|
||||
if len(h.CreateEvals) != 0 {
|
||||
t.Fatalf("bad: %#v", h.CreateEvals[0])
|
||||
}
|
||||
|
||||
// Ensure the plan allocated
|
||||
var planned []*structs.Allocation
|
||||
for _, allocList := range plan.NodeAllocation {
|
||||
planned = append(planned, allocList...)
|
||||
}
|
||||
if len(planned) != 2 {
|
||||
t.Fatalf("bad: %#v", plan)
|
||||
}
|
||||
|
||||
// Lookup the allocations by JobID
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := h.State.AllocsByJob(ws, job.ID, false)
|
||||
noErr(t, err)
|
||||
|
||||
// Ensure all allocations placed
|
||||
if len(out) != 2 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
|
||||
h.AssertEvalStatus(t, structs.EvalStatusComplete)
|
||||
}
|
||||
|
||||
func TestServiceSched_JobRegister_Annotate(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
|
||||
@@ -12,8 +12,12 @@ type propertySet struct {
|
||||
// ctx is used to lookup the plan and state
|
||||
ctx Context
|
||||
|
||||
// jobID is the job we are operating on
|
||||
jobID string
|
||||
|
||||
// taskGroup is optionally set if the constraint is for a task group
|
||||
taskGroup string
|
||||
|
||||
// constraint is the constraint this property set is checking
|
||||
constraint *structs.Constraint
|
||||
|
||||
@@ -51,37 +55,25 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet {
|
||||
func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) {
|
||||
// Store the constraint
|
||||
p.constraint = constraint
|
||||
|
||||
// Retrieve all previously placed allocations
|
||||
ws := memdb.NewWatchSet()
|
||||
allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false)
|
||||
if err != nil {
|
||||
p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err)
|
||||
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding)
|
||||
return
|
||||
}
|
||||
|
||||
// Only want running allocs
|
||||
filtered, _ := structs.FilterTerminalAllocs(allocs)
|
||||
|
||||
// Get all the nodes that have been used by the allocs
|
||||
nodes, err := p.buildNodeMap(filtered)
|
||||
if err != nil {
|
||||
p.errorBuilding = err
|
||||
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
p.populateExisting(constraint, filtered, nodes)
|
||||
p.populateExisting(constraint)
|
||||
}
|
||||
|
||||
// SetTGConstraint is used to parameterize the property set for a
|
||||
// distinct_property constraint set at the task group level. The inputs are the
|
||||
// constraint and the task group name.
|
||||
func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) {
|
||||
// Store that this is for a task group
|
||||
p.taskGroup = taskGroup
|
||||
|
||||
// Store the constraint
|
||||
p.constraint = constraint
|
||||
|
||||
p.populateExisting(constraint)
|
||||
}
|
||||
|
||||
// populateExisting is a helper shared when setting the constraint to populate
|
||||
// the existing values.
|
||||
func (p *propertySet) populateExisting(constraint *structs.Constraint) {
|
||||
// Retrieve all previously placed allocations
|
||||
ws := memdb.NewWatchSet()
|
||||
allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false)
|
||||
@@ -91,16 +83,8 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup
|
||||
return
|
||||
}
|
||||
|
||||
// Only want running allocs from the task group
|
||||
n := len(allocs)
|
||||
for i := 0; i < n; i++ {
|
||||
if allocs[i].TaskGroup != taskGroup || allocs[i].TerminalStatus() {
|
||||
allocs[i], allocs[n-1] = allocs[n-1], nil
|
||||
i--
|
||||
n--
|
||||
}
|
||||
}
|
||||
allocs = allocs[:n]
|
||||
// Filter to the correct set of allocs
|
||||
allocs = p.filterAllocs(allocs)
|
||||
|
||||
// Get all the nodes that have been used by the allocs
|
||||
nodes, err := p.buildNodeMap(allocs)
|
||||
@@ -110,14 +94,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup
|
||||
return
|
||||
}
|
||||
|
||||
p.populateExisting(constraint, allocs, nodes)
|
||||
}
|
||||
|
||||
// populateExisting is a helper shared when setting the constraint to populate
|
||||
// the existing values. The allocations should be filtered appropriately prior
|
||||
// to calling.
|
||||
func (p *propertySet) populateExisting(constraint *structs.Constraint, jobAllocs []*structs.Allocation, nodes map[string]*structs.Node) {
|
||||
for _, alloc := range jobAllocs {
|
||||
for _, alloc := range allocs {
|
||||
nProperty, ok := p.getProperty(nodes[alloc.NodeID], constraint.LTarget)
|
||||
if !ok {
|
||||
continue
|
||||
@@ -141,13 +118,14 @@ func (p *propertySet) PopulateProposed() {
|
||||
for _, updates := range p.ctx.Plan().NodeUpdate {
|
||||
stopping = append(stopping, updates...)
|
||||
}
|
||||
stopping = p.filterAllocs(stopping)
|
||||
|
||||
// Gather the proposed allocations
|
||||
var proposed []*structs.Allocation
|
||||
for _, pallocs := range p.ctx.Plan().NodeAllocation {
|
||||
proposed = append(proposed, pallocs...)
|
||||
}
|
||||
proposed, _ = structs.FilterTerminalAllocs(proposed)
|
||||
proposed = p.filterAllocs(proposed)
|
||||
|
||||
// Get the used nodes
|
||||
both := make([]*structs.Allocation, 0, len(stopping)+len(proposed))
|
||||
@@ -223,6 +201,29 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin
|
||||
return true, ""
|
||||
}
|
||||
|
||||
// filterAllocs filters a set of allocations to just be those that are running
|
||||
// and if the property set is operation at a task group level, for allocations
|
||||
// for that task group
|
||||
func (p *propertySet) filterAllocs(allocs []*structs.Allocation) []*structs.Allocation {
|
||||
n := len(allocs)
|
||||
for i := 0; i < n; i++ {
|
||||
remove := allocs[i].TerminalStatus()
|
||||
|
||||
// If the constraint is on the task group filter the allocations to just
|
||||
// those on the task group
|
||||
if p.taskGroup != "" {
|
||||
remove = remove || allocs[i].TaskGroup != p.taskGroup
|
||||
}
|
||||
|
||||
if remove {
|
||||
allocs[i], allocs[n-1] = allocs[n-1], nil
|
||||
i--
|
||||
n--
|
||||
}
|
||||
}
|
||||
return allocs[:n]
|
||||
}
|
||||
|
||||
// buildNodeMap takes a list of allocations and returns a map of the nodes used
|
||||
// by those allocations
|
||||
func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) {
|
||||
|
||||
Reference in New Issue
Block a user