mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
nomad: adding basic structure for raft
This commit is contained in:
57
nomad/config.go
Normal file
57
nomad/config.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// Protocol versions that this build of Nomad is able to speak.
const (
	// ProtocolVersionMin is the lowest supported protocol version.
	ProtocolVersionMin uint8 = 1

	// ProtocolVersionMax is the highest supported protocol version. It is
	// declared as an untyped constant.
	ProtocolVersionMax = 1
)
|
||||
|
||||
// Config is used to parameterize the server
|
||||
type Config struct {
|
||||
// Bootstrap mode is used to bring up the first Consul server.
|
||||
// It is required so that it can elect a leader without any
|
||||
// other nodes being present
|
||||
Bootstrap bool
|
||||
|
||||
// DataDir is the directory to store our state in
|
||||
DataDir string
|
||||
|
||||
// LogOutput is the location to write logs to. If this is not set,
|
||||
// logs will go to stderr.
|
||||
LogOutput io.Writer
|
||||
|
||||
// ProtocolVersion is the protocol version to speak. This must be between
|
||||
// ProtocolVersionMin and ProtocolVersionMax.
|
||||
ProtocolVersion uint8
|
||||
|
||||
// RaftConfig is the configuration used for Raft in the local DC
|
||||
RaftConfig *raft.Config
|
||||
}
|
||||
|
||||
// CheckVersion is used to check if the ProtocolVersion is valid
|
||||
func (c *Config) CheckVersion() error {
|
||||
if c.ProtocolVersion < ProtocolVersionMin {
|
||||
return fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
|
||||
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
|
||||
} else if c.ProtocolVersion > ProtocolVersionMax {
|
||||
return fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
|
||||
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DefaultConfig returns the default configuration
|
||||
func DefaultConfig() *Config {
|
||||
c := &Config{
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
RaftConfig: raft.DefaultConfig(),
|
||||
}
|
||||
return c
|
||||
}
|
||||
106
nomad/fsm.go
Normal file
106
nomad/fsm.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// nomadFSM implements a finite state machine that is used
|
||||
// along with Raft to provide strong consistency. We implement
|
||||
// this outside the Server to avoid exposing this outside the package.
|
||||
type nomadFSM struct {
|
||||
logOutput io.Writer
|
||||
logger *log.Logger
|
||||
path string
|
||||
state *StateStore
|
||||
}
|
||||
|
||||
// nomadSnapshot is used to provide a snapshot of the current
|
||||
// state in a way that can be accessed concurrently with operations
|
||||
// that may modify the live state.
|
||||
type nomadSnapshot struct {
|
||||
state *StateSnapshot
|
||||
}
|
||||
|
||||
// NewFSMPath is used to construct a new FSM with a blank state
|
||||
func NewFSM(path string, logOutput io.Writer) (*nomadFSM, error) {
|
||||
// Create a state store
|
||||
state, err := NewStateStore(logOutput)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fsm := &nomadFSM{
|
||||
logOutput: logOutput,
|
||||
logger: log.New(logOutput, "", log.LstdFlags),
|
||||
path: path,
|
||||
state: state,
|
||||
}
|
||||
return fsm, nil
|
||||
}
|
||||
|
||||
// Close is used to cleanup resources associated with the FSM
|
||||
func (n *nomadFSM) Close() error {
|
||||
return n.state.Close()
|
||||
}
|
||||
|
||||
// State is used to return a handle to the current state
|
||||
func (n *nomadFSM) State() *StateStore {
|
||||
return n.state
|
||||
}
|
||||
|
||||
func (n *nomadFSM) Apply(log *raft.Log) interface{} {
|
||||
buf := log.Data
|
||||
msgType := structs.MessageType(buf[0])
|
||||
|
||||
// Check if this message type should be ignored when unknown. This is
|
||||
// used so that new commands can be added with developer control if older
|
||||
// versions can safely ignore the command, or if they should crash.
|
||||
ignoreUnknown := false
|
||||
if msgType&structs.IgnoreUnknownTypeFlag == structs.IgnoreUnknownTypeFlag {
|
||||
msgType &= ^structs.IgnoreUnknownTypeFlag
|
||||
ignoreUnknown = true
|
||||
}
|
||||
|
||||
switch msgType {
|
||||
default:
|
||||
if ignoreUnknown {
|
||||
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||
return nil
|
||||
} else {
|
||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||
}
|
||||
}
|
||||
}
|
||||
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
defer func(start time.Time) {
|
||||
n.logger.Printf("[INFO] nomad.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||
}(time.Now())
|
||||
|
||||
// Create a new snapshot
|
||||
snap, err := n.state.Snapshot()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &nomadSnapshot{snap}, nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
||||
defer old.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) Release() {
|
||||
s.state.Close()
|
||||
}
|
||||
39
nomad/leader.go
Normal file
39
nomad/leader.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package nomad
|
||||
|
||||
// monitorLeadership is used to monitor if we acquire or lose our role
|
||||
// as the leader in the Raft cluster. There is some work the leader is
|
||||
// expected to do, so we must react to changes
|
||||
func (s *Server) monitorLeadership() {
|
||||
leaderCh := s.raft.LeaderCh()
|
||||
var stopCh chan struct{}
|
||||
for {
|
||||
select {
|
||||
case isLeader := <-leaderCh:
|
||||
if isLeader {
|
||||
stopCh = make(chan struct{})
|
||||
go s.leaderLoop(stopCh)
|
||||
s.logger.Printf("[INFO] nomad: cluster leadership acquired")
|
||||
} else if stopCh != nil {
|
||||
close(stopCh)
|
||||
stopCh = nil
|
||||
s.logger.Printf("[INFO] nomad: cluster leadership lost")
|
||||
}
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// leaderLoop runs as long as we are the leader to run various
|
||||
// maintence activities
|
||||
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
||||
// Wait until leadership is lost
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
111
nomad/raft_rpc.go
Normal file
111
nomad/raft_rpc.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
// RaftLayer implements the raft.StreamLayer interface,
|
||||
// so that we can use a single RPC layer for Raft and Nomad
|
||||
type RaftLayer struct {
|
||||
// Addr is the listener address to return
|
||||
addr net.Addr
|
||||
|
||||
// connCh is used to accept connections
|
||||
connCh chan net.Conn
|
||||
|
||||
// TLS wrapper
|
||||
tlsWrap tlsutil.Wrapper
|
||||
|
||||
// Tracks if we are closed
|
||||
closed bool
|
||||
closeCh chan struct{}
|
||||
closeLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewRaftLayer is used to initialize a new RaftLayer which can
|
||||
// be used as a StreamLayer for Raft. If a tlsConfig is provided,
|
||||
// then the connection will use TLS.
|
||||
func NewRaftLayer(addr net.Addr, tlsWrap tlsutil.Wrapper) *RaftLayer {
|
||||
layer := &RaftLayer{
|
||||
addr: addr,
|
||||
connCh: make(chan net.Conn),
|
||||
tlsWrap: tlsWrap,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
return layer
|
||||
}
|
||||
|
||||
// Handoff is used to hand off a connection to the
|
||||
// RaftLayer. This allows it to be Accept()'ed
|
||||
func (l *RaftLayer) Handoff(c net.Conn) error {
|
||||
select {
|
||||
case l.connCh <- c:
|
||||
return nil
|
||||
case <-l.closeCh:
|
||||
return fmt.Errorf("Raft RPC layer closed")
|
||||
}
|
||||
}
|
||||
|
||||
// Accept is used to return connection which are
|
||||
// dialed to be used with the Raft layer
|
||||
func (l *RaftLayer) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case conn := <-l.connCh:
|
||||
return conn, nil
|
||||
case <-l.closeCh:
|
||||
return nil, fmt.Errorf("Raft RPC layer closed")
|
||||
}
|
||||
}
|
||||
|
||||
// Close is used to stop listening for Raft connections
|
||||
func (l *RaftLayer) Close() error {
|
||||
l.closeLock.Lock()
|
||||
defer l.closeLock.Unlock()
|
||||
|
||||
if !l.closed {
|
||||
l.closed = true
|
||||
close(l.closeCh)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addr is used to return the address of the listener
|
||||
func (l *RaftLayer) Addr() net.Addr {
|
||||
return l.addr
|
||||
}
|
||||
|
||||
// Dial is used to create a new outgoing connection
|
||||
func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
|
||||
conn, err := net.DialTimeout("tcp", address, timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check for tls mode
|
||||
if l.tlsWrap != nil {
|
||||
// Switch the connection into TLS mode
|
||||
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap the connection in a TLS client
|
||||
conn, err = l.tlsWrap(conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Write the Raft byte to set the mode
|
||||
_, err = conn.Write([]byte{byte(rpcRaft)})
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
return conn, err
|
||||
}
|
||||
10
nomad/rpc.go
Normal file
10
nomad/rpc.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package nomad
|
||||
|
||||
// RPCType is the single mode byte written at the start of a connection
// to select how the rest of the stream is handled.
type RPCType byte

// Known connection modes. The numeric values are assigned sequentially
// starting at zero.
const (
	// rpcNomad is the zero value; presumably plain Nomad RPC traffic.
	rpcNomad RPCType = iota

	// rpcRaft marks a stream as Raft traffic.
	rpcRaft

	// rpcMultiplex presumably marks a multiplexed stream; it is not used
	// in the visible code.
	rpcMultiplex

	// rpcTLS announces that the connection is switching to TLS.
	rpcTLS
)
|
||||
195
nomad/server.go
Normal file
195
nomad/server.go
Normal file
@@ -0,0 +1,195 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/raft-boltdb"
|
||||
)
|
||||
|
||||
const (
	// raftState is the directory, relative to the data directory, that
	// holds the Raft log store and snapshots.
	raftState = "raft/"

	// tmpStatePath is the directory, relative to the data directory, that
	// is wiped and recreated for the FSM whenever Raft is set up.
	tmpStatePath = "tmp/"

	// snapshotsRetained is the retention count passed to the Raft file
	// snapshot store.
	snapshotsRetained = 2

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512
)
|
||||
|
||||
// Server is Nomad server which manages the job queues,
|
||||
// schedulers, and notification bus for agents.
|
||||
type Server struct {
|
||||
config *Config
|
||||
logger *log.Logger
|
||||
|
||||
// The raft instance is used among Consul nodes within the
|
||||
// DC to protect operations that require strong consistency
|
||||
raft *raft.Raft
|
||||
raftLayer *RaftLayer
|
||||
raftPeers raft.PeerStore
|
||||
raftStore *raftboltdb.BoltStore
|
||||
raftTransport *raft.NetworkTransport
|
||||
|
||||
// fsm is the state machine used with Raft
|
||||
fsm *nomadFSM
|
||||
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewServer is used to construct a new Nomad server from the
|
||||
// configuration, potentially returning an error
|
||||
func NewServer(config *Config) (*Server, error) {
|
||||
// Check the protocol version
|
||||
if err := config.CheckVersion(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ensure we have a log output
|
||||
if config.LogOutput == nil {
|
||||
config.LogOutput = os.Stderr
|
||||
}
|
||||
|
||||
// Create a logger
|
||||
logger := log.New(config.LogOutput, "", log.LstdFlags)
|
||||
|
||||
// Create the server
|
||||
s := &Server{
|
||||
config: config,
|
||||
logger: logger,
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Initialize the Raft server
|
||||
if err := s.setupRaft(); err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start Raft: %v", err)
|
||||
}
|
||||
|
||||
// Done
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Shutdown is used to shutdown the server
|
||||
func (s *Server) Shutdown() error {
|
||||
s.logger.Printf("[INFO] nomad: shutting down server")
|
||||
s.shutdownLock.Lock()
|
||||
defer s.shutdownLock.Unlock()
|
||||
|
||||
if s.shutdown {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.shutdown = true
|
||||
close(s.shutdownCh)
|
||||
|
||||
if s.raft != nil {
|
||||
s.raftTransport.Close()
|
||||
s.raftLayer.Close()
|
||||
future := s.raft.Shutdown()
|
||||
if err := future.Error(); err != nil {
|
||||
s.logger.Printf("[WARN] nomad: Error shutting down raft: %s", err)
|
||||
}
|
||||
s.raftStore.Close()
|
||||
}
|
||||
|
||||
// Close the fsm
|
||||
if s.fsm != nil {
|
||||
s.fsm.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupRaft is used to setup and initialize Raft
|
||||
func (s *Server) setupRaft() error {
|
||||
// If we are in bootstrap mode, enable a single node cluster
|
||||
if s.config.Bootstrap {
|
||||
s.config.RaftConfig.EnableSingleNode = true
|
||||
}
|
||||
|
||||
// Create the base state path
|
||||
statePath := filepath.Join(s.config.DataDir, tmpStatePath)
|
||||
if err := os.RemoveAll(statePath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ensurePath(statePath, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the FSM
|
||||
var err error
|
||||
s.fsm, err = NewFSM(statePath, s.config.LogOutput)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the base raft path
|
||||
path := filepath.Join(s.config.DataDir, raftState)
|
||||
if err := ensurePath(path, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the backend raft store for logs and stable storage
|
||||
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.raftStore = store
|
||||
|
||||
// Wrap the store in a LogCache to improve performance
|
||||
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the snapshot store
|
||||
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a transport layer
|
||||
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
|
||||
s.raftTransport = trans
|
||||
|
||||
// Setup the peer store
|
||||
s.raftPeers = raft.NewJSONPeers(path, trans)
|
||||
|
||||
// Ensure local host is always included if we are in bootstrap mode
|
||||
if s.config.Bootstrap {
|
||||
peers, err := s.raftPeers.Peers()
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
if !raft.PeerContained(peers, trans.LocalAddr()) {
|
||||
s.raftPeers.SetPeers(raft.AddUniquePeer(peers, trans.LocalAddr()))
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we set the LogOutput
|
||||
s.config.RaftConfig.LogOutput = s.config.LogOutput
|
||||
|
||||
// Setup the Raft store
|
||||
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, cacheStore, store,
|
||||
snapshots, s.raftPeers, trans)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
trans.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Start monitoring leadership
|
||||
go s.monitorLeadership()
|
||||
return nil
|
||||
}
|
||||
43
nomad/state_store.go
Normal file
43
nomad/state_store.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
)
|
||||
|
||||
// StateStore is responsible for maintaining all the Nomad state. It is
// manipulated by the FSM, which maintains consistency through the use of
// Raft. The goals of the StateStore are to provide high concurrency for
// read operations without blocking writes, and to provide write
// availability in the face of reads.
type StateStore struct {
	// logger writes to the output the store was created with.
	logger *log.Logger
}
|
||||
|
||||
// StateSnapshot is used to provide a point-in-time snapshot
|
||||
type StateSnapshot struct {
|
||||
store *StateStore
|
||||
}
|
||||
|
||||
// Close is used to abort the transaction and allow for cleanup
|
||||
func (s *StateSnapshot) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewStateStore is used to create a new state store
|
||||
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
|
||||
s := &StateStore{
|
||||
logger: log.New(logOutput, "", log.LstdFlags),
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Close is used to safely shutdown the state store
|
||||
func (s *StateStore) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Snapshot is used to create a point in time snapshot
|
||||
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
||||
return nil, nil
|
||||
}
|
||||
16
nomad/structs/structs.go
Normal file
16
nomad/structs/structs.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package structs
|
||||
|
||||
// MessageType is the one-byte tag that identifies the kind of a message.
type MessageType uint8

// Known message types, numbered sequentially from zero.
const (
	// RegisterRequestType is the first message type.
	RegisterRequestType MessageType = iota
)

const (
	// IgnoreUnknownTypeFlag is set along with a MessageType to indicate
	// that the message type can be safely ignored if it is not
	// recognized. This is for future proofing, so that new commands can
	// be added in a way that won't cause old servers to crash when the
	// FSM attempts to process them. It occupies the top bit of the byte.
	IgnoreUnknownTypeFlag MessageType = 1 << 7
)
|
||||
14
nomad/util.go
Normal file
14
nomad/util.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// ensurePath makes sure a directory exists, creating it and any missing
// parents with mode 0755. When dir is true, path itself is the directory
// to create; otherwise path is treated as a file and its parent directory
// is created instead.
func ensurePath(path string, dir bool) error {
	target := path
	if !dir {
		target = filepath.Dir(path)
	}
	return os.MkdirAll(target, 0755)
}
|
||||
Reference in New Issue
Block a user