Merge pull request #4867 from hashicorp/b-deployment-progress-deadline

Blocked evaluation fixes
This commit is contained in:
Alex Dadgar
2018-11-13 10:29:03 -08:00
committed by GitHub
10 changed files with 186 additions and 66 deletions

View File

@@ -6,6 +6,8 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -29,6 +31,9 @@ const (
// allocations. It is unblocked when the capacity of a node that could run the
// failed allocation becomes available.
type BlockedEvals struct {
// logger is the logger to use by the blocked eval tracker.
logger log.Logger
evalBroker *EvalBroker
enabled bool
stats *BlockedStats
@@ -47,7 +52,7 @@ type BlockedEvals struct {
// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string
jobs map[structs.NamespacedID]string
// unblockIndexes maps computed node classes or quota name to the index in
// which they were unblocked. This is used to check if an evaluation could
@@ -102,12 +107,13 @@ type BlockedStats struct {
// NewBlockedEvals creates a new blocked eval tracker that will enqueue
// unblocked evals into the passed broker.
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
return &BlockedEvals{
logger: logger.Named("blocked_evals"),
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]string),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
@@ -176,19 +182,11 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
return
}
// Check if the job already has a blocked evaluation. If it does add it to
// the list of duplicates. We only ever want one blocked evaluation per job,
// otherwise we would create unnecessary work for the scheduler as multiple
// evals for the same job would be run, all producing the same outcome.
if _, existing := b.jobs[eval.JobID]; existing {
b.duplicates = append(b.duplicates, eval)
// Unblock any waiter.
select {
case b.duplicateCh <- struct{}{}:
default:
}
// Handle the new evaluation being for a job we are already tracking.
if b.processBlockJobDuplicate(eval) {
// If process block job duplicate returns true, the new evaluation has
// been marked as a duplicate and we have nothing to do, so return
// early.
return
}
@@ -205,7 +203,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
}
// Mark the job as tracked.
b.jobs[eval.JobID] = eval.ID
b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID
b.stats.TotalBlocked++
// Track that the evaluation is being added due to reaching the quota limit
@@ -234,6 +232,71 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
b.captured[eval.ID] = wrapped
}
// processBlockJobDuplicate handles the case where the new eval is for a job
// that we are already tracking. If the eval is a duplicate, we add the older
// evaluation by Raft index to the list of duplicates such that it can be
// cancelled. We only ever want one blocked evaluation per job, otherwise we
// would create unnecessary work for the scheduler as multiple evals for the
// same job would be run, all producing the same outcome. It is critical to
// prefer the newer evaluation, since it will contain the most up to date set of
// class eligibility. The return value is set to true, if the passed evaluation
// is cancelled. This should be called with the lock held.
func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCancelled bool) {
existingID, hasExisting := b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)]
if !hasExisting {
return
}
var dup *structs.Evaluation
existingW, ok := b.captured[existingID]
if ok {
if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.captured, existingID)
b.stats.TotalBlocked--
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
} else {
existingW, ok = b.escaped[existingID]
if !ok {
// This is a programming error
b.logger.Error("existing blocked evaluation is neither tracked as captured or escaped", "existing_id", existingID)
delete(b.jobs, structs.NewNamespacedID(eval.JobID, eval.Namespace))
return
}
if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.escaped, existingID)
b.stats.TotalEscaped--
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
}
b.duplicates = append(b.duplicates, dup)
// Unblock any waiter.
select {
case b.duplicateCh <- struct{}{}:
default:
}
return
}
// latestEvalIndex returns the max of the evaluations create and snapshot index
func latestEvalIndex(eval *structs.Evaluation) uint64 {
if eval == nil {
return 0
}
return helper.Uint64Max(eval.CreateIndex, eval.SnapshotIndex)
}
// missedUnblock returns whether an evaluation missed an unblock while it was in
// the scheduler. Since the scheduler can operate at an index in the past, the
// evaluation may have been processed missing data that would allow it to
@@ -291,7 +354,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
// Untrack causes any blocked evaluation for the passed job to be no longer
// tracked. Untrack is called when there is a successful evaluation for the job
// and a blocked evaluation is no longer needed.
func (b *BlockedEvals) Untrack(jobID string) {
func (b *BlockedEvals) Untrack(jobID, namespace string) {
b.l.Lock()
defer b.l.Unlock()
@@ -300,8 +363,10 @@ func (b *BlockedEvals) Untrack(jobID string) {
return
}
nsID := structs.NewNamespacedID(jobID, namespace)
// Get the evaluation ID to cancel
evalID, ok := b.jobs[jobID]
evalID, ok := b.jobs[nsID]
if !ok {
// No blocked evaluation so exit
return
@@ -309,7 +374,7 @@ func (b *BlockedEvals) Untrack(jobID string) {
// Attempt to delete the evaluation
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
if w.eval.QuotaLimitReached != "" {
@@ -318,7 +383,7 @@ func (b *BlockedEvals) Untrack(jobID string) {
}
if w, ok := b.escaped[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
@@ -440,7 +505,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
for id, wrapped := range b.escaped {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
@@ -467,7 +532,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
// is eligible based on the computed node class, or never seen the
// computed node class.
unblocked[wrapped.eval] = wrapped.token
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
delete(b.captured, id)
if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
@@ -502,7 +567,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.captured, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
}
@@ -513,7 +578,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
b.stats.TotalEscaped -= 1
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
@@ -571,7 +636,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalQuotaLimit = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]string)
b.jobs = make(map[structs.NamespacedID]string)
b.unblockIndexes = make(map[string]uint64)
b.timetable = nil
b.duplicates = nil

View File

@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@@ -14,7 +15,7 @@ import (
func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) {
broker := testBroker(t, 0)
broker.SetEnabled(true)
blocked := NewBlockedEvals(broker)
blocked := NewBlockedEvals(broker, testlog.HCLogger(t))
blocked.SetEnabled(true)
return blocked, broker
}
@@ -99,14 +100,20 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {
// Create duplicate blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.CreateIndex = 100
e2 := mock.Eval()
e2.JobID = e.JobID
e2.CreateIndex = 101
e3 := mock.Eval()
e3.JobID = e.JobID
e3.CreateIndex = 102
e4 := mock.Eval()
e4.JobID = e.JobID
e4.CreateIndex = 100
blocked.Block(e)
blocked.Block(e2)
// Verify block did track both
// Verify stats such that we are only tracking one
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
@@ -114,8 +121,8 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {
// Get the duplicates.
out := blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
if len(out) != 1 || !reflect.DeepEqual(out[0], e) {
t.Fatalf("bad: %#v %#v", out, e)
}
// Call block again after a small sleep.
@@ -126,9 +133,28 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {
// Get the duplicates.
out = blocked.GetDuplicates(1 * time.Second)
if len(out) != 1 || !reflect.DeepEqual(out[0], e3) {
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
}
// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
// Add an older evaluation and assert it gets cancelled
blocked.Block(e4)
out = blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e4) {
t.Fatalf("bad: %#v %#v", out, e4)
}
// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}
func TestBlockedEvals_UnblockEscaped(t *testing.T) {
@@ -647,7 +673,7 @@ func TestBlockedEvals_Untrack(t *testing.T) {
}
// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bStats = blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
@@ -672,7 +698,7 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) {
}
// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bs = blocked.Stats()
if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 {
t.Fatalf("bad: %#v", bs)

View File

@@ -7,10 +7,9 @@ import (
"sync"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -632,7 +631,7 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
n.blockedEvals.Untrack(eval.JobID, eval.Namespace)
}
}

View File

@@ -48,11 +48,12 @@ func testStateStore(t *testing.T) *state.StateStore {
func testFSM(t *testing.T) *nomadFSM {
broker := testBroker(t, 0)
dispatcher, _ := testPeriodicDispatcher(t)
logger := testlog.HCLogger(t)
fsmConfig := &FSMConfig{
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker),
Logger: testlog.HCLogger(t),
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
}
fsm, err := NewFSM(fsmConfig)

View File

@@ -627,8 +627,10 @@ func TestLeader_ReapDuplicateEval(t *testing.T) {
// Create a duplicate blocked eval
eval := mock.Eval()
eval.CreateIndex = 100
eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.CreateIndex = 102
s1.blockedEvals.Block(eval)
s1.blockedEvals.Block(eval2)
@@ -636,7 +638,7 @@ func TestLeader_ReapDuplicateEval(t *testing.T) {
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval2.ID)
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
return false, err
}

View File

@@ -15,14 +15,12 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/pool"
@@ -35,6 +33,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
)
@@ -267,9 +266,6 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
return nil, err
}
// Create a new blocked eval tracker.
blockedEvals := NewBlockedEvals(evalBroker)
// Configure TLS
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true)
if err != nil {
@@ -304,7 +300,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: blockedEvals,
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
shutdownCh: make(chan struct{}),
@@ -401,7 +397,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
go s.planQueue.EmitStats(time.Second, s.shutdownCh)
// Emit metrics for the blocked eval tracker.
go blockedEvals.EmitStats(time.Second, s.shutdownCh)
go s.blockedEvals.EmitStats(time.Second, s.shutdownCh)
// Emit metrics for the Vault client.
go s.vault.EmitStats(time.Second, s.shutdownCh)

View File

@@ -164,6 +164,14 @@ type NamespacedID struct {
Namespace string
}
// NewNamespacedID returns a new namespaced ID given the ID and namespace
func NewNamespacedID(id, ns string) NamespacedID {
return NamespacedID{
ID: id,
Namespace: ns,
}
}
func (n NamespacedID) String() string {
return fmt.Sprintf("<ns: %q, id: %q>", n.Namespace, n.ID)
}

View File

@@ -6,10 +6,9 @@ import (
"sync"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)

View File

@@ -5,7 +5,6 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -246,16 +245,6 @@ func (e *EvalEligibility) HasEscaped() bool {
func (e *EvalEligibility) GetClasses() map[string]bool {
elig := make(map[string]bool)
// Go through the job.
for class, feas := range e.job {
switch feas {
case EvalComputedClassEligible:
elig[class] = true
case EvalComputedClassIneligible:
elig[class] = false
}
}
// Go through the task groups.
for _, classes := range e.taskGroups {
for class, feas := range classes {
@@ -273,6 +262,21 @@ func (e *EvalEligibility) GetClasses() map[string]bool {
}
}
// Go through the job.
for class, feas := range e.job {
switch feas {
case EvalComputedClassEligible:
// Only mark as eligible if it hasn't been marked before. This
// prevents the job marking a class as eligible when it is ineligible
// to all the task groups.
if _, ok := elig[class]; !ok {
elig[class] = true
}
case EvalComputedClassIneligible:
elig[class] = false
}
}
return elig
}

View File

@@ -1,7 +1,6 @@
package scheduler
import (
"reflect"
"testing"
"github.com/hashicorp/nomad/helper/testlog"
@@ -9,6 +8,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func testContext(t testing.TB) (*state.StateStore, *EvalContext) {
@@ -260,7 +260,7 @@ func TestEvalEligibility_GetClasses(t *testing.T) {
e.SetTaskGroupEligibility(false, "fizz", "v1:3")
expClasses := map[string]bool{
"v1:1": true,
"v1:1": false,
"v1:2": false,
"v1:3": true,
"v1:4": false,
@@ -268,7 +268,27 @@ func TestEvalEligibility_GetClasses(t *testing.T) {
}
actClasses := e.GetClasses()
if !reflect.DeepEqual(actClasses, expClasses) {
t.Fatalf("GetClasses() returned %#v; want %#v", actClasses, expClasses)
}
require.Equal(t, expClasses, actClasses)
}
func TestEvalEligibility_GetClasses_JobEligible_TaskGroupIneligible(t *testing.T) {
e := NewEvalEligibility()
e.SetJobEligibility(true, "v1:1")
e.SetTaskGroupEligibility(false, "foo", "v1:1")
e.SetJobEligibility(true, "v1:2")
e.SetTaskGroupEligibility(false, "foo", "v1:2")
e.SetTaskGroupEligibility(true, "bar", "v1:2")
e.SetJobEligibility(true, "v1:3")
e.SetTaskGroupEligibility(false, "foo", "v1:3")
e.SetTaskGroupEligibility(false, "bar", "v1:3")
expClasses := map[string]bool{
"v1:1": false,
"v1:2": true,
"v1:3": false,
}
actClasses := e.GetClasses()
require.Equal(t, expClasses, actClasses)
}