Updated tests and comments

This commit is contained in:
Diptanu Choudhury
2016-08-03 18:08:37 -07:00
parent 2ff26b9739
commit 3bdaf91dd1
6 changed files with 98 additions and 45 deletions

View File

@@ -129,7 +129,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcilesummaries", s.wrap(s.ReconcileJobSummaries))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)

View File

@@ -25,7 +25,7 @@ func TestHTTP_SystemGarbageCollect(t *testing.T) {
func TestHTTP_ReconcileJobSummaries(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/system/reconcilesummaries", nil)
req, err := http.NewRequest("PUT", "/v1/system/reconcile/summaries", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -35,7 +35,6 @@ func TestHTTP_ReconcileJobSummaries(t *testing.T) {
if _, err := s.Server.ReconcileJobSummaries(respW, req); err != nil {
t.Fatalf("err: %v", err)
}
t.Fatalf("code %v", respW.Code)
if respW.Code != 200 {
t.Fatalf("expected: %v, actual: %v", 200, respW.Code)

View File

@@ -446,7 +446,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
// applyReconcileSummaries reconciles summaries for all the job
// applyReconcileSummaries reconciles summaries for all the jobs
func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
if err := n.state.ReconcileJobSummaries(index); err != nil {
return err
@@ -592,6 +592,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
// Create Job Summaries
// COMPAT 0.4 -> 0.4.1
// We can remove this in 0.5. This exists so that the server creates job
// summaries if they were not present previously. When users upgrade to 0.5
// from 0.4.1, the snapshot will contain job summaries so it will be safe to
// remove this block.
index, err := n.state.Index("job_summary")
if err != nil {
return fmt.Errorf("couldn't fetch index of job summary table: %v", err)
@@ -624,20 +628,20 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
if err != nil {
return err
}
var jobs []*structs.Job
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
jobs = append(jobs, rawJob.(*structs.Job))
}
snap, err := n.state.Snapshot()
if err != nil {
return fmt.Errorf("unable to create snapshot: %v", err)
}
for _, job := range jobs {
// Invoking the scheduler for every job so that we can populate the number
// of queued allocations for every job
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
planner := &scheduler.Harness{
State: &snap.StateStore,
}

View File

@@ -991,12 +991,14 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
state.UpsertJob(1000, job1)
// make a job which can make partial progress
job2 := mock.Job()
state.UpsertJob(1010, job2)
alloc := mock.Alloc()
alloc.NodeID = node.ID
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})
// Delete the summaries
state.DeleteJobSummary(1030, job1.ID)
state.DeleteJobSummary(1040, job2.ID)
state.DeleteJobSummary(1040, alloc.Job.ID)
// Delete the index
if err := state.RemoveIndex("job_summary"); err != nil {
@@ -1022,13 +1024,16 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out1)
}
out2, _ := state2.JobSummaryByID(job2.ID)
// This exercises the code path which adds the allocations made by the
// planner and the number of unplaced allocations in the reconcile summaries
// codepath
out2, _ := state2.JobSummaryByID(alloc.Job.ID)
expected = structs.JobSummary{
JobID: job2.ID,
JobID: alloc.Job.ID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Queued: 10,
Starting: 0,
Starting: 1,
},
},
CreateIndex: latestIndex,

View File

@@ -1209,29 +1209,12 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
if err != nil {
return err
}
var jobs []*structs.Job
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
jobs = append(jobs, rawJob.(*structs.Job))
}
for _, job := range jobs {
// Find all the allocations for the jobs
var allocs []*structs.Allocation
iter, err = txn.Get("allocs", "job", job.ID)
if err != nil {
return err
}
for {
rawAlloc := iter.Next()
if rawAlloc == nil {
break
}
allocs = append(allocs, rawAlloc.(*structs.Allocation))
}
job := rawJob.(*structs.Job)
// Create a job summary for the job
summary := structs.JobSummary{
@@ -1241,8 +1224,20 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
for _, tg := range job.TaskGroups {
summary.Summary[tg.Name] = structs.TaskGroupSummary{}
}
// Find all the allocations for the jobs
iterAllocs, err := txn.Get("allocs", "job", job.ID)
if err != nil {
return err
}
// Calculate the summary for the job
for _, alloc := range allocs {
for {
rawAlloc := iterAllocs.Next()
if rawAlloc == nil {
break
}
alloc := rawAlloc.(*structs.Allocation)
tg := summary.Summary[alloc.TaskGroup]
switch alloc.ClientStatus {
case structs.AllocClientStatusFailed:

View File

@@ -2058,17 +2058,61 @@ func TestStateStore_JobSummary(t *testing.T) {
func TestStateStore_ReconcileJobSummary(t *testing.T) {
state := testStateStore(t)
// Create an alloc
alloc := mock.Alloc()
// Add another task group to the job
tg2 := alloc.Job.TaskGroups[0].Copy()
tg2.Name = "db"
alloc.Job.TaskGroups = append(alloc.Job.TaskGroups, tg2)
state.UpsertJob(100, alloc.Job)
alloc1 := mock.Alloc()
alloc1.JobID = alloc.Job.ID
alloc1.Job = alloc.Job
state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc1})
// Create one more alloc for the db task group
alloc2 := mock.Alloc()
alloc2.TaskGroup = "db"
alloc2.JobID = alloc.JobID
alloc2.Job = alloc.Job
alloc2 := alloc1.Copy()
alloc2.ClientStatus = structs.AllocClientStatusRunning
state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc2})
// Upserts the alloc
state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc2})
// Change the state of the first alloc to running
alloc3 := alloc.Copy()
alloc3.ClientStatus = structs.AllocClientStatusRunning
state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc3})
//Add some more allocs to the second tg
alloc4 := mock.Alloc()
alloc4.JobID = alloc.JobID
alloc4.Job = alloc.Job
alloc4.TaskGroup = "db"
alloc5 := alloc4.Copy()
alloc5.ClientStatus = structs.AllocClientStatusRunning
alloc6 := mock.Alloc()
alloc6.JobID = alloc.JobID
alloc6.Job = alloc.Job
alloc6.TaskGroup = "db"
alloc7 := alloc6.Copy()
alloc7.ClientStatus = structs.AllocClientStatusComplete
alloc8 := mock.Alloc()
alloc8.JobID = alloc.JobID
alloc8.Job = alloc.Job
alloc8.TaskGroup = "db"
alloc9 := alloc8.Copy()
alloc9.ClientStatus = structs.AllocClientStatusFailed
alloc10 := mock.Alloc()
alloc10.JobID = alloc.JobID
alloc10.Job = alloc.Job
alloc10.TaskGroup = "db"
alloc11 := alloc10.Copy()
alloc11.ClientStatus = structs.AllocClientStatusLost
state.UpsertAllocs(130, []*structs.Allocation{alloc4, alloc6, alloc8, alloc10})
state.UpdateAllocsFromClient(150, []*structs.Allocation{alloc5, alloc7, alloc9, alloc11})
// DeleteJobSummary is a helper method and doesn't modify the indexes table
state.DeleteJobSummary(130, alloc.Job.ID)
@@ -2080,8 +2124,14 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) {
JobID: alloc.Job.ID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Running: 1,
},
"db": structs.TaskGroupSummary{
Starting: 1,
Running: 1,
Failed: 1,
Complete: 1,
Lost: 1,
},
},
CreateIndex: 120,