non-purge deregisters

This commit is contained in:
Alex Dadgar
2017-04-14 20:54:30 -07:00
parent 330800f683
commit 950171f094
17 changed files with 447 additions and 46 deletions

View File

@@ -149,10 +149,12 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query
return resp, qm, nil
}
// Deregister is used to remove an existing job.
func (j *Jobs) Deregister(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
// Deregister is used to remove an existing job. If purge is set to true, the job
// is deregistered and purged from the system versus still being queryable and
// eventually GC'ed from the system. Most callers should not specify purge.
func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) {
var resp deregisterJobResponse
wm, err := j.client.delete("/v1/job/"+jobID, &resp, q)
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", jobID, purge), &resp, q)
if err != nil {
return "", nil, err
}
@@ -290,6 +292,7 @@ type ParameterizedJobConfig struct {
// Job is used to serialize a job.
type Job struct {
Stop *bool
Region *string
ID *string
ParentID *string
@@ -338,6 +341,9 @@ func (j *Job) Canonicalize() {
if j.Priority == nil {
j.Priority = helper.IntToPtr(50)
}
if j.Stop == nil {
j.Stop = helper.BoolToPtr(false)
}
if j.Region == nil {
j.Region = helper.StringToPtr("global")
}
@@ -425,6 +431,7 @@ type JobListStub struct {
Name string
Type string
Priority int
Stop bool
Status string
StatusDescription string
JobSummary *JobSummary

View File

@@ -690,13 +690,12 @@ func TestJobs_Deregister(t *testing.T) {
assertWriteMeta(t, wm)
// Attempting delete on non-existing job returns an error
if _, _, err = jobs.Deregister("nope", nil); err != nil {
if _, _, err = jobs.Deregister("nope", false, nil); err != nil {
t.Fatalf("unexpected error deregistering job: %v", err)
}
// Deleting an existing job works
evalID, wm3, err := jobs.Deregister("job1", nil)
// Do a soft deregister of an existing job
evalID, wm3, err := jobs.Deregister("job1", false, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -705,6 +704,26 @@ func TestJobs_Deregister(t *testing.T) {
t.Fatalf("missing eval ID")
}
// Check that the job is still queryable
out, qm1, err := jobs.Info("job1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertQueryMeta(t, qm1)
if out == nil {
t.Fatalf("missing job")
}
// Do a purge deregister of an existing job
evalID, wm4, err := jobs.Deregister("job1", true, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertWriteMeta(t, wm4)
if evalID == "" {
t.Fatalf("missing eval ID")
}
// Check that the job is really gone
result, qm, err := jobs.List(nil)
if err != nil {

View File

@@ -1,6 +1,7 @@
package agent
import (
"fmt"
"net/http"
"strconv"
"strings"
@@ -312,8 +313,20 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
purgeStr := req.URL.Query().Get("purge")
var purgeBool bool
if purgeStr != "" {
var err error
purgeBool, err = strconv.ParseBool(purgeStr)
if err != nil {
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "purge", purgeStr, err)
}
}
args := structs.JobDeregisterRequest{
JobID: jobName,
Purge: purgeBool,
}
s.parseRegion(req, &args.Region)
@@ -397,6 +410,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
job.Canonicalize()
j := &structs.Job{
Stop: *job.Stop,
Region: *job.Region,
ID: *job.ID,
ParentID: *job.ParentID,

View File

@@ -383,7 +383,7 @@ func TestHTTP_JobDelete(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Make the HTTP request
// Make the HTTP request to do a soft delete
req, err := http.NewRequest("DELETE", "/v1/job/"+job.ID, nil)
if err != nil {
t.Fatalf("err: %v", err)
@@ -407,16 +407,56 @@ func TestHTTP_JobDelete(t *testing.T) {
t.Fatalf("missing index")
}
// Check the job is gone
getReq := structs.JobSpecificRequest{
// Check the job is still queryable
getReq1 := structs.JobSpecificRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var getResp structs.SingleJobResponse
if err := s.Agent.RPC("Job.GetJob", &getReq, &getResp); err != nil {
var getResp1 structs.SingleJobResponse
if err := s.Agent.RPC("Job.GetJob", &getReq1, &getResp1); err != nil {
t.Fatalf("err: %v", err)
}
if getResp.Job != nil {
if getResp1.Job == nil {
t.Fatalf("job doesn't exists")
}
if !getResp1.Job.Stop {
t.Fatalf("job should be marked as stop")
}
// Make the HTTP request to do a purge delete
req2, err := http.NewRequest("DELETE", "/v1/job/"+job.ID+"?purge=true", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW.Flush()
// Make the request
obj, err = s.Server.JobSpecificRequest(respW, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the response
dereg = obj.(structs.JobDeregisterResponse)
if dereg.EvalID == "" {
t.Fatalf("bad: %v", dereg)
}
// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
// Check the job is gone
getReq2 := structs.JobSpecificRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var getResp2 structs.SingleJobResponse
if err := s.Agent.RPC("Job.GetJob", &getReq2, &getResp2); err != nil {
t.Fatalf("err: %v", err)
}
if getResp2.Job != nil {
t.Fatalf("job still exists")
}
})
@@ -751,6 +791,7 @@ func TestHTTP_JobDispatch(t *testing.T) {
func TestJobs_ApiJobToStructsJob(t *testing.T) {
apiJob := &api.Job{
Stop: helper.BoolToPtr(true),
Region: helper.StringToPtr("global"),
ID: helper.StringToPtr("foo"),
ParentID: helper.StringToPtr("lol"),
@@ -922,12 +963,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
VaultToken: helper.StringToPtr("token"),
Status: helper.StringToPtr("status"),
StatusDescription: helper.StringToPtr("status_desc"),
Version: helper.Uint64ToPtr(10),
CreateIndex: helper.Uint64ToPtr(1),
ModifyIndex: helper.Uint64ToPtr(3),
JobModifyIndex: helper.Uint64ToPtr(5),
}
expected := &structs.Job{
Stop: true,
Region: "global",
ID: "foo",
ParentID: "lol",
@@ -1094,12 +1137,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
VaultToken: "token",
Status: "status",
StatusDescription: "status_desc",
CreateIndex: 1,
ModifyIndex: 3,
JobModifyIndex: 5,
VaultToken: "token",
}
structsJob := ApiJobToStructJob(apiJob)

View File

@@ -141,7 +141,7 @@ func (c *StatusCommand) Run(args []string) int {
fmt.Sprintf("Type|%s", *job.Type),
fmt.Sprintf("Priority|%d", *job.Priority),
fmt.Sprintf("Datacenters|%s", strings.Join(job.Datacenters, ",")),
fmt.Sprintf("Status|%s", *job.Status),
fmt.Sprintf("Status|%s", getStatusString(*job.Status, *job.Stop)),
fmt.Sprintf("Periodic|%v", periodic),
fmt.Sprintf("Parameterized|%v", parameterized),
}
@@ -448,7 +448,14 @@ func createStatusListOutput(jobs []*api.JobListStub) string {
job.ID,
job.Type,
job.Priority,
job.Status)
getStatusString(job.Status, job.Stop))
}
return formatList(out)
}
func getStatusString(status string, stop bool) string {
if stop {
return fmt.Sprintf("%s (stopped)", status)
}
return status
}

View File

@@ -31,6 +31,10 @@ Stop Options:
screen, which can be used to examine the evaluation using the eval-status
command.
-purge
Purge is used to stop the job and purge it from the system. If not set, the
job will still be queryable and will be purged by the garbage collector.
-yes
Automatic yes to prompts.
@@ -45,13 +49,14 @@ func (c *StopCommand) Synopsis() string {
}
func (c *StopCommand) Run(args []string) int {
var detach, verbose, autoYes bool
var detach, purge, verbose, autoYes bool
flags := c.Meta.FlagSet("stop", FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&autoYes, "yes", false, "")
flags.BoolVar(&purge, "purge", false, "")
if err := flags.Parse(args); err != nil {
return 1
@@ -132,7 +137,7 @@ func (c *StopCommand) Run(args []string) int {
}
// Invoke the stop
evalID, _, err := client.Jobs().Deregister(*job.ID, nil)
evalID, _, err := client.Jobs().Deregister(*job.ID, purge, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err))
return 1

View File

@@ -330,20 +330,43 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteJob(index, req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err)
return err
}
// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-perioidic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, req.JobID)
if req.Purge {
if err := n.state.DeleteJob(index, req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}
// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-perioidic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, req.JobID)
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByID(ws, req.JobID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: JobByID lookup failed: %v", err)
return err
}
if current == nil {
return fmt.Errorf("job %q doesn't exist to be deregistered", req.JobID)
}
stopped := current.Copy()
stopped.Stop = true
if err := n.state.UpsertJob(index, stopped); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertJob failed: %v", err)
return err
}
}
return nil
}

View File

@@ -314,7 +314,7 @@ func TestFSM_RegisterJob(t *testing.T) {
}
}
func TestFSM_DeregisterJob(t *testing.T) {
func TestFSM_DeregisterJob_Purge(t *testing.T) {
fsm := testFSM(t)
job := mock.PeriodicJob()
@@ -333,6 +333,7 @@ func TestFSM_DeregisterJob(t *testing.T) {
req2 := structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
}
buf, err = structs.Encode(structs.JobDeregisterRequestType, req2)
if err != nil {
@@ -369,6 +370,65 @@ func TestFSM_DeregisterJob(t *testing.T) {
}
}
func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
fsm := testFSM(t)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.JobDeregisterRequest{
JobID: job.ID,
Purge: false,
}
buf, err = structs.Encode(structs.JobDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut == nil {
t.Fatalf("job not found!")
}
if !jobOut.Stop {
t.Fatalf("job not stopped found!")
}
// Verify it was removed from the periodic runner.
if _, ok := fsm.periodicDispatcher.tracked[job.ID]; ok {
t.Fatal("job not removed from periodic runner")
}
// Verify it was removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("launch not found!")
}
}
func TestFSM_UpdateEval(t *testing.T) {
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)

View File

@@ -432,6 +432,11 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return fmt.Errorf("index update failed: %v", err)
}
// Delete the job versions
if err := s.deleteJobVersions(index, job, txn); err != nil {
return err
}
// Delete the job summary
if _, err = txn.DeleteAll("job_summary", "id", jobID); err != nil {
return fmt.Errorf("deleing job summary failed: %v", err)
@@ -444,6 +449,37 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return nil
}
// deleteJobVersions deletes all versions of the given job.
func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error {
iter, err := txn.Get("job_versions", "id_prefix", job.ID)
if err != nil {
return err
}
for {
raw := iter.Next()
if raw == nil {
break
}
// Ensure the ID is an exact match
j := raw.(*structs.Job)
if j.ID != job.ID {
continue
}
if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil {
return fmt.Errorf("deleing job versions failed: %v", err)
}
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
// upsertJobVersion inserts a job into its historic version table and limits the
// number of job versions that are tracked.
func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error {

View File

@@ -797,6 +797,30 @@ func TestStateStore_DeleteJob_Job(t *testing.T) {
t.Fatalf("expected summary to be nil, but got: %v", summary)
}
index, err = state.Index("job_summary")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1001 {
t.Fatalf("bad: %d", index)
}
versions, err := state.JobVersionsByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(versions) != 0 {
t.Fatalf("expected no job versions")
}
index, err = state.Index("job_summary")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1001 {
t.Fatalf("bad: %d", index)
}
if watchFired(ws) {
t.Fatalf("bad")
}

View File

@@ -239,6 +239,12 @@ type JobRegisterRequest struct {
// to deregister a job as being a schedulable entity.
type JobDeregisterRequest struct {
JobID string
// Purge controls whether the deregister purges the job from the system or
// whether the job is just marked as stopped and will be removed by the
// garbage collector
Purge bool
WriteRequest
}
@@ -1114,6 +1120,12 @@ const (
// is further composed of tasks. A task group (TG) is the unit of scheduling
// however.
type Job struct {
// Stop marks whether the user has stopped the job. A stopped job will
// have all created allocations stopped and acts as a way to stop a job
// without purging it from the system. This allows existing allocs to be
// queried and the job to be inspected as it is being killed.
Stop bool
// Region is the Nomad region that handles scheduling this job
Region string
@@ -1384,6 +1396,7 @@ func (j *Job) Stub(summary *JobSummary) *JobListStub {
Name: j.Name,
Type: j.Type,
Priority: j.Priority,
Stop: j.Stop,
Status: j.Status,
StatusDescription: j.StatusDescription,
CreateIndex: j.CreateIndex,
@@ -1483,6 +1496,7 @@ type JobListStub struct {
Name string
Type string
Priority int
Stop bool
Status string
StatusDescription string
JobSummary *JobSummary

View File

@@ -179,6 +179,12 @@ func (s *GenericScheduler) createBlockedEval(planFailure bool) error {
return s.planner.CreateEval(s.blocked)
}
// isStoppedJob returns if the scheduling is for a stopped job and the scheduler
// should stop all its allocations.
func (s *GenericScheduler) isStoppedJob() bool {
return s.job == nil || s.job.Stop
}
// process is wrapped in retryMax to iteratively run the handler until we have no
// further work or we've made the maximum number of attempts.
func (s *GenericScheduler) process() (bool, error) {
@@ -191,7 +197,7 @@ func (s *GenericScheduler) process() (bool, error) {
s.eval.JobID, err)
}
numTaskGroups := 0
if s.job != nil {
if !s.isStoppedJob() {
numTaskGroups = len(s.job.TaskGroups)
}
s.queuedAllocs = make(map[string]int, numTaskGroups)
@@ -207,7 +213,7 @@ func (s *GenericScheduler) process() (bool, error) {
// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx)
if s.job != nil {
if !s.isStoppedJob() {
s.stack.SetJob(s.job)
}
@@ -351,7 +357,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([
func (s *GenericScheduler) computeJobAllocs() error {
// Materialize all the task groups, job could be missing if deregistered
var groups map[string]*structs.TaskGroup
if s.job != nil {
if !s.isStoppedJob() {
groups = materializeTaskGroups(s.job)
}
@@ -398,7 +404,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate) + len(diff.lost)
if s.job != nil && s.job.Update.Rolling() {
if !s.isStoppedJob() && s.job.Update.Rolling() {
limit = s.job.Update.MaxParallel
}
@@ -414,7 +420,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
if s.job != nil {
if !s.isStoppedJob() {
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
}

View File

@@ -1671,7 +1671,7 @@ func TestServiceSched_JobModify_DistinctProperty(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobDeregister(t *testing.T) {
func TestServiceSched_JobDeregister_Purged(t *testing.T) {
h := NewHarness(t)
// Generate a fake job with allocations
@@ -1735,6 +1735,72 @@ func TestServiceSched_JobDeregister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobDeregister_Stopped(t *testing.T) {
h := NewHarness(t)
// Generate a fake job with allocations
job := mock.Job()
job.Stop = true
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
allocs = append(allocs, alloc)
}
for _, alloc := range allocs {
h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID))
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation to deregister the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan evicted all nodes
if len(plan.NodeUpdate["12345678-abcd-efab-cdef-123456789abc"]) != len(allocs) {
t.Fatalf("bad: %#v", plan)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.ID, false)
noErr(t, err)
// Ensure that the job field on the allocation is still populated
for _, alloc := range out {
if alloc.Job == nil {
t.Fatalf("bad: %#v", alloc)
}
}
// Ensure no remaining allocations
out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 0 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_NodeDown(t *testing.T) {
h := NewHarness(t)

View File

@@ -83,6 +83,12 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
s.queuedAllocs)
}
// isStoppedJob returns if the scheduling is for a stopped job and the scheduler
// should stop all its allocations.
func (s *SystemScheduler) isStoppedJob() bool {
return s.job == nil || s.job.Stop
}
// process is wrapped in retryMax to iteratively run the handler until we have no
// further work or we've made the maximum number of attempts.
func (s *SystemScheduler) process() (bool, error) {
@@ -95,13 +101,13 @@ func (s *SystemScheduler) process() (bool, error) {
s.eval.JobID, err)
}
numTaskGroups := 0
if s.job != nil {
if !s.isStoppedJob() {
numTaskGroups = len(s.job.TaskGroups)
}
s.queuedAllocs = make(map[string]int, numTaskGroups)
// Get the ready nodes in the required datacenters
if s.job != nil {
if !s.isStoppedJob() {
s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return false, fmt.Errorf("failed to get ready nodes: %v", err)
@@ -119,7 +125,7 @@ func (s *SystemScheduler) process() (bool, error) {
// Construct the placement stack
s.stack = NewSystemStack(s.ctx)
if s.job != nil {
if !s.isStoppedJob() {
s.stack.SetJob(s.job)
}
@@ -228,7 +234,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Check if a rolling upgrade strategy is being used
limit := len(diff.update)
if s.job != nil && s.job.Update.Rolling() {
if !s.isStoppedJob() && s.job.Update.Rolling() {
limit = s.job.Update.MaxParallel
}
@@ -237,7 +243,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
if s.job != nil {
if !s.isStoppedJob() {
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
}

View File

@@ -773,7 +773,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) {
}
}
func TestSystemSched_JobDeregister(t *testing.T) {
func TestSystemSched_JobDeregister_Purged(t *testing.T) {
h := NewHarness(t)
// Create some nodes
@@ -842,6 +842,77 @@ func TestSystemSched_JobDeregister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_JobDeregister_Stopped(t *testing.T) {
h := NewHarness(t)
// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Generate a fake job with allocations
job := mock.SystemJob()
job.Stop = true
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
for _, alloc := range allocs {
noErr(t, h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID)))
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation to deregister the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan evicted the job from all nodes.
for _, node := range nodes {
if len(plan.NodeUpdate[node.ID]) != 1 {
t.Fatalf("bad: %#v", plan)
}
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.ID, false)
noErr(t, err)
// Ensure no remaining allocations
out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 0 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_NodeDown(t *testing.T) {
h := NewHarness(t)

View File

@@ -21,7 +21,7 @@ type allocTuple struct {
// a job requires. This is used to do the count expansion.
func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
out := make(map[string]*structs.TaskGroup)
if job == nil {
if job == nil || job.Stop {
return out
}

View File

@@ -41,6 +41,11 @@ the request. It is safe to exit the monitor early using ctrl+c.
* `-yes`: Automatic yes to prompts.
* `-purge`: Purge is used to stop the job and purge it from the system. If not
set, the job will still be queryable and will be purged by the garbage
collector.
## Examples
Stop the job with ID "job1":