diff --git a/nomad/deploymentwatcher/batcher.go b/nomad/deploymentwatcher/batcher.go index a47ea37fa..c6b6d9056 100644 --- a/nomad/deploymentwatcher/batcher.go +++ b/nomad/deploymentwatcher/batcher.go @@ -8,14 +8,11 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -const ( - // evalBatchDuration is the duration in which evaluations are batched before - // commiting to Raft. - evalBatchDuration = 200 * time.Millisecond -) - // EvalBatcher is used to batch the creation of evaluations type EvalBatcher struct { + // batch is the batching duration + batch time.Duration + // raft is used to actually commit the evaluations raft DeploymentRaftEndpoints @@ -34,11 +31,12 @@ type EvalBatcher struct { // NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to // create the evaluations and exits the batcher when the passed exit channel is // closed. -func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher { +func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher { b := &EvalBatcher{ - raft: raft, - ctx: ctx, - inCh: make(chan *structs.Evaluation, 10), + batch: batchDuration, + raft: raft, + ctx: ctx, + inCh: make(chan *structs.Evaluation, 10), } go b.batcher() @@ -49,11 +47,10 @@ func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatc // tracks the evaluations creation. func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture { b.l.Lock() - defer b.l.Unlock() - if b.f == nil { b.f = NewEvalFuture() } + b.l.Unlock() b.inCh <- e return b.f @@ -61,17 +58,26 @@ func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture { // batcher is the long lived batcher goroutine func (b *EvalBatcher) batcher() { - ticker := time.NewTicker(evalBatchDuration) + timer := time.NewTimer(b.batch) evals := make(map[string]*structs.Evaluation) for { select { case <-b.ctx.Done(): - ticker.Stop() + timer.Stop() return case e := <-b.inCh: - evals[e.DeploymentID] = e - case <-ticker.C: if len(evals) == 0 { + if !timer.Stop() { + <-timer.C + } + timer.Reset(b.batch) + } + + evals[e.DeploymentID] = e + case <-timer.C: + if len(evals) == 0 { + // Reset the timer + timer.Reset(b.batch) continue } @@ -83,6 +89,8 @@ func (b *EvalBatcher) batcher() { // Shouldn't be possible but protect ourselves if f == nil { + // Reset the timer + timer.Reset(b.batch) continue } @@ -97,6 +105,9 @@ func (b *EvalBatcher) batcher() { // Reset the evals list evals = make(map[string]*structs.Evaluation) + + // Reset the timer + timer.Reset(b.batch) } } } diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index e9571b4c3..dce4e4734 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "sync" + "time" "golang.org/x/time/rate" @@ -54,9 +55,13 @@ type DeploymentStateWatchers interface { } const ( - // limitStateQueriesPerSecond is the number of state queries allowed per + // LimitStateQueriesPerSecond is the number of state queries allowed per // second - limitStateQueriesPerSecond = 15.0 + LimitStateQueriesPerSecond = 15.0 + + // EvalBatchDuration is the duration in which evaluations are batched before + // commiting to Raft. + EvalBatchDuration = 250 * time.Millisecond ) // Watcher is used to watch deployments and their allocations created @@ -69,6 +74,10 @@ type Watcher struct { // queryLimiter is used to limit the rate of blocking queries queryLimiter *rate.Limiter + // evalBatchDuration is the duration to batch eval creation across all + // deployment watchers + evalBatchDuration time.Duration + // raft contains the set of Raft endpoints that can be used by the // deployments watcher raft DeploymentRaftEndpoints @@ -92,17 +101,23 @@ type Watcher struct { // NewDeploymentsWatcher returns a deployments watcher that is used to watch // deployments and trigger the scheduler as needed. -func NewDeploymentsWatcher(logger *log.Logger, w DeploymentStateWatchers, raft DeploymentRaftEndpoints) *Watcher { +func NewDeploymentsWatcher( + logger *log.Logger, + w DeploymentStateWatchers, + raft DeploymentRaftEndpoints, + stateQueriesPerSecond float64, + evalBatchDuration time.Duration) *Watcher { ctx, exitFn := context.WithCancel(context.Background()) return &Watcher{ - queryLimiter: rate.NewLimiter(limitStateQueriesPerSecond, 100), - stateWatchers: w, - raft: raft, - watchers: make(map[string]*deploymentWatcher, 32), - evalBatcher: NewEvalBatcher(raft, ctx), - logger: logger, - ctx: ctx, - exitFn: exitFn, + queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100), + evalBatchDuration: evalBatchDuration, + stateWatchers: w, + raft: raft, + watchers: make(map[string]*deploymentWatcher, 32), + evalBatcher: NewEvalBatcher(evalBatchDuration, raft, ctx), + logger: logger, + ctx: ctx, + exitFn: exitFn, } } @@ -136,7 +151,7 @@ func (w *Watcher) Flush() { w.watchers = make(map[string]*deploymentWatcher, 32) w.ctx, w.exitFn = context.WithCancel(context.Background()) - w.evalBatcher = NewEvalBatcher(w.raft, w.ctx) + w.evalBatcher = NewEvalBatcher(w.evalBatchDuration, w.raft, w.ctx) } // watchDeployments is the long lived go-routine that watches for deployments to @@ -309,11 +324,7 @@ func (w *Watcher) PauseDeployment(req *structs.DeploymentPauseRequest, resp *str // createEvaluation commits the given evaluation to Raft but batches the commit // with other calls. func (w *Watcher) createEvaluation(eval *structs.Evaluation) (uint64, error) { - w.l.Lock() - f := w.evalBatcher.CreateEval(eval) - w.l.Unlock() - - return f.Results() + return w.evalBatcher.CreateEval(eval).Results() } // upsertJob commits the given job to Raft diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index 527f3778e..2e1056c63 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -18,7 +18,7 @@ import ( func TestWatcher_WatchDeployments(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Return no allocations or evals m.On("Allocations", mocker.Anything, mocker.Anything).Return(nil).Run(func(args mocker.Arguments) { @@ -100,7 +100,7 @@ func TestWatcher_WatchDeployments(t *testing.T) { func TestWatcher_UnknownDeployment(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) w.SetEnabled(true) // Set up the calls for retrieving deployments @@ -146,7 +146,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job, and a deployment j := mock.Job() @@ -195,7 +195,7 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job, alloc, and a deployment j := mock.Job() @@ -245,7 +245,7 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job, alloc, and a deployment j := mock.Job() @@ -302,7 +302,7 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job, alloc, and a deployment j := mock.Job() @@ -371,7 +371,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job, canary alloc, and a deployment j := mock.Job() @@ -430,7 +430,7 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job, canary alloc, and a deployment j := mock.Job() @@ -489,7 +489,7 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job and a deployment j := mock.Job() @@ -537,7 +537,7 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job and a deployment j := mock.Job() @@ -586,7 +586,7 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job and a deployment j := mock.Job() @@ -636,7 +636,7 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration) // Create a job and a deployment j := mock.Job() @@ -686,7 +686,7 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { func TestDeploymentWatcher_Watch(t *testing.T) { assert := assert.New(t) m := newMockBackend(t) - w := NewDeploymentsWatcher(testLogger(), m, m) + w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment j := mock.Job() @@ -735,8 +735,7 @@ func TestDeploymentWatcher_Watch(t *testing.T) { HealthyAllocationIDs: []string{a.ID}, }, } - i := m.nextIndex() - assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req), "UpsertDeploymentAllocHealth") + assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") } // Wait for there to be one eval @@ -775,8 +774,7 @@ func TestDeploymentWatcher_Watch(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - i := m.nextIndex() - assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req2), "UpsertDeploymentAllocHealth") + assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval testutil.WaitForResult(func() (bool, error) { @@ -813,3 +811,108 @@ func TestDeploymentWatcher_Watch(t *testing.T) { } // Test evaluations are batched between watchers +func TestWatcher_BatchEvals(t *testing.T) { + assert := assert.New(t) + m := newMockBackend(t) + w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, EvalBatchDuration) + + // Create a job, alloc, for two deployments + j1 := mock.Job() + d1 := mock.Deployment() + d1.JobID = j1.ID + a1 := mock.Alloc() + a1.DeploymentID = d1.ID + + j2 := mock.Job() + d2 := mock.Deployment() + d2.JobID = j2.ID + a2 := mock.Alloc() + a2.DeploymentID = d2.ID + + assert.Nil(m.state.UpsertJob(m.nextIndex(), j1), "UpsertJob") + assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob") + assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d1, false), "UpsertDeployment") + assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d2, false), "UpsertDeployment") + assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs") + assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") + + // Assert the following methods will be called + m.On("List", mocker.Anything, mocker.Anything).Return(nil).Run(m.listFromState) + + m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d1.ID)), + mocker.Anything).Return(nil).Run(m.allocationsFromState) + m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d2.ID)), + mocker.Anything).Return(nil).Run(m.allocationsFromState) + + m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)), + mocker.Anything).Return(nil).Run(m.evaluationsFromState) + m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)), + mocker.Anything).Return(nil).Run(m.evaluationsFromState) + + m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)), + mocker.Anything).Return(nil).Run(m.getJobFromState) + m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)), + mocker.Anything).Return(nil).Run(m.getJobFromState) + + m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)), + mocker.Anything).Return(nil).Run(m.getJobVersionsFromState) + m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)), + mocker.Anything).Return(nil).Run(m.getJobVersionsFromState) + + w.SetEnabled(true) + testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, + func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") }) + + // Assert that we will get a createEvaluation call only once and it contains + // both deployments. This will verify that the watcher is batching + // allocation changes + m1 := matchUpsertEvals([]string{d1.ID, d2.ID}) + m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once() + + // Update the allocs health to healthy which should create an evaluation + req := &structs.ApplyDeploymentAllocHealthRequest{ + DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ + DeploymentID: d1.ID, + HealthyAllocationIDs: []string{a1.ID}, + }, + } + assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + + req2 := &structs.ApplyDeploymentAllocHealthRequest{ + DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ + DeploymentID: d2.ID, + HealthyAllocationIDs: []string{a2.ID}, + }, + } + assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + + // Wait for there to be one eval for each job + testutil.WaitForResult(func() (bool, error) { + ws := memdb.NewWatchSet() + evals1, err := m.state.EvalsByJob(ws, j1.ID) + if err != nil { + return false, err + } + + evals2, err := m.state.EvalsByJob(ws, j2.ID) + if err != nil { + return false, err + } + + if l := len(evals1); l != 1 { + return false, fmt.Errorf("Got %d evals; want 1", l) + } + + if l := len(evals2); l != 1 { + return false, fmt.Errorf("Got %d evals; want 1", l) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1)) + testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil }, + func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") }) +} diff --git a/nomad/deploymentwatcher/testutil_test.go b/nomad/deploymentwatcher/testutil_test.go index 71b49b9cc..0c28cba9f 100644 --- a/nomad/deploymentwatcher/testutil_test.go +++ b/nomad/deploymentwatcher/testutil_test.go @@ -15,7 +15,7 @@ import ( ) func testLogger() *log.Logger { - return log.New(os.Stderr, "", log.LstdFlags) + return log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds) } type mockBackend struct {