nomad: testing serf join

This commit is contained in:
Armon Dadgar
2015-06-04 12:33:12 +02:00
parent 54368c01f9
commit 504305e23a
5 changed files with 125 additions and 40 deletions

View File

@@ -26,46 +26,39 @@ func (s *Server) serfEventHandler() {
// nodeJoin is used to handle join events on the serf cluster
func (s *Server) nodeJoin(me serf.MemberEvent) {
//for _, m := range me.Members {
// ok, parts := isConsulServer(m)
// if !ok {
// if wan {
// s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name)
// }
// continue
// }
// s.logger.Printf("[INFO] consul: adding server %s", parts)
for _, m := range me.Members {
ok, parts := isNomadServer(m)
if !ok {
s.logger.Printf("[WARN] nomad: non-server in gossip pool: %s", m.Name)
continue
}
s.logger.Printf("[INFO] nomad: adding server %s", parts)
// // Check if this server is known
// found := false
// s.remoteLock.Lock()
// existing := s.remoteConsuls[parts.Datacenter]
// for idx, e := range existing {
// if e.Name == parts.Name {
// existing[idx] = parts
// found = true
// break
// }
// }
// Check if this server is known
found := false
s.peerLock.Lock()
existing := s.peers[parts.Region]
for idx, e := range existing {
if e.Name == parts.Name {
existing[idx] = parts
found = true
break
}
}
// // Add ot the list if not known
// if !found {
// s.remoteConsuls[parts.Datacenter] = append(existing, parts)
// }
// s.remoteLock.Unlock()
// Add ot the list if not known
if !found {
s.peers[parts.Region] = append(existing, parts)
}
s.peerLock.Unlock()
// // Add to the local list as well
// if !wan && parts.Datacenter == s.config.Datacenter {
// s.localLock.Lock()
// s.localConsuls[parts.Addr.String()] = parts
// s.localLock.Unlock()
// }
// TODO: Add as Raft peer
// // If we still expecting to bootstrap, may need to handle this
// if s.config.BootstrapExpect != 0 {
// s.maybeBootstrap()
// }
//}
// If we still expecting to bootstrap, may need to handle this
if s.config.BootstrapExpect != 0 {
s.maybeBootstrap()
}
}
}
// maybeBootsrap is used to handle bootstrapping when a new consul server joins

27
nomad/serf_test.go Normal file
View File

@@ -0,0 +1,27 @@
package nomad
import (
"fmt"
"testing"
)
func TestNomad_JoinPeer(t *testing.T) {
s1 := testServer(t, nil)
s2 := testServer(t, nil)
s2Addr := fmt.Sprintf("127.0.0.1:%d", s2.config.SerfConfig.MemberlistConfig.BindPort)
num, err := s1.Join([]string{s2Addr})
if err != nil {
t.Fatalf("err: %v", err)
}
if num != 1 {
t.Fatalf("bad: %d", num)
}
if members := s1.Members(); len(members) != 2 {
t.Fatalf("bad: %#v", members)
}
if members := s2.Members(); len(members) != 2 {
t.Fatalf("bad: %#v", members)
}
}

View File

@@ -63,6 +63,11 @@ type Server struct {
// rpcTLS is the TLS config for incoming TLS requests
rpcTLS *tls.Config
// peers is used to track the known Nomad servers. This is
// used for region forwarding and clustering.
peers map[string][]*serverParts
peerLock sync.RWMutex
// serf is the Serf cluster containing only Nomad
// servers. This is used for multi-region federation
// and automatic clustering within regions.
@@ -103,6 +108,7 @@ func NewServer(config *Config) (*Server, error) {
config: config,
logger: logger,
rpcServer: rpc.NewServer(),
peers: make(map[string][]*serverParts),
eventCh: make(chan serf.Event, 256),
shutdownCh: make(chan struct{}),
}
@@ -395,7 +401,12 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.EventCh = ch
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = true
conf.Merge = &serfMergeDelegate{}
@@ -404,9 +415,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}

51
nomad/server_test.go Normal file
View File

@@ -0,0 +1,51 @@
package nomad
import (
"fmt"
"net"
"sync/atomic"
"testing"
"time"
)
var nextPort uint32 = 15000
func getPort() int {
return int(atomic.AddUint32(&nextPort, 1))
}
func testServer(t *testing.T, cb func(*Config)) *Server {
// Setup the default settings
config := DefaultConfig()
config.DevMode = true
config.RPCAddr = &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: getPort(),
}
config.NodeName = fmt.Sprintf("Node %d", config.RPCAddr.Port)
// Tighten the Serf timing
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfConfig.MemberlistConfig.BindPort = getPort()
config.SerfConfig.MemberlistConfig.SuspicionMult = 2
config.SerfConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond
config.SerfConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
config.SerfConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
// Tighten the Raft timing
config.RaftConfig.LeaderLeaseTimeout = 20 * time.Millisecond
config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
config.RaftConfig.ElectionTimeout = 40 * time.Millisecond
// Invoke the callback if any
if cb != nil {
cb(config)
}
// Create server
server, err := NewServer(config)
if err != nil {
t.Fatalf("err: %v", err)
}
return server
}

View File

@@ -1,6 +1,7 @@
package nomad
import (
"fmt"
"net"
"os"
"path/filepath"
@@ -42,6 +43,11 @@ type serverParts struct {
Addr net.Addr
}
func (s *serverParts) String() string {
return fmt.Sprintf("%s (Addr: %s) (DC: %s)",
s.Name, s.Addr, s.Datacenter)
}
// Returns if a member is a Nomad server. Returns a boolean,
// and a struct with the various important components
func isNomadServer(m serf.Member) (bool, *serverParts) {