Merge pull request #2163 from hashicorp/b-summary

Job Summary: Fix queued accounting and remove in-place state store updates
This commit is contained in:
Alex Dadgar
2017-01-11 13:36:36 -08:00
committed by GitHub
5 changed files with 76 additions and 38 deletions

View File

@@ -88,6 +88,7 @@ type Allocation struct {
PreviousAllocation string
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"reflect"
"time"
"github.com/armon/go-metrics"
@@ -728,10 +729,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
}
// Get the job summary from the fsm state store
summary, err := n.state.JobSummaryByID(job.ID)
originalSummary, err := n.state.JobSummaryByID(job.ID)
if err != nil {
return err
}
summary := originalSummary.Copy()
// Add the allocations scheduler has made to queued since these
// allocations are never getting placed until the scheduler is invoked
@@ -760,12 +762,19 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", tg)
}
// We add instead of setting here because we want to take into
// consideration what the scheduler with a mock planner thinks it
// placed. Those should be counted as queued as well
tgSummary.Queued += queued
summary.Summary[tg] = tgSummary
}
if err := n.state.UpsertJobSummary(index, summary); err != nil {
return err
if !reflect.DeepEqual(summary, originalSummary) {
summary.ModifyIndex = index
if err := n.state.UpsertJobSummary(index, summary); err != nil {
return err
}
}
}
return nil
@@ -1002,7 +1011,7 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
break
}
jobSummary := raw.(structs.JobSummary)
jobSummary := raw.(*structs.JobSummary)
sink.Write([]byte{byte(JobSummarySnapshot)})
if err := encoder.Encode(jobSummary); err != nil {

View File

@@ -91,7 +91,7 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma
defer txn.Abort()
// Update the index
if err := txn.Insert("job_summary", *jobSummary); err != nil {
if err := txn.Insert("job_summary", jobSummary); err != nil {
return err
}
@@ -410,31 +410,39 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
// Only continue if the summary exists. It could not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(structs.JobSummary)
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children != nil {
modified := false
switch job.Status {
case structs.JobStatusPending:
pSummary.Children.Pending--
pSummary.Children.Dead++
modified = true
case structs.JobStatusRunning:
pSummary.Children.Running--
pSummary.Children.Dead++
modified = true
case structs.JobStatusDead:
default:
return fmt.Errorf("unknown old job status %q", job.Status)
}
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: job.ParentID})
if modified {
// Update the modify index
pSummary.ModifyIndex = index
// Insert the summary
if err := txn.Insert("job_summary", *pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: job.ParentID})
// Insert the summary
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
}
}
@@ -545,8 +553,8 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) {
return nil, err
}
if existing != nil {
summary := existing.(structs.JobSummary)
return summary.Copy(), nil
summary := existing.(*structs.JobSummary)
return summary, nil
}
return nil, nil
@@ -725,8 +733,8 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index
return fmt.Errorf("job summary lookup failed: %v", err)
}
if summaryRaw != nil {
js := summaryRaw.(structs.JobSummary)
var hasSummaryChanged bool
js := summaryRaw.(*structs.JobSummary).Copy()
hasSummaryChanged := false
for tg, num := range eval.QueuedAllocations {
if summary, ok := js.Summary[tg]; ok {
if summary.Queued != num {
@@ -1420,7 +1428,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
job := rawJob.(*structs.Job)
// Create a job summary for the job
summary := structs.JobSummary{
summary := &structs.JobSummary{
JobID: job.ID,
Summary: make(map[string]structs.TaskGroupSummary),
}
@@ -1565,7 +1573,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.
// Only continue if the summary exists. It could not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(structs.JobSummary)
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children == nil {
pSummary.Children = new(structs.JobChildrenSummary)
@@ -1607,7 +1615,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.
watcher.Add(watch.Item{JobSummary: updated.ParentID})
// Insert the summary
if err := txn.Insert("job_summary", *pSummary); err != nil {
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
@@ -1667,13 +1675,19 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
watcher watch.Items, txn *memdb.Txn) error {
existing, err := s.JobSummaryByID(job.ID)
// Update the job summary
summaryRaw, err := txn.First("job_summary", "id", job.ID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for job: %v", err)
return fmt.Errorf("job summary lookup failed: %v", err)
}
var hasSummaryChanged bool
if existing == nil {
existing = &structs.JobSummary{
// Get the summary or create if necessary
var summary *structs.JobSummary
hasSummaryChanged := false
if summaryRaw != nil {
summary = summaryRaw.(*structs.JobSummary).Copy()
} else {
summary = &structs.JobSummary{
JobID: job.ID,
Summary: make(map[string]structs.TaskGroupSummary),
Children: new(structs.JobChildrenSummary),
@@ -1681,15 +1695,16 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
}
hasSummaryChanged = true
}
for _, tg := range job.TaskGroups {
if _, ok := existing.Summary[tg.Name]; !ok {
if _, ok := summary.Summary[tg.Name]; !ok {
newSummary := structs.TaskGroupSummary{
Complete: 0,
Failed: 0,
Running: 0,
Starting: 0,
}
existing.Summary[tg.Name] = newSummary
summary.Summary[tg.Name] = newSummary
hasSummaryChanged = true
}
}
@@ -1697,7 +1712,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
// The job summary has changed, so add to watcher and update the modify
// index.
if hasSummaryChanged {
existing.ModifyIndex = index
summary.ModifyIndex = index
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: job.ID})
@@ -1705,7 +1720,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if err := txn.Insert("job_summary", *existing); err != nil {
if err := txn.Insert("job_summary", summary); err != nil {
return err
}
}
@@ -1727,6 +1742,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if err != nil {
return fmt.Errorf("unable to lookup job summary for job id %q: %v", err)
}
if summaryRaw == nil {
// Check if the job is de-registered
rawJob, err := txn.First("jobs", "id", alloc.JobID)
@@ -1738,10 +1754,12 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if rawJob == nil {
return nil
}
return fmt.Errorf("job summary for job %q is not present", alloc.JobID)
}
summary := summaryRaw.(structs.JobSummary)
jobSummary := summary.Copy()
// Get a copy of the existing summary
jobSummary := summaryRaw.(*structs.JobSummary).Copy()
// Not updating the job summary because the allocation doesn't belong to the
// currently registered job
@@ -1753,7 +1771,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if !ok {
return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup)
}
var summaryChanged bool
summaryChanged := false
if existingAlloc == nil {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
@@ -1814,7 +1833,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return fmt.Errorf("index update failed: %v", err)
}
if err := txn.Insert("job_summary", *jobSummary); err != nil {
if err := txn.Insert("job_summary", jobSummary); err != nil {
return fmt.Errorf("updating job summary failed: %v", err)
}
}
@@ -1951,7 +1970,7 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err
func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
r.items.Add(watch.Item{Table: "job_summary"})
r.items.Add(watch.Item{JobSummary: jobSummary.JobID})
if err := r.txn.Insert("job_summary", *jobSummary); err != nil {
if err := r.txn.Insert("job_summary", jobSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
return nil

View File

@@ -684,8 +684,14 @@ func adjustQueuedAllocations(logger *log.Logger, result *structs.PlanResult, que
if result != nil {
for _, allocations := range result.NodeAllocation {
for _, allocation := range allocations {
// Ensure that the allocation is newly created
if allocation.CreateIndex != result.AllocIndex {
// Ensure that the allocation is newly created. We check that
// the CreateIndex is equal to the ModifyIndex in order to check
// that the allocation was just created. We do not check that
// the CreateIndex is equal to the results AllocIndex because
// the allocations we get back have gone through the planner's
// optimistic snapshot and thus their indexes may not be
// correct, but they will be consistent.
if allocation.CreateIndex != allocation.ModifyIndex {
continue
}

View File

@@ -1055,10 +1055,13 @@ func TestUtil_AdjustQueuedAllocations(t *testing.T) {
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.CreateIndex = 4
alloc2.ModifyIndex = 4
alloc3 := mock.Alloc()
alloc3.CreateIndex = 3
alloc3.ModifyIndex = 5
alloc4 := mock.Alloc()
alloc4.CreateIndex = 6
alloc4.ModifyIndex = 8
planResult := structs.PlanResult{
NodeUpdate: map[string][]*structs.Allocation{
@@ -1073,7 +1076,7 @@ func TestUtil_AdjustQueuedAllocations(t *testing.T) {
},
},
RefreshIndex: 3,
AllocIndex: 4,
AllocIndex: 16, // Should not be considered
}
queuedAllocs := map[string]int{"web": 2}