Merge pull request #1301 from hashicorp/b-update-hashicorp-raft

Update various `vendor/`ed libs with known good fixes.
This commit is contained in:
Sean Chittenden
2016-06-16 16:46:45 -07:00
committed by GitHub
33 changed files with 880 additions and 356 deletions

View File

@@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

View File

@@ -1,3 +0,0 @@
language: go
go:
- tip

View File

@@ -1,24 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof

View File

@@ -1,3 +0,0 @@
language: go
go:
- tip

View File

@@ -89,7 +89,7 @@ func (t *Txn) writeNode(n *Node) *Node {
}
// Mark this node as modified
t.modified.Add(n, nil)
t.modified.Add(nc, nil)
return nc
}

View File

@@ -1,24 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof

View File

@@ -291,10 +291,6 @@ func (c *CompoundIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != len(c.Indexes) {
return nil, fmt.Errorf("less arguments than index fields")
}
return c.PrefixFromArgs(args...)
}
func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
var out []byte
for i, arg := range args {
val, err := c.Indexes[i].FromArgs(arg)
@@ -305,3 +301,30 @@ func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
}
return out, nil
}
// PrefixFromArgs builds the key used for a prefix scan over the compound
// index. At most len(c.Indexes) arguments are accepted. Every argument except
// the final one is encoded in full through the matching sub-index's FromArgs;
// the final argument is encoded through that sub-index's PrefixFromArgs, so
// the sub-index at that position must implement PrefixIndexer. The encoded
// pieces are concatenated in argument order.
func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
	if len(args) > len(c.Indexes) {
		return nil, fmt.Errorf("more arguments than index fields")
	}

	var out []byte
	last := len(args) - 1
	for i, arg := range args {
		var (
			val []byte
			err error
		)
		if i < last {
			// Leading arguments must match exactly.
			val, err = c.Indexes[i].FromArgs(arg)
		} else {
			// Only the trailing argument is treated as a prefix.
			prefixIndexer, ok := c.Indexes[i].(PrefixIndexer)
			if !ok {
				return nil, fmt.Errorf("sub-index %d does not support prefix scanning", i)
			}
			val, err = prefixIndexer.PrefixFromArgs(arg)
		}
		if err != nil {
			return nil, fmt.Errorf("sub-index %d error: %v", i, err)
		}
		out = append(out, val...)
	}
	return out, nil
}

View File

@@ -334,6 +334,39 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er
return value, nil
}
// LongestPrefix returns the entry whose index key is the longest prefix of the
// value built from the given arguments, or nil if no entry matches. The index
// name must carry the "_prefix" suffix and the underlying index must be
// unique, because non-unique index keys have the object IDs appended to them.
//
// Note that this will not work with the memdb StringFieldIndex because it adds
// null terminators which prevent the algorithm from correctly finding a match
// (it will get to right before the null and fail to find a leaf node). Use it
// only where the prefix given is capable of matching indexed entries directly,
// which typically only applies to a custom indexer. See the unit test for an
// example.
func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
	// Only prefix indexes are supported.
	if !strings.HasSuffix(index, "_prefix") {
		return nil, fmt.Errorf("must use '%s_prefix' on index", index)
	}

	// Resolve the schema and the encoded value to search for.
	indexSchema, val, err := txn.getIndexValue(table, index, args...)
	if err != nil {
		return nil, err
	}

	// Non-unique indexes append IDs to their keys, which defeats the match.
	if !indexSchema.Unique {
		return nil, fmt.Errorf("index '%s' is not unique", index)
	}

	// Walk the radix tree for the longest stored key that prefixes val.
	indexTxn := txn.readableIndex(table, indexSchema.Name)
	_, value, found := indexTxn.Root().LongestPrefix(val)
	if !found {
		return nil, nil
	}
	return value, nil
}
// getIndexValue is used to get the IndexSchema and the value
// used to scan the index given the parameters. This handles prefix based
// scans when the index has the "_prefix" suffix. The index must support

View File

@@ -1,25 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
.vagrant/

View File

@@ -82,7 +82,7 @@ least one existing member in order to join the cluster. The new member
does a full state sync with the existing member over TCP and begins gossiping its
existence to the cluster.
Gossip is done over UDP to a with a configurable but fixed fanout and interval.
Gossip is done over UDP with a configurable but fixed fanout and interval.
This ensures that network usage is constant with regards to number of nodes, as opposed to
exponential growth that can occur with traditional heartbeat mechanisms.
Complete state exchanges with a random node are done periodically over

69
vendor/github.com/hashicorp/memberlist/awareness.go generated vendored Normal file
View File

@@ -0,0 +1,69 @@
package memberlist
import (
"sync"
"time"
"github.com/armon/go-metrics"
)
// awareness manages a simple metric for tracking the estimated health of the
// local node. Health is primarily the node's ability to respond in the soft
// real-time manner required for correct health checking of other nodes in the
// cluster.
type awareness struct {
// The embedded lock guards score: ApplyDelta takes it for writing, while
// GetHealthScore and ScaleTimeout take it for reading.
sync.RWMutex
// max is the upper threshold for the timeout scale (the score will be
// constrained to be from 0 <= score < max).
max int
// score is the current awareness score. Lower values are healthier and
// zero is the minimum value.
score int
}
// newAwareness builds an awareness tracker that records max as its upper
// threshold and starts with a score of zero.
func newAwareness(max int) *awareness {
	a := new(awareness)
	a.max = max
	a.score = 0
	return a
}
// ApplyDelta takes the given delta and applies it to the score in a thread-safe
// manner. It enforces a ceiling of max-1 and a floor of zero, so deltas may not
// change the overall score if it's railed at one of the extremes.
//
// The ceiling is applied before the floor on purpose: when max is zero or
// negative (for example when the tracker was built from a Config whose
// AwarenessMaxMultiplier was left at its zero value) the ceiling itself is
// negative, and applying the floor last pins the score at zero instead of
// letting it go negative. A negative score would make ScaleTimeout return a
// zero or negative duration. For max >= 1 the result is identical to clamping
// in either order.
func (a *awareness) ApplyDelta(delta int) {
	a.Lock()
	initial := a.score
	a.score += delta
	if ceiling := a.max - 1; a.score > ceiling {
		a.score = ceiling
	}
	if a.score < 0 {
		a.score = 0
	}
	final := a.score
	a.Unlock()

	// Publish the gauge outside the lock, and only when the score moved.
	if initial != final {
		metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final))
	}
}
// GetHealthScore reports the current raw score, read while holding the read
// lock.
func (a *awareness) GetHealthScore() int {
	a.RLock()
	defer a.RUnlock()
	return a.score
}
// ScaleTimeout stretches the given duration according to the current score:
// the result is timeout multiplied by (score + 1), so a higher (less healthy)
// score yields a proportionally longer timeout.
func (a *awareness) ScaleTimeout(timeout time.Duration) time.Duration {
	a.RLock()
	multiplier := time.Duration(a.score) + 1
	a.RUnlock()
	return timeout * multiplier
}

View File

