Locking appropriately before closing the channel to indicate migration

This commit is contained in:
Diptanu Choudhury
2017-01-23 10:35:00 -08:00
parent 0d2e8eaa33
commit c254fbfa4f

View File

@@ -150,7 +150,7 @@ type Client struct {
vaultClient vaultclient.VaultClient
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]chan struct{}
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex
// garbageCollector is used to garbage collect terminal allocations present
@@ -158,6 +158,32 @@ type Client struct {
garbageCollector *AllocGarbageCollector
}
// migrateAllocCtrl indicates whether migration is complete
type migrateAllocCtrl struct {
ch chan struct{}
closed bool
chLock sync.Mutex
}
func newMigrateAllocCtrl() *migrateAllocCtrl {
return &migrateAllocCtrl{
ch: make(chan struct{}),
}
}
func (m *migrateAllocCtrl) closeCh() {
m.chLock.Lock()
defer m.chLock.Unlock()
if m.closed {
return
}
// If channel is not closed then close it
m.closed = true
close(m.ch)
}
var (
// noServersErr is returned by the RPC method when the client has no
// configured servers. This is used to trigger Consul discovery if
@@ -188,7 +214,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
migratingAllocs: make(map[string]chan struct{}),
migratingAllocs: make(map[string]*migrateAllocCtrl),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
@@ -1420,7 +1446,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Stopping the migration if the allocation doesn't need any
// migration
if !update.updated.ShouldMigrate() {
close(ch)
ch.closeCh()
}
}
}
@@ -1455,7 +1481,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// prevents a race between a finishing blockForRemoteAlloc and
// another invocation of runAllocs
if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
c.migratingAllocs[add.ID] = make(chan struct{})
c.migratingAllocs[add.ID] = newMigrateAllocCtrl()
go c.blockForRemoteAlloc(add)
}
}
@@ -1533,7 +1559,7 @@ ADDALLOC:
// waitForAllocTerminal waits for an allocation with the given alloc id to
// transition to terminal state and blocks the caller until then.
func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*structs.Allocation, error) {
func (c *Client) waitForAllocTerminal(allocID string, stopCh *migrateAllocCtrl) (*structs.Allocation, error) {
req := structs.AllocSpecificRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
@@ -1551,7 +1577,7 @@ func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*st
select {
case <-time.After(retry):
continue
case <-stopCh:
case <-stopCh.ch:
return nil, fmt.Errorf("giving up waiting on alloc %v since migration is not needed", allocID)
case <-c.shutdownCh:
return nil, fmt.Errorf("aborting because client is shutting down")
@@ -1665,7 +1691,7 @@ func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAll
for {
// See if the alloc still needs migration
select {
case <-stopMigrating:
case <-stopMigrating.ch:
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID)
return nil