diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 623d81d57..548968249 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -39,8 +39,8 @@ type BlockedEvals struct { capacityChangeCh chan *capacityUpdate // jobs is the map of blocked job and is used to ensure that only one - // blocked eval exists for each job. - jobs map[string]struct{} + // blocked eval exists for each job. The value is the blocked evaluation ID. + jobs map[string]string // unblockIndexes maps computed node classes to the index in which they were // unblocked. This is used to check if an evaluation could have been @@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { evalBroker: evalBroker, captured: make(map[string]wrappedEval), escaped: make(map[string]wrappedEval), - jobs: make(map[string]struct{}), + jobs: make(map[string]string), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), duplicateCh: make(chan struct{}, 1), @@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { // Mark the job as tracked. b.stats.TotalBlocked++ - b.jobs[eval.JobID] = struct{}{} + b.jobs[eval.JobID] = eval.ID // Wrap the evaluation, capturing its token. wrapped := wrappedEval{ @@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool { return false } +// Untrack causes any blocked evaluation for the passed job to be no longer +// tracked. Untrack is called when there is a successful evaluation for the job +// and a blocked evaluation is no longer needed. +func (b *BlockedEvals) Untrack(jobID string) { + b.l.Lock() + defer b.l.Unlock() + + // Do nothing if not enabled + if !b.enabled { + return + } + + // Get the evaluation ID to cancel + evalID, ok := b.jobs[jobID] + if !ok { + // No blocked evaluation so exit + return + } + + // Attempt to delete the evaluation + if w, ok := b.captured[evalID]; ok { + delete(b.jobs, w.eval.JobID) + delete(b.captured, evalID) + b.stats.TotalBlocked-- + } + + if w, ok := b.escaped[evalID]; ok { + delete(b.jobs, w.eval.JobID) + delete(b.escaped, evalID) + b.stats.TotalEscaped-- + b.stats.TotalBlocked-- + } +} + // Unblock causes any evaluation that could potentially make progress on a // capacity change on the passed computed node class to be enqueued into the // eval broker. @@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() { b.stats.TotalBlocked = 0 b.captured = make(map[string]wrappedEval) b.escaped = make(map[string]wrappedEval) - b.jobs = make(map[string]struct{}) + b.jobs = make(map[string]string) b.duplicates = nil b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) b.stopCh = make(chan struct{}) diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 646b24d80..2a3192e9a 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } } + +func TestBlockedEvals_Untrack(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Create two blocked evals and add them to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false} + e.SnapshotIndex = 1000 + blocked.Block(e) + + // Verify block did track + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } + + // Untrack and verify + blocked.Untrack(e.JobID) + bStats = blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 695a95fa5..01deb5f33 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { n.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { n.blockedEvals.Block(eval) + } else if eval.Status == structs.EvalStatusComplete && + len(eval.FailedTGAllocs) == 0 { + // If we have a successful evaluation for a node, untrack any + // blocked evaluation + n.blockedEvals.Untrack(eval.JobID) } } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 90cebdc48..1883a6c97 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) { } } +func TestFSM_UpdateEval_Untrack(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Mark an eval as blocked. + bEval := mock.Eval() + bEval.ClassEligibility = map[string]bool{"v1:123": true} + fsm.blockedEvals.Block(bEval) + + // Create a successful eval for the same job + eval := mock.Eval() + eval.JobID = bEval.JobID + eval.Status = structs.EvalStatusComplete + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was untracked in the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 0 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + +func TestFSM_UpdateEval_NoUntrack(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Mark an eval as blocked. + bEval := mock.Eval() + bEval.ClassEligibility = map[string]bool{"v1:123": true} + fsm.blockedEvals.Block(bEval) + + // Create a successful eval for the same job but with placement failures + eval := mock.Eval() + eval.JobID = bEval.JobID + eval.Status = structs.EvalStatusComplete + eval.FailedTGAllocs = make(map[string]*structs.AllocMetric) + eval.FailedTGAllocs["test"] = new(structs.AllocMetric) + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was not untracked in the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 1 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + func TestFSM_DeleteEval(t *testing.T) { fsm := testFSM(t) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 05ffd78cf..aed0e7ab3 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema { Name: "job", AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "JobID", - Lowercase: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "JobID", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Status", + Lowercase: true, + }, + }, }, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c9c794507..a18c5ee99 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -672,7 +672,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) { return iter, nil } -// UpsertEvaluation is used to upsert an evaluation +// UpsertEvals is used to upsert a set of evaluations func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { txn := s.db.Txn(true) defer txn.Abort() @@ -685,7 +685,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro for _, eval := range evals { watcher.Add(watch.Item{Eval: eval.ID}) watcher.Add(watch.Item{EvalJob: eval.JobID}) - if err := s.nestedUpsertEval(txn, index, eval); err != nil { + if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil { return err } @@ -703,7 +703,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro } // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction -func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { +func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation existing, err := txn.First("evals", "id", eval.ID) if err != nil { @@ -751,6 +751,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct } } + // Check if the job has any blocked evaluations and cancel them + if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 { + // Get the blocked evaluation for a job if it exists + iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked) + if err != nil { + return fmt.Errorf("failed to get blocked evals for job %q", eval.JobID, err) + } + + var blocked []*structs.Evaluation + for { + raw := iter.Next() + if raw == nil { + break + } + blocked = append(blocked, raw.(*structs.Evaluation)) + } + + // Go through and update the evals + for _, eval := range blocked { + newEval := eval.Copy() + newEval.Status = structs.EvalStatusCancelled + newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID) + newEval.ModifyIndex = index + if err := txn.Insert("evals", newEval); err != nil { + return fmt.Errorf("eval insert failed: %v", err) + } + + watcher.Add(watch.Item{Eval: newEval.ID}) + } + } + // Insert the eval if err := txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) @@ -855,7 +886,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations - iter, err := txn.Get("evals", "job", jobID) + iter, err := txn.Get("evals", "job_prefix", jobID) if err != nil { return nil, err } @@ -1603,7 +1634,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b } } - evals, err := txn.Get("evals", "job", job.ID) + evals, err := txn.Get("evals", "job_prefix", job.ID) if err != nil { return "", err } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a4deb2439..b73f24e4f 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1340,6 +1340,74 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) { notify.verify(t) } +func TestStateStore_UpsertEvals_CancelBlocked(t *testing.T) { + state := testStateStore(t) + + // Create two blocked evals for the same job + j := "test-job" + b1, b2 := mock.Eval(), mock.Eval() + b1.JobID = j + b1.Status = structs.EvalStatusBlocked + b2.JobID = j + b2.Status = structs.EvalStatusBlocked + + err := state.UpsertEvals(999, []*structs.Evaluation{b1, b2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create one complete and successful eval for the job + eval := mock.Eval() + eval.JobID = j + eval.Status = structs.EvalStatusComplete + + notify := setupNotifyTest( + state, + watch.Item{Table: "evals"}, + watch.Item{Eval: b1.ID}, + watch.Item{Eval: b2.ID}, + watch.Item{Eval: eval.ID}, + watch.Item{EvalJob: eval.JobID}) + + if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(eval, out) { + t.Fatalf("bad: %#v %#v", eval, out) + } + + index, err := state.Index("evals") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1000 { + t.Fatalf("bad: %d", index) + } + + // Get b1/b2 and check they are cancelled + out1, err := state.EvalByID(b1.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + out2, err := state.EvalByID(b2.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if out1.Status != structs.EvalStatusCancelled || out2.Status != structs.EvalStatusCancelled { + t.Fatalf("bad: %#v %#v", out1, out2) + } + + notify.verify(t) +} + func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) { state := testStateStore(t) eval := mock.Eval()