mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
Remove running, system scheduler, and fix tg overriding eligibility
This commit is contained in:
@@ -23,7 +23,6 @@ const (
|
||||
type BlockedEvals struct {
|
||||
evalBroker *EvalBroker
|
||||
enabled bool
|
||||
running bool
|
||||
stats *BlockedStats
|
||||
l sync.RWMutex
|
||||
|
||||
@@ -92,11 +91,15 @@ func (b *BlockedEvals) Enabled() bool {
|
||||
// should only be enabled on the active leader.
|
||||
func (b *BlockedEvals) SetEnabled(enabled bool) {
|
||||
b.l.Lock()
|
||||
b.enabled = enabled
|
||||
if !b.running {
|
||||
b.running = true
|
||||
if b.enabled == enabled {
|
||||
// No-op
|
||||
return
|
||||
} else if enabled {
|
||||
go b.watchCapacity()
|
||||
} else {
|
||||
close(b.stopCh)
|
||||
}
|
||||
b.enabled = enabled
|
||||
b.l.Unlock()
|
||||
if !enabled {
|
||||
b.Flush()
|
||||
@@ -181,7 +184,7 @@ func (b *BlockedEvals) unblock(computedClass string) {
|
||||
defer b.l.Unlock()
|
||||
|
||||
// Protect against the case of a flush.
|
||||
if !b.running {
|
||||
if !b.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -265,12 +268,6 @@ func (b *BlockedEvals) Flush() {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
// Kill any running goroutines
|
||||
if b.running {
|
||||
close(b.stopCh)
|
||||
b.running = false
|
||||
}
|
||||
|
||||
// Reset the blocked eval tracker.
|
||||
b.stats.TotalEscaped = 0
|
||||
b.stats.TotalBlocked = 0
|
||||
|
||||
@@ -242,7 +242,12 @@ func (e *EvalEligibility) GetClasses() map[string]bool {
|
||||
case EvalComputedClassEligible:
|
||||
elig[class] = true
|
||||
case EvalComputedClassIneligible:
|
||||
elig[class] = false
|
||||
// Only mark as ineligible if it hasn't been marked before. This
|
||||
// prevents one task group marking a class as ineligible when it
|
||||
// is eligible on another task group.
|
||||
if _, ok := elig[class]; !ok {
|
||||
elig[class] = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,6 +216,10 @@ func TestEvalEligibility_GetClasses(t *testing.T) {
|
||||
e.SetTaskGroupEligibility(false, "bar", "v1:4")
|
||||
e.SetTaskGroupEligibility(true, "bar", "v1:5")
|
||||
|
||||
// Mark an existing eligible class as ineligible in the TG.
|
||||
e.SetTaskGroupEligibility(false, "fizz", "v1:1")
|
||||
e.SetTaskGroupEligibility(false, "fizz", "v1:3")
|
||||
|
||||
expClasses := map[string]bool{
|
||||
"v1:1": true,
|
||||
"v1:2": false,
|
||||
|
||||
@@ -35,8 +35,6 @@ type SystemScheduler struct {
|
||||
|
||||
limitReached bool
|
||||
nextEval *structs.Evaluation
|
||||
|
||||
blocked *structs.Evaluation
|
||||
}
|
||||
|
||||
// NewSystemScheduler is a factory function to instantiate a new system
|
||||
@@ -129,19 +127,6 @@ func (s *SystemScheduler) process() (bool, error) {
|
||||
s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID)
|
||||
}
|
||||
|
||||
// If there are failed allocations, we need to create a blocked evaluation
|
||||
// to place the failed allocations when resources become available.
|
||||
if len(s.plan.FailedAllocs) != 0 && s.blocked == nil {
|
||||
e := s.ctx.Eligibility()
|
||||
classes := e.GetClasses()
|
||||
s.blocked = s.eval.BlockedEval(classes, e.HasEscaped())
|
||||
if err := s.planner.CreateEval(s.blocked); err != nil {
|
||||
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
|
||||
return false, err
|
||||
}
|
||||
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
|
||||
}
|
||||
|
||||
// Submit the plan
|
||||
result, newState, err := s.planner.SubmitPlan(s.plan)
|
||||
if err != nil {
|
||||
|
||||
@@ -184,89 +184,6 @@ func TestSystemSched_JobRegister_AllocFail(t *testing.T) {
|
||||
h.AssertEvalStatus(t, structs.EvalStatusComplete)
|
||||
}
|
||||
|
||||
func TestSystemSched_JobRegister_BlockedEval(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
// Create a full node
|
||||
node := mock.Node()
|
||||
node.Reserved = node.Resources
|
||||
node.ComputeClass()
|
||||
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
|
||||
|
||||
// Create an ineligible node
|
||||
node2 := mock.Node()
|
||||
node2.Attributes["kernel.name"] = "windows"
|
||||
node2.ComputeClass()
|
||||
noErr(t, h.State.UpsertNode(h.NextIndex(), node2))
|
||||
|
||||
// Create a jobs
|
||||
job := mock.SystemJob()
|
||||
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
|
||||
|
||||
// Create a mock evaluation to register the job
|
||||
eval := &structs.Evaluation{
|
||||
ID: structs.GenerateUUID(),
|
||||
Priority: job.Priority,
|
||||
TriggeredBy: structs.EvalTriggerJobRegister,
|
||||
JobID: job.ID,
|
||||
}
|
||||
|
||||
// Process the evaluation
|
||||
err := h.Process(NewServiceScheduler, eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure a single plan
|
||||
if len(h.Plans) != 1 {
|
||||
t.Fatalf("bad: %#v", h.Plans)
|
||||
}
|
||||
plan := h.Plans[0]
|
||||
|
||||
// Ensure the plan has created a follow up eval.
|
||||
if len(h.CreateEvals) != 1 {
|
||||
t.Fatalf("bad: %#v", h.CreateEvals)
|
||||
}
|
||||
|
||||
created := h.CreateEvals[0]
|
||||
if created.Status != structs.EvalStatusBlocked {
|
||||
t.Fatalf("bad: %#v", created)
|
||||
}
|
||||
|
||||
classes := created.ClassEligibility
|
||||
if len(classes) != 2 || !classes[node.ComputedClass] || classes[node2.ComputedClass] {
|
||||
t.Fatalf("bad: %#v", classes)
|
||||
}
|
||||
|
||||
if created.EscapedComputedClass {
|
||||
t.Fatalf("bad: %#v", created)
|
||||
}
|
||||
|
||||
// Ensure the plan failed to alloc
|
||||
if len(plan.FailedAllocs) != 1 {
|
||||
t.Fatalf("bad: %#v", plan)
|
||||
}
|
||||
|
||||
// Lookup the allocations by JobID
|
||||
out, err := h.State.AllocsByJob(job.ID)
|
||||
noErr(t, err)
|
||||
|
||||
// Ensure all allocations placed
|
||||
if len(out) != 1 {
|
||||
for _, a := range out {
|
||||
t.Logf("%#v", a)
|
||||
}
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
|
||||
// Check the available nodes
|
||||
if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 {
|
||||
t.Fatalf("bad: %#v", out[0].Metrics)
|
||||
}
|
||||
|
||||
h.AssertEvalStatus(t, structs.EvalStatusComplete)
|
||||
}
|
||||
|
||||
func TestSystemSched_JobModify(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user