mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Merge pull request #2155 from hashicorp/f-cancel
Cancel blocked evals upon successful one for job
This commit is contained in:
@@ -39,8 +39,8 @@ type BlockedEvals struct {
|
||||
capacityChangeCh chan *capacityUpdate
|
||||
|
||||
// jobs is the map of blocked job and is used to ensure that only one
|
||||
// blocked eval exists for each job.
|
||||
jobs map[string]struct{}
|
||||
// blocked eval exists for each job. The value is the blocked evaluation ID.
|
||||
jobs map[string]string
|
||||
|
||||
// unblockIndexes maps computed node classes to the index in which they were
|
||||
// unblocked. This is used to check if an evaluation could have been
|
||||
@@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
|
||||
evalBroker: evalBroker,
|
||||
captured: make(map[string]wrappedEval),
|
||||
escaped: make(map[string]wrappedEval),
|
||||
jobs: make(map[string]struct{}),
|
||||
jobs: make(map[string]string),
|
||||
unblockIndexes: make(map[string]uint64),
|
||||
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
|
||||
duplicateCh: make(chan struct{}, 1),
|
||||
@@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
|
||||
|
||||
// Mark the job as tracked.
|
||||
b.stats.TotalBlocked++
|
||||
b.jobs[eval.JobID] = struct{}{}
|
||||
b.jobs[eval.JobID] = eval.ID
|
||||
|
||||
// Wrap the evaluation, capturing its token.
|
||||
wrapped := wrappedEval{
|
||||
@@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Untrack causes any blocked evaluation for the passed job to be no longer
|
||||
// tracked. Untrack is called when there is a successful evaluation for the job
|
||||
// and a blocked evaluation is no longer needed.
|
||||
func (b *BlockedEvals) Untrack(jobID string) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
// Do nothing if not enabled
|
||||
if !b.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
// Get the evaluation ID to cancel
|
||||
evalID, ok := b.jobs[jobID]
|
||||
if !ok {
|
||||
// No blocked evaluation so exit
|
||||
return
|
||||
}
|
||||
|
||||
// Attempt to delete the evaluation
|
||||
if w, ok := b.captured[evalID]; ok {
|
||||
delete(b.jobs, w.eval.JobID)
|
||||
delete(b.captured, evalID)
|
||||
b.stats.TotalBlocked--
|
||||
}
|
||||
|
||||
if w, ok := b.escaped[evalID]; ok {
|
||||
delete(b.jobs, w.eval.JobID)
|
||||
delete(b.escaped, evalID)
|
||||
b.stats.TotalEscaped--
|
||||
b.stats.TotalBlocked--
|
||||
}
|
||||
}
|
||||
|
||||
// Unblock causes any evaluation that could potentially make progress on a
|
||||
// capacity change on the passed computed node class to be enqueued into the
|
||||
// eval broker.
|
||||
@@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() {
|
||||
b.stats.TotalBlocked = 0
|
||||
b.captured = make(map[string]wrappedEval)
|
||||
b.escaped = make(map[string]wrappedEval)
|
||||
b.jobs = make(map[string]struct{})
|
||||
b.jobs = make(map[string]string)
|
||||
b.duplicates = nil
|
||||
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
|
||||
b.stopCh = make(chan struct{})
|
||||
|
||||
@@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) {
|
||||
t.Fatalf("bad: %#v", blockedStats)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockedEvals_Untrack(t *testing.T) {
|
||||
blocked, _ := testBlockedEvals(t)
|
||||
|
||||
// Create two blocked evals and add them to the blocked tracker.
|
||||
e := mock.Eval()
|
||||
e.Status = structs.EvalStatusBlocked
|
||||
e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
|
||||
e.SnapshotIndex = 1000
|
||||
blocked.Block(e)
|
||||
|
||||
// Verify block did track
|
||||
bStats := blocked.Stats()
|
||||
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
|
||||
t.Fatalf("bad: %#v", bStats)
|
||||
}
|
||||
|
||||
// Untrack and verify
|
||||
blocked.Untrack(e.JobID)
|
||||
bStats = blocked.Stats()
|
||||
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
|
||||
t.Fatalf("bad: %#v", bStats)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
|
||||
n.evalBroker.Enqueue(eval)
|
||||
} else if eval.ShouldBlock() {
|
||||
n.blockedEvals.Block(eval)
|
||||
} else if eval.Status == structs.EvalStatusComplete &&
|
||||
len(eval.FailedTGAllocs) == 0 {
|
||||
// If we have a successful evaluation for a node, untrack any
|
||||
// blocked evaluation
|
||||
n.blockedEvals.Untrack(eval.JobID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_UpdateEval_Untrack(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
fsm.evalBroker.SetEnabled(true)
|
||||
fsm.blockedEvals.SetEnabled(true)
|
||||
|
||||
// Mark an eval as blocked.
|
||||
bEval := mock.Eval()
|
||||
bEval.ClassEligibility = map[string]bool{"v1:123": true}
|
||||
fsm.blockedEvals.Block(bEval)
|
||||
|
||||
// Create a successful eval for the same job
|
||||
eval := mock.Eval()
|
||||
eval.JobID = bEval.JobID
|
||||
eval.Status = structs.EvalStatusComplete
|
||||
|
||||
req := structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval},
|
||||
}
|
||||
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are registered
|
||||
out, err := fsm.State().EvalByID(eval.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out == nil {
|
||||
t.Fatalf("not found!")
|
||||
}
|
||||
if out.CreateIndex != 1 {
|
||||
t.Fatalf("bad index: %d", out.CreateIndex)
|
||||
}
|
||||
|
||||
// Verify the eval wasn't enqueued
|
||||
stats := fsm.evalBroker.Stats()
|
||||
if stats.TotalReady != 0 {
|
||||
t.Fatalf("bad: %#v %#v", stats, out)
|
||||
}
|
||||
|
||||
// Verify the eval was untracked in the blocked tracker.
|
||||
bStats := fsm.blockedEvals.Stats()
|
||||
if bStats.TotalBlocked != 0 {
|
||||
t.Fatalf("bad: %#v %#v", bStats, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_UpdateEval_NoUntrack(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
fsm.evalBroker.SetEnabled(true)
|
||||
fsm.blockedEvals.SetEnabled(true)
|
||||
|
||||
// Mark an eval as blocked.
|
||||
bEval := mock.Eval()
|
||||
bEval.ClassEligibility = map[string]bool{"v1:123": true}
|
||||
fsm.blockedEvals.Block(bEval)
|
||||
|
||||
// Create a successful eval for the same job but with placement failures
|
||||
eval := mock.Eval()
|
||||
eval.JobID = bEval.JobID
|
||||
eval.Status = structs.EvalStatusComplete
|
||||
eval.FailedTGAllocs = make(map[string]*structs.AllocMetric)
|
||||
eval.FailedTGAllocs["test"] = new(structs.AllocMetric)
|
||||
|
||||
req := structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval},
|
||||
}
|
||||
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are registered
|
||||
out, err := fsm.State().EvalByID(eval.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out == nil {
|
||||
t.Fatalf("not found!")
|
||||
}
|
||||
if out.CreateIndex != 1 {
|
||||
t.Fatalf("bad index: %d", out.CreateIndex)
|
||||
}
|
||||
|
||||
// Verify the eval wasn't enqueued
|
||||
stats := fsm.evalBroker.Stats()
|
||||
if stats.TotalReady != 0 {
|
||||
t.Fatalf("bad: %#v %#v", stats, out)
|
||||
}
|
||||
|
||||
// Verify the eval was not untracked in the blocked tracker.
|
||||
bStats := fsm.blockedEvals.Stats()
|
||||
if bStats.TotalBlocked != 1 {
|
||||
t.Fatalf("bad: %#v %#v", bStats, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_DeleteEval(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
|
||||
@@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema {
|
||||
Name: "job",
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "JobID",
|
||||
Lowercase: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "JobID",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Status",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -672,7 +672,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) {
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// UpsertEvaluation is used to upsert an evaluation
|
||||
// UpsertEvals is used to upsert a set of evaluations
|
||||
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
@@ -685,7 +685,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
|
||||
for _, eval := range evals {
|
||||
watcher.Add(watch.Item{Eval: eval.ID})
|
||||
watcher.Add(watch.Item{EvalJob: eval.JobID})
|
||||
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
|
||||
if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -703,7 +703,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
|
||||
}
|
||||
|
||||
// nestedUpsertEval is used to nest an evaluation upsert within a transaction
|
||||
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
|
||||
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error {
|
||||
// Lookup the evaluation
|
||||
existing, err := txn.First("evals", "id", eval.ID)
|
||||
if err != nil {
|
||||
@@ -751,6 +751,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the job has any blocked evaluations and cancel them
|
||||
if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 {
|
||||
// Get the blocked evaluation for a job if it exists
|
||||
iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get blocked evals for job %q", eval.JobID, err)
|
||||
}
|
||||
|
||||
var blocked []*structs.Evaluation
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
blocked = append(blocked, raw.(*structs.Evaluation))
|
||||
}
|
||||
|
||||
// Go through and update the evals
|
||||
for _, eval := range blocked {
|
||||
newEval := eval.Copy()
|
||||
newEval.Status = structs.EvalStatusCancelled
|
||||
newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID)
|
||||
newEval.ModifyIndex = index
|
||||
if err := txn.Insert("evals", newEval); err != nil {
|
||||
return fmt.Errorf("eval insert failed: %v", err)
|
||||
}
|
||||
|
||||
watcher.Add(watch.Item{Eval: newEval.ID})
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the eval
|
||||
if err := txn.Insert("evals", eval); err != nil {
|
||||
return fmt.Errorf("eval insert failed: %v", err)
|
||||
@@ -855,7 +886,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) {
|
||||
txn := s.db.Txn(false)
|
||||
|
||||
// Get an iterator over the node allocations
|
||||
iter, err := txn.Get("evals", "job", jobID)
|
||||
iter, err := txn.Get("evals", "job_prefix", jobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1603,7 +1634,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
|
||||
}
|
||||
}
|
||||
|
||||
evals, err := txn.Get("evals", "job", job.ID)
|
||||
evals, err := txn.Get("evals", "job_prefix", job.ID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -1340,6 +1340,74 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
|
||||
notify.verify(t)
|
||||
}
|
||||
|
||||
func TestStateStore_UpsertEvals_CancelBlocked(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
|
||||
// Create two blocked evals for the same job
|
||||
j := "test-job"
|
||||
b1, b2 := mock.Eval(), mock.Eval()
|
||||
b1.JobID = j
|
||||
b1.Status = structs.EvalStatusBlocked
|
||||
b2.JobID = j
|
||||
b2.Status = structs.EvalStatusBlocked
|
||||
|
||||
err := state.UpsertEvals(999, []*structs.Evaluation{b1, b2})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create one complete and successful eval for the job
|
||||
eval := mock.Eval()
|
||||
eval.JobID = j
|
||||
eval.Status = structs.EvalStatusComplete
|
||||
|
||||
notify := setupNotifyTest(
|
||||
state,
|
||||
watch.Item{Table: "evals"},
|
||||
watch.Item{Eval: b1.ID},
|
||||
watch.Item{Eval: b2.ID},
|
||||
watch.Item{Eval: eval.ID},
|
||||
watch.Item{EvalJob: eval.JobID})
|
||||
|
||||
if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.EvalByID(eval.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(eval, out) {
|
||||
t.Fatalf("bad: %#v %#v", eval, out)
|
||||
}
|
||||
|
||||
index, err := state.Index("evals")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1000 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
|
||||
// Get b1/b2 and check they are cancelled
|
||||
out1, err := state.EvalByID(b1.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out2, err := state.EvalByID(b2.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if out1.Status != structs.EvalStatusCancelled || out2.Status != structs.EvalStatusCancelled {
|
||||
t.Fatalf("bad: %#v %#v", out1, out2)
|
||||
}
|
||||
|
||||
notify.verify(t)
|
||||
}
|
||||
|
||||
func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
eval := mock.Eval()
|
||||
|
||||
Reference in New Issue
Block a user