update hashicorp dependencies

This commit is contained in:
Alex Dadgar
2016-02-15 17:38:08 -08:00
parent 6d7ef819da
commit 7a19f4de34
16 changed files with 341 additions and 105 deletions

28
Godeps/Godeps.json generated
View File

@@ -138,13 +138,13 @@
},
{
"ImportPath": "github.com/hashicorp/consul/api",
"Comment": "v0.6.3-28-g3215b87",
"Rev": "3215b8727f44c778dd7045dcfd5ac42735c581a9"
"Comment": "v0.6.3-119-g0562b95",
"Rev": "0562b95a551568e8dd2f93af6c60982f1a252a3a"
},
{
"ImportPath": "github.com/hashicorp/consul/tlsutil",
"Comment": "v0.6.3-28-g3215b87",
"Rev": "3215b8727f44c778dd7045dcfd5ac42735c581a9"
"Comment": "v0.6.3-119-g0562b95",
"Rev": "0562b95a551568e8dd2f93af6c60982f1a252a3a"
},
{
"ImportPath": "github.com/hashicorp/errwrap",
@@ -160,15 +160,15 @@
},
{
"ImportPath": "github.com/hashicorp/go-getter",
"Rev": "c5e245982bdb4708f89578c8e0054d82b5197401"
"Rev": "848242c76c346ef0aeb34787753b068f5f6f92fe"
},
{
"ImportPath": "github.com/hashicorp/go-immutable-radix",
"Rev": "aca1bd0689e10884f20d114aff148ddb849ece80"
"Rev": "12e90058b2897552deea141eff51bb7a07a09e63"
},
{
"ImportPath": "github.com/hashicorp/go-memdb",
"Rev": "31949d523ade8a236956c6f1761e9dcf902d1638"
"Rev": "e16093a4c7dd00f7ce4c2452ded2c7e37d8df8be"
},
{
"ImportPath": "github.com/hashicorp/go-msgpack/codec",
@@ -188,11 +188,11 @@
},
{
"ImportPath": "github.com/hashicorp/go-version",
"Rev": "7e3c02b30806fa5779d3bdfc152ce4c6f40e7b38"
"Rev": "2e7f5ea8e27bb3fdf9baa0881d16757ac4637332"
},
{
"ImportPath": "github.com/hashicorp/hcl",
"Rev": "578dd9746824a54637686b51a41bad457a56bcef"
"Rev": "1c284ec98f4b398443cbabb0d9197f7f4cc0077c"
},
{
"ImportPath": "github.com/hashicorp/logutils",
@@ -208,7 +208,7 @@
},
{
"ImportPath": "github.com/hashicorp/raft",
"Rev": "d136cd15dfb7876fd7c89cad1995bc4f19ceb294"
"Rev": "057b893fd996696719e98b6c44649ea14968c811"
},
{
"ImportPath": "github.com/hashicorp/raft-boltdb",
@@ -220,13 +220,13 @@
},
{
"ImportPath": "github.com/hashicorp/serf/coordinate",
"Comment": "v0.7.0-12-ge4ec8cc",
"Rev": "e4ec8cc423bbe20d26584b96efbeb9102e16d05f"
"Comment": "v0.7.0-18-gc4c55f1",
"Rev": "c4c55f16bae1aed9b355ad655d3ebf0215734461"
},
{
"ImportPath": "github.com/hashicorp/serf/serf",
"Comment": "v0.7.0-12-ge4ec8cc",
"Rev": "e4ec8cc423bbe20d26584b96efbeb9102e16d05f"
"Comment": "v0.7.0-18-gc4c55f1",
"Rev": "c4c55f16bae1aed9b355ad655d3ebf0215734461"
},
{
"ImportPath": "github.com/hashicorp/yamux",

View File

@@ -74,4 +74,5 @@ job "binstore-storagelocker" {
}
}
}
}
}

View File

