Add garbage collection to jobs
@@ -107,6 +107,12 @@ type UpdateStrategy struct {
 	MaxParallel int
 }
 
+// JobGCConfig configures the garbage collection policy of a job.
+type JobGCConfig struct {
+	Enabled   bool
+	Threshold time.Duration
+}
+
 // Job is used to serialize a job.
 type Job struct {
 	Region string
@@ -119,6 +125,7 @@ type Job struct {
 	Constraints       []*Constraint
 	TaskGroups        []*TaskGroup
 	Update            *UpdateStrategy
+	GC                *JobGCConfig
 	Meta              map[string]string
 	Status            string
 	StatusDescription string

@@ -91,6 +91,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
 	delete(m, "meta")
 	delete(m, "update")
 	delete(m, "periodic")
+	delete(m, "gc")
 
 	// Set the ID and name to the object key
 	result.ID = obj.Keys[0].Token.Value().(string)
@@ -135,6 +136,13 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
 		}
 	}
 
+	// If we have a gc config, then parse that
+	if o := listVal.Filter("gc"); len(o.Items) > 0 {
+		if err := parseGC(&result.GC, o); err != nil {
+			return err
+		}
+	}
+
 	// Parse out meta fields. These are in HCL as a list so we need
 	// to iterate over them and merge them.
 	if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
@@ -714,3 +722,39 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error
 	*result = &p
 	return nil
 }
+
+func parseGC(result **structs.JobGCConfig, list *ast.ObjectList) error {
+	list = list.Elem()
+	if len(list.Items) > 1 {
+		return fmt.Errorf("only one 'gc' block allowed per job")
+	}
+
+	// Get our resource object
+	o := list.Items[0]
+
+	var m map[string]interface{}
+	if err := hcl.DecodeObject(&m, o.Val); err != nil {
+		return err
+	}
+
+	// Enabled by default if the gc block exists.
+	if value, ok := m["enabled"]; !ok {
+		m["Enabled"] = true
+	} else {
+		enabled, err := parseBool(value)
+		if err != nil {
+			return fmt.Errorf("gc.enabled should be set to true or false; %v", err)
+		}
+		m["Enabled"] = enabled
+	}
+
+	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
+		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
+		WeaklyTypedInput: true,
+		Result:           result,
+	})
+	if err != nil {
+		return err
+	}
+	return dec.Decode(m)
+}
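
A side note on the decode step above: mapstructure's `StringToTimeDurationHookFunc` is what turns the HCL string `"2h"` into a `time.Duration`, and its case-insensitive field matching is why normalizing the key to `"Enabled"` works. A minimal standalone sketch of the same decode path (toy `gcConfig` type, not Nomad's):

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// gcConfig mirrors the shape of the JobGCConfig being decoded.
type gcConfig struct {
	Enabled   bool
	Threshold time.Duration
}

func main() {
	// Roughly what hcl.DecodeObject hands back: strings and primitives.
	m := map[string]interface{}{
		"Enabled":   true,
		"threshold": "2h", // matched case-insensitively to Threshold
	}

	var out gcConfig
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
		WeaklyTypedInput: true,
		Result:           &out,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(m); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out) // {Enabled:true Threshold:2h0m0s}
}
```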
@@ -246,6 +246,22 @@ func TestParse(t *testing.T) {
 			false,
 		},
 
+		{
+			"gc.hcl",
+			&structs.Job{
+				ID:       "foo",
+				Name:     "foo",
+				Priority: 50,
+				Region:   "global",
+				Type:     "service",
+				GC: &structs.JobGCConfig{
+					Enabled:   true,
+					Threshold: 2 * time.Hour,
+				},
+			},
+			false,
+		},
+
 		{
 			"specify-job.hcl",
 			&structs.Job{
jobspec/test-fixtures/gc.hcl (new file, 5 lines)
@@ -0,0 +1,5 @@
+job "foo" {
+    gc {
+        threshold = "2h"
+    }
+}
@@ -131,6 +131,10 @@ type Config struct {
 	// for GC. This gives users some time to debug a failed evaluation.
 	EvalGCThreshold time.Duration
 
+	// JobGCInterval is how often we dispatch a job to GC jobs that are
+	// available for garbage collection.
+	JobGCInterval time.Duration
+
 	// NodeGCInterval is how often we dispatch a job to GC failed nodes.
 	NodeGCInterval time.Duration
 
@@ -202,6 +206,7 @@ func DefaultConfig() *Config {
 		ReconcileInterval: 60 * time.Second,
 		EvalGCInterval:    5 * time.Minute,
 		EvalGCThreshold:   1 * time.Hour,
+		JobGCInterval:     5 * time.Minute,
 		NodeGCInterval:    5 * time.Minute,
 		NodeGCThreshold:   24 * time.Hour,
 		EvalNackTimeout:   60 * time.Second,
@@ -33,11 +33,88 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
 		return s.evalGC(eval)
 	case structs.CoreJobNodeGC:
 		return s.nodeGC(eval)
+	case structs.CoreJobJobGC:
+		return s.jobGC(eval)
 	default:
 		return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
 	}
 }
 
+// jobGC is used to garbage collect eligible jobs.
+func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
+	// Get all the jobs eligible for garbage collection.
+	jIter, err := c.snap.JobsByGC(true)
+	if err != nil {
+		return err
+	}
+
+	// Get the time table to calculate GC cutoffs.
+	tt := c.srv.fsm.TimeTable()
+
+	// Collect the allocations and evaluations to GC
+	var gcAlloc, gcEval, gcJob []string
+
+OUTER:
+	for i := jIter.Next(); i != nil; i = jIter.Next() {
+		job := i.(*structs.Job)
+		cutoff := time.Now().UTC().Add(-1 * job.GC.Threshold)
+		oldThreshold := tt.NearestIndex(cutoff)
+
+		// Ignore new jobs.
+		if job.CreateIndex > oldThreshold {
+			continue OUTER
+		}
+
+		evals, err := c.snap.EvalsByJob(job.ID)
+		if err != nil {
+			c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
+			continue
+		}
+
+		for _, eval := range evals {
+			gc, allocs, err := c.gcEval(eval, oldThreshold)
+			if err != nil || !gc {
+				continue OUTER
+			}
+
+			gcEval = append(gcEval, eval.ID)
+			gcAlloc = append(gcAlloc, allocs...)
+		}
+
+		// Job is eligible for garbage collection
+		gcJob = append(gcJob, job.ID)
+	}
+
+	// Fast-path the nothing case
+	if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
+		return nil
+	}
+	c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible",
+		len(gcJob), len(gcEval), len(gcAlloc))
+
+	// Reap the evals and allocs
+	if err := c.evalReap(gcEval, gcAlloc); err != nil {
+		return err
+	}
+
+	// Call to the leader to deregister the jobs.
+	for _, job := range gcJob {
+		req := structs.JobDeregisterRequest{
+			JobID: job,
+			WriteRequest: structs.WriteRequest{
+				Region: c.srv.config.Region,
+			},
+		}
+		var resp structs.JobDeregisterResponse
+		if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
+			c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
+			return err
+		}
+	}
+
+	return nil
+}
+
 // evalGC is used to garbage collect old evaluations
 func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 	// Iterate over the evaluations
@@ -57,39 +134,16 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 
 	// Collect the allocations and evaluations to GC
 	var gcAlloc, gcEval []string
 
-OUTER:
-	for {
-		raw := iter.Next()
-		if raw == nil {
-			break
-		}
+	for raw := iter.Next(); raw != nil; raw = iter.Next() {
 		eval := raw.(*structs.Evaluation)
 
-		// Ignore non-terminal and new evaluations
-		if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold {
-			continue
-		}
-
-		// Get the allocations by eval
-		allocs, err := c.snap.AllocsByEval(eval.ID)
+		gc, allocs, err := c.gcEval(eval, oldThreshold)
 		if err != nil {
-			c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
-				eval.ID, err)
-			continue
+			return err
 		}
 
-		// Scan the allocations to ensure they are terminal and old
-		for _, alloc := range allocs {
-			if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold {
-				continue OUTER
-			}
-		}
-
-		// Evaluation is eligible for garbage collection
-		gcEval = append(gcEval, eval.ID)
-		for _, alloc := range allocs {
-			gcAlloc = append(gcAlloc, alloc.ID)
+		if gc {
+			gcEval = append(gcEval, eval.ID)
+			gcAlloc = append(gcAlloc, allocs...)
 		}
 	}
 
@@ -100,10 +154,52 @@ OUTER:
 	c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
 		len(gcEval), len(gcAlloc))
 
+	return c.evalReap(gcEval, gcAlloc)
+}
+
+// gcEval returns whether the eval should be garbage collected given a raft
+// threshold index. The eval disqualifies for garbage collection if it or its
+// allocs are not older than the threshold. If the eval should be garbage
+// collected, the associated alloc ids that should also be removed are also
+// returned.
+func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) (
+	bool, []string, error) {
+	// Ignore non-terminal and new evaluations
+	if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
+		return false, nil, nil
+	}
+
+	// Get the allocations by eval
+	allocs, err := c.snap.AllocsByEval(eval.ID)
+	if err != nil {
+		c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
+			eval.ID, err)
+		return false, nil, err
+	}
+
+	// Scan the allocations to ensure they are terminal and old
+	for _, alloc := range allocs {
+		if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
+			return false, nil, nil
+		}
+	}
+
+	allocIds := make([]string, len(allocs))
+	for i, alloc := range allocs {
+		allocIds[i] = alloc.ID
+	}
+
+	// Evaluation is eligible for garbage collection
+	return true, allocIds, nil
+}
+
+// evalReap contacts the leader and issues a reap on the passed evals and
+// allocs.
+func (c *CoreScheduler) evalReap(evals, allocs []string) error {
 	// Call to the leader to issue the reap
 	req := structs.EvalDeleteRequest{
-		Evals:  gcEval,
-		Allocs: gcAlloc,
+		Evals:  evals,
+		Allocs: allocs,
 		WriteRequest: structs.WriteRequest{
 			Region: c.srv.config.Region,
 		},
@@ -113,6 +209,7 @@ OUTER:
 		c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
 		return err
 	}
+
 	return nil
 }
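
The cutoff arithmetic in `jobGC` deserves a note: a job's GC threshold is wall-clock time, while `CreateIndex` and `ModifyIndex` are raft indexes, so the FSM's time table translates between the two. A toy sketch of that translation (a hypothetical `witness` table standing in for Nomad's `TimeTable`):

```go
package main

import (
	"fmt"
	"time"
)

// witness pairs a raft index with the wall-clock time it was observed,
// the same association the FSM's time table maintains.
type witness struct {
	index uint64
	t     time.Time
}

// nearestIndex returns the highest index witnessed at or before the cutoff,
// mirroring how jobGC derives oldThreshold from tt.NearestIndex(cutoff).
func nearestIndex(table []witness, cutoff time.Time) uint64 {
	var idx uint64
	for _, w := range table {
		if !w.t.After(cutoff) {
			idx = w.index
		}
	}
	return idx
}

func main() {
	now := time.Now().UTC()
	table := []witness{
		{index: 1000, t: now.Add(-3 * time.Hour)},
		{index: 2000, t: now.Add(-2 * time.Hour)},
		{index: 3000, t: now.Add(-30 * time.Minute)},
	}

	threshold := 1 * time.Hour
	cutoff := now.Add(-1 * threshold)

	// Anything created after this index is too new to collect.
	fmt.Println(nearestIndex(table, cutoff)) // 2000
}
```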
@@ -111,3 +111,111 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
 		t.Fatalf("bad: %v", out)
 	}
 }
+
+func TestCoreScheduler_JobGC(t *testing.T) {
+	tests := []struct {
+		test, evalStatus, allocStatus string
+		shouldExist                   bool
+	}{
+		{
+			test:        "Terminal",
+			evalStatus:  structs.EvalStatusFailed,
+			allocStatus: structs.AllocDesiredStatusFailed,
+			shouldExist: false,
+		},
+		{
+			test:        "Has Alloc",
+			evalStatus:  structs.EvalStatusFailed,
+			allocStatus: structs.AllocDesiredStatusRun,
+			shouldExist: true,
+		},
+		{
+			test:        "Has Eval",
+			evalStatus:  structs.EvalStatusPending,
+			allocStatus: structs.AllocDesiredStatusFailed,
+			shouldExist: true,
+		},
+	}
+
+	for _, test := range tests {
+		s1 := testServer(t, nil)
+		defer s1.Shutdown()
+		testutil.WaitForLeader(t, s1.RPC)
+
+		// Insert job.
+		state := s1.fsm.State()
+		job := mock.Job()
+		threshold := 1 * time.Hour
+		job.GC = &structs.JobGCConfig{
+			Enabled:   true,
+			Threshold: threshold,
+		}
+		err := state.UpsertJob(1000, job)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Insert eval
+		eval := mock.Eval()
+		eval.JobID = job.ID
+		eval.Status = test.evalStatus
+		err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Insert alloc
+		alloc := mock.Alloc()
+		alloc.JobID = job.ID
+		alloc.EvalID = eval.ID
+		alloc.DesiredStatus = test.allocStatus
+		err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Update the time tables to make this work
+		tt := s1.fsm.TimeTable()
+		tt.Witness(2000, time.Now().UTC().Add(-1*threshold))
+
+		// Create a core scheduler
+		snap, err := state.Snapshot()
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		core := NewCoreScheduler(s1, snap)
+
+		// Attempt the GC
+		gc := s1.coreJobEval(structs.CoreJobJobGC)
+		gc.ModifyIndex = 2000
+		err = core.Process(gc)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Check whether the job, eval, and alloc still exist.
+		out, err := state.JobByID(job.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, out)
+		}
+
+		outE, err := state.EvalByID(eval.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, outE)
+		}
+
+		outA, err := state.AllocByID(alloc.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, outA)
+		}
+	}
+}
@@ -29,8 +29,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error {
 		return err
 	}
 
-	// Initialize all the fields of services
-	args.Job.InitAllServiceFields()
+	// Initialize the job fields (sets defaults and any other necessary init
+	// work).
+	args.Job.InitFields()
 
 	if args.Job.Type == structs.JobTypeCore {
 		return fmt.Errorf("job type cannot be core")
@@ -173,6 +173,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
 	defer evalGC.Stop()
 	nodeGC := time.NewTicker(s.config.NodeGCInterval)
 	defer nodeGC.Stop()
+	jobGC := time.NewTicker(s.config.JobGCInterval)
+	defer jobGC.Stop()
 
 	for {
 		select {
@@ -180,6 +182,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
 			s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
 		case <-nodeGC.C:
 			s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
+		case <-jobGC.C:
+			s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
 		case <-stopCh:
 			return
 		}
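
The scheduling change above is a standard ticker fan-in select. A freestanding sketch of the pattern (made-up interval, with a print standing in for `evalBroker.Enqueue`):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	jobGC := time.NewTicker(50 * time.Millisecond)
	defer jobGC.Stop()

	stopCh := make(chan struct{})
	go func() {
		time.Sleep(200 * time.Millisecond)
		close(stopCh)
	}()

	for {
		select {
		case <-jobGC.C:
			// In the server this enqueues a CoreJobJobGC evaluation.
			fmt.Println("enqueue job-gc evaluation")
		case <-stopCh:
			return
		}
	}
}
```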
@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/nomad/structs"
 )
 
 // stateStoreSchema is used to return the schema for the state store
@@ -100,10 +101,33 @@ func jobTableSchema() *memdb.TableSchema {
 					Lowercase: false,
 				},
 			},
+			"gc": &memdb.IndexSchema{
+				Name:         "gc",
+				AllowMissing: false,
+				Unique:       false,
+				Indexer: &memdb.ConditionalIndex{
+					Conditional: jobIsGCable,
+				},
+			},
 		},
 	}
 }
 
+// jobIsGCable satisfies the ConditionalIndexFunc interface and creates an index
+// on whether a job is eligible for garbage collection.
+func jobIsGCable(obj interface{}) (bool, error) {
+	j, ok := obj.(*structs.Job)
+	if !ok {
+		return false, fmt.Errorf("Unexpected type: %v", obj)
+	}
+
+	if j.GC == nil || !j.GC.Enabled {
+		return false, nil
+	}
+
+	return true, nil
+}
+
 // evalTableSchema returns the MemDB schema for the eval table.
 // This table is used to store all the evaluations that are pending
 // or recently completed.
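
For readers unfamiliar with go-memdb, `ConditionalIndex` indexes every object under a boolean computed by the conditional function, which is what lets `JobsByGC` below look jobs up by eligibility. A self-contained toy sketch of the mechanism (simplified `job` type, not Nomad's schema):

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

type job struct {
	ID string
	GC bool
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"jobs": &memdb.TableSchema{
				Name: "jobs",
				Indexes: map[string]*memdb.IndexSchema{
					"id": &memdb.IndexSchema{
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
					// Boolean index, analogous to the "gc" index above.
					"gc": &memdb.IndexSchema{
						Name: "gc",
						Indexer: &memdb.ConditionalIndex{
							Conditional: func(obj interface{}) (bool, error) {
								return obj.(*job).GC, nil
							},
						},
					},
				},
			},
		},
	}

	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	for _, j := range []*job{{"a", true}, {"b", false}, {"c", true}} {
		if err := txn.Insert("jobs", j); err != nil {
			panic(err)
		}
	}
	txn.Commit()

	// Query the boolean index exactly as JobsByGC does.
	iter, err := db.Txn(false).Get("jobs", "gc", true)
	if err != nil {
		panic(err)
	}
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		fmt.Println(raw.(*job).ID) // a, c
	}
}
```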
@@ -372,6 +372,18 @@ func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) {
 	return iter, nil
 }
 
+// JobsByGC returns an iterator over all jobs that are either eligible or
+// ineligible for garbage collection.
+func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
+	txn := s.db.Txn(false)
+
+	iter, err := txn.Get("jobs", "gc", gc)
+	if err != nil {
+		return nil, err
+	}
+	return iter, nil
+}
+
 // UpsertEvals is used to upsert a set of evaluations
 func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
 	txn := s.db.Txn(true)
@@ -471,6 +471,66 @@ func TestStateStore_JobsByScheduler(t *testing.T) {
 	}
 }
 
+func TestStateStore_JobsByGC(t *testing.T) {
+	state := testStateStore(t)
+	var gc, nonGc []*structs.Job
+
+	for i := 0; i < 10; i++ {
+		job := mock.Job()
+		nonGc = append(nonGc, job)
+
+		if err := state.UpsertJob(1000+uint64(i), job); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	for i := 0; i < 10; i++ {
+		job := mock.Job()
+		job.GC = &structs.JobGCConfig{
+			Enabled:   true,
+			Threshold: structs.DefaultJobGCThreshold,
+		}
+		gc = append(gc, job)
+
+		if err := state.UpsertJob(2000+uint64(i), job); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	iter, err := state.JobsByGC(true)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	var outGc []*structs.Job
+	for i := iter.Next(); i != nil; i = iter.Next() {
+		outGc = append(outGc, i.(*structs.Job))
+	}
+
+	iter, err = state.JobsByGC(false)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	var outNonGc []*structs.Job
+	for i := iter.Next(); i != nil; i = iter.Next() {
+		outNonGc = append(outNonGc, i.(*structs.Job))
+	}
+
+	sort.Sort(JobIDSort(gc))
+	sort.Sort(JobIDSort(nonGc))
+	sort.Sort(JobIDSort(outGc))
+	sort.Sort(JobIDSort(outNonGc))
+
+	if !reflect.DeepEqual(gc, outGc) {
+		t.Fatalf("bad: %#v %#v", gc, outGc)
+	}
+
+	if !reflect.DeepEqual(nonGc, outNonGc) {
+		t.Fatalf("bad: %#v %#v", nonGc, outNonGc)
+	}
+}
+
 func TestStateStore_RestoreJob(t *testing.T) {
 	state := testStateStore(t)
 	job := mock.Job()
@@ -22,19 +22,47 @@ func TestRemoveAllocs(t *testing.T) {
 	}
 }
 
-func TestFilterTerminalALlocs(t *testing.T) {
+func TestFilterTerminalAllocs(t *testing.T) {
 	l := []*Allocation{
-		&Allocation{ID: "foo", DesiredStatus: AllocDesiredStatusRun},
 		&Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict},
 		&Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop},
-		&Allocation{ID: "zip", DesiredStatus: AllocDesiredStatusRun},
+		&Allocation{
+			ID:            "zip",
+			DesiredStatus: AllocDesiredStatusRun,
+			TaskStates: map[string]*TaskState{
+				"a": &TaskState{State: TaskStatePending},
+			},
+		},
+		&Allocation{
+			ID:            "foo",
+			DesiredStatus: AllocDesiredStatusRun,
+			TaskStates: map[string]*TaskState{
+				"a": &TaskState{State: TaskStatePending},
+			},
+		},
+		&Allocation{
+			ID:            "bam",
+			DesiredStatus: AllocDesiredStatusRun,
+			TaskStates: map[string]*TaskState{
+				"a": &TaskState{State: TaskStatePending},
+				"b": &TaskState{State: TaskStateDead},
+			},
+		},
+		&Allocation{
+			ID:            "fizz",
+			DesiredStatus: AllocDesiredStatusRun,
+			TaskStates: map[string]*TaskState{
+				"a": &TaskState{State: TaskStateDead},
+				"b": &TaskState{State: TaskStateDead},
+			},
+		},
 	}
 
 	out := FilterTerminalAllocs(l)
-	if len(out) != 2 {
+	if len(out) != 3 {
 		t.Fatalf("bad: %#v", out)
 	}
-	if out[0].ID != "foo" && out[1].ID != "zip" {
+	if out[0].ID != "zip" && out[1].ID != "foo" && out[2].ID != "bam" {
 		t.Fatalf("bad: %#v", out)
 	}
 }
@@ -764,6 +764,10 @@ type Job struct {
 	// Periodic is used to define the interval the job is run at.
 	Periodic *PeriodicConfig
 
+	// GC is used to mark the job as available for garbage collection after it
+	// has no outstanding evaluations or allocations.
+	GC *JobGCConfig
+
 	// Meta is used to associate arbitrary metadata with this
 	// job. This is opaque to Nomad.
 	Meta map[string]string
@@ -779,6 +783,18 @@ type Job struct {
 	ModifyIndex uint64
 }
 
+// InitFields is used to initialize fields in the Job. This should be called
+// when registering a Job.
+func (j *Job) InitFields() {
+	// Initialize the service block.
+	j.InitAllServiceFields()
+
+	// Initialize the GC policy.
+	if j.GC != nil {
+		j.GC.Init()
+	}
+}
+
 // InitAllServiceFields traverses all Task Groups and makes them
 // interpolate Job, Task group and Task names in all Service names.
 // It also generates the check names if they are not set. This method also
@@ -854,6 +870,13 @@ func (j *Job) Validate() error {
 			fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
 	}
 
+	// Validate the GC config.
+	if j.GC != nil {
+		if err := j.GC.Validate(); err != nil {
+			mErr.Errors = append(mErr.Errors, err)
+		}
+	}
+
 	return mErr.ErrorOrNil()
 }
 
@@ -899,6 +922,37 @@ type JobListStub struct {
 	ModifyIndex uint64
 }
 
+const (
+	// DefaultJobGCThreshold is the default threshold for garbage collecting
+	// eligible jobs.
+	DefaultJobGCThreshold = 4 * time.Hour
+)
+
+// JobGCConfig configures the garbage collection policy of a job.
+type JobGCConfig struct {
+	// Enabled determines whether the job is eligible for garbage collection.
+	Enabled bool
+
+	// Threshold is how old a job must be before it is eligible for GC. This
+	// gives the user time to inspect the job.
+	Threshold time.Duration
+}
+
+// Init sets the Threshold time to its default value if it is un-specified but
+// garbage collection is enabled.
+func (gc *JobGCConfig) Init() {
+	if gc.Enabled && gc.Threshold == 0 {
+		gc.Threshold = DefaultJobGCThreshold
+	}
+}
+
+// Validate checks that the GC threshold is non-negative.
+func (gc *JobGCConfig) Validate() error {
+	if gc.Threshold < 0 {
+		return fmt.Errorf("job GC threshold must be positive: %v", gc.Threshold)
+	}
+	return nil
+}
+
 // UpdateStrategy is used to modify how updates are done
 type UpdateStrategy struct {
 	// Stagger is the amount of time between the updates
@@ -1470,14 +1524,21 @@ type Allocation struct {
 	ModifyIndex uint64
 }
 
-// TerminalStatus returns if the desired status is terminal and
-// will no longer transition. This is not based on the current client status.
+// TerminalStatus returns if the desired or actual status is terminal and
+// will no longer transition.
 func (a *Allocation) TerminalStatus() bool {
 	switch a.DesiredStatus {
 	case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
 		return true
 	default:
-		return false
+		// If all tasks are dead, the alloc is terminal.
+		for _, state := range a.TaskStates {
+			if state.State != TaskStateDead {
+				return false
+			}
+		}
+
+		return true
 	}
 }
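
One consequence of this change (and the reason the scheduler tests below gain `TaskStates`): an allocation whose desired status is still run now reads as terminal once every task state is dead, and an allocation with no task states at all falls through the loop and is likewise terminal. A toy restatement of the new predicate (string constants stand in for the structs package's):

```go
package main

import "fmt"

const (
	statusRun  = "run"
	statusStop = "stop" // stands in for stop/evict/failed
	taskDead   = "dead"
	taskAlive  = "pending"
)

type alloc struct {
	desiredStatus string
	taskStates    map[string]string
}

// terminal mirrors the updated Allocation.TerminalStatus logic.
func terminal(a alloc) bool {
	switch a.desiredStatus {
	case statusStop:
		return true
	default:
		// If all tasks are dead, the alloc is terminal.
		for _, s := range a.taskStates {
			if s != taskDead {
				return false
			}
		}
		return true
	}
}

func main() {
	fmt.Println(terminal(alloc{statusRun, map[string]string{"a": taskDead}}))  // true
	fmt.Println(terminal(alloc{statusRun, map[string]string{"a": taskAlive}})) // false
	fmt.Println(terminal(alloc{statusRun, nil}))                               // true: no task states
}
```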
@@ -1656,6 +1717,12 @@ const (
 	// We periodically scan nodes in a terminal state, and if they have no
 	// corresponding allocations we delete these out of the system.
 	CoreJobNodeGC = "node-gc"
+
+	// CoreJobJobGC is used for the garbage collection of eligible jobs. We
+	// periodically scan garbage collectible jobs and check if both their
+	// evaluations and allocations are terminal. If so, we delete these out of
+	// the system.
+	CoreJobJobGC = "job-gc"
 )
 
 // Evaluation is used anytime we need to apply business logic as a result
@@ -61,6 +61,9 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
 			MemoryMB: 2048,
 		},
 		DesiredStatus: structs.AllocDesiredStatusRun,
+		TaskStates: map[string]*structs.TaskState{
+			"foo": &structs.TaskState{State: structs.TaskStatePending},
+		},
 	}
 	alloc2 := &structs.Allocation{
 		ID: structs.GenerateUUID(),
@@ -72,6 +75,9 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
 			MemoryMB: 1024,
 		},
 		DesiredStatus: structs.AllocDesiredStatusRun,
+		TaskStates: map[string]*structs.TaskState{
+			"foo": &structs.TaskState{State: structs.TaskStatePending},
+		},
 	}
 	noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -203,6 +203,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
 			MemoryMB: 2048,
 		},
 		DesiredStatus: structs.AllocDesiredStatusRun,
+		TaskStates: map[string]*structs.TaskState{
+			"foo": &structs.TaskState{State: structs.TaskStatePending},
+		},
 	}
 	alloc2 := &structs.Allocation{
 		ID: structs.GenerateUUID(),
@@ -214,6 +217,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
 			MemoryMB: 1024,
 		},
 		DesiredStatus: structs.AllocDesiredStatusRun,
+		TaskStates: map[string]*structs.TaskState{
+			"foo": &structs.TaskState{State: structs.TaskStatePending},
+		},
 	}
 	noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -277,6 +283,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
 			MemoryMB: 2048,
 		},
 		DesiredStatus: structs.AllocDesiredStatusRun,
+		TaskStates: map[string]*structs.TaskState{
+			"foo": &structs.TaskState{State: structs.TaskStatePending},
+		},
 	}
 	alloc2 := &structs.Allocation{
 		ID: structs.GenerateUUID(),
@@ -288,6 +297,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
 			MemoryMB: 1024,
 		},
 		DesiredStatus: structs.AllocDesiredStatusRun,
+		TaskStates: map[string]*structs.TaskState{
+			"foo": &structs.TaskState{State: structs.TaskStatePending},
+		},
 	}
 	noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
@@ -317,7 +329,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
 		t.Fatalf("Bad: %v", out[0])
 	}
 	if out[1].Score != 18 {
-		t.Fatalf("Bad: %v", out[0])
+		t.Fatalf("Bad: %v", out[1])
 	}
 }
@@ -156,6 +156,29 @@ The `job` object supports the following keys:
   and "h" suffix can be used, such as "30s". Both values default to zero,
   which disables rolling updates.
 
+* `gc` - Specifies the job's garbage collection configuration. This allows
+  jobs to be garbage collected when all their evaluations and allocations
+  are terminal. The `gc` block has the following format:
+
+    ```
+    "gc" {
+        // Enabled is set to true by default if the "gc" block is included.
+        enabled = true
+
+        // Threshold is a duration that configures how old a job must be
+        // before it is garbage collected.
+        threshold = "4h"
+    }
+    ```
+
+  * `enabled`: Toggles the eligibility of a job for garbage collection.
+
+  * `threshold`: `threshold` is a string that must parse as a
+    [time.Duration](https://golang.org/pkg/time/#ParseDuration). A job will
+    only be garbage collected after the job, its evaluations, and its
+    allocations are all older than the threshold.
+
 ### Task Group
 
 The `group` object supports the following keys: