From 5e1e7afc62e5be3a9de6117d76de362f1bcd22a0 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Sat, 15 Apr 2017 16:47:19 -0700
Subject: [PATCH] GC and some fixes

---
 command/status.go               |  18 +--
 nomad/core_sched.go             |  18 ++-
 nomad/core_sched_test.go        | 210 ++++++++++++++++++++++++++++++--
 nomad/job_endpoint.go           |   4 +
 nomad/job_endpoint_test.go      |  80 +++++++++++-
 nomad/state/schema.go           |  28 ++++-
 nomad/state/state_store.go      |   5 +
 nomad/state/state_store_test.go |  53 ++++++++
 nomad/system_endpoint_test.go   |  12 +-
 9 files changed, 399 insertions(+), 29 deletions(-)

diff --git a/command/status.go b/command/status.go
index 7b3e42976..e4a529230 100644
--- a/command/status.go
+++ b/command/status.go
@@ -147,13 +147,17 @@ func (c *StatusCommand) Run(args []string) int {
 	}
 
 	if periodic && !parameterized {
-		location, err := job.Periodic.GetLocation()
-		if err == nil {
-			now := time.Now().In(location)
-			next := job.Periodic.Next(now)
-			basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
-				fmt.Sprintf("%s (%s from now)",
-					formatTime(next), formatTimeDifference(now, next, time.Second))))
+		if *job.Stop {
+			basic = append(basic, "Next Periodic Launch|none (job stopped)")
+		} else {
+			location, err := job.Periodic.GetLocation()
+			if err == nil {
+				now := time.Now().In(location)
+				next := job.Periodic.Next(now)
+				basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
+					fmt.Sprintf("%s (%s from now)",
+						formatTime(next), formatTimeDifference(now, next, time.Second))))
+			}
 		}
 	}
 
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index a191effdd..83f8bd10e 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -149,6 +149,7 @@ OUTER:
 	for _, job := range gcJob {
 		req := structs.JobDeregisterRequest{
 			JobID: job,
+			Purge: true,
 			WriteRequest: structs.WriteRequest{
 				Region: c.srv.config.Region,
 			},
@@ -243,9 +244,24 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 			return false, nil, err
 		}
 
+		// The eval can be garbage collected if:
+		//   - its job no longer exists, or
+		//   - the job is stopped and dead, or
+		//   - allowBatch is set and the job is dead
+		collect := false
+		if job == nil {
+			collect = true
+		} else if job.Status != structs.JobStatusDead {
+			collect = false
+		} else if job.Stop {
+			collect = true
+		} else if allowBatch {
+			collect = true
+		}
+
 		// We don't want to gc anything related to a job which is not dead
 		// If the batch job doesn't exist we can GC it regardless of allowBatch
-		if job != nil && (!allowBatch || job.Status != structs.JobStatusDead) {
+		if !collect {
 			return false, nil, nil
 		}
 	}
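The collect chain above encodes a small decision rule for when an eval's job permits collection. A minimal, self-contained sketch of that rule follows; the types are illustrative stand-ins, and only the field meanings mirror nomad/structs:

// Sketch of the eligibility rule the new collect chain in gcEval implements.
package main

import "fmt"

type job struct {
	exists bool // the job is still in the state store
	dead   bool // job.Status == structs.JobStatusDead
	stop   bool // job.Stop was set by a non-purge deregister
}

// collectible mirrors the collect chain: a missing job always allows
// collection, a live job never does, and a dead job does when it was
// stopped or when allowBatch is set.
func collectible(j job, allowBatch bool) bool {
	switch {
	case !j.exists:
		return true
	case !j.dead:
		return false
	case j.stop:
		return true
	default:
		return allowBatch
	}
}

func main() {
	fmt.Println(collectible(job{exists: true, dead: true, stop: true}, false)) // true: stopped and dead
	fmt.Println(collectible(job{exists: true, dead: false, stop: true}, true)) // false: job not yet dead
}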
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index ce1e39cd3..a10a4c16d 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -1006,6 +1006,103 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) {
 	}
 }
+// This test ensures that stopped jobs are GCd
+func TestCoreScheduler_JobGC_Stopped(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
+	s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
+
+	// Insert job.
+	state := s1.fsm.State()
+	job := mock.Job()
+	// Mark the job as stopped so it is eligible for GC
+	job.Stop = true
+	err := state.UpsertJob(1000, job)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert two complete evals
+	eval := mock.Eval()
+	eval.JobID = job.ID
+	eval.Status = structs.EvalStatusComplete
+
+	eval2 := mock.Eval()
+	eval2.JobID = job.ID
+	eval2.Status = structs.EvalStatusComplete
+
+	err = state.UpsertEvals(1001, []*structs.Evaluation{eval, eval2})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert one complete alloc
+	alloc := mock.Alloc()
+	alloc.JobID = job.ID
+	alloc.EvalID = eval.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusStop
+
+	err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Shouldn't still exist
+	ws := memdb.NewWatchSet()
+	out, err := state.JobByID(ws, job.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("bad: %v", out)
+	}
+
+	outE, err := state.EvalByID(ws, eval.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outE != nil {
+		t.Fatalf("bad: %v", outE)
+	}
+
+	outE2, err := state.EvalByID(ws, eval2.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outE2 != nil {
+		t.Fatalf("bad: %v", outE2)
+	}
+
+	outA, err := state.AllocByID(ws, alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outA != nil {
+		t.Fatalf("bad: %v", outA)
+	}
+}
+
 func TestCoreScheduler_JobGC_Force(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
 	testutil.WaitForLeader(t, s1.RPC)
@@ -1066,8 +1163,8 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
 	}
 }
 
-// This test ensures parameterized and periodic jobs don't get GCd
-func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
+// This test ensures parameterized jobs only get GC'd when stopped
+func TestCoreScheduler_JobGC_Parameterized(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
 	testutil.WaitForLeader(t, s1.RPC)
@@ -1088,12 +1185,6 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	// Insert a periodic job.
-	job2 := mock.PeriodicJob()
-	if err := state.UpsertJob(1001, job2); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
 	// Create a core scheduler
 	snap, err := state.Snapshot()
 	if err != nil {
@@ -1118,12 +1209,109 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
 		t.Fatalf("bad: %v", out)
 	}
 
-	outE, err := state.JobByID(ws, job2.ID)
+	// Mark the job as stopped and try again
+	job2 := job.Copy()
+	job2.Stop = true
+	err = state.UpsertJob(2000, job2)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if outE == nil {
-		t.Fatalf("bad: %v", outE)
+
+	// Create a core scheduler
+	snap, err = state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core = NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should not exist
+	out, err = state.JobByID(ws, job.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("bad: %+v", out)
+	}
+}
+
+// This test ensures periodic jobs don't get GCd until they are stopped
+func TestCoreScheduler_JobGC_Periodic(t *testing.T) {
+
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
+	s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
+
+	// Insert a periodic job.
+	state := s1.fsm.State()
+	job := mock.PeriodicJob()
+	err := state.UpsertJob(1000, job)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should still exist
+	ws := memdb.NewWatchSet()
+	out, err := state.JobByID(ws, job.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("bad: %v", out)
+	}
+
+	// Mark the job as stopped and try again
+	job2 := job.Copy()
+	job2.Stop = true
+	err = state.UpsertJob(2000, job2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create a core scheduler
+	snap, err = state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core = NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should not exist
+	out, err = state.JobByID(ws, job.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("bad: %+v", out)
 	}
 }
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 1d51cb5d8..af99e1f4b 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -846,6 +846,10 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
 		return fmt.Errorf("Specified job %q is not a parameterized job", args.JobID)
 	}
 
+	if parameterizedJob.Stop {
+		return fmt.Errorf("Specified job %q is stopped", args.JobID)
+	}
+
 	// Validate the arguments
 	if err := validateDispatchRequest(args, parameterizedJob); err != nil {
 		return err
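With the new guard, dispatching against a stopped parameterized job fails before any payload or metadata validation runs. A self-contained sketch of that check ordering, using stand-in types rather than the real endpoint code:

// Sketch of the guard order Dispatch now enforces; dispatchTarget and
// checkDispatchable are illustrative, not Nomad's actual types.
package main

import (
	"errors"
	"fmt"
)

type dispatchTarget struct {
	ID            string
	Parameterized bool
	Stop          bool
}

// checkDispatchable mirrors the endpoint's checks: unknown and
// non-parameterized jobs are rejected first, then stopped jobs.
func checkDispatchable(j *dispatchTarget) error {
	if j == nil {
		return errors.New("job does not exist")
	}
	if !j.Parameterized {
		return fmt.Errorf("Specified job %q is not a parameterized job", j.ID)
	}
	if j.Stop {
		return fmt.Errorf("Specified job %q is stopped", j.ID)
	}
	return nil
}

func main() {
	stopped := &dispatchTarget{ID: "example-batch", Parameterized: true, Stop: true}
	fmt.Println(checkDispatchable(stopped)) // Specified job "example-batch" is stopped
}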
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index f7795bdd6..ebc2d5fe4 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -854,9 +854,10 @@ func TestJobEndpoint_Deregister(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	// Deregister
+	// Deregister but don't purge
 	dereg := &structs.JobDeregisterRequest{
 		JobID:        job.ID,
+		Purge:        false,
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var resp2 structs.JobDeregisterResponse
@@ -867,15 +868,18 @@ func TestJobEndpoint_Deregister(t *testing.T) {
 		t.Fatalf("bad index: %d", resp2.Index)
 	}
 
-	// Check for the node in the FSM
+	// Check for the job in the FSM
 	ws := memdb.NewWatchSet()
 	state := s1.fsm.State()
 	out, err := state.JobByID(ws, job.ID)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if out != nil {
-		t.Fatalf("unexpected job")
+	if out == nil {
+		t.Fatalf("job purged")
+	}
+	if !out.Stop {
+		t.Fatalf("job not stopped")
 	}
 
 	// Lookup the evaluation
@@ -908,6 +912,60 @@ func TestJobEndpoint_Deregister(t *testing.T) {
 	if eval.Status != structs.EvalStatusPending {
 		t.Fatalf("bad: %#v", eval)
 	}
+
+	// Deregister and purge
+	dereg2 := &structs.JobDeregisterRequest{
+		JobID:        job.ID,
+		Purge:        true,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var resp3 structs.JobDeregisterResponse
+	if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp3); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if resp3.Index == 0 {
+		t.Fatalf("bad index: %d", resp3.Index)
+	}
+
+	// Check for the job in the FSM
+	out, err = state.JobByID(ws, job.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("unexpected job")
+	}
+
+	// Lookup the evaluation
+	eval, err = state.EvalByID(ws, resp3.EvalID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if eval == nil {
+		t.Fatalf("expected eval")
+	}
+	if eval.CreateIndex != resp3.EvalCreateIndex {
+		t.Fatalf("index mis-match")
+	}
+
+	if eval.Priority != structs.JobDefaultPriority {
+		t.Fatalf("bad: %#v", eval)
+	}
+	if eval.Type != structs.JobTypeService {
+		t.Fatalf("bad: %#v", eval)
+	}
+	if eval.TriggeredBy != structs.EvalTriggerJobDeregister {
+		t.Fatalf("bad: %#v", eval)
+	}
+	if eval.JobID != job.ID {
+		t.Fatalf("bad: %#v", eval)
+	}
+	if eval.JobModifyIndex != resp3.JobModifyIndex {
+		t.Fatalf("bad: %#v", eval)
+	}
+	if eval.Status != structs.EvalStatusPending {
+		t.Fatalf("bad: %#v", eval)
+	}
 }
 
 func TestJobEndpoint_Deregister_NonExistent(t *testing.T) {
@@ -990,6 +1048,7 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
 	// Deregister
 	dereg := &structs.JobDeregisterRequest{
 		JobID:        job.ID,
+		Purge:        true,
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var resp2 structs.JobDeregisterResponse
@@ -1042,6 +1101,7 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) {
 	// Deregister
 	dereg := &structs.JobDeregisterRequest{
 		JobID:        job.ID,
+		Purge:        true,
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var resp2 structs.JobDeregisterResponse
@@ -2089,6 +2149,11 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
 	d6 := mock.PeriodicJob()
 	d6.ParameterizedJob = &structs.ParameterizedJobConfig{}
 
+	d7 := mock.Job()
+	d7.Type = structs.JobTypeBatch
+	d7.ParameterizedJob = &structs.ParameterizedJobConfig{}
+	d7.Stop = true
+
 	reqNoInputNoMeta := &structs.JobDispatchRequest{}
 	reqInputDataNoMeta := &structs.JobDispatchRequest{
 		Payload: []byte("hello world"),
@@ -2210,6 +2275,13 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
 			dispatchReq: reqNoInputNoMeta,
 			noEval:      true,
 		},
+		{
+			name:             "parameterized job stopped, ensure error",
+			parameterizedJob: d7,
+			dispatchReq:      reqNoInputNoMeta,
+			err:              true,
+			errStr:           "stopped",
+		},
 	}
 
 	for _, tc := range cases {
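The Deregister changes above give the endpoint two modes: without Purge the job is retained in state and marked stopped, while with Purge it is deleted outright. A toy model of those semantics; the map is a stand-in for Nomad's state store:

// Self-contained sketch of stop-versus-purge semantics.
package main

import "fmt"

type job struct {
	ID   string
	Stop bool
}

type state map[string]*job

// deregister mirrors the endpoint semantics: with purge the job is removed
// from state entirely, without it the job is kept but marked stopped.
func (s state) deregister(id string, purge bool) {
	if purge {
		delete(s, id)
		return
	}
	if j, ok := s[id]; ok {
		j.Stop = true
	}
}

func main() {
	s := state{"example": {ID: "example"}}

	s.deregister("example", false)
	fmt.Println(s["example"].Stop) // true: job kept, marked stopped

	s.deregister("example", true)
	fmt.Println(s["example"] == nil) // true: job purged
}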
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index d31af34b2..2ecd1e88d 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -181,11 +181,31 @@ func jobIsGCable(obj interface{}) (bool, error) {
 		return false, fmt.Errorf("Unexpected type: %v", obj)
 	}
 
-	// The job is GCable if it is batch, it is not periodic and is not a
-	// parameterized job.
+	// If the job is periodic or parameterized it is only garbage collectable if
+	// it is stopped.
 	periodic := j.Periodic != nil && j.Periodic.Enabled
-	gcable := j.Type == structs.JobTypeBatch && !periodic && !j.IsParameterized()
-	return gcable, nil
+	parameterized := j.IsParameterized()
+	if periodic || parameterized {
+		return j.Stop, nil
+	}
+
+	// If the job isn't dead it isn't eligible
+	if j.Status != structs.JobStatusDead {
+		return false, nil
+	}
+
+	// Any job that is stopped is eligible for garbage collection
+	if j.Stop {
+		return true, nil
+	}
+
+	// Otherwise, only batch jobs are eligible because they complete on their
+	// own without a user stopping them.
+	if j.Type != structs.JobTypeBatch {
+		return false, nil
+	}
+
+	return true, nil
 }
 
 // jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index b6770d154..314613f36 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1787,6 +1787,11 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
 	// job is periodic or is a parameterized job, we mark it as running as
 	// it will never have an allocation/evaluation against it.
 	if job.IsPeriodic() || job.IsParameterized() {
+		// If the job is stopped mark it as dead
+		if job.Stop {
+			return structs.JobStatusDead, nil
+		}
+
 		return structs.JobStatusRunning, nil
 	}
 	return structs.JobStatusPending, nil
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index f92f336e3..9e0a5b51e 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -3760,6 +3760,59 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) {
 	}
 }
 
+func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) {
+	state := testStateStore(t)
+	job := mock.PeriodicJob()
+
+	txn := state.db.Txn(false)
+	status, err := state.getJobStatus(txn, job, false)
+	if err != nil {
+		t.Fatalf("getJobStatus() failed: %v", err)
+	}
+
+	if status != structs.JobStatusRunning {
+		t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning)
+	}
+
+	// Mark it as stopped
+	job.Stop = true
+	status, err = state.getJobStatus(txn, job, false)
+	if err != nil {
+		t.Fatalf("getJobStatus() failed: %v", err)
+	}
+
+	if status != structs.JobStatusDead {
+		t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead)
+	}
+}
+
+func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) {
+	state := testStateStore(t)
+	job := mock.Job()
+	job.ParameterizedJob = &structs.ParameterizedJobConfig{}
+
+	txn := state.db.Txn(false)
+	status, err := state.getJobStatus(txn, job, false)
+	if err != nil {
+		t.Fatalf("getJobStatus() failed: %v", err)
+	}
+
+	if status != structs.JobStatusRunning {
+		t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning)
+	}
+
+	// Mark it as stopped
+	job.Stop = true
+	status, err = state.getJobStatus(txn, job, false)
+	if err != nil {
+		t.Fatalf("getJobStatus() failed: %v", err)
+	}
+
+	if status != structs.JobStatusDead {
+		t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead)
+	}
+}
+
 func TestStateStore_SetJobStatus_PendingEval(t *testing.T) {
 	state := testStateStore(t)
 	job := mock.Job()
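Taken together, jobIsGCable and the getJobStatus change define when a job row becomes GC-eligible. A compact sketch of that decision order, with boolean fields standing in for the structs.Job fields the index function reads:

// Sketch of jobIsGCable's new decision order; the types are illustrative.
package main

import "fmt"

type job struct {
	periodic      bool // Periodic != nil && Periodic.Enabled
	parameterized bool // IsParameterized()
	dead          bool // Status == structs.JobStatusDead
	stop          bool // Stop
	batch         bool // Type == structs.JobTypeBatch
}

// gcable mirrors the index function: periodic and parameterized jobs are
// eligible only once stopped; every other job must be dead, and then either
// stopped by a user or a batch job that completed on its own.
func gcable(j job) bool {
	if j.periodic || j.parameterized {
		return j.stop
	}
	if !j.dead {
		return false
	}
	return j.stop || j.batch
}

func main() {
	fmt.Println(gcable(job{periodic: true}))          // false until stopped
	fmt.Println(gcable(job{dead: true, batch: true})) // true: completed batch job
	fmt.Println(gcable(job{dead: true, stop: true}))  // true: stopped service job
}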
diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go
index 14dada680..ffad325eb 100644
--- a/nomad/system_endpoint_test.go
+++ b/nomad/system_endpoint_test.go
@@ -22,8 +22,16 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
 	state := s1.fsm.State()
 	job := mock.Job()
 	job.Type = structs.JobTypeBatch
+	job.Stop = true
 	if err := state.UpsertJob(1000, job); err != nil {
-		t.Fatalf("UpsertAllocs() failed: %v", err)
+		t.Fatalf("UpsertJob() failed: %v", err)
+	}
+
+	eval := mock.Eval()
+	eval.Status = structs.EvalStatusComplete
+	eval.JobID = job.ID
+	if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil {
+		t.Fatalf("UpsertEvals() failed: %v", err)
 	}
 
 	// Make the GC request
@@ -45,7 +53,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
 			return false, err
 		}
 		if exist != nil {
-			return false, fmt.Errorf("job %q wasn't garbage collected", job.ID)
+			return false, fmt.Errorf("job %+v wasn't garbage collected", job)
 		}
 		return true, nil
 	}, func(err error) {
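For reference, the GC pass this test forces over RPC can also be triggered against a running agent through the HTTP API. A minimal sketch, assuming a local agent on the default address; the PUT /v1/system/gc endpoint exists, while the address and the absence of an ACL token are assumptions:

// Force a garbage collection pass on a running Nomad agent over HTTP.
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// The agent address is illustrative; adjust for your deployment.
	req, err := http.NewRequest(http.MethodPut, "http://127.0.0.1:4646/v1/system/gc", nil)
	if err != nil {
		panic(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println(resp.Status) // expect "200 OK" once the GC eval is enqueued
}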