nomad: expose serf methods, add leave support

This commit is contained in:
Armon Dadgar
2015-06-03 13:25:50 +02:00
parent 18c031c041
commit 216f2907c2

View File

@@ -28,6 +28,10 @@ const (
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently commited entries.
raftLogCacheSize = 512
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
)
// Server is Nomad server which manages the job queues,
@@ -39,8 +43,8 @@ type Server struct {
// Endpoints holds our RPC endpoints
endpoints endpoints
// The raft instance is used among Consul nodes within the
// DC to protect operations that require strong consistency
// The raft instance is used among Nomad nodes within the
// region to protect operations that require strong consistency
raft *raft.Raft
raftLayer *RaftLayer
raftPeers raft.PeerStore
@@ -67,6 +71,7 @@ type Server struct {
// eventCh is used to receive events from the serf cluster
eventCh chan serf.Event
left bool
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
@@ -168,6 +173,65 @@ func (s *Server) Shutdown() error {
return nil
}
// Leave is used to prepare for a graceful shutdown of the server
func (s *Server) Leave() error {
s.logger.Printf("[INFO] nomad: server starting leave")
s.left = true
// Check the number of known peers
numPeers, err := s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err)
return err
}
// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
// not the leader, then we should issue our leave intention and wait to be removed
// for some sane period of time.
isLeader := s.IsLeader()
if isLeader && numPeers > 0 {
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
s.logger.Printf("[ERR] nomad: failed to remove ourself as raft peer: %v", err)
}
}
// Leave the gossip pool
if s.serf != nil {
if err := s.serf.Leave(); err != nil {
s.logger.Printf("[ERR] nomad: failed to leave Serf cluster: %v", err)
}
}
// If we were not leader, wait to be safely removed from the cluster.
// We must wait to allow the raft replication to take place, otherwise
// an immediate shutdown could cause a loss of quorum.
if !isLeader {
limit := time.Now().Add(raftRemoveGracePeriod)
for numPeers > 0 && time.Now().Before(limit) {
// Update the number of peers
numPeers, err = s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err)
break
}
// Avoid the sleep if we are done
if numPeers == 0 {
break
}
// Sleep a while and check again
time.Sleep(50 * time.Millisecond)
}
if numPeers != 0 {
s.logger.Printf("[WARN] nomad: failed to leave raft peer set gracefully, timeout")
}
}
return nil
}
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints
@@ -346,11 +410,54 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return serf.Create(conf)
}
// numOtherPeers is used to check on the number of known peers
// excluding the local ndoe
func (s *Server) numOtherPeers() (int, error) {
peers, err := s.raftPeers.Peers()
if err != nil {
return 0, err
}
otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
return len(otherPeers), nil
}
// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
return s.raft.State() == raft.Leader
}
// Join is used to have Nomad join the gossip ring
// The target address should be another node listening on the
// Serf address
func (s *Server) Join(addrs []string) (int, error) {
return s.serf.Join(addrs, true)
}
// LocalMember is used to return the local node
func (c *Server) LocalMember() serf.Member {
return c.serf.LocalMember()
}
// Members is used to return the members of the serf cluster
func (s *Server) Members() []serf.Member {
return s.serf.Members()
}
// RemoveFailedNode is used to remove a failed node from the cluster
func (s *Server) RemoveFailedNode(node string) error {
return s.serf.RemoveFailedNode(node)
}
// KeyManager returns the Serf keyring manager
func (s *Server) KeyManager() *serf.KeyManager {
return s.serf.KeyManager()
}
// Encrypted determines if gossip is encrypted
func (s *Server) Encrypted() bool {
return s.serf.EncryptionEnabled()
}
// inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct {
method string