diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index 15e3fd1f9..bd911d9f6 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -39,9 +39,15 @@ type DeploymentStateWatchers interface { // deployment. Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error + // List is used to list all the deployments in the system + List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error + // GetJobVersions is used to lookup the versions of a job. This is used when // rolling back to find the latest stable job GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error + + // GetJob is used to lookup a particular job. + GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error } // Watcher is used to watch deployments and their allocations created @@ -89,10 +95,14 @@ func NewDeploymentsWatcher(logger *log.Logger, w DeploymentStateWatchers, raft D // should only be enabled on the active leader. func (w *Watcher) SetEnabled(enabled bool) { w.l.Lock() + wasEnabled := w.enabled w.enabled = enabled w.l.Unlock() if !enabled { w.Flush() + } else if !wasEnabled { + // Start the watcher if we are transitioning to an enabled state + go w.watchDeployments() } } @@ -113,27 +123,100 @@ func (w *Watcher) Flush() { w.evalBatcher = NewEvalBatcher(w.raft, w.exitCh) } -// Watch adds a deployment to the watch list -func (w *Watcher) Watch(d *structs.Deployment, j *structs.Job) { +// watchDeployments is the long lived go-routine that watches for deployments to +// add and remove watchers on. +func (w *Watcher) watchDeployments() { + dindex := uint64(0) + for { + // Block getting all deployments using the last deployment index. 
+ var resp *structs.DeploymentListResponse + select { + case <-w.exitCh: + return + case resp = <-w.getDeploys(dindex): + } + + // Guard against npe + if resp == nil { + continue + } + + // Ensure we are tracking the things we should and not tracking what we + // shouldn't be + for _, d := range resp.Deployments { + if d.Active() { + if err := w.add(d); err != nil { + w.logger.Printf("[ERR] nomad.deployments_watcher: failed to track deployment %q: %v", d.ID, err) + } + } else { + w.remove(d) + } + } + + // Update the latest index + dindex = resp.Index + } +} + +// getDeploys retrieves all deployments blocking at the given index. +func (w *Watcher) getDeploys(index uint64) <-chan *structs.DeploymentListResponse { + c := make(chan *structs.DeploymentListResponse, 1) + go func() { + // Build the request + args := &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + }, + } + var resp structs.DeploymentListResponse + + for resp.Index <= index { + if err := w.stateWatchers.List(args, &resp); err != nil { + w.logger.Printf("[ERR] nomad.deployments_watcher: failed to retrieve deployments: %v", err) + close(c) + return + } + } + + c <- &resp + }() + return c +} + +// add adds a deployment to the watch list +func (w *Watcher) add(d *structs.Deployment) error { w.l.Lock() defer w.l.Unlock() // Not enabled so no-op if !w.enabled { - return + return nil } // Already watched so no-op if _, ok := w.watchers[d.ID]; ok { - return + return nil } - w.watchers[d.ID] = newDeploymentWatcher(w.logger, w.stateWatchers, d, j, w) + // Get the job the deployment is referencing + args := &structs.JobSpecificRequest{ + JobID: d.JobID, + } + var resp structs.SingleJobResponse + if err := w.stateWatchers.GetJob(args, &resp); err != nil { + return err + } + if resp.Job == nil { + return fmt.Errorf("deployment %q references unknown job %q", d.ID, d.JobID) + } + + w.watchers[d.ID] = newDeploymentWatcher(w.logger, w.stateWatchers, d, resp.Job, w) + return 
nil } -// Unwatch stops watching a deployment. This can be because the deployment is +// remove stops watching a deployment. This can be because the deployment is // complete or being deleted. -func (w *Watcher) Unwatch(d *structs.Deployment) { +func (w *Watcher) remove(d *structs.Deployment) { w.l.Lock() defer w.l.Unlock() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 09e19b2d7..7b26b838e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3843,10 +3843,10 @@ func (v *Vault) Validate() error { const ( // DeploymentStatuses are the various states a deployment can be be in DeploymentStatusRunning = "running" + DeploymentStatusPaused = "paused" DeploymentStatusFailed = "failed" DeploymentStatusSuccessful = "successful" DeploymentStatusCancelled = "cancelled" - DeploymentStatusPaused = "paused" // DeploymentStatusDescriptions are the various descriptions of the states a // deployment can be in.