batch test
@@ -8,14 +8,11 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-const (
-	// evalBatchDuration is the duration in which evaluations are batched before
-	// commiting to Raft.
-	evalBatchDuration = 200 * time.Millisecond
-)
-
 // EvalBatcher is used to batch the creation of evaluations
 type EvalBatcher struct {
+	// batch is the batching duration
+	batch time.Duration
+
 	// raft is used to actually commit the evaluations
 	raft DeploymentRaftEndpoints
 
@@ -34,11 +31,12 @@ type EvalBatcher struct {
 // NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to
 // create the evaluations and exits the batcher when the passed exit channel is
 // closed.
-func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
+func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
 	b := &EvalBatcher{
-		raft: raft,
-		ctx:  ctx,
-		inCh: make(chan *structs.Evaluation, 10),
+		batch: batchDuration,
+		raft:  raft,
+		ctx:   ctx,
+		inCh:  make(chan *structs.Evaluation, 10),
 	}
 
 	go b.batcher()
@@ -49,11 +47,10 @@ func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatc
 // tracks the evaluations creation.
 func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
 	b.l.Lock()
-	defer b.l.Unlock()
-
 	if b.f == nil {
 		b.f = NewEvalFuture()
 	}
+	b.l.Unlock()
 
 	b.inCh <- e
 	return b.f
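The hunk above swaps a deferred unlock for an explicit one placed before the send on b.inCh, presumably so the mutex is not held while the channel send blocks: with the defer, a busy batcher would stall every other CreateEval caller behind the lock, not just behind the channel. A minimal standalone sketch of the difference, with illustrative names that are not the commit's code:

package main

import (
	"fmt"
	"sync"
)

type batcher struct {
	l    sync.Mutex
	inCh chan int
}

// createDeferred mirrors the old shape: the lock is held for the whole
// call, so a full inCh blocks this caller and everyone waiting on l.
func (b *batcher) createDeferred(v int) {
	b.l.Lock()
	defer b.l.Unlock()
	b.inCh <- v
}

// createExplicit mirrors the new shape: shared state is touched under
// the lock, which is then released before the potentially blocking send.
func (b *batcher) createExplicit(v int) {
	b.l.Lock()
	// ...initialize shared state (the real code lazily creates b.f here)...
	b.l.Unlock()
	b.inCh <- v
}

func main() {
	b := &batcher{inCh: make(chan int, 1)}
	b.createExplicit(7)
	fmt.Println(<-b.inCh) // 7
}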
@@ -61,17 +58,26 @@ func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
 
 // batcher is the long lived batcher goroutine
 func (b *EvalBatcher) batcher() {
-	ticker := time.NewTicker(evalBatchDuration)
+	timer := time.NewTimer(b.batch)
 	evals := make(map[string]*structs.Evaluation)
 	for {
 		select {
 		case <-b.ctx.Done():
-			ticker.Stop()
+			timer.Stop()
 			return
 		case e := <-b.inCh:
-			evals[e.DeploymentID] = e
-		case <-ticker.C:
 			if len(evals) == 0 {
+				if !timer.Stop() {
+					<-timer.C
+				}
+				timer.Reset(b.batch)
+			}
+
+			evals[e.DeploymentID] = e
+		case <-timer.C:
+			if len(evals) == 0 {
+				// Reset the timer
+				timer.Reset(b.batch)
 				continue
 			}
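The `if !timer.Stop() { <-timer.C }` sequence in the inCh case is the standard time.Timer reset idiom from the time package documentation: Stop reports false when the timer has already fired, and in that case the stale tick sitting in the channel must be drained before Reset, otherwise the next receive would return immediately. A runnable sketch of the idiom on its own:

package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.NewTimer(50 * time.Millisecond)

	// Stop returns true here because the timer has not fired yet, so
	// there is nothing to drain; had it fired, <-t.C would remove the
	// stale tick so the post-Reset receive cannot return early.
	if !t.Stop() {
		<-t.C
	}
	t.Reset(10 * time.Millisecond)

	<-t.C
	fmt.Println("fired once, no stale tick")
}

In the batcher this replaces a free-running ticker: the timer is re-armed only when the first evaluation of a window arrives, so the batch window starts at the first write rather than on a fixed ticker boundary.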
@@ -83,6 +89,8 @@ func (b *EvalBatcher) batcher() {
 
 			// Shouldn't be possible but protect ourselves
 			if f == nil {
+				// Reset the timer
+				timer.Reset(b.batch)
 				continue
 			}
 
@@ -97,6 +105,9 @@ func (b *EvalBatcher) batcher() {
 
 			// Reset the evals list
 			evals = make(map[string]*structs.Evaluation)
+
+			// Reset the timer
+			timer.Reset(b.batch)
 		}
 	}
 }
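Note that the pending batch is a map keyed by DeploymentID, so repeated evaluations for the same deployment inside one window collapse to a single entry and only the latest eval per deployment is committed at each flush. A tiny sketch of that coalescing behavior:

package main

import "fmt"

func main() {
	// Three CreateEval calls, two distinct deployments: the map keeps
	// one (the latest) evaluation per deployment.
	evals := make(map[string]string)
	for _, e := range []struct{ deploymentID, evalID string }{
		{"dep-1", "eval-a"},
		{"dep-2", "eval-b"},
		{"dep-1", "eval-c"}, // overwrites eval-a within this window
	} {
		evals[e.deploymentID] = e.evalID
	}
	fmt.Println(len(evals), "evals flushed for 3 creations") // 2
}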
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	"sync"
+	"time"
 
 	"golang.org/x/time/rate"
 
@@ -54,9 +55,13 @@ type DeploymentStateWatchers interface {
 }
 
 const (
-	// limitStateQueriesPerSecond is the number of state queries allowed per
+	// LimitStateQueriesPerSecond is the number of state queries allowed per
 	// second
-	limitStateQueriesPerSecond = 15.0
+	LimitStateQueriesPerSecond = 15.0
+
+	// EvalBatchDuration is the duration in which evaluations are batched before
+	// commiting to Raft.
+	EvalBatchDuration = 250 * time.Millisecond
 )
 
 // Watcher is used to watch deployments and their allocations created
@@ -69,6 +74,10 @@ type Watcher struct {
 	// queryLimiter is used to limit the rate of blocking queries
 	queryLimiter *rate.Limiter
 
+	// evalBatchDuration is the duration to batch eval creation across all
+	// deployment watchers
+	evalBatchDuration time.Duration
+
 	// raft contains the set of Raft endpoints that can be used by the
 	// deployments watcher
 	raft DeploymentRaftEndpoints
@@ -92,17 +101,23 @@ type Watcher struct {
 
 // NewDeploymentsWatcher returns a deployments watcher that is used to watch
 // deployments and trigger the scheduler as needed.
-func NewDeploymentsWatcher(logger *log.Logger, w DeploymentStateWatchers, raft DeploymentRaftEndpoints) *Watcher {
+func NewDeploymentsWatcher(
+	logger *log.Logger,
+	w DeploymentStateWatchers,
+	raft DeploymentRaftEndpoints,
+	stateQueriesPerSecond float64,
+	evalBatchDuration time.Duration) *Watcher {
 	ctx, exitFn := context.WithCancel(context.Background())
 	return &Watcher{
-		queryLimiter:  rate.NewLimiter(limitStateQueriesPerSecond, 100),
-		stateWatchers: w,
-		raft:          raft,
-		watchers:      make(map[string]*deploymentWatcher, 32),
-		evalBatcher:   NewEvalBatcher(raft, ctx),
-		logger:        logger,
-		ctx:           ctx,
-		exitFn:        exitFn,
+		queryLimiter:      rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
+		evalBatchDuration: evalBatchDuration,
+		stateWatchers:     w,
+		raft:              raft,
+		watchers:          make(map[string]*deploymentWatcher, 32),
+		evalBatcher:       NewEvalBatcher(evalBatchDuration, raft, ctx),
+		logger:            logger,
+		ctx:               ctx,
+		exitFn:            exitFn,
 	}
 }
 
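stateQueriesPerSecond arrives as a plain float64 while rate.NewLimiter takes a rate.Limit (itself a float64 type), hence the rate.Limit(...) conversion; the burst of 100 is carried over unchanged. A standalone sketch of how such a limiter paces callers; the 5 qps and burst of 2 are demo numbers only, not the watcher's values:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	lim := rate.NewLimiter(rate.Limit(5.0), 2) // 5 tokens/sec, burst of 2

	start := time.Now()
	for i := 0; i < 4; i++ {
		// Wait blocks until a token is available or the context ends.
		if err := lim.Wait(context.Background()); err != nil {
			panic(err)
		}
		fmt.Printf("query %d at %v\n", i, time.Since(start).Round(10*time.Millisecond))
	}
}

The first two queries pass immediately on the burst allowance; the remainder are spaced roughly 200ms apart.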
@@ -136,7 +151,7 @@ func (w *Watcher) Flush() {
 
 	w.watchers = make(map[string]*deploymentWatcher, 32)
 	w.ctx, w.exitFn = context.WithCancel(context.Background())
-	w.evalBatcher = NewEvalBatcher(w.raft, w.ctx)
+	w.evalBatcher = NewEvalBatcher(w.evalBatchDuration, w.raft, w.ctx)
 }
 
 // watchDeployments is the long lived go-routine that watches for deployments to
@@ -309,11 +324,7 @@ func (w *Watcher) PauseDeployment(req *structs.DeploymentPauseRequest, resp *str
 // createEvaluation commits the given evaluation to Raft but batches the commit
 // with other calls.
 func (w *Watcher) createEvaluation(eval *structs.Evaluation) (uint64, error) {
-	w.l.Lock()
-	f := w.evalBatcher.CreateEval(eval)
-	w.l.Unlock()
-
-	return f.Results()
+	return w.evalBatcher.CreateEval(eval).Results()
 }
 
 // upsertJob commits the given job to Raft
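With the watcher-level lock gone, createEvaluation reduces to CreateEval(eval).Results(), where Results blocks until the batcher has committed the whole batch to Raft. A minimal sketch of that future pattern; the names are illustrative stand-ins, not the commit's EvalFuture code:

package main

import (
	"fmt"
	"sync"
)

type future struct {
	wg    sync.WaitGroup
	index uint64
	err   error
}

func newFuture() *future {
	f := &future{}
	f.wg.Add(1)
	return f
}

// set is called exactly once by the batcher after the commit.
func (f *future) set(index uint64, err error) {
	f.index, f.err = index, err
	f.wg.Done()
}

// Results blocks every caller that joined the batch until set runs.
func (f *future) Results() (uint64, error) {
	f.wg.Wait()
	return f.index, f.err
}

func main() {
	f := newFuture()
	go f.set(42, nil) // stand-in for the batcher's Raft commit
	idx, err := f.Results()
	fmt.Println(idx, err) // 42 <nil>
}

Because every CreateEval caller in one window receives the same future, a single Raft commit unblocks all of them at once.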
@@ -18,7 +18,7 @@ import (
 func TestWatcher_WatchDeployments(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Return no allocations or evals
 	m.On("Allocations", mocker.Anything, mocker.Anything).Return(nil).Run(func(args mocker.Arguments) {
@@ -100,7 +100,7 @@ func TestWatcher_WatchDeployments(t *testing.T) {
 func TestWatcher_UnknownDeployment(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 	w.SetEnabled(true)
 
 	// Set up the calls for retrieving deployments
@@ -146,7 +146,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) {
 func TestWatcher_SetAllocHealth_Unknown(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job, and a deployment
 	j := mock.Job()
@@ -195,7 +195,7 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) {
 func TestWatcher_SetAllocHealth_Healthy(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job, alloc, and a deployment
 	j := mock.Job()
@@ -245,7 +245,7 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) {
 func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job, alloc, and a deployment
 	j := mock.Job()
@@ -302,7 +302,7 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) {
 func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job, alloc, and a deployment
 	j := mock.Job()
@@ -371,7 +371,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) {
 func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job, canary alloc, and a deployment
 	j := mock.Job()
@@ -430,7 +430,7 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) {
 func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job, canary alloc, and a deployment
 	j := mock.Job()
@@ -489,7 +489,7 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) {
 func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job and a deployment
 	j := mock.Job()
@@ -537,7 +537,7 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) {
 func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job and a deployment
 	j := mock.Job()
@@ -586,7 +586,7 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) {
 func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job and a deployment
 	j := mock.Job()
@@ -636,7 +636,7 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) {
 func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
 
 	// Create a job and a deployment
 	j := mock.Job()
@@ -686,7 +686,7 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) {
 func TestDeploymentWatcher_Watch(t *testing.T) {
 	assert := assert.New(t)
 	m := newMockBackend(t)
-	w := NewDeploymentsWatcher(testLogger(), m, m)
+	w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, 1*time.Millisecond)
 
 	// Create a job, alloc, and a deployment
 	j := mock.Job()
@@ -735,8 +735,7 @@ func TestDeploymentWatcher_Watch(t *testing.T) {
 				HealthyAllocationIDs: []string{a.ID},
 			},
 		}
-		i := m.nextIndex()
-		assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req), "UpsertDeploymentAllocHealth")
+		assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth")
 	}
 
 	// Wait for there to be one eval
@@ -775,8 +774,7 @@ func TestDeploymentWatcher_Watch(t *testing.T) {
 			UnhealthyAllocationIDs: []string{a.ID},
 		},
 	}
-	i := m.nextIndex()
-	assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req2), "UpsertDeploymentAllocHealth")
+	assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth")
 
 	// Wait for there to be one eval
 	testutil.WaitForResult(func() (bool, error) {
@@ -813,3 +811,108 @@ func TestDeploymentWatcher_Watch(t *testing.T) {
 }
 
+// Test evaluations are batched between watchers
+func TestWatcher_BatchEvals(t *testing.T) {
+	assert := assert.New(t)
+	m := newMockBackend(t)
+	w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, EvalBatchDuration)
+
+	// Create a job, alloc, for two deployments
+	j1 := mock.Job()
+	d1 := mock.Deployment()
+	d1.JobID = j1.ID
+	a1 := mock.Alloc()
+	a1.DeploymentID = d1.ID
+
+	j2 := mock.Job()
+	d2 := mock.Deployment()
+	d2.JobID = j2.ID
+	a2 := mock.Alloc()
+	a2.DeploymentID = d2.ID
+
+	assert.Nil(m.state.UpsertJob(m.nextIndex(), j1), "UpsertJob")
+	assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob")
+	assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d1, false), "UpsertDeployment")
+	assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d2, false), "UpsertDeployment")
+	assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs")
+	assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs")
+
+	// Assert the following methods will be called
+	m.On("List", mocker.Anything, mocker.Anything).Return(nil).Run(m.listFromState)
+
+	m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d1.ID)),
+		mocker.Anything).Return(nil).Run(m.allocationsFromState)
+	m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d2.ID)),
+		mocker.Anything).Return(nil).Run(m.allocationsFromState)
+
+	m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)),
+		mocker.Anything).Return(nil).Run(m.evaluationsFromState)
+	m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)),
+		mocker.Anything).Return(nil).Run(m.evaluationsFromState)
+
+	m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)),
+		mocker.Anything).Return(nil).Run(m.getJobFromState)
+	m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)),
+		mocker.Anything).Return(nil).Run(m.getJobFromState)
+
+	m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)),
+		mocker.Anything).Return(nil).Run(m.getJobVersionsFromState)
+	m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)),
+		mocker.Anything).Return(nil).Run(m.getJobVersionsFromState)
+
+	w.SetEnabled(true)
+	testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil },
+		func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") })
+
+	// Assert that we will get a createEvaluation call only once and it contains
+	// both deployments. This will verify that the watcher is batching
+	// allocation changes
+	m1 := matchUpsertEvals([]string{d1.ID, d2.ID})
+	m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once()
+
+	// Update the allocs health to healthy which should create an evaluation
+	req := &structs.ApplyDeploymentAllocHealthRequest{
+		DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
+			DeploymentID:         d1.ID,
+			HealthyAllocationIDs: []string{a1.ID},
+		},
+	}
+	assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth")
+
+	req2 := &structs.ApplyDeploymentAllocHealthRequest{
+		DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
+			DeploymentID:         d2.ID,
+			HealthyAllocationIDs: []string{a2.ID},
+		},
+	}
+	assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth")
+
+	// Wait for there to be one eval for each job
+	testutil.WaitForResult(func() (bool, error) {
+		ws := memdb.NewWatchSet()
+		evals1, err := m.state.EvalsByJob(ws, j1.ID)
+		if err != nil {
+			return false, err
+		}
+
+		evals2, err := m.state.EvalsByJob(ws, j2.ID)
+		if err != nil {
+			return false, err
+		}
+
+		if l := len(evals1); l != 1 {
+			return false, fmt.Errorf("Got %d evals; want 1", l)
+		}
+
+		if l := len(evals2); l != 1 {
+			return false, fmt.Errorf("Got %d evals; want 1", l)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatal(err)
+	})
+
+	m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1))
+	testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil },
+		func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") })
+}
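The heart of the new test is the UpsertEvals expectation registered with .Once(): it fails if batching ever degrades into one Raft commit per deployment. A self-contained sketch of that assertion style using testify directly; raftMock and its method are hypothetical stand-ins for the test file's mockBackend and matchUpsertEvals helpers:

package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

type raftMock struct{ mock.Mock }

func (r *raftMock) UpsertEvals(ids []string) error {
	return r.Called(ids).Error(0)
}

func main() {
	r := &raftMock{}
	// Expect exactly one call whose argument carries both eval IDs.
	r.On("UpsertEvals", mock.MatchedBy(func(ids []string) bool {
		return len(ids) == 2
	})).Return(nil).Once()

	fmt.Println(r.UpsertEvals([]string{"d1", "d2"})) // <nil>
}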
@@ -15,7 +15,7 @@ import (
 )
 
 func testLogger() *log.Logger {
-	return log.New(os.Stderr, "", log.LstdFlags)
+	return log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds)
 }
 
 type mockBackend struct {
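Adding log.Lmicroseconds gives the test logger microsecond-resolution timestamps, which can matter for ordering events that land within the same millisecond, the regime a 1ms-to-250ms batch window puts these tests in. For reference:

package main

import (
	"log"
	"os"
)

func main() {
	// LstdFlags alone prints e.g. 2009/11/10 23:00:00;
	// adding Lmicroseconds appends .000000 to the time.
	l := log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds)
	l.Println("example")
}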