From 03c6692f9802ecc62f46793712da357aa9f2de61 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 21 Jul 2016 14:43:21 -0700 Subject: [PATCH] Fixed some bugs --- api/jobs_test.go | 2 +- client/client_test.go | 18 ++++++-- command/agent/alloc_endpoint_test.go | 17 ++++++-- command/agent/job_endpoint.go | 2 +- command/agent/node_endpoint_test.go | 9 ++++ command/status.go | 1 + nomad/alloc_endpoint_test.go | 12 +++++- nomad/fsm.go | 3 +- nomad/fsm_test.go | 16 ++++++-- nomad/job_endpoint.go | 8 ++-- nomad/job_endpoint_test.go | 4 +- nomad/mock/mock.go | 6 +-- nomad/state/state_store.go | 6 ++- nomad/state/state_store_test.go | 61 +++++++++++++++++++++++----- nomad/structs/structs.go | 2 +- nomad/worker_test.go | 6 +-- scheduler/system_sched.go | 4 ++ 17 files changed, 136 insertions(+), 41 deletions(-) diff --git a/api/jobs_test.go b/api/jobs_test.go index fea8fc11e..11efc7a36 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -508,7 +508,7 @@ func TestJobs_JobSummary(t *testing.T) { } assertWriteMeta(t, wm) - // Query the job again and ensure it exists + // Query the job summary again and ensure it exists result, qm, err := jobs.Summary("job1", nil) if err != nil { t.Fatalf("err: %s", err) diff --git a/client/client_test.go b/client/client_test.go index 60af15bad..105b089da 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -357,6 +357,9 @@ func TestClient_UpdateAllocStatus(t *testing.T) { alloc.ClientStatus = originalStatus state := s1.State() + if err := state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)); err != nil { + t.Fatal(err) + } state.UpsertAllocs(100, []*structs.Allocation{alloc}) testutil.WaitForResult(func() (bool, error) { @@ -394,6 +397,12 @@ func TestClient_WatchAllocs(t *testing.T) { alloc2.NodeID = c1.Node().ID state := s1.State() + if err := state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2}) if err != nil { @@ -469,8 +478,10 @@ func TestClient_SaveRestoreState(t *testing.T) { task.Config["args"] = []string{"10"} state := s1.State() - err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) - if err != nil { + if err := state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}); err != nil { t.Fatalf("err: %v", err) } @@ -485,8 +496,7 @@ func TestClient_SaveRestoreState(t *testing.T) { }) // Shutdown the client, saves state - err = c1.Shutdown() - if err != nil { + if err := c1.Shutdown(); err != nil { t.Fatalf("err: %v", err) } diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index e2a4efd83..7ae3706ec 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -58,13 +58,21 @@ func TestHTTP_AllocsPrefixList(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Directly manipulate the state state := s.Agent.server.State() + alloc1 := mock.Alloc() alloc1.ID = "aaaaaaaa-e8f7-fd38-c855-ab94ceb89706" alloc2 := mock.Alloc() alloc2.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706" - err := state.UpsertAllocs(1000, - []*structs.Allocation{alloc1, alloc2}) - if err != nil { + summary1 := mock.JobSummary(alloc1.JobID) + summary2 := mock.JobSummary(alloc2.JobID) + if err := state.UpsertJobSummary(998, summary1); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(999, summary2); err != nil { + t.Fatal(err) + } + if err := state.UpsertAllocs(1000, + []*structs.Allocation{alloc1, alloc2}); err != nil { t.Fatalf("err: %v", err) } @@ -110,6 +118,9 @@ func TestHTTP_AllocQuery(t *testing.T) { // Directly manipulate the state state := s.Agent.server.State() alloc := mock.Alloc() + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) if err != nil { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2d366d592..a4be34c25 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -253,7 +253,7 @@ func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Reque return nil, nil } - var out structs.SingleJobSummaryResponse + var out structs.JobSummaryResponse if err := s.agent.RPC("Job.GetSummary", &args, &out); err != nil { return nil, err } diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index a63739a18..a9fe426bf 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -129,6 +129,9 @@ func TestHTTP_NodeForceEval(t *testing.T) { state := s.Agent.server.State() alloc1 := mock.Alloc() alloc1.NodeID = node.ID + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) @@ -177,6 +180,9 @@ func TestHTTP_NodeAllocations(t *testing.T) { state := s.Agent.server.State() alloc1 := mock.Alloc() alloc1.NodeID = node.ID + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) @@ -231,6 +237,9 @@ func TestHTTP_NodeDrain(t *testing.T) { state := s.Agent.server.State() alloc1 := mock.Alloc() alloc1.NodeID = node.ID + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) diff --git a/command/status.go b/command/status.go index 3322175ff..825d805c4 100644 --- a/command/status.go +++ b/command/status.go @@ -264,6 +264,7 @@ func (c *StatusCommand) outputJobInfo(client *api.Client, job *api.Job) error { tgs.Running, tgs.Failed, tgs.Complete, tgs.Lost, ) + idx += 1 } c.Ui.Output(formatList(summaries)) } diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 263dc7d4c..74afbf89b 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -19,9 +19,13 @@ func TestAllocEndpoint_List(t *testing.T) { // Create the register request alloc := mock.Alloc() + summary := mock.JobSummary(alloc.JobID) state := s1.fsm.State() - err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) - if err != nil { + + if err := state.UpsertJobSummary(999, summary); err != nil { + t.Fatalf("err: %v", err) + } + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -75,6 +79,10 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) { // Create the alloc alloc := mock.Alloc() + summary := mock.JobSummary(alloc.JobID) + if err := state.UpsertJobSummary(1, summary); err != nil { + t.Fatalf("err: %v", err) + } // Upsert alloc triggers watches time.AfterFunc(100*time.Millisecond, func() { if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil { diff --git a/nomad/fsm.go b/nomad/fsm.go index d317f6058..1b42aea2a 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -410,6 +410,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } + // Updating the allocs with the job id and task group name for _, alloc := range req.Alloc { if existing, _ := n.state.AllocByID(alloc.ID); existing != nil { alloc.JobID = existing.JobID @@ -808,7 +809,7 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink, break } - jobSummary := raw.(*structs.JobSummary) + jobSummary := raw.(structs.JobSummary) sink.Write([]byte{byte(JobSummarySnapshot)}) if err := encoder.Encode(jobSummary); err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 897a975de..0f0ac0d06 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -489,6 +489,7 @@ func TestFSM_UpsertAllocs(t *testing.T) { fsm := testFSM(t) alloc := mock.Alloc() + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) req := structs.AllocUpdateRequest{ Alloc: []*structs.Allocation{alloc}, } @@ -544,6 +545,7 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) { fsm := testFSM(t) alloc := mock.Alloc() + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) job := alloc.Job alloc.Job = nil req := structs.AllocUpdateRequest{ @@ -611,6 +613,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { fsm := testFSM(t) alloc := mock.Alloc() + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) job := alloc.Job resources := alloc.Resources alloc.Resources = nil @@ -667,7 +670,9 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { alloc.NodeID = node.ID alloc2 := mock.Alloc() alloc2.NodeID = node.ID - state.UpsertAllocs(1, []*structs.Allocation{alloc, alloc2}) + state.UpsertJobSummary(8, mock.JobSummary(alloc.JobID)) + state.UpsertJobSummary(9, mock.JobSummary(alloc2.JobID)) + state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2}) clientAlloc := new(structs.Allocation) *clientAlloc = *alloc @@ -730,7 +735,8 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { state := fsm.State() alloc := mock.Alloc() - state.UpsertAllocs(1, []*structs.Allocation{alloc}) + state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID)) + state.UpsertAllocs(10, []*structs.Allocation{alloc}) clientAlloc := new(structs.Allocation) *clientAlloc = *alloc @@ -757,7 +763,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { clientAlloc.CreateIndex = out.CreateIndex clientAlloc.ModifyIndex = out.ModifyIndex if !reflect.DeepEqual(clientAlloc, out) { - t.Fatalf("bad: %#v %#v", clientAlloc, out) + t.Fatalf("err: %#v,%#v", clientAlloc, out) } } @@ -857,8 +863,10 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) { fsm := testFSM(t) state := fsm.State() alloc1 := mock.Alloc() - state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) alloc2 := mock.Alloc() + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) + state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) state.UpsertAllocs(1001, []*structs.Allocation{alloc2}) // Verify the contents diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 809219dbc..4af40cd03 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -115,7 +115,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // GetSummary retreives the summary of a job func (j *Job) GetSummary(args *structs.JobSummaryRequest, - reply *structs.SingleJobSummaryResponse) error { + reply *structs.JobSummaryResponse) error { if done, err := j.srv.forward("Job.GetSummary", args, args, reply); done { return err } @@ -126,12 +126,12 @@ func (j *Job) GetSummary(args *structs.JobSummaryRequest, queryMeta: &reply.QueryMeta, watch: watch.NewItems(watch.Item{JobSummary: args.JobID}), run: func() error { - - // Look for the job snap, err := j.srv.fsm.State().Snapshot() if err != nil { return err } + + // Look for job summary out, err := snap.JobSummaryByID(args.JobID) if err != nil { return err @@ -155,8 +155,6 @@ func (j *Job) GetSummary(args *structs.JobSummaryRequest, return nil }} return j.srv.blockingRPC(&opts) - - return nil } // Evaluate is used to force a job for re-evaluation diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index b3dd5a0b0..0b3e747d4 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -735,12 +735,12 @@ func TestJobEndpoint_GetJobSummary(t *testing.T) { job.ModifyIndex = resp.JobModifyIndex job.JobModifyIndex = resp.JobModifyIndex - // Lookup the job + // Lookup the job summary get := &structs.JobSummaryRequest{ JobID: job.ID, QueryOptions: structs.QueryOptions{Region: "global"}, } - var resp2 structs.SingleJobSummaryResponse + var resp2 structs.JobSummaryResponse if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", get, &resp2); err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 9fa38fe99..8a449197e 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -235,9 +235,9 @@ func JobSummary(jobID string) *structs.JobSummary { js := &structs.JobSummary{ JobID: jobID, Summary: map[string]structs.TaskGroupSummary{ - "cache": { - Queued: 5, - Starting: 1, + "web": { + Queued: 0, + Starting: 0, }, }, } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d5ccde233..a1ab1dd9f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1314,6 +1314,9 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // or inserted func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allocation, watcher watch.Items, txn *memdb.Txn) error { + if len(allocs) == 0 { + return nil + } jobID := allocs[0].JobID jobSummary, err := s.JobSummaryByID(jobID) @@ -1339,7 +1342,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allo if existing == nil { switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: - s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", alloc.DesiredStatus) + s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", + alloc.ID, alloc.DesiredStatus) } switch alloc.ClientStatus { case structs.AllocClientStatusPending: diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1dc7f22b2..323ae09e2 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -144,6 +144,15 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { alloc2.NodeID = node.ID alloc.ClientStatus = structs.AllocClientStatusRunning alloc1.ClientStatus = structs.AllocClientStatusFailed + if err := state.UpsertJobSummary(990, mock.JobSummary(alloc.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(990, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(990, mock.JobSummary(alloc2.JobID)); err != nil { + t.Fatal(err) + } alloc2.ClientStatus = structs.AllocClientStatusPending if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil { @@ -1286,6 +1295,10 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { watch.Item{AllocNode: alloc1.NodeID}, watch.Item{AllocNode: alloc2.NodeID}) + state.UpsertJobSummary(900, mock.JobSummary(eval1.JobID)) + state.UpsertJobSummary(901, mock.JobSummary(eval2.JobID)) + state.UpsertJobSummary(902, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(903, mock.JobSummary(alloc2.JobID)) err := state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}) if err != nil { t.Fatalf("err: %v", err) @@ -1540,6 +1553,8 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { watch.Item{AllocJob: alloc2.JobID}, watch.Item{AllocNode: alloc2.NodeID}) + state.UpsertJobSummary(900, mock.JobSummary(alloc.JobID)) + state.UpsertJobSummary(901, mock.JobSummary(alloc2.JobID)) if err := state.UpsertJob(999, alloc.Job); err != nil { t.Fatalf("err: %v", err) } @@ -1623,8 +1638,8 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { t.Fatalf("err: %v", err) } tgSummary2 := summary2.Summary["web"] - if tgSummary2.Running != 1 { - t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Failed) + if tgSummary2.Running != 0 { + t.Fatalf("expected running: %v, actual: %v", 0, tgSummary2.Failed) } notify.verify(t) @@ -1687,6 +1702,7 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) if err := state.UpsertJob(999, alloc.Job); err != nil { t.Fatalf("err: %v", err) @@ -1709,6 +1725,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { alloc2 := mock.Alloc() alloc2.ID = alloc.ID alloc2.NodeID = alloc.NodeID + ".new" + state.UpsertJobSummary(1001, mock.JobSummary(alloc2.JobID)) notify := setupNotifyTest( state, @@ -1718,7 +1735,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { watch.Item{AllocJob: alloc2.JobID}, watch.Item{AllocNode: alloc2.NodeID}) - err = state.UpsertAllocs(1001, []*structs.Allocation{alloc2}) + err = state.UpsertAllocs(1002, []*structs.Allocation{alloc2}) if err != nil { t.Fatalf("err: %v", err) } @@ -1735,7 +1752,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { if out.CreateIndex != 1000 { t.Fatalf("bad: %#v", out) } - if out.ModifyIndex != 1001 { + if out.ModifyIndex != 1002 { t.Fatalf("bad: %#v", out) } @@ -1743,7 +1760,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - if index != 1001 { + if index != 1002 { t.Fatalf("bad: %d", index) } @@ -1764,6 +1781,7 @@ func TestStateStore_EvictAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -1805,6 +1823,10 @@ func TestStateStore_AllocsByNode(t *testing.T) { allocs = append(allocs, alloc) } + for idx, alloc := range allocs { + state.UpsertJobSummary(uint64(900+idx), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1839,6 +1861,10 @@ func TestStateStore_AllocsByNodeTerminal(t *testing.T) { allocs = append(allocs, alloc) } + for idx, alloc := range allocs { + state.UpsertJobSummary(uint64(900+idx), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1881,6 +1907,10 @@ func TestStateStore_AllocsByJob(t *testing.T) { allocs = append(allocs, alloc) } + for i, alloc := range allocs { + state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1920,6 +1950,10 @@ func TestStateStore_AllocsByIDPrefix(t *testing.T) { allocs = append(allocs, alloc) } + for i, alloc := range allocs { + state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1974,6 +2008,9 @@ func TestStateStore_Allocs(t *testing.T) { alloc := mock.Alloc() allocs = append(allocs, alloc) } + for i, alloc := range allocs { + state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID)) + } err := state.UpsertAllocs(1000, allocs) if err != nil { @@ -2185,6 +2222,7 @@ func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { alloc := mock.Alloc() alloc.JobID = job.ID alloc.DesiredStatus = structs.AllocDesiredStatusStop + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -2216,6 +2254,7 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { alloc := mock.Alloc() alloc.JobID = job.ID alloc.DesiredStatus = structs.AllocDesiredStatusRun + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -2333,11 +2372,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { t.Fatalf("err: %v", err) } - job.TaskGroups[0].Count = 1 - err = state.UpsertJob(1003, job) - if err != nil { - t.Fatalf("err: %v", err) - } + outA, _ := state.AllocByID(alloc3.ID) + summary, _ = state.JobSummaryByID(job.ID) expectedSummary := structs.JobSummary{ JobID: job.ID, @@ -2346,6 +2382,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { Starting: 3, }, }, + CreateIndex: job.CreateIndex, + ModifyIndex: outA.ModifyIndex, } if !reflect.DeepEqual(summary, &expectedSummary) { t.Fatalf("expected summary: %v, actual: %v", expectedSummary, summary) @@ -2368,6 +2406,7 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { if err := state.UpsertAllocs(1004, []*structs.Allocation{alloc4, alloc5}); err != nil { t.Fatalf("err: %v", err) } + outA, _ = state.AllocByID(alloc5.ID) summary, _ = state.JobSummaryByID(job.ID) expectedSummary = structs.JobSummary{ JobID: job.ID, @@ -2377,6 +2416,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { Starting: 1, }, }, + CreateIndex: job.CreateIndex, + ModifyIndex: outA.ModifyIndex, } if !reflect.DeepEqual(summary, &expectedSummary) { t.Fatalf("expected: %v, actual: %v", expectedSummary, summary) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 3273261c2..40d978a9d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -448,7 +448,7 @@ type SingleJobResponse struct { } // SingleJobSummary is used to return a single job summary -type SingleJobSummaryResponse struct { +type JobSummaryResponse struct { JobSummary *JobSummary QueryMeta } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 7e34983de..611f03f3e 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -472,9 +472,9 @@ func TestWorker_ReblockEval(t *testing.T) { // Create the job summary js := mock.JobSummary(eval1.JobID) - tg := js.Summary["cache"] + tg := js.Summary["web"] tg.Queued = 100 - js.Summary["cache"] = tg + js.Summary["web"] = tg if err := s1.fsm.State().UpsertJobSummary(1001, js); err != nil { t.Fatal(err) } @@ -490,7 +490,7 @@ func TestWorker_ReblockEval(t *testing.T) { } eval2 := evalOut.Copy() - eval2.QueuedAllocations = map[string]int{"cache": 50} + eval2.QueuedAllocations = map[string]int{"web": 50} // Attempt to reblock eval w := &Worker{srv: s1, logger: s1.logger, evalToken: token} diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 34bd09f2e..06b3f269f 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -149,6 +149,10 @@ func (s *SystemScheduler) process() (bool, error) { if result != nil { for _, allocations := range result.NodeAllocation { for _, allocation := range allocations { + if allocation.CreateIndex != result.AllocIndex { + continue + } + if _, ok := s.queuedAllocs[allocation.TaskGroup]; ok { s.queuedAllocs[allocation.TaskGroup] -= 1 } else {