@@ -107,3 +107,45 @@ is shown below:
The checksum query parameter is never sent to the backend protocol
implementation. It is used at a higher level by go-getter itself.
### Unarchiving
go-getter will automatically unarchive files into a file or directory
based on the extension of the file being requested (over any protocol).
This works for both file and directory downloads.
go-getter looks for an `archive` query parameter to specify the format of
the archive. If this isn't specified, go-getter will use the extension of
the path to see if it appears archived. Unarchiving can be explicitly
disabled by setting the `archive` query parameter to `false`.
The following archive formats are supported:
* `tar.gz` and `tgz`
* `tar.bz2` and `tbz2`
* `zip`
* `gz`
* `bz2`
For example, an example URL is shown below:
```
./foo.zip
```
This will automatically be inferred to be a ZIP file and will be extracted.
You can also be explicit about the archive type:
```
./some/other/path?archive=zip
```
And finally, you can disable archiving completely:
```
./some/path?archive=false
```
You can combine unarchiving with the other features of go-getter such
as checksumming. The special `archive` query parameter will be removed
from the URL before going to the final protocol downloader.

View File

@@ -13,6 +13,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
urlhelper "github.com/hashicorp/go-getter/helper/url"
@@ -47,6 +48,10 @@ type Client struct {
// If this is nil, then the default Detectors will be used.
Detectors []Detector
// Decompressors is the map of decompressors supported by this client.
// If this is nil, then the default value is the Decompressors global.
Decompressors map[string]Decompressor
// Getters is the map of protocols supported by this client. If this
// is nil, then the default Getters variable will be used.
Getters map[string]Getter
@@ -54,6 +59,15 @@ type Client struct {
// Get downloads the configured source to the destination.
func (c *Client) Get() error {
// Store this locally since there are cases we swap this
dir := c.Dir
// Default decompressor value
decompressors := c.Decompressors
if decompressors == nil {
decompressors = Decompressors
}
// Detect the URL. This is safe if it is already detected.
detectors := c.Detectors
if detectors == nil {
@@ -105,10 +119,61 @@ func (c *Client) Get() error {
"download not supported for scheme '%s'", force)
}
// We have magic query parameters that we use to signal different features
q := u.Query()
// Determine if we have an archive type
archiveV := q.Get("archive")
if archiveV != "" {
// Delete the paramter since it is a magic parameter we don't
// want to pass on to the Getter
q.Del("archive")
u.RawQuery = q.Encode()
// If we can parse the value as a bool and it is false, then
// set the archive to "-" which should never map to a decompressor
if b, err := strconv.ParseBool(archiveV); err == nil && !b {
archiveV = "-"
}
}
if archiveV == "" {
// We don't appear to... but is it part of the filename?
matchingLen := 0
for k, _ := range decompressors {
if strings.HasSuffix(u.Path, k) && len(k) > matchingLen {
archiveV = k
matchingLen = len(k)
}
}
}
// If we have a decompressor, then we need to change the destination
// to download to a temporary path. We unarchive this into the final,
// real path.
var decompressDst string
var decompressDir bool
decompressor := decompressors[archiveV]
if decompressor != nil {
// Create a temporary directory to store our archive. We delete
// this at the end of everything.
td, err := ioutil.TempDir("", "getter")
if err != nil {
return fmt.Errorf(
"Error creating temporary directory for archive: %s", err)
}
defer os.RemoveAll(td)
// Swap the download directory to be our temporary path and
// store the old values.
decompressDst = dst
decompressDir = dir
dst = filepath.Join(td, "archive")
dir = false
}
// Determine if we have a checksum
var checksumHash hash.Hash
var checksumValue []byte
q := u.Query()
if v := q.Get("checksum"); v != "" {
// Delete the query parameter if we have it.
q.Del("checksum")
@@ -116,7 +181,7 @@ func (c *Client) Get() error {
// If we're getting a directory, then this is an error. You cannot
// checksum a directory. TODO: test
if c.Dir {
if dir {
return fmt.Errorf(
"checksum cannot be specified for directory download")
}
@@ -153,25 +218,51 @@ func (c *Client) Get() error {
// If we're not downloading a directory, then just download the file
// and return.
if !c.Dir {
if !dir {
err := g.GetFile(dst, u)
if err != nil {
return err
}
if checksumHash != nil {
return checksum(dst, checksumHash, checksumValue)
if err := checksum(dst, checksumHash, checksumValue); err != nil {
return err
}
}
return nil
if decompressor != nil {
// We have a decompressor, so decompress the current destination
// into the final destination with the proper mode.
err := decompressor.Decompress(decompressDst, dst, decompressDir)
if err != nil {
return err
}
// Swap the information back
dst = decompressDst
dir = decompressDir
}
// We check the dir value again because it can be switched back
// if we were unarchiving. If we're still only Get-ing a file, then
// we're done.
if !dir {
return nil
}
}
// We're downloading a directory, which might require a bit more work
// if we're specifying a subdir.
err = g.Get(dst, u)
if err != nil {
err = fmt.Errorf("error downloading '%s': %s", src, err)
return err
// If we're at this point we're either downloading a directory or we've
// downloaded and unarchived a directory and we're just checking subdir.
// In the case we have a decompressor we don't Get because it was Get
// above.
if decompressor == nil {
// We're downloading a directory, which might require a bit more work
// if we're specifying a subdir.
err := g.Get(dst, u)
if err != nil {
err = fmt.Errorf("error downloading '%s': %s", src, err)
return err
}
}
// If we have a subdir, copy that over

View File

@@ -1,6 +1,19 @@
package iradix
import "bytes"
import (
"bytes"
"github.com/hashicorp/golang-lru/simplelru"
)
const (
// defaultModifiedCache is the default size of the modified node
// cache used per transaction. This is used to cache the updates
// to the nodes near the root, while the leaves do not need to be
// cached. This is important for very large transactions to prevent
// the modified cache from growing to be enormous.
defaultModifiedCache = 8192
)
// Tree implements an immutable radix tree. This can be treated as a
// Dictionary abstract data type. The main advantage over a standard
@@ -29,7 +42,7 @@ func (t *Tree) Len() int {
type Txn struct {
root *Node
size int
modified map[*Node]struct{}
modified *simplelru.LRU
}
// Txn starts a new transaction that can be used to mutate the tree
@@ -41,18 +54,22 @@ func (t *Tree) Txn() *Txn {
return txn
}
// writeNode returns a ndoe to be modified, if the current
// writeNode returns a node to be modified, if the current
// node as already been modified during the course of
// the transaction, it is used in-place.
func (t *Txn) writeNode(n *Node) *Node {
// Ensure the modified set exists
if t.modified == nil {
t.modified = make(map[*Node]struct{})
lru, err := simplelru.NewLRU(defaultModifiedCache, nil)
if err != nil {
panic(err)
}
t.modified = lru
}
// If this node has already been modified, we can
// continue to use it during this transaction.
if _, ok := t.modified[n]; ok {
if _, ok := t.modified.Get(n); ok {
return n
}
@@ -72,7 +89,7 @@ func (t *Txn) writeNode(n *Node) *Node {
}
// Mark this node as modified
t.modified[nc] = struct{}{}
t.modified.Add(n, nil)
return nc
}

View File

@@ -84,8 +84,18 @@ func (n *Node) mergeChild() {
e := n.edges[0]
child := e.node
n.prefix = concat(n.prefix, child.prefix)
n.leaf = child.leaf
n.edges = child.edges
if child.leaf != nil {
n.leaf = new(leafNode)
*n.leaf = *child.leaf
} else {
n.leaf = nil
}
if len(child.edges) != 0 {
n.edges = make([]edge, len(child.edges))
copy(n.edges, child.edges)
} else {
n.edges = nil
}
}
func (n *Node) Get(k []byte) (interface{}, bool) {

View File

@@ -2,6 +2,8 @@ package memdb
import (
"sync"
"sync/atomic"
"unsafe"
"github.com/hashicorp/go-immutable-radix"
)
@@ -12,7 +14,7 @@ import (
// transactions and MVCC.
type MemDB struct {
schema *DBSchema
root *iradix.Tree
root unsafe.Pointer // *iradix.Tree underneath
// There can only be a single writter at once
writer sync.Mutex
@@ -28,7 +30,7 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
// Create the MemDB
db := &MemDB{
schema: schema,
root: iradix.New(),
root: unsafe.Pointer(iradix.New()),
}
if err := db.initialize(); err != nil {
return nil, err
@@ -36,6 +38,12 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
return db, nil
}
// getRoot is used to do an atomic load of the root pointer
func (db *MemDB) getRoot() *iradix.Tree {
root := (*iradix.Tree)(atomic.LoadPointer(&db.root))
return root
}
// Txn is used to start a new transaction, in either read or write mode.
// There can only be a single concurrent writer, but any number of readers.
func (db *MemDB) Txn(write bool) *Txn {
@@ -45,7 +53,7 @@ func (db *MemDB) Txn(write bool) *Txn {
txn := &Txn{
db: db,
write: write,
rootTxn: db.root.Txn(),
rootTxn: db.getRoot().Txn(),
}
return txn
}
@@ -56,20 +64,22 @@ func (db *MemDB) Txn(write bool) *Txn {
func (db *MemDB) Snapshot() *MemDB {
clone := &MemDB{
schema: db.schema,
root: db.root,
root: unsafe.Pointer(db.getRoot()),
}
return clone
}
// initialize is used to setup the DB for use after creation
func (db *MemDB) initialize() error {
root := db.getRoot()
for tName, tableSchema := range db.schema.Tables {
for iName, _ := range tableSchema.Indexes {
index := iradix.New()
path := indexPath(tName, iName)
db.root, _, _ = db.root.Insert(path, index)
root, _, _ = root.Insert(path, index)
}
}
db.root = unsafe.Pointer(root)
return nil
}

View File

@@ -3,6 +3,8 @@ package memdb
import (
"fmt"
"strings"
"sync/atomic"
"unsafe"
"github.com/hashicorp/go-immutable-radix"
)
@@ -10,6 +12,7 @@ import (
const (
id = "id"
)
// tableIndex is a tuple of (Table, Index) used for lookups
type tableIndex struct {
Table string
@@ -113,7 +116,8 @@ func (txn *Txn) Commit() {
}
// Update the root of the DB
txn.db.root = txn.rootTxn.Commit()
newRoot := txn.rootTxn.Commit()
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
// Clear the txn
txn.rootTxn = nil
@@ -281,7 +285,7 @@ func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error)
// Do the deletes
num := 0
for _, obj := range(objs) {
for _, obj := range objs {
if err := txn.Delete(table, obj); err != nil {
return num, err
}

View File

@@ -14,7 +14,7 @@ var versionRegexp *regexp.Regexp
// The raw regular expression string used for testing the validity
// of a version.
const VersionRegexpRaw string = `([0-9]+(\.[0-9]+){0,2})` +
const VersionRegexpRaw string = `v?([0-9]+(\.[0-9]+){0,2})` +
`(-([0-9A-Za-z\-]+(\.[0-9A-Za-z\-]+)*))?` +
`(\+([0-9A-Za-z\-]+(\.[0-9A-Za-z\-]+)*))?` +
`?`

View File

@@ -64,6 +64,16 @@ of the syntax and grammar is listed here.
* Strings are double-quoted and can contain any UTF-8 characters.
Example: `"Hello, World"`
* Multi-line strings start with `<<EOF` at the end of a line, and end
with `EOF` on its own line ([here documents](https://en.wikipedia.org/wiki/Here_document)).
Any text may be used in place of `EOF`. Example:
```
<<FOO
hello
world
FOO
```
* Numbers are assumed to be base 10. If you prefix a number with 0x,
it is treated as a hexadecimal. If it is prefixed with 0, it is
treated as an octal. Numbers can be in scientific notation: "1e10".

View File

@@ -246,6 +246,11 @@ func (p *Parser) objectType() (*ast.ObjectType, error) {
return nil, err
}
// If there is no error, we should be at a RBRACE to end the object
if p.tok.Type != token.RBRACE {
return nil, fmt.Errorf("object expected closing RBRACE got: %s", p.tok.Type)
}
o.List = l
o.Rbrace = p.tok.Pos // advanced via parseObjectList
return o, nil

View File

@@ -419,10 +419,12 @@ func (p *printer) list(l *ast.ListType) []byte {
}
}
insertSpaceBeforeItem := false
for i, item := range l.List {
if item.Pos().Line != l.Lbrack.Line {
// multiline list, add newline before we add each item
buf.WriteByte(newline)
insertSpaceBeforeItem = false
// also indent each line
val := p.output(item)
curLen := len(val)
@@ -432,12 +434,8 @@ func (p *printer) list(l *ast.ListType) []byte {
if lit, ok := item.(*ast.LiteralType); ok && lit.LineComment != nil {
// if the next item doesn't have any comments, do not align
buf.WriteByte(blank) // align one space
if i != len(l.List)-1 {
if lit, ok := l.List[i+1].(*ast.LiteralType); ok && lit.LineComment != nil {
for i := 0; i < longestLine-curLen; i++ {
buf.WriteByte(blank)
}
}
for i := 0; i < longestLine-curLen; i++ {
buf.WriteByte(blank)
}
for _, comment := range lit.LineComment.List {
@@ -449,10 +447,14 @@ func (p *printer) list(l *ast.ListType) []byte {
buf.WriteByte(newline)
}
} else {
if insertSpaceBeforeItem {
buf.WriteByte(blank)
insertSpaceBeforeItem = false
}
buf.Write(p.output(item))
if i != len(l.List)-1 {
buf.WriteString(",")
buf.WriteByte(blank)
insertSpaceBeforeItem = true
}
}

View File

@@ -29,6 +29,10 @@ type AppendEntriesResponse struct {
// We may not succeed if we have a conflicting entry
Success bool
// There are scenarios where this request didn't succeed
// but there's no need to wait/back-off the next attempt.
NoRetryBackoff bool
}
// RequestVoteRequest is the command used by a candidate to ask a Raft peer

View File

@@ -690,7 +690,7 @@ func (r *Raft) runCandidate() {
// Check if the vote is granted
if vote.Granted {
grantedVotes++
r.logger.Printf("[DEBUG] raft: Vote granted. Tally: %d", grantedVotes)
r.logger.Printf("[DEBUG] raft: Vote granted from %s. Tally: %d", vote.voter, grantedVotes)
}
// Check if we've become the leader
@@ -755,6 +755,13 @@ func (r *Raft) runLeader() {
// Cleanup state on step down
defer func() {
// Since we were the leader previously, we update our
// last contact time when we step down, so that we are not
// reporting a last contact time from before we were the
// leader. Otherwise, to a client it would seem our data
// is extremely stale.
r.setLastContact()
// Stop replication
for _, p := range r.leaderState.replState {
close(p.stopCh)
@@ -792,6 +799,11 @@ func (r *Raft) runLeader() {
select {
case notify <- false:
case <-r.shutdownCh:
// On shutdown, make a best effort but do not block
select {
case notify <- false:
default:
}
}
}
}()
@@ -849,6 +861,14 @@ func (r *Raft) startReplication(peer string) {
// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
// stepDown is used to track if there is an inflight log that
// would cause us to lose leadership (specifically a RemovePeer of
// ourselves). If this is the case, we must not allow any logs to
// be processed in parallel, otherwise we are basing commit on
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
for r.getState() == Leader {
select {
@@ -908,13 +928,24 @@ func (r *Raft) leaderLoop() {
// Handle any peer set changes
n := len(ready)
for i := 0; i < n; i++ {
// Fail all future transactions once stepDown is on
if stepDown {
ready[i].respond(ErrNotLeader)
ready[i], ready[n-1] = ready[n-1], nil
n--
i--
continue
}
// Special case AddPeer and RemovePeer
log := ready[i]
if log.log.Type != LogAddPeer && log.log.Type != LogRemovePeer {
continue
}
// Check if this log should be ignored
// Check if this log should be ignored. The logs can be
// reordered here since we have not yet assigned an index
// and are not violating any promises.
if !r.preparePeerChange(log) {
ready[i], ready[n-1] = ready[n-1], nil
n--
@@ -922,8 +953,13 @@ func (r *Raft) leaderLoop() {
continue
}
// Apply peer set changes early
r.processLog(&log.log, nil, true)
// Apply peer set changes early and check if we will step
// down after the commit of this log. If so, we must not
// allow any future entries to make progress to avoid undefined
// behavior.
if ok := r.processLog(&log.log, nil, true); ok {
stepDown = true
}
}
// Nothing to do if all logs are invalid
@@ -1129,7 +1165,8 @@ func (r *Raft) processLogs(index uint64, future *logFuture) {
}
// processLog is invoked to process the application of a single committed log.
func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) {
// Returns if this log entry would cause us to stepDown after it commits.
func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) (stepDown bool) {
switch l.Type {
case LogBarrier:
// Barrier is handled by the FSM
@@ -1158,8 +1195,18 @@ func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) {
// If the peer set does not include us, remove all other peers
removeSelf := !PeerContained(peers, r.localAddr) && l.Type == LogRemovePeer
if removeSelf {
r.peers = nil
r.peerStore.SetPeers([]string{r.localAddr})
// Mark that this operation will cause us to step down as
// leader. This prevents the future logs from being Applied
// from this leader.
stepDown = true
// We only modify the peers after the commit, otherwise we
// would be using a quorum size of 1 for the RemovePeer operation.
// This is used with the stepDown guard to prevent any other logs.
if !precommit {
r.peers = nil
r.peerStore.SetPeers([]string{r.localAddr})
}
} else {
r.peers = ExcludePeer(peers, r.localAddr)
r.peerStore.SetPeers(peers)
@@ -1214,6 +1261,7 @@ func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) {
if future != nil && !precommit {
future.respond(nil)
}
return
}
// processRPC is called to handle an incoming RPC request.
@@ -1258,9 +1306,10 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now())
// Setup a response
resp := &AppendEntriesResponse{
Term: r.getCurrentTerm(),
LastLog: r.getLastIndex(),
Success: false,
Term: r.getCurrentTerm(),
LastLog: r.getLastIndex(),
Success: false,
NoRetryBackoff: false,
}
var rpcErr error
defer func() {
@@ -1297,6 +1346,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
r.logger.Printf("[WARN] raft: Failed to get previous log: %d %v (last: %d)",
a.PrevLogEntry, err, lastIdx)
resp.NoRetryBackoff = true
return
}
prevLogTerm = prevLog.Term
@@ -1305,6 +1355,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
if a.PrevLogTerm != prevLogTerm {
r.logger.Printf("[WARN] raft: Previous log term mis-match: ours: %d remote: %d",
prevLogTerm, a.PrevLogTerm)
resp.NoRetryBackoff = true
return
}
}
@@ -1348,9 +1399,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// Everything went well, set success
resp.Success = true
r.lastContactLock.Lock()
r.lastContact = time.Now()
r.lastContactLock.Unlock()
r.setLastContact()
return
}
@@ -1368,10 +1417,11 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
rpc.Respond(resp, rpcErr)
}()
// Check if we have an existing leader
if leader := r.Leader(); leader != "" {
r.logger.Printf("[WARN] raft: Rejecting vote from %v since we have a leader: %v",
r.trans.DecodePeer(req.Candidate), leader)
// Check if we have an existing leader [who's not the candidate]
candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate {
r.logger.Printf("[WARN] raft: Rejecting vote request from %v since we have a leader: %v",
candidate, leader)
return
}
@@ -1413,14 +1463,14 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// Reject if their term is older
lastIdx, lastTerm := r.getLastEntry()
if lastTerm > req.LastLogTerm {
r.logger.Printf("[WARN] raft: Rejecting vote from %v since our last term is greater (%d, %d)",
r.trans.DecodePeer(req.Candidate), lastTerm, req.LastLogTerm)
r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last term is greater (%d, %d)",
candidate, lastTerm, req.LastLogTerm)
return
}
if lastIdx > req.LastLogIndex {
r.logger.Printf("[WARN] raft: Rejecting vote from %v since our last index is greater (%d, %d)",
r.trans.DecodePeer(req.Candidate), lastIdx, req.LastLogIndex)
r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last index is greater (%d, %d)",
candidate, lastIdx, req.LastLogIndex)
return
}
@@ -1534,19 +1584,29 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.logger.Printf("[INFO] raft: Installed remote snapshot")
resp.Success = true
r.setLastContact()
return
}
// setLastContact is used to set the last contact time to now
func (r *Raft) setLastContact() {
r.lastContactLock.Lock()
r.lastContact = time.Now()
r.lastContactLock.Unlock()
return
}
type voteResult struct {
RequestVoteResponse
voter string
}
// electSelf is used to send a RequestVote RPC to all peers,
// and vote for ourself. This has the side affecting of incrementing
// the current term. The response channel returned is used to wait
// for all the responses (including a vote for ourself).
func (r *Raft) electSelf() <-chan *RequestVoteResponse {
func (r *Raft) electSelf() <-chan *voteResult {
// Create a response channel
respCh := make(chan *RequestVoteResponse, len(r.peers)+1)
respCh := make(chan *voteResult, len(r.peers)+1)
// Increment the term
r.setCurrentTerm(r.getCurrentTerm() + 1)
@@ -1564,8 +1624,8 @@ func (r *Raft) electSelf() <-chan *RequestVoteResponse {
askPeer := func(peer string) {
r.goFunc(func() {
defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now())
resp := new(RequestVoteResponse)
err := r.trans.RequestVote(peer, req, resp)
resp := &voteResult{voter: peer}
err := r.trans.RequestVote(peer, req, &resp.RequestVoteResponse)
if err != nil {
r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err)
resp.Term = req.Term
@@ -1599,9 +1659,12 @@ func (r *Raft) electSelf() <-chan *RequestVoteResponse {
}
// Include our own vote
respCh <- &RequestVoteResponse{
Term: req.Term,
Granted: true,
respCh <- &voteResult{
RequestVoteResponse: RequestVoteResponse{
Term: req.Term,
Granted: true,
},
voter: r.localAddr,
}
return respCh
}

View File

@@ -181,7 +181,11 @@ START:
} else {
s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1)
s.matchIndex = s.nextIndex - 1
s.failures++
if resp.NoRetryBackoff {
s.failures = 0
} else {
s.failures++
}
r.logger.Printf("[WARN] raft: AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, s.nextIndex)
}

View File

@@ -1,27 +0,0 @@
package coordinate
import (
"math"
"testing"
)
// verifyEqualFloats will compare f1 and f2 and fail if they are not
// "equal" within a threshold.
func verifyEqualFloats(t *testing.T, f1 float64, f2 float64) {
const zeroThreshold = 1.0e-6
if math.Abs(f1-f2) > zeroThreshold {
t.Fatalf("equal assertion fail, %9.6f != %9.6f", f1, f2)
}
}
// verifyEqualVectors will compare vec1 and vec2 and fail if they are not
// "equal" within a threshold.
func verifyEqualVectors(t *testing.T, vec1 []float64, vec2 []float64) {
if len(vec1) != len(vec2) {
t.Fatalf("vector length mismatch, %d != %d", len(vec1), len(vec2))
}
for i, _ := range vec1 {
verifyEqualFloats(t, vec1[i], vec2[i])
}
}