Fixed some bugs

This commit is contained in:
Diptanu Choudhury
2016-07-21 14:43:21 -07:00
parent 8f45222ce8
commit 03c6692f98
17 changed files with 136 additions and 41 deletions

View File

@@ -508,7 +508,7 @@ func TestJobs_JobSummary(t *testing.T) {
}
assertWriteMeta(t, wm)
// Query the job again and ensure it exists
// Query the job summary again and ensure it exists
result, qm, err := jobs.Summary("job1", nil)
if err != nil {
t.Fatalf("err: %s", err)

View File

@@ -357,6 +357,9 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
alloc.ClientStatus = originalStatus
state := s1.State()
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
state.UpsertAllocs(100, []*structs.Allocation{alloc})
testutil.WaitForResult(func() (bool, error) {
@@ -394,6 +397,12 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.NodeID = c1.Node().ID
state := s1.State()
if err := state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
@@ -469,8 +478,10 @@ func TestClient_SaveRestoreState(t *testing.T) {
task.Config["args"] = []string{"10"}
state := s1.State()
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}
@@ -485,8 +496,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
})
// Shutdown the client, saves state
err = c1.Shutdown()
if err != nil {
if err := c1.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}

View File

@@ -58,13 +58,21 @@ func TestHTTP_AllocsPrefixList(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Directly manipulate the state
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.ID = "aaaaaaaa-e8f7-fd38-c855-ab94ceb89706"
alloc2 := mock.Alloc()
alloc2.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706"
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
summary1 := mock.JobSummary(alloc1.JobID)
summary2 := mock.JobSummary(alloc2.JobID)
if err := state.UpsertJobSummary(998, summary1); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, summary2); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
@@ -110,6 +118,9 @@ func TestHTTP_AllocQuery(t *testing.T) {
// Directly manipulate the state
state := s.Agent.server.State()
alloc := mock.Alloc()
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc})
if err != nil {

View File

@@ -253,7 +253,7 @@ func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Reque
return nil, nil
}
var out structs.SingleJobSummaryResponse
var out structs.JobSummaryResponse
if err := s.agent.RPC("Job.GetSummary", &args, &out); err != nil {
return nil, err
}

View File

@@ -129,6 +129,9 @@ func TestHTTP_NodeForceEval(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
@@ -177,6 +180,9 @@ func TestHTTP_NodeAllocations(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
@@ -231,6 +237,9 @@ func TestHTTP_NodeDrain(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)

View File

@@ -264,6 +264,7 @@ func (c *StatusCommand) outputJobInfo(client *api.Client, job *api.Job) error {
tgs.Running, tgs.Failed,
tgs.Complete, tgs.Lost,
)
idx += 1
}
c.Ui.Output(formatList(summaries))
}

View File

