From 20c67f218cc7ed03e436ae52fe1a1d49ef399a66 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 4 Jun 2015 12:42:56 +0200 Subject: [PATCH] nomad: adding reconcile channel --- nomad/serf.go | 34 ++++++++++++++++++++++++++++++++-- nomad/server.go | 18 ++++++++++++------ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/nomad/serf.go b/nomad/serf.go index 464b5cfb3..f8b0600a4 100644 --- a/nomad/serf.go +++ b/nomad/serf.go @@ -2,6 +2,12 @@ package nomad import "github.com/hashicorp/serf/serf" +const ( + // StatusReap is used to update the status of a node if we + // are handling a EventMemberReap + StatusReap = serf.MemberStatus(-1) +) + // serfEventHandler is used to handle events from the serf cluster func (s *Server) serfEventHandler() { for { @@ -10,8 +16,10 @@ func (s *Server) serfEventHandler() { switch e.EventType() { case serf.EventMemberJoin: s.nodeJoin(e.(serf.MemberEvent)) + s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventMemberLeave, serf.EventMemberFailed: s.nodeFailed(e.(serf.MemberEvent)) + s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventMemberUpdate, serf.EventMemberReap, serf.EventUser, serf.EventQuery: // Ignore default: @@ -52,8 +60,6 @@ func (s *Server) nodeJoin(me serf.MemberEvent) { } s.peerLock.Unlock() - // TODO: Add as Raft peer - // If we still expecting to bootstrap, may need to handle this if s.config.BootstrapExpect != 0 { s.maybeBootstrap() @@ -153,3 +159,27 @@ func (s *Server) nodeFailed(me serf.MemberEvent) { // } //} } + +// localMemberEvent is used to reconcile Serf events with the +// consistent store if we are the current leader. +func (s *Server) localMemberEvent(me serf.MemberEvent) { + // Do nothing if we are not the leader + if !s.IsLeader() { + return + } + + // Check if this is a reap event + isReap := me.EventType() == serf.EventMemberReap + + // Queue the members for reconciliation + for _, m := range me.Members { + // Change the status if this is a reap event + if isReap { + m.Status = StatusReap + } + select { + case s.reconcileCh <- m: + default: + } + } +} diff --git a/nomad/server.go b/nomad/server.go index 50e3a50a5..9bd086b4d 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -73,6 +73,11 @@ type Server struct { // and automatic clustering within regions. serf *serf.Serf + // reconcileCh is used to pass events from the serf handler + // into the leader manager. Mostly used to handle when servers + // join/leave from the region. + reconcileCh chan serf.Member + // eventCh is used to receive events from the serf cluster eventCh chan serf.Event @@ -105,12 +110,13 @@ func NewServer(config *Config) (*Server, error) { // Create the server s := &Server{ - config: config, - logger: logger, - rpcServer: rpc.NewServer(), - peers: make(map[string][]*serverParts), - eventCh: make(chan serf.Event, 256), - shutdownCh: make(chan struct{}), + config: config, + logger: logger, + rpcServer: rpc.NewServer(), + peers: make(map[string][]*serverParts), + reconcileCh: make(chan serf.Member, 32), + eventCh: make(chan serf.Event, 256), + shutdownCh: make(chan struct{}), } // Initialize the RPC layer