GC and some fixes

Alex Dadgar
2017-04-15 16:47:19 -07:00
parent 950171f094
commit 5e1e7afc62
9 changed files with 399 additions and 29 deletions

View File

@@ -147,13 +147,17 @@ func (c *StatusCommand) Run(args []string) int {
}
if periodic && !parameterized {
- location, err := job.Periodic.GetLocation()
- if err == nil {
- now := time.Now().In(location)
- next := job.Periodic.Next(now)
- basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
- fmt.Sprintf("%s (%s from now)",
- formatTime(next), formatTimeDifference(now, next, time.Second))))
if *job.Stop {
basic = append(basic, fmt.Sprintf("Next Periodic Launch|none (job stopped)"))
} else {
location, err := job.Periodic.GetLocation()
if err == nil {
now := time.Now().In(location)
next := job.Periodic.Next(now)
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
fmt.Sprintf("%s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second))))
}
}
}

View File

@@ -149,6 +149,7 @@ OUTER:
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job,
Purge: true,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
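The Purge flag is the substantive change in this hunk: the job GC now issues a purging deregister rather than a plain stop. A minimal sketch of the two request shapes, assuming only the fields visible in this diff (the job ID, region value, and helper name are placeholders, not taken from the commit):

// Sketch (not part of the commit): a deregister without Purge merely stops
// the job and leaves it in state; with Purge the job is removed outright,
// which is what the garbage collector wants here.
func exampleDeregisterRequests(region string) (stop, purge structs.JobDeregisterRequest) {
	stop = structs.JobDeregisterRequest{
		JobID:        "example", // placeholder job ID
		Purge:        false,     // stop only; the job stays queryable
		WriteRequest: structs.WriteRequest{Region: region},
	}
	purge = structs.JobDeregisterRequest{
		JobID:        "example",
		Purge:        true, // delete the job so GC can reap its evals and allocs
		WriteRequest: structs.WriteRequest{Region: region},
	}
	return stop, purge
}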
@@ -243,9 +244,24 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return false, nil, err
}
// Can collect if:
// Job doesn't exist
// Job is Stopped and dead
// allowBatch and the job is dead
collect := false
if job == nil {
collect = true
} else if job.Status != structs.JobStatusDead {
collect = false
} else if job.Stop {
collect = true
} else if allowBatch {
collect = true
}
// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
- if job != nil && (!allowBatch || job.Status != structs.JobStatusDead) {
if !collect {
return false, nil, nil
}
}
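Restated outside the diff, the eligibility rule this hunk encodes fits in one predicate. This is only a readability sketch using the same structs fields the hunk uses; the function name is hypothetical:

// Sketch: may the evals/allocs behind a batch job's evaluation be collected?
func evalJobAllowsGC(job *structs.Job, allowBatch bool) bool {
	if job == nil {
		return true // the job no longer exists, so nothing is being protected
	}
	if job.Status != structs.JobStatusDead {
		return false // never collect behind a job that is still alive
	}
	// Dead job: collect if it was explicitly stopped, or if the caller
	// allows collecting completed batch jobs.
	return job.Stop || allowBatch
}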

View File

@@ -1006,6 +1006,103 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) {
}
}
// This test ensures that stopped jobs are GCd
func TestCoreScheduler_JobGC_Stopped(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert job.
state := s1.fsm.State()
job := mock.Job()
//job.Status = structs.JobStatusDead
job.Stop = true
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert two complete evals
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
eval2 := mock.Eval()
eval2.JobID = job.ID
eval2.Status = structs.EvalStatusComplete
err = state.UpsertEvals(1001, []*structs.Evaluation{eval, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert one complete alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Shouldn't still exist
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outE, err := state.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE != nil {
t.Fatalf("bad: %v", outE)
}
outE2, err := state.EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE2 != nil {
t.Fatalf("bad: %v", outE2)
}
outA, err := state.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
}
func TestCoreScheduler_JobGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@@ -1066,8 +1163,8 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
}
}
- // This test ensures parameterized and periodic jobs don't get GCd
- func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
// This test ensures parameterized jobs only get gc'd when stopped
func TestCoreScheduler_JobGC_Parameterized(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
@@ -1088,12 +1185,6 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
t.Fatalf("err: %v", err)
}
- // Insert a periodic job.
- job2 := mock.PeriodicJob()
- if err := state.UpsertJob(1001, job2); err != nil {
- t.Fatalf("err: %v", err)
- }
// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
@@ -1118,12 +1209,109 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
t.Fatalf("bad: %v", out)
}
- outE, err := state.JobByID(ws, job2.ID)
// Mark the job as stopped and try again
job2 := job.Copy()
job2.Stop = true
err = state.UpsertJob(2000, job2)
if err != nil {
t.Fatalf("err: %v", err)
}
- if outE == nil {
- t.Fatalf("bad: %v", outE)
// Create a core scheduler
snap, err = state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)
// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should not exist
out, err = state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %+v", out)
}
}
// This test ensures periodic jobs don't get GCd until they are stopped
func TestCoreScheduler_JobGC_Periodic(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert a periodic job.
state := s1.fsm.State()
job := mock.PeriodicJob()
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should still exist
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
// Mark the job as stopped and try again
job2 := job.Copy()
job2.Stop = true
err = state.UpsertJob(2000, job2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err = state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)
// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should not exist
out, err = state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %+v", out)
}
}

View File

@@ -846,6 +846,10 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return fmt.Errorf("Specified job %q is not a parameterized job", args.JobID)
}
if parameterizedJob.Stop {
return fmt.Errorf("Specified job %q is stopped", args.JobID)
}
// Validate the arguments
if err := validateDispatchRequest(args, parameterizedJob); err != nil {
return err

View File

@@ -854,9 +854,10 @@ func TestJobEndpoint_Deregister(t *testing.T) {
t.Fatalf("err: %v", err)
}
- // Deregister
// Deregister but don't purge
dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
Purge: false,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.JobDeregisterResponse
@@ -867,15 +868,18 @@ func TestJobEndpoint_Deregister(t *testing.T) {
t.Fatalf("bad index: %d", resp2.Index)
}
- // Check for the node in the FSM
// Check for the job in the FSM
ws := memdb.NewWatchSet()
state := s1.fsm.State()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
- if out != nil {
- t.Fatalf("unexpected job")
if out == nil {
t.Fatalf("job purged")
}
if !out.Stop {
t.Fatalf("job not stopped")
}
// Lookup the evaluation
@@ -908,6 +912,60 @@ func TestJobEndpoint_Deregister(t *testing.T) {
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
// Deregister and purge
dereg2 := &structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp3 structs.JobDeregisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp3); err != nil {
t.Fatalf("err: %v", err)
}
if resp3.Index == 0 {
t.Fatalf("bad index: %d", resp3.Index)
}
// Check for the job in the FSM
out, err = state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("unexpected job")
}
// Lookup the evaluation
eval, err = state.EvalByID(ws, resp3.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("expected eval")
}
if eval.CreateIndex != resp3.EvalCreateIndex {
t.Fatalf("index mis-match")
}
if eval.Priority != structs.JobDefaultPriority {
t.Fatalf("bad: %#v", eval)
}
if eval.Type != structs.JobTypeService {
t.Fatalf("bad: %#v", eval)
}
if eval.TriggeredBy != structs.EvalTriggerJobDeregister {
t.Fatalf("bad: %#v", eval)
}
if eval.JobID != job.ID {
t.Fatalf("bad: %#v", eval)
}
if eval.JobModifyIndex != resp3.JobModifyIndex {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
}
func TestJobEndpoint_Deregister_NonExistent(t *testing.T) {
@@ -990,6 +1048,7 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
// Deregister
dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.JobDeregisterResponse
@@ -1042,6 +1101,7 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) {
// Deregister
dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.JobDeregisterResponse
@@ -2089,6 +2149,11 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
d6 := mock.PeriodicJob()
d6.ParameterizedJob = &structs.ParameterizedJobConfig{}
d7 := mock.Job()
d7.Type = structs.JobTypeBatch
d7.ParameterizedJob = &structs.ParameterizedJobConfig{}
d7.Stop = true
reqNoInputNoMeta := &structs.JobDispatchRequest{}
reqInputDataNoMeta := &structs.JobDispatchRequest{
Payload: []byte("hello world"),
@@ -2210,6 +2275,13 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
dispatchReq: reqNoInputNoMeta,
noEval: true,
},
{
name: "parameterized job stopped, ensure error",
parameterizedJob: d7,
dispatchReq: reqNoInputNoMeta,
err: true,
errStr: "stopped",
},
}
for _, tc := range cases {

View File

@@ -181,11 +181,31 @@ func jobIsGCable(obj interface{}) (bool, error) {
return false, fmt.Errorf("Unexpected type: %v", obj)
}
- // The job is GCable if it is batch, it is not periodic and is not a
- // parameterized job.
// If the job is periodic or parameterized it is only garbage collectable if
// it is stopped.
periodic := j.Periodic != nil && j.Periodic.Enabled
- gcable := j.Type == structs.JobTypeBatch && !periodic && !j.IsParameterized()
- return gcable, nil
parameterized := j.IsParameterized()
if periodic || parameterized {
return j.Stop, nil
}
// If the job isn't dead it isn't eligible
if j.Status != structs.JobStatusDead {
return false, nil
}
// Any job that is stopped is eligible for garbage collection
if j.Stop {
return true, nil
}
// Otherwise, only batch jobs are eligible because they complete on their
// own without a user stopping them.
if j.Type != structs.JobTypeBatch {
return false, nil
}
return true, nil
}
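A few spot checks of the rules above, written in the same style as the commit's tests. This is a hypothetical illustration, not part of the commit; it assumes the mock and structs packages used elsewhere in this diff:

func TestJobIsGCable_Sketch(t *testing.T) {
	// A running periodic job is not GCable until it is stopped.
	periodic := mock.PeriodicJob()
	if ok, _ := jobIsGCable(periodic); ok {
		t.Fatalf("running periodic job should not be GCable")
	}
	periodic.Stop = true
	if ok, _ := jobIsGCable(periodic); !ok {
		t.Fatalf("stopped periodic job should be GCable")
	}

	// A dead batch job is GCable even without an explicit stop.
	batch := mock.Job()
	batch.Type = structs.JobTypeBatch
	batch.Status = structs.JobStatusDead
	if ok, _ := jobIsGCable(batch); !ok {
		t.Fatalf("dead batch job should be GCable")
	}

	// A dead service job that was never stopped is left alone.
	service := mock.Job()
	service.Status = structs.JobStatusDead
	if ok, _ := jobIsGCable(service); ok {
		t.Fatalf("dead service job should not be GCable")
	}
}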
// jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index

View File

@@ -1787,6 +1787,11 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
// job is periodic or is a parameterized job, we mark it as running as
// it will never have an allocation/evaluation against it.
if job.IsPeriodic() || job.IsParameterized() {
// If the job is stopped mark it as dead
if job.Stop {
return structs.JobStatusDead, nil
}
return structs.JobStatusRunning, nil
}
return structs.JobStatusPending, nil
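The effect of this branch, sketched as a standalone helper (hypothetical name, same fields as the hunk): a stopped periodic or parameterized job now reports dead rather than running, which is what allows the GC changes earlier in this commit to collect it.

// Sketch: status for jobs that never receive allocations of their own.
// They sit at "running" until stopped, then report "dead".
func syntheticJobStatus(job *structs.Job) (status string, ok bool) {
	if job.IsPeriodic() || job.IsParameterized() {
		if job.Stop {
			return structs.JobStatusDead, true
		}
		return structs.JobStatusRunning, true
	}
	return "", false // otherwise the status is derived from evals/allocs
}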

View File

@@ -3760,6 +3760,59 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) {
}
}
func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) {
state := testStateStore(t)
job := mock.PeriodicJob()
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusRunning {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning)
}
// Mark it as stopped
job.Stop = true
status, err = state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusDead {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead)
}
}
func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusRunning {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning)
}
// Mark it as stopped
job.Stop = true
status, err = state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusDead {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead)
}
}
func TestStateStore_SetJobStatus_PendingEval(t *testing.T) {
state := testStateStore(t)
job := mock.Job()

View File

@@ -22,8 +22,16 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Stop = true
if err := state.UpsertJob(1000, job); err != nil {
t.Fatalf("UpsertAllocs() failed: %v", err)
t.Fatalf("UpsertJob() failed: %v", err)
}
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.JobID = job.ID
if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("UpsertEvals() failed: %v", err)
}
// Make the GC request
@@ -45,7 +53,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
return false, err
}
if exist != nil {
- return false, fmt.Errorf("job %q wasn't garbage collected", job.ID)
return false, fmt.Errorf("job %+v wasn't garbage collected", job)
}
return true, nil
}, func(err error) {