diff --git a/nomad/config.go b/nomad/config.go
new file mode 100644
index 000000000..5e70f21c0
--- /dev/null
+++ b/nomad/config.go
@@ -0,0 +1,57 @@
+package nomad
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/hashicorp/raft"
+)
+
+// These are the protocol versions that Nomad can understand
+const (
+	ProtocolVersionMin uint8 = 1
+	ProtocolVersionMax       = 1
+)
+
+// Config is used to parameterize the server
+type Config struct {
+	// Bootstrap mode is used to bring up the first Nomad server.
+	// It is required so that it can elect a leader without any
+	// other nodes being present
+	Bootstrap bool
+
+	// DataDir is the directory to store our state in
+	DataDir string
+
+	// LogOutput is the location to write logs to. If this is not set,
+	// logs will go to stderr.
+	LogOutput io.Writer
+
+	// ProtocolVersion is the protocol version to speak. This must be between
+	// ProtocolVersionMin and ProtocolVersionMax.
+	ProtocolVersion uint8
+
+	// RaftConfig is the configuration used for Raft in the local DC
+	RaftConfig *raft.Config
+}
+
+// CheckVersion is used to check if the ProtocolVersion is valid
+func (c *Config) CheckVersion() error {
+	if c.ProtocolVersion < ProtocolVersionMin {
+		return fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
+			c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
+	} else if c.ProtocolVersion > ProtocolVersionMax {
+		return fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
+			c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
+	}
+	return nil
+}
+
+// DefaultConfig returns the default configuration
+func DefaultConfig() *Config {
+	c := &Config{
+		ProtocolVersion: ProtocolVersionMax,
+		RaftConfig:      raft.DefaultConfig(),
+	}
+	return c
+}
diff --git a/nomad/fsm.go b/nomad/fsm.go
new file mode 100644
index 000000000..52b6a4599
--- /dev/null
+++ b/nomad/fsm.go
@@ -0,0 +1,121 @@
+package nomad
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"time"
+
+	"github.com/armon/go-metrics"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/raft"
+)
+
+// nomadFSM implements a finite state machine that is used
+// along with Raft to provide strong consistency. We implement
+// this outside the Server to avoid exposing this outside the package.
+type nomadFSM struct {
+	logOutput io.Writer
+	logger    *log.Logger
+	path      string
+	state     *StateStore
+}
+
+// nomadSnapshot is used to provide a snapshot of the current
+// state in a way that can be accessed concurrently with operations
+// that may modify the live state.
+type nomadSnapshot struct {
+	state *StateSnapshot
+}
+
+// NewFSM is used to construct a new FSM with a blank state
+func NewFSM(path string, logOutput io.Writer) (*nomadFSM, error) {
+	// Create a state store
+	state, err := NewStateStore(logOutput)
+	if err != nil {
+		return nil, err
+	}
+
+	fsm := &nomadFSM{
+		logOutput: logOutput,
+		logger:    log.New(logOutput, "", log.LstdFlags),
+		path:      path,
+		state:     state,
+	}
+	return fsm, nil
+}
+
+// Close is used to cleanup resources associated with the FSM
+func (n *nomadFSM) Close() error {
+	return n.state.Close()
+}
+
+// State is used to return a handle to the current state
+func (n *nomadFSM) State() *StateStore {
+	return n.state
+}
+
+// Apply is invoked by Raft once a log entry is committed. The first byte
+// of the entry selects the message type; no types are handled yet, so
+// every entry is either ignored (when flagged as ignorable) or fatal.
+func (n *nomadFSM) Apply(entry *raft.Log) interface{} {
+	buf := entry.Data
+	if len(buf) == 0 {
+		// Without a type byte the entry cannot be dispatched. Fail with a
+		// descriptive panic instead of an index-out-of-range.
+		panic(fmt.Errorf("failed to apply request: empty log entry"))
+	}
+	msgType := structs.MessageType(buf[0])
+
+	// Check if this message type should be ignored when unknown. This is
+	// used so that new commands can be added with developer control if older
+	// versions can safely ignore the command, or if they should crash.
+	ignoreUnknown := false
+	if msgType&structs.IgnoreUnknownTypeFlag == structs.IgnoreUnknownTypeFlag {
+		msgType &= ^structs.IgnoreUnknownTypeFlag
+		ignoreUnknown = true
+	}
+
+	switch msgType {
+	default:
+		if ignoreUnknown {
+			n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
+			return nil
+		}
+		panic(fmt.Errorf("failed to apply request: %#v", buf))
+	}
+}
+
+// Snapshot is used to capture a point-in-time view of the state that
+// Raft can persist while the FSM continues to apply new entries.
+func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
+	defer func(start time.Time) {
+		n.logger.Printf("[INFO] nomad.fsm: snapshot created in %v", time.Since(start))
+	}(time.Now())
+
+	// Create a new snapshot
+	snap, err := n.state.Snapshot()
+	if err != nil {
+		return nil, err
+	}
+	return &nomadSnapshot{snap}, nil
+}
+
+// Restore is used to rebuild the state from a snapshot. Nothing is
+// persisted yet, so the stream is simply closed.
+func (n *nomadFSM) Restore(old io.ReadCloser) error {
+	defer old.Close()
+	return nil
+}
+
+// Persist is used to write the snapshot to the sink. No state is
+// written yet; only the call duration is recorded.
+func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
+	return nil
+}
+
+// Release is used to free the resources held by the snapshot
+func (s *nomadSnapshot) Release() {
+	s.state.Close()
+}
diff --git a/nomad/leader.go b/nomad/leader.go
new file mode 100644
index 000000000..e2176c263
--- /dev/null
+++ b/nomad/leader.go
@@ -0,0 +1,42 @@
+package nomad
+
+// monitorLeadership is used to monitor if we acquire or lose our role
+// as the leader in the Raft cluster. There is some work the leader is
+// expected to do, so we must react to changes. Repeated notifications
+// of the same state are ignored so that at most one leaderLoop runs
+// and its stop channel is never orphaned.
+func (s *Server) monitorLeadership() {
+	leaderCh := s.raft.LeaderCh()
+	var stopCh chan struct{}
+	for {
+		select {
+		case isLeader := <-leaderCh:
+			switch {
+			case isLeader && stopCh == nil:
+				stopCh = make(chan struct{})
+				go s.leaderLoop(stopCh)
+				s.logger.Printf("[INFO] nomad: cluster leadership acquired")
+			case !isLeader && stopCh != nil:
+				close(stopCh)
+				stopCh = nil
+				s.logger.Printf("[INFO] nomad: cluster leadership lost")
+			}
+		case <-s.shutdownCh:
+			return
+		}
+	}
+}
+
+// leaderLoop runs as long as we are the leader to run various
+// maintenance activities
+func (s *Server) leaderLoop(stopCh chan struct{}) {
+	// Wait until leadership is lost
+	for {
+		select {
+		case <-stopCh:
+			return
+		case <-s.shutdownCh:
+			return
+		}
+	}
+}
diff --git a/nomad/raft_rpc.go b/nomad/raft_rpc.go
new file mode 100644
index 000000000..dfbc2235f
--- /dev/null
+++ b/nomad/raft_rpc.go
@@ -0,0 +1,114 @@
+package nomad
+
+import (
+	"fmt"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/consul/tlsutil"
+)
+
+// RaftLayer implements the raft.StreamLayer interface,
+// so that we can use a single RPC layer for Raft and Nomad
+type RaftLayer struct {
+	// addr is the listener address to return
+	addr net.Addr
+
+	// connCh is used to accept connections
+	connCh chan net.Conn
+
+	// TLS wrapper
+	tlsWrap tlsutil.Wrapper
+
+	// Tracks if we are closed
+	closed    bool
+	closeCh   chan struct{}
+	closeLock sync.Mutex
+}
+
+// NewRaftLayer is used to initialize a new RaftLayer which can
+// be used as a StreamLayer for Raft. If a tlsWrap is provided,
+// then the connection will use TLS.
+func NewRaftLayer(addr net.Addr, tlsWrap tlsutil.Wrapper) *RaftLayer {
+	layer := &RaftLayer{
+		addr:    addr,
+		connCh:  make(chan net.Conn),
+		tlsWrap: tlsWrap,
+		closeCh: make(chan struct{}),
+	}
+	return layer
+}
+
+// Handoff is used to hand off a connection to the
+// RaftLayer. This allows it to be Accept()'ed
+func (l *RaftLayer) Handoff(c net.Conn) error {
+	select {
+	case l.connCh <- c:
+		return nil
+	case <-l.closeCh:
+		return fmt.Errorf("Raft RPC layer closed")
+	}
+}
+
+// Accept is used to return connection which are
+// dialed to be used with the Raft layer
+func (l *RaftLayer) Accept() (net.Conn, error) {
+	select {
+	case conn := <-l.connCh:
+		return conn, nil
+	case <-l.closeCh:
+		return nil, fmt.Errorf("Raft RPC layer closed")
+	}
+}
+
+// Close is used to stop listening for Raft connections
+func (l *RaftLayer) Close() error {
+	l.closeLock.Lock()
+	defer l.closeLock.Unlock()
+
+	if !l.closed {
+		l.closed = true
+		close(l.closeCh)
+	}
+	return nil
+}
+
+// Addr is used to return the address of the listener
+func (l *RaftLayer) Addr() net.Addr {
+	return l.addr
+}
+
+// Dial is used to create a new outgoing connection
+func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
+	conn, err := net.DialTimeout("tcp", address, timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	// Check for tls mode
+	if l.tlsWrap != nil {
+		// Switch the connection into TLS mode
+		if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
+			conn.Close()
+			return nil, err
+		}
+
+		// Wrap the connection in a TLS client. The raw connection is
+		// closed on failure so the socket is never leaked; closing it
+		// again after the wrapper already did is harmless.
+		tlsConn, err := l.tlsWrap(conn)
+		if err != nil {
+			conn.Close()
+			return nil, err
+		}
+		conn = tlsConn
+	}
+
+	// Write the Raft byte to set the mode
+	if _, err := conn.Write([]byte{byte(rpcRaft)}); err != nil {
+		conn.Close()
+		return nil, err
+	}
+	return conn, nil
+}
diff --git a/nomad/rpc.go b/nomad/rpc.go
new file mode 100644
index 000000000..9201738d6
--- /dev/null
+++ b/nomad/rpc.go
@@ -0,0 +1,12 @@
+package nomad
+
+// RPCType is the mode byte written at the start of a connection
+// (see RaftLayer.Dial) to select how the stream is handled.
+type RPCType byte
+
+const (
+	rpcNomad RPCType = iota
+	rpcRaft
+	rpcMultiplex
+	rpcTLS
+)
diff --git a/nomad/server.go b/nomad/server.go
new file mode 100644
index 000000000..bf204ba76
--- /dev/null
+++ b/nomad/server.go
@@ -0,0 +1,213 @@
+package nomad
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/raft"
+	"github.com/hashicorp/raft-boltdb"
+)
+
+const (
+	raftState         = "raft/"
+	tmpStatePath      = "tmp/"
+	snapshotsRetained = 2
+
+	// raftLogCacheSize is the maximum number of logs to cache in-memory.
+	// This is used to reduce disk I/O for the recently committed entries.
+	raftLogCacheSize = 512
+)
+
+// Server is Nomad server which manages the job queues,
+// schedulers, and notification bus for agents.
+type Server struct {
+	config *Config
+	logger *log.Logger
+
+	// The raft instance is used among Nomad nodes within the
+	// DC to protect operations that require strong consistency
+	raft          *raft.Raft
+	raftLayer     *RaftLayer
+	raftPeers     raft.PeerStore
+	raftStore     *raftboltdb.BoltStore
+	raftTransport *raft.NetworkTransport
+
+	// fsm is the state machine used with Raft
+	fsm *nomadFSM
+
+	shutdown     bool
+	shutdownCh   chan struct{}
+	shutdownLock sync.Mutex
+}
+
+// NewServer is used to construct a new Nomad server from the
+// configuration, potentially returning an error
+func NewServer(config *Config) (*Server, error) {
+	// Check the protocol version
+	if err := config.CheckVersion(); err != nil {
+		return nil, err
+	}
+
+	// Ensure we have a log output
+	if config.LogOutput == nil {
+		config.LogOutput = os.Stderr
+	}
+
+	// Ensure we have a Raft configuration
+	if config.RaftConfig == nil {
+		config.RaftConfig = raft.DefaultConfig()
+	}
+
+	// Create a logger
+	logger := log.New(config.LogOutput, "", log.LstdFlags)
+
+	// Create the server
+	s := &Server{
+		config:     config,
+		logger:     logger,
+		shutdownCh: make(chan struct{}),
+	}
+
+	// Initialize the Raft server
+	if err := s.setupRaft(); err != nil {
+		s.Shutdown()
+		return nil, fmt.Errorf("Failed to start Raft: %v", err)
+	}
+
+	// Done
+	return s, nil
+}
+
+// Shutdown is used to shutdown the server
+func (s *Server) Shutdown() error {
+	s.logger.Printf("[INFO] nomad: shutting down server")
+	s.shutdownLock.Lock()
+	defer s.shutdownLock.Unlock()
+
+	if s.shutdown {
+		return nil
+	}
+
+	s.shutdown = true
+	close(s.shutdownCh)
+
+	if s.raft != nil {
+		s.raftTransport.Close()
+		s.raftLayer.Close()
+		future := s.raft.Shutdown()
+		if err := future.Error(); err != nil {
+			s.logger.Printf("[WARN] nomad: Error shutting down raft: %s", err)
+		}
+		s.raftStore.Close()
+	}
+
+	// Close the fsm
+	if s.fsm != nil {
+		s.fsm.Close()
+	}
+
+	return nil
+}
+
+// setupRaft is used to setup and initialize Raft
+func (s *Server) setupRaft() error {
+	// The network transport starts accepting on the stream layer as soon
+	// as it is created, so a missing layer would crash the process.
+	// NOTE(review): nothing in this change constructs s.raftLayer yet.
+	if s.raftLayer == nil {
+		return fmt.Errorf("raft layer not initialized")
+	}
+
+	// If we are in bootstrap mode, enable a single node cluster
+	if s.config.Bootstrap {
+		s.config.RaftConfig.EnableSingleNode = true
+	}
+
+	// Create the base state path
+	statePath := filepath.Join(s.config.DataDir, tmpStatePath)
+	if err := os.RemoveAll(statePath); err != nil {
+		return err
+	}
+	if err := ensurePath(statePath, true); err != nil {
+		return err
+	}
+
+	// Create the FSM
+	var err error
+	s.fsm, err = NewFSM(statePath, s.config.LogOutput)
+	if err != nil {
+		return err
+	}
+
+	// Create the base raft path
+	path := filepath.Join(s.config.DataDir, raftState)
+	if err := ensurePath(path, true); err != nil {
+		return err
+	}
+
+	// Create the backend raft store for logs and stable storage
+	store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
+	if err != nil {
+		return err
+	}
+	s.raftStore = store
+
+	// Wrap the store in a LogCache to improve performance
+	cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
+	if err != nil {
+		store.Close()
+		return err
+	}
+
+	// Create the snapshot store
+	snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
+	if err != nil {
+		store.Close()
+		return err
+	}
+
+	// Create a transport layer
+	trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
+	s.raftTransport = trans
+
+	// Setup the peer store
+	s.raftPeers = raft.NewJSONPeers(path, trans)
+
+	// Ensure local host is always included if we are in bootstrap mode
+	if s.config.Bootstrap {
+		peers, err := s.raftPeers.Peers()
+		if err != nil {
+			store.Close()
+			trans.Close()
+			return err
+		}
+		if !raft.PeerContained(peers, trans.LocalAddr()) {
+			peers = raft.AddUniquePeer(peers, trans.LocalAddr())
+			if err := s.raftPeers.SetPeers(peers); err != nil {
+				store.Close()
+				trans.Close()
+				return err
+			}
+		}
+	}
+
+	// Make sure we set the LogOutput
+	s.config.RaftConfig.LogOutput = s.config.LogOutput
+
+	// Setup the Raft store
+	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, cacheStore, store,
+		snapshots, s.raftPeers, trans)
+	if err != nil {
+		store.Close()
+		trans.Close()
+		return err
+	}
+
+	// Start monitoring leadership
+	go s.monitorLeadership()
+	return nil
+}
diff --git a/nomad/state_store.go b/nomad/state_store.go
new file mode 100644
index 000000000..b92af33cb
--- /dev/null
+++ b/nomad/state_store.go
@@ -0,0 +1,43 @@
+package nomad
+
+import (
+	"io"
+	"log"
+)
+
+// The StateStore is responsible for maintaining all the Nomad
+// state. It is manipulated by the FSM which maintains consistency
+// through the use of Raft. The goals of the StateStore are to provide
+// high concurrency for read operations without blocking writes, and
+// to provide write availability in the face of reads.
+type StateStore struct {
+	logger *log.Logger
+}
+
+// StateSnapshot is used to provide a point-in-time snapshot
+type StateSnapshot struct {
+	store *StateStore
+}
+
+// Close is used to abort the transaction and allow for cleanup
+func (s *StateSnapshot) Close() error {
+	return nil
+}
+
+// NewStateStore is used to create a new state store
+func NewStateStore(logOutput io.Writer) (*StateStore, error) {
+	s := &StateStore{
+		logger: log.New(logOutput, "", log.LstdFlags),
+	}
+	return s, nil
+}
+
+// Close is used to safely shutdown the state store
+func (s *StateStore) Close() error {
+	return nil
+}
+
+// Snapshot is used to create a point in time snapshot
+func (s *StateStore) Snapshot() (*StateSnapshot, error) {
+	return &StateSnapshot{store: s}, nil
+}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
new file mode 100644
index 000000000..fe467534c
--- /dev/null
+++ b/nomad/structs/structs.go
@@ -0,0 +1,18 @@
+package structs
+
+// MessageType is the leading byte of a Raft log entry and identifies
+// the kind of request the entry carries.
+type MessageType uint8
+
+const (
+	RegisterRequestType MessageType = iota
+)
+
+const (
+	// IgnoreUnknownTypeFlag is set along with a MessageType
+	// to indicate that the message type can be safely ignored
+	// if it is not recognized. This is for future proofing, so
+	// that new commands can be added in a way that won't cause
+	// old servers to crash when the FSM attempts to process them.
+	IgnoreUnknownTypeFlag MessageType = 128
+)
diff --git a/nomad/util.go b/nomad/util.go
new file mode 100644
index 000000000..5d6b8deea
--- /dev/null
+++ b/nomad/util.go
@@ -0,0 +1,15 @@
+package nomad
+
+import (
+	"os"
+	"path/filepath"
+)
+
+// ensurePath is used to make sure a path exists. When dir is false,
+// path names a file and only its parent directory is created.
+func ensurePath(path string, dir bool) error {
+	if !dir {
+		path = filepath.Dir(path)
+	}
+	return os.MkdirAll(path, 0755)
+}