@@ -19,9 +19,13 @@ func TestAllocEndpoint_List(t *testing.T) {
// Create the register request
alloc := mock.Alloc()
summary := mock.JobSummary(alloc.JobID)
state := s1.fsm.State()
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
if err := state.UpsertJobSummary(999, summary); err != nil {
t.Fatalf("err: %v", err)
}
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
@@ -75,6 +79,10 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
// Create the alloc
alloc := mock.Alloc()
summary := mock.JobSummary(alloc.JobID)
if err := state.UpsertJobSummary(1, summary); err != nil {
t.Fatalf("err: %v", err)
}
// Upsert alloc triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil {

View File

@@ -410,6 +410,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
// Updating the allocs with the job id and task group name
for _, alloc := range req.Alloc {
if existing, _ := n.state.AllocByID(alloc.ID); existing != nil {
alloc.JobID = existing.JobID
@@ -808,7 +809,7 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
break
}
jobSummary := raw.(*structs.JobSummary)
jobSummary := raw.(structs.JobSummary)
sink.Write([]byte{byte(JobSummarySnapshot)})
if err := encoder.Encode(jobSummary); err != nil {

View File

@@ -489,6 +489,7 @@ func TestFSM_UpsertAllocs(t *testing.T) {
fsm := testFSM(t)
alloc := mock.Alloc()
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
}
@@ -544,6 +545,7 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) {
fsm := testFSM(t)
alloc := mock.Alloc()
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
job := alloc.Job
alloc.Job = nil
req := structs.AllocUpdateRequest{
@@ -611,6 +613,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
fsm := testFSM(t)
alloc := mock.Alloc()
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
job := alloc.Job
resources := alloc.Resources
alloc.Resources = nil
@@ -667,7 +670,9 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
alloc.NodeID = node.ID
alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
state.UpsertAllocs(1, []*structs.Allocation{alloc, alloc2})
state.UpsertJobSummary(8, mock.JobSummary(alloc.JobID))
state.UpsertJobSummary(9, mock.JobSummary(alloc2.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2})
clientAlloc := new(structs.Allocation)
*clientAlloc = *alloc
@@ -730,7 +735,8 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
state := fsm.State()
alloc := mock.Alloc()
state.UpsertAllocs(1, []*structs.Allocation{alloc})
state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc})
clientAlloc := new(structs.Allocation)
*clientAlloc = *alloc
@@ -757,7 +763,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
clientAlloc.CreateIndex = out.CreateIndex
clientAlloc.ModifyIndex = out.ModifyIndex
if !reflect.DeepEqual(clientAlloc, out) {
t.Fatalf("bad: %#v %#v", clientAlloc, out)
t.Fatalf("err: %#v,%#v", clientAlloc, out)
}
}
@@ -857,8 +863,10 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()
alloc1 := mock.Alloc()
state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
alloc2 := mock.Alloc()
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
state.UpsertAllocs(1001, []*structs.Allocation{alloc2})
// Verify the contents

View File

@@ -115,7 +115,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// GetSummary retreives the summary of a job
func (j *Job) GetSummary(args *structs.JobSummaryRequest,
reply *structs.SingleJobSummaryResponse) error {
reply *structs.JobSummaryResponse) error {
if done, err := j.srv.forward("Job.GetSummary", args, args, reply); done {
return err
}
@@ -126,12 +126,12 @@ func (j *Job) GetSummary(args *structs.JobSummaryRequest,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{JobSummary: args.JobID}),
run: func() error {
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
// Look for job summary
out, err := snap.JobSummaryByID(args.JobID)
if err != nil {
return err
@@ -155,8 +155,6 @@ func (j *Job) GetSummary(args *structs.JobSummaryRequest,
return nil
}}
return j.srv.blockingRPC(&opts)
return nil
}
// Evaluate is used to force a job for re-evaluation

View File

@@ -735,12 +735,12 @@ func TestJobEndpoint_GetJobSummary(t *testing.T) {
job.ModifyIndex = resp.JobModifyIndex
job.JobModifyIndex = resp.JobModifyIndex
// Lookup the job
// Lookup the job summary
get := &structs.JobSummaryRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp2 structs.SingleJobSummaryResponse
var resp2 structs.JobSummaryResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}

View File

@@ -235,9 +235,9 @@ func JobSummary(jobID string) *structs.JobSummary {
js := &structs.JobSummary{
JobID: jobID,
Summary: map[string]structs.TaskGroupSummary{
"cache": {
Queued: 5,
Starting: 1,
"web": {
Queued: 0,
Starting: 0,
},
},
}

View File

@@ -1314,6 +1314,9 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
// or inserted
func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allocation,
watcher watch.Items, txn *memdb.Txn) error {
if len(allocs) == 0 {
return nil
}
jobID := allocs[0].JobID
jobSummary, err := s.JobSummaryByID(jobID)
@@ -1339,7 +1342,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allo
if existing == nil {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", alloc.DesiredStatus)
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v",
alloc.ID, alloc.DesiredStatus)
}
switch alloc.ClientStatus {
case structs.AllocClientStatusPending:

View File

@@ -144,6 +144,15 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
alloc2.NodeID = node.ID
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc1.ClientStatus = structs.AllocClientStatusFailed
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc2.JobID)); err != nil {
t.Fatal(err)
}
alloc2.ClientStatus = structs.AllocClientStatusPending
if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil {
@@ -1286,6 +1295,10 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
watch.Item{AllocNode: alloc1.NodeID},
watch.Item{AllocNode: alloc2.NodeID})
state.UpsertJobSummary(900, mock.JobSummary(eval1.JobID))
state.UpsertJobSummary(901, mock.JobSummary(eval2.JobID))
state.UpsertJobSummary(902, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(903, mock.JobSummary(alloc2.JobID))
err := state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
@@ -1540,6 +1553,8 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
watch.Item{AllocJob: alloc2.JobID},
watch.Item{AllocNode: alloc2.NodeID})
state.UpsertJobSummary(900, mock.JobSummary(alloc.JobID))
state.UpsertJobSummary(901, mock.JobSummary(alloc2.JobID))
if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}
@@ -1623,8 +1638,8 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
t.Fatalf("err: %v", err)
}
tgSummary2 := summary2.Summary["web"]
if tgSummary2.Running != 1 {
t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Failed)
if tgSummary2.Running != 0 {
t.Fatalf("expected running: %v, actual: %v", 0, tgSummary2.Failed)
}
notify.verify(t)
@@ -1687,6 +1702,7 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
@@ -1709,6 +1725,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
alloc2 := mock.Alloc()
alloc2.ID = alloc.ID
alloc2.NodeID = alloc.NodeID + ".new"
state.UpsertJobSummary(1001, mock.JobSummary(alloc2.JobID))
notify := setupNotifyTest(
state,
@@ -1718,7 +1735,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
watch.Item{AllocJob: alloc2.JobID},
watch.Item{AllocNode: alloc2.NodeID})
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc2})
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -1735,7 +1752,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
if out.CreateIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if out.ModifyIndex != 1001 {
if out.ModifyIndex != 1002 {
t.Fatalf("bad: %#v", out)
}
@@ -1743,7 +1760,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1001 {
if index != 1002 {
t.Fatalf("bad: %d", index)
}
@@ -1764,6 +1781,7 @@ func TestStateStore_EvictAlloc_Alloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
@@ -1805,6 +1823,10 @@ func TestStateStore_AllocsByNode(t *testing.T) {
allocs = append(allocs, alloc)
}
for idx, alloc := range allocs {
state.UpsertJobSummary(uint64(900+idx), mock.JobSummary(alloc.JobID))
}
err := state.UpsertAllocs(1000, allocs)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1839,6 +1861,10 @@ func TestStateStore_AllocsByNodeTerminal(t *testing.T) {
allocs = append(allocs, alloc)
}
for idx, alloc := range allocs {
state.UpsertJobSummary(uint64(900+idx), mock.JobSummary(alloc.JobID))
}
err := state.UpsertAllocs(1000, allocs)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1881,6 +1907,10 @@ func TestStateStore_AllocsByJob(t *testing.T) {
allocs = append(allocs, alloc)
}
for i, alloc := range allocs {
state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID))
}
err := state.UpsertAllocs(1000, allocs)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1920,6 +1950,10 @@ func TestStateStore_AllocsByIDPrefix(t *testing.T) {
allocs = append(allocs, alloc)
}
for i, alloc := range allocs {
state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID))
}
err := state.UpsertAllocs(1000, allocs)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1974,6 +2008,9 @@ func TestStateStore_Allocs(t *testing.T) {
alloc := mock.Alloc()
allocs = append(allocs, alloc)
}
for i, alloc := range allocs {
state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID))
}
err := state.UpsertAllocs(1000, allocs)
if err != nil {
@@ -2185,6 +2222,7 @@ func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) {
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
@@ -2216,6 +2254,7 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) {
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
@@ -2333,11 +2372,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) {
t.Fatalf("err: %v", err)
}
job.TaskGroups[0].Count = 1
err = state.UpsertJob(1003, job)
if err != nil {
t.Fatalf("err: %v", err)
}
outA, _ := state.AllocByID(alloc3.ID)
summary, _ = state.JobSummaryByID(job.ID)
expectedSummary := structs.JobSummary{
JobID: job.ID,
@@ -2346,6 +2382,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) {
Starting: 3,
},
},
CreateIndex: job.CreateIndex,
ModifyIndex: outA.ModifyIndex,
}
if !reflect.DeepEqual(summary, &expectedSummary) {
t.Fatalf("expected summary: %v, actual: %v", expectedSummary, summary)
@@ -2368,6 +2406,7 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) {
if err := state.UpsertAllocs(1004, []*structs.Allocation{alloc4, alloc5}); err != nil {
t.Fatalf("err: %v", err)
}
outA, _ = state.AllocByID(alloc5.ID)
summary, _ = state.JobSummaryByID(job.ID)
expectedSummary = structs.JobSummary{
JobID: job.ID,
@@ -2377,6 +2416,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) {
Starting: 1,
},
},
CreateIndex: job.CreateIndex,
ModifyIndex: outA.ModifyIndex,
}
if !reflect.DeepEqual(summary, &expectedSummary) {
t.Fatalf("expected: %v, actual: %v", expectedSummary, summary)

View File

@@ -448,7 +448,7 @@ type SingleJobResponse struct {
}
// SingleJobSummary is used to return a single job summary
type SingleJobSummaryResponse struct {
type JobSummaryResponse struct {
JobSummary *JobSummary
QueryMeta
}

View File

@@ -472,9 +472,9 @@ func TestWorker_ReblockEval(t *testing.T) {
// Create the job summary
js := mock.JobSummary(eval1.JobID)
tg := js.Summary["cache"]
tg := js.Summary["web"]
tg.Queued = 100
js.Summary["cache"] = tg
js.Summary["web"] = tg
if err := s1.fsm.State().UpsertJobSummary(1001, js); err != nil {
t.Fatal(err)
}
@@ -490,7 +490,7 @@ func TestWorker_ReblockEval(t *testing.T) {
}
eval2 := evalOut.Copy()
eval2.QueuedAllocations = map[string]int{"cache": 50}
eval2.QueuedAllocations = map[string]int{"web": 50}
// Attempt to reblock eval
w := &Worker{srv: s1, logger: s1.logger, evalToken: token}

View File

@@ -149,6 +149,10 @@ func (s *SystemScheduler) process() (bool, error) {
if result != nil {
for _, allocations := range result.NodeAllocation {
for _, allocation := range allocations {
if allocation.CreateIndex != result.AllocIndex {
continue
}
if _, ok := s.queuedAllocs[allocation.TaskGroup]; ok {
s.queuedAllocs[allocation.TaskGroup] -= 1
} else {