diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index ecb845c12..558826937 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -35,15 +35,21 @@ type BlockedEvals struct { escaped map[string]*structs.Evaluation // unblockCh is used to buffer unblocking of evaluations. - capacityChangeCh chan string + capacityChangeCh chan *capacityUpdate // jobs is the map of blocked job and is used to ensure that only one // blocked eval exists for each job. jobs map[string]struct{} + // unblockIndexes maps computed node classes to the index at which they were + // unblocked. This is used to check if an evaluation could have been + // unblocked between the time it was in the scheduler and the time it + // is being blocked. + unblockIndexes map[string]uint64 + // duplicates is the set of evaluations for jobs that had pre-existing // blocked evaluations. These should be marked as cancelled since only one - // blocked eval is neeeded bper job. + // blocked eval is needed per job. duplicates []*structs.Evaluation // duplicateCh is used to signal that a duplicate eval was added to the @@ -55,6 +61,12 @@ type BlockedEvals struct { stopCh chan struct{} } +// capacityUpdate stores unblock data: the computed node class and the index of the unblock. +type capacityUpdate struct { + computedClass string + index uint64 +} + // BlockedStats returns all the stats about the blocked eval tracker. 
type BlockedStats struct { // TotalEscaped is the total number of blocked evaluations that have escaped @@ -73,7 +85,8 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { captured: make(map[string]*structs.Evaluation), escaped: make(map[string]*structs.Evaluation), jobs: make(map[string]struct{}), - capacityChangeCh: make(chan string, unblockBuffer), + unblockIndexes: make(map[string]uint64), + capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), duplicateCh: make(chan struct{}, 1), stopCh: make(chan struct{}), stats: new(BlockedStats), @@ -133,6 +146,14 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { return } + // Check if the eval missed an unblock while it was in the scheduler at an + // older index. + if b.missedUnblock(eval) { + // Just re-enqueue the eval immediately + b.evalBroker.Enqueue(eval) + return + } + // Mark the job as tracked. b.stats.TotalBlocked++ b.jobs[eval.JobID] = struct{}{} @@ -152,16 +173,65 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { b.captured[eval.ID] = eval } +// missedUnblock returns whether an evaluation missed an unblock while it was in +// the scheduler. Since the scheduler can operate at an index in the past, the +// evaluation may have been processed missing data that would allow it to +// complete. This method returns true if that is the case and should be called with +// the lock held. +func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool { + var max uint64 = 0 + for class, index := range b.unblockIndexes { + // Calculate the max unblock index + if max < index { + max = index + } + + elig, ok := eval.ClassEligibility[class] + if !ok && eval.SnapshotIndex < index { + // The evaluation did not encounter this class and was processed at an + // index older than the unblock. Thus for correctness we need to unblock it. + return true + } + + // The evaluation could use the computed node class and the eval was + // processed before the last unblock. 
+ if elig && eval.SnapshotIndex < index { + return true + } + } + + // If the evaluation has escaped, and the map contains an index newer than + // the evaluation's snapshot index, it should be unblocked. + if eval.EscapedComputedClass && eval.SnapshotIndex < max { + return true + } + + // The evaluation is ahead of all recent unblocks. + return false +} + // Unblock causes any evaluation that could potentially make progress on a // capacity change on the passed computed node class to be enqueued into the // eval broker. -func (b *BlockedEvals) Unblock(computedClass string) { +func (b *BlockedEvals) Unblock(computedClass string, index uint64) { + b.l.Lock() + // Do nothing if not enabled if !b.enabled { + b.l.Unlock() return } - b.capacityChangeCh <- computedClass + // Store the index at which the unblock happened. We use this on subsequent + // block calls in case the evaluation was in the scheduler when a trigger + // occurred. + b.unblockIndexes[computedClass] = index + b.l.Unlock() + + b.capacityChangeCh <- &capacityUpdate{ + computedClass: computedClass, + index: index, + } } // watchCapacity is a long lived function that watches for capacity changes in @@ -171,15 +241,15 @@ func (b *BlockedEvals) watchCapacity() { select { case <-b.stopCh: return - case computedClass := <-b.capacityChangeCh: - b.unblock(computedClass) + case update := <-b.capacityChangeCh: + b.unblock(update.computedClass, update.index) } } } // unblock unblocks all blocked evals that could run on the passed computed node // class. -func (b *BlockedEvals) unblock(computedClass string) { +func (b *BlockedEvals) unblock(computedClass string, index uint64) { b.l.Lock() defer b.l.Unlock() @@ -188,6 +258,11 @@ func (b *BlockedEvals) unblock(computedClass string) { return } + // Store the index at which the unblock happened. We use this on subsequent + // block calls in case the evaluation was in the scheduler when a trigger + // occurred. 
+ b.unblockIndexes[computedClass] = index + // Every eval that has escaped computed node class has to be unblocked // because any node could potentially be feasible. var unblocked []*structs.Evaluation @@ -273,7 +348,7 @@ func (b *BlockedEvals) Flush() { b.escaped = make(map[string]*structs.Evaluation) b.jobs = make(map[string]struct{}) b.duplicates = nil - b.capacityChangeCh = make(chan string, unblockBuffer) + b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) b.stopCh = make(chan struct{}) b.duplicateCh = make(chan struct{}, 1) } diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index cf725a7a1..32487d9ee 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -53,6 +53,27 @@ func TestBlockedEvals_Block_SameJob(t *testing.T) { } } +func TestBlockedEvals_Block_PriorUnblocks(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Do unblocks prior to blocking + blocked.Unblock("v1:123", 1000) + blocked.Unblock("v1:123", 1001) + + // Create a blocked eval and add it to the blocked tracker. 
+ e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false} + e.SnapshotIndex = 999 + blocked.Block(e) + + // Verify the eval was tracked as blocked and not escaped + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } +} + func TestBlockedEvals_GetDuplicates(t *testing.T) { blocked, _ := testBlockedEvals(t) @@ -105,7 +126,7 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) { t.Fatalf("bad: %#v", bStats) } - blocked.Unblock("v1:123") + blocked.Unblock("v1:123", 1000) testutil.WaitForResult(func() (bool, error) { // Verify Unblock caused an enqueue @@ -141,7 +162,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } - blocked.Unblock("v1:123") + blocked.Unblock("v1:123", 1000) testutil.WaitForResult(func() (bool, error) { // Verify Unblock caused an enqueue @@ -178,7 +199,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) { } // Should do nothing - blocked.Unblock("v1:123") + blocked.Unblock("v1:123", 1000) testutil.WaitForResult(func() (bool, error) { // Verify Unblock didn't cause an enqueue @@ -214,7 +235,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { } // Should unblock because the eval hasn't seen this node class. - blocked.Unblock("v1:789") + blocked.Unblock("v1:789", 1000) testutil.WaitForResult(func() (bool, error) { // Verify Unblock causes an enqueue @@ -233,3 +254,108 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { t.Fatalf("err: %s", err) }) } + +// Test the block case in which the eval should be immediately unblocked since +// it is escaped and old +func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Do an unblock prior to blocking + blocked.Unblock("v1:123", 1000) + + // Create a blocked eval that has escaped computed node classes and add + // it to the blocked tracker. 
+ e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EscapedComputedClass = true + e.SnapshotIndex = 900 + blocked.Block(e) + + // Verify block caused the eval to be immediately unblocked, so nothing is tracked + blockedStats := blocked.Stats() + if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + testutil.WaitForResult(func() (bool, error) { + // Verify Block caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + +// Test the block case in which the eval should be immediately unblocked since +// there is an unblock on an unseen class +func TestBlockedEvals_Block_ImmediateUnblock_UnseenClass(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Do an unblock prior to blocking + blocked.Unblock("v1:123", 1000) + + // Create a blocked eval that has not seen the unblocked node class and add + // it to the blocked tracker. 
+ e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.EscapedComputedClass = false + e.SnapshotIndex = 900 + blocked.Block(e) + + // Verify block caused the eval to be immediately unblocked, so nothing is tracked + blockedStats := blocked.Stats() + if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + testutil.WaitForResult(func() (bool, error) { + // Verify Block caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + +// Test the block case in which the eval should be immediately unblocked since +// a class it is eligible for has been unblocked +func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Do an unblock prior to blocking + blocked.Unblock("v1:123", 1000) + + // Create a blocked eval that is eligible on a specific node class and add + // it to the blocked tracker. 
+ e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false} + e.SnapshotIndex = 900 + blocked.Block(e) + + // Verify block caused the eval to be immediately unblocked, so nothing is tracked + blockedStats := blocked.Stats() + if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + testutil.WaitForResult(func() (bool, error) { + // Verify Block caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 6237e9760..a220138f8 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -158,7 +158,7 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} { // Unblock evals for the nodes computed node class if it is in a ready // state. if req.Node.Status == structs.NodeStatusReady { - n.blockedEvals.Unblock(req.Node.ComputedClass) + n.blockedEvals.Unblock(req.Node.ComputedClass, index) } return nil @@ -199,7 +199,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { return err } - n.blockedEvals.Unblock(node.ComputedClass) + n.blockedEvals.Unblock(node.ComputedClass, index) } return nil @@ -420,7 +420,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return err } - n.blockedEvals.Unblock(node.ComputedClass) + n.blockedEvals.Unblock(node.ComputedClass, index) } } diff --git a/nomad/worker.go b/nomad/worker.go index 0e02377e2..9326f112e 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -58,7 +58,11 @@ type Worker struct { failures uint - evalToken string + evalToken string + + // snapshotIndex is the index of the snapshot in which the scheduler was + // first invoked. It is used to mark the SnapshotIndex of evaluations + // Created, Updated or Reblocked. 
snapshotIndex uint64 } @@ -326,9 +330,6 @@ SUBMIT: return nil, nil, fmt.Errorf("failed to snapshot state: %v", err) } state = snap - - // Store the snapshot's index - w.snapshotIndex = result.RefreshIndex } // Return the result and potential state update diff --git a/nomad/worker_test.go b/nomad/worker_test.go index fa92ef0a4..840ea1a28 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -501,7 +501,7 @@ func TestWorker_ReblockEval(t *testing.T) { // Check that the snapshot index was set properly by unblocking the eval and // then dequeuing. - s1.blockedEvals.Unblock("foobar") + s1.blockedEvals.Unblock("foobar", 1000) reblockedEval, _, err := s1.evalBroker.Dequeue([]string{eval1.Type}, 1*time.Second) if err != nil {