Exit if alloc listener closes

Add test for that case, add comments, remove debug logging
This commit is contained in:
Michael Schurter
2017-08-11 16:14:58 -07:00
parent 8253439de2
commit 4f7c5e6ed1
2 changed files with 101 additions and 18 deletions

View File

@@ -30,6 +30,9 @@ type terminated interface {
Terminated() bool
}
// prevAllocWatcher allows AllocRunners to wait for a previous allocation to
// terminate and migrate its data whether or not the previous allocation is
// local or remote.
type prevAllocWatcher interface {
// Wait for previous alloc to terminate
Wait(context.Context) error
@@ -39,7 +42,7 @@ type prevAllocWatcher interface {
}
// newAllocWatcher creates a prevAllocWatcher appropriate for whether this
// allocs previous allocation was local or remote. If this alloc has no
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher {
if alloc.PreviousAllocation == "" {
@@ -78,15 +81,31 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer,
// localPrevAlloc is a prevAllocWatcher for previous allocations on the same
// node as an updated allocation.
type localPrevAlloc struct {
allocID string
prevAllocID string
tasks []*structs.Task
// allocID is the ID of the alloc being blocked
allocID string
sticky bool
// prevAllocID is the ID of the alloc being replaced
prevAllocID string
// tasks on the new alloc
tasks []*structs.Task
// sticky is true if data should be moved
sticky bool
// prevAllocDir is the alloc dir for the previous alloc
prevAllocDir *allocdir.AllocDir
// prevListener allows blocking for updates to the previous alloc
prevListener *cstructs.AllocListener
prevStatus terminated
prevWaitCh <-chan struct{}
// prevStatus allows checking if the previous alloc has already
// terminated (and therefore won't send updates to the listener)
prevStatus terminated
// prevWaitCh is closed when the previous alloc is GC'd which is a
// failsafe against blocking the new alloc forever
prevWaitCh <-chan struct{}
logger *log.Logger
}
@@ -103,18 +122,14 @@ func (p *localPrevAlloc) Wait(ctx context.Context) error {
// Block until previous alloc exits
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID)
for {
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX LOOP", p.allocID, p.prevAllocID)
select {
case prevAlloc := <-p.prevListener.Ch:
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX UPDATE %v", p.allocID, p.prevAllocID, prevAlloc.Terminated())
if prevAlloc.Terminated() {
case prevAlloc, ok := <-p.prevListener.Ch:
if !ok || prevAlloc.Terminated() {
return nil
}
case <-p.prevWaitCh:
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX CLOSED", p.allocID, p.prevAllocID)
return nil
case <-ctx.Done():
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX DONE", p.allocID, p.prevAllocID)
return ctx.Err()
}
}
@@ -140,13 +155,24 @@ func (p *localPrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) e
// remotePrevAlloc is a prevAllcWatcher for previous allocations on remote
// nodes as an updated allocation.
type remotePrevAlloc struct {
allocID string
prevAllocID string
tasks []*structs.Task
// allocID is the ID of the alloc being blocked
allocID string
config *config.Config
// prevAllocID is the ID of the alloc being replaced
prevAllocID string
// tasks on the new alloc
tasks []*structs.Task
// config for the Client to get AllocDir and Region
config *config.Config
// migrate is true if data should be moved between nodes
migrate bool
rpc rpcer
// rpc provides an RPC method for watching for updates to the previous
// alloc and determining what node it was on.
rpc rpcer
// nodeID is the node the previous alloc. Set by Wait() for use in
// Migrate() iff the previous alloc has not already been GC'd.
@@ -155,6 +181,7 @@ type remotePrevAlloc struct {
logger *log.Logger
}
// Wait until the remote previousl allocation has terminated.
func (p *remotePrevAlloc) Wait(ctx context.Context) error {
p.logger.Printf("[DEBUG] client: alloc %q waiting for remote previous alloc %q to terminate", p.allocID, p.prevAllocID)
req := structs.AllocSpecificRequest{

View File

@@ -10,10 +10,66 @@ import (
"os"
"path/filepath"
"testing"
"time"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/mock"
)
// TestPrevAlloc_LocalPrevAlloc asserts that when a previous alloc runner is
// set a localPrevAlloc will block on it.
func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
_, prevAR := testAllocRunner(false)
prevAR.alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "10s"
newAlloc := mock.Alloc()
newAlloc.PreviousAllocation = prevAR.Alloc().ID
newAlloc.Job.TaskGroups[0].EphemeralDisk.Sticky = false
task := newAlloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "500ms"
waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger())
// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer cancel()
waiter.Wait(ctx)
}()
select {
case <-ctx.Done():
t.Fatalf("Wait exited too early")
case <-time.After(33 * time.Millisecond):
// Good! It's blocking
}
// Start the previous allocs to cause it to update but not terminate
go prevAR.Run()
defer prevAR.Destroy()
select {
case <-ctx.Done():
t.Fatalf("Wait exited too early")
case <-time.After(33 * time.Millisecond):
// Good! It's still blocking
}
// Stop the previous alloc
prevAR.Destroy()
select {
case <-ctx.Done():
// Good! We unblocked when the previous alloc stopped
case <-time.After(time.Second):
t.Fatalf("Wait exited too early")
}
}
// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir
// works.
func TestPrevAlloc_StreamAllocDir(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "")