mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
nomad: adding reconcile channel
This commit is contained in:
@@ -2,6 +2,12 @@ package nomad
|
||||
|
||||
import "github.com/hashicorp/serf/serf"
|
||||
|
||||
const (
|
||||
// StatusReap is used to update the status of a node if we
|
||||
// are handling a EventMemberReap
|
||||
StatusReap = serf.MemberStatus(-1)
|
||||
)
|
||||
|
||||
// serfEventHandler is used to handle events from the serf cluster
|
||||
func (s *Server) serfEventHandler() {
|
||||
for {
|
||||
@@ -10,8 +16,10 @@ func (s *Server) serfEventHandler() {
|
||||
switch e.EventType() {
|
||||
case serf.EventMemberJoin:
|
||||
s.nodeJoin(e.(serf.MemberEvent))
|
||||
s.localMemberEvent(e.(serf.MemberEvent))
|
||||
case serf.EventMemberLeave, serf.EventMemberFailed:
|
||||
s.nodeFailed(e.(serf.MemberEvent))
|
||||
s.localMemberEvent(e.(serf.MemberEvent))
|
||||
case serf.EventMemberUpdate, serf.EventMemberReap,
|
||||
serf.EventUser, serf.EventQuery: // Ignore
|
||||
default:
|
||||
@@ -52,8 +60,6 @@ func (s *Server) nodeJoin(me serf.MemberEvent) {
|
||||
}
|
||||
s.peerLock.Unlock()
|
||||
|
||||
// TODO: Add as Raft peer
|
||||
|
||||
// If we still expecting to bootstrap, may need to handle this
|
||||
if s.config.BootstrapExpect != 0 {
|
||||
s.maybeBootstrap()
|
||||
@@ -153,3 +159,27 @@ func (s *Server) nodeFailed(me serf.MemberEvent) {
|
||||
// }
|
||||
//}
|
||||
}
|
||||
|
||||
// localMemberEvent is used to reconcile Serf events with the
|
||||
// consistent store if we are the current leader.
|
||||
func (s *Server) localMemberEvent(me serf.MemberEvent) {
|
||||
// Do nothing if we are not the leader
|
||||
if !s.IsLeader() {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this is a reap event
|
||||
isReap := me.EventType() == serf.EventMemberReap
|
||||
|
||||
// Queue the members for reconciliation
|
||||
for _, m := range me.Members {
|
||||
// Change the status if this is a reap event
|
||||
if isReap {
|
||||
m.Status = StatusReap
|
||||
}
|
||||
select {
|
||||
case s.reconcileCh <- m:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +73,11 @@ type Server struct {
|
||||
// and automatic clustering within regions.
|
||||
serf *serf.Serf
|
||||
|
||||
// reconcileCh is used to pass events from the serf handler
|
||||
// into the leader manager. Mostly used to handle when servers
|
||||
// join/leave from the region.
|
||||
reconcileCh chan serf.Member
|
||||
|
||||
// eventCh is used to receive events from the serf cluster
|
||||
eventCh chan serf.Event
|
||||
|
||||
@@ -105,12 +110,13 @@ func NewServer(config *Config) (*Server, error) {
|
||||
|
||||
// Create the server
|
||||
s := &Server{
|
||||
config: config,
|
||||
logger: logger,
|
||||
rpcServer: rpc.NewServer(),
|
||||
peers: make(map[string][]*serverParts),
|
||||
eventCh: make(chan serf.Event, 256),
|
||||
shutdownCh: make(chan struct{}),
|
||||
config: config,
|
||||
logger: logger,
|
||||
rpcServer: rpc.NewServer(),
|
||||
peers: make(map[string][]*serverParts),
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
eventCh: make(chan serf.Event, 256),
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Initialize the RPC layer
|
||||
|
||||
Reference in New Issue
Block a user