diff --git a/nomad/deployment_watcher_shims.go b/nomad/deployment_watcher_shims.go
new file mode 100644
index 000000000..ad2dbdcda
--- /dev/null
+++ b/nomad/deployment_watcher_shims.go
@@ -0,0 +1,84 @@
+package nomad
+
+import "github.com/hashicorp/nomad/nomad/structs"
+
+// deploymentWatcherStateShim is the shim that provides the state watching
+// methods. These should be set by the server and passed to the deployment
+// watcher.
+type deploymentWatcherStateShim struct {
+	// evaluations returns the set of evaluations for the given job
+	evaluations func(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error
+
+	// allocations returns the set of allocations that are part of the
+	// deployment.
+	allocations func(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error
+
+	// list is used to list all the deployments in the system
+	list func(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error
+
+	// getJobVersions is used to look up the versions of a job. This is used
+	// when rolling back to find the latest stable job.
+	getJobVersions func(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error
+
+	// getJob is used to look up a particular job.
+	getJob func(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error
+}
+
+func (d *deploymentWatcherStateShim) Evaluations(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error {
+	return d.evaluations(args, reply)
+}
+
+func (d *deploymentWatcherStateShim) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error {
+	return d.allocations(args, reply)
+}
+
+func (d *deploymentWatcherStateShim) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error {
+	return d.list(args, reply)
+}
+
+func (d *deploymentWatcherStateShim) GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error {
+	return d.getJobVersions(args, reply)
+}
+
+func (d *deploymentWatcherStateShim) GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error {
+	return d.getJob(args, reply)
+}
+
+// deploymentWatcherRaftShim is the shim that provides the methods for
+// applying deployment updates through Raft. It should be set by the server
+// and passed to the deployment watcher.
+type deploymentWatcherRaftShim struct {
+	// apply is used to apply a message to Raft
+	apply raftApplyFn
+}
+
+func (d *deploymentWatcherRaftShim) UpsertEvals(evals []*structs.Evaluation) (uint64, error) {
+	update := &structs.EvalUpdateRequest{
+		Evals: evals,
+	}
+	_, index, err := d.apply(structs.EvalUpdateRequestType, update)
+	return index, err
+}
+
+func (d *deploymentWatcherRaftShim) UpsertJob(job *structs.Job) (uint64, error) {
+	update := &structs.JobRegisterRequest{
+		Job: job,
+	}
+	_, index, err := d.apply(structs.JobRegisterRequestType, update)
+	return index, err
+}
+
+func (d *deploymentWatcherRaftShim) UpsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdateRequest) (uint64, error) {
+	_, index, err := d.apply(structs.DeploymentStatusUpdateRequestType, u)
+	return index, err
+}
+
+func (d *deploymentWatcherRaftShim) UpsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) {
+	_, index, err := d.apply(structs.DeploymentPromoteRequestType, req)
+	return index, err
+}
+
+func (d *deploymentWatcherRaftShim) UpsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) {
+	_, index, err := d.apply(structs.DeploymentAllocHealthRequestType, req)
+	return index, err
+}
diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go
index dce4e4734..82425e387
--- a/nomad/deploymentwatcher/deployments_watcher.go
+++ b/nomad/deploymentwatcher/deployments_watcher.go
@@ -101,39 +101,52 @@ type Watcher struct {
 // NewDeploymentsWatcher returns a deployments watcher that is used to watch
 // deployments and trigger the scheduler as needed.
-func NewDeploymentsWatcher(
-	logger *log.Logger,
-	w DeploymentStateWatchers,
-	raft DeploymentRaftEndpoints,
-	stateQueriesPerSecond float64,
+func NewDeploymentsWatcher(logger *log.Logger, stateQueriesPerSecond float64,
 	evalBatchDuration time.Duration) *Watcher {
-	ctx, exitFn := context.WithCancel(context.Background())
+
 	return &Watcher{
 		queryLimiter:      rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
 		evalBatchDuration: evalBatchDuration,
-		stateWatchers:     w,
-		raft:              raft,
-		watchers:          make(map[string]*deploymentWatcher, 32),
-		evalBatcher:       NewEvalBatcher(evalBatchDuration, raft, ctx),
 		logger:            logger,
-		ctx:               ctx,
-		exitFn:            exitFn,
 	}
 }
 
+// SetStateWatchers sets the interface for accessing state watchers
+func (w *Watcher) SetStateWatchers(watchers DeploymentStateWatchers) {
+	w.l.Lock()
+	defer w.l.Unlock()
+	w.stateWatchers = watchers
+}
+
+// SetRaftEndpoints sets the interface for writing to Raft
+func (w *Watcher) SetRaftEndpoints(raft DeploymentRaftEndpoints) {
+	w.l.Lock()
+	defer w.l.Unlock()
+	w.raft = raft
+}
+
 // SetEnabled is used to control if the watcher is enabled. The watcher
 // should only be enabled on the active leader.
-func (w *Watcher) SetEnabled(enabled bool) {
+func (w *Watcher) SetEnabled(enabled bool) error {
 	w.l.Lock()
+	// Ensure our state is correct
+	if w.stateWatchers == nil || w.raft == nil {
+		w.l.Unlock()
+		return fmt.Errorf("state watchers and Raft endpoints must be set before enabling")
+	}
+
 	wasEnabled := w.enabled
 	w.enabled = enabled
 	w.l.Unlock()
-	if !enabled {
-		w.Flush()
-	} else if !wasEnabled {
-		// Start the watcher if we are transistioning to an enabled state
+
+	// Flush the state to create the necessary objects
+	w.Flush()
+
+	// If we are starting now, launch the watch daemon
+	if enabled && !wasEnabled {
 		go w.watchDeployments()
 	}
+
+	return nil
 }
 
 // Flush is used to clear the state of the watcher
@@ -147,7 +160,9 @@ func (w *Watcher) Flush() {
 	}
 
 	// Kill everything associated with the watcher
-	w.exitFn()
+	if w.exitFn != nil {
+		w.exitFn()
+	}
 
 	w.watchers = make(map[string]*deploymentWatcher, 32)
 	w.ctx, w.exitFn = context.WithCancel(context.Background())
diff --git a/nomad/leader.go b/nomad/leader.go
index 275f84de9..7b80dded3
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -131,6 +131,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	// Enable the blocked eval tracker, since we are now the leader
 	s.blockedEvals.SetEnabled(true)
 
+	// Enable the deployment watcher, since we are now the leader
+	if err := s.deploymentWatcher.SetEnabled(true); err != nil {
+		return err
+	}
+
 	// Restore the eval broker state
 	if err := s.restoreEvals(); err != nil {
 		return err
@@ -483,6 +488,11 @@ func (s *Server) revokeLeadership() error {
 	// Disable the Vault client as it is only useful as a leader.
 	s.vault.SetActive(false)
 
+	// Disable the deployment watcher as it is only useful as a leader.
+	if err := s.deploymentWatcher.SetEnabled(false); err != nil {
+		return err
+	}
+
 	// Clear the heartbeat timers on either shutdown or step down,
 	// since we are no longer responsible for TTL expirations.
 	if err := s.clearAllHeartbeatTimers(); err != nil {
diff --git a/nomad/rpc.go b/nomad/rpc.go
index 9ba2156d1..b0eeff515
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -297,6 +297,9 @@ func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.A
 	return future, nil
 }
 
+// raftApplyFn is the function signature for applying a message to Raft
+type raftApplyFn func(t structs.MessageType, msg interface{}) (interface{}, uint64, error)
+
 // raftApply is used to encode a message, run it through raft, and return
 // the FSM response along with any errors
 func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
diff --git a/nomad/server.go b/nomad/server.go
index 70844f07f..ed07a94d5
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -22,6 +22,7 @@ import (
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/helper/tlsutil"
+	"github.com/hashicorp/nomad/nomad/deploymentwatcher"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/raft"
@@ -124,21 +125,25 @@ type Server struct {
 	// eventCh is used to receive events from the serf cluster
 	eventCh chan serf.Event
 
-	// evalBroker is used to manage the in-progress evaluations
-	// that are waiting to be brokered to a sub-scheduler
-	evalBroker *EvalBroker
-
 	// BlockedEvals is used to manage evaluations that are blocked on node
 	// capacity changes.
 	blockedEvals *BlockedEvals
 
-	// planQueue is used to manage the submitted allocation
-	// plans that are waiting to be assessed by the leader
-	planQueue *PlanQueue
+	// deploymentWatcher is used to watch deployments and their allocations and
+	// make the required calls to continue to transition the deployment.
+	deploymentWatcher *deploymentwatcher.Watcher
+
+	// evalBroker is used to manage the in-progress evaluations
+	// that are waiting to be brokered to a sub-scheduler
+	evalBroker *EvalBroker
 
 	// periodicDispatcher is used to track and create evaluations for periodic jobs.
 	periodicDispatcher *PeriodicDispatch
 
+	// planQueue is used to manage the submitted allocation
+	// plans that are waiting to be assessed by the leader
+	planQueue *PlanQueue
+
 	// heartbeatTimers track the expiration time of each heartbeat that has
 	// a TTL. On expiration, the node status is updated to be 'down'.
 	heartbeatTimers map[string]*time.Timer
@@ -219,22 +224,28 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
 		incomingTLS = itls
 	}
 
+	// Create the deployment watcher
+	watcher := deploymentwatcher.NewDeploymentsWatcher(logger,
+		deploymentwatcher.LimitStateQueriesPerSecond,
+		deploymentwatcher.EvalBatchDuration)
+
 	// Create the server
 	s := &Server{
-		config:        config,
-		consulCatalog: consulCatalog,
-		connPool:      NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
-		logger:        logger,
-		rpcServer:     rpc.NewServer(),
-		peers:         make(map[string][]*serverParts),
-		localPeers:    make(map[raft.ServerAddress]*serverParts),
-		reconcileCh:   make(chan serf.Member, 32),
-		eventCh:       make(chan serf.Event, 256),
-		evalBroker:    evalBroker,
-		blockedEvals:  blockedEvals,
-		planQueue:     planQueue,
-		rpcTLS:        incomingTLS,
-		shutdownCh:    make(chan struct{}),
+		config:            config,
+		consulCatalog:     consulCatalog,
+		connPool:          NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
+		logger:            logger,
+		rpcServer:         rpc.NewServer(),
+		peers:             make(map[string][]*serverParts),
+		localPeers:        make(map[raft.ServerAddress]*serverParts),
+		reconcileCh:       make(chan serf.Member, 32),
+		eventCh:           make(chan serf.Event, 256),
+		evalBroker:        evalBroker,
+		blockedEvals:      blockedEvals,
+		deploymentWatcher: watcher,
+		planQueue:         planQueue,
+		rpcTLS:            incomingTLS,
+		shutdownCh:        make(chan struct{}),
 	}
 
 	// Create the periodic dispatcher for launching periodic jobs.
@@ -281,6 +292,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
 		return nil, fmt.Errorf("failed to create server Consul syncer: %v", err)
 	}
 
+	// Set up the deployment watcher.
+	if err := s.setupDeploymentWatcher(); err != nil {
+		return nil, fmt.Errorf("failed to create deployment watcher: %v", err)
+	}
+
 	// Monitor leadership changes
 	go s.monitorLeadership()
 
@@ -663,6 +679,28 @@ func (s *Server) setupConsulSyncer() error {
 	return nil
 }
 
+// setupDeploymentWatcher creates a deployment watcher that consumes the RPC
+// endpoints for state information and makes transitions via Raft through a
+// shim that provides the appropriate methods.
+func (s *Server) setupDeploymentWatcher() error {
+
+	// Create the shims
+	stateShim := &deploymentWatcherStateShim{
+		evaluations:    s.endpoints.Job.Evaluations,
+		allocations:    s.endpoints.Deployment.Allocations,
+		list:           s.endpoints.Deployment.List,
+		getJobVersions: s.endpoints.Job.GetJobVersions,
+		getJob:         s.endpoints.Job.GetJob,
+	}
+	raftShim := &deploymentWatcherRaftShim{
+		apply: s.raftApply,
+	}
+
+	s.deploymentWatcher.SetStateWatchers(stateShim)
+	s.deploymentWatcher.SetRaftEndpoints(raftShim)
+	return nil
+}
+
 // setupVaultClient is used to set up the Vault API client.
 func (s *Server) setupVaultClient() error {
 	v, err := NewVaultClient(s.config.VaultConfig, s.logger, s.purgeVaultAccessors)
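The wiring in this change is spread across several files, so the sketch below pulls the lifecycle into one place. It is not part of the patch: exampleWireDeploymentWatcher and its logger setup are hypothetical, and the body simply mirrors what NewServer, setupDeploymentWatcher, and establishLeadership do with the identifiers introduced above.

package nomad

import (
	"log"
	"os"

	"github.com/hashicorp/nomad/nomad/deploymentwatcher"
)

// exampleWireDeploymentWatcher is a hypothetical helper (illustration only)
// showing the intended order of operations: construct the watcher without its
// dependencies, inject the shims, then enable it when leadership is gained.
func exampleWireDeploymentWatcher(s *Server) error {
	logger := log.New(os.Stderr, "", log.LstdFlags)

	// As in NewServer: the watcher is built with only its rate-limit and
	// batching knobs; state and Raft access are injected later.
	w := deploymentwatcher.NewDeploymentsWatcher(logger,
		deploymentwatcher.LimitStateQueriesPerSecond,
		deploymentwatcher.EvalBatchDuration)

	// As in setupDeploymentWatcher: the shims adapt the server's RPC
	// endpoints and raftApply into the narrow interfaces the watcher uses.
	w.SetStateWatchers(&deploymentWatcherStateShim{
		evaluations:    s.endpoints.Job.Evaluations,
		allocations:    s.endpoints.Deployment.Allocations,
		list:           s.endpoints.Deployment.List,
		getJobVersions: s.endpoints.Job.GetJobVersions,
		getJob:         s.endpoints.Job.GetJob,
	})
	w.SetRaftEndpoints(&deploymentWatcherRaftShim{apply: s.raftApply})

	// As in establishLeadership: enabling now returns an error, which
	// surfaces the case where the shims were never set; revokeLeadership
	// later calls SetEnabled(false) to stop watching on loss of leadership.
	return w.SetEnabled(true)
}

Splitting construction from injection is what lets the watcher live in its own deploymentwatcher package while the shims keep the server's RPC endpoints and raftApply behind small interfaces that tests can stub.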