mirror of
https://github.com/kemko/nomad.git
synced 2026-01-10 12:25:42 +03:00
simplify the batcher's timers
This commit is contained in:
@@ -58,40 +58,28 @@ func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
|
||||
|
||||
// batcher is the long lived batcher goroutine
|
||||
func (b *EvalBatcher) batcher() {
|
||||
timer := time.NewTimer(b.batch)
|
||||
var timerCh <-chan time.Time
|
||||
evals := make(map[string]*structs.Evaluation)
|
||||
for {
|
||||
select {
|
||||
case <-b.ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
case e := <-b.inCh:
|
||||
if len(evals) == 0 {
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
timer.Reset(b.batch)
|
||||
if timerCh == nil {
|
||||
timerCh = time.After(b.batch)
|
||||
}
|
||||
|
||||
evals[e.DeploymentID] = e
|
||||
case <-timer.C:
|
||||
if len(evals) == 0 {
|
||||
// Reset the timer
|
||||
timer.Reset(b.batch)
|
||||
continue
|
||||
}
|
||||
|
||||
case <-timerCh:
|
||||
// Capture the future
|
||||
b.l.Lock()
|
||||
f := b.f
|
||||
b.f = nil
|
||||
b.l.Unlock()
|
||||
|
||||
// Shouldn't be possible but protect ourselves
|
||||
// Shouldn't be possible
|
||||
if f == nil {
|
||||
// Reset the timer
|
||||
timer.Reset(b.batch)
|
||||
continue
|
||||
panic("no future")
|
||||
}
|
||||
|
||||
// Capture the evals
|
||||
@@ -100,14 +88,13 @@ func (b *EvalBatcher) batcher() {
|
||||
all = append(all, e)
|
||||
}
|
||||
|
||||
// Upsert the evals
|
||||
f.Set(b.raft.UpsertEvals(all))
|
||||
// Upsert the evals in a go routine
|
||||
go f.Set(b.raft.UpsertEvals(all))
|
||||
|
||||
// Reset the evals list
|
||||
// Reset the evals list and timer
|
||||
evals = make(map[string]*structs.Evaluation)
|
||||
timerCh = nil
|
||||
|
||||
// Reset the timer
|
||||
timer.Reset(b.batch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user