mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
nomad: support DevMode configuration
This commit is contained in:
@@ -23,6 +23,10 @@ type Config struct {
|
||||
// DataDir is the directory to store our state in
|
||||
DataDir string
|
||||
|
||||
// DevMode is used for development purposes only and limits the
|
||||
// use of persistence or state.
|
||||
DevMode bool
|
||||
|
||||
// LogOutput is the location to write logs to. If this is not set,
|
||||
// logs will go to stderr.
|
||||
LogOutput io.Writer
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
type nomadFSM struct {
|
||||
logOutput io.Writer
|
||||
logger *log.Logger
|
||||
path string
|
||||
state *StateStore
|
||||
}
|
||||
|
||||
@@ -29,7 +28,7 @@ type nomadSnapshot struct {
|
||||
}
|
||||
|
||||
// NewFSMPath is used to construct a new FSM with a blank state
|
||||
func NewFSM(path string, logOutput io.Writer) (*nomadFSM, error) {
|
||||
func NewFSM(logOutput io.Writer) (*nomadFSM, error) {
|
||||
// Create a state store
|
||||
state, err := NewStateStore(logOutput)
|
||||
if err != nil {
|
||||
@@ -39,7 +38,6 @@ func NewFSM(path string, logOutput io.Writer) (*nomadFSM, error) {
|
||||
fsm := &nomadFSM{
|
||||
logOutput: logOutput,
|
||||
logger: log.New(logOutput, "", log.LstdFlags),
|
||||
path: path,
|
||||
state: state,
|
||||
}
|
||||
return fsm, nil
|
||||
|
||||
117
nomad/server.go
117
nomad/server.go
@@ -14,7 +14,6 @@ import (
|
||||
|
||||
const (
|
||||
raftState = "raft/"
|
||||
tmpStatePath = "tmp/"
|
||||
snapshotsRetained = 2
|
||||
|
||||
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
||||
@@ -34,6 +33,7 @@ type Server struct {
|
||||
raftLayer *RaftLayer
|
||||
raftPeers raft.PeerStore
|
||||
raftStore *raftboltdb.BoltStore
|
||||
raftInmem *raft.InmemStore
|
||||
raftTransport *raft.NetworkTransport
|
||||
|
||||
// fsm is the state machine used with Raft
|
||||
@@ -111,69 +111,84 @@ func (s *Server) Shutdown() error {
|
||||
// setupRaft is used to setup and initialize Raft
|
||||
func (s *Server) setupRaft() error {
|
||||
// If we are in bootstrap mode, enable a single node cluster
|
||||
if s.config.Bootstrap {
|
||||
if s.config.Bootstrap || s.config.DevMode {
|
||||
s.config.RaftConfig.EnableSingleNode = true
|
||||
}
|
||||
|
||||
// Create the base state path
|
||||
statePath := filepath.Join(s.config.DataDir, tmpStatePath)
|
||||
if err := os.RemoveAll(statePath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ensurePath(statePath, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the FSM
|
||||
var err error
|
||||
s.fsm, err = NewFSM(statePath, s.config.LogOutput)
|
||||
s.fsm, err = NewFSM(s.config.LogOutput)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the base raft path
|
||||
path := filepath.Join(s.config.DataDir, raftState)
|
||||
if err := ensurePath(path, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the backend raft store for logs and stable storage
|
||||
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.raftStore = store
|
||||
|
||||
// Wrap the store in a LogCache to improve performance
|
||||
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the snapshot store
|
||||
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a transport layer
|
||||
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
|
||||
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second,
|
||||
s.config.LogOutput)
|
||||
s.raftTransport = trans
|
||||
|
||||
// Setup the peer store
|
||||
s.raftPeers = raft.NewJSONPeers(path, trans)
|
||||
// Create the backend raft store for logs and stable storage
|
||||
var log raft.LogStore
|
||||
var stable raft.StableStore
|
||||
var snap raft.SnapshotStore
|
||||
var peers raft.PeerStore
|
||||
if s.config.DevMode {
|
||||
store := raft.NewInmemStore()
|
||||
s.raftInmem = store
|
||||
stable = store
|
||||
log = store
|
||||
snap = raft.NewDiscardSnapshotStore()
|
||||
peers = &raft.StaticPeers{}
|
||||
|
||||
// Ensure local host is always included if we are in bootstrap mode
|
||||
if s.config.Bootstrap {
|
||||
peers, err := s.raftPeers.Peers()
|
||||
} else {
|
||||
// Create the base raft path
|
||||
path := filepath.Join(s.config.DataDir, raftState)
|
||||
if err := ensurePath(path, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the BoltDB backend
|
||||
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.raftStore = store
|
||||
stable = store
|
||||
|
||||
// Wrap the store in a LogCache to improve performance
|
||||
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
if !raft.PeerContained(peers, trans.LocalAddr()) {
|
||||
s.raftPeers.SetPeers(raft.AddUniquePeer(peers, trans.LocalAddr()))
|
||||
log = cacheStore
|
||||
|
||||
// Create the snapshot store
|
||||
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
|
||||
if err != nil {
|
||||
if s.raftStore != nil {
|
||||
s.raftStore.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
snap = snapshots
|
||||
|
||||
// Setup the peer store
|
||||
s.raftPeers = raft.NewJSONPeers(path, trans)
|
||||
peers = s.raftPeers
|
||||
}
|
||||
|
||||
// Ensure local host is always included if we are in bootstrap mode
|
||||
if s.config.RaftConfig.EnableSingleNode {
|
||||
p, err := peers.Peers()
|
||||
if err != nil {
|
||||
if s.raftStore != nil {
|
||||
s.raftStore.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
if !raft.PeerContained(p, trans.LocalAddr()) {
|
||||
peers.SetPeers(raft.AddUniquePeer(p, trans.LocalAddr()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,10 +196,12 @@ func (s *Server) setupRaft() error {
|
||||
s.config.RaftConfig.LogOutput = s.config.LogOutput
|
||||
|
||||
// Setup the Raft store
|
||||
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, cacheStore, store,
|
||||
snapshots, s.raftPeers, trans)
|
||||
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
|
||||
snap, peers, trans)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
if s.raftStore != nil {
|
||||
s.raftStore.Close()
|
||||
}
|
||||
trans.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user