vendor raft to master branch

This commit is contained in:
Chelsea Holland Komlo
2018-01-19 17:00:01 -05:00
parent 028ba9f423
commit d443098f0c
14 changed files with 236 additions and 102 deletions

View File

@@ -1,11 +1,14 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
test:
go test -timeout=60s ./...
go test -timeout=60s .
integ: test
INTEG_TESTS=yes go test -timeout=25s -run=Integ ./...
INTEG_TESTS=yes go test -timeout=25s -run=Integ .
fuzz:
go test -timeout=300s ./fuzzy
deps:
go get -d -v ./...
echo $(DEPS) | xargs -n1 go get -d

View File

@@ -3,7 +3,7 @@ raft [![Build Status](https://travis-ci.org/hashicorp/raft.png)](https://travis-
raft is a [Go](http://www.golang.org) library that manages a replicated
log and can be used with an FSM to manage replicated state machines. It
is library for providing [consensus](http://en.wikipedia.org/wiki/Consensus_(computer_science)).
is a library for providing [consensus](http://en.wikipedia.org/wiki/Consensus_(computer_science)).
The use cases for such a library are far-reaching as replicated state
machines are a key component of many distributed systems. They enable
@@ -32,6 +32,24 @@ A pure Go backend using [BoltDB](https://github.com/boltdb/bolt) is also availab
[raft-boltdb](https://github.com/hashicorp/raft-boltdb). It can also be used as a `LogStore`
and `StableStore`.
## Tagged Releases
As of September 2017, Hashicorp will start using tags for this library to clearly indicate
major version updates. We recommend you vendor your application's dependency on this library.
* v0.1.0 is the original stable version of the library that was in master and has been maintained
with no breaking API changes. This was in use by Consul prior to version 0.7.0.
* v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version
manages server identities using a UUID, so introduces some breaking API changes. It also versions
the Raft protocol, and requires some special steps when interoperating with Raft servers running
older versions of the library (see the detailed comment in config.go about version compatibility).
You can reference https://github.com/hashicorp/consul/pull/2222 for an idea of what was required
to port Consul to these new interfaces.
This version includes some new features as well, including non voting servers, a new address
provider abstraction in the transport layer, and more resilient snapshots.
## Protocol
raft is based on ["Raft: In Search of an Understandable Consensus Algorithm"](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf)

View File

@@ -492,6 +492,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
}
r.processConfigurationLogEntry(&entry)
}
r.logger.Printf("[INFO] raft: Initial configuration (index=%d): %+v",
r.configurations.latestIndex, r.configurations.latest.Servers)

View File

@@ -283,7 +283,7 @@ func encodePeers(configuration Configuration, trans Transport) []byte {
var encPeers [][]byte
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
encPeers = append(encPeers, trans.EncodePeer(server.Address))
encPeers = append(encPeers, trans.EncodePeer(server.ID, server.Address))
}
}

View File

@@ -12,6 +12,7 @@ import (
"log"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"time"
@@ -406,17 +407,18 @@ func (s *FileSnapshotSink) Close() error {
return err
}
// fsync the parent directory, to sync directory edits to disk
parentFH, err := os.Open(s.parentDir)
defer parentFH.Close()
if err != nil {
s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err)
return err
}
if runtime.GOOS != "windows" { //skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
parentFH, err := os.Open(s.parentDir)
defer parentFH.Close()
if err != nil {
s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err)
return err
}
if err = parentFH.Sync(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err)
return err
if err = parentFH.Sync(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err)
return err
}
}
// Reap any old snapshots

View File

@@ -75,7 +75,7 @@ func (i *InmemTransport) LocalAddr() ServerAddress {
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
i.RLock()
peer, ok := i.peers[target]
i.RUnlock()
@@ -90,7 +90,7 @@ func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipe
}
// AppendEntries implements the Transport interface.
func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
func (i *InmemTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
if err != nil {
return err
@@ -103,7 +103,7 @@ func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntries
}
// RequestVote implements the Transport interface.
func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
if err != nil {
return err
@@ -116,7 +116,7 @@ func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequ
}
// InstallSnapshot implements the Transport interface.
func (i *InmemTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout)
if err != nil {
return err
@@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
}
// EncodePeer implements the Transport interface.
func (i *InmemTransport) EncodePeer(p ServerAddress) []byte {
func (i *InmemTransport) EncodePeer(id ServerID, p ServerAddress) []byte {
return []byte(p)
}

View File

@@ -69,17 +69,45 @@ type NetworkTransport struct {
maxPool int
serverAddressProvider ServerAddressProvider
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
stream StreamLayer
streamCancel context.CancelFunc
stream StreamLayer
// streamCtx is used to cancel existing connection handlers.
streamCtx context.Context
streamCancel context.CancelFunc
streamCtxLock sync.RWMutex
timeout time.Duration
TimeoutScale int
}
// NetworkTransportConfig encapsulates configuration for the network transport layer.
type NetworkTransportConfig struct {
// ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC
ServerAddressProvider ServerAddressProvider
Logger *log.Logger
// Dialer
Stream StreamLayer
// MaxPool controls how many connections we will pool
MaxPool int
// Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
// the timeout by (SnapshotSize / TimeoutScale).
Timeout time.Duration
}
type ServerAddressProvider interface {
ServerAddr(id ServerID) (ServerAddress, error)
}
// StreamLayer is used with the NetworkTransport to provide
// the low level stream abstraction.
type StreamLayer interface {
@@ -114,6 +142,32 @@ type netPipeline struct {
shutdownLock sync.Mutex
}
// NewNetworkTransportWithConfig creates a new network transport with the given config struct
func NewNetworkTransportWithConfig(
config *NetworkTransportConfig,
) *NetworkTransport {
if config.Logger == nil {
config.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
trans := &NetworkTransport{
connPool: make(map[ServerAddress][]*netConn),
consumeCh: make(chan RPC),
logger: config.Logger,
maxPool: config.MaxPool,
shutdownCh: make(chan struct{}),
stream: config.Stream,
timeout: config.Timeout,
TimeoutScale: DefaultTimeoutScale,
serverAddressProvider: config.ServerAddressProvider,
}
// Create the connection context and then start our listener.
trans.setupStreamContext()
go trans.listen()
return trans
}
// NewNetworkTransport creates a new network transport with the given dialer
// and listener. The maxPool controls how many connections we will pool. The
// timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
@@ -127,10 +181,12 @@ func NewNetworkTransport(
if logOutput == nil {
logOutput = os.Stderr
}
return NewNetworkTransportWithLogger(stream, maxPool, timeout, log.New(logOutput, "", log.LstdFlags))
logger := log.New(logOutput, "", log.LstdFlags)
config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
return NewNetworkTransportWithConfig(config)
}
// NewNetworkTransportWithLogger creates a new network transport with the given dialer
// NewNetworkTransportWithLogger creates a new network transport with the given logger, dialer
// and listener. The maxPool controls how many connections we will pool. The
// timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
// the timeout by (SnapshotSize / TimeoutScale).
@@ -140,44 +196,23 @@ func NewNetworkTransportWithLogger(
timeout time.Duration,
logger *log.Logger,
) *NetworkTransport {
if logger == nil {
logger = log.New(os.Stderr, "", log.LstdFlags)
}
trans := &NetworkTransport{
connPool: make(map[ServerAddress][]*netConn),
consumeCh: make(chan RPC),
logger: logger,
maxPool: maxPool,
shutdownCh: make(chan struct{}),
stream: stream,
timeout: timeout,
TimeoutScale: DefaultTimeoutScale,
}
ctx, cancel := context.WithCancel(context.Background())
trans.streamCancel = cancel
go trans.listen(ctx)
return trans
config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
return NewNetworkTransportWithConfig(config)
}
// Pause closes the current stream and existing connections for a
// NetworkTransport instance
func (n *NetworkTransport) Pause() {
for _, e := range n.connPool {
for _, conn := range e {
conn.Release()
}
}
n.streamCancel()
}
// Pause creates a new stream for a NetworkTransport instance
func (n *NetworkTransport) Reload(s StreamLayer) {
n.stream = s
// setupStreamContext is used to create a new stream context. This should be
// called with the stream lock held.
func (n *NetworkTransport) setupStreamContext() {
ctx, cancel := context.WithCancel(context.Background())
n.streamCtx = ctx
n.streamCancel = cancel
go n.listen(ctx)
}
// getStreamContext is used retrieve the current stream context.
func (n *NetworkTransport) getStreamContext() context.Context {
n.streamCtxLock.RLock()
defer n.streamCtxLock.RUnlock()
return n.streamCtx
}
// SetHeartbeatHandler is used to setup a heartbeat handler
@@ -189,6 +224,31 @@ func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC)) {
n.heartbeatFn = cb
}
// CloseStreams closes the current streams.
func (n *NetworkTransport) CloseStreams() {
n.connPoolLock.Lock()
defer n.connPoolLock.Unlock()
// Close all the connections in the connection pool and then remove their
// entry.
for k, e := range n.connPool {
for _, conn := range e {
conn.Release()
}
delete(n.connPool, k)
}
// Cancel the existing connections and create a new context. Both these
// operations must always be done with the lock held otherwise we can create
// connection handlers that are holding a context that will never be
// cancelable.
n.streamCtxLock.Lock()
n.streamCancel()
n.setupStreamContext()
n.streamCtxLock.Unlock()
}
// Close is used to stop the network transport.
func (n *NetworkTransport) Close() error {
n.shutdownLock.Lock()
@@ -239,6 +299,24 @@ func (n *NetworkTransport) getPooledConn(target ServerAddress) *netConn {
return conn
}
// getConnFromAddressProvider returns a connection from the server address provider if available, or defaults to a connection using the target server address
func (n *NetworkTransport) getConnFromAddressProvider(id ServerID, target ServerAddress) (*netConn, error) {
address := n.getProviderAddressOrFallback(id, target)
return n.getConn(address)
}
func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target ServerAddress) ServerAddress {
if n.serverAddressProvider != nil {
serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
if err != nil {
n.logger.Printf("[WARN] Unable to get address for server id %v, using fallback address %v: %v", id, target, err)
} else {
return serverAddressOverride
}
}
return target
}
// getConn is used to get a connection from the pool.
func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
// Check for a pooled conn
@@ -285,9 +363,9 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) {
func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
// Get a connection
conn, err := n.getConn(target)
conn, err := n.getConnFromAddressProvider(id, target)
if err != nil {
return nil, err
}
@@ -297,19 +375,19 @@ func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPi
}
// AppendEntries implements the Transport interface.
func (n *NetworkTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
return n.genericRPC(target, rpcAppendEntries, args, resp)
func (n *NetworkTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
return n.genericRPC(id, target, rpcAppendEntries, args, resp)
}
// RequestVote implements the Transport interface.
func (n *NetworkTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
return n.genericRPC(target, rpcRequestVote, args, resp)
func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error {
return n.genericRPC(id, target, rpcRequestVote, args, resp)
}
// genericRPC handles a simple request/response RPC.
func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error {
func (n *NetworkTransport) genericRPC(id ServerID, target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error {
// Get a conn
conn, err := n.getConn(target)
conn, err := n.getConnFromAddressProvider(id, target)
if err != nil {
return err
}
@@ -333,9 +411,9 @@ func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args
}
// InstallSnapshot implements the Transport interface.
func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
// Get a conn, always close for InstallSnapshot
conn, err := n.getConn(target)
conn, err := n.getConnFromAddressProvider(id, target)
if err != nil {
return err
}
@@ -371,8 +449,9 @@ func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSn
}
// EncodePeer implements the Transport interface.
func (n *NetworkTransport) EncodePeer(p ServerAddress) []byte {
return []byte(p)
func (n *NetworkTransport) EncodePeer(id ServerID, p ServerAddress) []byte {
address := n.getProviderAddressOrFallback(id, p)
return []byte(address)
}
// DecodePeer implements the Transport interface.
@@ -381,15 +460,8 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress {
}
// listen is used to handling incoming connections.
func (n *NetworkTransport) listen(ctx context.Context) {
func (n *NetworkTransport) listen() {
for {
select {
case <-ctx.Done():
n.logger.Println("[INFO] raft-net: stream layer is closed")
return
default:
}
// Accept incoming connections
conn, err := n.stream.Accept()
if err != nil {
@@ -402,12 +474,14 @@ func (n *NetworkTransport) listen(ctx context.Context) {
n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr())
// Handle the connection in dedicated routine
go n.handleConn(ctx, conn)
go n.handleConn(n.getStreamContext(), conn)
}
}
// handleConn is used to handle an inbound connection for its lifespan.
func (n *NetworkTransport) handleConn(ctx context.Context, conn net.Conn) {
// handleConn is used to handle an inbound connection for its lifespan. The
// handler will exit when the passed context is cancelled or the connection is
// closed.
func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
defer conn.Close()
r := bufio.NewReader(conn)
w := bufio.NewWriter(conn)
@@ -416,8 +490,8 @@ func (n *NetworkTransport) handleConn(ctx context.Context, conn net.Conn) {
for {
select {
case <-ctx.Done():
n.logger.Println("[INFO] raft-net: stream layer is closed for handleConn")
case <-connCtx.Done():
n.logger.Println("[DEBUG] raft-net: stream layer is closed")
return
default:
}

View File

@@ -13,6 +13,11 @@ type Observation struct {
Data interface{}
}
// LeaderObservation is used for the data when leadership changes.
type LeaderObservation struct {
leader ServerAddress
}
// nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration.
var nextObserverID uint64

View File

@@ -88,8 +88,12 @@ type leaderState struct {
// setLeader is used to modify the current leader of the cluster
func (r *Raft) setLeader(leader ServerAddress) {
r.leaderLock.Lock()
oldLeader := r.leader
r.leader = leader
r.leaderLock.Unlock()
if oldLeader != leader {
r.observe(LeaderObservation{leader: leader})
}
}
// requestConfigChange is a helper for the above functions that make
@@ -1379,7 +1383,7 @@ func (r *Raft) electSelf() <-chan *voteResult {
req := &RequestVoteRequest{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Candidate: r.trans.EncodePeer(r.localAddr),
Candidate: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: lastIdx,
LastLogTerm: lastTerm,
}
@@ -1389,7 +1393,7 @@ func (r *Raft) electSelf() <-chan *voteResult {
r.goFunc(func() {
defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now())
resp := &voteResult{voterID: peer.ID}
err := r.trans.RequestVote(peer.Address, req, &resp.RequestVoteResponse)
err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse)
if err != nil {
r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err)
resp.Term = req.Term

View File

@@ -157,7 +157,7 @@ PIPELINE:
goto RPC
}
// replicateTo is a hepler to replicate(), used to replicate the logs up to a
// replicateTo is a helper to replicate(), used to replicate the logs up to a
// given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
@@ -183,7 +183,7 @@ START:
// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil {
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err)
s.failures++
return
@@ -278,7 +278,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
RPCHeader: r.getRPCHeader(),
SnapshotVersion: meta.Version,
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
Leader: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: meta.Index,
LastLogTerm: meta.Term,
Peers: meta.Peers,
@@ -290,7 +290,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
// Make the call
start := time.Now()
var resp InstallSnapshotResponse
if err := r.trans.InstallSnapshot(s.peer.Address, &req, &resp, snapshot); err != nil {
if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil {
r.logger.Printf("[ERR] raft: Failed to install snapshot %v: %v", snapID, err)
s.failures++
return false, err
@@ -332,7 +332,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
req := AppendEntriesRequest{
RPCHeader: r.getRPCHeader(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
Leader: r.trans.EncodePeer(r.localID, r.localAddr),
}
var resp AppendEntriesResponse
for {
@@ -345,7 +345,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
}
start := time.Now()
if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil {
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err)
failures++
select {
@@ -367,7 +367,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
// back to the standard replication which can handle more complex situations.
func (r *Raft) pipelineReplicate(s *followerReplication) error {
// Create a new pipeline
pipeline, err := r.trans.AppendEntriesPipeline(s.peer.Address)
pipeline, err := r.trans.AppendEntriesPipeline(s.peer.ID, s.peer.Address)
if err != nil {
return err
}
@@ -476,7 +476,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
req.RPCHeader = r.getRPCHeader()
req.Term = s.currentTerm
req.Leader = r.trans.EncodePeer(r.localAddr)
req.Leader = r.trans.EncodePeer(r.localID, r.localAddr)
req.LeaderCommitIndex = r.getCommitIndex()
if err := r.setPreviousLog(req, nextIndex); err != nil {
return err

16
vendor/github.com/hashicorp/raft/tag.sh generated vendored Executable file
View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -e
# The version must be supplied from the environment. Do not include the
# leading "v".
if [ -z $VERSION ]; then
echo "Please specify a version."
exit 1
fi
# Generate the tag.
echo "==> Tagging version $VERSION..."
git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION"
git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master
exit 0

View File

@@ -28,7 +28,7 @@ func NewTCPTransport(
timeout time.Duration,
logOutput io.Writer,
) (*NetworkTransport, error) {
return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport {
return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport {
return NewNetworkTransport(stream, maxPool, timeout, logOutput)
})
}
@@ -42,15 +42,26 @@ func NewTCPTransportWithLogger(
timeout time.Duration,
logger *log.Logger,
) (*NetworkTransport, error) {
return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport {
return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport {
return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger)
})
}
// NewTCPTransportWithLogger returns a NetworkTransport that is built on top of
// a TCP streaming transport layer, using a default logger and the address provider
func NewTCPTransportWithConfig(
bindAddr string,
advertise net.Addr,
config *NetworkTransportConfig,
) (*NetworkTransport, error) {
return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport {
config.Stream = stream
return NewNetworkTransportWithConfig(config)
})
}
func newTCPTransport(bindAddr string,
advertise net.Addr,
maxPool int,
timeout time.Duration,
transportCreator func(stream StreamLayer) *NetworkTransport) (*NetworkTransport, error) {
// Try to bind
list, err := net.Listen("tcp", bindAddr)

View File

@@ -35,20 +35,20 @@ type Transport interface {
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error)
AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)
// AppendEntries sends the appropriate RPC to the target node.
AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
// RequestVote sends the appropriate RPC to the target node.
RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error
RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error
// InstallSnapshot is used to push a snapshot down to a follower. The data is read from
// the ReadCloser and streamed to the client.
InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
// EncodePeer is used to serialize a peer's address.
EncodePeer(ServerAddress) []byte
EncodePeer(id ServerID, addr ServerAddress) []byte
// DecodePeer is used to deserialize a peer's address.
DecodePeer([]byte) ServerAddress

2
vendor/vendor.json vendored
View File

@@ -162,7 +162,7 @@
{"path":"github.com/hashicorp/logutils","revision":"0dc08b1671f34c4250ce212759ebd880f743d883"},
{"path":"github.com/hashicorp/memberlist","checksumSHA1":"1zk7IeGClUqBo+Phsx89p7fQ/rQ=","revision":"23ad4b7d7b38496cd64c241dfd4c60b7794c254a","revisionTime":"2017-02-08T21:15:06Z"},
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"ecpaHOImbL/NaivWrUDUUe5461E=","revision":"3a6f3bdfe4fc69e300c6d122b1a92051af6f0b95","revisionTime":"2017-08-07T22:22:24Z"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"=master","versionExact":"master"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"/oss17GO4hXGM7QnUdI3VzcAHzA=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"pvLOzocYsZtxuJ9pREHRTxYnoa4=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"},