Add watcher to server

Alex Dadgar
2017-06-28 15:35:52 -07:00
parent aeeb0a656d
commit c86af4d9e9
5 changed files with 189 additions and 39 deletions

View File

@@ -0,0 +1,84 @@
package nomad
import "github.com/hashicorp/nomad/nomad/structs"
// deploymentWatcherStateShim is the shim that provides the state watching
// methods. These should be set by the server and passed to the deployment
// watcher.
type deploymentWatcherStateShim struct {
// evaluations returns the set of evaluations for the given job
evaluations func(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error
// allocations returns the set of allocations that are part of the
// deployment.
allocations func(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error
// list is used to list all the deployments in the system
list func(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error
// getJobVersions is used to lookup the versions of a job. This is used when
// rolling back to find the latest stable job
getJobVersions func(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error
// getJob is used to lookup a particular job.
getJob func(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error
}
func (d *deploymentWatcherStateShim) Evaluations(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error {
return d.evaluations(args, reply)
}
func (d *deploymentWatcherStateShim) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error {
return d.allocations(args, reply)
}
func (d *deploymentWatcherStateShim) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error {
return d.list(args, reply)
}
func (d *deploymentWatcherStateShim) GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error {
return d.getJobVersions(args, reply)
}
func (d *deploymentWatcherStateShim) GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error {
return d.getJob(args, reply)
}
// deploymentWatcherRaftShim is the shim that provides the Raft apply
// methods. These should be set by the server and passed to the deployment
// watcher.
type deploymentWatcherRaftShim struct {
// apply is used to apply a message to Raft
apply raftApplyFn
}
func (d *deploymentWatcherRaftShim) UpsertEvals(evals []*structs.Evaluation) (uint64, error) {
update := &structs.EvalUpdateRequest{
Evals: evals,
}
_, index, err := d.apply(structs.EvalUpdateRequestType, update)
return index, err
}
func (d *deploymentWatcherRaftShim) UpsertJob(job *structs.Job) (uint64, error) {
update := &structs.JobRegisterRequest{
Job: job,
}
_, index, err := d.apply(structs.JobRegisterRequestType, update)
return index, err
}
func (d *deploymentWatcherRaftShim) UpsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdateRequest) (uint64, error) {
_, index, err := d.apply(structs.DeploymentStatusUpdateRequestType, u)
return index, err
}
func (d *deploymentWatcherRaftShim) UpsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) {
_, index, err := d.apply(structs.DeploymentPromoteRequestType, req)
return index, err
}
func (d *deploymentWatcherRaftShim) UpsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) {
_, index, err := d.apply(structs.DeploymentAllocHealthRequestType, req)
return index, err
}

View File

@@ -101,39 +101,52 @@ type Watcher struct {
// NewDeploymentsWatcher returns a deployments watcher that is used to watch
// deployments and trigger the scheduler as needed.
func NewDeploymentsWatcher(logger *log.Logger, stateQueriesPerSecond float64,
evalBatchDuration time.Duration) *Watcher {
ctx, exitFn := context.WithCancel(context.Background())
return &Watcher{
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
evalBatchDuration: evalBatchDuration,
watchers: make(map[string]*deploymentWatcher, 32),
logger: logger,
ctx: ctx,
exitFn: exitFn,
}
}
// SetStateWatchers sets the interface for accessing state watchers
func (w *Watcher) SetStateWatchers(watchers DeploymentStateWatchers) {
w.l.Lock()
defer w.l.Unlock()
w.stateWatchers = watchers
}
// SetRaftEndpoints sets the interface for writing to Raft
func (w *Watcher) SetRaftEndpoints(raft DeploymentRaftEndpoints) {
w.l.Lock()
defer w.l.Unlock()
w.raft = raft
}
// SetEnabled is used to control if the watcher is enabled. The watcher
// should only be enabled on the active leader.
func (w *Watcher) SetEnabled(enabled bool) error {
w.l.Lock()
// Ensure our state is correct
if w.stateWatchers == nil || w.raft == nil {
return fmt.Errorf("State watchers and Raft endpoints must be set before starting")
}
wasEnabled := w.enabled
w.enabled = enabled
w.l.Unlock()
// Flush the state to create the necessary objects
w.Flush()
// If we are starting now, launch the watch daemon
if enabled && !wasEnabled {
go w.watchDeployments()
}
return nil
}
// Flush is used to clear the state of the watcher
@@ -147,7 +160,9 @@ func (w *Watcher) Flush() {
}
// Kill everything associated with the watcher
if w.exitFn != nil {
w.exitFn()
}
w.watchers = make(map[string]*deploymentWatcher, 32)
w.ctx, w.exitFn = context.WithCancel(context.Background())
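The new setters plus the error-returning SetEnabled imply a particular call order from the server. The sketch below is hypothetical: exampleWatcherLifecycle and the way the shims are passed in are illustrative, and it assumes the identifiers shown in this diff (NewDeploymentsWatcher, SetStateWatchers, SetRaftEndpoints, LimitStateQueriesPerSecond, EvalBatchDuration) are exported as written.
package nomad

import (
    "log"
    "os"

    "github.com/hashicorp/nomad/nomad/deploymentwatcher"
)

// exampleWatcherLifecycle shows the expected call order: construct the
// watcher, inject the shims, then toggle it with leadership changes.
func exampleWatcherLifecycle(state deploymentwatcher.DeploymentStateWatchers,
    raft deploymentwatcher.DeploymentRaftEndpoints) error {

    logger := log.New(os.Stderr, "", log.LstdFlags)
    w := deploymentwatcher.NewDeploymentsWatcher(logger,
        deploymentwatcher.LimitStateQueriesPerSecond,
        deploymentwatcher.EvalBatchDuration)

    // SetEnabled returns an error unless both interfaces have been set first.
    w.SetStateWatchers(state)
    w.SetRaftEndpoints(raft)

    // On gaining leadership: flush state and start the watch daemon.
    if err := w.SetEnabled(true); err != nil {
        return err
    }

    // On losing leadership: flush again so per-deployment watchers are dropped.
    return w.SetEnabled(false)
}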

View File

@@ -131,6 +131,11 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Enable the blocked eval tracker, since we are now the leader
s.blockedEvals.SetEnabled(true)
// Enable the deployment watcher, since we are now the leader
if err := s.deploymentWatcher.SetEnabled(true); err != nil {
return err
}
// Restore the eval broker state
if err := s.restoreEvals(); err != nil {
return err
@@ -483,6 +488,11 @@ func (s *Server) revokeLeadership() error {
// Disable the Vault client as it is only useful as a leader.
s.vault.SetActive(false)
// Disable the deployment watcher as it is only useful as a leader.
if err := s.deploymentWatcher.SetEnabled(false); err != nil {
return err
}
// Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations.
if err := s.clearAllHeartbeatTimers(); err != nil {

View File

@@ -297,6 +297,9 @@ func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) {
return future, nil
}
// raftApplyFn is the function signature for applying a msg to Raft
type raftApplyFn func(t structs.MessageType, msg interface{}) (interface{}, uint64, error)
// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, uint64, error) {
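Since (*Server).raftApply has exactly this signature, a method value is enough to hand Raft access to the deployment watcher shim without exposing *Server. A minimal sketch, using a hypothetical newRaftShim helper that is not part of this commit:
package nomad

// newRaftShim illustrates that the server's raftApply method value satisfies
// raftApplyFn, so the deployment watcher only ever depends on this narrow
// function type rather than on *Server itself.
func newRaftShim(s *Server) *deploymentWatcherRaftShim {
    var apply raftApplyFn = s.raftApply // method value with the matching signature
    return &deploymentWatcherRaftShim{apply: apply}
}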

View File

@@ -22,6 +22,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/deploymentwatcher"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
@@ -124,21 +125,25 @@ type Server struct {
// eventCh is used to receive events from the serf cluster
eventCh chan serf.Event
// BlockedEvals is used to manage evaluations that are blocked on node
// capacity changes.
blockedEvals *BlockedEvals
// deploymentWatcher is used to watch deployments and their allocations and
// make the required calls to continue to transition the deployment.
deploymentWatcher *deploymentwatcher.Watcher
// evalBroker is used to manage the in-progress evaluations
// that are waiting to be brokered to a sub-scheduler
evalBroker *EvalBroker
// periodicDispatcher is used to track and create evaluations for periodic jobs.
periodicDispatcher *PeriodicDispatch
// planQueue is used to manage the submitted allocation
// plans that are waiting to be assessed by the leader
planQueue *PlanQueue
// heartbeatTimers track the expiration time of each heartbeat that has
// a TTL. On expiration, the node status is updated to be 'down'.
heartbeatTimers map[string]*time.Timer
@@ -219,22 +224,28 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logger) (*Server, error) {
incomingTLS = itls
}
// Create the deployment watcher
watcher := deploymentwatcher.NewDeploymentsWatcher(logger,
deploymentwatcher.LimitStateQueriesPerSecond,
deploymentwatcher.EvalBatchDuration)
// Create the server
s := &Server{
config: config,
consulCatalog: consulCatalog,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
rpcServer: rpc.NewServer(),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: blockedEvals,
deploymentWatcher: watcher,
planQueue: planQueue,
rpcTLS: incomingTLS,
shutdownCh: make(chan struct{}),
}
// Create the periodic dispatcher for launching periodic jobs.
@@ -281,6 +292,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logger) (*Server, error) {
return nil, fmt.Errorf("failed to create server Consul syncer: %v", err)
}
// Setup the deployment watcher.
if err := s.setupDeploymentWatcher(); err != nil {
return nil, fmt.Errorf("failed to create deployment watcher: %v", err)
}
// Monitor leadership changes
go s.monitorLeadership()
@@ -663,6 +679,28 @@ func (s *Server) setupConsulSyncer() error {
return nil
}
// setupDeploymentWatcher creates a deployment watcher that consumes the RPC
// endpoints for state information and makes transitions via Raft through a
// shim that provides the appropriate methods.
func (s *Server) setupDeploymentWatcher() error {
// Create the shims
stateShim := &deploymentWatcherStateShim{
evaluations: s.endpoints.Job.Evaluations,
allocations: s.endpoints.Deployment.Allocations,
list: s.endpoints.Deployment.List,
getJobVersions: s.endpoints.Job.GetJobVersions,
getJob: s.endpoints.Job.GetJob,
}
raftShim := &deploymentWatcherRaftShim{
apply: s.raftApply,
}
s.deploymentWatcher.SetStateWatchers(stateShim)
s.deploymentWatcher.SetRaftEndpoints(raftShim)
return nil
}
// setupVaultClient is used to set up the Vault API client.
func (s *Server) setupVaultClient() error {
v, err := NewVaultClient(s.config.VaultConfig, s.logger, s.purgeVaultAccessors)