client: faster retries in dev mode

This commit is contained in:
Armon Dadgar
2015-08-23 17:40:14 -07:00
parent fdd1331db6
commit f208364a3e

View File

@@ -27,11 +27,14 @@ const (
// registerRetryIntv is minimum interval on which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 30 * time.Second
registerRetryIntv = 15 * time.Second
// getAllocRetryIntv is minimum interval on which we retry
// to fetch allocations. We pick a value between this and 2x this.
getAllocRetryIntv = 30 * time.Second
// devModeRetryIntv is the retry interval used for development
devModeRetryIntv = time.Second
)
// RPCHandler can be provided to the Client if there is a local server
@@ -344,6 +347,14 @@ func (c *Client) setupDrivers() error {
return nil
}
// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
return devModeRetryIntv
}
return base + randomStagger(base)
}
// run is a long lived goroutine used to run the client
func (c *Client) run() {
// Register the client
@@ -352,7 +363,7 @@ func (c *Client) run() {
break
}
select {
case <-time.After(registerRetryIntv + randomStagger(registerRetryIntv)):
case <-time.After(c.retryIntv(registerRetryIntv)):
case <-c.shutdownCh:
return
}
@@ -366,19 +377,21 @@ func (c *Client) run() {
go c.watchAllocations(allocUpdates)
// Periodically update our status and wait for termination
select {
case allocs := <-allocUpdates:
c.runAllocs(allocs)
for {
select {
case allocs := <-allocUpdates:
c.runAllocs(allocs)
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(registerRetryIntv + randomStagger(registerRetryIntv))
} else {
heartbeat = time.After(c.heartbeatTTL)
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
} else {
heartbeat = time.After(c.heartbeatTTL)
}
case <-c.shutdownCh:
return
}
case <-c.shutdownCh:
return
}
}
@@ -422,7 +435,7 @@ func (c *Client) updateNodeStatus() error {
c.logger.Printf("[DEBUG] client: %d evaluations triggered by node update", len(resp.EvalIDs))
}
if resp.Index != 0 {
c.logger.Printf("[DEBUG] client: client state updated")
c.logger.Printf("[DEBUG] client: state updated to %s", req.Status)
}
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
@@ -431,13 +444,11 @@ func (c *Client) updateNodeStatus() error {
// watchAllocations is used to scan for updates to allocations
func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
var lastIndex uint64
req := structs.NodeSpecificRequest{
NodeID: c.Node().ID,
QueryOptions: structs.QueryOptions{
Region: c.config.Region,
MinQueryIndex: lastIndex,
AllowStale: true,
Region: c.config.Region,
AllowStale: true,
},
}
var resp structs.NodeAllocsResponse
@@ -447,7 +458,7 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
err := c.RPC("Client.GetAllocs", &req, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err)
retry := getAllocRetryIntv + randomStagger(getAllocRetryIntv)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
@@ -464,11 +475,11 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
}
// Check for updates
if resp.Index == lastIndex {
if resp.Index <= req.MinQueryIndex {
continue
}
lastIndex = resp.Index
c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", lastIndex, len(resp.Allocs))
req.MinQueryIndex = resp.Index
c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", resp.Index, len(resp.Allocs))
// Push the updates
select {