Reduce alloc lock contention in client

This commit is contained in:
Alex Dadgar
2016-02-19 19:51:55 -08:00
parent 312cdb51b7
commit a0aeb3e1d4
2 changed files with 30 additions and 13 deletions

View File

@@ -344,7 +344,10 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
if r.taskReceivedTimer == nil {
r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
// Send a dirty signal to sync our state.
r.dirtyCh <- struct{}{}
select {
case r.dirtyCh <- struct{}{}:
default:
}
})
}
return

View File

@@ -447,9 +447,7 @@ func (c *Client) saveState() error {
}
var mErr multierror.Error
c.allocLock.RLock()
defer c.allocLock.RUnlock()
for id, ar := range c.allocs {
for id, ar := range c.getAllocRunners() {
if err := ar.SaveState(); err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v",
id, err)
@@ -459,6 +457,17 @@ func (c *Client) saveState() error {
return mErr.ErrorOrNil()
}
// getAllocRunners returns a snapshot of the current set of alloc runners.
func (c *Client) getAllocRunners() map[string]*AllocRunner {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
runners := make(map[string]*AllocRunner, len(c.allocs))
for id, ar := range c.allocs {
runners[id] = ar
}
return runners
}
// nodeID restores a persistent unique ID or generates a new one
func (c *Client) nodeID() (string, error) {
// Do not persist in dev mode
@@ -887,18 +896,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
// need to pull all the allocations.
var pull []string
filtered := make(map[string]struct{})
c.allocLock.Lock()
runners := c.getAllocRunners()
for allocID, modifyIndex := range resp.Allocs {
// Pull the allocation if we don't have an alloc runner for the
// allocation or if the alloc runner requires an updated allocation.
runner, ok := c.allocs[allocID]
runner, ok := runners[allocID]
if !ok || runner.shouldUpdate(modifyIndex) {
pull = append(pull, allocID)
} else {
filtered[allocID] = struct{}{}
}
}
c.allocLock.Unlock()
c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
resp.Index, len(pull), len(filtered))
@@ -1023,38 +1032,43 @@ func (c *Client) runAllocs(update *allocUpdates) {
// removeAlloc is invoked when we should remove an allocation
func (c *Client) removeAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar, ok := c.allocs[alloc.ID]
if !ok {
c.allocLock.Unlock()
c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
return nil
}
ar.Destroy()
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()
ar.Destroy()
return nil
}
// updateAlloc is invoked when we should update an allocation
func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
ar, ok := c.allocs[exist.ID]
c.allocLock.RUnlock()
if !ok {
c.logger.Printf("[WARN] client: missing context for alloc '%s'", exist.ID)
return nil
}
ar.Update(update)
return nil
}
// addAlloc is invoked when we should add an allocation
func (c *Client) addAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.consulService)
c.configLock.RUnlock()
c.allocs[alloc.ID] = ar
go ar.Run()
// Store the alloc runner.
c.allocLock.Lock()
c.allocs[alloc.ID] = ar
c.allocLock.Unlock()
return nil
}