diff --git a/nomad/fsm.go b/nomad/fsm.go index 6b2c28a6f..d317f6058 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -239,11 +239,6 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { return err } - if err := n.state.UpdateSummaryWithJob(req.Job, index); err != nil { - n.logger.Printf("[ERR] nomad.fsm: Updating job summary failed: %v", err) - return err - } - // We always add the job to the periodic dispatcher because there is the // possibility that the periodic spec was removed and then we should stop // tracking it. @@ -398,10 +393,6 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { } } - if err := n.updateJobSummary(index, req.Alloc); err != nil { - return err - } - if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpsertAllocs failed: %v", err) return err @@ -419,8 +410,11 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } - if err := n.updateJobSummary(index, req.Alloc); err != nil { - return err + for _, alloc := range req.Alloc { + if existing, _ := n.state.AllocByID(alloc.ID); existing != nil { + alloc.JobID = existing.JobID + alloc.TaskGroup = existing.TaskGroup + } } // Update all the client allocations @@ -448,20 +442,6 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } -func (n *nomadFSM) updateJobSummary(index uint64, allocations []*structs.Allocation) error { - for _, alloc := range allocations { - existingAlloc, err := n.state.AllocByID(alloc.ID) - if err != nil { - return fmt.Errorf("unable to get allocation from state store: %v", err) - } - if err := n.state.UpdateSummaryWithAlloc(alloc, existingAlloc, index); err != nil { - return err - } - } - - return nil -} - func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 7c9b4fb46..9fa38fe99 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -231,6 +231,19 @@ func Eval() *structs.Evaluation { return eval } +func JobSummary(jobID string) *structs.JobSummary { + js := &structs.JobSummary{ + JobID: jobID, + Summary: map[string]structs.TaskGroupSummary{ + "cache": { + Queued: 5, + Starting: 1, + }, + }, + } + return js +} + func Alloc() *structs.Allocation { alloc := &structs.Allocation{ ID: structs.GenerateUUID(), diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 2ab39dc27..e97a23e14 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -85,6 +85,19 @@ func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) { s.watch.stopWatch(items, notify) } +// UpsertJobSummary upserts a job summary into the state store. This is for +// testing purposes +func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error { + txn := s.db.Txn(true) + defer txn.Abort() + + if err := txn.Insert("job_summary", *jobSummary); err != nil { + return err + } + txn.Commit() + return nil +} + // UpsertNode is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain which is set by the scheduler. @@ -336,6 +349,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } } + s.updateSummaryWithJob(index, job, watcher, txn) + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -474,7 +489,8 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) { return nil, err } if existing != nil { - return existing.(*structs.JobSummary), nil + summary := existing.(structs.JobSummary) + return summary.Copy(), nil } return nil, nil @@ -652,7 +668,7 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("job summary lookup failed: %v", err) } if summaryRaw != nil { - js := summaryRaw.(*structs.JobSummary) + js := summaryRaw.(structs.JobSummary) var hasSummaryChanged bool for tg, num := range eval.QueuedAllocations { if summary, ok := js.Summary[tg]; ok { @@ -820,6 +836,10 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo watcher := watch.NewItems() watcher.Add(watch.Item{Table: "allocs"}) + if err := s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } + // Handle each of the updated allocations for _, alloc := range allocs { if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil { @@ -895,6 +915,10 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er watcher := watch.NewItems() watcher.Add(watch.Item{Table: "allocs"}) + if err := s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } + // Handle the allocations jobs := make(map[string]string, 1) for _, alloc := range allocs { @@ -1237,11 +1261,8 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // updateSummaryWithJob creates or updates job summaries when new jobs are // upserted or existing ones are updated -func (s *StateStore) UpdateSummaryWithJob(job *structs.Job, index uint64) error { - txn := s.db.Txn(true) - defer txn.Abort() - - watcher := watch.NewItems() +func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, + watcher watch.Items, txn *memdb.Txn) error { existing, err := s.JobSummaryByID(job.ID) if err != nil { @@ -1282,107 +1303,101 @@ func (s *StateStore) UpdateSummaryWithJob(job *structs.Job, index uint64) error } } - if err := txn.Insert("job_summary", existing); err != nil { + if err := txn.Insert("job_summary", *existing); err != nil { return err } - txn.Defer(func() { s.watch.notify(watcher) }) - txn.Commit() return nil } // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted -func (s *StateStore) UpdateSummaryWithAlloc(newAlloc *structs.Allocation, - existingAlloc *structs.Allocation, index uint64) error { - txn := s.db.Txn(true) - defer txn.Abort() - watcher := watch.NewItems() +func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allocation, + watcher watch.Items, txn *memdb.Txn) error { - jobID := newAlloc.JobID - taskGroup := newAlloc.TaskGroup - if existingAlloc != nil { - jobID = existingAlloc.JobID - taskGroup = existingAlloc.TaskGroup - } - - existing, err := s.JobSummaryByID(jobID) + jobID := allocs[0].JobID + jobSummary, err := s.JobSummaryByID(jobID) if err != nil { - return fmt.Errorf("lookup of job summary failed: %v", err) + return fmt.Errorf("unable to look up job summary: %v", err) } - if existing == nil { - return fmt.Errorf("unable to find job summary") + if jobSummary == nil { + return fmt.Errorf("job summary not found") } + currentJSModifyIndex := jobSummary.ModifyIndex - tgSummary, ok := existing.Summary[taskGroup] - if !ok { - return fmt.Errorf("unable to find task group in the job summary: %v", taskGroup) - } - var hasSummaryChanged bool - if existingAlloc == nil { - switch newAlloc.DesiredStatus { - case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: - s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.DesiredStatus) - } - switch newAlloc.ClientStatus { - case structs.AllocClientStatusPending: - tgSummary.Starting += 1 - hasSummaryChanged = true - case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: - s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.ClientStatus) - } - } else if existingAlloc.ClientStatus != newAlloc.ClientStatus { - // Incrementing the client of the bin of the current state - switch newAlloc.ClientStatus { - case structs.AllocClientStatusRunning: - tgSummary.Running += 1 - case structs.AllocClientStatusFailed: - tgSummary.Failed += 1 - case structs.AllocClientStatusPending: - tgSummary.Starting += 1 - case structs.AllocClientStatusComplete: - tgSummary.Complete += 1 - case structs.AllocClientStatusLost: - tgSummary.Lost += 1 + for _, alloc := range allocs { + // Look for existing alloc + existing, err := s.AllocByID(alloc.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) } - // Decrementing the count of the bin of the last state - switch existingAlloc.ClientStatus { - case structs.AllocClientStatusRunning: - tgSummary.Running -= 1 - case structs.AllocClientStatusFailed: - tgSummary.Failed -= 1 - case structs.AllocClientStatusPending: - tgSummary.Starting -= 1 - case structs.AllocClientStatusComplete: - tgSummary.Complete -= 1 - case structs.AllocClientStatusLost: - tgSummary.Lost -= 1 + tgSummary, ok := jobSummary.Summary[alloc.TaskGroup] + if !ok { + return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) } + if existing == nil { + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: + s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", alloc.DesiredStatus) + } + switch alloc.ClientStatus { + case structs.AllocClientStatusPending: + tgSummary.Starting += 1 + if tgSummary.Queued > 0 { + tgSummary.Queued -= 1 + } + jobSummary.ModifyIndex = index + case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: + s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", alloc.ClientStatus) + } + } else if existing.ClientStatus != alloc.ClientStatus { + // Incrementing the client of the bin of the current state + switch alloc.ClientStatus { + case structs.AllocClientStatusRunning: + tgSummary.Running += 1 + case structs.AllocClientStatusFailed: + tgSummary.Failed += 1 + case structs.AllocClientStatusPending: + tgSummary.Starting += 1 + case structs.AllocClientStatusComplete: + tgSummary.Complete += 1 + case structs.AllocClientStatusLost: + tgSummary.Lost += 1 + } - hasSummaryChanged = true + // Decrementing the count of the bin of the last state + switch existing.ClientStatus { + case structs.AllocClientStatusRunning: + tgSummary.Running -= 1 + case structs.AllocClientStatusFailed: + tgSummary.Failed -= 1 + case structs.AllocClientStatusPending: + tgSummary.Starting -= 1 + case structs.AllocClientStatusComplete: + tgSummary.Complete -= 1 + case structs.AllocClientStatusLost: + tgSummary.Lost -= 1 + } + jobSummary.ModifyIndex = index + } + jobSummary.Summary[alloc.TaskGroup] = tgSummary } - // The job summary has changed, so add to watcher and update the modify - // index. - if hasSummaryChanged { - existing.ModifyIndex = index + if currentJSModifyIndex < jobSummary.ModifyIndex { watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: existing.JobID}) + watcher.Add(watch.Item{JobSummary: jobID}) // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } + + if err := txn.Insert("job_summary", *jobSummary); err != nil { + return fmt.Errorf("updating job summary failed: %v", err) + } } - existing.Summary[taskGroup] = tgSummary - if err := txn.Insert("job_summary", existing); err != nil { - return fmt.Errorf("inserting job summary failed: %v", err) - } - - txn.Defer(func() { s.watch.notify(watcher) }) - txn.Commit() return nil } @@ -1474,7 +1489,7 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err // JobSummaryRestore is used to restore a job summary func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { - if err := r.txn.Insert("job_summary", jobSummary); err != nil { + if err := r.txn.Insert("job_summary", *jobSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } return nil diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 90bbe7956..8e0bf6fc1 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -982,6 +982,18 @@ type JobSummary struct { ModifyIndex uint64 } +// Copy returns a new copy of JobSummary +func (js *JobSummary) Copy() *JobSummary { + newJobSummary := new(JobSummary) + *newJobSummary = *js + newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary)) + for k, v := range js.Summary { + newTGSummary[k] = v + } + newJobSummary.Summary = newTGSummary + return newJobSummary +} + // TaskGroup summarizes the state of all the allocations of a particular // TaskGroup type TaskGroupSummary struct { diff --git a/nomad/worker.go b/nomad/worker.go index 775c4b5c2..cf596febd 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -430,6 +430,26 @@ func (w *Worker) ReblockEval(eval *structs.Evaluation) error { } defer metrics.MeasureSince([]string{"nomad", "worker", "reblock_eval"}, time.Now()) + // Update the evaluation if the queued jobs is not same as what is + // recorded in the job summary + summary, err := w.srv.fsm.state.JobSummaryByID(eval.JobID) + if err != nil { + return fmt.Errorf("coultn't retreive job summary: %v", err) + } + if summary != nil { + var hasChanged bool + for tg, summary := range summary.Summary { + if queued, ok := eval.QueuedAllocations[tg]; ok { + hasChanged = (queued != summary.Queued) + } + } + if hasChanged { + if err := w.UpdateEval(eval); err != nil { + return err + } + } + } + // Store the snapshot index in the eval eval.SnapshotIndex = w.snapshotIndex diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 0a04a92cb..7e34983de 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -463,12 +463,22 @@ func TestWorker_ReblockEval(t *testing.T) { // Create the blocked eval eval1 := mock.Eval() eval1.Status = structs.EvalStatusBlocked + eval1.QueuedAllocations = map[string]int{"cache": 100} // Insert it into the state store if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil { t.Fatal(err) } + // Create the job summary + js := mock.JobSummary(eval1.JobID) + tg := js.Summary["cache"] + tg.Queued = 100 + js.Summary["cache"] = tg + if err := s1.fsm.State().UpsertJobSummary(1001, js); err != nil { + t.Fatal(err) + } + // Enqueue the eval and then dequeue s1.evalBroker.Enqueue(eval1) evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) @@ -480,6 +490,7 @@ func TestWorker_ReblockEval(t *testing.T) { } eval2 := evalOut.Copy() + eval2.QueuedAllocations = map[string]int{"cache": 50} // Attempt to reblock eval w := &Worker{srv: s1, logger: s1.logger, evalToken: token} @@ -497,6 +508,15 @@ func TestWorker_ReblockEval(t *testing.T) { t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker: %#v", bStats) } + // Check that the eval was updated + eval, err := s1.fsm.State().EvalByID(eval2.ID) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(eval.QueuedAllocations, eval2.QueuedAllocations) { + t.Fatalf("expected: %#v, actual: %#v", eval2.QueuedAllocations, eval.QueuedAllocations) + } + // Check that the snapshot index was set properly by unblocking the eval and // then dequeuing. s1.blockedEvals.Unblock("foobar", 1000)