diff --git a/api/jobs.go b/api/jobs.go
index 17e75daff..f50826ad3 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -107,6 +107,12 @@ type UpdateStrategy struct {
     MaxParallel int
 }
 
+// JobGCConfig configures the garbage collection policy of a job.
+type JobGCConfig struct {
+    Enabled   bool
+    Threshold time.Duration
+}
+
 // Job is used to serialize a job.
 type Job struct {
     Region            string
@@ -119,6 +125,7 @@ type Job struct {
     Constraints       []*Constraint
     TaskGroups        []*TaskGroup
     Update            *UpdateStrategy
+    GC                *JobGCConfig
     Meta              map[string]string
     Status            string
     StatusDescription string
diff --git a/jobspec/parse.go b/jobspec/parse.go
index 765f58b3a..723fac6a1 100644
--- a/jobspec/parse.go
+++ b/jobspec/parse.go
@@ -91,6 +91,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
     delete(m, "meta")
     delete(m, "update")
     delete(m, "periodic")
+    delete(m, "gc")
 
     // Set the ID and name to the object key
     result.ID = obj.Keys[0].Token.Value().(string)
@@ -135,6 +136,13 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
         }
     }
 
+    // If we have a gc config, then parse that
+    if o := listVal.Filter("gc"); len(o.Items) > 0 {
+        if err := parseGC(&result.GC, o); err != nil {
+            return err
+        }
+    }
+
     // Parse out meta fields. These are in HCL as a list so we need
     // to iterate over them and merge them.
     if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
@@ -714,3 +722,39 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error
     *result = &p
     return nil
 }
+
+func parseGC(result **structs.JobGCConfig, list *ast.ObjectList) error {
+    list = list.Elem()
+    if len(list.Items) > 1 {
+        return fmt.Errorf("only one 'gc' block allowed per job")
+    }
+
+    // Get our resource object
+    o := list.Items[0]
+
+    var m map[string]interface{}
+    if err := hcl.DecodeObject(&m, o.Val); err != nil {
+        return err
+    }
+
+    // Enabled by default if the gc block exists.
+    if value, ok := m["enabled"]; !ok {
+        m["Enabled"] = true
+    } else {
+        enabled, err := parseBool(value)
+        if err != nil {
+            return fmt.Errorf("gc.enabled should be set to true or false; %v", err)
+        }
+        m["Enabled"] = enabled
+    }
+
+    dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
+        DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
+        WeaklyTypedInput: true,
+        Result:           result,
+    })
+    if err != nil {
+        return err
+    }
+    return dec.Decode(m)
+}
diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go
index 4497348eb..768fa9988 100644
--- a/jobspec/parse_test.go
+++ b/jobspec/parse_test.go
@@ -246,6 +246,22 @@ func TestParse(t *testing.T) {
             false,
         },
 
+        {
+            "gc.hcl",
+            &structs.Job{
+                ID:       "foo",
+                Name:     "foo",
+                Priority: 50,
+                Region:   "global",
+                Type:     "service",
+                GC: &structs.JobGCConfig{
+                    Enabled:   true,
+                    Threshold: 2 * time.Hour,
+                },
+            },
+            false,
+        },
+
         {
             "specify-job.hcl",
             &structs.Job{
diff --git a/jobspec/test-fixtures/gc.hcl b/jobspec/test-fixtures/gc.hcl
new file mode 100644
index 000000000..cef4bcabd
--- /dev/null
+++ b/jobspec/test-fixtures/gc.hcl
@@ -0,0 +1,5 @@
+job "foo" {
+    gc {
+        threshold = "2h"
+    }
+}
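For reviewers tracing the decode path in `parseGC`: the standalone sketch below (not part of this change) shows how `mapstructure` with `StringToTimeDurationHookFunc` turns the fixture's `threshold = "2h"` string into a `time.Duration`. The map literal stands in for whatever `hcl.DecodeObject` would produce, and the local `JobGCConfig` type is copied here purely for illustration.

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// JobGCConfig mirrors the struct the parser decodes into.
type JobGCConfig struct {
	Enabled   bool
	Threshold time.Duration
}

func main() {
	// Roughly what hcl.DecodeObject hands back for `gc { threshold = "2h" }`,
	// after parseGC has injected the Enabled default.
	m := map[string]interface{}{
		"Enabled":   true,
		"threshold": "2h",
	}

	var out JobGCConfig
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// The hook converts the "2h" string into a time.Duration.
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
		WeaklyTypedInput: true,
		Result:           &out,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(m); err != nil {
		panic(err)
	}

	fmt.Printf("%+v\n", out) // {Enabled:true Threshold:2h0m0s}
}
```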
diff --git a/nomad/config.go b/nomad/config.go
index 91986644f..850aa58c2 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -131,6 +131,10 @@ type Config struct {
     // for GC. This gives users some time to debug a failed evaluation.
     EvalGCThreshold time.Duration
 
+    // JobGCInterval is how often we dispatch a job to GC jobs that are
+    // available for garbage collection.
+    JobGCInterval time.Duration
+
     // NodeGCInterval is how often we dispatch a job to GC failed nodes.
     NodeGCInterval time.Duration
 
@@ -202,6 +206,7 @@ func DefaultConfig() *Config {
         ReconcileInterval: 60 * time.Second,
         EvalGCInterval:    5 * time.Minute,
         EvalGCThreshold:   1 * time.Hour,
+        JobGCInterval:     5 * time.Minute,
         NodeGCInterval:    5 * time.Minute,
         NodeGCThreshold:   24 * time.Hour,
         EvalNackTimeout:   60 * time.Second,
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index b5ed092f9..27ca5cfcf 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -33,11 +33,88 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
         return s.evalGC(eval)
     case structs.CoreJobNodeGC:
         return s.nodeGC(eval)
+    case structs.CoreJobJobGC:
+        return s.jobGC(eval)
     default:
         return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
     }
 }
 
+// jobGC is used to garbage collect eligible jobs.
+func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
+    // Get all the jobs eligible for garbage collection.
+    jIter, err := c.snap.JobsByGC(true)
+    if err != nil {
+        return err
+    }
+
+    // Get the time table to calculate GC cutoffs.
+    tt := c.srv.fsm.TimeTable()
+
+    // Collect the allocations and evaluations to GC
+    var gcAlloc, gcEval, gcJob []string
+
+OUTER:
+    for i := jIter.Next(); i != nil; i = jIter.Next() {
+        job := i.(*structs.Job)
+        cutoff := time.Now().UTC().Add(-1 * job.GC.Threshold)
+        oldThreshold := tt.NearestIndex(cutoff)
+
+        // Ignore new jobs.
+        if job.CreateIndex > oldThreshold {
+            continue OUTER
+        }
+
+        evals, err := c.snap.EvalsByJob(job.ID)
+        if err != nil {
+            c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
+            continue
+        }
+
+        for _, eval := range evals {
+            gc, allocs, err := c.gcEval(eval, oldThreshold)
+            if err != nil || !gc {
+                continue OUTER
+            }
+
+            gcEval = append(gcEval, eval.ID)
+            gcAlloc = append(gcAlloc, allocs...)
+        }
+
+        // Job is eligible for garbage collection
+        gcJob = append(gcJob, job.ID)
+    }
+
+    // Fast-path the nothing case
+    if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
+        return nil
+    }
+    c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible",
+        len(gcJob), len(gcEval), len(gcAlloc))
+
+    // Reap the evals and allocs
+    if err := c.evalReap(gcEval, gcAlloc); err != nil {
+        return err
+    }
+
+    // Call to the leader to deregister the jobs.
+    for _, job := range gcJob {
+        req := structs.JobDeregisterRequest{
+            JobID: job,
+            WriteRequest: structs.WriteRequest{
+                Region: c.srv.config.Region,
+            },
+        }
+        var resp structs.JobDeregisterResponse
+        if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
+            c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
+            return err
+        }
+    }
+
+    return nil
+}
+
 // evalGC is used to garbage collect old evaluations
 func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
     // Iterate over the evaluations
@@ -57,39 +134,16 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 
     // Collect the allocations and evaluations to GC
     var gcAlloc, gcEval []string
-
-OUTER:
-    for {
-        raw := iter.Next()
-        if raw == nil {
-            break
-        }
+    for raw := iter.Next(); raw != nil; raw = iter.Next() {
         eval := raw.(*structs.Evaluation)
-
-        // Ignore non-terminal and new evaluations
-        if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold {
-            continue
-        }
-
-        // Get the allocations by eval
-        allocs, err := c.snap.AllocsByEval(eval.ID)
+        gc, allocs, err := c.gcEval(eval, oldThreshold)
         if err != nil {
-            c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
-                eval.ID, err)
-            continue
+            return err
         }
 
-        // Scan the allocations to ensure they are terminal and old
-        for _, alloc := range allocs {
-            if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold {
-                continue OUTER
-            }
-        }
-
-        // Evaluation is eligible for garbage collection
-        gcEval = append(gcEval, eval.ID)
-        for _, alloc := range allocs {
-            gcAlloc = append(gcAlloc, alloc.ID)
+        if gc {
+            gcEval = append(gcEval, eval.ID)
+            gcAlloc = append(gcAlloc, allocs...)
         }
     }
 
@@ -100,10 +154,52 @@ OUTER:
     c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
         len(gcEval), len(gcAlloc))
 
+    return c.evalReap(gcEval, gcAlloc)
+}
+
+// gcEval returns whether the eval should be garbage collected given a raft
+// threshold index. The eval is disqualified from garbage collection if it or
+// its allocs are not older than the threshold. If the eval should be garbage
+// collected, the alloc ids that should also be removed are returned.
+func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) (
+    bool, []string, error) {
+    // Ignore non-terminal and new evaluations
+    if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
+        return false, nil, nil
+    }
+
+    // Get the allocations by eval
+    allocs, err := c.snap.AllocsByEval(eval.ID)
+    if err != nil {
+        c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
+            eval.ID, err)
+        return false, nil, err
+    }
+
+    // Scan the allocations to ensure they are terminal and old
+    for _, alloc := range allocs {
+        if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
+            return false, nil, nil
+        }
+    }
+
+    allocIds := make([]string, len(allocs))
+    for i, alloc := range allocs {
+        allocIds[i] = alloc.ID
+    }
+
+    // Evaluation is eligible for garbage collection
+    return true, allocIds, nil
+}
+
+// evalReap contacts the leader and issues a reap on the passed evals and
+// allocs.
+func (c *CoreScheduler) evalReap(evals, allocs []string) error {
     // Call to the leader to issue the reap
     req := structs.EvalDeleteRequest{
-        Evals:  gcEval,
-        Allocs: gcAlloc,
+        Evals:  evals,
+        Allocs: allocs,
         WriteRequest: structs.WriteRequest{
             Region: c.srv.config.Region,
         },
@@ -113,6 +209,7 @@
         c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
         return err
     }
+
     return nil
 }
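A toy model of the cutoff arithmetic in `jobGC` above, for anyone reviewing the threshold logic. It only assumes that the FSM's time table maps wall-clock times back to the nearest raft index; the `witness` type and `nearestIndex` function are invented for the sketch and are not Nomad APIs.

```go
package main

import (
	"fmt"
	"time"
)

// witness pairs a raft index with the wall-clock time it was observed.
// It stands in for the server's TimeTable; the real implementation is more
// involved, this only shows the cutoff arithmetic.
type witness struct {
	index uint64
	when  time.Time
}

// nearestIndex returns the highest witnessed index that is not newer than
// the cutoff time, mirroring what jobGC expects from NearestIndex.
func nearestIndex(table []witness, cutoff time.Time) uint64 {
	var idx uint64
	for _, w := range table {
		if !w.when.After(cutoff) {
			idx = w.index
		}
	}
	return idx
}

func main() {
	now := time.Now().UTC()
	table := []witness{
		{index: 1000, when: now.Add(-6 * time.Hour)},
		{index: 2000, when: now.Add(-3 * time.Hour)},
		{index: 3000, when: now.Add(-10 * time.Minute)},
	}

	threshold := 4 * time.Hour // the job's GC threshold
	cutoff := now.Add(-1 * threshold)
	oldThreshold := nearestIndex(table, cutoff)

	// Anything created or modified after this index is too new to collect.
	for _, createIndex := range []uint64{900, 2500} {
		fmt.Printf("CreateIndex %d eligible: %v\n",
			createIndex, createIndex <= oldThreshold)
	}
}
```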
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index b8dfae961..2d4057d50 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -111,3 +111,111 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
         t.Fatalf("bad: %v", out)
     }
 }
+
+func TestCoreScheduler_JobGC(t *testing.T) {
+    tests := []struct {
+        test, evalStatus, allocStatus string
+        shouldExist                   bool
+    }{
+        {
+            test:        "Terminal",
+            evalStatus:  structs.EvalStatusFailed,
+            allocStatus: structs.AllocDesiredStatusFailed,
+            shouldExist: false,
+        },
+        {
+            test:        "Has Alloc",
+            evalStatus:  structs.EvalStatusFailed,
+            allocStatus: structs.AllocDesiredStatusRun,
+            shouldExist: true,
+        },
+        {
+            test:        "Has Eval",
+            evalStatus:  structs.EvalStatusPending,
+            allocStatus: structs.AllocDesiredStatusFailed,
+            shouldExist: true,
+        },
+    }
+
+    for _, test := range tests {
+        s1 := testServer(t, nil)
+        defer s1.Shutdown()
+        testutil.WaitForLeader(t, s1.RPC)
+
+        // Insert job.
+        state := s1.fsm.State()
+        job := mock.Job()
+        threshold := 1 * time.Hour
+        job.GC = &structs.JobGCConfig{
+            Enabled:   true,
+            Threshold: threshold,
+        }
+        err := state.UpsertJob(1000, job)
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+
+        // Insert eval
+        eval := mock.Eval()
+        eval.JobID = job.ID
+        eval.Status = test.evalStatus
+        err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+
+        // Insert alloc
+        alloc := mock.Alloc()
+        alloc.JobID = job.ID
+        alloc.EvalID = eval.ID
+        alloc.DesiredStatus = test.allocStatus
+        err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+
+        // Update the time tables to make this work
+        tt := s1.fsm.TimeTable()
+        tt.Witness(2000, time.Now().UTC().Add(-1*threshold))
+
+        // Create a core scheduler
+        snap, err := state.Snapshot()
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+        core := NewCoreScheduler(s1, snap)
+
+        // Attempt the GC
+        gc := s1.coreJobEval(structs.CoreJobJobGC)
+        gc.ModifyIndex = 2000
+        err = core.Process(gc)
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+
+        // The job should only still exist if the test case expects it to
+        // survive GC.
+        out, err := state.JobByID(job.ID)
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+        if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
+            t.Fatalf("test(%s) bad: %v", test.test, out)
+        }
+
+        outE, err := state.EvalByID(eval.ID)
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+        if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
+            t.Fatalf("test(%s) bad: %v", test.test, outE)
+        }
+
+        outA, err := state.AllocByID(alloc.ID)
+        if err != nil {
+            t.Fatalf("test(%s) err: %v", test.test, err)
+        }
+        if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
+            t.Fatalf("test(%s) bad: %v", test.test, outA)
+        }
+    }
+}
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 4d8cc128d..f693af9c4 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -29,8 +29,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
         return err
     }
 
-    // Initialize all the fields of services
-    args.Job.InitAllServiceFields()
+    // Initialize the job fields (sets defaults and any other necessary init
+    // work).
+    args.Job.InitFields()
 
     if args.Job.Type == structs.JobTypeCore {
         return fmt.Errorf("job type cannot be core")
diff --git a/nomad/leader.go b/nomad/leader.go
index 8e0d6be7d..0e267a7dc 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -173,6 +173,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
     defer evalGC.Stop()
     nodeGC := time.NewTicker(s.config.NodeGCInterval)
     defer nodeGC.Stop()
+    jobGC := time.NewTicker(s.config.JobGCInterval)
+    defer jobGC.Stop()
 
     for {
         select {
@@ -180,6 +182,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
             s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
         case <-nodeGC.C:
             s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
+        case <-jobGC.C:
+            s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
         case <-stopCh:
             return
         }
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index dfd663aba..f795d1ff0 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -4,6 +4,7 @@ import (
     "fmt"
 
     "github.com/hashicorp/go-memdb"
+    "github.com/hashicorp/nomad/nomad/structs"
 )
 
 // stateStoreSchema is used to return the schema for the state store
@@ -100,10 +101,33 @@ func jobTableSchema() *memdb.TableSchema {
                     Lowercase: false,
                 },
             },
+            "gc": &memdb.IndexSchema{
+                Name:         "gc",
+                AllowMissing: false,
+                Unique:       false,
+                Indexer: &memdb.ConditionalIndex{
+                    Conditional: jobIsGCable,
+                },
+            },
         },
     }
 }
+
+// jobIsGCable satisfies the ConditionalIndexFunc interface and creates an index
+// on whether a job is eligible for garbage collection.
+func jobIsGCable(obj interface{}) (bool, error) {
+    j, ok := obj.(*structs.Job)
+    if !ok {
+        return false, fmt.Errorf("Unexpected type: %v", obj)
+    }
+
+    if j.GC == nil || !j.GC.Enabled {
+        return false, nil
+    }
+
+    return true, nil
+}
+
 // evalTableSchema returns the MemDB schema for the eval table.
 // This table is used to store all the evaluations that are pending
 // or recently completed.
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 30ee87259..c6236183a 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -372,6 +372,18 @@ func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator
     return iter, nil
 }
 
+// JobsByGC returns an iterator over all jobs eligible or ineligible for
+// garbage collection.
+func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
+    txn := s.db.Txn(false)
+
+    iter, err := txn.Get("jobs", "gc", gc)
+    if err != nil {
+        return nil, err
+    }
+    return iter, nil
+}
+
 // UpsertEvaluation is used to upsert an evaluation
 func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
     txn := s.db.Txn(true)
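To make the new `"gc"` index concrete, here is a self-contained `go-memdb` sketch of how a `ConditionalIndex` partitions a table so that a lookup like `JobsByGC(true)` only walks GC-enabled rows. The `job` struct and its `GCable` field are stand-ins for this sketch, not Nomad types.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-memdb"
)

// job is a stand-in for structs.Job with just the fields the index needs.
type job struct {
	ID     string
	GCable bool
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"jobs": &memdb.TableSchema{
				Name: "jobs",
				Indexes: map[string]*memdb.IndexSchema{
					"id": &memdb.IndexSchema{
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
					// Mirrors the "gc" index: the indexed value is just the
					// bool returned by the conditional function.
					"gc": &memdb.IndexSchema{
						Name:         "gc",
						AllowMissing: false,
						Unique:       false,
						Indexer: &memdb.ConditionalIndex{
							Conditional: func(obj interface{}) (bool, error) {
								return obj.(*job).GCable, nil
							},
						},
					},
				},
			},
		},
	}

	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	if err := txn.Insert("jobs", &job{ID: "a", GCable: true}); err != nil {
		panic(err)
	}
	if err := txn.Insert("jobs", &job{ID: "b", GCable: false}); err != nil {
		panic(err)
	}
	txn.Commit()

	// Equivalent of StateStore.JobsByGC(true).
	read := db.Txn(false)
	iter, err := read.Get("jobs", "gc", true)
	if err != nil {
		panic(err)
	}
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		fmt.Println("gcable:", raw.(*job).ID) // prints only "a"
	}
}
```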
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 5e1021e55..aa497cd4a 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -471,6 +471,66 @@ func TestStateStore_JobsByScheduler(t *testing.T) {
     }
 }
 
+func TestStateStore_JobsByGC(t *testing.T) {
+    state := testStateStore(t)
+    var gc, nonGc []*structs.Job
+
+    for i := 0; i < 10; i++ {
+        job := mock.Job()
+        nonGc = append(nonGc, job)
+
+        if err := state.UpsertJob(1000+uint64(i), job); err != nil {
+            t.Fatalf("err: %v", err)
+        }
+    }
+
+    for i := 0; i < 10; i++ {
+        job := mock.Job()
+        job.GC = &structs.JobGCConfig{
+            Enabled:   true,
+            Threshold: structs.DefaultJobGCThreshold,
+        }
+        gc = append(gc, job)
+
+        if err := state.UpsertJob(2000+uint64(i), job); err != nil {
+            t.Fatalf("err: %v", err)
+        }
+    }
+
+    iter, err := state.JobsByGC(true)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    var outGc []*structs.Job
+    for i := iter.Next(); i != nil; i = iter.Next() {
+        outGc = append(outGc, i.(*structs.Job))
+    }
+
+    iter, err = state.JobsByGC(false)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    var outNonGc []*structs.Job
+    for i := iter.Next(); i != nil; i = iter.Next() {
+        outNonGc = append(outNonGc, i.(*structs.Job))
+    }
+
+    sort.Sort(JobIDSort(gc))
+    sort.Sort(JobIDSort(nonGc))
+    sort.Sort(JobIDSort(outGc))
+    sort.Sort(JobIDSort(outNonGc))
+
+    if !reflect.DeepEqual(gc, outGc) {
+        t.Fatalf("bad: %#v %#v", gc, outGc)
+    }
+
+    if !reflect.DeepEqual(nonGc, outNonGc) {
+        t.Fatalf("bad: %#v %#v", nonGc, outNonGc)
+    }
+}
+
 func TestStateStore_RestoreJob(t *testing.T) {
     state := testStateStore(t)
     job := mock.Job()
diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go
index d156394dc..a3bf23885 100644
--- a/nomad/structs/funcs_test.go
+++ b/nomad/structs/funcs_test.go
@@ -22,19 +22,47 @@ func TestRemoveAllocs(t *testing.T) {
     }
 }
 
-func TestFilterTerminalALlocs(t *testing.T) {
+func TestFilterTerminalAllocs(t *testing.T) {
     l := []*Allocation{
-        &Allocation{ID: "foo", DesiredStatus: AllocDesiredStatusRun},
         &Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict},
         &Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop},
-        &Allocation{ID: "zip", DesiredStatus: AllocDesiredStatusRun},
+        &Allocation{
+            ID:            "zip",
+            DesiredStatus: AllocDesiredStatusRun,
+            TaskStates: map[string]*TaskState{
+                "a": &TaskState{State: TaskStatePending},
+            },
+        },
+        &Allocation{
+            ID:            "foo",
+            DesiredStatus: AllocDesiredStatusRun,
+            TaskStates: map[string]*TaskState{
+                "a": &TaskState{State: TaskStatePending},
+            },
+        },
+        &Allocation{
+            ID:            "bam",
+            DesiredStatus: AllocDesiredStatusRun,
+            TaskStates: map[string]*TaskState{
+                "a": &TaskState{State: TaskStatePending},
+                "b": &TaskState{State: TaskStateDead},
+            },
+        },
+        &Allocation{
+            ID:            "fizz",
+            DesiredStatus: AllocDesiredStatusRun,
+            TaskStates: map[string]*TaskState{
+                "a": &TaskState{State: TaskStateDead},
+                "b": &TaskState{State: TaskStateDead},
+            },
+        },
     }
 
     out := FilterTerminalAllocs(l)
-    if len(out) != 2 {
+    if len(out) != 3 {
         t.Fatalf("bad: %#v", out)
     }
-    if out[0].ID != "foo" && out[1].ID != "zip" {
+    if out[0].ID != "zip" && out[1].ID != "foo" && out[2].ID != "bam" {
         t.Fatalf("bad: %#v", out)
     }
 }
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index a86fae843..6d384623e 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -764,6 +764,10 @@ type Job struct {
     // Periodic is used to define the interval the job is run at.
     Periodic *PeriodicConfig
 
+    // GC is used to mark the job as available for garbage collection after it
+    // has no outstanding evaluations or allocations.
+    GC *JobGCConfig
+
     // Meta is used to associate arbitrary metadata with this
     // job. This is opaque to Nomad.
     Meta map[string]string
@@ -779,6 +783,18 @@ type Job struct {
     ModifyIndex uint64
 }
 
+// InitFields is used to initialize fields in the Job. This should be called
+// when registering a Job.
+func (j *Job) InitFields() {
+    // Initialize the service block.
+    j.InitAllServiceFields()
+
+    // Initialize the GC policy.
+    if j.GC != nil {
+        j.GC.Init()
+    }
+}
+
 // InitAllServiceFields traverses all Task Groups and makes them
 // interpolate Job, Task group and Task names in all Service names.
 // It also generates the check names if they are not set. This method also
@@ -854,6 +870,13 @@ func (j *Job) Validate() error {
             fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
     }
 
+    // Validate the GC config.
+    if j.GC != nil {
+        if err := j.GC.Validate(); err != nil {
+            mErr.Errors = append(mErr.Errors, err)
+        }
+    }
+
     return mErr.ErrorOrNil()
 }
 
@@ -899,6 +922,37 @@ type JobListStub struct {
     ModifyIndex uint64
 }
 
+const (
+    // DefaultJobGCThreshold is the default threshold for garbage collecting
+    // eligible jobs.
+    DefaultJobGCThreshold = 4 * time.Hour
+)
+
+// JobGCConfig configures the garbage collection policy of a job.
+type JobGCConfig struct {
+    // Enabled determines whether the job is eligible for garbage collection.
+    Enabled bool
+
+    // Threshold is how old a job must be before it is eligible for GC. This
+    // gives the user time to inspect the job.
+    Threshold time.Duration
+}
+
+// Init sets the Threshold time to its default value if it is unspecified but
+// garbage collection is enabled.
+func (gc *JobGCConfig) Init() {
+    if gc.Enabled && gc.Threshold == 0 {
+        gc.Threshold = DefaultJobGCThreshold
+    }
+}
+
+func (gc *JobGCConfig) Validate() error {
+    if gc.Threshold < 0 {
+        return fmt.Errorf("job GC threshold must not be negative: %v", gc.Threshold)
+    }
+    return nil
+}
+
 // UpdateStrategy is used to modify how updates are done
 type UpdateStrategy struct {
     // Stagger is the amount of time between the updates
@@ -1470,14 +1524,21 @@ type Allocation struct {
     ModifyIndex uint64
 }
 
-// TerminalStatus returns if the desired status is terminal and
-// will no longer transition. This is not based on the current client status.
+// TerminalStatus returns if the desired or actual status is terminal and
+// will no longer transition.
 func (a *Allocation) TerminalStatus() bool {
     switch a.DesiredStatus {
     case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
         return true
     default:
-        return false
+        // If all tasks are dead, the alloc is terminal.
+        for _, state := range a.TaskStates {
+            if state.State != TaskStateDead {
+                return false
+            }
+        }
+
+        return true
     }
 }
 
@@ -1656,6 +1717,12 @@ const (
     // We periodically scan nodes in a terminal state, and if they have no
     // corresponding allocations we delete these out of the system.
     CoreJobNodeGC = "node-gc"
+
+    // CoreJobJobGC is used for the garbage collection of eligible jobs. We
+    // periodically scan garbage collectible jobs and check if both their
+    // evaluations and allocations are terminal. If so, we delete these out of
+    // the system.
+    CoreJobJobGC = "job-gc"
 )
 
 // Evaluation is used anytime we need to apply business logic as a result
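Two behaviors from the `structs` changes above, restated as a runnable sketch with local stand-in types (lowercase status strings and simplified fields, not the real `structs` package): `Init` defaulting an enabled-but-unset threshold to four hours, and the widened terminal check treating a still-desired-running allocation as terminal once every task has died.

```go
package main

import (
	"fmt"
	"time"
)

const defaultJobGCThreshold = 4 * time.Hour

type jobGCConfig struct {
	Enabled   bool
	Threshold time.Duration
}

// init mirrors JobGCConfig.Init: default the threshold only when GC is
// enabled and no threshold was given.
func (gc *jobGCConfig) init() {
	if gc.Enabled && gc.Threshold == 0 {
		gc.Threshold = defaultJobGCThreshold
	}
}

type taskState struct{ State string }

type alloc struct {
	DesiredStatus string
	TaskStates    map[string]*taskState
}

// terminal mirrors the updated Allocation.TerminalStatus: an alloc whose
// desired status is still "run" counts as terminal once every task is dead.
func (a *alloc) terminal() bool {
	switch a.DesiredStatus {
	case "stop", "evict", "failed":
		return true
	default:
		for _, s := range a.TaskStates {
			if s.State != "dead" {
				return false
			}
		}
		return true
	}
}

func main() {
	gc := &jobGCConfig{Enabled: true}
	gc.init()
	fmt.Println(gc.Threshold) // 4h0m0s

	running := &alloc{
		DesiredStatus: "run",
		TaskStates:    map[string]*taskState{"a": {State: "pending"}},
	}
	done := &alloc{
		DesiredStatus: "run",
		TaskStates:    map[string]*taskState{"a": {State: "dead"}},
	}
	fmt.Println(running.terminal(), done.terminal()) // false true
}
```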
diff --git a/scheduler/context_test.go b/scheduler/context_test.go
index 914b54b06..1f5730286 100644
--- a/scheduler/context_test.go
+++ b/scheduler/context_test.go
@@ -61,6 +61,9 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
             MemoryMB: 2048,
         },
         DesiredStatus: structs.AllocDesiredStatusRun,
+        TaskStates: map[string]*structs.TaskState{
+            "foo": &structs.TaskState{State: structs.TaskStatePending},
+        },
     }
     alloc2 := &structs.Allocation{
         ID: structs.GenerateUUID(),
@@ -72,6 +75,9 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
             MemoryMB: 1024,
         },
         DesiredStatus: structs.AllocDesiredStatusRun,
+        TaskStates: map[string]*structs.TaskState{
+            "foo": &structs.TaskState{State: structs.TaskStatePending},
+        },
     }
     noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go
index 605902ed4..68716ea76 100644
--- a/scheduler/rank_test.go
+++ b/scheduler/rank_test.go
@@ -203,6 +203,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
             MemoryMB: 2048,
         },
         DesiredStatus: structs.AllocDesiredStatusRun,
+        TaskStates: map[string]*structs.TaskState{
+            "foo": &structs.TaskState{State: structs.TaskStatePending},
+        },
     }
     alloc2 := &structs.Allocation{
         ID: structs.GenerateUUID(),
@@ -214,6 +217,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
             MemoryMB: 1024,
         },
         DesiredStatus: structs.AllocDesiredStatusRun,
+        TaskStates: map[string]*structs.TaskState{
+            "foo": &structs.TaskState{State: structs.TaskStatePending},
+        },
     }
     noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -277,6 +283,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
             MemoryMB: 2048,
         },
         DesiredStatus: structs.AllocDesiredStatusRun,
+        TaskStates: map[string]*structs.TaskState{
+            "foo": &structs.TaskState{State: structs.TaskStatePending},
+        },
     }
     alloc2 := &structs.Allocation{
         ID: structs.GenerateUUID(),
@@ -288,6 +297,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
             MemoryMB: 1024,
         },
         DesiredStatus: structs.AllocDesiredStatusRun,
+        TaskStates: map[string]*structs.TaskState{
+            "foo": &structs.TaskState{State: structs.TaskStatePending},
+        },
     }
     noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -317,7 +329,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
         t.Fatalf("Bad: %v", out[0])
     }
     if out[1].Score != 18 {
-        t.Fatalf("Bad: %v", out[0])
+        t.Fatalf("Bad: %v", out[1])
     }
 }
diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md
index 99aefe2b4..8b8d0ef1c 100644
--- a/website/source/docs/jobspec/index.html.md
+++ b/website/source/docs/jobspec/index.html.md
@@ -156,6 +156,29 @@ The `job` object supports the following keys:
   and "h" suffix can be used, such as "30s". Both values default to zero,
   which disables rolling updates.
 
+* `gc` - Specifies the job's garbage collection configuration. This allows jobs
+  to be garbage collected once all of their evaluations and allocations are
+  terminal. The `gc` block has the following format:
+
+    ```
+    "gc" {
+        // Enabled is set to true by default if the "gc" block is included.
+        enabled = true
+
+        // Threshold is a duration that configures how old a job must be
+        // before it is garbage collected.
+        threshold = "4h"
+    }
+    ```
+
+  * `enabled`: Toggles the eligibility of a job for garbage collection.
+
+  * `threshold`: A string that should be parseable as a
+    [time.Duration](https://golang.org/pkg/time/#ParseDuration). A job will
+    only be garbage collected once the job, its evaluations, and its
+    allocations are all older than the threshold.
+
 ### Task Group
 
 The `group` object supports the following keys:
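Finally, a usage sketch tying the pieces together: parsing a job that opts into garbage collection and reading the decoded policy back. It assumes the `jobspec` package exposes a `ParseFile` helper that returns the parsed `*structs.Job`; treat the function name, the temp-file plumbing, and the fields printed as illustrative rather than authoritative.

```go
package main

import (
	"fmt"
	"os"

	"github.com/hashicorp/nomad/jobspec"
)

func main() {
	// A minimal job using the gc block as documented above.
	src := `
job "cleanup-me" {
    type = "batch"

    gc {
        enabled   = true
        threshold = "4h"
    }
}
`
	f, err := os.CreateTemp("", "job-*.hcl")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	if _, err := f.WriteString(src); err != nil {
		panic(err)
	}
	f.Close()

	job, err := jobspec.ParseFile(f.Name())
	if err != nil {
		panic(err)
	}

	// With the defaults above, Enabled is true and Threshold decodes to 4h.
	fmt.Printf("GC enabled=%v threshold=%s\n", job.GC.Enabled, job.GC.Threshold)
}
```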