Updating the job summary while mutating jobs and allocation objects

Diptanu Choudhury
2016-07-20 14:09:03 -07:00
parent 36abb97d87
commit 0347f60cfc
6 changed files with 167 additions and 107 deletions
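At a high level, the commit moves job-summary maintenance out of the FSM and into the state store, so the per-task-group counters are adjusted in the same transaction that writes the job or allocation. The sketch below uses simplified stand-in types and status strings, not the actual structs.JobSummary code in the diff; it only illustrates the bin-counting idea: a new allocation moves a queued placement into Starting, and a client-status change decrements the bin of the previous status and increments the bin of the new one.

package main

import "fmt"

// Simplified stand-ins for the Nomad structs touched by this commit.
type TaskGroupSummary struct {
	Queued, Starting, Running, Failed, Complete, Lost int
}

type JobSummary struct {
	JobID   string
	Summary map[string]TaskGroupSummary
}

// adjust adds delta to the bin that corresponds to a client status.
func adjust(tg *TaskGroupSummary, status string, delta int) {
	switch status {
	case "pending":
		tg.Starting += delta
	case "running":
		tg.Running += delta
	case "failed":
		tg.Failed += delta
	case "complete":
		tg.Complete += delta
	case "lost":
		tg.Lost += delta
	}
}

// applyClientStatus mirrors the bookkeeping idea: decrement the bin of the
// previous client status (if the allocation already existed) and increment
// the bin of the new one.
func applyClientStatus(js *JobSummary, taskGroup, prev, next string) {
	tg := js.Summary[taskGroup]
	if prev == "" {
		// Brand-new allocation: a queued placement is now starting.
		if next == "pending" && tg.Queued > 0 {
			tg.Queued--
		}
	} else if prev != next {
		adjust(&tg, prev, -1)
	}
	if prev == "" || prev != next {
		adjust(&tg, next, +1)
	}
	js.Summary[taskGroup] = tg
}

func main() {
	js := &JobSummary{
		JobID:   "example",
		Summary: map[string]TaskGroupSummary{"cache": {Queued: 5}},
	}
	applyClientStatus(js, "cache", "", "pending")        // new allocation is placed
	applyClientStatus(js, "cache", "pending", "running") // allocation becomes healthy
	fmt.Printf("%+v\n", js.Summary["cache"])
	// Output: {Queued:4 Starting:0 Running:1 Failed:0 Complete:0 Lost:0}
}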

nomad/fsm.go

@@ -239,11 +239,6 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
return err
}
if err := n.state.UpdateSummaryWithJob(req.Job, index); err != nil {
n.logger.Printf("[ERR] nomad.fsm: Updating job summary failed: %v", err)
return err
}
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
@@ -398,10 +393,6 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
}
}
if err := n.updateJobSummary(index, req.Alloc); err != nil {
return err
}
if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertAllocs failed: %v", err)
return err
@@ -419,8 +410,11 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
if err := n.updateJobSummary(index, req.Alloc); err != nil {
return err
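// Carry the job ID and task group over from the allocation already in the
// state store so the job-summary update keyed on them sees the right values.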
for _, alloc := range req.Alloc {
if existing, _ := n.state.AllocByID(alloc.ID); existing != nil {
alloc.JobID = existing.JobID
alloc.TaskGroup = existing.TaskGroup
}
}
// Update all the client allocations
@@ -448,20 +442,6 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
func (n *nomadFSM) updateJobSummary(index uint64, allocations []*structs.Allocation) error {
for _, alloc := range allocations {
existingAlloc, err := n.state.AllocByID(alloc.ID)
if err != nil {
return fmt.Errorf("unable to get allocation from state store: %v", err)
}
if err := n.state.UpdateSummaryWithAlloc(alloc, existingAlloc, index); err != nil {
return err
}
}
return nil
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()

nomad/mock/mock.go

@@ -231,6 +231,19 @@ func Eval() *structs.Evaluation {
return eval
}
func JobSummary(jobID string) *structs.JobSummary {
js := &structs.JobSummary{
JobID: jobID,
Summary: map[string]structs.TaskGroupSummary{
"cache": {
Queued: 5,
Starting: 1,
},
},
}
return js
}
func Alloc() *structs.Allocation {
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),

nomad/state/state_store.go

@@ -85,6 +85,19 @@ func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) {
s.watch.stopWatch(items, notify)
}
// UpsertJobSummary upserts a job summary into the state store. This is for
// testing purposes
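// only; it skips the index-table and watcher bookkeeping done by the regular
// upsert paths.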
func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
txn := s.db.Txn(true)
defer txn.Abort()
if err := txn.Insert("job_summary", *jobSummary); err != nil {
return err
}
txn.Commit()
return nil
}
// UpsertNode is used to register a node or update a node definition
// This is assumed to be triggered by the client, so we retain the value
// of drain which is set by the scheduler.
@@ -336,6 +349,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
}
}
if err := s.updateSummaryWithJob(index, job, watcher, txn); err != nil {
return fmt.Errorf("unable to update job summary: %v", err)
}
// Insert the job
if err := txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
@@ -474,7 +489,8 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) {
return nil, err
}
if existing != nil {
return existing.(*structs.JobSummary), nil
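// Return a copy so callers cannot mutate the summary value stored in memdb.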
summary := existing.(structs.JobSummary)
return summary.Copy(), nil
}
return nil, nil
@@ -652,7 +668,7 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
return fmt.Errorf("job summary lookup failed: %v", err)
}
if summaryRaw != nil {
js := summaryRaw.(*structs.JobSummary)
js := summaryRaw.(structs.JobSummary)
var hasSummaryChanged bool
for tg, num := range eval.QueuedAllocations {
if summary, ok := js.Summary[tg]; ok {
@@ -820,6 +836,10 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "allocs"})
if err := s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
// Handle each of the updated allocations
for _, alloc := range allocs {
if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil {
@@ -895,6 +915,10 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "allocs"})
if err := s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
// Handle the allocations
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
@@ -1237,11 +1261,8 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
// updateSummaryWithJob creates or updates job summaries when new jobs are
// upserted or existing ones are updated
func (s *StateStore) UpdateSummaryWithJob(job *structs.Job, index uint64) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
watcher watch.Items, txn *memdb.Txn) error {
existing, err := s.JobSummaryByID(job.ID)
if err != nil {
@@ -1282,107 +1303,101 @@ func (s *StateStore) UpdateSummaryWithJob(job *structs.Job, index uint64) error
}
}
if err := txn.Insert("job_summary", existing); err != nil {
if err := txn.Insert("job_summary", *existing); err != nil {
return err
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
// updateSummaryWithAlloc updates the job summary when allocations are updated
// or inserted
func (s *StateStore) UpdateSummaryWithAlloc(newAlloc *structs.Allocation,
existingAlloc *structs.Allocation, index uint64) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allocation,
watcher watch.Items, txn *memdb.Txn) error {
jobID := newAlloc.JobID
taskGroup := newAlloc.TaskGroup
if existingAlloc != nil {
jobID = existingAlloc.JobID
taskGroup = existingAlloc.TaskGroup
}
existing, err := s.JobSummaryByID(jobID)
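// Allocations in a single update are assumed to share a job, so the summary
// is fetched once using the first allocation's job ID.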
jobID := allocs[0].JobID
jobSummary, err := s.JobSummaryByID(jobID)
if err != nil {
return fmt.Errorf("lookup of job summary failed: %v", err)
return fmt.Errorf("unable to look up job summary: %v", err)
}
if existing == nil {
return fmt.Errorf("unable to find job summary")
if jobSummary == nil {
return fmt.Errorf("job summary not found")
}
currentJSModifyIndex := jobSummary.ModifyIndex
tgSummary, ok := existing.Summary[taskGroup]
if !ok {
return fmt.Errorf("unable to find task group in the job summary: %v", taskGroup)
}
var hasSummaryChanged bool
if existingAlloc == nil {
switch newAlloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.ID, newAlloc.DesiredStatus)
}
switch newAlloc.ClientStatus {
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
hasSummaryChanged = true
case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.ID, newAlloc.ClientStatus)
}
} else if existingAlloc.ClientStatus != newAlloc.ClientStatus {
// Incrementing the count of the bin of the current state
switch newAlloc.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running += 1
case structs.AllocClientStatusFailed:
tgSummary.Failed += 1
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
case structs.AllocClientStatusComplete:
tgSummary.Complete += 1
case structs.AllocClientStatusLost:
tgSummary.Lost += 1
for _, alloc := range allocs {
// Look for existing alloc
existing, err := s.AllocByID(alloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
// Decrementing the count of the bin of the last state
switch existingAlloc.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running -= 1
case structs.AllocClientStatusFailed:
tgSummary.Failed -= 1
case structs.AllocClientStatusPending:
tgSummary.Starting -= 1
case structs.AllocClientStatusComplete:
tgSummary.Complete -= 1
case structs.AllocClientStatusLost:
tgSummary.Lost -= 1
tgSummary, ok := jobSummary.Summary[alloc.TaskGroup]
if !ok {
return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup)
}
if existing == nil {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", alloc.ID, alloc.DesiredStatus)
}
switch alloc.ClientStatus {
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
if tgSummary.Queued > 0 {
tgSummary.Queued -= 1
}
jobSummary.ModifyIndex = index
case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", alloc.ID, alloc.ClientStatus)
}
} else if existing.ClientStatus != alloc.ClientStatus {
// Incrementing the count of the bin of the current state
switch alloc.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running += 1
case structs.AllocClientStatusFailed:
tgSummary.Failed += 1
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
case structs.AllocClientStatusComplete:
tgSummary.Complete += 1
case structs.AllocClientStatusLost:
tgSummary.Lost += 1
}
hasSummaryChanged = true
// Decrementing the count of the bin of the last state
switch existing.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running -= 1
case structs.AllocClientStatusFailed:
tgSummary.Failed -= 1
case structs.AllocClientStatusPending:
tgSummary.Starting -= 1
case structs.AllocClientStatusComplete:
tgSummary.Complete -= 1
case structs.AllocClientStatusLost:
tgSummary.Lost -= 1
}
jobSummary.ModifyIndex = index
}
jobSummary.Summary[alloc.TaskGroup] = tgSummary
}
// The job summary has changed, so add to watcher and update the modify
// index.
if hasSummaryChanged {
existing.ModifyIndex = index
if currentJSModifyIndex < jobSummary.ModifyIndex {
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: existing.JobID})
watcher.Add(watch.Item{JobSummary: jobID})
// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if err := txn.Insert("job_summary", *jobSummary); err != nil {
return fmt.Errorf("updating job summary failed: %v", err)
}
}
existing.Summary[taskGroup] = tgSummary
if err := txn.Insert("job_summary", existing); err != nil {
return fmt.Errorf("inserting job summary failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
@@ -1474,7 +1489,7 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err
// JobSummaryRestore is used to restore a job summary
func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
if err := r.txn.Insert("job_summary", jobSummary); err != nil {
if err := r.txn.Insert("job_summary", *jobSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
return nil

nomad/structs/structs.go

@@ -982,6 +982,18 @@ type JobSummary struct {
ModifyIndex uint64
}
// Copy returns a new copy of JobSummary
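// The task group summary map is copied as well, so mutating the copy never
// touches the stored value.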
func (js *JobSummary) Copy() *JobSummary {
newJobSummary := new(JobSummary)
*newJobSummary = *js
newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary))
for k, v := range js.Summary {
newTGSummary[k] = v
}
newJobSummary.Summary = newTGSummary
return newJobSummary
}
// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {

nomad/worker.go

@@ -430,6 +430,26 @@ func (w *Worker) ReblockEval(eval *structs.Evaluation) error {
}
defer metrics.MeasureSince([]string{"nomad", "worker", "reblock_eval"}, time.Now())
// Update the evaluation if the queued allocations are not the same as what
// is recorded in the job summary
summary, err := w.srv.fsm.state.JobSummaryByID(eval.JobID)
if err != nil {
return fmt.Errorf("coultn't retreive job summary: %v", err)
}
if summary != nil {
var hasChanged bool
for tg, tgSummary := range summary.Summary {
if queued, ok := eval.QueuedAllocations[tg]; ok && queued != tgSummary.Queued {
hasChanged = true
break
}
}
if hasChanged {
if err := w.UpdateEval(eval); err != nil {
return err
}
}
}
// Store the snapshot index in the eval
eval.SnapshotIndex = w.snapshotIndex

nomad/worker_test.go

@@ -463,12 +463,22 @@ func TestWorker_ReblockEval(t *testing.T) {
// Create the blocked eval
eval1 := mock.Eval()
eval1.Status = structs.EvalStatusBlocked
eval1.QueuedAllocations = map[string]int{"cache": 100}
// Insert it into the state store
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
t.Fatal(err)
}
// Create the job summary
js := mock.JobSummary(eval1.JobID)
tg := js.Summary["cache"]
tg.Queued = 100
js.Summary["cache"] = tg
if err := s1.fsm.State().UpsertJobSummary(1001, js); err != nil {
t.Fatal(err)
}
// Enqueue the eval and then dequeue
s1.evalBroker.Enqueue(eval1)
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
@@ -480,6 +490,7 @@ func TestWorker_ReblockEval(t *testing.T) {
}
eval2 := evalOut.Copy()
eval2.QueuedAllocations = map[string]int{"cache": 50}
// Attempt to reblock eval
w := &Worker{srv: s1, logger: s1.logger, evalToken: token}
@@ -497,6 +508,15 @@ func TestWorker_ReblockEval(t *testing.T) {
t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker: %#v", bStats)
}
// Check that the eval was updated
eval, err := s1.fsm.State().EvalByID(eval2.ID)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(eval.QueuedAllocations, eval2.QueuedAllocations) {
t.Fatalf("expected: %#v, actual: %#v", eval2.QueuedAllocations, eval.QueuedAllocations)
}
// Check that the snapshot index was set properly by unblocking the eval and
// then dequeuing.
s1.blockedEvals.Unblock("foobar", 1000)