@@ -63,6 +63,23 @@ type Config struct {
// still alive.
SuspicionMult int
// SuspicionMaxTimeoutMult is the multiplier applied to the
// SuspicionTimeout used as an upper bound on detection time. This max
// timeout is calculated using the formula:
//
// SuspicionMaxTimeout = SuspicionMaxTimeoutMult * SuspicionTimeout
//
// If everything is working properly, confirmations from other nodes will
// accelerate suspicion timers in a manner which will cause the timeout
// to reach the base SuspicionTimeout before that elapses, so this value
// will typically only come into play if a node is experiencing issues
// communicating with other nodes. It should be set to something fairly
// large so that a node having problems will have a lot of chances to
// recover before falsely declaring other nodes as failed, but short
// enough for a legitimately isolated node to still make progress marking
// nodes failed in a reasonable amount of time.
SuspicionMaxTimeoutMult int
// PushPullInterval is the interval between complete state syncs.
// Complete state syncs are done with a single node over TCP and are
// quite expensive relative to standard gossiped messages. Setting this
@@ -91,6 +108,11 @@ type Config struct {
// indirect UDP pings.
DisableTcpPings bool
// AwarenessMaxMultiplier will increase the probe interval if the node
// becomes aware that it might be degraded and not meeting the soft real
// time requirements to reliably probe other nodes.
AwarenessMaxMultiplier int
// GossipInterval and GossipNodes are used to configure the gossip
// behavior of memberlist.
//
@@ -168,20 +190,22 @@ type Config struct {
func DefaultLANConfig() *Config {
hostname, _ := os.Hostname()
return &Config{
Name: hostname,
BindAddr: "0.0.0.0",
BindPort: 7946,
AdvertiseAddr: "",
AdvertisePort: 7946,
ProtocolVersion: ProtocolVersion2Compatible,
TCPTimeout: 10 * time.Second, // Timeout after 10 seconds
IndirectChecks: 3, // Use 3 nodes for the indirect ping
RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes
SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval
PushPullInterval: 30 * time.Second, // Low frequency
ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN
ProbeInterval: 1 * time.Second, // Failure check every second
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
Name: hostname,
BindAddr: "0.0.0.0",
BindPort: 7946,
AdvertiseAddr: "",
AdvertisePort: 7946,
ProtocolVersion: ProtocolVersion2Compatible,
TCPTimeout: 10 * time.Second, // Timeout after 10 seconds
IndirectChecks: 3, // Use 3 nodes for the indirect ping
RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes
SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval
SuspicionMaxTimeoutMult: 6, // For 10k nodes this will give a max timeout of 120 seconds
PushPullInterval: 30 * time.Second, // Low frequency
ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN
ProbeInterval: 1 * time.Second, // Failure check every second
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds
GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly

View File

@@ -43,9 +43,11 @@ type Memberlist struct {
tcpListener *net.TCPListener
handoff chan msgHandoff
nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState
nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState
nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer
awareness *awareness
tickerLock sync.Mutex
tickers []*time.Ticker
@@ -61,7 +63,7 @@ type Memberlist struct {
}
// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenence.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
if conf.ProtocolVersion < ProtocolVersionMin {
return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
@@ -129,6 +131,8 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
tcpListener: tcpLn,
handoff: make(chan msgHandoff, 1024),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
@@ -625,6 +629,13 @@ func (m *Memberlist) anyAlive() bool {
return false
}
// GetHealthScore reports how well this instance believes it is meeting the
// soft real-time requirements of the protocol, as tracked by its awareness
// score. Lower numbers are better, and zero means "totally healthy".
func (m *Memberlist) GetHealthScore() int {
	score := m.awareness.GetHealthScore()
	return score
}
// ProtocolVersion returns the protocol version currently in use by
// this memberlist.
func (m *Memberlist) ProtocolVersion() uint8 {

View File

@@ -24,9 +24,15 @@ const (
// A memberlist speaking version 2 of the protocol will attempt
// to TCP ping another memberlist who understands version 3 or
// greater.
//
// Version 4 added support for nacks as part of indirect probes.
// A memberlist speaking version 2 of the protocol will expect
// nacks from another memberlist who understands version 4 or
// greater, and likewise nacks will be sent to memberlists who
// understand version 4 or greater.
ProtocolVersion2Compatible = 2
ProtocolVersionMax = 3
ProtocolVersionMax = 4
)
// messageType is an integer ID of a type of message that can be received
@@ -46,6 +52,7 @@ const (
userMsg // User mesg, not handled by us
compressMsg
encryptMsg
nackRespMsg
)
// compressionType is used to specify the compression algorithm
@@ -83,6 +90,7 @@ type indirectPingReq struct {
Target []byte
Port uint16
Node string
Nack bool // true if we'd like a nack back
}
// ack response is sent for a ping
@@ -91,6 +99,13 @@ type ackResp struct {
Payload []byte
}
// nackResp is the nack response sent for an indirect ping when the pinger
// doesn't hear from the ping-ee within the configured timeout. This lets the
// original node know that the indirect ping attempt happened but didn't
// succeed.
type nackResp struct {
// SeqNo is the sequence number of the indirect ping request this nack
// answers; the receiver uses it to look up the registered nack handler.
SeqNo uint32
}
// suspect is broadcast when we suspect a node is dead
type suspect struct {
Incarnation uint32
@@ -121,7 +136,7 @@ type dead struct {
}
// pushPullHeader is used to inform the
// otherside how many states we are transfering
// otherside how many states we are transferring
type pushPullHeader struct {
Nodes int
UserStateLen int // Encodes the byte lengh of user state
@@ -134,7 +149,7 @@ type userMsgHeader struct {
}
// pushNodeState is used for pushPullReq when we are
// transfering out node states
// transferring out node states
type pushNodeState struct {
Name string
Addr []byte
@@ -343,6 +358,8 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
m.handleIndirectPing(buf, from)
case ackRespMsg:
m.handleAck(buf, from, timestamp)
case nackRespMsg:
m.handleNack(buf, from)
case suspectMsg:
fallthrough
@@ -440,18 +457,23 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
}
// For proto versions < 2, there is no port provided. Mask old
// behavior by using the configured port
// behavior by using the configured port.
if m.ProtocolVersion() < 2 || ind.Port == 0 {
ind.Port = uint16(m.config.BindPort)
}
// Send a ping to the correct host
// Send a ping to the correct host.
localSeqNo := m.nextSeqNo()
ping := ping{SeqNo: localSeqNo, Node: ind.Node}
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
// Setup a response handler to relay the ack
cancelCh := make(chan struct{})
respHandler := func(payload []byte, timestamp time.Time) {
// Try to prevent the nack if we've caught it in time.
close(cancelCh)
// Forward the ack back to the requestor.
ack := ackResp{ind.SeqNo, nil}
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
@@ -459,10 +481,25 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
}
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
// Send the ping
// Send the ping.
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
}
// Setup a timer to fire off a nack if no ack is seen in time.
if ind.Nack {
go func() {
select {
case <-cancelCh:
return
case <-time.After(m.config.ProbeTimeout):
nack := nackResp{ind.SeqNo}
if err := m.encodeAndSendMsg(from, nackRespMsg, &nack); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
}
}
}()
}
}
func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
@@ -474,6 +511,15 @@ func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
m.invokeAckHandler(ack, timestamp)
}
// handleNack decodes an incoming nack response and hands it to the nack
// handler registered for its sequence number. A message that fails to decode
// is logged and dropped.
func (m *Memberlist) handleNack(buf []byte, from net.Addr) {
	var nack nackResp
	err := decode(buf, &nack)
	if err == nil {
		m.invokeNackHandler(nack)
		return
	}
	m.logger.Printf("[ERR] memberlist: Failed to decode nack response: %s %s", err, LogAddress(from))
}
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
var sus suspect
if err := decode(buf, &sus); err != nil {

View File

@@ -42,10 +42,11 @@ type nodeState struct {
StateChange time.Time // Time last state change happened
}
// ackHandler is used to register handlers for incoming acks
// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
handler func([]byte, time.Time)
timer *time.Timer
ackFn func([]byte, time.Time)
nackFn func()
timer *time.Timer
}
// NoPingResponseError is used to indicate a 'ping' packet was
@@ -148,7 +149,7 @@ func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
}
}
// Deschedule is used to stop the background maintenence. This is safe
// Deschedule is used to stop the background maintenance. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
m.tickerLock.Lock()
@@ -219,17 +220,51 @@ START:
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
// We use our health awareness to scale the overall probe interval, so we
// slow down if we detect problems. The ticker that calls us can handle
// us running over the base interval, and will skip missed ticks.
probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
if probeInterval > m.config.ProbeInterval {
metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
}
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
// Send a ping to the node.
deadline := time.Now().Add(m.config.ProbeInterval)
// Send a ping to the node. If this node looks like it's suspect or dead,
// also tack on a suspect message so that it has a chance to refute as
// soon as possible.
deadline := time.Now().Add(probeInterval)
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
if node.State == stateAlive {
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
} else {
var msgs [][]byte
if buf, err := encode(pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
if buf, err := encode(suspectMsg, &s); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
return
} else {
msgs = append(msgs, buf.Bytes())
}
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", destAddr, err)
return
}
}
// Mark the sent time here, which should be after any pre-processing and
@@ -237,6 +272,16 @@ func (m *Memberlist) probeNode(node *nodeState) {
// but it's the best we can do.
sent := time.Now()
// Arrange for our self-awareness to get updated. At this point we've
// sent the ping, so any return statement means the probe succeeded
// which will improve our health until we get to the failure scenarios
// at the end of this function, which will alter this delta variable
// accordingly.
awarenessDelta := -1
defer func() {
m.awareness.ApplyDelta(awarenessDelta)
}()
// Wait for response or round-trip-time.
select {
case v := <-ackCh:
@@ -254,6 +299,12 @@ func (m *Memberlist) probeNode(node *nodeState) {
ackCh <- v
}
case <-time.After(m.config.ProbeTimeout):
// Note that we don't scale this timeout based on awareness and
// the health score. That's because we don't really expect waiting
// longer to help get UDP through. Since health does extend the
// probe interval it will give the TCP fallback more time, which
// is more active in dealing with lost packets, and it gives more
// time to wait for indirect acks/nacks.
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
}
@@ -264,8 +315,15 @@ func (m *Memberlist) probeNode(node *nodeState) {
m.nodeLock.RUnlock()
// Attempt an indirect ping.
expectedNacks := 0
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}
destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
@@ -319,7 +377,23 @@ func (m *Memberlist) probeNode(node *nodeState) {
}
}
// No acks received from target, suspect
// Update our self-awareness based on the results of this failed probe.
// If we don't have peers who will send nacks then we penalize for any
// failed probe as a simple health metric. If we do have peers to nack
// verify, then we can use that as a more sophisticated measure of self-
// health because we assume them to be working, and they can help us
// decide if the probed node was really dead or if it was something wrong
// with ourselves.
awarenessDelta = 0
if expectedNacks > 0 {
if nackCount := len(nackCh); nackCount < expectedNacks {
awarenessDelta += 2 * (expectedNacks - nackCount)
}
} else {
awarenessDelta += 1
}
// No acks received from target, suspect it as failed.
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
@@ -330,7 +404,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
// Send a ping to the node.
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
@@ -584,6 +658,11 @@ func (m *Memberlist) nextIncarnation() uint32 {
return atomic.AddUint32(&m.incarnation, 1)
}
// skipIncarnation atomically advances the incarnation number by offset and
// returns the resulting value.
func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
	updated := atomic.AddUint32(&m.incarnation, offset)
	return updated
}
// estNumNodes is used to get the current estimate of the number of nodes
func (m *Memberlist) estNumNodes() int {
return int(atomic.LoadUint32(&m.numNodes))
@@ -595,19 +674,27 @@ type ackMessage struct {
Timestamp time.Time
}
// setAckChannel is used to attach a channel to receive a message when an ack with a given
// sequence number is received. The `complete` field of the message will be false on timeout
func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout time.Duration) {
// Create a handler function
handler := func(payload []byte, timestamp time.Time) {
// setProbeChannels is used to attach the ackCh to receive a message when an ack
// with a given sequence number is received. The `complete` field of the message
// will be false on timeout. Any nack messages will cause an empty struct to be
// passed to the nackCh, which can be nil if not needed.
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
// Create handler functions for acks and nacks
ackFn := func(payload []byte, timestamp time.Time) {
select {
case ch <- ackMessage{true, payload, timestamp}:
case ackCh <- ackMessage{true, payload, timestamp}:
default:
}
}
nackFn := func() {
select {
case nackCh <- struct{}{}:
default:
}
}
// Add the handler
ah := &ackHandler{handler, nil}
// Add the handlers
ah := &ackHandler{ackFn, nackFn, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
@@ -618,18 +705,19 @@ func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout tim
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
select {
case ch <- ackMessage{false, nil, time.Now()}:
case ackCh <- ackMessage{false, nil, time.Now()}:
default:
}
})
}
// setAckHandler is used to attach a handler to be invoked when an
// ack with a given sequence number is received. If a timeout is reached,
// the handler is deleted
func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time), timeout time.Duration) {
// setAckHandler is used to attach a handler to be invoked when an ack with a
// given sequence number is received. If a timeout is reached, the handler is
// deleted. This is used for indirect pings so does not configure a function
// for nacks.
func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
// Add the handler
ah := &ackHandler{handler, nil}
ah := &ackHandler{ackFn, nil, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
@@ -642,7 +730,7 @@ func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time)
})
}
// Invokes an Ack handler if any is associated, and reaps the handler immediately
// Invokes an ack handler if any is associated, and reaps the handler immediately
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
m.ackLock.Lock()
ah, ok := m.ackHandlers[ack.SeqNo]
@@ -652,7 +740,49 @@ func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
return
}
ah.timer.Stop()
ah.handler(ack.Payload, timestamp)
ah.ackFn(ack.Payload, timestamp)
}
// invokeNackHandler calls the nack function registered for the nack's
// sequence number, if there is one. The lookup happens under ackLock; the
// callback itself runs after the lock is released, and the handler entry is
// left in place.
func (m *Memberlist) invokeNackHandler(nack nackResp) {
	m.ackLock.Lock()
	handlers, found := m.ackHandlers[nack.SeqNo]
	m.ackLock.Unlock()

	if found && handlers.nackFn != nil {
		handlers.nackFn()
	}
}
// refute broadcasts an alive message about ourselves in answer to incoming
// information that we are suspect or dead. The incarnation number used is
// guaranteed to beat accusedInc; pass 0 to simply take the next incarnation
// number. Because this writes to the supplied node state, it MUST be called
// while the nodeLock is held.
func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
	// Take the next incarnation, jumping ahead if the accusation is at or
	// beyond it.
	inc := m.nextIncarnation()
	if inc <= accusedInc {
		inc = m.skipIncarnation(accusedInc - inc + 1)
	}
	me.Incarnation = inc

	// Having to refute counts against our own health.
	m.awareness.ApplyDelta(1)

	// Build the alive message from our current state and gossip it.
	versions := []uint8{
		me.PMin, me.PMax, me.PCur,
		me.DMin, me.DMax, me.DCur,
	}
	msg := alive{
		Incarnation: inc,
		Node:        me.Name,
		Addr:        me.Addr,
		Port:        me.Port,
		Meta:        me.Meta,
		Vsn:         versions,
	}
	// NOTE(review): the broadcast is keyed by the address string rather than
	// the node name; confirm this matches how other broadcasts about this node
	// are keyed.
	m.encodeAndBroadcast(me.Addr.String(), aliveMsg, msg)
}
// aliveNode is invoked by the network layer when we get a message about a
@@ -754,6 +884,9 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
return
}
// Clear out any suspicion timer that may be in effect.
delete(m.nodeTimers, a.Node)
// Store the old state and meta data
oldState := state.State
oldMeta := state.Meta
@@ -783,21 +916,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
return
}
inc := m.nextIncarnation()
for a.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: versions,
}
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
m.refute(state, a.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting an alive message")
} else {
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
@@ -854,6 +973,17 @@ func (m *Memberlist) suspectNode(s *suspect) {
return
}
// See if there's a suspicion timer we can confirm. If the info is new
// to us we will go ahead and re-gossip it. This allows for multiple
// independent confirmations to flow even when a node probes a node
// that's already suspect.
if timer, ok := m.nodeTimers[s.Node]; ok {
if timer.Confirm(s.From) {
m.encodeAndBroadcast(s.Node, suspectMsg, s)
}
return
}
// Ignore non-alive nodes
if state.State != stateAlive {
return
@@ -861,24 +991,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
// If this is us we need to refute, otherwise re-broadcast
if state.Name == m.config.Name {
inc := m.nextIncarnation()
for s.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
},
}
m.encodeAndBroadcast(s.Node, aliveMsg, a)
m.refute(state, s.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
return // Do not mark ourself suspect
} else {
@@ -894,26 +1007,41 @@ func (m *Memberlist) suspectNode(s *suspect) {
changeTime := time.Now()
state.StateChange = changeTime
// Setup a timeout for this
timeout := suspicionTimeout(m.config.SuspicionMult, m.estNumNodes(), m.config.ProbeInterval)
time.AfterFunc(timeout, func() {
// Setup a suspicion timer. Given that we don't have any known phase
// relationship with our peers, we set up k such that we hit the nominal
// timeout two probe intervals short of what we expect given the suspicion
// multiplier.
k := m.config.SuspicionMult - 2
// If there aren't enough nodes to give the expected confirmations, just
// set k to 0 to say that we don't expect any. Note we subtract 2 from n
// here to take out ourselves and the node being probed.
n := m.estNumNodes()
if n-2 < k {
k = 0
}
// Compute the timeouts based on the size of the cluster.
min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
fn := func(numConfirmations int) {
m.nodeLock.Lock()
state, ok := m.nodeMap[s.Node]
timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
m.nodeLock.Unlock()
if timeout {
m.suspectTimeout(state)
}
})
}
if k > 0 && numConfirmations < k {
metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
}
// suspectTimeout is invoked when a suspect timeout has occurred
func (m *Memberlist) suspectTimeout(n *nodeState) {
// Construct a dead message
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached", n.Name)
d := dead{Incarnation: n.Incarnation, Node: n.Name, From: m.config.Name}
m.deadNode(&d)
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
state.Name, numConfirmations)
d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
m.deadNode(&d)
}
}
m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
}
// deadNode is invoked by the network layer when we get a message
@@ -933,6 +1061,9 @@ func (m *Memberlist) deadNode(d *dead) {
return
}
// Clear out any suspicion timer that may be in effect.
delete(m.nodeTimers, d.Node)
// Ignore if node is already dead
if state.State == stateDead {
return
@@ -942,24 +1073,7 @@ func (m *Memberlist) deadNode(d *dead) {
if state.Name == m.config.Name {
// If we are not leaving we need to refute
if !m.leave {
inc := m.nextIncarnation()
for d.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
},
}
m.encodeAndBroadcast(d.Node, aliveMsg, a)
m.refute(state, d.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
return // Do not mark ourself dead
}
@@ -1001,7 +1115,7 @@ func (m *Memberlist) mergeState(remote []pushNodeState) {
m.aliveNode(&a, nil, false)
case stateDead:
// If the remote node belives a node is dead, we prefer to
// If the remote node believes a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
fallthrough
case stateSuspect:

130
vendor/github.com/hashicorp/memberlist/suspicion.go generated vendored Normal file
View File

@@ -0,0 +1,130 @@
package memberlist
import (
"math"
"sync/atomic"
"time"
)
// suspicion manages the suspect timer for a node and provides an interface
// to accelerate the timeout as we get more independent confirmations that
// a node is suspect.
type suspicion struct {
// n is the number of independent confirmations we've seen. This must
// be updated using atomic instructions to prevent contention with the
// timer callback.
n int32
// k is the number of independent confirmations we'd like to see in
// order to drive the timer to its minimum value.
k int32
// min is the minimum timer value.
min time.Duration
// max is the maximum timer value.
max time.Duration
// start captures the timestamp when we began the timer. This is used
// so we can calculate durations to feed the timer during updates in
// a way that achieves the overall time we'd like.
start time.Time
// timer is the underlying timer that implements the timeout.
timer *time.Timer
// timeoutFn is the function to call when the timer expires. We hold on
// to this because there are cases where we call it directly.
timeoutFn func()
// confirmations is a map of "from" nodes that have confirmed a given
// node is suspect. This prevents double counting.
confirmations map[string]struct{}
}
// newSuspicion builds a suspicion whose timer is already running. The timer
// starts at max and is driven toward min as confirmations arrive; when k < 1
// no confirmations are called for, so the timer starts at min instead. The
// from node is pre-recorded in the confirmation set so that our own suspicion
// message, gossiped back to us, is never counted. When the timer fires, fn is
// called with the number of confirmations seen so far.
func newSuspicion(from string, k int, min time.Duration, max time.Duration, fn func(int)) *suspicion {
	// Seed the confirmation set with the originator so it is excluded.
	s := &suspicion{
		k:             int32(k),
		min:           min,
		max:           max,
		confirmations: map[string]struct{}{from: {}},
	}

	// Wrap fn so the current confirmation count is handed over for
	// telemetry at the moment the timeout runs.
	s.timeoutFn = func() {
		fn(int(atomic.LoadInt32(&s.n)))
	}

	// With no confirmations to wait for, go straight to the minimum.
	initial := min
	if k >= 1 {
		initial = max
	}
	s.timer = time.AfterFunc(initial, s.timeoutFn)

	// Record the start time only after the timer is armed, so any
	// preemption between the two steps errs toward a slightly longer
	// overall timeout rather than a shorter one.
	s.start = time.Now()
	return s
}
// remainingSuspicionTime takes the state variables of the suspicion timer and
// calculates the remaining time to wait before considering a node dead.
//
// n is the number of confirmations seen so far, k is the number of
// confirmations that drives the timeout down to min, and elapsed is the time
// already spent waiting. The total timeout shrinks logarithmically from max
// (n == 0) to min (n >= k) and is truncated to whole milliseconds.
//
// The return value can be negative, so be prepared to fire the timer
// immediately in that case.
func remainingSuspicionTime(n, k int32, elapsed time.Duration, min, max time.Duration) time.Duration {
	// When no confirmations are called for, the minimum timeout applies
	// outright. Handling this up front also avoids dividing by
	// math.Log(1) == 0 below, which would yield NaN or Inf and make the
	// float-to-integer conversion implementation-dependent.
	if k < 1 {
		return min - elapsed
	}

	frac := math.Log(float64(n)+1.0) / math.Log(float64(k)+1.0)
	raw := max.Seconds() - frac*(max.Seconds()-min.Seconds())
	timeout := time.Duration(math.Floor(1000.0*raw)) * time.Millisecond
	if timeout < min {
		timeout = min
	}

	// We have to take into account the amount of time that has passed so
	// far, so we get the right overall timeout.
	return timeout - elapsed
}
// Confirm records that from also considers the node suspect and shortens the
// timer accordingly. It reports true when the confirmation was counted, and
// false when it was ignored: either from is already in the confirmation set,
// or k confirmations have already been reached.
func (s *suspicion) Confirm(from string) bool {
	// Once the target count is reached, further confirmations are refused.
	if s.k <= atomic.LoadInt32(&s.n) {
		return false
	}

	// Each peer is counted at most once.
	if _, seen := s.confirmations[from]; seen {
		return false
	}
	s.confirmations[from] = struct{}{}

	// Work out how much longer to wait given the updated count.
	count := atomic.AddInt32(&s.n, 1)
	left := remainingSuspicionTime(count, s.k, time.Now().Sub(s.start), s.min, s.max)

	// Only touch the timer if we managed to stop it ourselves; otherwise
	// it is left alone.
	if !s.timer.Stop() {
		return true
	}
	if left > 0 {
		s.timer.Reset(left)
	} else {
		// The deadline has already passed, so run the timeout function
		// directly on its own goroutine.
		go s.timeoutFn()
	}
	return true
}

View File

@@ -1,23 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test

View File

@@ -1,14 +0,0 @@
language: go
go:
- 1.2
- tip
install: make deps
script:
- make integ
notifications:
flowdock:
secure: fZrcf9rlh2IrQrlch1sHkn3YI7SKvjGnAl/zyV5D6NROe1Bbr6d3QRMuCXWWdhJHzjKmXk5rIzbqJhUc0PNF7YjxGNKSzqWMQ56KcvN1k8DzlqxpqkcA3Jbs6fXCWo2fssRtZ7hj/wOP1f5n6cc7kzHDt9dgaYJ6nO2fqNPJiTc=

View File

@@ -1,7 +1,7 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
test:
go test -timeout=5s ./...
go test -timeout=30s ./...
integ: test
INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...

View File

@@ -24,7 +24,7 @@ go version
For complete documentation, see the associated [Godoc](http://godoc.org/github.com/hashicorp/raft).
To prevent complications with cgo, the primary backend `MDBStore` is in a separate repositoy,
To prevent complications with cgo, the primary backend `MDBStore` is in a separate repository,
called [raft-mdb](http://github.com/hashicorp/raft-mdb). That is the recommended implementation
for the `LogStore` and `StableStore`.

View File

@@ -97,9 +97,15 @@ type shutdownFuture struct {
}
// Error waits until s.raft.getRoutines() drops to zero and then closes the
// transport if it implements WithClose. It always returns nil. A future with
// a nil raft returns immediately without waiting or closing anything.
func (s *shutdownFuture) Error() error {
// Nothing to wait for or close when no Raft instance is attached.
if s.raft == nil {
return nil
}
// Poll every 5ms until getRoutines reports zero.
for s.raft.getRoutines() > 0 {
time.Sleep(5 * time.Millisecond)
}
// Close is optional on transports; its returned error is discarded.
if closeable, ok := s.raft.trans.(WithClose); ok {
closeable.Close()
}
return nil
}

View File

@@ -44,9 +44,11 @@ type InmemTransport struct {
}
// NewInmemTransport is used to initialize a new transport
// and generates a random local address.
func NewInmemTransport() (string, *InmemTransport) {
addr := NewInmemAddr()
// and generates a random local address if none is specified
func NewInmemTransport(addr string) (string, *InmemTransport) {
if addr == "" {
addr = NewInmemAddr()
}
trans := &InmemTransport{
consumerCh: make(chan RPC, 16),
localAddr: addr,
@@ -170,7 +172,8 @@ func (i *InmemTransport) DecodePeer(buf []byte) string {
// Connect is used to connect this transport to another transport for
// a given peer name. This allows for local routing.
func (i *InmemTransport) Connect(peer string, trans *InmemTransport) {
func (i *InmemTransport) Connect(peer string, t Transport) {
trans := t.(*InmemTransport)
i.Lock()
defer i.Unlock()
i.peers[peer] = trans
@@ -208,6 +211,12 @@ func (i *InmemTransport) DisconnectAll() {
i.pipelines = nil
}
// Close is used to permanently disable the transport. It calls
// DisconnectAll and always returns nil.
func (i *InmemTransport) Close() error {
i.DisconnectAll()
return nil
}
func newInmemPipeline(trans *InmemTransport, peer *InmemTransport, addr string) *inmemPipeline {
i := &inmemPipeline{
trans: trans,

111
vendor/github.com/hashicorp/raft/observer.go generated vendored Normal file
View File

@@ -0,0 +1,111 @@
package raft
import (
"sync/atomic"
)
// Observation is sent along the given channel to observers when an event occurs.
type Observation struct {
// Raft is the node that produced the observation.
Raft *Raft
// Data is the event-specific value that was handed to observe.
Data interface{}
}
// LeaderObservation is used for the data when leadership changes.
type LeaderObservation struct {
// leader is the leader value being reported. The field is unexported, so
// code outside this package cannot read it.
// NOTE(review): consider exporting it if external observers need the value.
leader string
}
// nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration. It is incremented atomically by NewObserver.
var nextObserverId uint64
// FilterFn is a function that can be registered in order to filter observations
// by returning false. An observation for which the filter returns false is
// not sent to that observer's channel.
type FilterFn func(o *Observation) bool
// Observer describes what to do with a given observation.
type Observer struct {
	// numObserved and numDropped are performance counters for this observer.
	// They are updated with 64-bit atomic operations, which on 32-bit
	// platforms require 64-bit alignment. Only the first word of an
	// allocated struct can be relied upon to be so aligned, so these fields
	// must stay at the top of the struct.
	numObserved uint64
	numDropped  uint64

	// channel receives observations.
	channel chan Observation

	// blocking, if true, will cause Raft to block when sending an observation
	// to this observer. This should generally be set to false.
	blocking bool

	// filter will be called to determine if an observation should be sent to
	// the channel.
	filter FilterFn

	// id is the ID of this observer in the Raft map.
	id uint64
}
// NewObserver creates an observer that delivers observations on channel,
// using the given blocking behavior and filter (filter can be nil). Each
// observer is assigned a fresh ID taken from a package-wide atomic counter.
func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer {
	obs := new(Observer)
	obs.id = atomic.AddUint64(&nextObserverId, 1)
	obs.channel = channel
	obs.blocking = blocking
	obs.filter = filter
	return obs
}
// GetNumObserved returns the number of observations that were successfully
// sent on this observer's channel. The counter is read atomically.
func (or *Observer) GetNumObserved() uint64 {
return atomic.LoadUint64(&or.numObserved)
}
// GetNumDropped returns the number of observations that were dropped because
// a non-blocking send on this observer's channel could not proceed. The
// counter is read atomically.
func (or *Observer) GetNumDropped() uint64 {
return atomic.LoadUint64(&or.numDropped)
}
// RegisterObserver registers a new observer, storing it in the observers map
// under its id while holding the observers write lock.
func (r *Raft) RegisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
r.observers[or.id] = or
}
// DeregisterObserver deregisters an observer, deleting the entry stored under
// its id from the observers map while holding the observers write lock.
func (r *Raft) DeregisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
delete(r.observers, or.id)
}
// observe sends an observation wrapping o to every registered observer.
// An observer is skipped when its filter rejects the observation or when it
// has no channel. Blocking observers are sent to unconditionally;
// non-blocking observers get a best-effort send and count a drop when the
// send cannot proceed.
func (r *Raft) observe(o interface{}) {
	// Observers should not normally block. Even if one does, only a read
	// lock is held here, which merely prevents registration and
	// deregistration of observers.
	r.observersLock.RLock()
	defer r.observersLock.RUnlock()

	for _, obs := range r.observers {
		// Building the value inside the loop is wasteful, but it means
		// the common case of no observers creates no objects at all.
		event := Observation{Raft: r, Data: o}

		if obs.filter != nil && !obs.filter(&event) {
			continue
		}
		if obs.channel == nil {
			continue
		}

		if !obs.blocking {
			select {
			case obs.channel <- event:
				atomic.AddUint64(&obs.numObserved, 1)
			default:
				atomic.AddUint64(&obs.numDropped, 1)
			}
			continue
		}

		obs.channel <- event
		atomic.AddUint64(&obs.numObserved, 1)
	}
}

View File

@@ -147,6 +147,11 @@ type Raft struct {
// verifyCh is used to async send verify futures to the main thread
// to verify we are still the leader
verifyCh chan *verifyFuture
// List of observers and the mutex that protects them. The observers list
// is indexed by an artificial ID which is used for deregistration.
observersLock sync.RWMutex
observers map[uint64]*Observer
}
// NewRaft is used to construct a new Raft node. It takes a configuration, as well
@@ -221,6 +226,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
observers: make(map[uint64]*Observer),
}
// Initialize as a follower
@@ -235,8 +241,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// Restore the current term and the last log
r.setCurrentTerm(currentTerm)
r.setLastLogIndex(lastLog.Index)
r.setLastLogTerm(lastLog.Term)
r.setLastLog(lastLog.Index, lastLog.Term)
// Attempt to restore a snapshot if there are any
if err := r.restoreSnapshot(); err != nil {
@@ -268,8 +273,12 @@ func (r *Raft) Leader() string {
// setLeader is used to modify the current leader of the cluster
func (r *Raft) setLeader(leader string) {
r.leaderLock.Lock()
oldLeader := r.leader
r.leader = leader
r.leaderLock.Unlock()
if oldLeader != leader {
r.observe(LeaderObservation{leader: leader})
}
}
// Apply is used to apply a command to the FSM in a highly consistent
@@ -412,9 +421,11 @@ func (r *Raft) Shutdown() Future {
close(r.shutdownCh)
r.shutdown = true
r.setState(Shutdown)
return &shutdownFuture{r}
}
return &shutdownFuture{r}
// avoid closing transport twice
return &shutdownFuture{nil}
}
// Snapshot is used to manually force Raft to take a snapshot.
@@ -463,16 +474,18 @@ func (r *Raft) Stats() map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
lastLogIndex, lastLogTerm := r.getLastLog()
lastSnapIndex, lastSnapTerm := r.getLastSnapshot()
s := map[string]string{
"state": r.getState().String(),
"term": toString(r.getCurrentTerm()),
"last_log_index": toString(r.getLastLogIndex()),
"last_log_term": toString(r.getLastLogTerm()),
"last_log_index": toString(lastLogIndex),
"last_log_term": toString(lastLogTerm),
"commit_index": toString(r.getCommitIndex()),
"applied_index": toString(r.getLastApplied()),
"fsm_pending": toString(uint64(len(r.fsmCommitCh))),
"last_snapshot_index": toString(r.getLastSnapshotIndex()),
"last_snapshot_term": toString(r.getLastSnapshotTerm()),
"last_snapshot_index": toString(lastSnapIndex),
"last_snapshot_term": toString(lastSnapTerm),
"num_peers": toString(uint64(len(r.peers))),
}
last := r.LastContact()
@@ -607,7 +620,7 @@ func (r *Raft) run() {
// runFollower runs the FSM for a follower.
func (r *Raft) runFollower() {
didWarn := false
r.logger.Printf("[INFO] raft: %v entering Follower state", r)
r.logger.Printf("[INFO] raft: %v entering Follower state (Leader: %q)", r, r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
for {
@@ -639,6 +652,7 @@ func (r *Raft) runFollower() {
}
// Heartbeat failed! Transition to the candidate state
lastLeader := r.Leader()
r.setLeader("")
if len(r.peers) == 0 && !r.conf.EnableSingleNode {
if !didWarn {
@@ -646,9 +660,9 @@ func (r *Raft) runFollower() {
didWarn = true
}
} else {
r.logger.Printf("[WARN] raft: Heartbeat timeout reached, starting election")
r.logger.Printf(`[WARN] raft: Heartbeat timeout from %q reached, starting election`, lastLeader)
metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timout"}, 1)
metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
r.setState(Candidate)
return
}
@@ -1125,8 +1139,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
r.leaderState.inflight.StartAll(applyLogs)
// Update the last log since it's on disk now
r.setLastLogIndex(lastIndex + uint64(len(applyLogs)))
r.setLastLogTerm(term)
r.setLastLog(lastIndex+uint64(len(applyLogs)), term)
// Notify the replicators of the new log
for _, f := range r.leaderState.replState {
@@ -1367,7 +1380,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
last := a.Entries[n-1]
// Delete any conflicting entries
lastLogIdx := r.getLastLogIndex()
lastLogIdx, _ := r.getLastLog()
if first.Index <= lastLogIdx {
r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", first.Index, lastLogIdx)
if err := r.logs.DeleteRange(first.Index, lastLogIdx); err != nil {
@@ -1383,8 +1396,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
}
// Update the lastLog
r.setLastLogIndex(last.Index)
r.setLastLogTerm(last.Term)
r.setLastLog(last.Index, last.Term)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "storeLogs"}, start)
}
@@ -1406,6 +1418,8 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// requestVote is invoked when we get an request vote RPC call.
func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now())
r.observe(*req)
// Setup a response
resp := &RequestVoteResponse{
Term: r.getCurrentTerm(),
@@ -1468,7 +1482,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
return
}
if lastIdx > req.LastLogIndex {
if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last index is greater (%d, %d)",
candidate, lastIdx, req.LastLogIndex)
return
@@ -1481,6 +1495,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
}
resp.Granted = true
r.setLastContact()
return
}
@@ -1569,8 +1584,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLastApplied(req.LastLogIndex)
// Update the last stable snapshot info
r.setLastSnapshotIndex(req.LastLogIndex)
r.setLastSnapshotTerm(req.LastLogTerm)
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)
// Restore the peer set
peers := decodePeers(req.Peers, r.trans)
@@ -1694,7 +1708,11 @@ func (r *Raft) setCurrentTerm(t uint64) {
// that leader should be set only after updating the state.
func (r *Raft) setState(state RaftState) {
r.setLeader("")
oldState := r.raftState.getState()
r.raftState.setState(state)
if oldState != state {
r.observe(state)
}
}
// runSnapshots is a long running goroutine used to manage taking
@@ -1732,7 +1750,7 @@ func (r *Raft) runSnapshots() {
// a new snapshot.
func (r *Raft) shouldSnapshot() bool {
// Check the last snapshot index
lastSnap := r.getLastSnapshotIndex()
lastSnap, _ := r.getLastSnapshot()
// Check the last log index
lastIdx, err := r.logs.LastIndex()
@@ -1797,8 +1815,7 @@ func (r *Raft) takeSnapshot() error {
}
// Update the last stable snapshot info
r.setLastSnapshotIndex(req.index)
r.setLastSnapshotTerm(req.term)
r.setLastSnapshot(req.index, req.term)
// Compact the logs
if err := r.compactLogs(req.index); err != nil {
@@ -1821,7 +1838,8 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
}
// Check if we have enough logs to truncate
if r.getLastLogIndex() <= r.conf.TrailingLogs {
lastLogIdx, _ := r.getLastLog()
if lastLogIdx <= r.conf.TrailingLogs {
return nil
}
@@ -1829,7 +1847,7 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
// back from the head, which ever is further back. This ensures
// at least `TrailingLogs` entries, but does not allow logs
// after the snapshot to be removed.
maxLog := min(snapIdx, r.getLastLogIndex()-r.conf.TrailingLogs)
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
// Log this
r.logger.Printf("[INFO] raft: Compacting logs from %d to %d", minLog, maxLog)
@@ -1872,8 +1890,7 @@ func (r *Raft) restoreSnapshot() error {
r.setLastApplied(snapshot.Index)
// Update the last stable snapshot info
r.setLastSnapshotIndex(snapshot.Index)
r.setLastSnapshotTerm(snapshot.Term)
r.setLastSnapshot(snapshot.Index, snapshot.Term)
// Success!
return nil

View File

@@ -102,9 +102,11 @@ RPC:
}
return
case <-s.triggerCh:
shouldStop = r.replicateTo(s, r.getLastLogIndex())
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout):
shouldStop = r.replicateTo(s, r.getLastLogIndex())
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
}
// If things looks healthy, switch to pipeline mode
@@ -358,9 +360,11 @@ SEND:
}
break SEND
case <-s.triggerCh:
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex())
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout):
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex())
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
}
}
@@ -446,13 +450,14 @@ func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequ
func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error {
// Guard for the first index, since there is no 0 log entry
// Guard against the previous index being a snapshot as well
lastSnapIdx, lastSnapTerm := r.getLastSnapshot()
if nextIndex == 1 {
req.PrevLogEntry = 0
req.PrevLogTerm = 0
} else if (nextIndex - 1) == r.getLastSnapshotIndex() {
req.PrevLogEntry = r.getLastSnapshotIndex()
req.PrevLogTerm = r.getLastSnapshotTerm()
} else if (nextIndex - 1) == lastSnapIdx {
req.PrevLogEntry = lastSnapIdx
req.PrevLogTerm = lastSnapTerm
} else {
var l Log

View File

@@ -1,6 +1,7 @@
package raft
import (
"sync"
"sync/atomic"
)
@@ -44,20 +45,23 @@ type raftState struct {
// The current term, cache of StableStore
currentTerm uint64
// Cache the latest log from LogStore
LastLogIndex uint64
LastLogTerm uint64
// Highest committed log entry
commitIndex uint64
// Last applied log to the FSM
lastApplied uint64
// protects 4 next fields
lastLock sync.Mutex
// Cache the latest snapshot index/term
lastSnapshotIndex uint64
lastSnapshotTerm uint64
// Cache the latest log from LogStore
lastLogIndex uint64
lastLogTerm uint64
// Tracks the number of live routines
runningRoutines int32
@@ -83,52 +87,50 @@ func (r *raftState) setCurrentTerm(term uint64) {
atomic.StoreUint64(&r.currentTerm, term)
}
func (r *raftState) getLastLogIndex() uint64 {
return atomic.LoadUint64(&r.LastLogIndex)
func (r *raftState) getLastLog() (index, term uint64) {
r.lastLock.Lock()
index = r.lastLogIndex
term = r.lastLogTerm
r.lastLock.Unlock()
return
}
func (r *raftState) setLastLogIndex(term uint64) {
atomic.StoreUint64(&r.LastLogIndex, term)
func (r *raftState) setLastLog(index, term uint64) {
r.lastLock.Lock()
r.lastLogIndex = index
r.lastLogTerm = term
r.lastLock.Unlock()
}
func (r *raftState) getLastLogTerm() uint64 {
return atomic.LoadUint64(&r.LastLogTerm)
func (r *raftState) getLastSnapshot() (index, term uint64) {
r.lastLock.Lock()
index = r.lastSnapshotIndex
term = r.lastSnapshotTerm
r.lastLock.Unlock()
return
}
func (r *raftState) setLastLogTerm(term uint64) {
atomic.StoreUint64(&r.LastLogTerm, term)
func (r *raftState) setLastSnapshot(index, term uint64) {
r.lastLock.Lock()
r.lastSnapshotIndex = index
r.lastSnapshotTerm = term
r.lastLock.Unlock()
}
func (r *raftState) getCommitIndex() uint64 {
return atomic.LoadUint64(&r.commitIndex)
}
func (r *raftState) setCommitIndex(term uint64) {
atomic.StoreUint64(&r.commitIndex, term)
func (r *raftState) setCommitIndex(index uint64) {
atomic.StoreUint64(&r.commitIndex, index)
}
func (r *raftState) getLastApplied() uint64 {
return atomic.LoadUint64(&r.lastApplied)
}
func (r *raftState) setLastApplied(term uint64) {
atomic.StoreUint64(&r.lastApplied, term)
}
func (r *raftState) getLastSnapshotIndex() uint64 {
return atomic.LoadUint64(&r.lastSnapshotIndex)
}
func (r *raftState) setLastSnapshotIndex(term uint64) {
atomic.StoreUint64(&r.lastSnapshotIndex, term)
}
func (r *raftState) getLastSnapshotTerm() uint64 {
return atomic.LoadUint64(&r.lastSnapshotTerm)
}
func (r *raftState) setLastSnapshotTerm(term uint64) {
atomic.StoreUint64(&r.lastSnapshotTerm, term)
func (r *raftState) setLastApplied(index uint64) {
atomic.StoreUint64(&r.lastApplied, index)
}
func (r *raftState) incrRoutines() {
@@ -156,14 +158,18 @@ func (r *raftState) goFunc(f func()) {
// getLastIndex returns the last index in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastIndex() uint64 {
return max(r.getLastLogIndex(), r.getLastSnapshotIndex())
r.lastLock.Lock()
defer r.lastLock.Unlock()
return max(r.lastLogIndex, r.lastSnapshotIndex)
}
// getLastEntry returns the last index and term in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastEntry() (uint64, uint64) {
if r.getLastLogIndex() >= r.getLastSnapshotIndex() {
return r.getLastLogIndex(), r.getLastLogTerm()
r.lastLock.Lock()
defer r.lastLock.Unlock()
if r.lastLogIndex >= r.lastSnapshotIndex {
return r.lastLogIndex, r.lastLogTerm
}
return r.getLastSnapshotIndex(), r.getLastSnapshotTerm()
return r.lastSnapshotIndex, r.lastSnapshotTerm
}

View File

@@ -60,6 +60,30 @@ type Transport interface {
SetHeartbeatHandler(cb func(rpc RPC))
}
// WithClose is an interface that a transport may provide which allows it to
// be closed. Close() lives in a separate interface as unfortunately it wasn't
// in the original interface specification.
type WithClose interface {
// Permanently close a transport, stop all go-routines etc
Close() error
}
// LoopbackTransport is an interface that provides a loopback transport suitable for testing
// e.g. InmemTransport. It's there so we don't have to rewrite tests.
type LoopbackTransport interface {
Transport // Embedded transport reference
WithPeers // Embedded peer management
WithClose // with a close routine
}
// WithPeers is an interface that a transport may provide which allows for connection and
// disconnection. Unless the transport is a loopback transport, the transport specified to
// "Connect" is likely to be nil.
type WithPeers interface {
Connect(peer string, t Transport) // Connect a peer under the given name, using transport t
Disconnect(peer string) // Disconnect a given peer
DisconnectAll() // Disconnect all peers, possibly to reconnect them later
}
// AppendPipeline is used for pipelining AppendEntries requests. It is used
// to increase the replication throughput by masking latency and better
// utilizing bandwidth.

View File

@@ -1 +0,0 @@
# TODO - I'll beef this up as I implement each of the enhancements.

View File

@@ -16,7 +16,7 @@ package coordinate
type Config struct {
// The dimensionality of the coordinate system. As discussed in [2], more
// dimensions improves the accuracy of the estimates up to a point. Per [2]
// we chose 4 dimensions plus a non-Euclidean height.
// we chose 8 dimensions plus a non-Euclidean height.
Dimensionality uint
// VivaldiErrorMax is the default error value when a node hasn't yet made

View File

@@ -103,6 +103,13 @@ type Config struct {
ReconnectTimeout time.Duration
TombstoneTimeout time.Duration
// FlapTimeout is the amount of time less than which we consider a node
// being failed and rejoining looks like a flap for telemetry purposes.
// This should be set less than a typical reboot time, but large enough
// to see actual events, given our expected detection times for a failed
// node.
FlapTimeout time.Duration
// QueueDepthWarning is used to generate warning message if the
// number of queued messages to broadcast exceeds this number. This
// is to provide the user feedback if events are being triggered
@@ -241,6 +248,7 @@ func DefaultConfig() *Config {
QueueDepthWarning: 128,
MaxQueueDepth: 4096,
TombstoneTimeout: 24 * time.Hour,
FlapTimeout: 60 * time.Second,
MemberlistConfig: memberlist.DefaultLANConfig(),
QueryTimeoutMult: 16,
QueryResponseSizeLimit: 1024,

View File

@@ -214,8 +214,8 @@ type queries struct {
}
const (
UserEventSizeLimit = 512 // Maximum byte size for event name and payload
snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot
UserEventSizeLimit = 512 // Maximum byte size for event name and payload
snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot
)
// Create creates a new Serf instance, starting all the background tasks
@@ -871,6 +871,11 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
s.members[n.Name] = member
} else {
oldStatus = member.Status
deadTime := time.Now().Sub(member.leaveTime)
if oldStatus == StatusFailed && deadTime < s.config.FlapTimeout {
metrics.IncrCounter([]string{"serf", "member", "flap"}, 1)
}
member.Status = StatusAlive
member.leaveTime = time.Time{}
member.Addr = net.IP(n.Addr)
@@ -1617,6 +1622,7 @@ func (s *Serf) Stats() map[string]string {
"members": toString(uint64(len(s.members))),
"failed": toString(uint64(len(s.failedMembers))),
"left": toString(uint64(len(s.leftMembers))),
"health_score": toString(uint64(s.memberlist.GetHealthScore())),
"member_time": toString(uint64(s.clock.Time())),
"event_time": toString(uint64(s.eventClock.Time())),
"query_time": toString(uint64(s.queryClock.Time())),
@@ -1680,3 +1686,13 @@ func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, o
return nil, false
}
// NumNodes reports how many members this Serf instance currently tracks,
// regardless of their health or status. The member table is read under the
// member read lock.
func (s *Serf) NumNodes() (numNodes int) {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()
	return len(s.members)
}

View File

@@ -22,7 +22,7 @@ Each field is described below:
## Version Field
The version field is used for future backwards compatibily. At the
The version field is used for future backward compatibility. At the
current time, the field is always set to 0, to indicate the initial
version.
@@ -138,4 +138,3 @@ provide an error code:
* 0x0 Normal termination
* 0x1 Protocol error
* 0x2 Internal error

42
vendor/vendor.json vendored
View File

@@ -30,8 +30,10 @@
"revision": "06b60999766278efd6d2b5d8418a58c3d5b99e87"
},
{
"checksumSHA1": "gNO0JNpLzYOdInGeq7HqMZUzx9M=",
"path": "github.com/armon/go-radix",
"revision": "4239b77079c7b5d1243b7b4736304ce8ddb6f0f2"
"revision": "4239b77079c7b5d1243b7b4736304ce8ddb6f0f2",
"revisionTime": "2016-01-15T23:47:25Z"
},
{
"comment": "v1.0.6-2-g80dd495",
@@ -445,12 +447,16 @@
"revision": "3142ddc1d627a166970ddd301bc09cb510c74edc"
},
{
"checksumSHA1": "qmE9mO0WW6ALLpUU81rXDyspP5M=",
"path": "github.com/hashicorp/go-immutable-radix",
"revision": "8e8ed81f8f0bf1bdd829593fdd5c29922c1ea990"
"revision": "afc5a0dbb18abdf82c277a7bc01533e81fa1d6b8",
"revisionTime": "2016-06-09T02:05:29Z"
},
{
"checksumSHA1": "/V57CyN7x2NUlHoOzVL5GgGXX84=",
"path": "github.com/hashicorp/go-memdb",
"revision": "2cc5518f24b906e7cccfc808817ba479f5489821"
"revision": "98f52f52d7a476958fa9da671354d270c50661a7",
"revisionTime": "2016-03-01T23:01:42Z"
},
{
"path": "github.com/hashicorp/go-msgpack/codec",
@@ -521,24 +527,26 @@
"revision": "0dc08b1671f34c4250ce212759ebd880f743d883"
},
{
"checksumSHA1": "pWIsEUE53NffjS1goRml43Cvleg=",
"path": "github.com/hashicorp/memberlist",
"revision": "88ac4de0d1a0ca6def284b571342db3b777a4c37"
"revision": "215aec831f03c9b7c61ac183d3e28fff3c7d3a37",
"revisionTime": "2016-06-04T06:22:06Z"
},
{
"path": "github.com/hashicorp/net-rpc-msgpackrpc",
"revision": "a14192a58a694c123d8fe5481d4a4727d6ae82f3"
},
{
"checksumSHA1": "lXd9Jdzl1JmphijD/sAaw169/0Q=",
"path": "github.com/hashicorp/raft",
"revision": "057b893fd996696719e98b6c44649ea14968c811"
"revision": "4bcac2adb06930200744feb591fee8a0790f9c98",
"revisionTime": "2016-06-03T20:22:43Z"
},
{
"checksumSHA1": "QAxukkv54/iIvLfsUP6IK4R0m/A=",
"path": "github.com/hashicorp/raft-boltdb",
"revision": "057b893fd996696719e98b6c44649ea14968c811"
},
{
"path": "github.com/hashicorp/raft/bench",
"revision": "057b893fd996696719e98b6c44649ea14968c811"
"revision": "d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee",
"revisionTime": "2015-02-01T20:08:39Z"
},
{
"checksumSHA1": "u9qHbpIgMZ7/fjO0gFfds2m/1ck=",
@@ -553,20 +561,24 @@
"revisionTime": "2016-06-01T22:40:23Z"
},
{
"checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=",
"comment": "v0.7.0-18-gc4c55f1",
"path": "github.com/hashicorp/serf/coordinate",
"revision": "c4c55f16bae1aed9b355ad655d3ebf0215734461"
"revision": "6c4672d66fc6312ddde18399262943e21175d831",
"revisionTime": "2016-06-09T00:18:40Z"
},
{
"checksumSHA1": "jgQHuXL6QZLht/1dIYfSytPKWr4=",
"comment": "v0.7.0-18-gc4c55f1",
"path": "github.com/hashicorp/serf/serf",
"revision": "c4c55f16bae1aed9b355ad655d3ebf0215734461"
"revision": "6c4672d66fc6312ddde18399262943e21175d831",
"revisionTime": "2016-06-09T00:18:40Z"
},
{
"checksumSHA1": "xvxetwF2G1XHScrmo8EM3yisjBc=",
"checksumSHA1": "VMaF3Q7RIrRzvbnPbqxuSLryOvc=",
"path": "github.com/hashicorp/yamux",
"revision": "172cde3b6ca5c154ff4e6e2ef96b7451332a9946",
"revisionTime": "2016-05-19T16:00:42Z"
"revision": "badf81fca035b8ebac61b5ab83330b72541056f4",
"revisionTime": "2016-06-09T13:59:02Z"
},
{
"comment": "0.2.2-2-gc01cf91",