Merge pull request #586 from hashicorp/f-job-gc

Add garbage collection to jobs
Alex Dadgar
2015-12-18 11:21:50 -08:00
14 changed files with 470 additions and 48 deletions

View File

@@ -131,6 +131,14 @@ type Config struct {
// for GC. This gives users some time to debug a failed evaluation.
EvalGCThreshold time.Duration
// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration
// JobGCThreshold is how old a job must be before it is eligible for GC.
// This gives the user time to inspect the job.
JobGCThreshold time.Duration
// NodeGCInterval is how often we dispatch a job to GC failed nodes.
NodeGCInterval time.Duration
@@ -202,6 +210,8 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
NodeGCThreshold: 24 * time.Hour,
EvalNackTimeout: 60 * time.Second,
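
Taken together, the defaults above mean GC-eligible jobs are scanned every five minutes but collected only once they are roughly four hours old. A minimal standalone sketch of how the two knobs relate (the trimmed Config below is an illustrative stand-in, not Nomad's real server Config):

package main

import (
	"fmt"
	"time"
)

// Config is a trimmed stand-in carrying only the job-GC knobs from this diff.
type Config struct {
	JobGCInterval  time.Duration // how often a job-GC core eval is enqueued
	JobGCThreshold time.Duration // how old a job must be before collection
}

func main() {
	// Defaults from DefaultConfig above.
	c := &Config{
		JobGCInterval:  5 * time.Minute,
		JobGCThreshold: 4 * time.Hour,
	}
	// A cluster with heavy batch churn might tighten the threshold; the
	// interval only bounds how quickly eligibility is noticed.
	c.JobGCThreshold = 1 * time.Hour
	fmt.Printf("scan every %s, collect after %s\n", c.JobGCInterval, c.JobGCThreshold)
}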

View File

@@ -33,11 +33,90 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
return s.evalGC(eval)
case structs.CoreJobNodeGC:
return s.nodeGC(eval)
case structs.CoreJobJobGC:
return s.jobGC(eval)
default:
return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
}
}
// jobGC is used to garbage collect eligible jobs.
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
// Get all the jobs eligible for garbage collection.
iter, err := c.snap.JobsByGC(true)
if err != nil {
return err
}
// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.JobGCThreshold)
// Collect the allocations, evaluations and jobs to GC
var gcAlloc, gcEval, gcJob []string
OUTER:
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)
// Ignore new jobs.
if job.CreateIndex > oldThreshold {
continue
}
evals, err := c.snap.EvalsByJob(job.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
continue
}
for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil || !gc {
continue OUTER
}
gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}
// Job is eligible for garbage collection
gcJob = append(gcJob, job.ID)
}
// Fast-path the nothing case
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
return nil
}
c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible",
len(gcJob), len(gcEval), len(gcAlloc))
// Reap the evals and allocs
if err := c.evalReap(gcEval, gcAlloc); err != nil {
return err
}
// Call to the leader to deregister the jobs.
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
}
var resp structs.JobDeregisterResponse
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
return err
}
}
return nil
}
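
The non-obvious step in jobGC above is translating a wall-clock threshold into a raft index: the cutoff time now - JobGCThreshold is mapped through the FSM's time table, and from there plain CreateIndex/ModifyIndex comparisons suffice. A toy, self-contained sketch of that mapping (this timeTable is an illustrative stand-in for Nomad's fsm.TimeTable, not its actual implementation):

package main

import (
	"fmt"
	"time"
)

// entry pairs a raft index with the wall-clock time it was witnessed.
type entry struct {
	index uint64
	when  time.Time
}

// timeTable is a toy stand-in for the FSM time table used above.
type timeTable struct{ entries []entry }

// witness records that raft index idx was applied at time t.
func (tt *timeTable) witness(idx uint64, t time.Time) {
	tt.entries = append(tt.entries, entry{idx, t})
}

// nearestIndex returns the largest witnessed index at or before t, which is
// how the GC cutoff index (oldThreshold) is derived from the cutoff time.
func (tt *timeTable) nearestIndex(t time.Time) uint64 {
	var best uint64
	for _, e := range tt.entries {
		if !e.when.After(t) && e.index > best {
			best = e.index
		}
	}
	return best
}

func main() {
	tt := &timeTable{}
	now := time.Now().UTC()
	tt.witness(1000, now.Add(-6*time.Hour)) // old data
	tt.witness(2000, now.Add(-2*time.Hour)) // recent data

	cutoff := now.Add(-4 * time.Hour)    // JobGCThreshold = 4h
	fmt.Println(tt.nearestIndex(cutoff)) // 1000: only data at index <= 1000 is old enough to GC
}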
// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
@@ -57,39 +136,16 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string
OUTER:
for {
raw := iter.Next()
if raw == nil {
break
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold {
continue
}
// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
continue
return err
}
// Scan the allocations to ensure they are terminal and old
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold {
continue OUTER
}
}
// Evaluation is eligible for garbage collection
gcEval = append(gcEval, eval.ID)
for _, alloc := range allocs {
gcAlloc = append(gcAlloc, alloc.ID)
if gc {
gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}
}
@@ -100,10 +156,52 @@ OUTER:
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
len(gcEval), len(gcAlloc))
return c.evalReap(gcEval, gcAlloc)
}
// gcEval returns whether the eval should be garbage collected given a raft
// threshold index. The eval is disqualified from garbage collection if it or
// any of its allocs are newer than the threshold. If the eval should be
// garbage collected, the IDs of the allocs that should be removed with it
// are also returned.
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) (
bool, []string, error) {
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
return false, nil, nil
}
// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
return false, nil, err
}
// Scan the allocations to ensure they are terminal and old
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
return false, nil, nil
}
}
allocIds := make([]string, len(allocs))
for i, alloc := range allocs {
allocIds[i] = alloc.ID
}
// Evaluation is eligible for garbage collection
return true, allocIds, nil
}
// evalReap contacts the leader and issues a reap on the passed evals and
// allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
// Call to the leader to issue the reap
req := structs.EvalDeleteRequest{
Evals: gcEval,
Allocs: gcAlloc,
Evals: evals,
Allocs: allocs,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
@@ -113,6 +211,7 @@ OUTER:
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
return err
}
return nil
}

View File

@@ -111,3 +111,107 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
t.Fatalf("bad: %v", out)
}
}
func TestCoreScheduler_JobGC(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
shouldExist bool
}{
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: true,
},
}
for _, test := range tests {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Insert job.
state := s1.fsm.State()
job := mock.Job()
job.GC = true
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Insert eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = test.evalStatus
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Insert alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = test.allocStatus
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Update the time table so the inserted indexes fall past the GC threshold
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
// Check that the job, eval, and alloc existence matches the expectation
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}
outE, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}
outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
t.Fatalf("test(%s) bad: %v", test.test, outA)
}
}
}

View File

@@ -1,6 +1,7 @@
package nomad
import (
"errors"
"fmt"
"time"
@@ -25,12 +26,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if args.Job == nil {
return fmt.Errorf("missing job for registration")
}
if err := args.Job.Validate(); err != nil {
if err := j.checkBlacklist(args.Job); err != nil {
return err
}
// Initialize all the fields of services
args.Job.InitAllServiceFields()
// Initialize the job fields (sets defaults and any necessary init work).
args.Job.InitFields()
if err := args.Job.Validate(); err != nil {
return err
}
if args.Job.Type == structs.JobTypeCore {
return fmt.Errorf("job type cannot be core")
@@ -75,6 +81,16 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}
// checkBlacklist returns an error if the user has set any blacklisted field in
// the job.
func (j *Job) checkBlacklist(job *structs.Job) error {
if job.GC {
return errors.New("GC field of a job is used only internally and should not be set by user")
}
return nil
}
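
One subtlety in Register above: checkBlacklist must run before InitFields, because InitFields itself sets GC=true for batch jobs; performing the check afterwards would reject every batch registration. A toy sketch of that ordering (Job, checkBlacklist, and initFields are simplified stand-ins for the code in this diff):

package main

import (
	"errors"
	"fmt"
)

// Job is a simplified stand-in for structs.Job.
type Job struct {
	Type string
	GC   bool
}

// checkBlacklist rejects user-set internal fields, mirroring the RPC above.
func checkBlacklist(j *Job) error {
	if j.GC {
		return errors.New("GC field of a job is used only internally and should not be set by the user")
	}
	return nil
}

// initFields mirrors InitFields: batch jobs are made garbage collectible.
func initFields(j *Job) {
	if j.Type == "batch" {
		j.GC = true
	}
}

func main() {
	j := &Job{Type: "batch"}
	// Blacklist first: the user did not set GC, so registration proceeds...
	fmt.Println(checkBlacklist(j)) // <nil>
	// ...then initFields flips GC on internally. Reversing the order would
	// make checkBlacklist reject every batch job.
	initFields(j)
	fmt.Println(j.GC) // true
}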
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {

View File

@@ -171,6 +171,68 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
}
}
func TestJobEndpoint_Register_Batch(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if !out.GC {
t.Fatal("expect batch job to be made garbage collectible")
}
}
func TestJobEndpoint_Register_GC_Set(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.GC = true
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err == nil {
t.Fatalf("expect err")
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue

View File

@@ -173,6 +173,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
defer evalGC.Stop()
nodeGC := time.NewTicker(s.config.NodeGCInterval)
defer nodeGC.Stop()
jobGC := time.NewTicker(s.config.JobGCInterval)
defer jobGC.Stop()
for {
select {
@@ -180,6 +182,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
case <-nodeGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
case <-jobGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
case <-stopCh:
return
}
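
For reference, the ticker-per-GC-type dispatch pattern above in isolation, as a runnable sketch; enqueue stands in for the eval broker and stopCh for server shutdown (this is illustrative wiring, not Nomad's actual schedulePeriodic):

package main

import (
	"fmt"
	"time"
)

// schedulePeriodicSketch gives each GC kind its own ticker and lets a single
// select loop enqueue a core-job evaluation whenever one fires.
func schedulePeriodicSketch(stopCh chan struct{}, enqueue func(kind string)) {
	jobGC := time.NewTicker(30 * time.Millisecond)
	defer jobGC.Stop()
	nodeGC := time.NewTicker(50 * time.Millisecond)
	defer nodeGC.Stop()
	for {
		select {
		case <-jobGC.C:
			enqueue("job-gc")
		case <-nodeGC.C:
			enqueue("node-gc")
		case <-stopCh:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go schedulePeriodicSketch(stop, func(kind string) { fmt.Println("enqueue", kind) })
	time.Sleep(120 * time.Millisecond)
	close(stop)
}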

View File

@@ -231,12 +231,6 @@ func Alloc() *structs.Allocation {
},
},
},
Services: map[string]string{"web-frontend": "nomad-registered-task-1234"},
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
Job: Job(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
// stateStoreSchema is used to return the schema for the state store
@@ -100,10 +101,29 @@ func jobTableSchema() *memdb.TableSchema {
Lowercase: false,
},
},
"gc": &memdb.IndexSchema{
Name: "gc",
AllowMissing: false,
Unique: false,
Indexer: &memdb.ConditionalIndex{
Conditional: jobIsGCable,
},
},
},
}
}
// jobIsGCable satisfies the ConditionalIndexFunc interface and creates an index
// on whether a job is eligible for garbage collection.
func jobIsGCable(obj interface{}) (bool, error) {
j, ok := obj.(*structs.Job)
if !ok {
return false, fmt.Errorf("unexpected type: %v", obj)
}
return j.GC, nil
}
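
The ConditionalIndex above turns a boolean predicate into a queryable index: the bool returned by the conditional becomes the index value, so a lookup on true iterates exactly the GC-eligible jobs. A standalone sketch against the go-memdb API (the Job struct here is a toy stand-in, not Nomad's structs.Job; error handling is mostly elided):

package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// Job is a toy stand-in with just enough fields to index.
type Job struct {
	ID string
	GC bool
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"jobs": {
				Name: "jobs",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
					// Boolean index over the GC flag, as in the schema above.
					"gc": {
						Name: "gc",
						Indexer: &memdb.ConditionalIndex{
							Conditional: func(obj interface{}) (bool, error) {
								return obj.(*Job).GC, nil
							},
						},
					},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	txn.Insert("jobs", &Job{ID: "batch", GC: true})
	txn.Insert("jobs", &Job{ID: "service", GC: false})
	txn.Commit()

	read := db.Txn(false)
	iter, _ := read.Get("jobs", "gc", true)
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		fmt.Println("gc-eligible:", raw.(*Job).ID) // prints only "batch"
	}
}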
// evalTableSchema returns the MemDB schema for the eval table.
// This table is used to store all the evaluations that are pending
// or recently completed.

View File

@@ -372,6 +372,18 @@ func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator
return iter, nil
}
// JobsByGC returns an iterator over all jobs that are either eligible or
// ineligible for garbage collection, as selected by the gc argument.
func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("jobs", "gc", gc)
if err != nil {
return nil, err
}
return iter, nil
}
// UpsertEvaluation is used to upsert an evaluation
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)

View File

@@ -471,6 +471,63 @@ func TestStateStore_JobsByScheduler(t *testing.T) {
}
}
func TestStateStore_JobsByGC(t *testing.T) {
state := testStateStore(t)
var gc, nonGc []*structs.Job
for i := 0; i < 10; i++ {
job := mock.Job()
nonGc = append(nonGc, job)
if err := state.UpsertJob(1000+uint64(i), job); err != nil {
t.Fatalf("err: %v", err)
}
}
for i := 0; i < 10; i++ {
job := mock.Job()
job.GC = true
gc = append(gc, job)
if err := state.UpsertJob(2000+uint64(i), job); err != nil {
t.Fatalf("err: %v", err)
}
}
iter, err := state.JobsByGC(true)
if err != nil {
t.Fatalf("err: %v", err)
}
var outGc []*structs.Job
for i := iter.Next(); i != nil; i = iter.Next() {
outGc = append(outGc, i.(*structs.Job))
}
iter, err = state.JobsByGC(false)
if err != nil {
t.Fatalf("err: %v", err)
}
var outNonGc []*structs.Job
for i := iter.Next(); i != nil; i = iter.Next() {
outNonGc = append(outNonGc, i.(*structs.Job))
}
sort.Sort(JobIDSort(gc))
sort.Sort(JobIDSort(nonGc))
sort.Sort(JobIDSort(outGc))
sort.Sort(JobIDSort(outNonGc))
if !reflect.DeepEqual(gc, outGc) {
t.Fatalf("bad: %#v %#v", gc, outGc)
}
if !reflect.DeepEqual(nonGc, outNonGc) {
t.Fatalf("bad: %#v %#v", nonGc, outNonGc)
}
}
func TestStateStore_RestoreJob(t *testing.T) {
state := testStateStore(t)
job := mock.Job()

View File

@@ -22,19 +22,27 @@ func TestRemoveAllocs(t *testing.T) {
}
}
func TestFilterTerminalALlocs(t *testing.T) {
func TestFilterTerminalAllocs(t *testing.T) {
l := []*Allocation{
&Allocation{ID: "foo", DesiredStatus: AllocDesiredStatusRun},
&Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict},
&Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop},
&Allocation{ID: "zip", DesiredStatus: AllocDesiredStatusRun},
&Allocation{
ID: "foo",
DesiredStatus: AllocDesiredStatusRun,
ClientStatus: AllocClientStatusPending,
},
&Allocation{
ID: "bam",
DesiredStatus: AllocDesiredStatusRun,
ClientStatus: AllocClientStatusDead,
},
}
out := FilterTerminalAllocs(l)
if len(out) != 2 {
if len(out) != 1 {
t.Fatalf("bad: %#v", out)
}
if out[0].ID != "foo" && out[1].ID != "zip" {
if out[0].ID != "foo" {
t.Fatalf("bad: %#v", out)
}
}

View File

@@ -764,6 +764,10 @@ type Job struct {
// Periodic is used to define the interval the job is run at.
Periodic *PeriodicConfig
// GC is used to mark the job as available for garbage collection once it
// has no outstanding evaluations or allocations.
GC bool
// Meta is used to associate arbitrary metadata with this
// job. This is opaque to Nomad.
Meta map[string]string
@@ -779,6 +783,18 @@ type Job struct {
ModifyIndex uint64
}
// InitFields is used to initialize fields in the Job. This should be called
// when registering a Job.
func (j *Job) InitFields() {
// Initialize the service block.
j.InitAllServiceFields()
// If the job is a batch job, mark it as garbage collectible.
if j.Type == JobTypeBatch {
j.GC = true
}
}
// InitAllServiceFields traverses all Task Groups and makes them
// interpolate Job, Task group and Task names in all Service names.
// It also generates the check names if they are not set. This method also
@@ -1470,12 +1486,20 @@ type Allocation struct {
ModifyIndex uint64
}
// TerminalStatus returns if the desired status is terminal and
// will no longer transition. This is not based on the current client status.
// TerminalStatus returns if the desired or actual status is terminal and
// will no longer transition.
func (a *Allocation) TerminalStatus() bool {
// First check the desired state and if that isn't terminal, check client
// state.
switch a.DesiredStatus {
case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
return true
default:
}
switch a.ClientStatus {
case AllocClientStatusDead, AllocClientStatusFailed:
return true
default:
return false
}
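
The widened TerminalStatus above is what forces the ClientStatus additions in the scheduler tests later in this diff: an alloc that is desired to run but already dead or failed on the client now counts as terminal. A standalone sketch with toy constants:

package main

import "fmt"

// Toy stand-ins for the status constants referenced in the diff.
const (
	AllocDesiredStatusRun    = "run"
	AllocDesiredStatusStop   = "stop"
	AllocDesiredStatusEvict  = "evict"
	AllocDesiredStatusFailed = "failed"
	AllocClientStatusPending = "pending"
	AllocClientStatusDead    = "dead"
	AllocClientStatusFailed  = "client-failed"
)

type Allocation struct {
	DesiredStatus string
	ClientStatus  string
}

// terminal mirrors the updated TerminalStatus logic: an alloc is terminal if
// either its desired state or its observed client state can no longer change.
func terminal(a *Allocation) bool {
	switch a.DesiredStatus {
	case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
		return true
	}
	switch a.ClientStatus {
	case AllocClientStatusDead, AllocClientStatusFailed:
		return true
	}
	return false
}

func main() {
	// Desired to run but dead on the client: terminal under the new rules,
	// which is why the tests below pin ClientStatus to "pending" to keep
	// allocs counted as live.
	fmt.Println(terminal(&Allocation{AllocDesiredStatusRun, AllocClientStatusDead}))    // true
	fmt.Println(terminal(&Allocation{AllocDesiredStatusRun, AllocClientStatusPending})) // false
}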
@@ -1656,6 +1680,12 @@ const (
// We periodically scan nodes in a terminal state, and if they have no
// corresponding allocations we delete these out of the system.
CoreJobNodeGC = "node-gc"
// CoreJobJobGC is used for the garbage collection of eligible jobs. We
// periodically scan garbage collectible jobs and check if both their
// evaluations and allocations are terminal. If so, we delete these out of
// the system.
CoreJobJobGC = "job-gc"
)
// Evaluation is used anytime we need to apply business logic as a result

View File

@@ -61,6 +61,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
MemoryMB: 2048,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc2 := &structs.Allocation{
ID: structs.GenerateUUID(),
@@ -72,6 +73,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
MemoryMB: 1024,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))

View File

@@ -203,6 +203,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
MemoryMB: 2048,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc2 := &structs.Allocation{
ID: structs.GenerateUUID(),
@@ -214,6 +215,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
MemoryMB: 1024,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -277,6 +279,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
MemoryMB: 2048,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc2 := &structs.Allocation{
ID: structs.GenerateUUID(),
@@ -288,6 +291,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
MemoryMB: 1024,
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -317,7 +321,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
t.Fatalf("Bad: %v", out[0])
}
if out[1].Score != 18 {
t.Fatalf("Bad: %v", out[0])
t.Fatalf("Bad: %v", out[1])
}
}