diff --git a/client/client.go b/client/client.go index fd1a9c5df..199f1a9bd 100644 --- a/client/client.go +++ b/client/client.go @@ -27,11 +27,14 @@ const ( // registerRetryIntv is minimum interval on which we retry // registration. We pick a value between this and 2x this. - registerRetryIntv = 30 * time.Second + registerRetryIntv = 15 * time.Second // getAllocRetryIntv is minimum interval on which we retry // to fetch allocations. We pick a value between this and 2x this. getAllocRetryIntv = 30 * time.Second + + // devModeRetryIntv is the retry interval used for development + devModeRetryIntv = time.Second ) // RPCHandler can be provided to the Client if there is a local server @@ -344,6 +347,14 @@ func (c *Client) setupDrivers() error { return nil } +// retryIntv calculates a retry interval value given the base +func (c *Client) retryIntv(base time.Duration) time.Duration { + if c.config.DevMode { + return devModeRetryIntv + } + return base + randomStagger(base) +} + // run is a long lived goroutine used to run the client func (c *Client) run() { // Register the client @@ -352,7 +363,7 @@ func (c *Client) run() { break } select { - case <-time.After(registerRetryIntv + randomStagger(registerRetryIntv)): + case <-time.After(c.retryIntv(registerRetryIntv)): case <-c.shutdownCh: return } @@ -366,19 +377,21 @@ func (c *Client) run() { go c.watchAllocations(allocUpdates) // Periodically update our status and wait for termination - select { - case allocs := <-allocUpdates: - c.runAllocs(allocs) + for { + select { + case allocs := <-allocUpdates: + c.runAllocs(allocs) - case <-heartbeat: - if err := c.updateNodeStatus(); err != nil { - heartbeat = time.After(registerRetryIntv + randomStagger(registerRetryIntv)) - } else { - heartbeat = time.After(c.heartbeatTTL) + case <-heartbeat: + if err := c.updateNodeStatus(); err != nil { + heartbeat = time.After(c.retryIntv(registerRetryIntv)) + } else { + heartbeat = time.After(c.heartbeatTTL) + } + + case <-c.shutdownCh: + return } - - case <-c.shutdownCh: - return } } @@ -422,7 +435,7 @@ func (c *Client) updateNodeStatus() error { c.logger.Printf("[DEBUG] client: %d evaluations triggered by node update", len(resp.EvalIDs)) } if resp.Index != 0 { - c.logger.Printf("[DEBUG] client: client state updated") + c.logger.Printf("[DEBUG] client: state updated to %s", req.Status) } c.lastHeartbeat = time.Now() c.heartbeatTTL = resp.HeartbeatTTL @@ -431,13 +444,11 @@ func (c *Client) updateNodeStatus() error { // watchAllocations is used to scan for updates to allocations func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { - var lastIndex uint64 req := structs.NodeSpecificRequest{ NodeID: c.Node().ID, QueryOptions: structs.QueryOptions{ - Region: c.config.Region, - MinQueryIndex: lastIndex, - AllowStale: true, + Region: c.config.Region, + AllowStale: true, }, } var resp structs.NodeAllocsResponse @@ -447,7 +458,7 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { err := c.RPC("Client.GetAllocs", &req, &resp) if err != nil { c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) - retry := getAllocRetryIntv + randomStagger(getAllocRetryIntv) + retry := c.retryIntv(getAllocRetryIntv) select { case <-time.After(retry): continue @@ -464,11 +475,11 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { } // Check for updates - if resp.Index == lastIndex { + if resp.Index <= req.MinQueryIndex { continue } - lastIndex = resp.Index - c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", lastIndex, len(resp.Allocs)) + req.MinQueryIndex = resp.Index + c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", resp.Index, len(resp.Allocs)) // Push the updates select {