diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go
index 2c578f595..0d76d6ca0 100644
--- a/nomad/deploymentwatcher/deployment_watcher.go
+++ b/nomad/deploymentwatcher/deployment_watcher.go
@@ -67,6 +67,9 @@ type deploymentWatcher struct {
 	// the evaluation. Access should be done through the lock
 	outstandingBatch bool
 
+	// latestEval is the latest eval for the job
+	latestEval uint64
+
 	logger *log.Logger
 	ctx    context.Context
 	exitFn context.CancelFunc
@@ -175,6 +178,7 @@ func (w *deploymentWatcher) SetAllocHealth(
 	resp.EvalID = areq.Eval.ID
 	resp.EvalCreateIndex = index
 	resp.DeploymentModifyIndex = index
+	w.setLatestEval(index)
 	return nil
 }
 
@@ -197,6 +201,7 @@ func (w *deploymentWatcher) PromoteDeployment(
 	resp.EvalID = areq.Eval.ID
 	resp.EvalCreateIndex = index
 	resp.DeploymentModifyIndex = index
+	w.setLatestEval(index)
 	return nil
 }
 
@@ -225,6 +230,7 @@ func (w *deploymentWatcher) PauseDeployment(
 	resp.EvalID = evalID
 	resp.EvalCreateIndex = i
 	resp.DeploymentModifyIndex = i
+	w.setLatestEval(i)
 	return nil
 }
 
@@ -236,12 +242,11 @@ func (w *deploymentWatcher) StopWatch() {
 
 // watch is the long running watcher that takes actions upon allocation changes
 func (w *deploymentWatcher) watch() {
-	latestEval := uint64(0)
 	for {
 		// Block getting all allocations that are part of the deployment using
 		// the last evaluation index. This will have us block waiting for
 		// something to change past what the scheduler has evaluated.
-		allocResp, err := w.getAllocs(latestEval)
+		allocResp, err := w.getAllocs(w.getLatestEval())
 		if err != nil {
 			if err == context.Canceled {
 				return
@@ -251,7 +256,7 @@ func (w *deploymentWatcher) watch() {
 		}
 
 		// Get the latest evaluation snapshot index
-		latestEval, err = w.latestEvalIndex()
+		latestEval, err := w.latestEvalIndex()
 		if err != nil {
 			if err == context.Canceled {
 				return
@@ -314,7 +319,7 @@ func (w *deploymentWatcher) watch() {
 			if index, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
 				w.logger.Printf("[ERR] nomad.deployment_watcher: failed to update deployment %q status: %v", w.d.ID, err)
 			} else {
-				latestEval = index
+				w.setLatestEval(index)
 			}
 		} else if createEval {
 			// Create an eval to push the deployment along
@@ -346,6 +351,10 @@ func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) {
 func (w *deploymentWatcher) createEval() (evalID string, evalCreateIndex uint64, err error) {
 	e := w.getEval()
 	evalCreateIndex, err = w.createEvaluation(e)
+	// Only advance latestEval when the evaluation was actually created.
+	if err == nil {
+		w.setLatestEval(evalCreateIndex)
+	}
 	return e.ID, evalCreateIndex, err
 }
 
@@ -354,24 +363,24 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
 	w.l.Lock()
 	defer w.l.Unlock()
 
-	if w.outstandingBatch {
+	if w.outstandingBatch || forIndex < w.latestEval {
 		return
 	}
 
-	w.logger.Printf("[TRACE] nomad.deployment_watcher: creating eval for index %d %q", forIndex, w.d.ID)
 	w.outstandingBatch = true
 
 	go func() {
 		// Sleep til the batching period is over
 		time.Sleep(evalBatchPeriod)
 
+		if _, _, err := w.createEval(); err != nil {
+			w.logger.Printf("[ERR] nomad.deployment_watcher: failed to create evaluation for deployment %q: %v", w.d.ID, err)
+		}
+
 		w.l.Lock()
 		w.outstandingBatch = false
 		w.l.Unlock()
 
-		if _, _, err := w.createEval(); err != nil {
-			w.logger.Printf("[ERR] nomad.deployment_watcher: failed to create evaluation for deployment %q: %v", w.d.ID, err)
-		}
 	}()
 }
 
@@ -440,14 +449,35 @@ func (w *deploymentWatcher) latestEvalIndex() (uint64, error) {
 	}
 
 	if len(resp.Evaluations) == 0 {
+		w.setLatestEval(resp.Index)
 		return resp.Index, nil
 	}
 
 	// Prefer using the snapshot index. Otherwise use the create index
 	e := resp.Evaluations[0]
 	if e.SnapshotIndex != 0 {
+		w.setLatestEval(e.SnapshotIndex)
 		return e.SnapshotIndex, nil
 	}
 
+	w.setLatestEval(e.CreateIndex)
 	return e.CreateIndex, nil
 }
+
+// setLatestEval stores index as the latest evaluation index, but only when it
+// is greater than the value already recorded.
+func (w *deploymentWatcher) setLatestEval(index uint64) {
+	w.l.Lock()
+	defer w.l.Unlock()
+	if index > w.latestEval {
+		w.latestEval = index
+	}
+}
+
+// getLatestEval returns the recorded latest evaluation index while holding
+// the watcher's lock.
+func (w *deploymentWatcher) getLatestEval() uint64 {
+	w.l.Lock()
+	defer w.l.Unlock()
+	return w.latestEval
+}
diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go
index a6c952856..e9571b4c3 100644
--- a/nomad/deploymentwatcher/deployments_watcher.go
+++ b/nomad/deploymentwatcher/deployments_watcher.go
@@ -227,7 +227,6 @@ func (w *Watcher) add(d *structs.Deployment) error {
 	}
 
 	w.watchers[d.ID] = newDeploymentWatcher(w.ctx, w.queryLimiter, w.logger, w.stateWatchers, d, resp.Job, w)
-	w.logger.Printf("[TRACE] nomad.deployments_watcher: tracking deployment %q", d.ID)
 	return nil
 }
 
@@ -245,7 +244,6 @@ func (w *Watcher) remove(d *structs.Deployment) {
 	if watcher, ok := w.watchers[d.ID]; ok {
 		watcher.StopWatch()
 		delete(w.watchers, d.ID)
-		w.logger.Printf("[TRACE] nomad.deployments_watcher: untracking deployment %q", d.ID)
 	}
 }
 