From a0aeb3e1d4cf6034cc726c541139d5fcdcdca343 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 19 Feb 2016 19:51:55 -0800 Subject: [PATCH] Reduce alloc lock contention in client --- client/alloc_runner.go | 5 ++++- client/client.go | 38 ++++++++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d3d1ba444..a8bb0186d 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -344,7 +344,10 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv if r.taskReceivedTimer == nil { r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() { // Send a dirty signal to sync our state. - r.dirtyCh <- struct{}{} + select { + case r.dirtyCh <- struct{}{}: + default: + } }) } return diff --git a/client/client.go b/client/client.go index 17669fb5b..1103e8a7e 100644 --- a/client/client.go +++ b/client/client.go @@ -447,9 +447,7 @@ func (c *Client) saveState() error { } var mErr multierror.Error - c.allocLock.RLock() - defer c.allocLock.RUnlock() - for id, ar := range c.allocs { + for id, ar := range c.getAllocRunners() { if err := ar.SaveState(); err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) @@ -459,6 +457,17 @@ func (c *Client) saveState() error { return mErr.ErrorOrNil() } +// getAllocRunners returns a snapshot of the current set of alloc runners. +func (c *Client) getAllocRunners() map[string]*AllocRunner { + c.allocLock.RLock() + defer c.allocLock.RUnlock() + runners := make(map[string]*AllocRunner, len(c.allocs)) + for id, ar := range c.allocs { + runners[id] = ar + } + return runners +} + // nodeID restores a persistent unique ID or generates a new one func (c *Client) nodeID() (string, error) { // Do not persist in dev mode @@ -887,18 +896,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { // need to pull all the allocations. var pull []string filtered := make(map[string]struct{}) - c.allocLock.Lock() + runners := c.getAllocRunners() for allocID, modifyIndex := range resp.Allocs { // Pull the allocation if we don't have an alloc runner for the // allocation or if the alloc runner requires an updated allocation. - runner, ok := c.allocs[allocID] + runner, ok := runners[allocID] if !ok || runner.shouldUpdate(modifyIndex) { pull = append(pull, allocID) } else { filtered[allocID] = struct{}{} } } - c.allocLock.Unlock() + c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)", resp.Index, len(pull), len(filtered)) @@ -1023,38 +1032,43 @@ func (c *Client) runAllocs(update *allocUpdates) { // removeAlloc is invoked when we should remove an allocation func (c *Client) removeAlloc(alloc *structs.Allocation) error { c.allocLock.Lock() - defer c.allocLock.Unlock() ar, ok := c.allocs[alloc.ID] if !ok { + c.allocLock.Unlock() c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID) return nil } - ar.Destroy() delete(c.allocs, alloc.ID) + c.allocLock.Unlock() + + ar.Destroy() return nil } // updateAlloc is invoked when we should update an allocation func (c *Client) updateAlloc(exist, update *structs.Allocation) error { c.allocLock.RLock() - defer c.allocLock.RUnlock() ar, ok := c.allocs[exist.ID] + c.allocLock.RUnlock() if !ok { c.logger.Printf("[WARN] client: missing context for alloc '%s'", exist.ID) return nil } + ar.Update(update) return nil } // addAlloc is invoked when we should add an allocation func (c *Client) addAlloc(alloc *structs.Allocation) error { - c.allocLock.Lock() - defer c.allocLock.Unlock() c.configLock.RLock() ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.consulService) c.configLock.RUnlock() - c.allocs[alloc.ID] = ar go ar.Run() + + // Store the alloc runner. + c.allocLock.Lock() + c.allocs[alloc.ID] = ar + c.allocLock.Unlock() return nil }