vendor: update consul for grpc

This commit is contained in:
Michael Schurter
2018-05-03 11:27:28 -07:00
parent 93356e7d70
commit 882bf5a62c
23 changed files with 252 additions and 173 deletions

View File

@@ -5,7 +5,7 @@ import (
)
const (
// ACLCLientType is the client type token
// ACLClientType is the client type token
ACLClientType = "client"
// ACLManagementType is the management type token

View File

@@ -15,6 +15,7 @@ type AgentCheck struct {
Output string
ServiceID string
ServiceName string
Definition HealthCheckDefinition
}
// AgentService represents a service known to the agent
@@ -59,12 +60,13 @@ type MembersOpts struct {
// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
ID string `json:",omitempty"`
Name string `json:",omitempty"`
Tags []string `json:",omitempty"`
Port int `json:",omitempty"`
Address string `json:",omitempty"`
EnableTagOverride bool `json:",omitempty"`
ID string `json:",omitempty"`
Name string `json:",omitempty"`
Tags []string `json:",omitempty"`
Port int `json:",omitempty"`
Address string `json:",omitempty"`
EnableTagOverride bool `json:",omitempty"`
Meta map[string]string `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
}
@@ -80,7 +82,10 @@ type AgentCheckRegistration struct {
// AgentServiceCheck is used to define a node or service level check
type AgentServiceCheck struct {
Script string `json:",omitempty"`
CheckID string `json:",omitempty"`
Name string `json:",omitempty"`
Args []string `json:"ScriptArgs,omitempty"`
Script string `json:",omitempty"` // Deprecated, use Args.
DockerContainerID string `json:",omitempty"`
Shell string `json:",omitempty"` // Only supported for Docker.
Interval string `json:",omitempty"`
@@ -93,6 +98,8 @@ type AgentServiceCheck struct {
Status string `json:",omitempty"`
Notes string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
GRPC string `json:",omitempty"`
GRPCUseTLS bool `json:",omitempty"`
// In Consul 0.7 and later, checks that are associated with a service
// may also contain this optional DeregisterCriticalServiceAfter field,
@@ -580,36 +587,36 @@ func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions
// UpdateACLToken updates the agent's "acl_token". See updateToken for more
// details.
func (c *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_token", token, q)
func (a *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
return a.updateToken("acl_token", token, q)
}
// UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken
// for more details.
func (c *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_agent_token", token, q)
func (a *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) {
return a.updateToken("acl_agent_token", token, q)
}
// UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See
// updateToken for more details.
func (c *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_agent_master_token", token, q)
func (a *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
return a.updateToken("acl_agent_master_token", token, q)
}
// UpdateACLReplicationToken updates the agent's "acl_replication_token". See
// updateToken for more details.
func (c *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_replication_token", token, q)
func (a *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
return a.updateToken("acl_replication_token", token, q)
}
// updateToken can be used to update an agent's ACL token after the agent has
// started. The tokens are not persisted, so will need to be updated again if
// the agent is restarted.
func (c *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) {
r := c.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target))
func (a *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) {
r := a.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target))
r.setWriteOptions(q)
r.obj = &AgentToken{Token: token}
rtt, resp, err := requireOK(c.c.doRequest(r))
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}

View File

@@ -101,7 +101,7 @@ type QueryOptions struct {
// be provided for filtering.
NodeMeta map[string]string
// RelayFactor is used in keyring operations to cause reponses to be
// RelayFactor is used in keyring operations to cause responses to be
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8
@@ -137,7 +137,7 @@ type WriteOptions struct {
// which overrides the agent's default token.
Token string
// RelayFactor is used in keyring operations to cause reponses to be
// RelayFactor is used in keyring operations to cause responses to be
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8
@@ -377,12 +377,14 @@ func SetupTLSConfig(tlsConfig *TLSConfig) (*tls.Config, error) {
tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
}
rootConfig := &rootcerts.Config{
CAFile: tlsConfig.CAFile,
CAPath: tlsConfig.CAPath,
}
if err := rootcerts.ConfigureTLS(tlsClientConfig, rootConfig); err != nil {
return nil, err
if tlsConfig.CAFile != "" || tlsConfig.CAPath != "" {
rootConfig := &rootcerts.Config{
CAFile: tlsConfig.CAFile,
CAPath: tlsConfig.CAPath,
}
if err := rootcerts.ConfigureTLS(tlsClientConfig, rootConfig); err != nil {
return nil, err
}
}
return tlsClientConfig, nil
@@ -477,6 +479,14 @@ func NewHttpClient(transport *http.Transport, tlsConf TLSConfig) (*http.Client,
Transport: transport,
}
// TODO (slackpad) - Once we get some run time on the HTTP/2 support we
// should turn it on by default if TLS is enabled. We would basically
// just need to call http2.ConfigureTransport(transport) here. We also
// don't want to introduce another external dependency on
// golang.org/x/net/http2 at this time. For a complete recipe for how
// to enable HTTP/2 support on a transport suitable for the API client
// library see agent/http_test.go:TestHTTPServer_H2.
if transport.TLSClientConfig == nil {
tlsClientConfig, err := SetupTLSConfig(&tlsConf)
@@ -623,9 +633,9 @@ func (r *request) toHTTP() (*http.Request, error) {
}
if r.ctx != nil {
return req.WithContext(r.ctx), nil
} else {
return req, nil
}
return req, nil
}
// newRequest is used to create a new request
@@ -661,7 +671,7 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
}
start := time.Now()
resp, err := c.config.HttpClient.Do(req)
diff := time.Now().Sub(start)
diff := time.Since(start)
return diff, resp, err
}

View File

@@ -22,6 +22,7 @@ type CatalogService struct {
ServiceName string
ServiceAddress string
ServiceTags []string
ServiceMeta map[string]string
ServicePort int
ServiceEnableTagOverride bool
CreateIndex uint64
@@ -42,6 +43,7 @@ type CatalogRegistration struct {
Datacenter string
Service *AgentService
Check *AgentCheck
SkipNodeUpdate bool
}
type CatalogDeregistration struct {

View File

@@ -66,3 +66,41 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err
}
return out, qm, nil
}
// Update inserts or updates the LAN coordinate of a node.
func (c *Coordinate) Update(coord *CoordinateEntry, q *WriteOptions) (*WriteMeta, error) {
r := c.c.newRequest("PUT", "/v1/coordinate/update")
r.setWriteOptions(q)
r.obj = coord
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{}
wm.RequestTime = rtt
return wm, nil
}
// Node is used to return the coordinates of a single in the LAN pool.
func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/coordinate/node/"+node)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*CoordinateEntry
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}

View File

@@ -34,6 +34,21 @@ type HealthCheck struct {
ServiceID string
ServiceName string
ServiceTags []string
Definition HealthCheckDefinition
}
// HealthCheckDefinition is used to store the details about
// a health check's execution.
type HealthCheckDefinition struct {
HTTP string
Header map[string][]string
Method string
TLSSkipVerify bool
TCP string
Interval ReadableDuration
Timeout ReadableDuration
DeregisterCriticalServiceAfter ReadableDuration
}
// HealthChecks is a collection of HealthCheck structs.

View File

@@ -252,7 +252,7 @@ func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOpti
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(string(buf.Bytes()), "true")
res := strings.Contains(buf.String(), "true")
return res, qm, nil
}
@@ -296,7 +296,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(string(buf.Bytes()), "true")
res := strings.Contains(buf.String(), "true")
return res, qm, nil
}

View File

@@ -180,7 +180,7 @@ WAIT:
// Handle the one-shot mode.
if l.opts.LockTryOnce && attempts > 0 {
elapsed := time.Now().Sub(start)
elapsed := time.Since(start)
if elapsed > qOpts.WaitTime {
return nil, nil
}

View File

@@ -196,7 +196,7 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(string(buf.Bytes()), "true")
res := strings.Contains(buf.String(), "true")
return res, nil
}

View File

@@ -34,6 +34,12 @@ type ServiceQuery struct {
// local datacenter.
Failover QueryDatacenterOptions
// IgnoreCheckIDs is an optional list of health check IDs to ignore when
// considering which nodes are healthy. It is useful as an emergency measure
// to temporarily override some health check that is producing false negatives
// for example.
IgnoreCheckIDs []string
// If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be
// discarded)
@@ -61,7 +67,7 @@ type QueryTemplate struct {
Regexp string
}
// PrepatedQueryDefinition defines a complete prepared query.
// PreparedQueryDefinition defines a complete prepared query.
type PreparedQueryDefinition struct {
// ID is this UUID-based ID for the query, always generated by Consul.
ID string

View File

@@ -198,7 +198,7 @@ WAIT:
// Handle the one-shot mode.
if s.opts.SemaphoreTryOnce && attempts > 0 {
elapsed := time.Now().Sub(start)
elapsed := time.Since(start)
if elapsed > qOpts.WaitTime {
return nil, nil
}

View File

@@ -87,6 +87,12 @@ func (f *HTTPFlags) Token() string {
func (f *HTTPFlags) APIClient() (*api.Client, error) {
c := api.DefaultConfig()
f.MergeOntoConfig(c)
return api.NewClient(c)
}
func (f *HTTPFlags) MergeOntoConfig(c *api.Config) {
f.address.Merge(&c.Address)
f.token.Merge(&c.Token)
f.caFile.Merge(&c.TLSConfig.CAFile)
@@ -95,6 +101,4 @@ func (f *HTTPFlags) APIClient() (*api.Client, error) {
f.keyFile.Merge(&c.TLSConfig.KeyFile)
f.tlsServerName.Merge(&c.TLSConfig.Address)
f.datacenter.Merge(&c.Datacenter)
return api.NewClient(c)
}

View File

@@ -16,7 +16,7 @@ const (
// blockSize is the size of the allocated port block. ports are given out
// consecutively from that block with roll-over for the lifetime of the
// application/test run.
blockSize = 500
blockSize = 1500
// maxBlocks is the number of available port blocks.
// lowPort + maxBlocks * blockSize must be less than 65535.

View File

@@ -36,14 +36,14 @@ func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate,
// we are possibly a client. Any node with more than one segment can only
// be a server, which means it should be in all segments.
if len(cs) == 1 {
for s, _ := range cs {
for s := range cs {
segment = s
}
}
// Likewise for the other set.
if len(other) == 1 {
for s, _ := range other {
for s := range other {
segment = s
}
}

20
vendor/github.com/hashicorp/consul/lib/serf.go generated vendored Normal file
View File

@@ -0,0 +1,20 @@
package lib
import (
"github.com/hashicorp/serf/serf"
)
// SerfDefaultConfig returns a Consul-flavored Serf default configuration,
// suitable as a basis for a LAN, WAN, segment, or area.
func SerfDefaultConfig() *serf.Config {
base := serf.DefaultConfig()
// This effectively disables the annoying queue depth warnings.
base.QueueDepthWarning = 1000000
// This enables dynamic sizing of the message queue depth based on the
// cluster size.
base.MinQueueDepth = 4096
return base
}

View File

@@ -82,7 +82,7 @@ func decorate(s string) string {
}
func Run(t Failer, f func(r *R)) {
run(TwoSeconds(), t, f)
run(DefaultFailer(), t, f)
}
func RunWith(r Retryer, t Failer, f func(r *R)) {
@@ -133,6 +133,11 @@ func run(r Retryer, t Failer, f func(r *R)) {
}
}
// DefaultFailer provides default retry.Run() behavior for unit tests.
func DefaultFailer() *Timer {
return &Timer{Timeout: 7 * time.Second, Wait: 25 * time.Millisecond}
}
// TwoSeconds repeats an operation for two seconds and waits 25ms in between.
func TwoSeconds() *Timer {
return &Timer{Timeout: 2 * time.Second, Wait: 25 * time.Millisecond}

View File

@@ -27,7 +27,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/test/porter"
"github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-uuid"
@@ -111,17 +111,7 @@ func defaultServerConfig() *TestServerConfig {
panic(err)
}
ports, err := porter.RandomPorts(6)
if err != nil {
if _, ok := err.(*porter.PorterExistErr); ok {
// Fall back in the case that the testutil server is being used
// without porter. This should NEVER be used for Consul's own
// unit tests. See comments for getRandomPorts() for more details.
ports = getRandomPorts(6)
} else {
panic(err)
}
}
ports := freeport.Get(6)
return &TestServerConfig{
NodeName: "node-" + nodeID,
NodeID: nodeID,
@@ -324,7 +314,7 @@ func (s *TestServer) waitForAPI() error {
}
defer resp.Body.Close()
if err := s.requireOK(resp); err != nil {
r.Fatal("failed OK respose", err)
r.Fatal("failed OK response", err)
}
})
if f.failed {
@@ -390,22 +380,3 @@ func (s *TestServer) waitForLeader() error {
}
return nil
}
// getRandomPorts returns a set of random port or panics on error. This
// is here to support external uses of testutil which may not have porter,
// but this has been shown not to work well with parallel tests (such as
// Consul's unit tests). This fallback should NEVER be used for Consul's
// own tests.
func getRandomPorts(n int) []int {
ports := make([]int, n)
for i := 0; i < n; i++ {
l, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
l.Close()
ports[i] = l.Addr().(*net.TCPAddr).Port
}
return ports
}

View File

@@ -6,6 +6,8 @@ import (
"sort"
"sync"
"time"
"github.com/armon/go-metrics"
)
// Client manages the estimated network coordinate for a given node, and adjusts
@@ -205,6 +207,20 @@ func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coo
return nil, err
}
// The code down below can handle zero RTTs, which we have seen in
// https://github.com/hashicorp/consul/issues/3789, presumably in
// environments with coarse-grained monotonic clocks (we are still
// trying to pin this down). In any event, this is ok from a code PoV
// so we don't need to alert operators with spammy messages. We did
// add a counter so this is still observable, though.
const maxRTT = 10 * time.Second
if rtt < 0 || rtt > maxRTT {
return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT)
}
if rtt == 0 {
metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1)
}
rttSeconds := c.latencyFilter(node, rtt.Seconds())
c.updateVivaldi(other, rttSeconds)
c.updateAdjustment(other, rttSeconds)

View File

@@ -112,6 +112,10 @@ type Config struct {
// node.
FlapTimeout time.Duration
// QueueCheckInterval is the interval at which we check the message
// queue to apply the warning and max depth.
QueueCheckInterval time.Duration
// QueueDepthWarning is used to generate warning message if the
// number of queued messages to broadcast exceeds this number. This
// is to provide the user feedback if events are being triggered
@@ -123,6 +127,12 @@ type Config struct {
// prevent an unbounded growth of memory utilization
MaxQueueDepth int
// MinQueueDepth, if >0 will enforce a lower limit for dropping messages
// and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This
// defaults to 0 which disables this dynamic sizing feature. If this is
// >0 then MaxQueueDepth will be ignored.
MinQueueDepth int
// RecentIntentTimeout is used to determine how long we store recent
// join and leave intents. This is used to guard against the case where
// Serf broadcasts an intent that arrives before the Memberlist event.
@@ -253,6 +263,7 @@ func DefaultConfig() *Config {
RecentIntentTimeout: 5 * time.Minute,
ReconnectInterval: 30 * time.Second,
ReconnectTimeout: 24 * time.Hour,
QueueCheckInterval: 30 * time.Second,
QueueDepthWarning: 128,
MaxQueueDepth: 4096,
TombstoneTimeout: 24 * time.Hour,

View File

@@ -68,7 +68,8 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
before := p.serf.coordClient.GetCoordinate()
after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
if err != nil {
p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n",
metrics.IncrCounter([]string{"serf", "coordinate", "rejected"}, 1)
p.serf.logger.Printf("[TRACE] serf: Rejected coordinate from %s: %v\n",
other.Name, err)
return
}

View File

@@ -314,7 +314,6 @@ func Create(conf *Config) (*Serf, error) {
conf.RejoinAfterLeave,
serf.logger,
&serf.clock,
serf.coordClient,
conf.EventCh,
serf.shutdownCh)
if err != nil {
@@ -1516,21 +1515,37 @@ func (s *Serf) reconnect() {
s.memberlist.Join([]string{addr.String()})
}
// getQueueMax will get the maximum queue depth, which might be dynamic depending
// on how Serf is configured.
func (s *Serf) getQueueMax() int {
max := s.config.MaxQueueDepth
if s.config.MinQueueDepth > 0 {
s.memberLock.RLock()
max = 2 * len(s.members)
s.memberLock.RUnlock()
if max < s.config.MinQueueDepth {
max = s.config.MinQueueDepth
}
}
return max
}
// checkQueueDepth periodically checks the size of a queue to see if
// it is too large
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
for {
select {
case <-time.After(time.Second):
case <-time.After(s.config.QueueCheckInterval):
numq := queue.NumQueued()
metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
if numq >= s.config.QueueDepthWarning {
s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
}
if numq > s.config.MaxQueueDepth {
if max := s.getQueueMax(); numq > max {
s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
name, numq, s.config.MaxQueueDepth)
queue.Prune(s.config.MaxQueueDepth)
name, numq, max)
queue.Prune(max)
}
case <-s.shutdownCh:
return
@@ -1654,11 +1669,18 @@ func (s *Serf) Stats() map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
s.memberLock.RLock()
members := toString(uint64(len(s.members)))
failed := toString(uint64(len(s.failedMembers)))
left := toString(uint64(len(s.leftMembers)))
health_score := toString(uint64(s.memberlist.GetHealthScore()))
s.memberLock.RUnlock()
stats := map[string]string{
"members": toString(uint64(len(s.members))),
"failed": toString(uint64(len(s.failedMembers))),
"left": toString(uint64(len(s.leftMembers))),
"health_score": toString(uint64(s.memberlist.GetHealthScore())),
"members": members,
"failed": failed,
"left": left,
"health_score": health_score,
"member_time": toString(uint64(s.clock.Time())),
"event_time": toString(uint64(s.eventClock.Time())),
"query_time": toString(uint64(s.queryClock.Time())),

View File

@@ -2,7 +2,6 @@ package serf
import (
"bufio"
"encoding/json"
"fmt"
"log"
"math/rand"
@@ -13,7 +12,6 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/serf/coordinate"
)
/*
@@ -29,34 +27,32 @@ old events.
const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
const coordinateUpdateInterval = 60 * time.Second
const tmpExt = ".compact"
const snapshotErrorRecoveryInterval = 30 * time.Second
// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
type Snapshotter struct {
aliveNodes map[string]string
clock *LamportClock
coordClient *coordinate.Client
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
lastQueryClock LamportTime
leaveCh chan struct{}
leaving bool
logger *log.Logger
maxSize int64
path string
offset int64
outCh chan<- Event
rejoinAfterLeave bool
shutdownCh <-chan struct{}
waitCh chan struct{}
lastAttemptedCompaction time.Time
aliveNodes map[string]string
clock *LamportClock
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
lastQueryClock LamportTime
leaveCh chan struct{}
leaving bool
logger *log.Logger
maxSize int64
path string
offset int64
outCh chan<- Event
rejoinAfterLeave bool
shutdownCh <-chan struct{}
waitCh chan struct{}
lastAttemptedCompaction time.Time
}
// PreviousNode is used to represent the previously known alive nodes
@@ -80,7 +76,6 @@ func NewSnapshotter(path string,
rejoinAfterLeave bool,
logger *log.Logger,
clock *LamportClock,
coordClient *coordinate.Client,
outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024)
@@ -103,7 +98,6 @@ func NewSnapshotter(path string,
snap := &Snapshotter{
aliveNodes: make(map[string]string),
clock: clock,
coordClient: coordClient,
fh: fh,
buffered: bufio.NewWriter(fh),
inCh: inCh,
@@ -182,9 +176,6 @@ func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop()
coordinateTicker := time.NewTicker(coordinateUpdateInterval)
defer coordinateTicker.Stop()
for {
select {
case <-s.leaveCh:
@@ -226,9 +217,6 @@ func (s *Snapshotter) stream() {
case <-clockTicker.C:
s.updateClock()
case <-coordinateTicker.C:
s.updateCoordinate()
case <-s.shutdownCh:
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
@@ -275,20 +263,6 @@ func (s *Snapshotter) updateClock() {
}
}
// updateCoordinate is called periodically to write out the current local
// coordinate. It's safe to call this if coordinates aren't enabled (nil
// client) and it will be a no-op.
func (s *Snapshotter) updateCoordinate() {
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode coordinate: %v", err)
} else {
s.tryAppend(fmt.Sprintf("coordinate: %s\n", encoded))
}
}
}
// processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) {
// Ignore old clocks
@@ -404,30 +378,22 @@ func (s *Snapshotter) compact() error {
}
offset += int64(n)
// Write out the coordinate.
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
fh.Close()
return err
}
line = fmt.Sprintf("coordinate: %s\n", encoded)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
}
// Flush the new snapshot
err = buf.Flush()
fh.Close()
if err != nil {
return fmt.Errorf("failed to flush new snapshot: %v", err)
}
err = fh.Sync()
if err != nil {
fh.Close()
return fmt.Errorf("failed to fsync new snapshot: %v", err)
}
fh.Close()
// We now need to swap the old snapshot file with the new snapshot.
// Turns out, Windows won't let us rename the files if we have
// open handles to them or if the destination already exists. This
@@ -533,22 +499,7 @@ func (s *Snapshotter) replay() error {
s.lastQueryClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "coordinate: ") {
if s.coordClient == nil {
s.logger.Printf("[WARN] serf: Ignoring snapshot coordinates since they are disabled")
continue
}
coordStr := strings.TrimPrefix(line, "coordinate: ")
var coord coordinate.Coordinate
err := json.Unmarshal([]byte(coordStr), &coord)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
continue
}
if err := s.coordClient.SetCoordinate(&coord); err != nil {
s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
continue
}
continue // Ignores any coordinate persistence from old snapshots, serf should re-converge
} else if line == "leave" {
// Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave {

18
vendor/vendor.json vendored
View File

@@ -124,14 +124,14 @@
{"path":"github.com/hashicorp/consul-template/template","checksumSHA1":"N9qobVzScLbTEnGE7MgFnnTbGBw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul-template/version","checksumSHA1":"NB5+D4AuCNV9Bsqh3YFdPi4AJ6U=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul-template/watch","checksumSHA1":"b4+Y+02pY2Y5620F9ALzKg8Zmdw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"d1ede2c93dec7b4580e37ef41d24371abab9d9e9","revisionTime":"2018-02-21T18:19:48Z"},
{"path":"github.com/hashicorp/consul/api","checksumSHA1":"XLfcIX2qpRr0o26aFMjCOzvw6jo=","revision":"51ea240df8476e02215d53fbfad5838bf0d44d21","revisionTime":"2017-10-16T16:22:40Z"},
{"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"XTQIYV+DPUVRKpVp0+y/78bWH3I=","revision":"d08ab9fd199434e5220276356ecf9617cfec1eb2","revisionTime":"2017-12-18T20:26:35Z"},
{"path":"github.com/hashicorp/consul/lib","checksumSHA1":"HGljdtVaqi/e3DgIHymLRLfPYhw=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"},
{"path":"github.com/hashicorp/consul/lib/freeport","checksumSHA1":"hDJiPli3EEGJE4vAezMi05oOC7o=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"},
{"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/consul/api","checksumSHA1":"7UvyPiYTxcB8xqRlULAT3X8+8zE=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"soNN4xaHTbeXFgNkZ7cX0gbFXQk=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/consul/lib","checksumSHA1":"Nrh9BhiivRyJiuPzttstmq9xl/w=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/consul/lib/freeport","checksumSHA1":"E28E4zR1FN2v1Xiq4FUER7KVN9M=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/consul/test/porter","checksumSHA1":"5XjgqE4UIfwXvkq5VssGNc7uPhQ=","revision":"ad9425ca6353b8afcfebd19130a8cf768f7eac30","revisionTime":"2017-10-21T00:05:25Z"},
{"path":"github.com/hashicorp/consul/testutil","checksumSHA1":"+go9ycmyfF4b0W174gc7ej5mnE8=","revision":"350932161d6745836c1b2f39849bddb0f9fb52fd","revisionTime":"2017-10-20T23:49:17Z"},
{"path":"github.com/hashicorp/consul/testutil/retry","checksumSHA1":"J8TTDc84MvAyXE/FrfgS+xc/b6s=","revision":"350932161d6745836c1b2f39849bddb0f9fb52fd","revisionTime":"2017-10-20T23:49:17Z"},
{"path":"github.com/hashicorp/consul/testutil","checksumSHA1":"T4CeQD+QRsjf1BJ1n7FSojS5zDQ=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/consul/testutil/retry","checksumSHA1":"SCb2b91UYiB/23+SNDBlU5OZfFA=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/errwrap","revision":"7554cd9344cec97297fa6649b055a8c98c2a1e55"},
{"path":"github.com/hashicorp/go-checkpoint","checksumSHA1":"D267IUMW2rcb+vNe3QU+xhfSrgY=","revision":"1545e56e46dec3bba264e41fde2c1e2aa65b5dd4","revisionTime":"2017-10-09T17:35:28Z"},
{"path":"github.com/hashicorp/go-cleanhttp","checksumSHA1":"6ihdHMkDfFx/rJ1A36com2F6bQk=","revision":"a45970658e51fea2c41445ff0f7e07106d007617","revisionTime":"2017-02-11T00:33:01Z"},
@@ -168,8 +168,8 @@
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"master","versionExact":"master"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"/oss17GO4hXGM7QnUdI3VzcAHzA=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"pvLOzocYsZtxuJ9pREHRTxYnoa4=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"fc4bdedf2366c64984e280c6eefc703ca7812585","revisionTime":"2018-04-11T17:01:37Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"YzJaaeIJpxLfVDZYT1X2hpd8IK8=","revision":"fc4bdedf2366c64984e280c6eefc703ca7812585","revisionTime":"2018-04-11T17:01:37Z"},
{"path":"github.com/hashicorp/vault","checksumSHA1":"eGzvBRMFD6ZB3A6uO750np7Om/E=","revision":"182ba68a9589d4cef95234134aaa498a686e3de3","revisionTime":"2016-08-21T23:40:57Z"},
{"path":"github.com/hashicorp/vault/api","checksumSHA1":"mKN4rEIWyflT6aqJyjgu9m1tPXI=","revision":"3ddd3bd20cec0588788547aecd15e91461b9d546","revisionTime":"2018-04-03T21:11:47Z"},
{"path":"github.com/hashicorp/vault/helper/compressutil","checksumSHA1":"jHVLe8KMdEpb/ZALp0zu+tenADo=","revision":"3ddd3bd20cec0588788547aecd15e91461b9d546","revisionTime":"2018-04-03T21:11:47Z"},