diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 1bff28f67..54a9955f0 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -28,19 +28,35 @@ func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Schedule
 }
 
 // Process is used to implement the scheduler.Scheduler interface
-func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
+func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
 	switch eval.JobID {
 	case structs.CoreJobEvalGC:
-		return s.evalGC(eval)
+		return c.evalGC(eval)
 	case structs.CoreJobNodeGC:
-		return s.nodeGC(eval)
+		return c.nodeGC(eval)
 	case structs.CoreJobJobGC:
-		return s.jobGC(eval)
+		return c.jobGC(eval)
+	case structs.CoreJobForceGC:
+		return c.forceGC(eval)
 	default:
 		return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
 	}
 }
 
+// forceGC is used to garbage collect all eligible objects.
+func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
+	if err := c.jobGC(eval); err != nil {
+		return err
+	}
+	if err := c.evalGC(eval); err != nil {
+		return err
+	}
+
+	// Node GC must occur after the others to ensure the allocations are
+	// cleared.
+	return c.nodeGC(eval)
+}
+
 // jobGC is used to garbage collect eligible jobs.
 func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
 	// Get all the jobs eligible for garbage collection.
@@ -50,7 +66,7 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
 	}
 
 	var oldThreshold uint64
-	if eval.TriggeredBy == structs.EvalTriggerForceGC {
+	if eval.JobID == structs.CoreJobForceGC {
 		// The GC was forced, so set the threshold to its maximum so everything
 		// will GC.
 		oldThreshold = math.MaxUint64
@@ -60,9 +76,9 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
 		tt := c.srv.fsm.TimeTable()
 		cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
 		oldThreshold = tt.NearestIndex(cutoff)
+		c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
+			oldThreshold, c.srv.config.JobGCThreshold)
 	}
-	c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
-		oldThreshold, c.srv.config.JobGCThreshold)
 
 	// Collect the allocations, evaluations and jobs to GC
 	var gcAlloc, gcEval, gcJob []string
@@ -137,7 +153,7 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 	}
 
 	var oldThreshold uint64
-	if eval.TriggeredBy == structs.EvalTriggerForceGC {
+	if eval.JobID == structs.CoreJobForceGC {
 		// The GC was forced, so set the threshold to its maximum so everything
 		// will GC.
 		oldThreshold = math.MaxUint64
@@ -149,9 +165,9 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 		tt := c.srv.fsm.TimeTable()
 		cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
 		oldThreshold = tt.NearestIndex(cutoff)
+		c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
+			oldThreshold, c.srv.config.EvalGCThreshold)
 	}
-	c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
-		oldThreshold, c.srv.config.EvalGCThreshold)
 
 	// Collect the allocations and evaluations to GC
 	var gcAlloc, gcEval []string
@@ -163,12 +179,22 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 			return err
 		}
 
-		// If the eval is from a "batch" job we don't want to garbage collect
-		// its allocations. If there is a long running batch job and its
+		// If the eval is from a running "batch" job we don't want to garbage
+		// collect its allocations. If there is a long running batch job and its
 		// terminal allocations get GC'd the scheduler would re-run the
 		// allocations.
-		if len(allocs) != 0 && eval.Type == structs.JobTypeBatch {
-			continue
+		if eval.Type == structs.JobTypeBatch {
+			// Check if the job still exists
+			job, err := c.snap.JobByID(eval.JobID)
+			if err != nil {
+				return err
+			}
+
+			// If the job still exists, skip the GC; the allocations and
+			// evaluations can be collected once it is deregistered.
+			if job != nil && len(allocs) != 0 {
+				continue
+			}
 		}
 
 		if gc {
@@ -257,7 +283,7 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 	}
 
 	var oldThreshold uint64
-	if eval.TriggeredBy == structs.EvalTriggerForceGC {
+	if eval.JobID == structs.CoreJobForceGC {
 		// The GC was forced, so set the threshold to its maximum so everything
 		// will GC.
 		oldThreshold = math.MaxUint64
@@ -269,9 +295,9 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 		tt := c.srv.fsm.TimeTable()
 		cutoff := time.Now().UTC().Add(-1 * c.srv.config.NodeGCThreshold)
 		oldThreshold = tt.NearestIndex(cutoff)
+		c.srv.logger.Printf("[DEBUG] sched.core: node GC: scanning before index %d (%v)",
+			oldThreshold, c.srv.config.NodeGCThreshold)
 	}
-	c.srv.logger.Printf("[DEBUG] sched.core: node GC: scanning before index %d (%v)",
-		oldThreshold, c.srv.config.NodeGCThreshold)
 
 	// Collect the nodes to GC
 	var gcNode []string
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 792c30648..20da04d07 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -113,7 +113,77 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
 	}
 }
 
-func TestCoreScheduler_EvalGC_Batch_Allocs(t *testing.T) {
+func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert job.
+	state := s1.fsm.State()
+	job := mock.Job()
+	job.Type = structs.JobTypeBatch
+	err := state.UpsertJob(1000, job)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "dead" eval
+	eval := mock.Eval()
+	eval.Type = structs.JobTypeBatch
+	eval.Status = structs.EvalStatusFailed
+	eval.JobID = job.ID
+	if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "dead" alloc
+	alloc := mock.Alloc()
+	alloc.EvalID = eval.ID
+	alloc.JobID = job.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusFailed
+	err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobEvalGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Shouldn't be gone because there are associated allocs.
+	out, err := state.EvalByID(eval.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("bad: %v", out)
+	}
+
+	outA, err := state.AllocByID(alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outA == nil {
+		t.Fatalf("bad: %v", outA)
+	}
+}
+
+func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
 	testutil.WaitForLeader(t, s1.RPC)
@@ -156,22 +226,14 @@ func TestCoreScheduler_EvalGC_Batch_Allocs(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	// Shouldn't be gone because there are associated allocs.
+	// Should be gone because the job is deregistered.
 	out, err := state.EvalByID(eval.ID)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if out == nil {
+	if out != nil {
 		t.Fatalf("bad: %v", out)
 	}
-
-	outA, err := state.AllocByID(alloc.ID)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if outA == nil {
-		t.Fatalf("bad: %v", outA)
-	}
 }
 
 func TestCoreScheduler_EvalGC_Force(t *testing.T) {
@@ -205,7 +267,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
 	core := NewCoreScheduler(s1, snap)
 
 	// Attempt the GC
-	gc := s1.forceCoreJobEval(structs.CoreJobEvalGC)
+	gc := s1.coreJobEval(structs.CoreJobForceGC)
 	err = core.Process(gc)
 	if err != nil {
 		t.Fatalf("err: %v", err)
@@ -294,7 +356,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) {
 	core := NewCoreScheduler(s1, snap)
 
 	// Attempt the GC
-	gc := s1.forceCoreJobEval(structs.CoreJobNodeGC)
+	gc := s1.coreJobEval(structs.CoreJobForceGC)
 	err = core.Process(gc)
 	if err != nil {
 		t.Fatalf("err: %v", err)
@@ -502,7 +564,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
 	core := NewCoreScheduler(s1, snap)
 
 	// Attempt the GC
-	gc := s1.forceCoreJobEval(structs.CoreJobJobGC)
+	gc := s1.coreJobEval(structs.CoreJobForceGC)
 	err = core.Process(gc)
 	if err != nil {
 		t.Fatalf("test(%s) err: %v", test.test, err)
diff --git a/nomad/leader.go b/nomad/leader.go
index d1e15806a..077a2cad5 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -270,14 +270,6 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
 	}
 }
 
-// forceCoreJobEval returns an evaluation for a core job that will ignore GC
-// cutoffs.
-func (s *Server) forceCoreJobEval(job string) *structs.Evaluation {
-	eval := s.coreJobEval(job)
-	eval.TriggeredBy = structs.EvalTriggerForceGC
-	return eval
-}
-
 // reapFailedEvaluations is used to reap evaluations that
 // have reached their delivery limit and should be failed
 func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 484cd5bbc..51c04e20a 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2435,7 +2435,6 @@ const (
 	EvalTriggerPeriodicJob   = "periodic-job"
 	EvalTriggerNodeUpdate    = "node-update"
 	EvalTriggerScheduled     = "scheduled"
-	EvalTriggerForceGC       = "force-gc"
 	EvalTriggerRollingUpdate = "rolling-update"
 )
 
@@ -2456,6 +2455,9 @@ const (
 	// evaluations and allocations are terminal. If so, we delete these out of
 	// the system.
 	CoreJobJobGC = "job-gc"
+
+	// CoreJobForceGC is used to force garbage collection of all GCable objects.
+	CoreJobForceGC = "force-gc"
 )
 
 // Evaluation is used anytime we need to apply business logic as a result
diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go
index 048651677..510dfef1a 100644
--- a/nomad/system_endpoint.go
+++ b/nomad/system_endpoint.go
@@ -16,8 +16,6 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
 		return err
 	}
 
-	s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobEvalGC))
-	s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobNodeGC))
-	s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobJobGC))
+	s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC))
	return nil
 }
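A minimal sketch of how the new code path hangs together end to end, using only
identifiers that appear in this diff (srv is assumed to be a leader *Server; the
variable names are illustrative):

	// System.GarbageCollect now enqueues a single force-GC core job
	// instead of three separately forced evaluations.
	gc := srv.coreJobEval(structs.CoreJobForceGC)
	srv.evalBroker.Enqueue(gc)

	// When the core scheduler dequeues this evaluation, Process dispatches
	// on eval.JobID: CoreJobForceGC invokes forceGC, which runs jobGC, then
	// evalGC, then nodeGC. Each of those sees eval.JobID ==
	// structs.CoreJobForceGC and so sets oldThreshold = math.MaxUint64,
	// collecting every eligible object regardless of the GC cutoffs.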