Merge pull request #2649 from hashicorp/b-update-consul-api-vendor

Update consul/api
This commit is contained in:
Michael Schurter
2017-05-30 16:15:11 -07:00
committed by GitHub
19 changed files with 253 additions and 98 deletions

View File

@@ -1,6 +1,7 @@
package config
import (
"net/http"
"strings"
"time"
@@ -144,6 +145,8 @@ func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig {
// ApiConfig returns a usable Consul config that can be passed directly to
// hashicorp/consul/api. NOTE: datacenter is not set
func (c *ConsulConfig) ApiConfig() (*consul.Config, error) {
// Get the default config from consul to reuse things like the default
// http.Transport.
config := consul.DefaultConfig()
if c.Addr != "" {
config.Address = c.Addr
@@ -152,7 +155,12 @@ func (c *ConsulConfig) ApiConfig() (*consul.Config, error) {
config.Token = c.Token
}
if c.Timeout != 0 {
// Create a custom Client to set the timeout
if config.HttpClient == nil {
config.HttpClient = &http.Client{}
}
config.HttpClient.Timeout = c.Timeout
config.HttpClient.Transport = config.Transport
}
if c.Auth != "" {
var username, password string
@@ -180,6 +188,11 @@ func (c *ConsulConfig) ApiConfig() (*consul.Config, error) {
if c.VerifySSL != nil {
config.TLSConfig.InsecureSkipVerify = !*c.VerifySSL
}
tlsConfig, err := consul.SetupTLSConfig(&config.TLSConfig)
if err != nil {
return nil, err
}
config.Transport.TLSClientConfig = tlsConfig
}
return config, nil

View File

@@ -14,6 +14,12 @@ import (
"time"
)
func init() {
// Seed the default rand Source with current time to produce better random
// numbers used with splay
rand.Seed(time.Now().UnixNano())
}
var (
// ErrMissingCommand is the error returned when no command is specified
// to run.

View File

@@ -8,11 +8,14 @@ import (
const (
// DefaultRetryAttempts is the default number of maximum retry attempts.
DefaultRetryAttempts = 5
DefaultRetryAttempts = 12
// DefaultRetryBackoff is the default base for the exponential backoff
// algorithm.
DefaultRetryBackoff = 250 * time.Millisecond
// DefaultRetryMaxBackoff is the default maximum of backoff time
DefaultRetryMaxBackoff = 1 * time.Minute
)
// RetryFunc is the signature of a function that supports retries.
@@ -23,12 +26,17 @@ type RetryFunc func(int) (bool, time.Duration)
type RetryConfig struct {
// Attempts is the total number of maximum attempts to retry before letting
// the error fall through.
// 0 means unlimited.
Attempts *int
// Backoff is the base of the exponentialbackoff. This number will be
// multipled by the next power of 2 on each iteration.
Backoff *time.Duration
// MaxBackoff is an upper limit to the sleep time between retries
// A MaxBackoff of zero means there is no limit to the exponential growth of the backoff.
MaxBackoff *time.Duration `mapstructure:"max_backoff"`
// Enabled signals if this retry is enabled.
Enabled *bool
}
@@ -51,6 +59,8 @@ func (c *RetryConfig) Copy() *RetryConfig {
o.Backoff = c.Backoff
o.MaxBackoff = c.MaxBackoff
o.Enabled = c.Enabled
return &o
@@ -82,6 +92,10 @@ func (c *RetryConfig) Merge(o *RetryConfig) *RetryConfig {
r.Backoff = o.Backoff
}
if o.MaxBackoff != nil {
r.MaxBackoff = o.MaxBackoff
}
if o.Enabled != nil {
r.Enabled = o.Enabled
}
@@ -103,6 +117,11 @@ func (c *RetryConfig) RetryFunc() RetryFunc {
base := math.Pow(2, float64(retry))
sleep := time.Duration(base) * TimeDurationVal(c.Backoff)
maxSleep := TimeDurationVal(c.MaxBackoff)
if maxSleep > 0 && maxSleep < sleep {
return true, maxSleep
}
return true, sleep
}
}
@@ -117,6 +136,10 @@ func (c *RetryConfig) Finalize() {
c.Backoff = TimeDuration(DefaultRetryBackoff)
}
if c.MaxBackoff == nil {
c.MaxBackoff = TimeDuration(DefaultRetryMaxBackoff)
}
if c.Enabled == nil {
c.Enabled = Bool(true)
}
@@ -131,10 +154,12 @@ func (c *RetryConfig) GoString() string {
return fmt.Sprintf("&RetryConfig{"+
"Attempts:%s, "+
"Backoff:%s, "+
"MaxBackoff:%s, "+
"Enabled:%s"+
"}",
IntGoString(c.Attempts),
TimeDurationGoString(c.Backoff),
TimeDurationGoString(c.MaxBackoff),
BoolGoString(c.Enabled),
)
}

View File

@@ -128,6 +128,7 @@ func (d *CatalogNodeQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interf
ID: node.Node.ID,
Node: node.Node.Node,
Address: node.Node.Address,
Datacenter: node.Node.Datacenter,
TaggedAddresses: node.Node.TaggedAddresses,
Meta: node.Node.Meta,
},

View File

@@ -28,6 +28,7 @@ type Node struct {
ID string
Node string
Address string
Datacenter string
TaggedAddresses map[string]string
Meta map[string]string
}
@@ -86,6 +87,7 @@ func (d *CatalogNodesQuery) Fetch(clients *ClientSet, opts *QueryOptions) (inter
ID: node.ID,
Node: node.Node,
Address: node.Address,
Datacenter: node.Datacenter,
TaggedAddresses: node.TaggedAddresses,
Meta: node.Meta,
})

View File

@@ -27,6 +27,7 @@ type CatalogService struct {
ID string
Node string
Address string
Datacenter string
TaggedAddresses map[string]string
NodeMeta map[string]string
ServiceID string
@@ -101,6 +102,7 @@ func (d *CatalogServiceQuery) Fetch(clients *ClientSet, opts *QueryOptions) (int
ID: s.ID,
Node: s.Node,
Address: s.Address,
Datacenter: s.Datacenter,
TaggedAddresses: s.TaggedAddresses,
NodeMeta: s.NodeMeta,
ServiceID: s.ServiceID,

View File

@@ -25,8 +25,8 @@ type ClientSet struct {
// consulClient is a wrapper around a real Consul API client.
type consulClient struct {
client *consulapi.Client
httpClient *http.Client
client *consulapi.Client
transport *http.Transport
}
// vaultClient is a wrapper around a real Vault API client.
@@ -169,7 +169,7 @@ func (c *ClientSet) CreateConsulClient(i *CreateConsulClientInput) error {
}
// Setup the new transport
consulConfig.HttpClient.Transport = transport
consulConfig.Transport = transport
// Create the API client
client, err := consulapi.NewClient(consulConfig)
@@ -180,8 +180,8 @@ func (c *ClientSet) CreateConsulClient(i *CreateConsulClientInput) error {
// Save the data on ourselves
c.Lock()
c.consul = &consulClient{
client: client,
httpClient: consulConfig.HttpClient,
client: client,
transport: transport,
}
c.Unlock()
@@ -323,7 +323,7 @@ func (c *ClientSet) Stop() {
defer c.Unlock()
if c.consul != nil {
c.consul.httpClient.Transport.(*http.Transport).CloseIdleConnections()
c.consul.transport.CloseIdleConnections()
}
if c.vault != nil {

View File

@@ -47,7 +47,7 @@ type HealthService struct {
ID string
Name string
Tags ServiceTags
Checks []*api.HealthCheck
Checks api.HealthChecks
Status string
Port int
}

View File

@@ -295,6 +295,9 @@ func (d *DedupManager) watchTemplate(client *consulapi.Client, t *template.Templ
WaitTime: 60 * time.Second,
}
var lastData []byte
var lastIndex uint64
START:
// Stop listening if we're stopped
select {
@@ -330,6 +333,13 @@ START:
}
opts.WaitIndex = meta.LastIndex
// Stop listening if we're stopped
select {
case <-d.stopCh:
return
default:
}
// If we've exceeded the maximum staleness, retry without stale
if allowStale && meta.LastContact > *d.config.MaxStale {
allowStale = false
@@ -342,13 +352,28 @@ START:
allowStale = true
}
// Stop listening if we're stopped
select {
case <-d.stopCh:
return
default:
if meta.LastIndex == lastIndex {
log.Printf("[TRACE] (dedup) %s no new data (index was the same)", path)
goto START
}
if meta.LastIndex < lastIndex {
log.Printf("[TRACE] (dedup) %s had a lower index, resetting", path)
lastIndex = 0
goto START
}
lastIndex = meta.LastIndex
var data []byte
if pair != nil {
data = pair.Value
}
if bytes.Equal(lastData, data) {
log.Printf("[TRACE] (dedup) %s no new data (contents were the same)", path)
goto START
}
lastData = data
// If we are current the leader, wait for leadership lost
d.leaderLock.RLock()
lockCh, ok = d.leader[t]

View File

@@ -101,7 +101,7 @@ func (s *Scratch) MapValues(k string) ([]interface{}, error) {
typed, ok := s.values[k].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("%q is not a map", k)
return nil, nil
}
keys := make([]string, 0, len(typed))

View File

@@ -108,9 +108,12 @@ func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
var retries int
for {
doneCh, fetchErrCh := make(chan struct{}, 1), make(chan error, 1)
go v.fetch(doneCh, fetchErrCh)
doneCh := make(chan struct{}, 1)
successCh := make(chan struct{}, 1)
fetchErrCh := make(chan error, 1)
go v.fetch(doneCh, successCh, fetchErrCh)
WAIT:
select {
case <-doneCh:
// Reset the retry to avoid exponentially incrementing retries when we
@@ -129,6 +132,16 @@ func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
if v.once {
return
}
case <-successCh:
// We successfully received a non-error response from the server. This
// does not mean we have data (that's dataCh's job), but rather this
// just resets the counter indicating we communciated successfully. For
// example, Consul make have an outage, but when it returns, the view
// is unchanged. We have to reset the counter retries, but not update the
// actual template.
log.Printf("[TRACE] view %s successful contact, resetting retries", v.dependency)
retries = 0
goto WAIT
case err := <-fetchErrCh:
if v.retryFunc != nil {
retry, sleep := v.retryFunc(retries)
@@ -166,7 +179,7 @@ func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
// written to errCh. It is designed to be run in a goroutine that selects the
// result of doneCh and errCh. It is assumed that only one instance of fetch
// is running per View and therefore no locking or mutexes are used.
func (v *View) fetch(doneCh chan<- struct{}, errCh chan<- error) {
func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
log.Printf("[TRACE] (view) %s starting fetch", v.dependency)
var allowStale bool
@@ -203,6 +216,15 @@ func (v *View) fetch(doneCh chan<- struct{}, errCh chan<- error) {
return
}
// If we got this far, we received data successfully. That data might not
// trigger a data update (because we could continue below), but we need to
// inform the poller to reset the retry count.
log.Printf("[TRACE] (view) %s marking successful data response", v.dependency)
select {
case successCh <- struct{}{}:
default:
}
if allowStale && rm.LastContact > v.maxStale {
allowStale = false
log.Printf("[TRACE] (view) %s stale data (last contact exceeded max_stale)", v.dependency)

View File

@@ -1,5 +1,9 @@
package api
import (
"time"
)
const (
// ACLCLientType is the client type token
ACLClientType = "client"
@@ -18,6 +22,16 @@ type ACLEntry struct {
Rules string
}
// ACLReplicationStatus is used to represent the status of ACL replication.
type ACLReplicationStatus struct {
Enabled bool
Running bool
SourceDatacenter string
ReplicatedIndex uint64
LastSuccess time.Time
LastError time.Time
}
// ACL can be used to query the ACL endpoints
type ACL struct {
c *Client
@@ -138,3 +152,24 @@ func (a *ACL) List(q *QueryOptions) ([]*ACLEntry, *QueryMeta, error) {
}
return entries, qm, nil
}
// Replication returns the status of the ACL replication process in the datacenter
func (a *ACL) Replication(q *QueryOptions) (*ACLReplicationStatus, *QueryMeta, error) {
r := a.c.newRequest("GET", "/v1/acl/replication")
r.setQueryOptions(q)
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var entries *ACLReplicationStatus
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
return entries, qm, nil
}

View File

@@ -168,6 +168,9 @@ type Config struct {
// Datacenter to use. If not provided, the default agent datacenter is used.
Datacenter string
// Transport is the Transport to use for the http client.
Transport *http.Transport
// HttpClient is the client to use. Default will be
// used if not provided.
HttpClient *http.Client
@@ -237,11 +240,9 @@ func DefaultNonPooledConfig() *Config {
// given function to make the transport.
func defaultConfig(transportFn func() *http.Transport) *Config {
config := &Config{
Address: "127.0.0.1:8500",
Scheme: "http",
HttpClient: &http.Client{
Transport: transportFn(),
},
Address: "127.0.0.1:8500",
Scheme: "http",
Transport: transportFn(),
}
if addr := os.Getenv(HTTPAddrEnvName); addr != "" {
@@ -364,8 +365,8 @@ func NewClient(config *Config) (*Client, error) {
config.Scheme = defConfig.Scheme
}
if config.HttpClient == nil {
config.HttpClient = defConfig.HttpClient
if config.Transport == nil {
config.Transport = defConfig.Transport
}
if config.TLSConfig.Address == "" {
@@ -392,17 +393,14 @@ func NewClient(config *Config) (*Client, error) {
config.TLSConfig.InsecureSkipVerify = defConfig.TLSConfig.InsecureSkipVerify
}
tlsClientConfig, err := SetupTLSConfig(&config.TLSConfig)
// We don't expect this to fail given that we aren't
// parsing any of the input, but we panic just in case
// since this doesn't have an error return.
if err != nil {
return nil, err
if config.HttpClient == nil {
var err error
config.HttpClient, err = NewHttpClient(config.Transport, config.TLSConfig)
if err != nil {
return nil, err
}
}
config.HttpClient.Transport.(*http.Transport).TLSClientConfig = tlsClientConfig
parts := strings.SplitN(config.Address, "://", 2)
if len(parts) == 2 {
switch parts[0] {
@@ -429,6 +427,26 @@ func NewClient(config *Config) (*Client, error) {
return client, nil
}
// NewHttpClient returns an http client configured with the given Transport and TLS
// config.
func NewHttpClient(transport *http.Transport, tlsConf TLSConfig) (*http.Client, error) {
client := &http.Client{
Transport: transport,
}
if transport.TLSClientConfig == nil {
tlsClientConfig, err := SetupTLSConfig(&tlsConf)
if err != nil {
return nil, err
}
transport.TLSClientConfig = tlsClientConfig
}
return client, nil
}
// request is used to help build up a request
type request struct {
config *Config
@@ -528,11 +546,11 @@ func (r *request) toHTTP() (*http.Request, error) {
// Check if we should encode the body
if r.body == nil && r.obj != nil {
if b, err := encodeBody(r.obj); err != nil {
b, err := encodeBody(r.obj)
if err != nil {
return nil, err
} else {
r.body = b
}
r.body = b
}
// Create the HTTP request

View File

@@ -4,6 +4,7 @@ type Node struct {
ID string
Node string
Address string
Datacenter string
TaggedAddresses map[string]string
Meta map[string]string
CreateIndex uint64
@@ -14,6 +15,7 @@ type CatalogService struct {
ID string
Node string
Address string
Datacenter string
TaggedAddresses map[string]string
NodeMeta map[string]string
ServiceID string

View File

@@ -33,6 +33,7 @@ type HealthCheck struct {
Output string
ServiceID string
ServiceName string
ServiceTags []string
}
// HealthChecks is a collection of HealthCheck structs.

View File

@@ -49,17 +49,18 @@ type KVPairs []*KVPair
type KVOp string
const (
KVSet KVOp = "set"
KVDelete KVOp = "delete"
KVDeleteCAS KVOp = "delete-cas"
KVDeleteTree KVOp = "delete-tree"
KVCAS KVOp = "cas"
KVLock KVOp = "lock"
KVUnlock KVOp = "unlock"
KVGet KVOp = "get"
KVGetTree KVOp = "get-tree"
KVCheckSession KVOp = "check-session"
KVCheckIndex KVOp = "check-index"
KVSet KVOp = "set"
KVDelete KVOp = "delete"
KVDeleteCAS KVOp = "delete-cas"
KVDeleteTree KVOp = "delete-tree"
KVCAS KVOp = "cas"
KVLock KVOp = "lock"
KVUnlock KVOp = "unlock"
KVGet KVOp = "get"
KVGetTree KVOp = "get-tree"
KVCheckSession KVOp = "check-session"
KVCheckIndex KVOp = "check-index"
KVCheckNotExists KVOp = "check-not-exists"
)
// KVTxnOp defines a single operation inside a transaction.

View File

@@ -143,22 +143,23 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Check if we need to create a session first
l.lockSession = l.opts.Session
if l.lockSession == "" {
if s, err := l.createSession(); err != nil {
s, err := l.createSession()
if err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
} else {
l.sessionRenew = make(chan struct{})
l.lockSession = s
session := l.c.Session()
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
if !l.isHeld {
close(l.sessionRenew)
l.sessionRenew = nil
}
}()
}
l.sessionRenew = make(chan struct{})
l.lockSession = s
session := l.c.Session()
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
if !l.isHeld {
close(l.sessionRenew)
l.sessionRenew = nil
}
}()
}
// Setup the query options

View File

@@ -155,22 +155,23 @@ func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Check if we need to create a session first
s.lockSession = s.opts.Session
if s.lockSession == "" {
if sess, err := s.createSession(); err != nil {
sess, err := s.createSession()
if err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
} else {
s.sessionRenew = make(chan struct{})
s.lockSession = sess
session := s.c.Session()
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
if !s.isHeld {
close(s.sessionRenew)
s.sessionRenew = nil
}
}()
}
s.sessionRenew = make(chan struct{})
s.lockSession = sess
session := s.c.Session()
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
if !s.isHeld {
close(s.sessionRenew)
s.sessionRenew = nil
}
}()
}
// Create the contender entry

46
vendor/vendor.json vendored
View File

@@ -591,46 +591,46 @@
"revision": "a557574d6c024ed6e36acc8b610f5f211c91568a"
},
{
"checksumSHA1": "gx2CAg/v3k7kfBA/rT5NCkI0jDI=",
"checksumSHA1": "Nu2j1GusM7ZH0uYrGzqr1K7yH7I=",
"path": "github.com/hashicorp/consul-template/child",
"revision": "e79894aad0b3789b93d0372e23f6eb0d2b75b35a",
"revisionTime": "2017-03-28T18:42:41Z"
"revision": "92746fc5cf86dbb113558bacec43459a65c8df14",
"revisionTime": "2017-05-26T18:30:17Z"
},
{
"checksumSHA1": "VMDorxQ1u/r2BYZ/azJd71UQi4A=",
"checksumSHA1": "7TBPXChZZS84qZbzP7qFYeQding=",
"path": "github.com/hashicorp/consul-template/config",
"revision": "e79894aad0b3789b93d0372e23f6eb0d2b75b35a",
"revisionTime": "2017-03-28T18:42:41Z"
"revision": "92746fc5cf86dbb113558bacec43459a65c8df14",
"revisionTime": "2017-05-26T18:30:17Z"
},
{
"checksumSHA1": "kvyZVRAc/JG3Ua1deyKaFtOrlqc=",
"checksumSHA1": "7rKifM082rlbHN9EcsVyu7VXLoo=",
"path": "github.com/hashicorp/consul-template/dependency",
"revision": "e79894aad0b3789b93d0372e23f6eb0d2b75b35a",
"revisionTime": "2017-03-28T18:42:41Z"
"revision": "92746fc5cf86dbb113558bacec43459a65c8df14",
"revisionTime": "2017-05-26T18:30:17Z"
},
{
"checksumSHA1": "QHR0vkzQMi3UH7q1HdV2QhxrOt8=",
"checksumSHA1": "Ci5EmLs/h7ke9bUg7a34UfTbB5U=",
"path": "github.com/hashicorp/consul-template/manager",
"revision": "e79894aad0b3789b93d0372e23f6eb0d2b75b35a",
"revisionTime": "2017-03-28T18:42:41Z"
"revision": "92746fc5cf86dbb113558bacec43459a65c8df14",
"revisionTime": "2017-05-26T18:30:17Z"
},
{
"checksumSHA1": "oskgb0WteBKOItG8NNDduM7E/D0=",
"path": "github.com/hashicorp/consul-template/signals",
"revision": "e79894aad0b3789b93d0372e23f6eb0d2b75b35a",
"revisionTime": "2017-03-28T18:42:41Z"
"revision": "92746fc5cf86dbb113558bacec43459a65c8df14",
"revisionTime": "2017-05-26T18:30:17Z"
},
{
"checksumSHA1": "KFFY15i/0MuTL2z6OzbQfB4xIBE=",
"checksumSHA1": "804hk7BQd6V2xjBwz+cE0hdzSlI=",
"path": "github.com/hashicorp/consul-template/template",
"revision": "e79894aad0b3789b93d0372e23f6eb0d2b75b35a",
"revisionTime": "2017-03-28T18:42:41Z"
"revision": "92746fc5cf86dbb113558bacec43459a65c8df14",
"revisionTime": "2017-05-26T18:30:17Z"
},
{
"checksumSHA1": "cl9R28+I+YT6a0Z+KQFP//wuC+0=",
"checksumSHA1": "KjcelGP7qPh0ObKouBJuHmXUjqk=",
"path": "github.com/hashicorp/consul-template/watch",
"revision": "e79894aad0b3789b93d0372e23f6eb0d2b75b35a",
"revisionTime": "2017-03-28T18:42:41Z"
"revision": "92746fc5cf86dbb113558bacec43459a65c8df14",
"revisionTime": "2017-05-26T18:30:17Z"
},
{
"checksumSHA1": "jfELEMRhiTcppZmRH+ZwtkVS5Uw=",
@@ -639,10 +639,10 @@
"revisionTime": "2017-04-17T18:01:43Z"
},
{
"checksumSHA1": "k8spDLTgdEFy15C1AdBJLAW+Zng=",
"checksumSHA1": "RmhTKLvlDtxNPKZFnPYnfG/HzrI=",
"path": "github.com/hashicorp/consul/api",
"revision": "e9ca44d0a1757ac9aecc6785904a701936c10e4a",
"revisionTime": "2017-04-17T18:01:43Z"
"revision": "eea8f4ce75e8e6ff97c9913d89f687e8f8489ce6",
"revisionTime": "2017-05-30T15:52:51Z"
},
{
"checksumSHA1": "Z1N3jX/5B7GbLNfNp5GTxrsJItc=",