mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
nomad: more skeleton
This commit is contained in:
@@ -3,6 +3,7 @@ package nomad
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
@@ -13,6 +14,10 @@ const (
|
||||
ProtocolVersionMax = 1
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultRPCAddr = &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 4646}
|
||||
)
|
||||
|
||||
// Config is used to parameterize the server
|
||||
type Config struct {
|
||||
// Bootstrap mode is used to bring up the first Consul server.
|
||||
@@ -35,8 +40,21 @@ type Config struct {
|
||||
// ProtocolVersionMin and ProtocolVersionMax.
|
||||
ProtocolVersion uint8
|
||||
|
||||
// RPCAddr is the RPC address used by Nomad. This should be reachable
|
||||
// by the other servers and clients
|
||||
RPCAddr *net.TCPAddr
|
||||
|
||||
// RPCAdvertise is the address that is advertised to other nodes for
|
||||
// the RPC endpoint. This can differ from the RPC address, if for example
|
||||
// the RPCAddr is unspecified "0.0.0.0:4646", but this address must be
|
||||
// reachable
|
||||
RPCAdvertise *net.TCPAddr
|
||||
|
||||
// RaftConfig is the configuration used for Raft in the local DC
|
||||
RaftConfig *raft.Config
|
||||
|
||||
// RequireTLS ensures that all RPC traffic is protected with TLS
|
||||
RequireTLS bool
|
||||
}
|
||||
|
||||
// CheckVersion is used to check if the ProtocolVersion is valid
|
||||
@@ -56,6 +74,7 @@ func DefaultConfig() *Config {
|
||||
c := &Config{
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
RaftConfig: raft.DefaultConfig(),
|
||||
RPCAddr: DefaultRPCAddr,
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
136
nomad/rpc.go
136
nomad/rpc.go
@@ -1,10 +1,138 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
type RPCType byte
|
||||
|
||||
const (
|
||||
rpcNomad RPCType = iota
|
||||
rpcRaft
|
||||
rpcMultiplex
|
||||
rpcTLS
|
||||
rpcNomad RPCType = 0x01
|
||||
rpcRaft = 0x02
|
||||
rpcMultiplex = 0x03
|
||||
rpcTLS = 0x04
|
||||
)
|
||||
|
||||
const (
|
||||
// rpcHTTPSMagic is used to detect an incoming HTTPS
|
||||
// request. TLS starts with the 0x16 magic byte.
|
||||
rpcHTTPSMagic = 0x16
|
||||
|
||||
// rpcHTTPMagic is used to detect an incoming HTTP
|
||||
// request. The request starts with 'HTTP'
|
||||
rpcHTTPMagic = 0x48
|
||||
)
|
||||
|
||||
// listen is used to listen for incoming RPC connections
|
||||
func (s *Server) listen() {
|
||||
for {
|
||||
// Accept a connection
|
||||
conn, err := s.rpcListener.Accept()
|
||||
if err != nil {
|
||||
if s.shutdown {
|
||||
return
|
||||
}
|
||||
s.logger.Printf("[ERR] nomad.rpc: failed to accept RPC conn: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
go s.handleConn(conn, false)
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// handleConn is used to determine if this is a Raft or
|
||||
// Nomad type RPC connection and invoke the correct handler
|
||||
func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
||||
// Read a single byte
|
||||
buf := make([]byte, 1)
|
||||
if _, err := conn.Read(buf); err != nil {
|
||||
if err != io.EOF {
|
||||
s.logger.Printf("[ERR] nomad.rpc: failed to read byte: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Enforce TLS if VerifyIncoming is set
|
||||
if s.config.RequireTLS && !isTLS && RPCType(buf[0]) != rpcTLS {
|
||||
s.logger.Printf("[WARN] nomad.rpc: Non-TLS connection attempted with RequireTLS set")
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Switch on the byte
|
||||
switch RPCType(buf[0]) {
|
||||
case rpcNomad:
|
||||
s.handleNomadConn(conn)
|
||||
|
||||
case rpcRaft:
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "raft_handoff"}, 1)
|
||||
s.raftLayer.Handoff(conn)
|
||||
|
||||
case rpcMultiplex:
|
||||
s.handleMultiplex(conn)
|
||||
|
||||
case rpcTLS:
|
||||
if s.rpcTLS == nil {
|
||||
s.logger.Printf("[WARN] nomad.rpc: TLS connection attempted, server not configured for TLS")
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
conn = tls.Server(conn, s.rpcTLS)
|
||||
s.handleConn(conn, true)
|
||||
|
||||
default:
|
||||
s.logger.Printf("[ERR] nomad.rpc: unrecognized RPC byte: %v", buf[0])
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// handleMultiplex is used to multiplex a single incoming connection
|
||||
// using the Yamux multiplexer
|
||||
func (s *Server) handleMultiplex(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
conf := yamux.DefaultConfig()
|
||||
conf.LogOutput = s.config.LogOutput
|
||||
server, _ := yamux.Server(conn, conf)
|
||||
for {
|
||||
sub, err := server.Accept()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
s.logger.Printf("[ERR] nomad.rpc: multiplex conn accept failed: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
go s.handleNomadConn(sub)
|
||||
}
|
||||
}
|
||||
|
||||
// handleNomadConn is used to service a single Nomad RPC connection
|
||||
func (s *Server) handleNomadConn(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
rpcCodec := msgpackrpc.NewServerCodec(conn)
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
|
||||
if err != io.EOF && !strings.Contains(err.Error(), "closed") {
|
||||
s.logger.Printf("[ERR] nomad.rpc: RPC error: %v (%v)", err, conn)
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "request_error"}, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "request"}, 1)
|
||||
}
|
||||
}
|
||||
|
||||
149
nomad/server.go
149
nomad/server.go
@@ -1,13 +1,20 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/raft-boltdb"
|
||||
)
|
||||
@@ -27,6 +34,9 @@ type Server struct {
|
||||
config *Config
|
||||
logger *log.Logger
|
||||
|
||||
// Endpoints holds our RPC endpoints
|
||||
endpoints endpoints
|
||||
|
||||
// The raft instance is used among Consul nodes within the
|
||||
// DC to protect operations that require strong consistency
|
||||
raft *raft.Raft
|
||||
@@ -39,11 +49,23 @@ type Server struct {
|
||||
// fsm is the state machine used with Raft
|
||||
fsm *nomadFSM
|
||||
|
||||
// rpcListener is used to listen for incoming connections
|
||||
rpcListener net.Listener
|
||||
rpcServer *rpc.Server
|
||||
|
||||
// rpcTLS is the TLS config for incoming TLS requests
|
||||
rpcTLS *tls.Config
|
||||
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
}
|
||||
|
||||
// Holds the RPC endpoints
|
||||
type endpoints struct {
|
||||
Status *Status
|
||||
}
|
||||
|
||||
// NewServer is used to construct a new Nomad server from the
|
||||
// configuration, potentially returning an error
|
||||
func NewServer(config *Config) (*Server, error) {
|
||||
@@ -67,12 +89,22 @@ func NewServer(config *Config) (*Server, error) {
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Initialize the RPC layer
|
||||
// TODO: TLS...
|
||||
if err := s.setupRPC(nil); err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
|
||||
}
|
||||
|
||||
// Initialize the Raft server
|
||||
if err := s.setupRaft(); err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start Raft: %v", err)
|
||||
}
|
||||
|
||||
// Start the RPC listeners
|
||||
go s.listen()
|
||||
|
||||
// Done
|
||||
return s, nil
|
||||
}
|
||||
@@ -102,6 +134,11 @@ func (s *Server) Shutdown() error {
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown the RPC listener
|
||||
if s.rpcListener != nil {
|
||||
s.rpcListener.Close()
|
||||
}
|
||||
|
||||
// Close the fsm
|
||||
if s.fsm != nil {
|
||||
s.fsm.Close()
|
||||
@@ -109,6 +146,46 @@ func (s *Server) Shutdown() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupRPC is used to setup the RPC listener
|
||||
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
|
||||
// Create endpoints
|
||||
s.endpoints.Status = &Status{s}
|
||||
|
||||
// Register the handlers
|
||||
s.rpcServer.Register(s.endpoints.Status)
|
||||
|
||||
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.rpcListener = list
|
||||
|
||||
var advertise net.Addr
|
||||
if s.config.RPCAdvertise != nil {
|
||||
advertise = s.config.RPCAdvertise
|
||||
} else {
|
||||
advertise = s.rpcListener.Addr()
|
||||
}
|
||||
|
||||
// Verify that we have a usable advertise address
|
||||
addr, ok := advertise.(*net.TCPAddr)
|
||||
if !ok {
|
||||
list.Close()
|
||||
return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr)
|
||||
}
|
||||
if addr.IP.IsUnspecified() {
|
||||
list.Close()
|
||||
return fmt.Errorf("RPC advertise address is not advertisable: %v", addr)
|
||||
}
|
||||
|
||||
// Provide a DC specific wrapper. Raft replication is only
|
||||
// ever done in the same datacenter, so we can provide it as a constant.
|
||||
// wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
|
||||
// TODO: TLS...
|
||||
s.raftLayer = NewRaftLayer(advertise, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupRaft is used to setup and initialize Raft
|
||||
func (s *Server) setupRaft() error {
|
||||
// If we are in bootstrap mode, enable a single node cluster
|
||||
@@ -211,3 +288,75 @@ func (s *Server) setupRaft() error {
|
||||
go s.monitorLeadership()
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsLeader checks if this server is the cluster leader
|
||||
func (s *Server) IsLeader() bool {
|
||||
return s.raft.State() == raft.Leader
|
||||
}
|
||||
|
||||
// inmemCodec is used to do an RPC call without going over a network
|
||||
type inmemCodec struct {
|
||||
method string
|
||||
args interface{}
|
||||
reply interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
|
||||
req.ServiceMethod = i.method
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
|
||||
sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args)))
|
||||
dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args)))
|
||||
dst.Set(sourceValue)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
|
||||
if resp.Error != "" {
|
||||
i.err = errors.New(resp.Error)
|
||||
return nil
|
||||
}
|
||||
sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply)))
|
||||
dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply)))
|
||||
dst.Set(sourceValue)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *inmemCodec) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RPC is used to make a local RPC call
|
||||
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
|
||||
codec := &inmemCodec{
|
||||
method: method,
|
||||
args: args,
|
||||
reply: reply,
|
||||
}
|
||||
if err := s.rpcServer.ServeRequest(codec); err != nil {
|
||||
return err
|
||||
}
|
||||
return codec.err
|
||||
}
|
||||
|
||||
// Stats is used to return statistics for debugging and insight
|
||||
// for various sub-systems
|
||||
func (s *Server) Stats() map[string]map[string]string {
|
||||
toString := func(v uint64) string {
|
||||
return strconv.FormatUint(v, 10)
|
||||
}
|
||||
stats := map[string]map[string]string{
|
||||
"nomad": map[string]string{
|
||||
"server": "true",
|
||||
"leader": fmt.Sprintf("%v", s.IsLeader()),
|
||||
"bootstrap": fmt.Sprintf("%v", s.config.Bootstrap),
|
||||
"known_regions": toString(uint64(0)),
|
||||
},
|
||||
"raft": s.raft.Stats(),
|
||||
"runtime": runtimeStats(),
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
33
nomad/status_endpoint.go
Normal file
33
nomad/status_endpoint.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package nomad
|
||||
|
||||
// Status endpoint is used to check on server status
|
||||
type Status struct {
|
||||
server *Server
|
||||
}
|
||||
|
||||
// Ping is used to just check for connectivity
|
||||
func (s *Status) Ping(args struct{}, reply *struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Leader is used to get the address of the leader
|
||||
func (s *Status) Leader(args struct{}, reply *string) error {
|
||||
leader := s.server.raft.Leader()
|
||||
if leader != "" {
|
||||
*reply = leader
|
||||
} else {
|
||||
*reply = ""
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Peers is used to get all the Raft peers
|
||||
func (s *Status) Peers(args struct{}, reply *[]string) error {
|
||||
peers, err := s.server.raftPeers.Peers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*reply = peers
|
||||
return nil
|
||||
}
|
||||
@@ -3,6 +3,8 @@ package nomad
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// ensurePath is used to make sure a path exists
|
||||
@@ -12,3 +14,15 @@ func ensurePath(path string, dir bool) error {
|
||||
}
|
||||
return os.MkdirAll(path, 0755)
|
||||
}
|
||||
|
||||
// runtimeStats is used to return various runtime information
|
||||
func runtimeStats() map[string]string {
|
||||
return map[string]string{
|
||||
"os": runtime.GOOS,
|
||||
"arch": runtime.GOARCH,
|
||||
"version": runtime.Version(),
|
||||
"max_procs": strconv.FormatInt(int64(runtime.GOMAXPROCS(0)), 10),
|
||||
"goroutines": strconv.FormatInt(int64(runtime.NumGoroutine()), 10),
|
||||
"cpu_count": strconv.FormatInt(int64(runtime.NumCPU()), 10),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user