From d443098f0c38536a35d1f577ca5d2840ee136422 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Fri, 19 Jan 2018 17:00:01 -0500 Subject: [PATCH] vendor raft to master branch --- vendor/github.com/hashicorp/raft/Makefile | 7 +- vendor/github.com/hashicorp/raft/README.md | 20 +- vendor/github.com/hashicorp/raft/api.go | 1 + .../hashicorp/raft/configuration.go | 2 +- .../hashicorp/raft/file_snapshot.go | 22 +- .../hashicorp/raft/inmem_transport.go | 10 +- .../hashicorp/raft/net_transport.go | 200 ++++++++++++------ vendor/github.com/hashicorp/raft/observer.go | 5 + vendor/github.com/hashicorp/raft/raft.go | 8 +- .../github.com/hashicorp/raft/replication.go | 16 +- vendor/github.com/hashicorp/raft/tag.sh | 16 ++ .../hashicorp/raft/tcp_transport.go | 19 +- vendor/github.com/hashicorp/raft/transport.go | 10 +- vendor/vendor.json | 2 +- 14 files changed, 236 insertions(+), 102 deletions(-) create mode 100755 vendor/github.com/hashicorp/raft/tag.sh diff --git a/vendor/github.com/hashicorp/raft/Makefile b/vendor/github.com/hashicorp/raft/Makefile index 0910e2ceb..75d947f13 100644 --- a/vendor/github.com/hashicorp/raft/Makefile +++ b/vendor/github.com/hashicorp/raft/Makefile @@ -1,11 +1,14 @@ DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) test: - go test -timeout=60s ./... + go test -timeout=60s . integ: test - INTEG_TESTS=yes go test -timeout=25s -run=Integ ./... + INTEG_TESTS=yes go test -timeout=25s -run=Integ . +fuzz: + go test -timeout=300s ./fuzzy + deps: go get -d -v ./... echo $(DEPS) | xargs -n1 go get -d diff --git a/vendor/github.com/hashicorp/raft/README.md b/vendor/github.com/hashicorp/raft/README.md index 8778b13dc..a70ec8a08 100644 --- a/vendor/github.com/hashicorp/raft/README.md +++ b/vendor/github.com/hashicorp/raft/README.md @@ -3,7 +3,7 @@ raft [![Build Status](https://travis-ci.org/hashicorp/raft.png)](https://travis- raft is a [Go](http://www.golang.org) library that manages a replicated log and can be used with an FSM to manage replicated state machines. It -is library for providing [consensus](http://en.wikipedia.org/wiki/Consensus_(computer_science)). +is a library for providing [consensus](http://en.wikipedia.org/wiki/Consensus_(computer_science)). The use cases for such a library are far-reaching as replicated state machines are a key component of many distributed systems. They enable @@ -32,6 +32,24 @@ A pure Go backend using [BoltDB](https://github.com/boltdb/bolt) is also availab [raft-boltdb](https://github.com/hashicorp/raft-boltdb). It can also be used as a `LogStore` and `StableStore`. +## Tagged Releases + +As of September 2017, Hashicorp will start using tags for this library to clearly indicate +major version updates. We recommend you vendor your application's dependency on this library. + +* v0.1.0 is the original stable version of the library that was in master and has been maintained +with no breaking API changes. This was in use by Consul prior to version 0.7.0. + +* v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version +manages server identities using a UUID, so introduces some breaking API changes. It also versions +the Raft protocol, and requires some special steps when interoperating with Raft servers running +older versions of the library (see the detailed comment in config.go about version compatibility). +You can reference https://github.com/hashicorp/consul/pull/2222 for an idea of what was required +to port Consul to these new interfaces. + + This version includes some new features as well, including non voting servers, a new address + provider abstraction in the transport layer, and more resilient snapshots. + ## Protocol raft is based on ["Raft: In Search of an Understandable Consensus Algorithm"](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go index 147cde295..73f057c98 100644 --- a/vendor/github.com/hashicorp/raft/api.go +++ b/vendor/github.com/hashicorp/raft/api.go @@ -492,6 +492,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna } r.processConfigurationLogEntry(&entry) } + r.logger.Printf("[INFO] raft: Initial configuration (index=%d): %+v", r.configurations.latestIndex, r.configurations.latest.Servers) diff --git a/vendor/github.com/hashicorp/raft/configuration.go b/vendor/github.com/hashicorp/raft/configuration.go index 74508c5e5..8afc38bd9 100644 --- a/vendor/github.com/hashicorp/raft/configuration.go +++ b/vendor/github.com/hashicorp/raft/configuration.go @@ -283,7 +283,7 @@ func encodePeers(configuration Configuration, trans Transport) []byte { var encPeers [][]byte for _, server := range configuration.Servers { if server.Suffrage == Voter { - encPeers = append(encPeers, trans.EncodePeer(server.Address)) + encPeers = append(encPeers, trans.EncodePeer(server.ID, server.Address)) } } diff --git a/vendor/github.com/hashicorp/raft/file_snapshot.go b/vendor/github.com/hashicorp/raft/file_snapshot.go index 119bfd308..ffc941454 100644 --- a/vendor/github.com/hashicorp/raft/file_snapshot.go +++ b/vendor/github.com/hashicorp/raft/file_snapshot.go @@ -12,6 +12,7 @@ import ( "log" "os" "path/filepath" + "runtime" "sort" "strings" "time" @@ -406,17 +407,18 @@ func (s *FileSnapshotSink) Close() error { return err } - // fsync the parent directory, to sync directory edits to disk - parentFH, err := os.Open(s.parentDir) - defer parentFH.Close() - if err != nil { - s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err) - return err - } + if runtime.GOOS != "windows" { //skipping fsync for directory entry edits on Windows, only needed for *nix style file systems + parentFH, err := os.Open(s.parentDir) + defer parentFH.Close() + if err != nil { + s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err) + return err + } - if err = parentFH.Sync(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err) - return err + if err = parentFH.Sync(); err != nil { + s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err) + return err + } } // Reap any old snapshots diff --git a/vendor/github.com/hashicorp/raft/inmem_transport.go b/vendor/github.com/hashicorp/raft/inmem_transport.go index 3693cd5ad..ce37f63aa 100644 --- a/vendor/github.com/hashicorp/raft/inmem_transport.go +++ b/vendor/github.com/hashicorp/raft/inmem_transport.go @@ -75,7 +75,7 @@ func (i *InmemTransport) LocalAddr() ServerAddress { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. -func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) { +func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) { i.RLock() peer, ok := i.peers[target] i.RUnlock() @@ -90,7 +90,7 @@ func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipe } // AppendEntries implements the Transport interface. -func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { +func (i *InmemTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { rpcResp, err := i.makeRPC(target, args, nil, i.timeout) if err != nil { return err @@ -103,7 +103,7 @@ func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntries } // RequestVote implements the Transport interface. -func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { +func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { rpcResp, err := i.makeRPC(target, args, nil, i.timeout) if err != nil { return err @@ -116,7 +116,7 @@ func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequ } // InstallSnapshot implements the Transport interface. -func (i *InmemTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { +func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout) if err != nil { return err @@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re } // EncodePeer implements the Transport interface. -func (i *InmemTransport) EncodePeer(p ServerAddress) []byte { +func (i *InmemTransport) EncodePeer(id ServerID, p ServerAddress) []byte { return []byte(p) } diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index b45d03e24..454fc2a57 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -69,17 +69,45 @@ type NetworkTransport struct { maxPool int + serverAddressProvider ServerAddressProvider + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex - stream StreamLayer - streamCancel context.CancelFunc + stream StreamLayer + + // streamCtx is used to cancel existing connection handlers. + streamCtx context.Context + streamCancel context.CancelFunc + streamCtxLock sync.RWMutex timeout time.Duration TimeoutScale int } +// NetworkTransportConfig encapsulates configuration for the network transport layer. +type NetworkTransportConfig struct { + // ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC + ServerAddressProvider ServerAddressProvider + + Logger *log.Logger + + // Dialer + Stream StreamLayer + + // MaxPool controls how many connections we will pool + MaxPool int + + // Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply + // the timeout by (SnapshotSize / TimeoutScale). + Timeout time.Duration +} + +type ServerAddressProvider interface { + ServerAddr(id ServerID) (ServerAddress, error) +} + // StreamLayer is used with the NetworkTransport to provide // the low level stream abstraction. type StreamLayer interface { @@ -114,6 +142,32 @@ type netPipeline struct { shutdownLock sync.Mutex } +// NewNetworkTransportWithConfig creates a new network transport with the given config struct +func NewNetworkTransportWithConfig( + config *NetworkTransportConfig, +) *NetworkTransport { + if config.Logger == nil { + config.Logger = log.New(os.Stderr, "", log.LstdFlags) + } + trans := &NetworkTransport{ + connPool: make(map[ServerAddress][]*netConn), + consumeCh: make(chan RPC), + logger: config.Logger, + maxPool: config.MaxPool, + shutdownCh: make(chan struct{}), + stream: config.Stream, + timeout: config.Timeout, + TimeoutScale: DefaultTimeoutScale, + serverAddressProvider: config.ServerAddressProvider, + } + + // Create the connection context and then start our listener. + trans.setupStreamContext() + go trans.listen() + + return trans +} + // NewNetworkTransport creates a new network transport with the given dialer // and listener. The maxPool controls how many connections we will pool. The // timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply @@ -127,10 +181,12 @@ func NewNetworkTransport( if logOutput == nil { logOutput = os.Stderr } - return NewNetworkTransportWithLogger(stream, maxPool, timeout, log.New(logOutput, "", log.LstdFlags)) + logger := log.New(logOutput, "", log.LstdFlags) + config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} + return NewNetworkTransportWithConfig(config) } -// NewNetworkTransportWithLogger creates a new network transport with the given dialer +// NewNetworkTransportWithLogger creates a new network transport with the given logger, dialer // and listener. The maxPool controls how many connections we will pool. The // timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply // the timeout by (SnapshotSize / TimeoutScale). @@ -140,44 +196,23 @@ func NewNetworkTransportWithLogger( timeout time.Duration, logger *log.Logger, ) *NetworkTransport { - if logger == nil { - logger = log.New(os.Stderr, "", log.LstdFlags) - } - - trans := &NetworkTransport{ - connPool: make(map[ServerAddress][]*netConn), - consumeCh: make(chan RPC), - logger: logger, - maxPool: maxPool, - shutdownCh: make(chan struct{}), - stream: stream, - timeout: timeout, - TimeoutScale: DefaultTimeoutScale, - } - - ctx, cancel := context.WithCancel(context.Background()) - trans.streamCancel = cancel - go trans.listen(ctx) - return trans + config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} + return NewNetworkTransportWithConfig(config) } -// Pause closes the current stream and existing connections for a -// NetworkTransport instance -func (n *NetworkTransport) Pause() { - for _, e := range n.connPool { - for _, conn := range e { - conn.Release() - } - } - n.streamCancel() -} - -// Pause creates a new stream for a NetworkTransport instance -func (n *NetworkTransport) Reload(s StreamLayer) { - n.stream = s +// setupStreamContext is used to create a new stream context. This should be +// called with the stream lock held. +func (n *NetworkTransport) setupStreamContext() { ctx, cancel := context.WithCancel(context.Background()) + n.streamCtx = ctx n.streamCancel = cancel - go n.listen(ctx) +} + +// getStreamContext is used retrieve the current stream context. +func (n *NetworkTransport) getStreamContext() context.Context { + n.streamCtxLock.RLock() + defer n.streamCtxLock.RUnlock() + return n.streamCtx } // SetHeartbeatHandler is used to setup a heartbeat handler @@ -189,6 +224,31 @@ func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC)) { n.heartbeatFn = cb } +// CloseStreams closes the current streams. +func (n *NetworkTransport) CloseStreams() { + n.connPoolLock.Lock() + defer n.connPoolLock.Unlock() + + // Close all the connections in the connection pool and then remove their + // entry. + for k, e := range n.connPool { + for _, conn := range e { + conn.Release() + } + + delete(n.connPool, k) + } + + // Cancel the existing connections and create a new context. Both these + // operations must always be done with the lock held otherwise we can create + // connection handlers that are holding a context that will never be + // cancelable. + n.streamCtxLock.Lock() + n.streamCancel() + n.setupStreamContext() + n.streamCtxLock.Unlock() +} + // Close is used to stop the network transport. func (n *NetworkTransport) Close() error { n.shutdownLock.Lock() @@ -239,6 +299,24 @@ func (n *NetworkTransport) getPooledConn(target ServerAddress) *netConn { return conn } +// getConnFromAddressProvider returns a connection from the server address provider if available, or defaults to a connection using the target server address +func (n *NetworkTransport) getConnFromAddressProvider(id ServerID, target ServerAddress) (*netConn, error) { + address := n.getProviderAddressOrFallback(id, target) + return n.getConn(address) +} + +func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target ServerAddress) ServerAddress { + if n.serverAddressProvider != nil { + serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id) + if err != nil { + n.logger.Printf("[WARN] Unable to get address for server id %v, using fallback address %v: %v", id, target, err) + } else { + return serverAddressOverride + } + } + return target +} + // getConn is used to get a connection from the pool. func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) { // Check for a pooled conn @@ -285,9 +363,9 @@ func (n *NetworkTransport) returnConn(conn *netConn) { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. -func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) { +func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) { // Get a connection - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return nil, err } @@ -297,19 +375,19 @@ func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPi } // AppendEntries implements the Transport interface. -func (n *NetworkTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { - return n.genericRPC(target, rpcAppendEntries, args, resp) +func (n *NetworkTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { + return n.genericRPC(id, target, rpcAppendEntries, args, resp) } // RequestVote implements the Transport interface. -func (n *NetworkTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { - return n.genericRPC(target, rpcRequestVote, args, resp) +func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { + return n.genericRPC(id, target, rpcRequestVote, args, resp) } // genericRPC handles a simple request/response RPC. -func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { +func (n *NetworkTransport) genericRPC(id ServerID, target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { // Get a conn - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return err } @@ -333,9 +411,9 @@ func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args } // InstallSnapshot implements the Transport interface. -func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { +func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { // Get a conn, always close for InstallSnapshot - conn, err := n.getConn(target) + conn, err := n.getConnFromAddressProvider(id, target) if err != nil { return err } @@ -371,8 +449,9 @@ func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSn } // EncodePeer implements the Transport interface. -func (n *NetworkTransport) EncodePeer(p ServerAddress) []byte { - return []byte(p) +func (n *NetworkTransport) EncodePeer(id ServerID, p ServerAddress) []byte { + address := n.getProviderAddressOrFallback(id, p) + return []byte(address) } // DecodePeer implements the Transport interface. @@ -381,15 +460,8 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress { } // listen is used to handling incoming connections. -func (n *NetworkTransport) listen(ctx context.Context) { +func (n *NetworkTransport) listen() { for { - select { - case <-ctx.Done(): - n.logger.Println("[INFO] raft-net: stream layer is closed") - return - default: - } - // Accept incoming connections conn, err := n.stream.Accept() if err != nil { @@ -402,12 +474,14 @@ func (n *NetworkTransport) listen(ctx context.Context) { n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) // Handle the connection in dedicated routine - go n.handleConn(ctx, conn) + go n.handleConn(n.getStreamContext(), conn) } } -// handleConn is used to handle an inbound connection for its lifespan. -func (n *NetworkTransport) handleConn(ctx context.Context, conn net.Conn) { +// handleConn is used to handle an inbound connection for its lifespan. The +// handler will exit when the passed context is cancelled or the connection is +// closed. +func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) { defer conn.Close() r := bufio.NewReader(conn) w := bufio.NewWriter(conn) @@ -416,8 +490,8 @@ func (n *NetworkTransport) handleConn(ctx context.Context, conn net.Conn) { for { select { - case <-ctx.Done(): - n.logger.Println("[INFO] raft-net: stream layer is closed for handleConn") + case <-connCtx.Done(): + n.logger.Println("[DEBUG] raft-net: stream layer is closed") return default: } diff --git a/vendor/github.com/hashicorp/raft/observer.go b/vendor/github.com/hashicorp/raft/observer.go index 76c4d555d..bce17ef19 100644 --- a/vendor/github.com/hashicorp/raft/observer.go +++ b/vendor/github.com/hashicorp/raft/observer.go @@ -13,6 +13,11 @@ type Observation struct { Data interface{} } +// LeaderObservation is used for the data when leadership changes. +type LeaderObservation struct { + leader ServerAddress +} + // nextObserverId is used to provide a unique ID for each observer to aid in // deregistration. var nextObserverID uint64 diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index 0f89ccaa4..b5cc9ca98 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -88,8 +88,12 @@ type leaderState struct { // setLeader is used to modify the current leader of the cluster func (r *Raft) setLeader(leader ServerAddress) { r.leaderLock.Lock() + oldLeader := r.leader r.leader = leader r.leaderLock.Unlock() + if oldLeader != leader { + r.observe(LeaderObservation{leader: leader}) + } } // requestConfigChange is a helper for the above functions that make @@ -1379,7 +1383,7 @@ func (r *Raft) electSelf() <-chan *voteResult { req := &RequestVoteRequest{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), - Candidate: r.trans.EncodePeer(r.localAddr), + Candidate: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, } @@ -1389,7 +1393,7 @@ func (r *Raft) electSelf() <-chan *voteResult { r.goFunc(func() { defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now()) resp := &voteResult{voterID: peer.ID} - err := r.trans.RequestVote(peer.Address, req, &resp.RequestVoteResponse) + err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse) if err != nil { r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err) resp.Term = req.Term diff --git a/vendor/github.com/hashicorp/raft/replication.go b/vendor/github.com/hashicorp/raft/replication.go index 683927343..e631b5a09 100644 --- a/vendor/github.com/hashicorp/raft/replication.go +++ b/vendor/github.com/hashicorp/raft/replication.go @@ -157,7 +157,7 @@ PIPELINE: goto RPC } -// replicateTo is a hepler to replicate(), used to replicate the logs up to a +// replicateTo is a helper to replicate(), used to replicate the logs up to a // given last index. // If the follower log is behind, we take care to bring them up to date. func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) { @@ -183,7 +183,7 @@ START: // Make the RPC call start = time.Now() - if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err) s.failures++ return @@ -278,7 +278,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { RPCHeader: r.getRPCHeader(), SnapshotVersion: meta.Version, Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localAddr), + Leader: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, Peers: meta.Peers, @@ -290,7 +290,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Make the call start := time.Now() var resp InstallSnapshotResponse - if err := r.trans.InstallSnapshot(s.peer.Address, &req, &resp, snapshot); err != nil { + if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil { r.logger.Printf("[ERR] raft: Failed to install snapshot %v: %v", snapID, err) s.failures++ return false, err @@ -332,7 +332,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { req := AppendEntriesRequest{ RPCHeader: r.getRPCHeader(), Term: s.currentTerm, - Leader: r.trans.EncodePeer(r.localAddr), + Leader: r.trans.EncodePeer(r.localID, r.localAddr), } var resp AppendEntriesResponse for { @@ -345,7 +345,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { } start := time.Now() - if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err) failures++ select { @@ -367,7 +367,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { // back to the standard replication which can handle more complex situations. func (r *Raft) pipelineReplicate(s *followerReplication) error { // Create a new pipeline - pipeline, err := r.trans.AppendEntriesPipeline(s.peer.Address) + pipeline, err := r.trans.AppendEntriesPipeline(s.peer.ID, s.peer.Address) if err != nil { return err } @@ -476,7 +476,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.RPCHeader = r.getRPCHeader() req.Term = s.currentTerm - req.Leader = r.trans.EncodePeer(r.localAddr) + req.Leader = r.trans.EncodePeer(r.localID, r.localAddr) req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { return err diff --git a/vendor/github.com/hashicorp/raft/tag.sh b/vendor/github.com/hashicorp/raft/tag.sh new file mode 100755 index 000000000..cd16623a7 --- /dev/null +++ b/vendor/github.com/hashicorp/raft/tag.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +# The version must be supplied from the environment. Do not include the +# leading "v". +if [ -z $VERSION ]; then + echo "Please specify a version." + exit 1 +fi + +# Generate the tag. +echo "==> Tagging version $VERSION..." +git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION" +git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master + +exit 0 diff --git a/vendor/github.com/hashicorp/raft/tcp_transport.go b/vendor/github.com/hashicorp/raft/tcp_transport.go index 9281508a0..29b2740f6 100644 --- a/vendor/github.com/hashicorp/raft/tcp_transport.go +++ b/vendor/github.com/hashicorp/raft/tcp_transport.go @@ -28,7 +28,7 @@ func NewTCPTransport( timeout time.Duration, logOutput io.Writer, ) (*NetworkTransport, error) { - return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransport(stream, maxPool, timeout, logOutput) }) } @@ -42,15 +42,26 @@ func NewTCPTransportWithLogger( timeout time.Duration, logger *log.Logger, ) (*NetworkTransport, error) { - return newTCPTransport(bindAddr, advertise, maxPool, timeout, func(stream StreamLayer) *NetworkTransport { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger) }) } +// NewTCPTransportWithLogger returns a NetworkTransport that is built on top of +// a TCP streaming transport layer, using a default logger and the address provider +func NewTCPTransportWithConfig( + bindAddr string, + advertise net.Addr, + config *NetworkTransportConfig, +) (*NetworkTransport, error) { + return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { + config.Stream = stream + return NewNetworkTransportWithConfig(config) + }) +} + func newTCPTransport(bindAddr string, advertise net.Addr, - maxPool int, - timeout time.Duration, transportCreator func(stream StreamLayer) *NetworkTransport) (*NetworkTransport, error) { // Try to bind list, err := net.Listen("tcp", bindAddr) diff --git a/vendor/github.com/hashicorp/raft/transport.go b/vendor/github.com/hashicorp/raft/transport.go index 633f97a8c..85459b221 100644 --- a/vendor/github.com/hashicorp/raft/transport.go +++ b/vendor/github.com/hashicorp/raft/transport.go @@ -35,20 +35,20 @@ type Transport interface { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. - AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) + AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) // AppendEntries sends the appropriate RPC to the target node. - AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error + AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error // RequestVote sends the appropriate RPC to the target node. - RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error + RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error // InstallSnapshot is used to push a snapshot down to a follower. The data is read from // the ReadCloser and streamed to the client. - InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error + InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error // EncodePeer is used to serialize a peer's address. - EncodePeer(ServerAddress) []byte + EncodePeer(id ServerID, addr ServerAddress) []byte // DecodePeer is used to deserialize a peer's address. DecodePeer([]byte) ServerAddress diff --git a/vendor/vendor.json b/vendor/vendor.json index c1f64dda1..2af79a486 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -162,7 +162,7 @@ {"path":"github.com/hashicorp/logutils","revision":"0dc08b1671f34c4250ce212759ebd880f743d883"}, {"path":"github.com/hashicorp/memberlist","checksumSHA1":"1zk7IeGClUqBo+Phsx89p7fQ/rQ=","revision":"23ad4b7d7b38496cd64c241dfd4c60b7794c254a","revisionTime":"2017-02-08T21:15:06Z"}, {"path":"github.com/hashicorp/net-rpc-msgpackrpc","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"}, - {"path":"github.com/hashicorp/raft","checksumSHA1":"ecpaHOImbL/NaivWrUDUUe5461E=","revision":"3a6f3bdfe4fc69e300c6d122b1a92051af6f0b95","revisionTime":"2017-08-07T22:22:24Z"}, + {"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"=master","versionExact":"master"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"/oss17GO4hXGM7QnUdI3VzcAHzA=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"}, {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"pvLOzocYsZtxuJ9pREHRTxYnoa4=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"},