mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
Vendor v2 stage one
This commit is contained in:
4
vendor/github.com/hashicorp/raft/Makefile
generated
vendored
4
vendor/github.com/hashicorp/raft/Makefile
generated
vendored
@@ -1,10 +1,10 @@
|
||||
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
|
||||
|
||||
test:
|
||||
go test -timeout=30s ./...
|
||||
go test -timeout=60s ./...
|
||||
|
||||
integ: test
|
||||
INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...
|
||||
INTEG_TESTS=yes go test -timeout=5s -run=Integ ./...
|
||||
|
||||
deps:
|
||||
go get -d -v ./...
|
||||
|
||||
1007
vendor/github.com/hashicorp/raft/api.go
generated
vendored
Normal file
1007
vendor/github.com/hashicorp/raft/api.go
generated
vendored
Normal file
File diff suppressed because it is too large
Load Diff
75
vendor/github.com/hashicorp/raft/commands.go
generated
vendored
75
vendor/github.com/hashicorp/raft/commands.go
generated
vendored
@@ -1,8 +1,25 @@
|
||||
package raft
|
||||
|
||||
// RPCHeader is a common sub-structure used to pass along protocol version and
|
||||
// other information about the cluster. For older Raft implementations before
|
||||
// versioning was added this will default to a zero-valued structure when read
|
||||
// by newer Raft versions.
|
||||
type RPCHeader struct {
|
||||
// ProtocolVersion is the version of the protocol the sender is
|
||||
// speaking.
|
||||
ProtocolVersion ProtocolVersion
|
||||
}
|
||||
|
||||
// WithRPCHeader is an interface that exposes the RPC header.
|
||||
type WithRPCHeader interface {
|
||||
GetRPCHeader() RPCHeader
|
||||
}
|
||||
|
||||
// AppendEntriesRequest is the command used to append entries to the
|
||||
// replicated log.
|
||||
type AppendEntriesRequest struct {
|
||||
RPCHeader
|
||||
|
||||
// Provide the current term and leader
|
||||
Term uint64
|
||||
Leader []byte
|
||||
@@ -18,9 +35,16 @@ type AppendEntriesRequest struct {
|
||||
LeaderCommitIndex uint64
|
||||
}
|
||||
|
||||
// See WithRPCHeader.
|
||||
func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader {
|
||||
return r.RPCHeader
|
||||
}
|
||||
|
||||
// AppendEntriesResponse is the response returned from an
|
||||
// AppendEntriesRequest.
|
||||
type AppendEntriesResponse struct {
|
||||
RPCHeader
|
||||
|
||||
// Newer term if leader is out of date
|
||||
Term uint64
|
||||
|
||||
@@ -35,9 +59,16 @@ type AppendEntriesResponse struct {
|
||||
NoRetryBackoff bool
|
||||
}
|
||||
|
||||
// See WithRPCHeader.
|
||||
func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader {
|
||||
return r.RPCHeader
|
||||
}
|
||||
|
||||
// RequestVoteRequest is the command used by a candidate to ask a Raft peer
|
||||
// for a vote in an election.
|
||||
type RequestVoteRequest struct {
|
||||
RPCHeader
|
||||
|
||||
// Provide the term and our id
|
||||
Term uint64
|
||||
Candidate []byte
|
||||
@@ -47,21 +78,38 @@ type RequestVoteRequest struct {
|
||||
LastLogTerm uint64
|
||||
}
|
||||
|
||||
// See WithRPCHeader.
|
||||
func (r *RequestVoteRequest) GetRPCHeader() RPCHeader {
|
||||
return r.RPCHeader
|
||||
}
|
||||
|
||||
// RequestVoteResponse is the response returned from a RequestVoteRequest.
|
||||
type RequestVoteResponse struct {
|
||||
// Newer term if leader is out of date
|
||||
RPCHeader
|
||||
|
||||
// Newer term if leader is out of date.
|
||||
Term uint64
|
||||
|
||||
// Return the peers, so that a node can shutdown on removal
|
||||
// Peers is deprecated, but required by servers that only understand
|
||||
// protocol version 0. This is not populated in protocol version 2
|
||||
// and later.
|
||||
Peers []byte
|
||||
|
||||
// Is the vote granted
|
||||
// Is the vote granted.
|
||||
Granted bool
|
||||
}
|
||||
|
||||
// See WithRPCHeader.
|
||||
func (r *RequestVoteResponse) GetRPCHeader() RPCHeader {
|
||||
return r.RPCHeader
|
||||
}
|
||||
|
||||
// InstallSnapshotRequest is the command sent to a Raft peer to bootstrap its
|
||||
// log (and state machine) from a snapshot on another peer.
|
||||
type InstallSnapshotRequest struct {
|
||||
RPCHeader
|
||||
SnapshotVersion SnapshotVersion
|
||||
|
||||
Term uint64
|
||||
Leader []byte
|
||||
|
||||
@@ -69,16 +117,35 @@ type InstallSnapshotRequest struct {
|
||||
LastLogIndex uint64
|
||||
LastLogTerm uint64
|
||||
|
||||
// Peer Set in the snapshot
|
||||
// Peer Set in the snapshot. This is deprecated in favor of Configuration
|
||||
// but remains here in case we receive an InstallSnapshot from a leader
|
||||
// that's running old code.
|
||||
Peers []byte
|
||||
|
||||
// Cluster membership.
|
||||
Configuration []byte
|
||||
// Log index where 'Configuration' entry was originally written.
|
||||
ConfigurationIndex uint64
|
||||
|
||||
// Size of the snapshot
|
||||
Size int64
|
||||
}
|
||||
|
||||
// See WithRPCHeader.
|
||||
func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader {
|
||||
return r.RPCHeader
|
||||
}
|
||||
|
||||
// InstallSnapshotResponse is the response returned from an
|
||||
// InstallSnapshotRequest.
|
||||
type InstallSnapshotResponse struct {
|
||||
RPCHeader
|
||||
|
||||
Term uint64
|
||||
Success bool
|
||||
}
|
||||
|
||||
// See WithRPCHeader.
|
||||
func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader {
|
||||
return r.RPCHeader
|
||||
}
|
||||
|
||||
101
vendor/github.com/hashicorp/raft/commitment.go
generated
vendored
Normal file
101
vendor/github.com/hashicorp/raft/commitment.go
generated
vendored
Normal file
@@ -0,0 +1,101 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Commitment is used to advance the leader's commit index. The leader and
|
||||
// replication goroutines report in newly written entries with Match(), and
|
||||
// this notifies on commitCh when the commit index has advanced.
|
||||
type commitment struct {
|
||||
// protectes matchIndexes and commitIndex
|
||||
sync.Mutex
|
||||
// notified when commitIndex increases
|
||||
commitCh chan struct{}
|
||||
// voter ID to log index: the server stores up through this log entry
|
||||
matchIndexes map[ServerID]uint64
|
||||
// a quorum stores up through this log entry. monotonically increases.
|
||||
commitIndex uint64
|
||||
// the first index of this leader's term: this needs to be replicated to a
|
||||
// majority of the cluster before this leader may mark anything committed
|
||||
// (per Raft's commitment rule)
|
||||
startIndex uint64
|
||||
}
|
||||
|
||||
// newCommitment returns an commitment struct that notifies the provided
|
||||
// channel when log entries have been committed. A new commitment struct is
|
||||
// created each time this server becomes leader for a particular term.
|
||||
// 'configuration' is the servers in the cluster.
|
||||
// 'startIndex' is the first index created in this term (see
|
||||
// its description above).
|
||||
func newCommitment(commitCh chan struct{}, configuration Configuration, startIndex uint64) *commitment {
|
||||
matchIndexes := make(map[ServerID]uint64)
|
||||
for _, server := range configuration.Servers {
|
||||
if server.Suffrage == Voter {
|
||||
matchIndexes[server.ID] = 0
|
||||
}
|
||||
}
|
||||
return &commitment{
|
||||
commitCh: commitCh,
|
||||
matchIndexes: matchIndexes,
|
||||
commitIndex: 0,
|
||||
startIndex: startIndex,
|
||||
}
|
||||
}
|
||||
|
||||
// Called when a new cluster membership configuration is created: it will be
|
||||
// used to determine commitment from now on. 'configuration' is the servers in
|
||||
// the cluster.
|
||||
func (c *commitment) setConfiguration(configuration Configuration) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
oldMatchIndexes := c.matchIndexes
|
||||
c.matchIndexes = make(map[ServerID]uint64)
|
||||
for _, server := range configuration.Servers {
|
||||
if server.Suffrage == Voter {
|
||||
c.matchIndexes[server.ID] = oldMatchIndexes[server.ID] // defaults to 0
|
||||
}
|
||||
}
|
||||
c.recalculate()
|
||||
}
|
||||
|
||||
// Called by leader after commitCh is notified
|
||||
func (c *commitment) getCommitIndex() uint64 {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
return c.commitIndex
|
||||
}
|
||||
|
||||
// Match is called once a server completes writing entries to disk: either the
|
||||
// leader has written the new entry or a follower has replied to an
|
||||
// AppendEntries RPC. The given server's disk agrees with this server's log up
|
||||
// through the given index.
|
||||
func (c *commitment) match(server ServerID, matchIndex uint64) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
|
||||
c.matchIndexes[server] = matchIndex
|
||||
c.recalculate()
|
||||
}
|
||||
}
|
||||
|
||||
// Internal helper to calculate new commitIndex from matchIndexes.
|
||||
// Must be called with lock held.
|
||||
func (c *commitment) recalculate() {
|
||||
if len(c.matchIndexes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
matched := make([]uint64, 0, len(c.matchIndexes))
|
||||
for _, idx := range c.matchIndexes {
|
||||
matched = append(matched, idx)
|
||||
}
|
||||
sort.Sort(uint64Slice(matched))
|
||||
quorumMatchIndex := matched[(len(matched)-1)/2]
|
||||
|
||||
if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
|
||||
c.commitIndex = quorumMatchIndex
|
||||
asyncNotifyCh(c.commitCh)
|
||||
}
|
||||
}
|
||||
184
vendor/github.com/hashicorp/raft/config.go
generated
vendored
184
vendor/github.com/hashicorp/raft/config.go
generated
vendored
@@ -7,18 +7,136 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Config provides any necessary configuration to
|
||||
// the Raft server
|
||||
// These are the versions of the protocol (which includes RPC messages as
|
||||
// well as Raft-specific log entries) that this server can _understand_. Use
|
||||
// the ProtocolVersion member of the Config object to control the version of
|
||||
// the protocol to use when _speaking_ to other servers. Note that depending on
|
||||
// the protocol version being spoken, some otherwise understood RPC messages
|
||||
// may be refused. See dispositionRPC for details of this logic.
|
||||
//
|
||||
// There are notes about the upgrade path in the description of the versions
|
||||
// below. If you are starting a fresh cluster then there's no reason not to
|
||||
// jump right to the latest protocol version. If you need to interoperate with
|
||||
// older, version 0 Raft servers you'll need to drive the cluster through the
|
||||
// different versions in order.
|
||||
//
|
||||
// The version details are complicated, but here's a summary of what's required
|
||||
// to get from a version 0 cluster to version 3:
|
||||
//
|
||||
// 1. In version N of your app that starts using the new Raft library with
|
||||
// versioning, set ProtocolVersion to 1.
|
||||
// 2. Make version N+1 of your app require version N as a prerequisite (all
|
||||
// servers must be upgraded). For version N+1 of your app set ProtocolVersion
|
||||
// to 2.
|
||||
// 3. Similarly, make version N+2 of your app require version N+1 as a
|
||||
// prerequisite. For version N+2 of your app, set ProtocolVersion to 3.
|
||||
//
|
||||
// During this upgrade, older cluster members will still have Server IDs equal
|
||||
// to their network addresses. To upgrade an older member and give it an ID, it
|
||||
// needs to leave the cluster and re-enter:
|
||||
//
|
||||
// 1. Remove the server from the cluster with RemoveServer, using its network
|
||||
// address as its ServerID.
|
||||
// 2. Update the server's config to a better ID (restarting the server).
|
||||
// 3. Add the server back to the cluster with AddVoter, using its new ID.
|
||||
//
|
||||
// You can do this during the rolling upgrade from N+1 to N+2 of your app, or
|
||||
// as a rolling change at any time after the upgrade.
|
||||
//
|
||||
// Version History
|
||||
//
|
||||
// 0: Original Raft library before versioning was added. Servers running this
|
||||
// version of the Raft library use AddPeerDeprecated/RemovePeerDeprecated
|
||||
// for all configuration changes, and have no support for LogConfiguration.
|
||||
// 1: First versioned protocol, used to interoperate with old servers, and begin
|
||||
// the migration path to newer versions of the protocol. Under this version
|
||||
// all configuration changes are propagated using the now-deprecated
|
||||
// RemovePeerDeprecated Raft log entry. This means that server IDs are always
|
||||
// set to be the same as the server addresses (since the old log entry type
|
||||
// cannot transmit an ID), and only AddPeer/RemovePeer APIs are supported.
|
||||
// Servers running this version of the protocol can understand the new
|
||||
// LogConfiguration Raft log entry but will never generate one so they can
|
||||
// remain compatible with version 0 Raft servers in the cluster.
|
||||
// 2: Transitional protocol used when migrating an existing cluster to the new
|
||||
// server ID system. Server IDs are still set to be the same as server
|
||||
// addresses, but all configuration changes are propagated using the new
|
||||
// LogConfiguration Raft log entry type, which can carry full ID information.
|
||||
// This version supports the old AddPeer/RemovePeer APIs as well as the new
|
||||
// ID-based AddVoter/RemoveServer APIs which should be used when adding
|
||||
// version 3 servers to the cluster later. This version sheds all
|
||||
// interoperability with version 0 servers, but can interoperate with newer
|
||||
// Raft servers running with protocol version 1 since they can understand the
|
||||
// new LogConfiguration Raft log entry, and this version can still understand
|
||||
// their RemovePeerDeprecated Raft log entries. We need this protocol version
|
||||
// as an intermediate step between 1 and 3 so that servers will propagate the
|
||||
// ID information that will come from newly-added (or -rolled) servers using
|
||||
// protocol version 3, but since they are still using their address-based IDs
|
||||
// from the previous step they will still be able to track commitments and
|
||||
// their own voting status properly. If we skipped this step, servers would
|
||||
// be started with their new IDs, but they wouldn't see themselves in the old
|
||||
// address-based configuration, so none of the servers would think they had a
|
||||
// vote.
|
||||
// 3: Protocol adding full support for server IDs and new ID-based server APIs
|
||||
// (AddVoter, AddNonvoter, etc.), old AddPeer/RemovePeer APIs are no longer
|
||||
// supported. Version 2 servers should be swapped out by removing them from
|
||||
// the cluster one-by-one and re-adding them with updated configuration for
|
||||
// this protocol version, along with their server ID. The remove/add cycle
|
||||
// is required to populate their server ID. Note that removing must be done
|
||||
// by ID, which will be the old server's address.
|
||||
type ProtocolVersion int
|
||||
|
||||
const (
|
||||
ProtocolVersionMin ProtocolVersion = 0
|
||||
ProtocolVersionMax = 3
|
||||
)
|
||||
|
||||
// These are versions of snapshots that this server can _understand_. Currently,
|
||||
// it is always assumed that this server generates the latest version, though
|
||||
// this may be changed in the future to include a configurable version.
|
||||
//
|
||||
// Version History
|
||||
//
|
||||
// 0: Original Raft library before versioning was added. The peers portion of
|
||||
// these snapshots is encoded in the legacy format which requires decodePeers
|
||||
// to parse. This version of snapshots should only be produced by the
|
||||
// unversioned Raft library.
|
||||
// 1: New format which adds support for a full configuration structure and its
|
||||
// associated log index, with support for server IDs and non-voting server
|
||||
// modes. To ease upgrades, this also includes the legacy peers structure but
|
||||
// that will never be used by servers that understand version 1 snapshots.
|
||||
// Since the original Raft library didn't enforce any versioning, we must
|
||||
// include the legacy peers structure for this version, but we can deprecate
|
||||
// it in the next snapshot version.
|
||||
type SnapshotVersion int
|
||||
|
||||
const (
|
||||
SnapshotVersionMin SnapshotVersion = 0
|
||||
SnapshotVersionMax = 1
|
||||
)
|
||||
|
||||
// Config provides any necessary configuration for the Raft server.
|
||||
type Config struct {
|
||||
// Time in follower state without a leader before we attempt an election.
|
||||
// ProtocolVersion allows a Raft server to inter-operate with older
|
||||
// Raft servers running an older version of the code. This is used to
|
||||
// version the wire protocol as well as Raft-specific log entries that
|
||||
// the server uses when _speaking_ to other servers. There is currently
|
||||
// no auto-negotiation of versions so all servers must be manually
|
||||
// configured with compatible versions. See ProtocolVersionMin and
|
||||
// ProtocolVersionMax for the versions of the protocol that this server
|
||||
// can _understand_.
|
||||
ProtocolVersion ProtocolVersion
|
||||
|
||||
// HeartbeatTimeout specifies the time in follower state without
|
||||
// a leader before we attempt an election.
|
||||
HeartbeatTimeout time.Duration
|
||||
|
||||
// Time in candidate state without a leader before we attempt an election.
|
||||
// ElectionTimeout specifies the time in candidate state without
|
||||
// a leader before we attempt an election.
|
||||
ElectionTimeout time.Duration
|
||||
|
||||
// Time without an Apply() operation before we heartbeat to ensure
|
||||
// a timely commit. Due to random staggering, may be delayed as much as
|
||||
// 2x this value.
|
||||
// CommitTimeout controls the time without an Apply() operation
|
||||
// before we heartbeat to ensure a timely commit. Due to random
|
||||
// staggering, may be delayed as much as 2x this value.
|
||||
CommitTimeout time.Duration
|
||||
|
||||
// MaxAppendEntries controls the maximum number of append entries
|
||||
@@ -33,13 +151,6 @@ type Config struct {
|
||||
// we can become a leader of a cluster containing only this node.
|
||||
ShutdownOnRemove bool
|
||||
|
||||
// DisableBootstrapAfterElect is used to turn off EnableSingleNode
|
||||
// after the node is elected. This is used to prevent self-election
|
||||
// if the node is removed from the Raft cluster via RemovePeer. Setting
|
||||
// it to false will keep the bootstrap mode, allowing the node to self-elect
|
||||
// and potentially bootstrap a separate cluster.
|
||||
DisableBootstrapAfterElect bool
|
||||
|
||||
// TrailingLogs controls how many logs we leave after a snapshot. This is
|
||||
// used so that we can quickly replay logs on a follower instead of being
|
||||
// forced to send an entire snapshot.
|
||||
@@ -55,11 +166,6 @@ type Config struct {
|
||||
// just replay a small set of logs.
|
||||
SnapshotThreshold uint64
|
||||
|
||||
// EnableSingleNode allows for a single node mode of operation. This
|
||||
// is false by default, which prevents a lone node from electing itself.
|
||||
// leader.
|
||||
EnableSingleNode bool
|
||||
|
||||
// LeaderLeaseTimeout is used to control how long the "lease" lasts
|
||||
// for being the leader without being able to contact a quorum
|
||||
// of nodes. If we reach this interval without contact, we will
|
||||
@@ -70,6 +176,11 @@ type Config struct {
|
||||
// never be used except for testing purposes, as it can cause a split-brain.
|
||||
StartAsLeader bool
|
||||
|
||||
// The unique ID for this server across all time. When running with
|
||||
// ProtocolVersion < 3, you must set this to be the same as the network
|
||||
// address of your transport.
|
||||
LocalID ServerID
|
||||
|
||||
// NotifyCh is used to provide a channel that will be notified of leadership
|
||||
// changes. Raft will block writing to this channel, so it should either be
|
||||
// buffered or aggressively consumed.
|
||||
@@ -87,22 +198,35 @@ type Config struct {
|
||||
// DefaultConfig returns a Config with usable defaults.
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
HeartbeatTimeout: 1000 * time.Millisecond,
|
||||
ElectionTimeout: 1000 * time.Millisecond,
|
||||
CommitTimeout: 50 * time.Millisecond,
|
||||
MaxAppendEntries: 64,
|
||||
ShutdownOnRemove: true,
|
||||
DisableBootstrapAfterElect: true,
|
||||
TrailingLogs: 10240,
|
||||
SnapshotInterval: 120 * time.Second,
|
||||
SnapshotThreshold: 8192,
|
||||
EnableSingleNode: false,
|
||||
LeaderLeaseTimeout: 500 * time.Millisecond,
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
HeartbeatTimeout: 1000 * time.Millisecond,
|
||||
ElectionTimeout: 1000 * time.Millisecond,
|
||||
CommitTimeout: 50 * time.Millisecond,
|
||||
MaxAppendEntries: 64,
|
||||
ShutdownOnRemove: true,
|
||||
TrailingLogs: 10240,
|
||||
SnapshotInterval: 120 * time.Second,
|
||||
SnapshotThreshold: 8192,
|
||||
LeaderLeaseTimeout: 500 * time.Millisecond,
|
||||
}
|
||||
}
|
||||
|
||||
// ValidateConfig is used to validate a sane configuration
|
||||
func ValidateConfig(config *Config) error {
|
||||
// We don't actually support running as 0 in the library any more, but
|
||||
// we do understand it.
|
||||
protocolMin := ProtocolVersionMin
|
||||
if protocolMin == 0 {
|
||||
protocolMin = 1
|
||||
}
|
||||
if config.ProtocolVersion < protocolMin ||
|
||||
config.ProtocolVersion > ProtocolVersionMax {
|
||||
return fmt.Errorf("Protocol version %d must be >= %d and <= %d",
|
||||
config.ProtocolVersion, protocolMin, ProtocolVersionMax)
|
||||
}
|
||||
if len(config.LocalID) == 0 {
|
||||
return fmt.Errorf("LocalID cannot be empty")
|
||||
}
|
||||
if config.HeartbeatTimeout < 5*time.Millisecond {
|
||||
return fmt.Errorf("Heartbeat timeout is too low")
|
||||
}
|
||||
|
||||
343
vendor/github.com/hashicorp/raft/configuration.go
generated
vendored
Normal file
343
vendor/github.com/hashicorp/raft/configuration.go
generated
vendored
Normal file
@@ -0,0 +1,343 @@
|
||||
package raft
|
||||
|
||||
import "fmt"
|
||||
|
||||
// ServerSuffrage determines whether a Server in a Configuration gets a vote.
|
||||
type ServerSuffrage int
|
||||
|
||||
// Note: Don't renumber these, since the numbers are written into the log.
|
||||
const (
|
||||
// Voter is a server whose vote is counted in elections and whose match index
|
||||
// is used in advancing the leader's commit index.
|
||||
Voter ServerSuffrage = iota
|
||||
// Nonvoter is a server that receives log entries but is not considered for
|
||||
// elections or commitment purposes.
|
||||
Nonvoter
|
||||
// Staging is a server that acts like a nonvoter with one exception: once a
|
||||
// staging server receives enough log entries to be sufficiently caught up to
|
||||
// the leader's log, the leader will invoke a membership change to change
|
||||
// the Staging server to a Voter.
|
||||
Staging
|
||||
)
|
||||
|
||||
func (s ServerSuffrage) String() string {
|
||||
switch s {
|
||||
case Voter:
|
||||
return "Voter"
|
||||
case Nonvoter:
|
||||
return "Nonvoter"
|
||||
case Staging:
|
||||
return "Staging"
|
||||
}
|
||||
return "ServerSuffrage"
|
||||
}
|
||||
|
||||
// ServerID is a unique string identifying a server for all time.
|
||||
type ServerID string
|
||||
|
||||
// ServerAddress is a network address for a server that a transport can contact.
|
||||
type ServerAddress string
|
||||
|
||||
// Server tracks the information about a single server in a configuration.
|
||||
type Server struct {
|
||||
// Suffrage determines whether the server gets a vote.
|
||||
Suffrage ServerSuffrage
|
||||
// ID is a unique string identifying this server for all time.
|
||||
ID ServerID
|
||||
// Address is its network address that a transport can contact.
|
||||
Address ServerAddress
|
||||
}
|
||||
|
||||
// Configuration tracks which servers are in the cluster, and whether they have
|
||||
// votes. This should include the local server, if it's a member of the cluster.
|
||||
// The servers are listed no particular order, but each should only appear once.
|
||||
// These entries are appended to the log during membership changes.
|
||||
type Configuration struct {
|
||||
Servers []Server
|
||||
}
|
||||
|
||||
// Clone makes a deep copy of a Configuration.
|
||||
func (c *Configuration) Clone() (copy Configuration) {
|
||||
copy.Servers = append(copy.Servers, c.Servers...)
|
||||
return
|
||||
}
|
||||
|
||||
// ConfigurationChangeCommand is the different ways to change the cluster
|
||||
// configuration.
|
||||
type ConfigurationChangeCommand uint8
|
||||
|
||||
const (
|
||||
// AddStaging makes a server Staging unless its Voter.
|
||||
AddStaging ConfigurationChangeCommand = iota
|
||||
// AddNonvoter makes a server Nonvoter unless its Staging or Voter.
|
||||
AddNonvoter
|
||||
// DemoteVoter makes a server Nonvoter unless its absent.
|
||||
DemoteVoter
|
||||
// RemoveServer removes a server entirely from the cluster membership.
|
||||
RemoveServer
|
||||
// Promote is created automatically by a leader; it turns a Staging server
|
||||
// into a Voter.
|
||||
Promote
|
||||
)
|
||||
|
||||
func (c ConfigurationChangeCommand) String() string {
|
||||
switch c {
|
||||
case AddStaging:
|
||||
return "AddStaging"
|
||||
case AddNonvoter:
|
||||
return "AddNonvoter"
|
||||
case DemoteVoter:
|
||||
return "DemoteVoter"
|
||||
case RemoveServer:
|
||||
return "RemoveServer"
|
||||
case Promote:
|
||||
return "Promote"
|
||||
}
|
||||
return "ConfigurationChangeCommand"
|
||||
}
|
||||
|
||||
// configurationChangeRequest describes a change that a leader would like to
|
||||
// make to its current configuration. It's used only within a single server
|
||||
// (never serialized into the log), as part of `configurationChangeFuture`.
|
||||
type configurationChangeRequest struct {
|
||||
command ConfigurationChangeCommand
|
||||
serverID ServerID
|
||||
serverAddress ServerAddress // only present for AddStaging, AddNonvoter
|
||||
// prevIndex, if nonzero, is the index of the only configuration upon which
|
||||
// this change may be applied; if another configuration entry has been
|
||||
// added in the meantime, this request will fail.
|
||||
prevIndex uint64
|
||||
}
|
||||
|
||||
// configurations is state tracked on every server about its Configurations.
|
||||
// Note that, per Diego's dissertation, there can be at most one uncommitted
|
||||
// configuration at a time (the next configuration may not be created until the
|
||||
// prior one has been committed).
|
||||
//
|
||||
// One downside to storing just two configurations is that if you try to take a
|
||||
// snahpsot when your state machine hasn't yet applied the committedIndex, we
|
||||
// have no record of the configuration that would logically fit into that
|
||||
// snapshot. We disallow snapshots in that case now. An alternative approach,
|
||||
// which LogCabin uses, is to track every configuration change in the
|
||||
// log.
|
||||
type configurations struct {
|
||||
// committed is the latest configuration in the log/snapshot that has been
|
||||
// committed (the one with the largest index).
|
||||
committed Configuration
|
||||
// committedIndex is the log index where 'committed' was written.
|
||||
committedIndex uint64
|
||||
// latest is the latest configuration in the log/snapshot (may be committed
|
||||
// or uncommitted)
|
||||
latest Configuration
|
||||
// latestIndex is the log index where 'latest' was written.
|
||||
latestIndex uint64
|
||||
}
|
||||
|
||||
// Clone makes a deep copy of a configurations object.
|
||||
func (c *configurations) Clone() (copy configurations) {
|
||||
copy.committed = c.committed.Clone()
|
||||
copy.committedIndex = c.committedIndex
|
||||
copy.latest = c.latest.Clone()
|
||||
copy.latestIndex = c.latestIndex
|
||||
return
|
||||
}
|
||||
|
||||
// hasVote returns true if the server identified by 'id' is a Voter in the
|
||||
// provided Configuration.
|
||||
func hasVote(configuration Configuration, id ServerID) bool {
|
||||
for _, server := range configuration.Servers {
|
||||
if server.ID == id {
|
||||
return server.Suffrage == Voter
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// checkConfiguration tests a cluster membership configuration for common
|
||||
// errors.
|
||||
func checkConfiguration(configuration Configuration) error {
|
||||
idSet := make(map[ServerID]bool)
|
||||
addressSet := make(map[ServerAddress]bool)
|
||||
var voters int
|
||||
for _, server := range configuration.Servers {
|
||||
if server.ID == "" {
|
||||
return fmt.Errorf("Empty ID in configuration: %v", configuration)
|
||||
}
|
||||
if server.Address == "" {
|
||||
return fmt.Errorf("Empty address in configuration: %v", server)
|
||||
}
|
||||
if idSet[server.ID] {
|
||||
return fmt.Errorf("Found duplicate ID in configuration: %v", server.ID)
|
||||
}
|
||||
idSet[server.ID] = true
|
||||
if addressSet[server.Address] {
|
||||
return fmt.Errorf("Found duplicate address in configuration: %v", server.Address)
|
||||
}
|
||||
addressSet[server.Address] = true
|
||||
if server.Suffrage == Voter {
|
||||
voters++
|
||||
}
|
||||
}
|
||||
if voters == 0 {
|
||||
return fmt.Errorf("Need at least one voter in configuration: %v", configuration)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// nextConfiguration generates a new Configuration from the current one and a
|
||||
// configuration change request. It's split from appendConfigurationEntry so
|
||||
// that it can be unit tested easily.
|
||||
func nextConfiguration(current Configuration, currentIndex uint64, change configurationChangeRequest) (Configuration, error) {
|
||||
if change.prevIndex > 0 && change.prevIndex != currentIndex {
|
||||
return Configuration{}, fmt.Errorf("Configuration changed since %v (latest is %v)", change.prevIndex, currentIndex)
|
||||
}
|
||||
|
||||
configuration := current.Clone()
|
||||
switch change.command {
|
||||
case AddStaging:
|
||||
// TODO: barf on new address?
|
||||
newServer := Server{
|
||||
// TODO: This should add the server as Staging, to be automatically
|
||||
// promoted to Voter later. However, the promoton to Voter is not yet
|
||||
// implemented, and doing so is not trivial with the way the leader loop
|
||||
// coordinates with the replication goroutines today. So, for now, the
|
||||
// server will have a vote right away, and the Promote case below is
|
||||
// unused.
|
||||
Suffrage: Voter,
|
||||
ID: change.serverID,
|
||||
Address: change.serverAddress,
|
||||
}
|
||||
found := false
|
||||
for i, server := range configuration.Servers {
|
||||
if server.ID == change.serverID {
|
||||
if server.Suffrage == Voter {
|
||||
configuration.Servers[i].Address = change.serverAddress
|
||||
} else {
|
||||
configuration.Servers[i] = newServer
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
configuration.Servers = append(configuration.Servers, newServer)
|
||||
}
|
||||
case AddNonvoter:
|
||||
newServer := Server{
|
||||
Suffrage: Nonvoter,
|
||||
ID: change.serverID,
|
||||
Address: change.serverAddress,
|
||||
}
|
||||
found := false
|
||||
for i, server := range configuration.Servers {
|
||||
if server.ID == change.serverID {
|
||||
if server.Suffrage != Nonvoter {
|
||||
configuration.Servers[i].Address = change.serverAddress
|
||||
} else {
|
||||
configuration.Servers[i] = newServer
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
configuration.Servers = append(configuration.Servers, newServer)
|
||||
}
|
||||
case DemoteVoter:
|
||||
for i, server := range configuration.Servers {
|
||||
if server.ID == change.serverID {
|
||||
configuration.Servers[i].Suffrage = Nonvoter
|
||||
break
|
||||
}
|
||||
}
|
||||
case RemoveServer:
|
||||
for i, server := range configuration.Servers {
|
||||
if server.ID == change.serverID {
|
||||
configuration.Servers = append(configuration.Servers[:i], configuration.Servers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
case Promote:
|
||||
for i, server := range configuration.Servers {
|
||||
if server.ID == change.serverID && server.Suffrage == Staging {
|
||||
configuration.Servers[i].Suffrage = Voter
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we didn't do something bad like remove the last voter
|
||||
if err := checkConfiguration(configuration); err != nil {
|
||||
return Configuration{}, err
|
||||
}
|
||||
|
||||
return configuration, nil
|
||||
}
|
||||
|
||||
// encodePeers is used to serialize a Configuration into the old peers format.
|
||||
// This is here for backwards compatibility when operating with a mix of old
|
||||
// servers and should be removed once we deprecate support for protocol version 1.
|
||||
func encodePeers(configuration Configuration, trans Transport) []byte {
|
||||
// Gather up all the voters, other suffrage types are not supported by
|
||||
// this data format.
|
||||
var encPeers [][]byte
|
||||
for _, server := range configuration.Servers {
|
||||
if server.Suffrage == Voter {
|
||||
encPeers = append(encPeers, trans.EncodePeer(server.Address))
|
||||
}
|
||||
}
|
||||
|
||||
// Encode the entire array.
|
||||
buf, err := encodeMsgPack(encPeers)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to encode peers: %v", err))
|
||||
}
|
||||
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
// decodePeers is used to deserialize an old list of peers into a Configuration.
|
||||
// This is here for backwards compatibility with old log entries and snapshots;
|
||||
// it should be removed eventually.
|
||||
func decodePeers(buf []byte, trans Transport) Configuration {
|
||||
// Decode the buffer first.
|
||||
var encPeers [][]byte
|
||||
if err := decodeMsgPack(buf, &encPeers); err != nil {
|
||||
panic(fmt.Errorf("failed to decode peers: %v", err))
|
||||
}
|
||||
|
||||
// Deserialize each peer.
|
||||
var servers []Server
|
||||
for _, enc := range encPeers {
|
||||
p := trans.DecodePeer(enc)
|
||||
servers = append(servers, Server{
|
||||
Suffrage: Voter,
|
||||
ID: ServerID(p),
|
||||
Address: ServerAddress(p),
|
||||
})
|
||||
}
|
||||
|
||||
return Configuration{
|
||||
Servers: servers,
|
||||
}
|
||||
}
|
||||
|
||||
// encodeConfiguration serializes a Configuration using MsgPack, or panics on
|
||||
// errors.
|
||||
func encodeConfiguration(configuration Configuration) []byte {
|
||||
buf, err := encodeMsgPack(configuration)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to encode configuration: %v", err))
|
||||
}
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
// decodeConfiguration deserializes a Configuration using MsgPack, or panics on
|
||||
// errors.
|
||||
func decodeConfiguration(buf []byte) Configuration {
|
||||
var configuration Configuration
|
||||
if err := decodeMsgPack(buf, &configuration); err != nil {
|
||||
panic(fmt.Errorf("failed to decode configuration: %v", err))
|
||||
}
|
||||
return configuration
|
||||
}
|
||||
3
vendor/github.com/hashicorp/raft/discard_snapshot.go
generated
vendored
3
vendor/github.com/hashicorp/raft/discard_snapshot.go
generated
vendored
@@ -19,7 +19,8 @@ func NewDiscardSnapshotStore() *DiscardSnapshotStore {
|
||||
return &DiscardSnapshotStore{}
|
||||
}
|
||||
|
||||
func (d *DiscardSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSink, error) {
|
||||
func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64,
|
||||
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
|
||||
return &DiscardSnapshotSink{}, nil
|
||||
}
|
||||
|
||||
|
||||
40
vendor/github.com/hashicorp/raft/file_snapshot.go
generated
vendored
40
vendor/github.com/hashicorp/raft/file_snapshot.go
generated
vendored
@@ -119,8 +119,14 @@ func (f *FileSnapshotStore) testPermissions() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fh.Close()
|
||||
os.Remove(path)
|
||||
|
||||
if err = fh.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = os.Remove(path); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -132,7 +138,13 @@ func snapshotName(term, index uint64) string {
|
||||
}
|
||||
|
||||
// Create is used to start a new snapshot
|
||||
func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSink, error) {
|
||||
func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
|
||||
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
|
||||
// We only support version 1 snapshots at this time.
|
||||
if version != 1 {
|
||||
return nil, fmt.Errorf("unsupported snapshot version %d", version)
|
||||
}
|
||||
|
||||
// Create a new path
|
||||
name := snapshotName(term, index)
|
||||
path := filepath.Join(f.path, name+tmpSuffix)
|
||||
@@ -151,10 +163,13 @@ func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSi
|
||||
dir: path,
|
||||
meta: fileSnapshotMeta{
|
||||
SnapshotMeta: SnapshotMeta{
|
||||
ID: name,
|
||||
Index: index,
|
||||
Term: term,
|
||||
Peers: peers,
|
||||
Version: version,
|
||||
ID: name,
|
||||
Index: index,
|
||||
Term: term,
|
||||
Peers: encodePeers(configuration, trans),
|
||||
Configuration: configuration,
|
||||
ConfigurationIndex: configurationIndex,
|
||||
},
|
||||
CRC: nil,
|
||||
},
|
||||
@@ -236,6 +251,12 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Make sure we can understand this version.
|
||||
if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax {
|
||||
f.logger.Printf("[WARN] snapshot: Snapshot version for %v not supported: %d", dirName, meta.Version)
|
||||
continue
|
||||
}
|
||||
|
||||
// Append, but only return up to the retain count
|
||||
snapMeta = append(snapMeta, meta)
|
||||
}
|
||||
@@ -380,7 +401,10 @@ func (s *FileSnapshotSink) Close() error {
|
||||
}
|
||||
|
||||
// Reap any old snapshots
|
||||
s.store.ReapSnapshots()
|
||||
if err := s.store.ReapSnapshots(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
99
vendor/github.com/hashicorp/raft/fsm.go
generated
vendored
99
vendor/github.com/hashicorp/raft/fsm.go
generated
vendored
@@ -1,13 +1,20 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
)
|
||||
|
||||
// FSM provides an interface that can be implemented by
|
||||
// clients to make use of the replicated log.
|
||||
type FSM interface {
|
||||
// Apply log is invoked once a log entry is committed.
|
||||
// It returns a value which will be made available in the
|
||||
// ApplyFuture returned by Raft.Apply method if that
|
||||
// method was called on the same Raft node as the FSM.
|
||||
Apply(*Log) interface{}
|
||||
|
||||
// Snapshot is used to support log compaction. This call should
|
||||
@@ -35,3 +42,95 @@ type FSMSnapshot interface {
|
||||
// Release is invoked when we are finished with the snapshot.
|
||||
Release()
|
||||
}
|
||||
|
||||
// runFSM is a long running goroutine responsible for applying logs
|
||||
// to the FSM. This is done async of other logs since we don't want
|
||||
// the FSM to block our internal operations.
|
||||
func (r *Raft) runFSM() {
|
||||
var lastIndex, lastTerm uint64
|
||||
|
||||
commit := func(req *commitTuple) {
|
||||
// Apply the log if a command
|
||||
var resp interface{}
|
||||
if req.log.Type == LogCommand {
|
||||
start := time.Now()
|
||||
resp = r.fsm.Apply(req.log)
|
||||
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
|
||||
}
|
||||
|
||||
// Update the indexes
|
||||
lastIndex = req.log.Index
|
||||
lastTerm = req.log.Term
|
||||
|
||||
// Invoke the future if given
|
||||
if req.future != nil {
|
||||
req.future.response = resp
|
||||
req.future.respond(nil)
|
||||
}
|
||||
}
|
||||
|
||||
restore := func(req *restoreFuture) {
|
||||
// Open the snapshot
|
||||
meta, source, err := r.snapshots.Open(req.ID)
|
||||
if err != nil {
|
||||
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
|
||||
return
|
||||
}
|
||||
|
||||
// Attempt to restore
|
||||
start := time.Now()
|
||||
if err := r.fsm.Restore(source); err != nil {
|
||||
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
|
||||
source.Close()
|
||||
return
|
||||
}
|
||||
source.Close()
|
||||
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
|
||||
|
||||
// Update the last index and term
|
||||
lastIndex = meta.Index
|
||||
lastTerm = meta.Term
|
||||
req.respond(nil)
|
||||
}
|
||||
|
||||
snapshot := func(req *reqSnapshotFuture) {
|
||||
// Is there something to snapshot?
|
||||
if lastIndex == 0 {
|
||||
req.respond(ErrNothingNewToSnapshot)
|
||||
return
|
||||
}
|
||||
|
||||
// Start a snapshot
|
||||
start := time.Now()
|
||||
snap, err := r.fsm.Snapshot()
|
||||
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)
|
||||
|
||||
// Respond to the request
|
||||
req.index = lastIndex
|
||||
req.term = lastTerm
|
||||
req.snapshot = snap
|
||||
req.respond(err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case ptr := <-r.fsmMutateCh:
|
||||
switch req := ptr.(type) {
|
||||
case *commitTuple:
|
||||
commit(req)
|
||||
|
||||
case *restoreFuture:
|
||||
restore(req)
|
||||
|
||||
default:
|
||||
panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
|
||||
}
|
||||
|
||||
case req := <-r.fsmSnapshotCh:
|
||||
snapshot(req)
|
||||
|
||||
case <-r.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
131
vendor/github.com/hashicorp/raft/future.go
generated
vendored
131
vendor/github.com/hashicorp/raft/future.go
generated
vendored
@@ -1,22 +1,63 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Future is used to represent an action that may occur in the future.
|
||||
type Future interface {
|
||||
// Error blocks until the future arrives and then
|
||||
// returns the error status of the future.
|
||||
// This may be called any number of times - all
|
||||
// calls will return the same value.
|
||||
// Note that it is not OK to call this method
|
||||
// twice concurrently on the same Future instance.
|
||||
Error() error
|
||||
}
|
||||
|
||||
// ApplyFuture is used for Apply() and can returns the FSM response.
|
||||
type ApplyFuture interface {
|
||||
// IndexFuture is used for future actions that can result in a raft log entry
|
||||
// being created.
|
||||
type IndexFuture interface {
|
||||
Future
|
||||
Response() interface{}
|
||||
|
||||
// Index holds the index of the newly applied log entry.
|
||||
// This must not be called until after the Error method has returned.
|
||||
Index() uint64
|
||||
}
|
||||
|
||||
// ApplyFuture is used for Apply and can return the FSM response.
|
||||
type ApplyFuture interface {
|
||||
IndexFuture
|
||||
|
||||
// Response returns the FSM response as returned
|
||||
// by the FSM.Apply method. This must not be called
|
||||
// until after the Error method has returned.
|
||||
Response() interface{}
|
||||
}
|
||||
|
||||
// ConfigurationFuture is used for GetConfiguration and can return the
|
||||
// latest configuration in use by Raft.
|
||||
type ConfigurationFuture interface {
|
||||
IndexFuture
|
||||
|
||||
// Configuration contains the latest configuration. This must
|
||||
// not be called until after the Error method has returned.
|
||||
Configuration() Configuration
|
||||
}
|
||||
|
||||
// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
|
||||
type SnapshotFuture interface {
|
||||
Future
|
||||
|
||||
// Open is a function you can call to access the underlying snapshot and
|
||||
// its metadata. This must not be called until after the Error method
|
||||
// has returned.
|
||||
Open() (*SnapshotMeta, io.ReadCloser, error)
|
||||
}
|
||||
|
||||
// errorFuture is used to return a static error.
|
||||
type errorFuture struct {
|
||||
err error
|
||||
@@ -48,6 +89,9 @@ func (d *deferError) init() {
|
||||
|
||||
func (d *deferError) Error() error {
|
||||
if d.err != nil {
|
||||
// Note that when we've received a nil error, this
|
||||
// won't trigger, but the channel is closed after
|
||||
// send so we'll still return nil below.
|
||||
return d.err
|
||||
}
|
||||
if d.errCh == nil {
|
||||
@@ -69,12 +113,28 @@ func (d *deferError) respond(err error) {
|
||||
d.responded = true
|
||||
}
|
||||
|
||||
// There are several types of requests that cause a configuration entry to
|
||||
// be appended to the log. These are encoded here for leaderLoop() to process.
|
||||
// This is internal to a single server.
|
||||
type configurationChangeFuture struct {
|
||||
logFuture
|
||||
req configurationChangeRequest
|
||||
}
|
||||
|
||||
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
|
||||
// Raft object's BootstrapCluster member function for more details.
|
||||
type bootstrapFuture struct {
|
||||
deferError
|
||||
|
||||
// configuration is the proposed bootstrap configuration to apply.
|
||||
configuration Configuration
|
||||
}
|
||||
|
||||
// logFuture is used to apply a log entry and waits until
|
||||
// the log is considered committed.
|
||||
type logFuture struct {
|
||||
deferError
|
||||
log Log
|
||||
policy quorumPolicy
|
||||
response interface{}
|
||||
dispatch time.Time
|
||||
}
|
||||
@@ -87,11 +147,6 @@ func (l *logFuture) Index() uint64 {
|
||||
return l.log.Index
|
||||
}
|
||||
|
||||
type peerFuture struct {
|
||||
deferError
|
||||
peers []string
|
||||
}
|
||||
|
||||
type shutdownFuture struct {
|
||||
raft *Raft
|
||||
}
|
||||
@@ -100,18 +155,48 @@ func (s *shutdownFuture) Error() error {
|
||||
if s.raft == nil {
|
||||
return nil
|
||||
}
|
||||
for s.raft.getRoutines() > 0 {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
s.raft.waitShutdown()
|
||||
if closeable, ok := s.raft.trans.(WithClose); ok {
|
||||
closeable.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// snapshotFuture is used for waiting on a snapshot to complete.
|
||||
type snapshotFuture struct {
|
||||
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
|
||||
// complete.
|
||||
type userSnapshotFuture struct {
|
||||
deferError
|
||||
|
||||
// opener is a function used to open the snapshot. This is filled in
|
||||
// once the future returns with no error.
|
||||
opener func() (*SnapshotMeta, io.ReadCloser, error)
|
||||
}
|
||||
|
||||
// Open is a function you can call to access the underlying snapshot and its
|
||||
// metadata.
|
||||
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
|
||||
if u.opener == nil {
|
||||
return nil, nil, fmt.Errorf("no snapshot available")
|
||||
} else {
|
||||
// Invalidate the opener so it can't get called multiple times,
|
||||
// which isn't generally safe.
|
||||
defer func() {
|
||||
u.opener = nil
|
||||
}()
|
||||
return u.opener()
|
||||
}
|
||||
}
|
||||
|
||||
// userRestoreFuture is used for waiting on a user-triggered restore of an
|
||||
// external snapshot to complete.
|
||||
type userRestoreFuture struct {
|
||||
deferError
|
||||
|
||||
// meta is the metadata that belongs with the snapshot.
|
||||
meta *SnapshotMeta
|
||||
|
||||
// reader is the interface to read the snapshot contents from.
|
||||
reader io.Reader
|
||||
}
|
||||
|
||||
// reqSnapshotFuture is used for requesting a snapshot start.
|
||||
@@ -122,7 +207,6 @@ type reqSnapshotFuture struct {
|
||||
// snapshot details provided by the FSM runner before responding
|
||||
index uint64
|
||||
term uint64
|
||||
peers []string
|
||||
snapshot FSMSnapshot
|
||||
}
|
||||
|
||||
@@ -143,6 +227,23 @@ type verifyFuture struct {
|
||||
voteLock sync.Mutex
|
||||
}
|
||||
|
||||
// configurationsFuture is used to retrieve the current configurations. This is
|
||||
// used to allow safe access to this information outside of the main thread.
|
||||
type configurationsFuture struct {
|
||||
deferError
|
||||
configurations configurations
|
||||
}
|
||||
|
||||
// Configuration returns the latest configuration in use by Raft.
|
||||
func (c *configurationsFuture) Configuration() Configuration {
|
||||
return c.configurations.latest
|
||||
}
|
||||
|
||||
// Index returns the index of the latest configuration in use by Raft.
|
||||
func (c *configurationsFuture) Index() uint64 {
|
||||
return c.configurations.latestIndex
|
||||
}
|
||||
|
||||
// vote is used to respond to a verifyFuture.
|
||||
// This may block when responding on the notifyCh.
|
||||
func (v *verifyFuture) vote(leader bool) {
|
||||
|
||||
213
vendor/github.com/hashicorp/raft/inflight.go
generated
vendored
213
vendor/github.com/hashicorp/raft/inflight.go
generated
vendored
@@ -1,213 +0,0 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// QuorumPolicy allows individual logFutures to have different
|
||||
// commitment rules while still using the inflight mechanism.
|
||||
type quorumPolicy interface {
|
||||
// Checks if a commit from a given peer is enough to
|
||||
// satisfy the commitment rules
|
||||
Commit() bool
|
||||
|
||||
// Checks if a commit is committed
|
||||
IsCommitted() bool
|
||||
}
|
||||
|
||||
// MajorityQuorum is used by Apply transactions and requires
|
||||
// a simple majority of nodes.
|
||||
type majorityQuorum struct {
|
||||
count int
|
||||
votesNeeded int
|
||||
}
|
||||
|
||||
func newMajorityQuorum(clusterSize int) *majorityQuorum {
|
||||
votesNeeded := (clusterSize / 2) + 1
|
||||
return &majorityQuorum{count: 0, votesNeeded: votesNeeded}
|
||||
}
|
||||
|
||||
func (m *majorityQuorum) Commit() bool {
|
||||
m.count++
|
||||
return m.count >= m.votesNeeded
|
||||
}
|
||||
|
||||
func (m *majorityQuorum) IsCommitted() bool {
|
||||
return m.count >= m.votesNeeded
|
||||
}
|
||||
|
||||
// Inflight is used to track operations that are still in-flight.
|
||||
type inflight struct {
|
||||
sync.Mutex
|
||||
committed *list.List
|
||||
commitCh chan struct{}
|
||||
minCommit uint64
|
||||
maxCommit uint64
|
||||
operations map[uint64]*logFuture
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// NewInflight returns an inflight struct that notifies
|
||||
// the provided channel when logs are finished committing.
|
||||
func newInflight(commitCh chan struct{}) *inflight {
|
||||
return &inflight{
|
||||
committed: list.New(),
|
||||
commitCh: commitCh,
|
||||
minCommit: 0,
|
||||
maxCommit: 0,
|
||||
operations: make(map[uint64]*logFuture),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start is used to mark a logFuture as being inflight. It
|
||||
// also commits the entry, as it is assumed the leader is
|
||||
// starting.
|
||||
func (i *inflight) Start(l *logFuture) {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
i.start(l)
|
||||
}
|
||||
|
||||
// StartAll is used to mark a list of logFuture's as being
|
||||
// inflight. It also commits each entry as the leader is
|
||||
// assumed to be starting.
|
||||
func (i *inflight) StartAll(logs []*logFuture) {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
for _, l := range logs {
|
||||
i.start(l)
|
||||
}
|
||||
}
|
||||
|
||||
// start is used to mark a single entry as inflight,
|
||||
// must be invoked with the lock held.
|
||||
func (i *inflight) start(l *logFuture) {
|
||||
idx := l.log.Index
|
||||
i.operations[idx] = l
|
||||
|
||||
if idx > i.maxCommit {
|
||||
i.maxCommit = idx
|
||||
}
|
||||
if i.minCommit == 0 {
|
||||
i.minCommit = idx
|
||||
}
|
||||
i.commit(idx)
|
||||
}
|
||||
|
||||
// Cancel is used to cancel all in-flight operations.
|
||||
// This is done when the leader steps down, and all futures
|
||||
// are sent the given error.
|
||||
func (i *inflight) Cancel(err error) {
|
||||
// Close the channel first to unblock any pending commits
|
||||
close(i.stopCh)
|
||||
|
||||
// Lock after close to avoid deadlock
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
// Respond to all inflight operations
|
||||
for _, op := range i.operations {
|
||||
op.respond(err)
|
||||
}
|
||||
|
||||
// Clear all the committed but not processed
|
||||
for e := i.committed.Front(); e != nil; e = e.Next() {
|
||||
e.Value.(*logFuture).respond(err)
|
||||
}
|
||||
|
||||
// Clear the map
|
||||
i.operations = make(map[uint64]*logFuture)
|
||||
|
||||
// Clear the list of committed
|
||||
i.committed = list.New()
|
||||
|
||||
// Close the commmitCh
|
||||
close(i.commitCh)
|
||||
|
||||
// Reset indexes
|
||||
i.minCommit = 0
|
||||
i.maxCommit = 0
|
||||
}
|
||||
|
||||
// Committed returns all the committed operations in order.
|
||||
func (i *inflight) Committed() (l *list.List) {
|
||||
i.Lock()
|
||||
l, i.committed = i.committed, list.New()
|
||||
i.Unlock()
|
||||
return l
|
||||
}
|
||||
|
||||
// Commit is used by leader replication routines to indicate that
|
||||
// a follower was finished committing a log to disk.
|
||||
func (i *inflight) Commit(index uint64) {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
i.commit(index)
|
||||
}
|
||||
|
||||
// CommitRange is used to commit a range of indexes inclusively.
|
||||
// It is optimized to avoid commits for indexes that are not tracked.
|
||||
func (i *inflight) CommitRange(minIndex, maxIndex uint64) {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
// Update the minimum index
|
||||
minIndex = max(i.minCommit, minIndex)
|
||||
|
||||
// Commit each index
|
||||
for idx := minIndex; idx <= maxIndex; idx++ {
|
||||
i.commit(idx)
|
||||
}
|
||||
}
|
||||
|
||||
// commit is used to commit a single index. Must be called with the lock held.
|
||||
func (i *inflight) commit(index uint64) {
|
||||
op, ok := i.operations[index]
|
||||
if !ok {
|
||||
// Ignore if not in the map, as it may be committed already
|
||||
return
|
||||
}
|
||||
|
||||
// Check if we've satisfied the commit
|
||||
if !op.policy.Commit() {
|
||||
return
|
||||
}
|
||||
|
||||
// Cannot commit if this is not the minimum inflight. This can happen
|
||||
// if the quorum size changes, meaning a previous commit requires a larger
|
||||
// quorum that this commit. We MUST block until the previous log is committed,
|
||||
// otherwise logs will be applied out of order.
|
||||
if index != i.minCommit {
|
||||
return
|
||||
}
|
||||
|
||||
NOTIFY:
|
||||
// Add the operation to the committed list
|
||||
i.committed.PushBack(op)
|
||||
|
||||
// Stop tracking since it is committed
|
||||
delete(i.operations, index)
|
||||
|
||||
// Update the indexes
|
||||
if index == i.maxCommit {
|
||||
i.minCommit = 0
|
||||
i.maxCommit = 0
|
||||
|
||||
} else {
|
||||
i.minCommit++
|
||||
}
|
||||
|
||||
// Check if the next in-flight operation is ready
|
||||
if i.minCommit != 0 {
|
||||
op = i.operations[i.minCommit]
|
||||
if op.policy.IsCommitted() {
|
||||
index = i.minCommit
|
||||
goto NOTIFY
|
||||
}
|
||||
}
|
||||
|
||||
// Async notify of ready operations
|
||||
asyncNotifyCh(i.commitCh)
|
||||
}
|
||||
106
vendor/github.com/hashicorp/raft/inmem_snapshot.go
generated
vendored
Normal file
106
vendor/github.com/hashicorp/raft/inmem_snapshot.go
generated
vendored
Normal file
@@ -0,0 +1,106 @@
package raft

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"sync"
)

// InmemSnapshotStore implements the SnapshotStore interface and
// retains only the most recent snapshot
type InmemSnapshotStore struct {
	latest      *InmemSnapshotSink
	hasSnapshot bool
	sync.RWMutex
}

// InmemSnapshotSink implements SnapshotSink in memory
type InmemSnapshotSink struct {
	meta     SnapshotMeta
	contents *bytes.Buffer
}

// NewInmemSnapshotStore creates a blank new InmemSnapshotStore
func NewInmemSnapshotStore() *InmemSnapshotStore {
	return &InmemSnapshotStore{
		latest: &InmemSnapshotSink{
			contents: &bytes.Buffer{},
		},
	}
}

// Create replaces the stored snapshot with a new one using the given args
func (m *InmemSnapshotStore) Create(version SnapshotVersion, index, term uint64,
	configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
	// We only support version 1 snapshots at this time.
	if version != 1 {
		return nil, fmt.Errorf("unsupported snapshot version %d", version)
	}

	name := snapshotName(term, index)

	m.Lock()
	defer m.Unlock()

	sink := &InmemSnapshotSink{
		meta: SnapshotMeta{
			Version:            version,
			ID:                 name,
			Index:              index,
			Term:               term,
			Peers:              encodePeers(configuration, trans),
			Configuration:      configuration,
			ConfigurationIndex: configurationIndex,
		},
		contents: &bytes.Buffer{},
	}
	m.hasSnapshot = true
	m.latest = sink

	return sink, nil
}

// List returns the latest snapshot taken
func (m *InmemSnapshotStore) List() ([]*SnapshotMeta, error) {
	m.RLock()
	defer m.RUnlock()

	if !m.hasSnapshot {
		return []*SnapshotMeta{}, nil
	}
	return []*SnapshotMeta{&m.latest.meta}, nil
}

// Open wraps an io.ReadCloser around the snapshot contents
func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) {
	m.RLock()
	defer m.RUnlock()

	if m.latest.meta.ID != id {
		return nil, nil, fmt.Errorf("[ERR] snapshot: failed to open snapshot id: %s", id)
	}

	return &m.latest.meta, ioutil.NopCloser(m.latest.contents), nil
}

// Write appends the given bytes to the snapshot contents
func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) {
	written, err := io.Copy(s.contents, bytes.NewReader(p))
	s.meta.Size += written
	return int(written), err
}

// Close updates the Size and is otherwise a no-op
func (s *InmemSnapshotSink) Close() error {
	return nil
}

func (s *InmemSnapshotSink) ID() string {
	return s.meta.ID
}

func (s *InmemSnapshotSink) Cancel() error {
	return nil
}
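Editor's note: the following is not part of the vendored diff, only a minimal sketch of how this in-memory snapshot store is typically exercised (e.g. from a test in the same package); error handling is elided for brevity.

```go
func exampleInmemSnapshot() {
	_, trans := NewInmemTransport(NewInmemAddr())
	store := NewInmemSnapshotStore()

	// Begin a version 1 snapshot at index 10, term 3, write the FSM state,
	// then seal it.
	sink, _ := store.Create(1, 10, 3, Configuration{}, 1, trans)
	sink.Write([]byte("fsm state"))
	sink.Close()

	// Only the most recent snapshot is retained by this store.
	metas, _ := store.List()
	_, rc, _ := store.Open(metas[0].ID)
	defer rc.Close()
}
```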
11
vendor/github.com/hashicorp/raft/inmem_store.go
generated
vendored
11
vendor/github.com/hashicorp/raft/inmem_store.go
generated
vendored
@@ -81,7 +81,16 @@ func (i *InmemStore) DeleteRange(min, max uint64) error {
	for j := min; j <= max; j++ {
		delete(i.logs, j)
	}
	i.lowIndex = max + 1
	if min <= i.lowIndex {
		i.lowIndex = max + 1
	}
	if max >= i.highIndex {
		i.highIndex = min - 1
	}
	if i.lowIndex > i.highIndex {
		i.lowIndex = 0
		i.highIndex = 0
	}
	return nil
}

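Editor's note: to make the new bounds handling in DeleteRange concrete, here is a hedged sketch (not part of the diff) of the intended behavior, using the store's existing constructor and accessors.

```go
func exampleDeleteRange() {
	s := NewInmemStore()
	s.StoreLogs([]*Log{{Index: 1}, {Index: 2}, {Index: 3}})

	// Deleting the whole stored range now resets both bounds, so the
	// first/last indexes report "no entries" instead of stale values.
	s.DeleteRange(1, 3)
	first, _ := s.FirstIndex() // 0
	last, _ := s.LastIndex()   // 0
	_, _ = first, last
}
```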
48
vendor/github.com/hashicorp/raft/inmem_transport.go
generated
vendored
48
vendor/github.com/hashicorp/raft/inmem_transport.go
generated
vendored
@@ -9,15 +9,15 @@ import (
|
||||
|
||||
// NewInmemAddr returns a new in-memory addr with
|
||||
// a randomly generate UUID as the ID.
|
||||
func NewInmemAddr() string {
|
||||
return generateUUID()
|
||||
func NewInmemAddr() ServerAddress {
|
||||
return ServerAddress(generateUUID())
|
||||
}
|
||||
|
||||
// inmemPipeline is used to pipeline requests for the in-mem transport.
|
||||
type inmemPipeline struct {
|
||||
trans *InmemTransport
|
||||
peer *InmemTransport
|
||||
peerAddr string
|
||||
peerAddr ServerAddress
|
||||
|
||||
doneCh chan AppendFuture
|
||||
inprogressCh chan *inmemPipelineInflight
|
||||
@@ -37,22 +37,22 @@ type inmemPipelineInflight struct {
|
||||
type InmemTransport struct {
|
||||
sync.RWMutex
|
||||
consumerCh chan RPC
|
||||
localAddr string
|
||||
peers map[string]*InmemTransport
|
||||
localAddr ServerAddress
|
||||
peers map[ServerAddress]*InmemTransport
|
||||
pipelines []*inmemPipeline
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// NewInmemTransport is used to initialize a new transport
|
||||
// and generates a random local address if none is specified
|
||||
func NewInmemTransport(addr string) (string, *InmemTransport) {
|
||||
if addr == "" {
|
||||
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
|
||||
if string(addr) == "" {
|
||||
addr = NewInmemAddr()
|
||||
}
|
||||
trans := &InmemTransport{
|
||||
consumerCh: make(chan RPC, 16),
|
||||
localAddr: addr,
|
||||
peers: make(map[string]*InmemTransport),
|
||||
peers: make(map[ServerAddress]*InmemTransport),
|
||||
timeout: 50 * time.Millisecond,
|
||||
}
|
||||
return addr, trans
|
||||
@@ -69,13 +69,13 @@ func (i *InmemTransport) Consumer() <-chan RPC {
|
||||
}
|
||||
|
||||
// LocalAddr implements the Transport interface.
|
||||
func (i *InmemTransport) LocalAddr() string {
|
||||
func (i *InmemTransport) LocalAddr() ServerAddress {
|
||||
return i.localAddr
|
||||
}
|
||||
|
||||
// AppendEntriesPipeline returns an interface that can be used to pipeline
|
||||
// AppendEntries requests.
|
||||
func (i *InmemTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) {
|
||||
func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
|
||||
i.RLock()
|
||||
peer, ok := i.peers[target]
|
||||
i.RUnlock()
|
||||
@@ -90,7 +90,7 @@ func (i *InmemTransport) AppendEntriesPipeline(target string) (AppendPipeline, e
|
||||
}
|
||||
|
||||
// AppendEntries implements the Transport interface.
|
||||
func (i *InmemTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
|
||||
func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
|
||||
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -103,7 +103,7 @@ func (i *InmemTransport) AppendEntries(target string, args *AppendEntriesRequest
|
||||
}
|
||||
|
||||
// RequestVote implements the Transport interface.
|
||||
func (i *InmemTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error {
|
||||
func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
|
||||
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -116,7 +116,7 @@ func (i *InmemTransport) RequestVote(target string, args *RequestVoteRequest, re
|
||||
}
|
||||
|
||||
// InstallSnapshot implements the Transport interface.
|
||||
func (i *InmemTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
|
||||
func (i *InmemTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
|
||||
rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -128,7 +128,7 @@ func (i *InmemTransport) InstallSnapshot(target string, args *InstallSnapshotReq
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *InmemTransport) makeRPC(target string, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
|
||||
func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
|
||||
i.RLock()
|
||||
peer, ok := i.peers[target]
|
||||
i.RUnlock()
|
||||
@@ -158,21 +158,19 @@ func (i *InmemTransport) makeRPC(target string, args interface{}, r io.Reader, t
|
||||
return
|
||||
}
|
||||
|
||||
// EncodePeer implements the Transport interface. It uses the UUID as the
|
||||
// address directly.
|
||||
func (i *InmemTransport) EncodePeer(p string) []byte {
|
||||
// EncodePeer implements the Transport interface.
|
||||
func (i *InmemTransport) EncodePeer(p ServerAddress) []byte {
|
||||
return []byte(p)
|
||||
}
|
||||
|
||||
// DecodePeer implements the Transport interface. It wraps the UUID in an
|
||||
// InmemAddr.
|
||||
func (i *InmemTransport) DecodePeer(buf []byte) string {
|
||||
return string(buf)
|
||||
// DecodePeer implements the Transport interface.
|
||||
func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress {
|
||||
return ServerAddress(buf)
|
||||
}
|
||||
|
||||
// Connect is used to connect this transport to another transport for
|
||||
// a given peer name. This allows for local routing.
|
||||
func (i *InmemTransport) Connect(peer string, t Transport) {
|
||||
func (i *InmemTransport) Connect(peer ServerAddress, t Transport) {
|
||||
trans := t.(*InmemTransport)
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
@@ -180,7 +178,7 @@ func (i *InmemTransport) Connect(peer string, t Transport) {
|
||||
}
|
||||
|
||||
// Disconnect is used to remove the ability to route to a given peer.
|
||||
func (i *InmemTransport) Disconnect(peer string) {
|
||||
func (i *InmemTransport) Disconnect(peer ServerAddress) {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
delete(i.peers, peer)
|
||||
@@ -202,7 +200,7 @@ func (i *InmemTransport) Disconnect(peer string) {
|
||||
func (i *InmemTransport) DisconnectAll() {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
i.peers = make(map[string]*InmemTransport)
|
||||
i.peers = make(map[ServerAddress]*InmemTransport)
|
||||
|
||||
// Handle pipelines
|
||||
for _, pipeline := range i.pipelines {
|
||||
@@ -217,7 +215,7 @@ func (i *InmemTransport) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newInmemPipeline(trans *InmemTransport, peer *InmemTransport, addr string) *inmemPipeline {
|
||||
func newInmemPipeline(trans *InmemTransport, peer *InmemTransport, addr ServerAddress) *inmemPipeline {
|
||||
i := &inmemPipeline{
|
||||
trans: trans,
|
||||
peer: peer,
|
||||
|
||||
44
vendor/github.com/hashicorp/raft/log.go
generated
vendored
44
vendor/github.com/hashicorp/raft/log.go
generated
vendored
@@ -10,11 +10,15 @@ const (
|
||||
// LogNoop is used to assert leadership.
|
||||
LogNoop
|
||||
|
||||
// LogAddPeer is used to add a new peer.
|
||||
LogAddPeer
|
||||
// LogAddPeer is used to add a new peer. This should only be used with
|
||||
// older protocol versions designed to be compatible with unversioned
|
||||
// Raft servers. See comments in config.go for details.
|
||||
LogAddPeerDeprecated
|
||||
|
||||
// LogRemovePeer is used to remove an existing peer.
|
||||
LogRemovePeer
|
||||
// LogRemovePeer is used to remove an existing peer. This should only be
|
||||
// used with older protocol versions designed to be compatible with
|
||||
// unversioned Raft servers. See comments in config.go for details.
|
||||
LogRemovePeerDeprecated
|
||||
|
||||
// LogBarrier is used to ensure all preceding operations have been
|
||||
// applied to the FSM. It is similar to LogNoop, but instead of returning
|
||||
@@ -22,39 +26,47 @@ const (
|
||||
// it is possible there are operations committed but not yet applied to
|
||||
// the FSM.
|
||||
LogBarrier
|
||||
|
||||
// LogConfiguration establishes a membership change configuration. It is
|
||||
// created when a server is added, removed, promoted, etc. Only used
|
||||
// when protocol version 1 or greater is in use.
|
||||
LogConfiguration
|
||||
)
|
||||
|
||||
// Log entries are replicated to all members of the Raft cluster
|
||||
// and form the heart of the replicated state machine.
|
||||
type Log struct {
|
||||
// Index holds the index of the log entry.
|
||||
Index uint64
|
||||
Term uint64
|
||||
Type LogType
|
||||
Data []byte
|
||||
|
||||
// peer is not exported since it is not transmitted, only used
|
||||
// internally to construct the Data field.
|
||||
peer string
|
||||
// Term holds the election term of the log entry.
|
||||
Term uint64
|
||||
|
||||
// Type holds the type of the log entry.
|
||||
Type LogType
|
||||
|
||||
// Data holds the log entry's type-specific data.
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// LogStore is used to provide an interface for storing
|
||||
// and retrieving logs in a durable fashion.
|
||||
type LogStore interface {
|
||||
// Returns the first index written. 0 for no entries.
|
||||
// FirstIndex returns the first index written. 0 for no entries.
|
||||
FirstIndex() (uint64, error)
|
||||
|
||||
// Returns the last index written. 0 for no entries.
|
||||
// LastIndex returns the last index written. 0 for no entries.
|
||||
LastIndex() (uint64, error)
|
||||
|
||||
// Gets a log entry at a given index.
|
||||
// GetLog gets a log entry at a given index.
|
||||
GetLog(index uint64, log *Log) error
|
||||
|
||||
// Stores a log entry.
|
||||
// StoreLog stores a log entry.
|
||||
StoreLog(log *Log) error
|
||||
|
||||
// Stores multiple log entries.
|
||||
// StoreLogs stores multiple log entries.
|
||||
StoreLogs(logs []*Log) error
|
||||
|
||||
// Deletes a range of log entries. The range is inclusive.
|
||||
// DeleteRange deletes a range of log entries. The range is inclusive.
|
||||
DeleteRange(min, max uint64) error
|
||||
}
|
||||
|
||||
83
vendor/github.com/hashicorp/raft/membership.md
generated
vendored
Normal file
83
vendor/github.com/hashicorp/raft/membership.md
generated
vendored
Normal file
@@ -0,0 +1,83 @@
Simon (@superfell) and I (@ongardie) talked through reworking this library's cluster membership changes last Friday. We don't see a way to split this into independent patches, so we're taking the next best approach: submitting the plan here for review, then working on an enormous PR. Your feedback would be appreciated. (@superfell is out this week, however, so don't expect him to respond quickly.)

These are the main goals:
 - Bringing things in line with the description in my PhD dissertation;
 - Catching up new servers prior to granting them a vote, as well as allowing permanent non-voting members; and
 - Eliminating the `peers.json` file, to avoid issues of consistency between that and the log/snapshot.

## Data-centric view

We propose to re-define a *configuration* as a set of servers, where each server includes an address (as it does today) and a mode that is either:
 - *Voter*: a server whose vote is counted in elections and whose match index is used in advancing the leader's commit index.
 - *Nonvoter*: a server that receives log entries but is not considered for elections or commitment purposes.
 - *Staging*: a server that acts like a nonvoter with one exception: once a staging server receives enough log entries to catch up sufficiently to the leader's log, the leader will invoke a membership change to change the staging server to a voter.

All changes to the configuration will be done by writing a new configuration to the log. The new configuration will be in effect as soon as it is appended to the log (not when it is committed like a normal state machine command). Note that, per my dissertation, there can be at most one uncommitted configuration at a time (the next configuration may not be created until the prior one has been committed). It's not strictly necessary to follow these same rules for the nonvoter/staging servers, but we think it's best to treat all changes uniformly.

Each server will track two configurations:
 1. its *committed configuration*: the latest configuration in the log/snapshot that has been committed, along with its index.
 2. its *latest configuration*: the latest configuration in the log/snapshot (may be committed or uncommitted), along with its index.

When there's no membership change happening, these two will be the same. The latest configuration is almost always the one used, except:
 - When followers truncate the suffix of their logs, they may need to fall back to the committed configuration.
 - When snapshotting, the committed configuration is written, to correspond with the committed log prefix that is being snapshotted.

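Editor's note: for reference, the shape this proposal maps onto looks roughly like the following condensed sketch, based on the `Server` and `Configuration` fields used elsewhere in this diff (e.g. in `peersjson.go`); it is not the full definitions from the library.

```go
// ServerSuffrage determines whether a server's vote is counted.
type ServerSuffrage int

const (
	Voter    ServerSuffrage = iota // counted for elections and commitment
	Nonvoter                       // receives log entries only
	Staging                        // catching up; promoted to Voter by the leader
)

type ServerID string      // unique, stable identifier for a server
type ServerAddress string // network address the transport can dial

// Server is one member of the cluster configuration.
type Server struct {
	Suffrage ServerSuffrage
	ID       ServerID
	Address  ServerAddress
}

// Configuration is the full set of servers, written to the log on every change.
type Configuration struct {
	Servers []Server
}
```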
## Application API

We propose the following operations for clients to manipulate the cluster configuration:
 - AddVoter: server becomes staging unless voter,
 - AddNonvoter: server becomes nonvoter unless staging or voter,
 - DemoteVoter: server becomes nonvoter unless absent,
 - RemovePeer: server removed from configuration,
 - GetConfiguration: waits for latest config to commit, returns committed config.

This diagram, of which I'm quite proud, shows the possible transitions:
```
+-----------------------------------------------------------------------------+
|                                                                             |
|                      Start ->  +--------+                                   |
|            ,------<------------|        |                                   |
|           /                    | absent |                                   |
|          /       RemovePeer--> |        | <---RemovePeer                    |
|         /            |         +--------+                 \                 |
|        /             |            |                        \                |
|   AddNonvoter        |         AddVoter                     \               |
|       |       ,->---' `--<-.      |                          \              |
|       v      /              \     v                           \             |
|  +----------+                +----------+                    +----------+   |
|  |          | ---AddVoter--> |          | -log caught up --> |          |   |
|  | nonvoter |                | staging  |                    |  voter   |   |
|  |          | <-DemoteVoter- |          |                 ,- |          |   |
|  +----------+         \      +----------+                /   +----------+   |
|                        \                                /                   |
|                         `--------------<---------------'                   |
|                                                                             |
+-----------------------------------------------------------------------------+
```

While these operations aren't quite symmetric, we think they're a good set to capture the possible intent of the user. For example, if I want to make sure a server doesn't have a vote, but the server isn't part of the configuration at all, it probably shouldn't be added as a nonvoting server.

Each of these application-level operations will be interpreted by the leader and, if it has an effect, will cause the leader to write a new configuration entry to its log. Which particular application-level operation caused the log entry to be written need not be part of the log entry.

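Editor's note: as an illustration of how an application might drive these operations, here is a hedged usage sketch. The method names, argument lists, and future types are assumptions for illustration of the proposal, not a definitive description of the final API.

```go
import (
	"time"

	"github.com/hashicorp/raft"
)

// changeMembership sketches the proposed client-facing flow against a *Raft.
func changeMembership(r *raft.Raft) error {
	id, addr := raft.ServerID("node3"), raft.ServerAddress("10.0.0.3:8300")

	// Add the server without a vote first; the leader streams it the log.
	if err := r.AddNonvoter(id, addr, 0, 30*time.Second).Error(); err != nil {
		return err
	}

	// Grant it a vote (or rely on the staging -> voter promotion once the
	// catch-up logic described above is in place).
	if err := r.AddVoter(id, addr, 0, 30*time.Second).Error(); err != nil {
		return err
	}

	// Read back the committed configuration.
	future := r.GetConfiguration()
	if err := future.Error(); err != nil {
		return err
	}
	_ = future.Configuration()
	return nil
}
```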
## Code implications

This is a non-exhaustive list, but we came up with a few things:
 - Remove the PeerStore: the `peers.json` file introduces the possibility of getting out of sync with the log and snapshot, and it's hard to maintain this atomically as the log changes. It's not clear whether it's meant to track the committed or latest configuration, either.
 - Servers will have to search their snapshot and log to find the committed configuration and the latest configuration on startup.
 - Bootstrap will no longer use `peers.json` but should initialize the log or snapshot with an application-provided configuration entry.
 - Snapshots should store the index of their configuration along with the configuration itself. In my experience with LogCabin, the original log index of the configuration is very useful to include in debug log messages.
 - As noted in hashicorp/raft#84, configuration change requests should come in via a separate channel, and one may not proceed until the last has been committed.
 - As to deciding when a log is sufficiently caught up, implementing a sophisticated algorithm *is* something that can be done in a separate PR. An easy and decent placeholder is: once the staging server has reached 95% of the leader's commit index, promote it (a minimal sketch of such a check follows this list).

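Editor's note: a minimal sketch of the "95% caught up" placeholder check described in the last item above; the names are illustrative only and do not come from the library.

```go
// caughtUp reports whether a staging follower has replicated at least 95% of
// the leader's committed log, per the placeholder rule in this proposal.
func caughtUp(followerMatchIndex, leaderCommitIndex uint64) bool {
	if leaderCommitIndex == 0 {
		return true
	}
	return followerMatchIndex*100 >= leaderCommitIndex*95
}
```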
## Feedback

Again, we're looking for feedback here before we start working on this. Here are some questions to think about:
 - Does this seem like where we want things to go?
 - Is there anything here that should be left out?
 - Is there anything else we're forgetting about?
 - Is there a good way to break this up?
 - What do we need to worry about in terms of backwards compatibility?
 - What implication will this have on current tests?
 - What's the best way to test this code, in particular the small changes that will be sprinkled all over the library?
40
vendor/github.com/hashicorp/raft/net_transport.go
generated
vendored
40
vendor/github.com/hashicorp/raft/net_transport.go
generated
vendored
@@ -56,7 +56,7 @@ is not known if there is an error.
|
||||
|
||||
*/
|
||||
type NetworkTransport struct {
|
||||
connPool map[string][]*netConn
|
||||
connPool map[ServerAddress][]*netConn
|
||||
connPoolLock sync.Mutex
|
||||
|
||||
consumeCh chan RPC
|
||||
@@ -84,11 +84,11 @@ type StreamLayer interface {
|
||||
net.Listener
|
||||
|
||||
// Dial is used to create a new outgoing connection
|
||||
Dial(address string, timeout time.Duration) (net.Conn, error)
|
||||
Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)
|
||||
}
|
||||
|
||||
type netConn struct {
|
||||
target string
|
||||
target ServerAddress
|
||||
conn net.Conn
|
||||
r *bufio.Reader
|
||||
w *bufio.Writer
|
||||
@@ -142,7 +142,7 @@ func NewNetworkTransportWithLogger(
|
||||
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||
}
|
||||
trans := &NetworkTransport{
|
||||
connPool: make(map[string][]*netConn),
|
||||
connPool: make(map[ServerAddress][]*netConn),
|
||||
consumeCh: make(chan RPC),
|
||||
logger: logger,
|
||||
maxPool: maxPool,
|
||||
@@ -183,8 +183,8 @@ func (n *NetworkTransport) Consumer() <-chan RPC {
|
||||
}
|
||||
|
||||
// LocalAddr implements the Transport interface.
|
||||
func (n *NetworkTransport) LocalAddr() string {
|
||||
return n.stream.Addr().String()
|
||||
func (n *NetworkTransport) LocalAddr() ServerAddress {
|
||||
return ServerAddress(n.stream.Addr().String())
|
||||
}
|
||||
|
||||
// IsShutdown is used to check if the transport is shutdown.
|
||||
@@ -198,7 +198,7 @@ func (n *NetworkTransport) IsShutdown() bool {
|
||||
}
|
||||
|
||||
// getExistingConn is used to grab a pooled connection.
|
||||
func (n *NetworkTransport) getPooledConn(target string) *netConn {
|
||||
func (n *NetworkTransport) getPooledConn(target ServerAddress) *netConn {
|
||||
n.connPoolLock.Lock()
|
||||
defer n.connPoolLock.Unlock()
|
||||
|
||||
@@ -215,7 +215,7 @@ func (n *NetworkTransport) getPooledConn(target string) *netConn {
|
||||
}
|
||||
|
||||
// getConn is used to get a connection from the pool.
|
||||
func (n *NetworkTransport) getConn(target string) (*netConn, error) {
|
||||
func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
|
||||
// Check for a pooled conn
|
||||
if conn := n.getPooledConn(target); conn != nil {
|
||||
return conn, nil
|
||||
@@ -260,7 +260,7 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
|
||||
|
||||
// AppendEntriesPipeline returns an interface that can be used to pipeline
|
||||
// AppendEntries requests.
|
||||
func (n *NetworkTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) {
|
||||
func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
|
||||
// Get a connection
|
||||
conn, err := n.getConn(target)
|
||||
if err != nil {
|
||||
@@ -272,17 +272,17 @@ func (n *NetworkTransport) AppendEntriesPipeline(target string) (AppendPipeline,
|
||||
}
|
||||
|
||||
// AppendEntries implements the Transport interface.
|
||||
func (n *NetworkTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
|
||||
func (n *NetworkTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
|
||||
return n.genericRPC(target, rpcAppendEntries, args, resp)
|
||||
}
|
||||
|
||||
// RequestVote implements the Transport interface.
|
||||
func (n *NetworkTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error {
|
||||
func (n *NetworkTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
|
||||
return n.genericRPC(target, rpcRequestVote, args, resp)
|
||||
}
|
||||
|
||||
// genericRPC handles a simple request/response RPC.
|
||||
func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interface{}, resp interface{}) error {
|
||||
func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error {
|
||||
// Get a conn
|
||||
conn, err := n.getConn(target)
|
||||
if err != nil {
|
||||
@@ -295,7 +295,7 @@ func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interfa
|
||||
}
|
||||
|
||||
// Send the RPC
|
||||
if err := sendRPC(conn, rpcType, args); err != nil {
|
||||
if err = sendRPC(conn, rpcType, args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -308,7 +308,7 @@ func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interfa
|
||||
}
|
||||
|
||||
// InstallSnapshot implements the Transport interface.
|
||||
func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
|
||||
func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
|
||||
// Get a conn, always close for InstallSnapshot
|
||||
conn, err := n.getConn(target)
|
||||
if err != nil {
|
||||
@@ -326,17 +326,17 @@ func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotR
|
||||
}
|
||||
|
||||
// Send the RPC
|
||||
if err := sendRPC(conn, rpcInstallSnapshot, args); err != nil {
|
||||
if err = sendRPC(conn, rpcInstallSnapshot, args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Stream the state
|
||||
if _, err := io.Copy(conn.w, data); err != nil {
|
||||
if _, err = io.Copy(conn.w, data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Flush
|
||||
if err := conn.w.Flush(); err != nil {
|
||||
if err = conn.w.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -346,13 +346,13 @@ func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotR
|
||||
}
|
||||
|
||||
// EncodePeer implements the Transport interface.
|
||||
func (n *NetworkTransport) EncodePeer(p string) []byte {
|
||||
func (n *NetworkTransport) EncodePeer(p ServerAddress) []byte {
|
||||
return []byte(p)
|
||||
}
|
||||
|
||||
// DecodePeer implements the Transport interface.
|
||||
func (n *NetworkTransport) DecodePeer(buf []byte) string {
|
||||
return string(buf)
|
||||
func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress {
|
||||
return ServerAddress(buf)
|
||||
}
|
||||
|
||||
// listen is used to handle incoming connections.
|
||||
|
||||
32
vendor/github.com/hashicorp/raft/observer.go
generated
vendored
32
vendor/github.com/hashicorp/raft/observer.go
generated
vendored
@@ -6,21 +6,20 @@ import (
|
||||
|
||||
// Observation is sent along the given channel to observers when an event occurs.
|
||||
type Observation struct {
|
||||
// Raft holds the Raft instance generating the observation.
|
||||
Raft *Raft
|
||||
// Data holds observation-specific data. Possible types are
|
||||
// *RequestVoteRequest and RaftState.
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
// LeaderObservation is used for the data when leadership changes.
|
||||
type LeaderObservation struct {
|
||||
leader string
|
||||
}
|
||||
|
||||
// nextObserverId is used to provide a unique ID for each observer to aid in
|
||||
// deregistration.
|
||||
var nextObserverId uint64
|
||||
var nextObserverID uint64
|
||||
|
||||
// FilterFn is a function that can be registered in order to filter observations
|
||||
// by returning false.
|
||||
// FilterFn is a function that can be registered in order to filter observations.
|
||||
// The function reports whether the observation should be included - if
|
||||
// it returns false, the observation will be filtered out.
|
||||
type FilterFn func(o *Observation) bool
|
||||
|
||||
// Observer describes what to do with a given observation.
|
||||
@@ -44,14 +43,19 @@ type Observer struct {
|
||||
numDropped uint64
|
||||
}
|
||||
|
||||
// Create a new observer with the specified channel, blocking behavior, and
|
||||
// filter (filter can be nil).
|
||||
// NewObserver creates a new observer that can be registered
|
||||
// to make observations on a Raft instance. Observations
|
||||
// will be sent on the given channel if they satisfy the
|
||||
// given filter.
|
||||
//
|
||||
// If blocking is true, the observer will block when it can't
|
||||
// send on the channel, otherwise it may discard events.
|
||||
func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer {
|
||||
return &Observer{
|
||||
channel: channel,
|
||||
blocking: blocking,
|
||||
filter: filter,
|
||||
id: atomic.AddUint64(&nextObserverId, 1),
|
||||
id: atomic.AddUint64(&nextObserverID, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,21 +69,21 @@ func (or *Observer) GetNumDropped() uint64 {
|
||||
return atomic.LoadUint64(&or.numDropped)
|
||||
}
|
||||
|
||||
// Register a new observer.
|
||||
// RegisterObserver registers a new observer.
|
||||
func (r *Raft) RegisterObserver(or *Observer) {
|
||||
r.observersLock.Lock()
|
||||
defer r.observersLock.Unlock()
|
||||
r.observers[or.id] = or
|
||||
}
|
||||
|
||||
// Deregister an observer.
|
||||
// DeregisterObserver deregisters an observer.
|
||||
func (r *Raft) DeregisterObserver(or *Observer) {
|
||||
r.observersLock.Lock()
|
||||
defer r.observersLock.Unlock()
|
||||
delete(r.observers, or.id)
|
||||
}
|
||||
|
||||
// Send an observation to every observer.
|
||||
// observe sends an observation to every observer.
|
||||
func (r *Raft) observe(o interface{}) {
|
||||
// In general observers should not block. But in any case this isn't
|
||||
// disastrous as we only hold a read lock, which merely prevents
|
||||
|
||||
122
vendor/github.com/hashicorp/raft/peer.go
generated
vendored
122
vendor/github.com/hashicorp/raft/peer.go
generated
vendored
@@ -1,122 +0,0 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
jsonPeerPath = "peers.json"
|
||||
)
|
||||
|
||||
// PeerStore provides an interface for persistent storage and
|
||||
// retrieval of peers. We use a separate interface than StableStore
|
||||
// since the peers may need to be edited by a human operator. For example,
|
||||
// in a two node cluster, the failure of either node requires human intervention
|
||||
// since consensus is impossible.
|
||||
type PeerStore interface {
|
||||
// Peers returns the list of known peers.
|
||||
Peers() ([]string, error)
|
||||
|
||||
// SetPeers sets the list of known peers. This is invoked when a peer is
|
||||
// added or removed.
|
||||
SetPeers([]string) error
|
||||
}
|
||||
|
||||
// StaticPeers is used to provide a static list of peers.
|
||||
type StaticPeers struct {
|
||||
StaticPeers []string
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
// Peers implements the PeerStore interface.
|
||||
func (s *StaticPeers) Peers() ([]string, error) {
|
||||
s.l.Lock()
|
||||
peers := s.StaticPeers
|
||||
s.l.Unlock()
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
// SetPeers implements the PeerStore interface.
|
||||
func (s *StaticPeers) SetPeers(p []string) error {
|
||||
s.l.Lock()
|
||||
s.StaticPeers = p
|
||||
s.l.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// JSONPeers is used to provide peer persistence on disk in the form
|
||||
// of a JSON file. This allows human operators to manipulate the file.
|
||||
type JSONPeers struct {
|
||||
l sync.Mutex
|
||||
path string
|
||||
trans Transport
|
||||
}
|
||||
|
||||
// NewJSONPeers creates a new JSONPeers store. Requires a transport
|
||||
// to handle the serialization of network addresses.
|
||||
func NewJSONPeers(base string, trans Transport) *JSONPeers {
|
||||
path := filepath.Join(base, jsonPeerPath)
|
||||
store := &JSONPeers{
|
||||
path: path,
|
||||
trans: trans,
|
||||
}
|
||||
return store
|
||||
}
|
||||
|
||||
// Peers implements the PeerStore interface.
|
||||
func (j *JSONPeers) Peers() ([]string, error) {
|
||||
j.l.Lock()
|
||||
defer j.l.Unlock()
|
||||
|
||||
// Read the file
|
||||
buf, err := ioutil.ReadFile(j.path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check for no peers
|
||||
if len(buf) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Decode the peers
|
||||
var peerSet []string
|
||||
dec := json.NewDecoder(bytes.NewReader(buf))
|
||||
if err := dec.Decode(&peerSet); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Deserialize each peer
|
||||
var peers []string
|
||||
for _, p := range peerSet {
|
||||
peers = append(peers, j.trans.DecodePeer([]byte(p)))
|
||||
}
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
// SetPeers implements the PeerStore interface.
|
||||
func (j *JSONPeers) SetPeers(peers []string) error {
|
||||
j.l.Lock()
|
||||
defer j.l.Unlock()
|
||||
|
||||
// Encode each peer
|
||||
var peerSet []string
|
||||
for _, p := range peers {
|
||||
peerSet = append(peerSet, string(j.trans.EncodePeer(p)))
|
||||
}
|
||||
|
||||
// Convert to JSON
|
||||
var buf bytes.Buffer
|
||||
enc := json.NewEncoder(&buf)
|
||||
if err := enc.Encode(peerSet); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write out as JSON
|
||||
return ioutil.WriteFile(j.path, buf.Bytes(), 0755)
|
||||
}
|
||||
46
vendor/github.com/hashicorp/raft/peersjson.go
generated
vendored
Normal file
46
vendor/github.com/hashicorp/raft/peersjson.go
generated
vendored
Normal file
@@ -0,0 +1,46 @@
package raft

import (
	"bytes"
	"encoding/json"
	"io/ioutil"
)

// ReadPeersJSON consumes a legacy peers.json file in the format of the old JSON
// peer store and creates a new-style configuration structure. This can be used
// to migrate this data or perform manual recovery when running protocol versions
// that can interoperate with older, unversioned Raft servers. This should not be
// used once server IDs are in use, because the old peers.json file didn't have
// support for these, nor non-voter suffrage types.
func ReadPeersJSON(path string) (Configuration, error) {
	// Read in the file.
	buf, err := ioutil.ReadFile(path)
	if err != nil {
		return Configuration{}, err
	}

	// Parse it as JSON.
	var peers []string
	dec := json.NewDecoder(bytes.NewReader(buf))
	if err := dec.Decode(&peers); err != nil {
		return Configuration{}, err
	}

	// Map it into the new-style configuration structure. We can only specify
	// voter roles here, and the ID has to be the same as the address.
	var configuration Configuration
	for _, peer := range peers {
		server := Server{
			Suffrage: Voter,
			ID:       ServerID(peer),
			Address:  ServerAddress(peer),
		}
		configuration.Servers = append(configuration.Servers, server)
	}

	// We should only ingest valid configurations.
	if err := checkConfiguration(configuration); err != nil {
		return Configuration{}, err
	}
	return configuration, nil
}
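Editor's note: a hedged usage sketch of the migration path (not part of the vendored code; the file path is hypothetical). The legacy peers.json file is a JSON array of addresses such as ["10.0.0.1:8300", "10.0.0.2:8300"].

```go
func migratePeers() (Configuration, error) {
	configuration, err := ReadPeersJSON("/var/lib/raft/peers.json")
	if err != nil {
		// missing file, malformed JSON, or an invalid configuration
		return Configuration{}, err
	}
	// Every entry comes back as a Voter whose ID equals its address.
	return configuration, nil
}
```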
1578
vendor/github.com/hashicorp/raft/raft.go
generated
vendored
1578
vendor/github.com/hashicorp/raft/raft.go
generated
vendored
File diff suppressed because it is too large
Load Diff
129
vendor/github.com/hashicorp/raft/replication.go
generated
vendored
129
vendor/github.com/hashicorp/raft/replication.go
generated
vendored
@@ -24,32 +24,58 @@ var (
|
||||
ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported")
|
||||
)
|
||||
|
||||
// followerReplication is in charge of sending snapshots and log entries from
|
||||
// this leader during this particular term to a remote follower.
|
||||
type followerReplication struct {
|
||||
peer string
|
||||
inflight *inflight
|
||||
// peer contains the network address and ID of the remote follower.
|
||||
peer Server
|
||||
|
||||
stopCh chan uint64
|
||||
// commitment tracks the entries acknowledged by followers so that the
|
||||
// leader's commit index can advance. It is updated on successful
|
||||
// AppendEntries responses.
|
||||
commitment *commitment
|
||||
|
||||
// stopCh is notified/closed when this leader steps down or the follower is
|
||||
// removed from the cluster. In the follower removed case, it carries a log
|
||||
// index; replication should be attempted with a best effort up through that
|
||||
// index, before exiting.
|
||||
stopCh chan uint64
|
||||
// triggerCh is notified every time new entries are appended to the log.
|
||||
triggerCh chan struct{}
|
||||
|
||||
// currentTerm is the term of this leader, to be included in AppendEntries
|
||||
// requests.
|
||||
currentTerm uint64
|
||||
matchIndex uint64
|
||||
nextIndex uint64
|
||||
// nextIndex is the index of the next log entry to send to the follower,
|
||||
// which may fall past the end of the log.
|
||||
nextIndex uint64
|
||||
|
||||
lastContact time.Time
|
||||
// lastContact is updated to the current time whenever any response is
|
||||
// received from the follower (successful or not). This is used to check
|
||||
// whether the leader should step down (Raft.checkLeaderLease()).
|
||||
lastContact time.Time
|
||||
// lastContactLock protects 'lastContact'.
|
||||
lastContactLock sync.RWMutex
|
||||
|
||||
// failures counts the number of failed RPCs since the last success, which is
|
||||
// used to apply backoff.
|
||||
failures uint64
|
||||
|
||||
notifyCh chan struct{}
|
||||
notify []*verifyFuture
|
||||
// notifyCh is notified to send out a heartbeat, which is used to check that
|
||||
// this server is still leader.
|
||||
notifyCh chan struct{}
|
||||
// notify is a list of futures to be resolved upon receipt of an
|
||||
// acknowledgement, then cleared from this list.
|
||||
notify []*verifyFuture
|
||||
// notifyLock protects 'notify'.
|
||||
notifyLock sync.Mutex
|
||||
|
||||
// stepDown is used to indicate to the leader that we
|
||||
// should step down based on information from a follower.
|
||||
stepDown chan struct{}
|
||||
|
||||
// allowPipeline is used to control it seems like
|
||||
// pipeline replication should be enabled.
|
||||
// allowPipeline is used to determine when to pipeline the AppendEntries RPCs.
|
||||
// It is private to this replication goroutine.
|
||||
allowPipeline bool
|
||||
}
|
||||
|
||||
@@ -83,8 +109,8 @@ func (s *followerReplication) setLastContact() {
|
||||
s.lastContactLock.Unlock()
|
||||
}
|
||||
|
||||
// replicate is a long running routine that is used to manage
|
||||
// the process of replicating logs to our followers.
|
||||
// replicate is a long running routine that replicates log entries to a single
|
||||
// follower.
|
||||
func (r *Raft) replicate(s *followerReplication) {
|
||||
// Start an async heartbeating routing
|
||||
stopHeartbeat := make(chan struct{})
|
||||
@@ -104,7 +130,7 @@ RPC:
|
||||
case <-s.triggerCh:
|
||||
lastLogIdx, _ := r.getLastLog()
|
||||
shouldStop = r.replicateTo(s, lastLogIdx)
|
||||
case <-randomTimeout(r.conf.CommitTimeout):
|
||||
case <-randomTimeout(r.conf.CommitTimeout): // TODO: what is this?
|
||||
lastLogIdx, _ := r.getLastLog()
|
||||
shouldStop = r.replicateTo(s, lastLogIdx)
|
||||
}
|
||||
@@ -131,7 +157,8 @@ PIPELINE:
|
||||
goto RPC
|
||||
}
|
||||
|
||||
// replicateTo is used to replicate the logs up to a given last index.
|
||||
// replicateTo is a helper to replicate(), used to replicate the logs up to a
|
||||
// given last index.
|
||||
// If the follower log is behind, we take care to bring them up to date.
|
||||
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
|
||||
// Create the base request
|
||||
@@ -156,12 +183,12 @@ START:
|
||||
|
||||
// Make the RPC call
|
||||
start = time.Now()
|
||||
if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
|
||||
if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil {
|
||||
r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err)
|
||||
s.failures++
|
||||
return
|
||||
}
|
||||
appendStats(s.peer, start, float32(len(req.Entries)))
|
||||
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
|
||||
|
||||
// Check for a newer term, stop running
|
||||
if resp.Term > req.Term {
|
||||
@@ -182,7 +209,6 @@ START:
|
||||
s.allowPipeline = true
|
||||
} else {
|
||||
s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1)
|
||||
s.matchIndex = s.nextIndex - 1
|
||||
if resp.NoRetryBackoff {
|
||||
s.failures = 0
|
||||
} else {
|
||||
@@ -192,6 +218,17 @@ START:
|
||||
}
|
||||
|
||||
CHECK_MORE:
|
||||
// Poll the stop channel here in case we are looping and have been asked
|
||||
// to stop, or have stepped down as leader. Even for the best effort case
|
||||
// where we are asked to replicate to a given index and then shutdown,
|
||||
// it's better to not loop in here to send lots of entries to a straggler
|
||||
// that's leaving the cluster anyways.
|
||||
select {
|
||||
case <-s.stopCh:
|
||||
return true
|
||||
default:
|
||||
}
|
||||
|
||||
// Check if there are more logs to replicate
|
||||
if s.nextIndex <= lastIndex {
|
||||
goto START
|
||||
@@ -238,23 +275,27 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
|
||||
|
||||
// Setup the request
|
||||
req := InstallSnapshotRequest{
|
||||
Term: s.currentTerm,
|
||||
Leader: r.trans.EncodePeer(r.localAddr),
|
||||
LastLogIndex: meta.Index,
|
||||
LastLogTerm: meta.Term,
|
||||
Peers: meta.Peers,
|
||||
Size: meta.Size,
|
||||
RPCHeader: r.getRPCHeader(),
|
||||
SnapshotVersion: meta.Version,
|
||||
Term: s.currentTerm,
|
||||
Leader: r.trans.EncodePeer(r.localAddr),
|
||||
LastLogIndex: meta.Index,
|
||||
LastLogTerm: meta.Term,
|
||||
Peers: meta.Peers,
|
||||
Size: meta.Size,
|
||||
Configuration: encodeConfiguration(meta.Configuration),
|
||||
ConfigurationIndex: meta.ConfigurationIndex,
|
||||
}
|
||||
|
||||
// Make the call
|
||||
start := time.Now()
|
||||
var resp InstallSnapshotResponse
|
||||
if err := r.trans.InstallSnapshot(s.peer, &req, &resp, snapshot); err != nil {
|
||||
if err := r.trans.InstallSnapshot(s.peer.Address, &req, &resp, snapshot); err != nil {
|
||||
r.logger.Printf("[ERR] raft: Failed to install snapshot %v: %v", snapID, err)
|
||||
s.failures++
|
||||
return false, err
|
||||
}
|
||||
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", s.peer}, start)
|
||||
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start)
|
||||
|
||||
// Check for a newer term, stop running
|
||||
if resp.Term > req.Term {
|
||||
@@ -267,12 +308,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
|
||||
|
||||
// Check for success
|
||||
if resp.Success {
|
||||
// Mark any inflight logs as committed
|
||||
s.inflight.CommitRange(s.matchIndex+1, meta.Index)
|
||||
|
||||
// Update the indexes
|
||||
s.matchIndex = meta.Index
|
||||
s.nextIndex = s.matchIndex + 1
|
||||
s.nextIndex = meta.Index + 1
|
||||
s.commitment.match(s.peer.ID, meta.Index)
|
||||
|
||||
// Clear any failures
|
||||
s.failures = 0
|
||||
@@ -292,8 +330,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
|
||||
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
|
||||
var failures uint64
|
||||
req := AppendEntriesRequest{
|
||||
Term: s.currentTerm,
|
||||
Leader: r.trans.EncodePeer(r.localAddr),
|
||||
RPCHeader: r.getRPCHeader(),
|
||||
Term: s.currentTerm,
|
||||
Leader: r.trans.EncodePeer(r.localAddr),
|
||||
}
|
||||
var resp AppendEntriesResponse
|
||||
for {
|
||||
@@ -306,8 +345,8 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
|
||||
r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer, err)
|
||||
if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil {
|
||||
r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err)
|
||||
failures++
|
||||
select {
|
||||
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
|
||||
@@ -316,7 +355,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
|
||||
} else {
|
||||
s.setLastContact()
|
||||
failures = 0
|
||||
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", s.peer}, start)
|
||||
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
|
||||
s.notifyAll(resp.Success)
|
||||
}
|
||||
}
|
||||
@@ -328,7 +367,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
|
||||
// back to the standard replication which can handle more complex situations.
|
||||
func (r *Raft) pipelineReplicate(s *followerReplication) error {
|
||||
// Create a new pipeline
|
||||
pipeline, err := r.trans.AppendEntriesPipeline(s.peer)
|
||||
pipeline, err := r.trans.AppendEntriesPipeline(s.peer.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -355,6 +394,7 @@ SEND:
|
||||
case <-finishCh:
|
||||
break SEND
|
||||
case maxIndex := <-s.stopCh:
|
||||
// Make a best effort to replicate up to this index
|
||||
if maxIndex > 0 {
|
||||
r.pipelineSend(s, pipeline, &nextIndex, maxIndex)
|
||||
}
|
||||
@@ -377,7 +417,8 @@ SEND:
|
||||
return nil
|
||||
}
|
||||
|
||||
// pipelineSend is used to send data over a pipeline.
|
||||
// pipelineSend is used to send data over a pipeline. It is a helper to
|
||||
// pipelineReplicate.
|
||||
func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *uint64, lastIndex uint64) (shouldStop bool) {
|
||||
// Create a new append request
|
||||
req := new(AppendEntriesRequest)
|
||||
@@ -407,7 +448,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
|
||||
select {
|
||||
case ready := <-respCh:
|
||||
req, resp := ready.Request(), ready.Response()
|
||||
appendStats(s.peer, ready.Start(), float32(len(req.Entries)))
|
||||
appendStats(string(s.peer.ID), ready.Start(), float32(len(req.Entries)))
|
||||
|
||||
// Check for a newer term, stop running
|
||||
if resp.Term > req.Term {
|
||||
@@ -433,6 +474,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
|
||||
|
||||
// setupAppendEntries is used to setup an append entries request.
|
||||
func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
|
||||
req.RPCHeader = r.getRPCHeader()
|
||||
req.Term = s.currentTerm
|
||||
req.Leader = r.trans.EncodePeer(r.localAddr)
|
||||
req.LeaderCommitIndex = r.getCommitIndex()
|
||||
@@ -503,18 +545,15 @@ func (r *Raft) handleStaleTerm(s *followerReplication) {
|
||||
asyncNotifyCh(s.stepDown)
|
||||
}
|
||||
|
||||
// updateLastAppended is used to update follower replication state after a successful
|
||||
// AppendEntries RPC.
|
||||
// updateLastAppended is used to update follower replication state after a
|
||||
// successful AppendEntries RPC.
|
||||
// TODO: This isn't used during InstallSnapshot, but the code there is similar.
|
||||
func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {
|
||||
// Mark any inflight logs as committed
|
||||
if logs := req.Entries; len(logs) > 0 {
|
||||
first := logs[0]
|
||||
last := logs[len(logs)-1]
|
||||
s.inflight.CommitRange(first.Index, last.Index)
|
||||
|
||||
// Update the indexes
|
||||
s.matchIndex = last.Index
|
||||
s.nextIndex = last.Index + 1
|
||||
s.commitment.match(s.peer.ID, last.Index)
|
||||
}
|
||||
|
||||
// Notify still leader
|
||||
|
||||
211
vendor/github.com/hashicorp/raft/snapshot.go
generated
vendored
211
vendor/github.com/hashicorp/raft/snapshot.go
generated
vendored
@@ -1,26 +1,50 @@
|
||||
package raft
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
)
|
||||
|
||||
// SnapshotMeta is for metadata of a snapshot.
|
||||
type SnapshotMeta struct {
|
||||
ID string // ID is opaque to the store, and is used for opening
|
||||
// Version is the version number of the snapshot metadata. This does not cover
|
||||
// the application's data in the snapshot, that should be versioned
|
||||
// separately.
|
||||
Version SnapshotVersion
|
||||
|
||||
// ID is opaque to the store, and is used for opening.
|
||||
ID string
|
||||
|
||||
// Index and Term store when the snapshot was taken.
|
||||
Index uint64
|
||||
Term uint64
|
||||
|
||||
// Peers is deprecated and used to support version 0 snapshots, but will
|
||||
// be populated in version 1 snapshots as well to help with upgrades.
|
||||
Peers []byte
|
||||
Size int64
|
||||
|
||||
// Configuration and ConfigurationIndex are present in version 1
|
||||
// snapshots and later.
|
||||
Configuration Configuration
|
||||
ConfigurationIndex uint64
|
||||
|
||||
// Size is the size of the snapshot in bytes.
|
||||
Size int64
|
||||
}
|
||||
|
||||
// SnapshotStore interface is used to allow for flexible implementations
|
||||
// of snapshot storage and retrieval. For example, a client could implement
|
||||
// a shared state store such as S3, allowing new nodes to restore snapshots
|
||||
// without steaming from the leader.
|
||||
// without streaming from the leader.
|
||||
type SnapshotStore interface {
|
||||
// Create is used to begin a snapshot at a given index and term,
|
||||
// with the current peer set already encoded.
|
||||
Create(index, term uint64, peers []byte) (SnapshotSink, error)
|
||||
// Create is used to begin a snapshot at a given index and term, and with
|
||||
// the given committed configuration. The version parameter controls
|
||||
// which snapshot version to create.
|
||||
Create(version SnapshotVersion, index, term uint64, configuration Configuration,
|
||||
configurationIndex uint64, trans Transport) (SnapshotSink, error)
|
||||
|
||||
// List is used to list the available snapshots in the store.
|
||||
// It should return them in descending order, with the highest index first.
|
||||
@@ -38,3 +62,178 @@ type SnapshotSink interface {
|
||||
ID() string
|
||||
Cancel() error
|
||||
}
|
||||
|
||||
// runSnapshots is a long running goroutine used to manage taking
|
||||
// new snapshots of the FSM. It runs in parallel to the FSM and
|
||||
// main goroutines, so that snapshots do not block normal operation.
|
||||
func (r *Raft) runSnapshots() {
|
||||
for {
|
||||
select {
|
||||
case <-randomTimeout(r.conf.SnapshotInterval):
|
||||
// Check if we should snapshot
|
||||
if !r.shouldSnapshot() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Trigger a snapshot
|
||||
if _, err := r.takeSnapshot(); err != nil {
|
||||
r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
|
||||
}
|
||||
|
||||
case future := <-r.userSnapshotCh:
|
||||
// User-triggered, run immediately
|
||||
id, err := r.takeSnapshot()
|
||||
if err != nil {
|
||||
r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
|
||||
} else {
|
||||
future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
|
||||
return r.snapshots.Open(id)
|
||||
}
|
||||
}
|
||||
future.respond(err)
|
||||
|
||||
case <-r.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// shouldSnapshot checks if we meet the conditions to take
|
||||
// a new snapshot.
|
||||
func (r *Raft) shouldSnapshot() bool {
|
||||
// Check the last snapshot index
|
||||
lastSnap, _ := r.getLastSnapshot()
|
||||
|
||||
// Check the last log index
|
||||
lastIdx, err := r.logs.LastIndex()
|
||||
if err != nil {
|
||||
r.logger.Printf("[ERR] raft: Failed to get last log index: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Compare the delta to the threshold
|
||||
delta := lastIdx - lastSnap
|
||||
return delta >= r.conf.SnapshotThreshold
|
||||
}
|
||||
|
||||
// takeSnapshot is used to take a new snapshot. This must only be called from
|
||||
// the snapshot thread, never the main thread. This returns the ID of the new
|
||||
// snapshot, along with an error.
|
||||
func (r *Raft) takeSnapshot() (string, error) {
|
||||
defer metrics.MeasureSince([]string{"raft", "snapshot", "takeSnapshot"}, time.Now())
|
||||
|
||||
// Create a request for the FSM to perform a snapshot.
|
||||
snapReq := &reqSnapshotFuture{}
|
||||
snapReq.init()
|
||||
|
||||
// Wait for dispatch or shutdown.
|
||||
select {
|
||||
case r.fsmSnapshotCh <- snapReq:
|
||||
case <-r.shutdownCh:
|
||||
return "", ErrRaftShutdown
|
||||
}
|
||||
|
||||
// Wait until we get a response
|
||||
if err := snapReq.Error(); err != nil {
|
||||
if err != ErrNothingNewToSnapshot {
|
||||
err = fmt.Errorf("failed to start snapshot: %v", err)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
defer snapReq.snapshot.Release()
|
||||
|
||||
// Make a request for the configurations and extract the committed info.
|
||||
// We have to use the future here to safely get this information since
|
||||
// it is owned by the main thread.
|
||||
configReq := &configurationsFuture{}
|
||||
configReq.init()
|
||||
select {
|
||||
case r.configurationsCh <- configReq:
|
||||
case <-r.shutdownCh:
|
||||
return "", ErrRaftShutdown
|
||||
}
|
||||
if err := configReq.Error(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
committed := configReq.configurations.committed
|
||||
committedIndex := configReq.configurations.committedIndex
|
||||
|
||||
// We don't support snapshots while there's a config change outstanding
|
||||
// since the snapshot doesn't have a means to represent this state. This
|
||||
// is a little weird because we need the FSM to apply an index that's
|
||||
// past the configuration change, even though the FSM itself doesn't see
|
||||
// the configuration changes. It should be ok in practice with normal
|
||||
// application traffic flowing through the FSM. If there's none of that
|
||||
// then it's not crucial that we snapshot, since there's not much going
|
||||
// on Raft-wise.
|
||||
if snapReq.index < committedIndex {
|
||||
return "", fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
|
||||
committedIndex, snapReq.index)
|
||||
}
|
||||
|
||||
// Create a new snapshot.
|
||||
r.logger.Printf("[INFO] raft: Starting snapshot up to %d", snapReq.index)
|
||||
start := time.Now()
|
||||
version := getSnapshotVersion(r.protocolVersion)
|
||||
sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create snapshot: %v", err)
|
||||
}
|
||||
metrics.MeasureSince([]string{"raft", "snapshot", "create"}, start)
|
||||
|
||||
// Try to persist the snapshot.
|
||||
start = time.Now()
|
||||
if err := snapReq.snapshot.Persist(sink); err != nil {
|
||||
sink.Cancel()
|
||||
return "", fmt.Errorf("failed to persist snapshot: %v", err)
|
||||
}
|
||||
metrics.MeasureSince([]string{"raft", "snapshot", "persist"}, start)
|
||||
|
||||
// Close and check for error.
|
||||
if err := sink.Close(); err != nil {
|
||||
return "", fmt.Errorf("failed to close snapshot: %v", err)
|
||||
}
|
||||
|
||||
// Update the last stable snapshot info.
|
||||
r.setLastSnapshot(snapReq.index, snapReq.term)
|
||||
|
||||
// Compact the logs.
|
||||
if err := r.compactLogs(snapReq.index); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
r.logger.Printf("[INFO] raft: Snapshot to %d complete", snapReq.index)
|
||||
return sink.ID(), nil
|
||||
}
|
||||
|
||||
// compactLogs takes the last inclusive index of a snapshot
|
||||
// and trims the logs that are no longer needed.
|
||||
func (r *Raft) compactLogs(snapIdx uint64) error {
|
||||
defer metrics.MeasureSince([]string{"raft", "compactLogs"}, time.Now())
|
||||
// Determine log ranges to compact
|
||||
minLog, err := r.logs.FirstIndex()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get first log index: %v", err)
|
||||
}
|
||||
|
||||
// Check if we have enough logs to truncate
|
||||
lastLogIdx, _ := r.getLastLog()
|
||||
if lastLogIdx <= r.conf.TrailingLogs {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Truncate up to the end of the snapshot, or `TrailingLogs`
|
||||
// back from the head, whichever is further back. This ensures
|
||||
// at least `TrailingLogs` entries, but does not allow logs
|
||||
// after the snapshot to be removed.
|
||||
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
|
||||
|
||||
// Log this
|
||||
r.logger.Printf("[INFO] raft: Compacting logs from %d to %d", minLog, maxLog)
|
||||
|
||||
// Compact the logs
|
||||
if err := r.logs.DeleteRange(minLog, maxLog); err != nil {
|
||||
return fmt.Errorf("log compaction failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
24
vendor/github.com/hashicorp/raft/state.go
generated
vendored
24
vendor/github.com/hashicorp/raft/state.go
generated
vendored
@@ -62,8 +62,8 @@ type raftState struct {
	lastLogIndex uint64
	lastLogTerm  uint64

	// Tracks the number of live routines
	runningRoutines int32
	// Tracks running goroutines
	routinesGroup sync.WaitGroup

	// The current state
	state RaftState

@@ -133,28 +133,20 @@ func (r *raftState) setLastApplied(index uint64) {
	atomic.StoreUint64(&r.lastApplied, index)
}

func (r *raftState) incrRoutines() {
	atomic.AddInt32(&r.runningRoutines, 1)
}

func (r *raftState) decrRoutines() {
	atomic.AddInt32(&r.runningRoutines, -1)
}

func (r *raftState) getRoutines() int32 {
	return atomic.LoadInt32(&r.runningRoutines)
}

// Start a goroutine and properly handle the race between a routine
// starting and incrementing, and exiting and decrementing.
func (r *raftState) goFunc(f func()) {
	r.incrRoutines()
	r.routinesGroup.Add(1)
	go func() {
		defer r.decrRoutines()
		defer r.routinesGroup.Done()
		f()
	}()
}

func (r *raftState) waitShutdown() {
	r.routinesGroup.Wait()
}

// getLastIndex returns the last index in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastIndex() uint64 {

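This hunk swaps the manual runningRoutines counter for a sync.WaitGroup: Add(1) happens before the goroutine starts, so a concurrent waitShutdown cannot miss a routine that is about to run. A minimal standalone sketch of the same pattern (the routineTracker type and the printed output are illustrative, not the package's API):

package main

import (
	"fmt"
	"sync"
)

type routineTracker struct {
	wg sync.WaitGroup
}

// goFunc registers the goroutine before launching it, closing the race
// between "started but not yet counted" and a waiter calling Wait.
func (t *routineTracker) goFunc(f func()) {
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		f()
	}()
}

func (t *routineTracker) waitShutdown() {
	t.wg.Wait()
}

func main() {
	var t routineTracker
	for i := 0; i < 3; i++ {
		i := i
		t.goFunc(func() { fmt.Println("routine", i, "done") })
	}
	t.waitShutdown() // returns only after all three routines have finished
}
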
4
vendor/github.com/hashicorp/raft/tcp_transport.go
generated
vendored
4
vendor/github.com/hashicorp/raft/tcp_transport.go
generated
vendored
@@ -81,8 +81,8 @@ func newTCPTransport(bindAddr string,
}

// Dial implements the StreamLayer interface.
func (t *TCPStreamLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
	return net.DialTimeout("tcp", address, timeout)
func (t *TCPStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error) {
	return net.DialTimeout("tcp", string(address), timeout)
}

// Accept implements the net.Listener interface.

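Dial now takes the typed ServerAddress rather than a bare string; string(address) recovers the raw host:port for net.DialTimeout. A hedged, self-contained sketch of the same typed-address pattern (serverAddress and dial below are local stand-ins, not the library's identifiers):

package main

import (
	"fmt"
	"net"
	"time"
)

// serverAddress mirrors the idea of a string-backed named type for peer addresses.
type serverAddress string

func dial(address serverAddress, timeout time.Duration) (net.Conn, error) {
	// The named type keeps call sites explicit about what the string means;
	// the conversion hands the plain host:port to the standard dialer.
	return net.DialTimeout("tcp", string(address), timeout)
}

func main() {
	conn, err := dial("127.0.0.1:0", 100*time.Millisecond)
	if err != nil {
		fmt.Println("dial failed (expected in this sketch):", err)
		return
	}
	conn.Close()
}
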
47
vendor/github.com/hashicorp/raft/transport.go
generated
vendored
47
vendor/github.com/hashicorp/raft/transport.go
generated
vendored
@@ -31,27 +31,27 @@ type Transport interface {
	Consumer() <-chan RPC

	// LocalAddr is used to return our local address to distinguish from our peers.
	LocalAddr() string
	LocalAddr() ServerAddress

	// AppendEntriesPipeline returns an interface that can be used to pipeline
	// AppendEntries requests.
	AppendEntriesPipeline(target string) (AppendPipeline, error)
	AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error)

	// AppendEntries sends the appropriate RPC to the target node.
	AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
	AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error

	// RequestVote sends the appropriate RPC to the target node.
	RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error
	RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error

	// InstallSnapshot is used to push a snapshot down to a follower. The data is read from
	// the ReadCloser and streamed to the client.
	InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
	InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error

	// EncodePeer is used to serialize a peer name.
	EncodePeer(string) []byte
	// EncodePeer is used to serialize a peer's address.
	EncodePeer(ServerAddress) []byte

	// DecodePeer is used to deserialize a peer name.
	DecodePeer([]byte) string
	// DecodePeer is used to deserialize a peer's address.
	DecodePeer([]byte) ServerAddress

	// SetHeartbeatHandler is used to setup a heartbeat handler
	// as a fast-pass. This is to avoid head-of-line blocking from
@@ -60,14 +60,19 @@ type Transport interface {
	SetHeartbeatHandler(cb func(rpc RPC))
}

// Close() lives in a separate interface as unfortunately it wasn't in the
// WithClose is an interface that a transport may provide which
// allows a transport to be shut down cleanly when a Raft instance
// shuts down.
//
// It is defined separately from Transport as unfortunately it wasn't in the
// original interface specification.
type WithClose interface {
	// Permanently close a transport, stop all go-routines etc
	// Close permanently closes a transport, stopping
	// any associated goroutines and freeing other resources.
	Close() error
}

// Loopback transport is an interface that provides a loopback transport suitable for testing
// LoopbackTransport is an interface that provides a loopback transport suitable for testing
// e.g. InmemTransport. It's there so we don't have to rewrite tests.
type LoopbackTransport interface {
	Transport // Embedded transport reference
@@ -79,9 +84,9 @@ type LoopbackTransport interface {
// disconnection. Unless the transport is a loopback transport, the transport specified to
// "Connect" is likely to be nil.
type WithPeers interface {
	Connect(peer string, t Transport)    // Connect a peer
	Disconnect(peer string)              // Disconnect a given peer
	DisconnectAll()                      // Disconnect all peers, possibly to reconnect them later
	Connect(peer ServerAddress, t Transport) // Connect a peer
	Disconnect(peer ServerAddress)           // Disconnect a given peer
	DisconnectAll()                          // Disconnect all peers, possibly to reconnect them later
}

// AppendPipeline is used for pipelining AppendEntries requests. It is used
@@ -96,14 +101,24 @@ type AppendPipeline interface {
	// response futures when they are ready.
	Consumer() <-chan AppendFuture

	// Closes pipeline and cancels all inflight RPCs
	// Close closes the pipeline and cancels all inflight RPCs
	Close() error
}

// AppendFuture is used to return information about a pipelined AppendEntries request.
type AppendFuture interface {
	Future

	// Start returns the time that the append request was started.
	// It is always OK to call this method.
	Start() time.Time

	// Request holds the parameters of the AppendEntries call.
	// It is always OK to call this method.
	Request() *AppendEntriesRequest

	// Response holds the results of the AppendEntries call.
	// This method must only be called after the Error
	// method returns, and will only be valid on success.
	Response() *AppendEntriesResponse
}

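A caller typically drains a pipeline's Consumer channel, checking each future's error before reading its response. The sketch below is illustrative only: the interfaces are re-declared locally so the snippet stands alone, and it assumes the embedded Future exposes an Error() method, which is not shown in this hunk.

package example

import (
	"fmt"
	"time"
)

type appendEntriesResponse struct {
	Term    uint64
	Success bool
}

type appendFuture interface {
	Error() error // assumed to come from the embedded Future
	Start() time.Time
	Response() *appendEntriesResponse
}

type appendPipeline interface {
	Consumer() <-chan appendFuture
	Close() error
}

// drainPipeline consumes pipelined AppendEntries results until stop is closed.
// Response is only read after Error reports success, per the comment above.
func drainPipeline(p appendPipeline, stop <-chan struct{}) {
	for {
		select {
		case f := <-p.Consumer():
			if err := f.Error(); err != nil {
				fmt.Println("append failed:", err)
				continue
			}
			fmt.Printf("append ok after %s: %+v\n", time.Since(f.Start()), f.Response())
		case <-stop:
			return
		}
	}
}
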
103
vendor/github.com/hashicorp/raft/util.go
generated
vendored
103
vendor/github.com/hashicorp/raft/util.go
generated
vendored
@@ -3,7 +3,6 @@ package raft
import (
	"bytes"
	crand "crypto/rand"
	"encoding/binary"
	"fmt"
	"math"
	"math/big"
@@ -68,14 +67,6 @@ func generateUUID() string {
		buf[10:16])
}

// asyncNotify is used to do an async channel send to
// a list of channels. This will not block.
func asyncNotify(chans []chan struct{}) {
	for _, ch := range chans {
		asyncNotifyCh(ch)
	}
}

// asyncNotifyCh is used to do an async channel send
// to a single channel without blocking.
func asyncNotifyCh(ch chan struct{}) {
@@ -85,6 +76,17 @@ func asyncNotifyCh(ch chan struct{}) {
	}
}

// drainNotifyCh empties out a single-item notification channel without
// blocking, and returns whether it received anything.
func drainNotifyCh(ch chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// asyncNotifyBool is used to do an async notification
// on a bool channel.
func asyncNotifyBool(ch chan bool, v bool) {
@@ -94,70 +96,6 @@ func asyncNotifyBool(ch chan bool, v bool) {
	}
}

// ExcludePeer is used to exclude a single peer from a list of peers.
func ExcludePeer(peers []string, peer string) []string {
	otherPeers := make([]string, 0, len(peers))
	for _, p := range peers {
		if p != peer {
			otherPeers = append(otherPeers, p)
		}
	}
	return otherPeers
}

// PeerContained checks if a given peer is contained in a list.
func PeerContained(peers []string, peer string) bool {
	for _, p := range peers {
		if p == peer {
			return true
		}
	}
	return false
}

// AddUniquePeer is used to add a peer to a list of existing
// peers only if it is not already contained.
func AddUniquePeer(peers []string, peer string) []string {
	if PeerContained(peers, peer) {
		return peers
	}
	return append(peers, peer)
}

// encodePeers is used to serialize a list of peers.
func encodePeers(peers []string, trans Transport) []byte {
	// Encode each peer
	var encPeers [][]byte
	for _, p := range peers {
		encPeers = append(encPeers, trans.EncodePeer(p))
	}

	// Encode the entire array
	buf, err := encodeMsgPack(encPeers)
	if err != nil {
		panic(fmt.Errorf("failed to encode peers: %v", err))
	}

	return buf.Bytes()
}

// decodePeers is used to deserialize a list of peers.
func decodePeers(buf []byte, trans Transport) []string {
	// Decode the buffer first
	var encPeers [][]byte
	if err := decodeMsgPack(buf, &encPeers); err != nil {
		panic(fmt.Errorf("failed to decode peers: %v", err))
	}

	// Deserialize each peer
	var peers []string
	for _, enc := range encPeers {
		peers = append(peers, trans.DecodePeer(enc))
	}

	return peers
}

// Decode reverses the encode operation on a byte slice input.
func decodeMsgPack(buf []byte, out interface{}) error {
	r := bytes.NewBuffer(buf)
@@ -175,18 +113,6 @@ func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
	return buf, err
}

// Converts bytes to an integer.
func bytesToUint64(b []byte) uint64 {
	return binary.BigEndian.Uint64(b)
}

// Converts a uint64 to a byte slice.
func uint64ToBytes(u uint64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, u)
	return buf
}

// backoff is used to compute an exponential backoff
// duration. Base time is scaled by the current round,
// up to some maximum scale factor.
@@ -198,3 +124,10 @@ func backoff(base time.Duration, round, limit uint64) time.Duration {
	}
	return base
}

// Needed for sorting []uint64, used to determine commitment
type uint64Slice []uint64

func (p uint64Slice) Len() int           { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

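drainNotifyCh pairs with asyncNotifyCh on one-slot channels: a send never blocks because a pending notification is simply coalesced, and a drain reports whether anything was pending. A minimal sketch of that behaviour, using local copies of the two helpers rather than the package's (asyncNotifyCh's body is assumed to be the non-blocking select shown here, since this hunk truncates it):

package main

import "fmt"

// asyncNotifyCh sends without blocking; if a notification is already
// pending in the one-slot channel, the new one is coalesced (dropped).
func asyncNotifyCh(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// drainNotifyCh empties the channel without blocking and reports
// whether a notification was pending.
func drainNotifyCh(ch chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan struct{}, 1)
	asyncNotifyCh(ch)              // first notification is buffered
	asyncNotifyCh(ch)              // second is coalesced, not queued
	fmt.Println(drainNotifyCh(ch)) // true  - one pending notification
	fmt.Println(drainNotifyCh(ch)) // false - already drained
}
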
6
vendor/vendor.json
vendored
6
vendor/vendor.json
vendored
@@ -796,10 +796,10 @@
			"revision": "a14192a58a694c123d8fe5481d4a4727d6ae82f3"
		},
		{
			"checksumSHA1": "lXd9Jdzl1JmphijD/sAaw169/0Q=",
			"checksumSHA1": "wpirHJV/6VEbbD+HyAP2/6Xc0ek=",
			"path": "github.com/hashicorp/raft",
			"revision": "4bcac2adb06930200744feb591fee8a0790f9c98",
			"revisionTime": "2016-06-03T20:22:43Z"
			"revision": "aaad9f10266e089bd401e7a6487651a69275641b",
			"revisionTime": "2016-11-10T00:52:40Z"
		},
		{
			"checksumSHA1": "QAxukkv54/iIvLfsUP6IK4R0m/A=",