mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 18:05:42 +03:00
@@ -30,7 +30,7 @@ func gcConfig() *GCConfig {
|
||||
// runners to be terminal
|
||||
func exitAllocRunner(runners ...AllocRunner) {
|
||||
for _, ar := range runners {
|
||||
terminalAlloc := ar.Alloc()
|
||||
terminalAlloc := ar.Alloc().Copy()
|
||||
terminalAlloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
ar.Update(terminalAlloc)
|
||||
}
|
||||
|
||||
@@ -242,6 +242,7 @@ func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.Ca
|
||||
return
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
var availDrivers []string
|
||||
var wg sync.WaitGroup
|
||||
|
||||
@@ -253,7 +254,9 @@ func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.Ca
|
||||
defer wg.Done()
|
||||
instance.WaitForFirstFingerprint(ctx)
|
||||
if instance.getLastHealth() != drivers.HealthStateUndetected {
|
||||
mu.Lock()
|
||||
availDrivers = append(availDrivers, name)
|
||||
mu.Unlock()
|
||||
}
|
||||
}(n, i)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
@@ -138,8 +138,8 @@ func (b *BlockedEvals) SetEnabled(enabled bool) {
|
||||
b.l.Unlock()
|
||||
return
|
||||
} else if enabled {
|
||||
go b.watchCapacity()
|
||||
go b.prune()
|
||||
go b.watchCapacity(b.stopCh, b.capacityChangeCh)
|
||||
go b.prune(b.stopCh)
|
||||
} else {
|
||||
close(b.stopCh)
|
||||
}
|
||||
@@ -475,12 +475,12 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) {
|
||||
|
||||
// watchCapacity is a long lived function that watches for capacity changes in
|
||||
// nodes and unblocks the correct set of evals.
|
||||
func (b *BlockedEvals) watchCapacity() {
|
||||
func (b *BlockedEvals) watchCapacity(stopCh <-chan struct{}, changeCh <-chan *capacityUpdate) {
|
||||
for {
|
||||
select {
|
||||
case <-b.stopCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
case update := <-b.capacityChangeCh:
|
||||
case update := <-changeCh:
|
||||
b.unblock(update.computedClass, update.quotaChange, update.index)
|
||||
}
|
||||
}
|
||||
@@ -606,6 +606,11 @@ SCAN:
|
||||
b.l.Unlock()
|
||||
return dups
|
||||
}
|
||||
|
||||
// Capture chans inside the lock to prevent a race with them getting
|
||||
// reset in Flush
|
||||
dupCh := b.duplicateCh
|
||||
stopCh := b.stopCh
|
||||
b.l.Unlock()
|
||||
|
||||
// Create the timer
|
||||
@@ -616,11 +621,11 @@ SCAN:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-b.stopCh:
|
||||
case <-stopCh:
|
||||
return nil
|
||||
case <-timeoutCh:
|
||||
return nil
|
||||
case <-b.duplicateCh:
|
||||
case <-dupCh:
|
||||
goto SCAN
|
||||
}
|
||||
}
|
||||
@@ -676,13 +681,13 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) {
|
||||
}
|
||||
|
||||
// prune is a long lived function that prunes unnecessary objects on a timer.
|
||||
func (b *BlockedEvals) prune() {
|
||||
func (b *BlockedEvals) prune(stopCh <-chan struct{}) {
|
||||
ticker := time.NewTicker(pruneInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-b.stopCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
b.pruneUnblockIndexes()
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
"context"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/lib/delayheap"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -167,7 +167,7 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
|
||||
// start the go routine for delayed evals
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
b.delayedEvalCancelFunc = cancel
|
||||
go b.runDelayedEvalsWatcher(ctx)
|
||||
go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh)
|
||||
}
|
||||
b.l.Unlock()
|
||||
if !enabled {
|
||||
@@ -741,7 +741,7 @@ func (d *evalWrapper) Namespace() string {
|
||||
|
||||
// runDelayedEvalsWatcher is a long-lived function that waits till a time deadline is met for
|
||||
// pending evaluations before enqueuing them
|
||||
func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) {
|
||||
func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan struct{}) {
|
||||
var timerChannel <-chan time.Time
|
||||
var delayTimer *time.Timer
|
||||
for {
|
||||
@@ -768,7 +768,7 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) {
|
||||
b.stats.TotalWaiting -= 1
|
||||
b.enqueueLocked(eval, eval.Type)
|
||||
b.l.Unlock()
|
||||
case <-b.delayedEvalsUpdateCh:
|
||||
case <-updateCh:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -399,6 +399,8 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
|
||||
|
||||
// Check that the new leader is tracking the periodic job only
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
leader.periodicDispatcher.l.Lock()
|
||||
defer leader.periodicDispatcher.l.Unlock()
|
||||
if _, tracked := leader.periodicDispatcher.tracked[tuplePeriodic]; !tracked {
|
||||
return false, fmt.Errorf("periodic job not tracked")
|
||||
}
|
||||
|
||||
@@ -173,7 +173,7 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) {
|
||||
// If we are transitioning from disabled to enabled, run the daemon.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
p.stopFn = cancel
|
||||
go p.run(ctx)
|
||||
go p.run(ctx, p.updateCh)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,7 +320,7 @@ func (p *PeriodicDispatch) shouldRun() bool {
|
||||
|
||||
// run is a long-lived function that waits till a job's periodic spec is met and
|
||||
// then creates an evaluation to run the job.
|
||||
func (p *PeriodicDispatch) run(ctx context.Context) {
|
||||
func (p *PeriodicDispatch) run(ctx context.Context, updateCh <-chan struct{}) {
|
||||
var launchCh <-chan time.Time
|
||||
for p.shouldRun() {
|
||||
job, launch := p.nextLaunch()
|
||||
@@ -335,7 +335,7 @@ func (p *PeriodicDispatch) run(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-p.updateCh:
|
||||
case <-updateCh:
|
||||
continue
|
||||
case <-launchCh:
|
||||
p.dispatch(job, launch)
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/acl"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -120,8 +120,9 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
|
||||
ws := memdb.NewWatchSet()
|
||||
eval, err := state.EvalByID(ws, resp.EvalID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(eval)
|
||||
assert.Equal(eval.CreateIndex, resp.EvalCreateIndex)
|
||||
if assert.NotNil(eval) {
|
||||
assert.Equal(eval.CreateIndex, resp.EvalCreateIndex)
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch the response with management token
|
||||
@@ -135,8 +136,9 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
|
||||
ws := memdb.NewWatchSet()
|
||||
eval, err := state.EvalByID(ws, resp.EvalID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(eval)
|
||||
assert.Equal(eval.CreateIndex, resp.EvalCreateIndex)
|
||||
if assert.NotNil(eval) {
|
||||
assert.Equal(eval.CreateIndex, resp.EvalCreateIndex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user