From 335d8b42212e459a818df56380df0a83243b98ea Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 15 Aug 2015 16:07:50 -0700 Subject: [PATCH] nomad: core scheduler will GC evaluations and allocations --- nomad/core_sched.go | 83 +++++++++++++++++++++++++++++++++++++++- nomad/core_sched_test.go | 65 +++++++++++++++++++++++++++++++ nomad/leader.go | 9 ++--- 3 files changed, 151 insertions(+), 6 deletions(-) create mode 100644 nomad/core_sched_test.go diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 3874722cc..edd62212c 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -1,6 +1,8 @@ package nomad import ( + "fmt" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -24,6 +26,85 @@ func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Schedule } // Process is used to implement the scheduler.Scheduler interface -func (s *CoreScheduler) Process(*structs.Evaluation) error { +func (s *CoreScheduler) Process(eval *structs.Evaluation) error { + switch eval.JobID { + case structs.CoreJobEvalGC: + return s.evalGC(eval) + default: + return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID) + } +} + +// evalGC is used to garbage collect old evaluations +func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error { + // Iterate over the evaluations + iter, err := c.snap.Evals() + if err != nil { + return err + } + + // TODO: compute the old threshold limit for GC. + // This is a rough mapping of a time duration to the + // Raft index it belongs to. + var oldThreshold uint64 = eval.ModifyIndex + + // Collect the allocations and evaluations to GC + var gcAlloc, gcEval []string + +OUTER: + for { + raw := iter.Next() + if raw == nil { + break + } + eval := raw.(*structs.Evaluation) + + // Ignore non-terminal and new evaluations + if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold { + continue + } + + // Get the allocations by eval + allocs, err := c.snap.AllocsByEval(eval.ID) + if err != nil { + c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v", + eval.ID, err) + continue + } + + // Scan the allocations to ensure they are terminal and old + for _, alloc := range allocs { + if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold { + continue OUTER + } + } + + // Evaluation is eligible for garbage collection + gcEval = append(gcEval, eval.ID) + for _, alloc := range allocs { + gcAlloc = append(gcAlloc, alloc.ID) + } + } + + // Fast-path the nothing case + if len(gcEval) == 0 && len(gcAlloc) == 0 { + return nil + } + c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs", + len(gcEval), len(gcAlloc)) + + // Call to the leader to issue the reap + req := structs.EvalDeleteRequest{ + Evals: gcEval, + Allocs: gcAlloc, + WriteRequest: structs.WriteRequest{ + Region: c.srv.config.Region, + }, + } + var resp structs.GenericResponse + if err := c.srv.RPC("Eval.Reap", &req, &resp); err != nil { + c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err) + return err + } return nil } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go new file mode 100644 index 000000000..823c507f4 --- /dev/null +++ b/nomad/core_sched_test.go @@ -0,0 +1,65 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +func TestCoreScheduler_EvalGC(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Insert "dead" eval + state := s1.fsm.State() + eval := mock.Eval() + eval.Status = structs.EvalStatusFailed + err := state.UpsertEvals(1000, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Insert "dead" alloc + alloc := mock.Alloc() + alloc.EvalID = eval.ID + alloc.Status = structs.AllocStatusFailed + err = state.UpdateAllocations(1001, nil, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.coreJobEval(structs.CoreJobEvalGC) + gc.ModifyIndex = 2000 + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should be gone + out, err := state.GetEvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("bad: %v", out) + } + + outA, err := state.GetAllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outA != nil { + t.Fatalf("bad: %v", outA) + } +} diff --git a/nomad/leader.go b/nomad/leader.go index 74d3893e1..1f0f03307 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -153,16 +153,16 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { for { select { case <-evalGC.C: - s.dispatchCoreJob(structs.CoreJobEvalGC) + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC)) case <-stopCh: return } } } -// dispatchCoreJob is used to create an evaluation for a core job -func (s *Server) dispatchCoreJob(job string) { - eval := &structs.Evaluation{ +// coreJobEval returns an evaluation for a core job +func (s *Server) coreJobEval(job string) *structs.Evaluation { + return &structs.Evaluation{ ID: generateUUID(), Priority: structs.CoreJobPriority, Type: structs.JobTypeCore, @@ -171,7 +171,6 @@ func (s *Server) dispatchCoreJob(job string) { Status: structs.EvalStatusPending, ModifyIndex: s.raft.AppliedIndex(), } - s.evalBroker.Enqueue(eval) } // revokeLeadership is invoked once we step down as leader.