diff --git a/api/jobs.go b/api/jobs.go index a30fdd3f8..e2e301396 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -149,10 +149,12 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query return resp, qm, nil } -// Deregister is used to remove an existing job. -func (j *Jobs) Deregister(jobID string, q *WriteOptions) (string, *WriteMeta, error) { +// Deregister is used to remove an existing job. If purge is set to true, the job +// is deregistered and purged from the system versus still being queryable and +// eventually GC'ed from the system. Most callers should not specify purge. +func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) { var resp deregisterJobResponse - wm, err := j.client.delete("/v1/job/"+jobID, &resp, q) + wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", jobID, purge), &resp, q) if err != nil { return "", nil, err } @@ -290,6 +292,7 @@ type ParameterizedJobConfig struct { // Job is used to serialize a job. type Job struct { + Stop *bool Region *string ID *string ParentID *string @@ -338,6 +341,9 @@ func (j *Job) Canonicalize() { if j.Priority == nil { j.Priority = helper.IntToPtr(50) } + if j.Stop == nil { + j.Stop = helper.BoolToPtr(false) + } if j.Region == nil { j.Region = helper.StringToPtr("global") } @@ -425,6 +431,7 @@ type JobListStub struct { Name string Type string Priority int + Stop bool Status string StatusDescription string JobSummary *JobSummary diff --git a/api/jobs_test.go b/api/jobs_test.go index 5d81de261..6a4d63ea6 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -690,13 +690,12 @@ func TestJobs_Deregister(t *testing.T) { assertWriteMeta(t, wm) // Attempting delete on non-existing job returns an error - if _, _, err = jobs.Deregister("nope", nil); err != nil { + if _, _, err = jobs.Deregister("nope", false, nil); err != nil { t.Fatalf("unexpected error deregistering job: %v", err) - } - // Deleting an existing job works - evalID, wm3, err := jobs.Deregister("job1", nil) + // Do a soft deregister of an existing job + evalID, wm3, err := jobs.Deregister("job1", false, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -705,6 +704,26 @@ func TestJobs_Deregister(t *testing.T) { t.Fatalf("missing eval ID") } + // Check that the job is still queryable + out, qm1, err := jobs.Info("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm1) + if out == nil { + t.Fatalf("missing job") + } + + // Do a purge deregister of an existing job + evalID, wm4, err := jobs.Deregister("job1", true, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm4) + if evalID == "" { + t.Fatalf("missing eval ID") + } + // Check that the job is really gone result, qm, err := jobs.List(nil) if err != nil { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index ece4f2559..f58a88b38 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "fmt" "net/http" "strconv" "strings" @@ -312,8 +313,20 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, jobName string) (interface{}, error) { + + purgeStr := req.URL.Query().Get("purge") + var purgeBool bool + if purgeStr != "" { + var err error + purgeBool, err = strconv.ParseBool(purgeStr) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "purge", purgeStr, err) + } + } + args 
:= structs.JobDeregisterRequest{ JobID: jobName, + Purge: purgeBool, } s.parseRegion(req, &args.Region) @@ -397,6 +410,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { job.Canonicalize() j := &structs.Job{ + Stop: *job.Stop, Region: *job.Region, ID: *job.ID, ParentID: *job.ParentID, diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 11715a398..49abdf8de 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -383,7 +383,7 @@ func TestHTTP_JobDelete(t *testing.T) { t.Fatalf("err: %v", err) } - // Make the HTTP request + // Make the HTTP request to do a soft delete req, err := http.NewRequest("DELETE", "/v1/job/"+job.ID, nil) if err != nil { t.Fatalf("err: %v", err) @@ -407,16 +407,56 @@ func TestHTTP_JobDelete(t *testing.T) { t.Fatalf("missing index") } - // Check the job is gone - getReq := structs.JobSpecificRequest{ + // Check the job is still queryable + getReq1 := structs.JobSpecificRequest{ JobID: job.ID, QueryOptions: structs.QueryOptions{Region: "global"}, } - var getResp structs.SingleJobResponse - if err := s.Agent.RPC("Job.GetJob", &getReq, &getResp); err != nil { + var getResp1 structs.SingleJobResponse + if err := s.Agent.RPC("Job.GetJob", &getReq1, &getResp1); err != nil { t.Fatalf("err: %v", err) } - if getResp.Job != nil { + if getResp1.Job == nil { + t.Fatalf("job doesn't exist") + } + if !getResp1.Job.Stop { + t.Fatalf("job should be marked as stopped") + } + + // Make the HTTP request to do a purge delete + req2, err := http.NewRequest("DELETE", "/v1/job/"+job.ID+"?purge=true", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW.Flush() + + // Make the request + obj, err = s.Server.JobSpecificRequest(respW, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check the response + dereg = obj.(structs.JobDeregisterResponse) + if dereg.EvalID == "" { + t.Fatalf("bad: %v", dereg) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + + // Check the job is gone + getReq2 := structs.JobSpecificRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var getResp2 structs.SingleJobResponse + if err := s.Agent.RPC("Job.GetJob", &getReq2, &getResp2); err != nil { + t.Fatalf("err: %v", err) + } + if getResp2.Job != nil { t.Fatalf("job still exists") } }) @@ -751,6 +791,7 @@ func TestHTTP_JobDispatch(t *testing.T) { func TestJobs_ApiJobToStructsJob(t *testing.T) { apiJob := &api.Job{ + Stop: helper.BoolToPtr(true), Region: helper.StringToPtr("global"), ID: helper.StringToPtr("foo"), ParentID: helper.StringToPtr("lol"), @@ -922,12 +963,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { VaultToken: helper.StringToPtr("token"), Status: helper.StringToPtr("status"), StatusDescription: helper.StringToPtr("status_desc"), + Version: helper.Uint64ToPtr(10), CreateIndex: helper.Uint64ToPtr(1), ModifyIndex: helper.Uint64ToPtr(3), JobModifyIndex: helper.Uint64ToPtr(5), } expected := &structs.Job{ + Stop: true, Region: "global", ID: "foo", ParentID: "lol", @@ -1094,12 +1137,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { }, }, - VaultToken: "token", - Status: "status", - StatusDescription: "status_desc", - CreateIndex: 1, - ModifyIndex: 3, - JobModifyIndex: 5, + VaultToken: "token", } structsJob := ApiJobToStructJob(apiJob) diff --git a/command/status.go b/command/status.go index bf5680813..7b3e42976 100644 --- a/command/status.go +++ b/command/status.go @@ -141,7 +141,7 @@ func (c
*StatusCommand) Run(args []string) int { fmt.Sprintf("Type|%s", *job.Type), fmt.Sprintf("Priority|%d", *job.Priority), fmt.Sprintf("Datacenters|%s", strings.Join(job.Datacenters, ",")), - fmt.Sprintf("Status|%s", *job.Status), + fmt.Sprintf("Status|%s", getStatusString(*job.Status, *job.Stop)), fmt.Sprintf("Periodic|%v", periodic), fmt.Sprintf("Parameterized|%v", parameterized), } @@ -448,7 +448,14 @@ func createStatusListOutput(jobs []*api.JobListStub) string { job.ID, job.Type, job.Priority, - job.Status) + getStatusString(job.Status, job.Stop)) } return formatList(out) } + +func getStatusString(status string, stop bool) string { + if stop { + return fmt.Sprintf("%s (stopped)", status) + } + return status +} diff --git a/command/stop.go b/command/stop.go index c60957d8d..7fa25c99a 100644 --- a/command/stop.go +++ b/command/stop.go @@ -31,6 +31,10 @@ Stop Options: screen, which can be used to examine the evaluation using the eval-status command. + -purge + Purge is used to stop the job and purge it from the system. If not set, the + job will still be queryable and will be purged by the garbage collector. + -yes Automatic yes to prompts. @@ -45,13 +49,14 @@ func (c *StopCommand) Synopsis() string { } func (c *StopCommand) Run(args []string) int { - var detach, verbose, autoYes bool + var detach, purge, verbose, autoYes bool flags := c.Meta.FlagSet("stop", FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&autoYes, "yes", false, "") + flags.BoolVar(&purge, "purge", false, "") if err := flags.Parse(args); err != nil { return 1 @@ -132,7 +137,7 @@ func (c *StopCommand) Run(args []string) int { } // Invoke the stop - evalID, _, err := client.Jobs().Deregister(*job.ID, nil) + evalID, _, err := client.Jobs().Deregister(*job.ID, purge, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err)) return 1 diff --git a/nomad/fsm.go b/nomad/fsm.go index cb8e1f701..0ece2bb99 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -330,20 +330,43 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteJob(index, req.JobID); err != nil { - n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) - return err - } - + // If it is periodic, remove it from the dispatcher if err := n.periodicDispatcher.Remove(req.JobID); err != nil { n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err) return err } - // We always delete from the periodic launch table because it is possible that - // the job was updated to be non-perioidic, thus checking if it is periodic - // doesn't ensure we clean it up properly. - n.state.DeletePeriodicLaunch(index, req.JobID) + if req.Purge { + if err := n.state.DeleteJob(index, req.JobID); err != nil { + n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) + return err + } + + // We always delete from the periodic launch table because it is possible that + // the job was updated to be non-periodic, thus checking if it is periodic + // doesn't ensure we clean it up properly. + n.state.DeletePeriodicLaunch(index, req.JobID) + } else { + // Get the current job, mark it as stopped and re-insert it.
+ ws := memdb.NewWatchSet() + current, err := n.state.JobByID(ws, req.JobID) + if err != nil { + n.logger.Printf("[ERR] nomad.fsm: JobByID lookup failed: %v", err) + return err + } + + if current == nil { + return fmt.Errorf("job %q doesn't exist to be deregistered", req.JobID) + } + + stopped := current.Copy() + stopped.Stop = true + + if err := n.state.UpsertJob(index, stopped); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpsertJob failed: %v", err) + return err + } + } return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 80e48faee..99e8ecb43 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -314,7 +314,7 @@ func TestFSM_RegisterJob(t *testing.T) { } } -func TestFSM_DeregisterJob(t *testing.T) { +func TestFSM_DeregisterJob_Purge(t *testing.T) { fsm := testFSM(t) job := mock.PeriodicJob() @@ -333,6 +333,7 @@ func TestFSM_DeregisterJob(t *testing.T) { req2 := structs.JobDeregisterRequest{ JobID: job.ID, + Purge: true, } buf, err = structs.Encode(structs.JobDeregisterRequestType, req2) if err != nil { @@ -369,6 +370,65 @@ func TestFSM_DeregisterJob(t *testing.T) { } } +func TestFSM_DeregisterJob_NoPurge(t *testing.T) { + fsm := testFSM(t) + + job := mock.PeriodicJob() + req := structs.JobRegisterRequest{ + Job: job, + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + req2 := structs.JobDeregisterRequest{ + JobID: job.ID, + Purge: false, + } + buf, err = structs.Encode(structs.JobDeregisterRequestType, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify the job is still registered but marked as stopped + ws := memdb.NewWatchSet() + jobOut, err := fsm.State().JobByID(ws, req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if jobOut == nil { + t.Fatalf("job not found!") + } + if !jobOut.Stop { + t.Fatalf("job not marked as stopped!") + } + + // Verify it was removed from the periodic runner. + if _, ok := fsm.periodicDispatcher.tracked[job.ID]; ok { + t.Fatal("job not removed from periodic runner") + } + + // Verify it is still in the periodic launch table. + launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if launchOut == nil { + t.Fatalf("launch not found!") + } +} + func TestFSM_UpdateEval(t *testing.T) { fsm := testFSM(t) fsm.evalBroker.SetEnabled(true) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index ac0e1f426..b6770d154 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -432,6 +432,11 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("index update failed: %v", err) } + // Delete the job versions + if err := s.deleteJobVersions(index, job, txn); err != nil { + return err + } + // Delete the job summary if _, err = txn.DeleteAll("job_summary", "id", jobID); err != nil { return fmt.Errorf("deleing job summary failed: %v", err) } @@ -444,6 +449,37 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return nil } +// deleteJobVersions deletes all versions of the given job.
+func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error { + iter, err := txn.Get("job_versions", "id_prefix", job.ID) + if err != nil { + return err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the ID is an exact match + j := raw.(*structs.Job) + if j.ID != job.ID { + continue + } + + if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil { + return fmt.Errorf("deleting job versions failed: %v", err) + } + } + + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + return nil +} + // upsertJobVersion inserts a job into its historic version table and limits the // number of job versions that are tracked. func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 385957ed1..f92f336e3 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -797,6 +797,30 @@ func TestStateStore_DeleteJob_Job(t *testing.T) { t.Fatalf("expected summary to be nil, but got: %v", summary) } + index, err = state.Index("job_summary") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } + + versions, err := state.JobVersionsByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(versions) != 0 { + t.Fatalf("expected no job versions") + } + + index, err = state.Index("job_summary") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } + if watchFired(ws) { t.Fatalf("bad") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5ac6d7632..98c559a47 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -239,6 +239,12 @@ type JobRegisterRequest struct { // to deregister a job as being a schedulable entity. type JobDeregisterRequest struct { JobID string + + // Purge controls whether the deregister purges the job from the system or + // whether the job is just marked as stopped and will be removed by the + // garbage collector. + Purge bool + WriteRequest } @@ -1114,6 +1120,12 @@ const ( // is further composed of tasks. A task group (TG) is the unit of scheduling // however. type Job struct { + // Stop marks whether the user has stopped the job. A stopped job will + // have all created allocations stopped and acts as a way to stop a job + // without purging it from the system. This allows existing allocs to be + // queried and the job to be inspected as it is being killed.
+ Stop bool + // Region is the Nomad region that handles scheduling this job Region string @@ -1384,6 +1396,7 @@ func (j *Job) Stub(summary *JobSummary) *JobListStub { Name: j.Name, Type: j.Type, Priority: j.Priority, + Stop: j.Stop, Status: j.Status, StatusDescription: j.StatusDescription, CreateIndex: j.CreateIndex, @@ -1483,6 +1496,7 @@ type JobListStub struct { Name string Type string Priority int + Stop bool Status string StatusDescription string JobSummary *JobSummary diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 5653d9537..d2df0344e 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -179,6 +179,12 @@ func (s *GenericScheduler) createBlockedEval(planFailure bool) error { return s.planner.CreateEval(s.blocked) } +// isStoppedJob returns if the scheduling is for a stopped job and the scheduler +// should stop all its allocations. +func (s *GenericScheduler) isStoppedJob() bool { + return s.job == nil || s.job.Stop +} + // process is wrapped in retryMax to iteratively run the handler until we have no // further work or we've made the maximum number of attempts. func (s *GenericScheduler) process() (bool, error) { @@ -191,7 +197,7 @@ func (s *GenericScheduler) process() (bool, error) { s.eval.JobID, err) } numTaskGroups := 0 - if s.job != nil { + if !s.isStoppedJob() { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) @@ -207,7 +213,7 @@ func (s *GenericScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewGenericStack(s.batch, s.ctx) - if s.job != nil { + if !s.isStoppedJob() { s.stack.SetJob(s.job) } @@ -351,7 +357,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([ func (s *GenericScheduler) computeJobAllocs() error { // Materialize all the task groups, job could be missing if deregistered var groups map[string]*structs.TaskGroup - if s.job != nil { + if !s.isStoppedJob() { groups = materializeTaskGroups(s.job) } @@ -398,7 +404,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Check if a rolling upgrade strategy is being used limit := len(diff.update) + len(diff.migrate) + len(diff.lost) - if s.job != nil && s.job.Update.Rolling() { + if !s.isStoppedJob() && s.job.Update.Rolling() { limit = s.job.Update.MaxParallel } @@ -414,7 +420,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Nothing remaining to do if placement is not required if len(diff.place) == 0 { - if s.job != nil { + if !s.isStoppedJob() { for _, tg := range s.job.TaskGroups { s.queuedAllocs[tg.Name] = 0 } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 832e17627..afb0e48d4 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1671,7 +1671,7 @@ func TestServiceSched_JobModify_DistinctProperty(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestServiceSched_JobDeregister(t *testing.T) { +func TestServiceSched_JobDeregister_Purged(t *testing.T) { h := NewHarness(t) // Generate a fake job with allocations @@ -1735,6 +1735,72 @@ func TestServiceSched_JobDeregister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobDeregister_Stopped(t *testing.T) { + h := NewHarness(t) + + // Generate a fake job with allocations + job := mock.Job() + job.Stop = true + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + 
alloc.JobID = job.ID + allocs = append(allocs, alloc) + } + for _, alloc := range allocs { + h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID)) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all nodes + if len(plan.NodeUpdate["12345678-abcd-efab-cdef-123456789abc"]) != len(allocs) { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure that the job field on the allocation is still populated + for _, alloc := range out { + if alloc.Job == nil { + t.Fatalf("bad: %#v", alloc) + } + } + + // Ensure no remaining allocations + out, _ = structs.FilterTerminalAllocs(out) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_NodeDown(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 755153d9c..b95e5b5fe 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -83,6 +83,12 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { s.queuedAllocs) } +// isStoppedJob returns if the scheduling is for a stopped job and the scheduler +// should stop all its allocations. +func (s *SystemScheduler) isStoppedJob() bool { + return s.job == nil || s.job.Stop +} + // process is wrapped in retryMax to iteratively run the handler until we have no // further work or we've made the maximum number of attempts. 
func (s *SystemScheduler) process() (bool, error) { @@ -95,13 +101,13 @@ func (s *SystemScheduler) process() (bool, error) { s.eval.JobID, err) } numTaskGroups := 0 - if s.job != nil { + if !s.isStoppedJob() { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) // Get the ready nodes in the required datacenters - if s.job != nil { + if !s.isStoppedJob() { s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) @@ -119,7 +125,7 @@ func (s *SystemScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewSystemStack(s.ctx) - if s.job != nil { + if !s.isStoppedJob() { s.stack.SetJob(s.job) } @@ -228,7 +234,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Check if a rolling upgrade strategy is being used limit := len(diff.update) - if s.job != nil && s.job.Update.Rolling() { + if !s.isStoppedJob() && s.job.Update.Rolling() { limit = s.job.Update.MaxParallel } @@ -237,7 +243,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Nothing remaining to do if placement is not required if len(diff.place) == 0 { - if s.job != nil { + if !s.isStoppedJob() { for _, tg := range s.job.TaskGroups { s.queuedAllocs[tg.Name] = 0 } diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 313d573d3..36713a2f6 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -773,7 +773,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { } } -func TestSystemSched_JobDeregister(t *testing.T) { +func TestSystemSched_JobDeregister_Purged(t *testing.T) { h := NewHarness(t) // Create some nodes @@ -842,6 +842,77 @@ func TestSystemSched_JobDeregister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_JobDeregister_Stopped(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + job.Stop = true + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + for _, alloc := range allocs { + noErr(t, h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID))) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted the job from all nodes. 
+ for _, node := range nodes { + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure no remaining allocations + out, _ = structs.FilterTerminalAllocs(out) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestSystemSched_NodeDown(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/util.go b/scheduler/util.go index 08db94f15..01f10c8dc 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -21,7 +21,7 @@ type allocTuple struct { // a job requires. This is used to do the count expansion. func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { out := make(map[string]*structs.TaskGroup) - if job == nil { + if job == nil || job.Stop { return out } diff --git a/website/source/docs/commands/stop.html.md.erb b/website/source/docs/commands/stop.html.md.erb index f43261012..9040ae1db 100644 --- a/website/source/docs/commands/stop.html.md.erb +++ b/website/source/docs/commands/stop.html.md.erb @@ -41,6 +41,11 @@ the request. It is safe to exit the monitor early using ctrl+c. * `-yes`: Automatic yes to prompts. +* `-purge`: Purge is used to stop the job and purge it from the system. If not +set, the job will still be queryable and will be purged by the garbage +collector. + + ## Examples Stop the job with ID "job1":
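The basic form shown above is `nomad stop job1`, which with this change performs a soft deregister: the job is marked as stopped but remains queryable until the garbage collector removes it. Passing `-purge` (`nomad stop -purge job1`) instead removes the job, its versions, and its summary from state immediately. Both paths map to `DELETE /v1/job/<job ID>?purge=<bool>` on the HTTP API and to the new `purge` argument of `Jobs().Deregister` in the Go client. As a minimal sketch of the client-side usage, assuming a reachable agent and a hypothetical job named "example" (neither is part of this change):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Assumes a Nomad agent reachable via the default config (NOMAD_ADDR, etc.).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Soft stop: the job is marked Stop=true and stays queryable until the
	// garbage collector removes it (equivalent to `nomad stop example`).
	evalID, _, err := client.Jobs().Deregister("example", false, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("soft stop evaluation:", evalID)

	// Purge: the job, its versions, and its summary are deleted from state
	// immediately (equivalent to `nomad stop -purge example`).
	evalID, _, err = client.Jobs().Deregister("example", true, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("purge evaluation:", evalID)
}
```

Either call returns an evaluation ID, which can then be examined with the eval-status command as described in the stop command's help text.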