Track unblock indexes and check evals on block to see if they missed an update while in the scheduler

This commit is contained in:
Alex Dadgar
2016-05-23 15:24:31 -07:00
parent 580a363537
commit 1fc3fec1be
5 changed files with 223 additions and 21 deletions

View File

@@ -35,15 +35,21 @@ type BlockedEvals struct {
escaped map[string]*structs.Evaluation
// unblockCh is used to buffer unblocking of evaluations.
capacityChangeCh chan string
capacityChangeCh chan *capacityUpdate
// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job.
jobs map[string]struct{}
// unblockIndexes maps computed node classes to the index in which they were
// unblocked. This is used to check if an evaluation could have been
// unblocked between the time they were in the scheduler and the time they
// are being blocked.
unblockIndexes map[string]uint64
// duplicates is the set of evaluations for jobs that had pre-existing
// blocked evaluations. These should be marked as cancelled since only one
// blocked eval is neeeded bper job.
// blocked eval is neeeded per job.
duplicates []*structs.Evaluation
// duplicateCh is used to signal that a duplicate eval was added to the
@@ -55,6 +61,12 @@ type BlockedEvals struct {
stopCh chan struct{}
}
// capacityUpdate stores unblock data.
type capacityUpdate struct {
computedClass string
index uint64
}
// BlockedStats returns all the stats about the blocked eval tracker.
type BlockedStats struct {
// TotalEscaped is the total number of blocked evaluations that have escaped
@@ -73,7 +85,8 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
captured: make(map[string]*structs.Evaluation),
escaped: make(map[string]*structs.Evaluation),
jobs: make(map[string]struct{}),
capacityChangeCh: make(chan string, unblockBuffer),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
stats: new(BlockedStats),
@@ -133,6 +146,14 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
return
}
// Check if the eval missed an unblock while it was in the scheduler at an
// older index.
if b.missedUnblock(eval) {
// Just re-enqueue the eval immediately
b.evalBroker.Enqueue(eval)
return
}
// Mark the job as tracked.
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
@@ -152,16 +173,65 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
b.captured[eval.ID] = eval
}
// missedUnblock returns whether an evaluation missed an unblock while it was in
// the scheduler. Since the scheduler can operate at an index in the past, the
// evaluation may have been processed missing data that would allow it to
// complete. This method returns if that is the case and should be called with
// the lock held.
func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
var max uint64 = 0
for class, index := range b.unblockIndexes {
// Calculate the max unblock index
if max < index {
max = index
}
elig, ok := eval.ClassEligibility[class]
if !ok {
// The evaluation was processed and did not encounter this class.
// Thus for correctness we need to unblock it.
return true
}
// The evaluation could use the computed node class and the eval was
// processed before the last unblock.
if elig && eval.SnapshotIndex < index {
return true
}
}
// If the evaluation has escaped, and the map contains an index older than
// the evaluations, it should be unblocked.
if eval.EscapedComputedClass && eval.SnapshotIndex < max {
return true
}
// The evaluation is ahead of all recent unblocks.
return false
}
// Unblock causes any evaluation that could potentially make progress on a
// capacity change on the passed computed node class to be enqueued into the
// eval broker.
func (b *BlockedEvals) Unblock(computedClass string) {
func (b *BlockedEvals) Unblock(computedClass string, index uint64) {
b.l.Lock()
// Do nothing if not enabled
if !b.enabled {
b.l.Unlock()
return
}
b.capacityChangeCh <- computedClass
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occured.
b.unblockIndexes[computedClass] = index
b.l.Unlock()
b.capacityChangeCh <- &capacityUpdate{
computedClass: computedClass,
index: index,
}
}
// watchCapacity is a long lived function that watches for capacity changes in
@@ -171,15 +241,15 @@ func (b *BlockedEvals) watchCapacity() {
select {
case <-b.stopCh:
return
case computedClass := <-b.capacityChangeCh:
b.unblock(computedClass)
case update := <-b.capacityChangeCh:
b.unblock(update.computedClass, update.index)
}
}
}
// unblock unblocks all blocked evals that could run on the passed computed node
// class.
func (b *BlockedEvals) unblock(computedClass string) {
func (b *BlockedEvals) unblock(computedClass string, index uint64) {
b.l.Lock()
defer b.l.Unlock()
@@ -188,6 +258,11 @@ func (b *BlockedEvals) unblock(computedClass string) {
return
}
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occured.
b.unblockIndexes[computedClass] = index
// Every eval that has escaped computed node class has to be unblocked
// because any node could potentially be feasible.
var unblocked []*structs.Evaluation
@@ -273,7 +348,7 @@ func (b *BlockedEvals) Flush() {
b.escaped = make(map[string]*structs.Evaluation)
b.jobs = make(map[string]struct{})
b.duplicates = nil
b.capacityChangeCh = make(chan string, unblockBuffer)
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
b.duplicateCh = make(chan struct{}, 1)
}

View File

@@ -53,6 +53,27 @@ func TestBlockedEvals_Block_SameJob(t *testing.T) {
}
}
func TestBlockedEvals_Block_PriorUnblocks(t *testing.T) {
blocked, _ := testBlockedEvals(t)
// Do unblocks prior to blocking
blocked.Unblock("v1:123", 1000)
blocked.Unblock("v1:123", 1001)
// Create two blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
e.SnapshotIndex = 999
blocked.Block(e)
// Verify block did track both
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}
func TestBlockedEvals_GetDuplicates(t *testing.T) {
blocked, _ := testBlockedEvals(t)
@@ -105,7 +126,7 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) {
t.Fatalf("bad: %#v", bStats)
}
blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
@@ -141,7 +162,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
@@ -178,7 +199,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) {
}
// Should do nothing
blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock didn't cause an enqueue
@@ -214,7 +235,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
}
// Should unblock because the eval hasn't seen this node class.
blocked.Unblock("v1:789")
blocked.Unblock("v1:789", 1000)
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock causes an enqueue
@@ -233,3 +254,108 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
t.Fatalf("err: %s", err)
})
}
// Test the block case in which the eval should be immediately unblocked since
// it is escaped and old
func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) {
blocked, broker := testBlockedEvals(t)
// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)
// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.EscapedComputedClass = true
e.SnapshotIndex = 900
blocked.Block(e)
// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
// Test the block case in which the eval should be immediately unblocked since
// it there is an unblock on an unseen class
func TestBlockedEvals_Block_ImmediateUnblock_UnseenClass(t *testing.T) {
blocked, broker := testBlockedEvals(t)
// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)
// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.EscapedComputedClass = false
e.SnapshotIndex = 900
blocked.Block(e)
// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
// Test the block case in which the eval should be immediately unblocked since
// it a class it is eligible for has been unblocked
func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) {
blocked, broker := testBlockedEvals(t)
// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)
// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
e.SnapshotIndex = 900
blocked.Block(e)
// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

View File

@@ -158,7 +158,7 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
// Unblock evals for the nodes computed node class if it is in a ready
// state.
if req.Node.Status == structs.NodeStatusReady {
n.blockedEvals.Unblock(req.Node.ComputedClass)
n.blockedEvals.Unblock(req.Node.ComputedClass, index)
}
return nil
@@ -199,7 +199,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
return err
}
n.blockedEvals.Unblock(node.ComputedClass)
n.blockedEvals.Unblock(node.ComputedClass, index)
}
return nil
@@ -420,7 +420,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return err
}
n.blockedEvals.Unblock(node.ComputedClass)
n.blockedEvals.Unblock(node.ComputedClass, index)
}
}

View File

@@ -58,7 +58,11 @@ type Worker struct {
failures uint
evalToken string
evalToken string
// snapshotIndex is the index of the snapshot in which the scheduler was
// first envoked. It is used to mark the SnapshotIndex of evaluations
// Created, Updated or Reblocked.
snapshotIndex uint64
}
@@ -326,9 +330,6 @@ SUBMIT:
return nil, nil, fmt.Errorf("failed to snapshot state: %v", err)
}
state = snap
// Store the snapshot's index
w.snapshotIndex = result.RefreshIndex
}
// Return the result and potential state update

View File

@@ -501,7 +501,7 @@ func TestWorker_ReblockEval(t *testing.T) {
// Check that the snapshot index was set properly by unblocking the eval and
// then dequeuing.
s1.blockedEvals.Unblock("foobar")
s1.blockedEvals.Unblock("foobar", 1000)
reblockedEval, _, err := s1.evalBroker.Dequeue([]string{eval1.Type}, 1*time.Second)
if err != nil {