From 504305e23a06d43234e1da523a8e04780b501137 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 4 Jun 2015 12:33:12 +0200 Subject: [PATCH] nomad: testing serf join --- nomad/serf.go | 65 ++++++++++++++++++++------------------------ nomad/serf_test.go | 27 ++++++++++++++++++ nomad/server.go | 16 ++++++++--- nomad/server_test.go | 51 ++++++++++++++++++++++++++++++++++ nomad/util.go | 6 ++++ 5 files changed, 125 insertions(+), 40 deletions(-) create mode 100644 nomad/serf_test.go create mode 100644 nomad/server_test.go diff --git a/nomad/serf.go b/nomad/serf.go index e51b9b661..464b5cfb3 100644 --- a/nomad/serf.go +++ b/nomad/serf.go @@ -26,46 +26,39 @@ func (s *Server) serfEventHandler() { // nodeJoin is used to handle join events on the serf cluster func (s *Server) nodeJoin(me serf.MemberEvent) { - //for _, m := range me.Members { - // ok, parts := isConsulServer(m) - // if !ok { - // if wan { - // s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name) - // } - // continue - // } - // s.logger.Printf("[INFO] consul: adding server %s", parts) + for _, m := range me.Members { + ok, parts := isNomadServer(m) + if !ok { + s.logger.Printf("[WARN] nomad: non-server in gossip pool: %s", m.Name) + continue + } + s.logger.Printf("[INFO] nomad: adding server %s", parts) - // // Check if this server is known - // found := false - // s.remoteLock.Lock() - // existing := s.remoteConsuls[parts.Datacenter] - // for idx, e := range existing { - // if e.Name == parts.Name { - // existing[idx] = parts - // found = true - // break - // } - // } + // Check if this server is known + found := false + s.peerLock.Lock() + existing := s.peers[parts.Region] + for idx, e := range existing { + if e.Name == parts.Name { + existing[idx] = parts + found = true + break + } + } - // // Add ot the list if not known - // if !found { - // s.remoteConsuls[parts.Datacenter] = append(existing, parts) - // } - // s.remoteLock.Unlock() + // Add ot the list if not known + if !found { + s.peers[parts.Region] = append(existing, parts) + } + s.peerLock.Unlock() - // // Add to the local list as well - // if !wan && parts.Datacenter == s.config.Datacenter { - // s.localLock.Lock() - // s.localConsuls[parts.Addr.String()] = parts - // s.localLock.Unlock() - // } + // TODO: Add as Raft peer - // // If we still expecting to bootstrap, may need to handle this - // if s.config.BootstrapExpect != 0 { - // s.maybeBootstrap() - // } - //} + // If we still expecting to bootstrap, may need to handle this + if s.config.BootstrapExpect != 0 { + s.maybeBootstrap() + } + } } // maybeBootsrap is used to handle bootstrapping when a new consul server joins diff --git a/nomad/serf_test.go b/nomad/serf_test.go new file mode 100644 index 000000000..5b19e9e91 --- /dev/null +++ b/nomad/serf_test.go @@ -0,0 +1,27 @@ +package nomad + +import ( + "fmt" + "testing" +) + +func TestNomad_JoinPeer(t *testing.T) { + s1 := testServer(t, nil) + s2 := testServer(t, nil) + s2Addr := fmt.Sprintf("127.0.0.1:%d", s2.config.SerfConfig.MemberlistConfig.BindPort) + + num, err := s1.Join([]string{s2Addr}) + if err != nil { + t.Fatalf("err: %v", err) + } + if num != 1 { + t.Fatalf("bad: %d", num) + } + + if members := s1.Members(); len(members) != 2 { + t.Fatalf("bad: %#v", members) + } + if members := s2.Members(); len(members) != 2 { + t.Fatalf("bad: %#v", members) + } +} diff --git a/nomad/server.go b/nomad/server.go index ea600de8c..50e3a50a5 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -63,6 +63,11 @@ type Server struct { // rpcTLS is the TLS config for incoming TLS requests rpcTLS *tls.Config + // peers is used to track the known Nomad servers. This is + // used for region forwarding and clustering. + peers map[string][]*serverParts + peerLock sync.RWMutex + // serf is the Serf cluster containing only Nomad // servers. This is used for multi-region federation // and automatic clustering within regions. @@ -103,6 +108,7 @@ func NewServer(config *Config) (*Server, error) { config: config, logger: logger, rpcServer: rpc.NewServer(), + peers: make(map[string][]*serverParts), eventCh: make(chan serf.Event, 256), shutdownCh: make(chan struct{}), } @@ -395,7 +401,12 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( conf.MemberlistConfig.LogOutput = s.config.LogOutput conf.LogOutput = s.config.LogOutput conf.EventCh = ch - conf.SnapshotPath = filepath.Join(s.config.DataDir, path) + if !s.config.DevMode { + conf.SnapshotPath = filepath.Join(s.config.DataDir, path) + if err := ensurePath(conf.SnapshotPath, false); err != nil { + return nil, err + } + } conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] conf.RejoinAfterLeave = true conf.Merge = &serfMergeDelegate{} @@ -404,9 +415,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( // When enabled, the Serf gossip may just turn off if we are the minority // node which is rather unexpected. conf.EnableNameConflictResolution = false - if err := ensurePath(conf.SnapshotPath, false); err != nil { - return nil, err - } return serf.Create(conf) } diff --git a/nomad/server_test.go b/nomad/server_test.go new file mode 100644 index 000000000..2bb4b1e0c --- /dev/null +++ b/nomad/server_test.go @@ -0,0 +1,51 @@ +package nomad + +import ( + "fmt" + "net" + "sync/atomic" + "testing" + "time" +) + +var nextPort uint32 = 15000 + +func getPort() int { + return int(atomic.AddUint32(&nextPort, 1)) +} + +func testServer(t *testing.T, cb func(*Config)) *Server { + // Setup the default settings + config := DefaultConfig() + config.DevMode = true + config.RPCAddr = &net.TCPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: getPort(), + } + config.NodeName = fmt.Sprintf("Node %d", config.RPCAddr.Port) + + // Tighten the Serf timing + config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1" + config.SerfConfig.MemberlistConfig.BindPort = getPort() + config.SerfConfig.MemberlistConfig.SuspicionMult = 2 + config.SerfConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond + config.SerfConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond + config.SerfConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond + + // Tighten the Raft timing + config.RaftConfig.LeaderLeaseTimeout = 20 * time.Millisecond + config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond + config.RaftConfig.ElectionTimeout = 40 * time.Millisecond + + // Invoke the callback if any + if cb != nil { + cb(config) + } + + // Create server + server, err := NewServer(config) + if err != nil { + t.Fatalf("err: %v", err) + } + return server +} diff --git a/nomad/util.go b/nomad/util.go index 5419aa353..be1715dc9 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -1,6 +1,7 @@ package nomad import ( + "fmt" "net" "os" "path/filepath" @@ -42,6 +43,11 @@ type serverParts struct { Addr net.Addr } +func (s *serverParts) String() string { + return fmt.Sprintf("%s (Addr: %s) (DC: %s)", + s.Name, s.Addr, s.Datacenter) +} + // Returns if a member is a Nomad server. Returns a boolean, // and a struct with the various important components func isNomadServer(m serf.Member) (bool, *serverParts) {