From 216f2907c2d88de1e3a61bd64ac1bfbb7d85d829 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 3 Jun 2015 13:25:50 +0200 Subject: [PATCH] nomad: expose serf methods, add leave support --- nomad/server.go | 111 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 2 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index 6c24f1ea4..84ea97147 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -28,6 +28,10 @@ const ( // raftLogCacheSize is the maximum number of logs to cache in-memory. // This is used to reduce disk I/O for the recently commited entries. raftLogCacheSize = 512 + + // raftRemoveGracePeriod is how long we wait to allow a RemovePeer + // to replicate to gracefully leave the cluster. + raftRemoveGracePeriod = 5 * time.Second ) // Server is Nomad server which manages the job queues, @@ -39,8 +43,8 @@ type Server struct { // Endpoints holds our RPC endpoints endpoints endpoints - // The raft instance is used among Consul nodes within the - // DC to protect operations that require strong consistency + // The raft instance is used among Nomad nodes within the + // region to protect operations that require strong consistency raft *raft.Raft raftLayer *RaftLayer raftPeers raft.PeerStore @@ -67,6 +71,7 @@ type Server struct { // eventCh is used to receive events from the serf cluster eventCh chan serf.Event + left bool shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -168,6 +173,65 @@ func (s *Server) Shutdown() error { return nil } +// Leave is used to prepare for a graceful shutdown of the server +func (s *Server) Leave() error { + s.logger.Printf("[INFO] nomad: server starting leave") + s.left = true + + // Check the number of known peers + numPeers, err := s.numOtherPeers() + if err != nil { + s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err) + return err + } + + // If we are the current leader, and we have any other peers (cluster has multiple + // servers), we should do a 
RemovePeer to safely reduce the quorum size. If we are + // not the leader, then we should issue our leave intention and wait to be removed + // for some sane period of time. + isLeader := s.IsLeader() + if isLeader && numPeers > 0 { + future := s.raft.RemovePeer(s.raftTransport.LocalAddr()) + if err := future.Error(); err != nil && err != raft.ErrUnknownPeer { + s.logger.Printf("[ERR] nomad: failed to remove ourself as raft peer: %v", err) + } + } + + // Leave the gossip pool + if s.serf != nil { + if err := s.serf.Leave(); err != nil { + s.logger.Printf("[ERR] nomad: failed to leave Serf cluster: %v", err) + } + } + + // If we were not leader, wait to be safely removed from the cluster. + // We must wait to allow the raft replication to take place, otherwise + // an immediate shutdown could cause a loss of quorum. + if !isLeader { + limit := time.Now().Add(raftRemoveGracePeriod) + for numPeers > 0 && time.Now().Before(limit) { + // Update the number of peers + numPeers, err = s.numOtherPeers() + if err != nil { + s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err) + break + } + + // Avoid the sleep if we are done + if numPeers == 0 { + break + } + + // Sleep a while and check again + time.Sleep(50 * time.Millisecond) + } + if numPeers != 0 { + s.logger.Printf("[WARN] nomad: failed to leave raft peer set gracefully, timeout") + } + } + return nil +} + // setupRPC is used to setup the RPC listener func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { // Create endpoints @@ -346,11 +410,54 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( return serf.Create(conf) } +// numOtherPeers is used to check on the number of known peers +// excluding the local node +func (s *Server) numOtherPeers() (int, error) { + peers, err := s.raftPeers.Peers() + if err != nil { + return 0, err + } + otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr()) + return len(otherPeers), nil +} + // IsLeader checks if this 
server is the cluster leader func (s *Server) IsLeader() bool { return s.raft.State() == raft.Leader } +// Join is used to have Nomad join the gossip ring. +// The target address should be another node listening on the +// Serf address. +func (s *Server) Join(addrs []string) (int, error) { + return s.serf.Join(addrs, true) +} + +// LocalMember is used to return the local node +func (s *Server) LocalMember() serf.Member { + return s.serf.LocalMember() +} + +// Members is used to return the members of the serf cluster +func (s *Server) Members() []serf.Member { + return s.serf.Members() +} + +// RemoveFailedNode is used to remove a failed node from the cluster +func (s *Server) RemoveFailedNode(node string) error { + return s.serf.RemoveFailedNode(node) +} + +// KeyManager returns the Serf keyring manager +func (s *Server) KeyManager() *serf.KeyManager { + return s.serf.KeyManager() +} + +// Encrypted determines if gossip is encrypted +func (s *Server) Encrypted() bool { + return s.serf.EncryptionEnabled() +} + // inmemCodec is used to do an RPC call without going over a network type inmemCodec struct { method string