implement stopping, destroying, and disk migration

* Stopping an alloc is implemented via Updates, but update hooks are
  *not* run.
* Destroying an alloc is a best-effort cleanup.
* AllocRunner destroy hooks implemented.
* Disk migration and blocking on a previous allocation exiting moved to
  their own package to avoid import cycles. They now depend only on the
  alloc broadcaster instead of also using a waitCh.
* AllocBroadcaster now only drops stale allocations and always keeps the
  latest version.
* Made AllocDir safe for concurrent use

There are lots of internal contexts that are currently unused. Unsure
whether they should be used or removed.
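
Not part of the commit, but to illustrate the broadcaster change described above: with a buffer of exactly one, a sender can pop a stale pending update and replace it with the newest one, so sends never block and receivers only ever see the latest version. A minimal, self-contained sketch:

package main

import "fmt"

// send never blocks: if the 1-slot channel is full it pops the stale
// value and enqueues the latest one instead.
func send(ch chan int, v int) {
	select {
	case ch <- v: // slot free: enqueue
	case <-ch: // slot full: discard the stale update...
		ch <- v // ...and replace it with the latest
	}
}

func main() {
	ch := make(chan int, 1)
	send(ch, 1)
	send(ch, 2)       // replaces 1 before any receiver sees it
	fmt.Println(<-ch) // prints 2: only the latest survives
}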
This commit is contained in:
Michael Schurter
2018-08-23 12:03:17 -07:00
parent de5426124b
commit c95155d45c
16 changed files with 492 additions and 169 deletions

View File

@@ -9,6 +9,7 @@ import (
"log"
"os"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/go-multierror"
@@ -58,6 +59,8 @@ var (
TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777}
)
// AllocDir allows creating, destroying, and accessing an allocation's
// directory. All methods are safe for concurrent use.
type AllocDir struct {
// AllocDir is the directory used for storing any state
// of this allocation. It will be purged on alloc destroy.
@@ -73,6 +76,8 @@ type AllocDir struct {
// built is true if Build has successfully run
built bool
mu sync.RWMutex
logger *log.Logger
}
@@ -100,6 +105,9 @@ func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir {
// Copy an AllocDir and all of its TaskDirs. Returns nil if AllocDir is
// nil.
func (d *AllocDir) Copy() *AllocDir {
if d == nil {
return nil
}
d.mu.RLock()
defer d.mu.RUnlock()
@@ -117,6 +125,9 @@ func (d *AllocDir) Copy() *AllocDir {
// NewTaskDir creates a new TaskDir and adds it to the AllocDir's TaskDirs map.
func (d *AllocDir) NewTaskDir(name string) *TaskDir {
d.mu.Lock()
defer d.mu.Unlock()
td := newTaskDir(d.logger, d.AllocDir, name)
d.TaskDirs[name] = td
return td
@@ -129,6 +140,9 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir {
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the
// error message as the contents.
func (d *AllocDir) Snapshot(w io.Writer) error {
d.mu.RLock()
defer d.mu.RUnlock()
allocDataDir := filepath.Join(d.SharedDir, SharedDataDir)
rootPaths := []string{allocDataDir}
for _, taskdir := range d.TaskDirs {
@@ -206,11 +220,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// Move other alloc directory's shared path and local dir to this alloc dir.
func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {
d.mu.RLock()
if !d.built {
// Enforce the invariant that Build is called before Move
d.mu.RUnlock()
return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir)
}
// Moving is slow and only reads immutable fields, so unlock during heavy IO
d.mu.RUnlock()
// Move the data directory
otherDataDir := filepath.Join(other.SharedDir, SharedDataDir)
dataDir := filepath.Join(d.SharedDir, SharedDataDir)
@@ -246,7 +265,6 @@ func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {
// Tears down the previously built directory structure.
func (d *AllocDir) Destroy() error {
// Unmount all mounted shared alloc dirs.
var mErr multierror.Error
if err := d.UnmountAll(); err != nil {
@@ -258,12 +276,17 @@ func (d *AllocDir) Destroy() error {
}
// Unset built since the alloc dir has been destroyed.
d.mu.Lock()
d.built = false
d.mu.Unlock()
return mErr.ErrorOrNil()
}
// UnmountAll unmounts linked/mounted directories in task dirs.
func (d *AllocDir) UnmountAll() error {
d.mu.RLock()
defer d.mu.RUnlock()
var mErr multierror.Error
for _, dir := range d.TaskDirs {
// Check if the directory has the shared alloc mounted.
@@ -322,7 +345,9 @@ func (d *AllocDir) Build() error {
}
// Mark as built
d.mu.Lock()
d.built = true
d.mu.Unlock()
return nil
}
@@ -386,11 +411,14 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
p := filepath.Join(d.AllocDir, path)
// Check if it is trying to read into a secret directory
d.mu.RLock()
for _, dir := range d.TaskDirs {
if filepath.HasPrefix(p, dir.SecretsDir) {
d.mu.RUnlock()
return nil, fmt.Errorf("Reading secret file prohibited: %s", path)
}
}
d.mu.RUnlock()
f, err := os.Open(p)
if err != nil {
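
Not from the commit: a sketch of the locking pattern the Move hunk above uses. The invariant (the built flag) is validated under the read lock, and the lock is released before the slow filesystem work, under the assumption that the remaining fields are immutable once Build has run. The dirState type and move helper below are hypothetical stand-ins.

package sketch

import (
	"fmt"
	"os"
	"sync"
)

// dirState is a hypothetical stand-in for AllocDir: mu guards the
// mutable built flag; path is immutable after a successful Build.
type dirState struct {
	mu    sync.RWMutex
	built bool
	path  string
}

// move checks the invariant under the read lock, then unlocks before
// heavy IO so concurrent readers and writers are not held up.
func (d *dirState) move(src, dst string) error {
	d.mu.RLock()
	if !d.built {
		d.mu.RUnlock()
		return fmt.Errorf("unable to move to %q - dir is not built", d.path)
	}
	d.mu.RUnlock() // safe: only immutable fields are read below

	return os.Rename(src, dst) // stands in for the real per-task moves
}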

View File

@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/vaultclient"
@@ -77,7 +78,7 @@ type AllocRunner struct {
// then migrates its data. If sticky volumes aren't used and there's no
// previous allocation, a noop implementation is used so it is always safe
// to call.
prevAlloc prevAllocWatcher
prevAlloc allocwatcher.PrevAllocWatcher
// ctx is cancelled with exitFn to cause the alloc to be destroyed
// (stopped and GC'd).
@@ -133,26 +134,26 @@ type allocRunnerMutableState struct {
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient consulApi.ConsulServiceAPI,
prevAlloc prevAllocWatcher) *AllocRunner {
prevAlloc allocwatcher.PrevAllocWatcher) *AllocRunner {
ar := &AllocRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
allocID: alloc.ID,
allocBroadcast: cstructs.NewAllocBroadcaster(8),
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*taskrunner.TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
allocID: alloc.ID,
//allocBroadcast: cstructs.NewAllocBroadcaster(8),
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*taskrunner.TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
}
// TODO Should be passed a context
@@ -612,9 +613,9 @@ func (r *AllocRunner) sendBroadcast(alloc *structs.Allocation) {
// Try to send the alloc up to three times with a delay to allow recovery.
sent := false
for i := 0; i < 3; i++ {
if sent = r.allocBroadcast.Send(alloc); sent {
break
}
//if sent = r.allocBroadcast.Send(alloc); sent {
// break
//}
time.Sleep(500 * time.Millisecond)
}
if !sent {

View File

@@ -7,6 +7,7 @@ import (
"testing"
"github.com/boltdb/bolt"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/vaultclient"
@@ -51,7 +52,7 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t, testlog.HCLogger(t)), NoopPrevAlloc{})
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t), allocwatcher.NoopPrevAlloc{})
return upd, ar
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
@@ -51,10 +52,14 @@ type allocRunner struct {
// vaultClient is used to manage Vault tokens
vaultClient vaultclient.VaultClient
// waitCh is closed when the alloc runner has transitioned to a terminal
// state
// waitCh is closed when the Run() loop has exited
waitCh chan struct{}
// destroyed is true when the Run() loop has exited, postrun hooks have
// run, and alloc runner has been destroyed
destroyed bool
destroyedLock sync.Mutex
// Alloc captures the allocation being run.
alloc *structs.Allocation
allocLock sync.RWMutex
@@ -81,6 +86,13 @@ type allocRunner struct {
// have buffer size 1 in order to support dropping pending updates when
// a newer allocation is received.
updateCh chan *structs.Allocation
// allocBroadcaster sends client allocation updates to all listeners
allocBroadcaster *cstructs.AllocBroadcaster
// prevAllocWatcher allows waiting for a previous allocation to exit
// and if necessary migrate its alloc dir.
prevAllocWatcher allocwatcher.PrevAllocWatcher
}
// NewAllocRunner returns a new allocation runner.
@@ -92,17 +104,19 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
}
ar := &allocRunner{
id: alloc.ID,
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation, updateChCap),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
id: alloc.ID,
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation, updateChCap),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
allocBroadcaster: cstructs.NewAllocBroadcaster(alloc),
prevAllocWatcher: config.PrevAllocWatcher,
}
// Create alloc dir
@@ -154,7 +168,7 @@ func (ar *allocRunner) WaitCh() <-chan struct{} {
}
// XXX How does alloc Restart work
// Run is the main go-routine that executes all the tasks.
// Run is the main goroutine that executes all the tasks.
func (ar *allocRunner) Run() {
// Close the wait channel
defer close(ar.waitCh)
@@ -220,6 +234,11 @@ func (ar *allocRunner) setAlloc(updated *structs.Allocation) {
ar.allocLock.Unlock()
}
// GetAllocDir returns the alloc dir which is safe for concurrent use.
func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir {
return ar.allocDir
}
// Restore state from database. Must be called after NewAllocRunner but before
// Run.
func (ar *allocRunner) Restore() error {
@@ -287,6 +306,9 @@ func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskStat
// Update the server
ar.stateUpdater.AllocStateUpdated(calloc)
// Broadcast client alloc to listeners
ar.allocBroadcaster.Send(calloc)
}
// clientAlloc takes in the task states and returns an Allocation populated
@@ -395,19 +417,45 @@ func (ar *allocRunner) Update(update *structs.Allocation) {
}
}
// Destroy the alloc runner by stopping it if it is still running and cleaning
// up all of its resources.
//
// This method is safe for calling concurrently with Run(). Callers must
// receive on WaitCh() to block until alloc runner has stopped and been
// destroyed.
//XXX TODO
func (ar *allocRunner) Destroy() {
//TODO
for _, tr := range ar.tasks {
tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilled))
}
}
func (ar *allocRunner) Listener() *cstructs.AllocListener {
return ar.allocBroadcaster.Listen()
}
// Destroy the alloc runner by synchronously stopping it if it is still running
// and cleaning up all of its resources.
//
// This method is safe for calling concurrently with Run() and will cause it to
// exit (thus closing WaitCh).
func (ar *allocRunner) Destroy() {
// Stop tasks
for name, tr := range ar.tasks {
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
if err == taskrunner.ErrTaskNotRunning {
ar.logger.Trace("task not running", "task_name", name)
} else {
ar.logger.Warn("failed to kill task", "error", err, "task_name", name)
}
}
}
// Wait for tasks to exit and postrun hooks to finish
<-ar.waitCh
// Run destroy hooks
if err := ar.destroy(); err != nil {
ar.logger.Warn("error running destroy hooks", "error", err)
}
// Cleanup state db
if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil {
ar.logger.Warn("failed to delete allocation state", "error", err)
}
// Mark alloc as destroyed
ar.destroyedLock.Lock()
ar.destroyed = true
ar.destroyedLock.Unlock()
}
// IsDestroyed returns true if the alloc runner has been destroyed (stopped and
@@ -416,27 +464,26 @@ func (ar *allocRunner) Destroy() {
// This method is safe for calling concurrently with Run(). Callers must
// receive on WaitCh() to block until alloc runner has stopped and been
// destroyed.
//XXX TODO
func (ar *allocRunner) IsDestroyed() bool {
return false
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()
return ar.destroyed
}
// IsWaiting returns true if the alloc runner is waiting for its previous
// allocation to terminate.
//
// This method is safe for calling concurrently with Run().
//XXX TODO
func (ar *allocRunner) IsWaiting() bool {
return false
return ar.prevAllocWatcher.IsWaiting()
}
// IsMigrating returns true if the alloc runner is migrating data from its
// previous allocation.
//
// This method is safe for calling concurrently with Run().
//XXX TODO
func (ar *allocRunner) IsMigrating() bool {
return false
return ar.prevAllocWatcher.IsMigrating()
}
// StatsReporter needs implementing
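
Not part of the diff: a caller-side sketch of the Destroy contract documented above. The destroyer interface and gcAlloc helper are hypothetical; the real client-side interface appears in the client.go hunk further down.

package sketch

// destroyer captures just the methods this sketch needs from an alloc
// runner; the real interface is wider.
type destroyer interface {
	Destroy()
	IsDestroyed() bool
	WaitCh() <-chan struct{}
}

// gcAlloc shows the expected sequence: Destroy may be called while the
// runner is still executing; it kills tasks, waits for the Run loop to
// exit, runs destroy hooks, and only then returns.
func gcAlloc(ar destroyer) {
	ar.Destroy()  // synchronous: blocks until cleanup is complete
	<-ar.WaitCh() // already closed by the time Destroy returns
	if !ar.IsDestroyed() {
		panic("Destroy must leave the runner destroyed")
	}
}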

View File

@@ -6,17 +6,22 @@ import (
"time"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/allocwatcher"
)
// initRunnerHooks initializes the runner's hooks.
func (ar *allocRunner) initRunnerHooks() {
hookLogger := ar.logger.Named("runner_hook")
ar.runnerHooks = make([]interfaces.RunnerHook, 0, 3)
// Create the alloc directory hook. This is run first to ensure the
// directory path exists for other hooks.
ar.runnerHooks = append(ar.runnerHooks, newAllocDirHook(ar, hookLogger))
ar.runnerHooks = []interfaces.RunnerHook{
newAllocDirHook(hookLogger, ar),
newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
}
}
// prerun is used to run the runner's prerun hooks.
@@ -36,17 +41,21 @@ func (ar *allocRunner) prerun() error {
continue
}
//TODO Check hook state
name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running prestart hook", "name", name, "start", start)
ar.logger.Trace("running pre-run hook", "name", name, "start", start)
}
if err := pre.Prerun(); err != nil {
if err := pre.Prerun(context.TODO()); err != nil {
return fmt.Errorf("hook %q failed: %v", name, err)
}
//TODO Persist hook state locally
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished pre-run hooks", "name", name, "end", end, "duration", end.Sub(start))
@@ -93,28 +102,53 @@ func (ar *allocRunner) postrun() error {
return nil
}
/*
What state is needed to transfer:
*/
/*
AR Hooks:
Alloc Dir Build:
Needs to know the folder to create

Alloc Migrate
Needs access to RPC

Alloc Health Watcher:
Requires: Access to consul to watch health, access to every task event, task status change
*/
// destroy is used to run the runner's destroy hooks. All hooks are run and
// errors are returned as a multierror.
func (ar *allocRunner) destroy() error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running destroy hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished destroy hooks", "end", end, "duration", end.Sub(start))
}()
}
var merr multierror.Error
for _, hook := range ar.runnerHooks {
h, ok := hook.(interfaces.RunnerDestroyHook)
if !ok {
continue
}
name := h.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running destroy hook", "name", name, "start", start)
}
if err := h.Destroy(); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("destroy hook %q failed: %v", name, err))
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished destroy hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return merr.ErrorOrNil()
}
// allocDirHook creates and destroys the root directory and shared directories
// for an allocation.
type allocDirHook struct {
runner *allocRunner
logger log.Logger
}
func newAllocDirHook(runner *allocRunner, logger log.Logger) *allocDirHook {
func newAllocDirHook(logger log.Logger, runner *allocRunner) *allocDirHook {
ad := &allocDirHook{
runner: runner,
}
@@ -126,7 +160,7 @@ func (h *allocDirHook) Name() string {
return "alloc_dir"
}
func (h *allocDirHook) Prerun() error {
func (h *allocDirHook) Prerun(context.Context) error {
return h.runner.allocDir.Build()
}
@@ -134,6 +168,52 @@ func (h *allocDirHook) Destroy() error {
return h.runner.allocDir.Destroy()
}
// diskMigrationHook migrates ephemeral disk volumes. Depends on alloc dir
// being built but must be run before anything else manipulates the alloc dir.
type diskMigrationHook struct {
allocDir *allocdir.AllocDir
allocWatcher allocwatcher.PrevAllocWatcher
logger log.Logger
}
func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher, allocDir *allocdir.AllocDir) *diskMigrationHook {
h := &diskMigrationHook{
allocDir: allocDir,
allocWatcher: allocWatcher,
}
h.logger = logger.Named(h.Name())
return h
}
func (h *diskMigrationHook) Name() string {
return "migrate_disk"
}
func (h *diskMigrationHook) Prerun(ctx context.Context) error {
// Wait for a previous alloc - if any - to terminate
if err := h.allocWatcher.Wait(ctx); err != nil {
return err
}
// Wait for data to be migrated from a previous alloc if applicable
if err := h.allocWatcher.Migrate(ctx, h.allocDir); err != nil {
if err == context.Canceled {
return err
}
// Soft-fail on migration errors
h.logger.Warn("error migrating data from previous alloc", "error", err)
// Recreate alloc dir to ensure a clean slate
h.allocDir.Destroy()
if err := h.allocDir.Build(); err != nil {
return fmt.Errorf("failed to clean task directories after failed migration: %v", err)
}
}
return nil
}
// TODO
type allocHealthWatcherHook struct {
runner *allocRunner
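
Not from the commit: a minimal sketch of what an additional runner hook could look like under the interfaces used above (Name, Prerun(context.Context) error, and Destroy() error). The auditHook type is hypothetical.

package sketch

import (
	"context"

	log "github.com/hashicorp/go-hclog"
)

// auditHook is a hypothetical hook that just logs lifecycle events.
type auditHook struct {
	logger log.Logger
}

func newAuditHook(logger log.Logger) *auditHook {
	h := &auditHook{}
	h.logger = logger.Named(h.Name())
	return h
}

func (h *auditHook) Name() string { return "audit" }

// Prerun runs before tasks start; respecting ctx keeps shutdown responsive.
func (h *auditHook) Prerun(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	h.logger.Trace("alloc starting")
	return nil
}

// Destroy performs best-effort cleanup when the alloc runner is destroyed.
func (h *auditHook) Destroy() error {
	h.logger.Trace("alloc destroyed")
	return nil
}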

View File

@@ -2,6 +2,7 @@ package allocrunnerv2
import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocwatcher"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/interfaces"
@@ -32,4 +33,8 @@ type Config struct {
// StateUpdater is used to emit updated task state
StateUpdater interfaces.AllocStateHandler
// PrevAllocWatcher handles waiting on previous allocations and
// migrating their ephemeral disk when necessary.
PrevAllocWatcher allocwatcher.PrevAllocWatcher
}

View File

@@ -1,6 +1,8 @@
package interfaces
import (
"context"
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
)
@@ -11,7 +13,7 @@ type RunnerHook interface {
type RunnerPrerunHook interface {
RunnerHook
Prerun() error
Prerun(context.Context) error
}
type RunnerPostrunHook interface {

View File

@@ -55,6 +55,8 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s os.Signal) error {
return handle.Signal(s)
}
// Kill a task. Blocks until task exits or context is canceled. State is set to
// dead.
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
// Grab the handle
handle := tr.getDriverHandle()

View File

@@ -333,11 +333,21 @@ func (tr *TaskRunner) handleUpdates() {
for {
select {
case <-tr.triggerUpdateCh:
// Update triggered; run hooks
tr.updateHooks()
case <-tr.waitCh:
return
}
if tr.Alloc().TerminalStatus() {
// Terminal update: kill TaskRunner and let Run execute postrun hooks
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
tr.logger.Warn("error stopping task", "error", err)
}
continue
}
// Non-terminal update; run hooks
tr.updateHooks()
}
}

View File

@@ -1,4 +1,4 @@
package allocrunner
package allocwatcher
import (
"archive/tar"
@@ -38,10 +38,18 @@ type terminated interface {
Terminated() bool
}
// prevAllocWatcher allows AllocRunners to wait for a previous allocation to
// AllocRunnerMeta provides metadata about an AllocRunner such as its alloc and
// alloc dir.
type AllocRunnerMeta interface {
GetAllocDir() *allocdir.AllocDir
Listener() *cstructs.AllocListener
Alloc() *structs.Allocation
}
// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to
// terminate and migrate its data whether or not the previous allocation is
// local or remote.
type prevAllocWatcher interface {
type PrevAllocWatcher interface {
// Wait for previous alloc to terminate
Wait(context.Context) error
@@ -55,10 +63,10 @@ type prevAllocWatcher interface {
IsMigrating() bool
}
// NewAllocWatcher creates a prevAllocWatcher appropriate for whether this
// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func NewAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher {
func NewAllocWatcher(alloc *structs.Allocation, prevAR AllocRunnerMeta, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) PrevAllocWatcher {
if alloc.PreviousAllocation == "" {
// No previous allocation, use noop transitioner
return NoopPrevAlloc{}
@@ -74,8 +82,7 @@ func NewAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer,
tasks: tg.Tasks,
sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky,
prevAllocDir: prevAR.GetAllocDir(),
prevListener: prevAR.GetListener(),
prevWaitCh: prevAR.WaitCh(),
prevListener: prevAR.Listener(),
prevStatus: prevAR.Alloc(),
logger: l,
}
@@ -118,10 +125,6 @@ type localPrevAlloc struct {
// terminated (and therefore won't send updates to the listener)
prevStatus terminated
// prevWaitCh is closed when the previous alloc is garbage collected
// which is a failsafe against blocking the new alloc forever
prevWaitCh <-chan struct{}
// waiting and migrating are true when alloc runner is waiting on the
// prevAllocWatcher. Writers must acquire the waitingLock and readers
// should use the helper methods IsWaiting and IsMigrating.
@@ -161,11 +164,6 @@ func (p *localPrevAlloc) Wait(ctx context.Context) error {
defer p.prevListener.Close()
if p.prevStatus.Terminated() {
// Fast path - previous alloc already terminated!
return nil
}
// Block until previous alloc exits
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID)
for {
@@ -174,8 +172,6 @@ func (p *localPrevAlloc) Wait(ctx context.Context) error {
if !ok || prevAlloc.Terminated() {
return nil
}
case <-p.prevWaitCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
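
Not part of the commit: a sketch of how a consumer drives a PrevAllocWatcher, mirroring the diskMigrationHook earlier in this diff. The waitThenMigrate helper is hypothetical, and Migrate's exact signature is assumed from the hook's call site.

package sketch

import (
	"context"

	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/allocwatcher"
)

// waitThenMigrate blocks until the previous alloc terminates, then
// migrates its ephemeral disk into dest if the job requested it.
func waitThenMigrate(ctx context.Context, w allocwatcher.PrevAllocWatcher, dest *allocdir.AllocDir) error {
	if err := w.Wait(ctx); err != nil {
		return err // only fails when ctx is canceled first
	}
	return w.Migrate(ctx, dest)
}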

View File

@@ -1,4 +1,4 @@
package allocrunner
package allocwatcher
import (
"archive/tar"
@@ -14,26 +14,118 @@ import (
"testing"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/testutil"
cstructs "github.com/hashicorp/nomad/client/structs"
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
// fakeAllocRunner implements AllocRunnerMeta
type fakeAllocRunner struct {
alloc *structs.Allocation
AllocDir *allocdir.AllocDir
Broadcaster *cstructs.AllocBroadcaster
}
// newFakeAllocRunner creates a fakeAllocRunner that implements
// AllocRunnerMeta. Callers must call AllocDir.Destroy() when finished.
func newFakeAllocRunner(t *testing.T, logger hclog.Logger) *fakeAllocRunner {
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true
alloc.Job.TaskGroups[0].EphemeralDisk.Migrate = true
path, err := ioutil.TempDir("", "nomad_test_watcher")
require.NoError(t, err)
return &fakeAllocRunner{
alloc: alloc,
AllocDir: allocdir.NewAllocDir(logger, path),
Broadcaster: cstructs.NewAllocBroadcaster(alloc),
}
}
func (f *fakeAllocRunner) GetAllocDir() *allocdir.AllocDir {
return f.AllocDir
}
func (f *fakeAllocRunner) Listener() *cstructs.AllocListener {
return f.Broadcaster.Listen()
}
func (f *fakeAllocRunner) Alloc() *structs.Allocation {
return f.alloc
}
// newConfig returns a new Config and cleanup func
func newConfig(t *testing.T) (Config, func()) {
logger := testlog.HCLogger(t)
prevAR := newFakeAllocRunner(t, logger)
alloc := mock.Alloc()
alloc.PreviousAllocation = prevAR.Alloc().ID
alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true
alloc.Job.TaskGroups[0].EphemeralDisk.Migrate = true
config := Config{
Alloc: alloc,
PreviousRunner: prevAR,
RPC: nil,
Config: nil,
MigrateToken: "fake_token",
Logger: logger,
}
cleanup := func() {
prevAR.AllocDir.Destroy()
}
return config, cleanup
}
// TestPrevAlloc_Noop asserts that when no previous allocation is set, a noop
// implementation that neither blocks nor migrates is returned.
func TestPrevAlloc_Noop(t *testing.T) {
conf, cleanup := newConfig(t)
defer cleanup()
conf.Alloc.PreviousAllocation = ""
watcher := NewAllocWatcher(conf)
require.NotNil(t, watcher)
_, ok := watcher.(NoopPrevAlloc)
require.True(t, ok, "expected watcher to be NoopPrevAlloc")
done := make(chan int, 2)
go func() {
watcher.Wait(context.Background())
done <- 1
watcher.Migrate(context.Background(), nil)
done <- 1
}()
require.False(t, watcher.IsWaiting())
require.False(t, watcher.IsMigrating())
<-done
<-done
}
// TestPrevAlloc_LocalPrevAlloc asserts that when a previous alloc runner is
// set a localPrevAlloc will block on it.
func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
_, prevAR := TestAllocRunner(t, false)
prevAR.alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "10s"
newAlloc := mock.Alloc()
newAlloc.PreviousAllocation = prevAR.Alloc().ID
newAlloc.Job.TaskGroups[0].EphemeralDisk.Sticky = false
task := newAlloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "500ms"
waiter := NewAllocWatcher(newAlloc, prevAR, nil, nil, testlog.Logger(t), "")
t.Parallel()
conf, cleanup := newConfig(t)
defer cleanup()
conf.Alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
conf.Alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "500ms"
waiter := NewAllocWatcher(conf)
// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
@@ -43,39 +135,55 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
waiter.Wait(ctx)
}()
select {
case <-ctx.Done():
t.Fatalf("Wait exited too early")
case <-time.After(33 * time.Millisecond):
// Good! It's blocking
}
// Start the previous alloc to cause it to update but not terminate
go prevAR.Run()
defer prevAR.Destroy()
select {
case <-ctx.Done():
t.Fatalf("Wait exited too early")
case <-time.After(33 * time.Millisecond):
// Good! It's still blocking
}
// Stop the previous alloc
prevAR.Destroy()
select {
case <-ctx.Done():
// Good! We unblocked when the previous alloc stopped
case <-time.After(time.Second):
t.Fatalf("Wait exited too early")
}
// Assert watcher is waiting
testutil.WaitForResult(func() (bool, error) {
return waiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
// Broadcast a non-terminal alloc update to assert only terminal
// updates break out of waiting.
update := conf.PreviousRunner.Alloc().Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
update.ModifyIndex++
update.AllocModifyIndex++
broadcaster := conf.PreviousRunner.(*fakeAllocRunner).Broadcaster
err := broadcaster.Send(update)
require.NoError(t, err)
// Assert watcher is still waiting because alloc isn't terminal
testutil.WaitForResult(func() (bool, error) {
return waiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
// Stop the previous alloc and assert watcher stops blocking
update = update.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
update.ClientStatus = structs.AllocClientStatusComplete
update.ModifyIndex++
update.AllocModifyIndex++
err = broadcaster.Send(update)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
if waiter.IsWaiting() {
return false, fmt.Errorf("did not expect watcher to be waiting")
}
return !waiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating")
}, func(err error) {
t.Fatalf("error: %v", err)
})
}
// TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir
// works.
func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) {
testutil.RequireRoot(t)
ctestutil.RequireRoot(t)
t.Parallel()
dir, err := ioutil.TempDir("", "")
if err != nil {
@@ -178,7 +286,7 @@ func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) {
defer os.RemoveAll(dir1)
rc := ioutil.NopCloser(buf)
prevAlloc := &remotePrevAlloc{logger: testlog.Logger(t)}
prevAlloc := &remotePrevAlloc{logger: testlog.HCLogger(t)}
if err := prevAlloc.streamAllocDir(context.Background(), rc, dir1); err != nil {
t.Fatalf("err: %v", err)
}
@@ -228,7 +336,7 @@ func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) {
// This test only unit tests streamAllocDir so we only need a partially
// complete remotePrevAlloc
prevAlloc := &remotePrevAlloc{
logger: testlog.Logger(t),
logger: testlog.HCLogger(t),
allocID: "123",
prevAllocID: "abc",
migrate: true,

View File

@@ -0,0 +1,4 @@
// Package allocwatcher allows blocking until another allocation - whether
// running locally or remotely - completes and migrates the allocation
// directory if necessary.
package allocwatcher

View File

@@ -30,6 +30,7 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocrunnerv2"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/state"
@@ -103,9 +104,11 @@ type ClientStatsReporter interface {
type AllocRunner interface {
StatsReporter() allocrunner.AllocStatsReporter
Destroy()
GetAllocDir() *allocdir.AllocDir
IsDestroyed() bool
IsWaiting() bool
IsMigrating() bool
Listener() *cstructs.AllocListener
WaitCh() <-chan struct{}
Update(*structs.Allocation)
Alloc() *structs.Allocation
@@ -1913,18 +1916,10 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
return err
}
//FIXME disabled previous alloc waiting/migrating
// get the previous alloc runner - if one exists - for the
// blocking/migrating watcher
/*
var prevAR *allocrunner.AllocRunner
if alloc.PreviousAllocation != "" {
prevAR = c.allocs[alloc.PreviousAllocation]
}
c.configLock.RLock()
prevAlloc := allocrunner.NewAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)
*/
// Since only the Client has access to other AllocRunners and the RPC
// client, create the previous allocation watcher here.
prevAlloc := c.allocs[alloc.PreviousAllocation]
prevAllocWatcher := allocwatcher.NewAllocWatcher(alloc, prevAlloc, c, c.configCopy, c.logger, migrateToken)
// Copy the config since the node can be swapped out as it is being updated.
// The long term fix is to pass in the config and node separately and then
@@ -1938,13 +1933,14 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
c.configLock.RLock()
arConf := &allocrunnerv2.Config{
Alloc: alloc,
Logger: logger,
ClientConfig: c.config,
StateDB: c.stateDB,
Consul: c.consulService,
Vault: c.vaultClient,
StateUpdater: c,
Alloc: alloc,
Logger: logger,
ClientConfig: c.config,
StateDB: c.stateDB,
Consul: c.consulService,
Vault: c.vaultClient,
StateUpdater: c,
PrevAllocWatcher: prevAllocWatcher,
}
c.configLock.RUnlock()

View File

@@ -12,5 +12,7 @@ type StateDB interface {
GetTaskRunnerState(allocID, taskName string) (*state.LocalState, *structs.TaskState, error)
PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error
PutTaskState(allocID, taskName string, state *structs.TaskState) error
DeleteTaskBucket(allocID, taskName string) error
DeleteAllocationBucket(allocID string) error
Close() error
}

View File

@@ -27,6 +27,14 @@ func (n noopDB) PutTaskState(allocID string, taskName string, state *structs.Tas
return nil
}
func (n noopDB) DeleteTaskBucket(allocID, taskName string) error {
return nil
}
func (n noopDB) DeleteAllocationBucket(allocID string) error {
return nil
}
func (n noopDB) Close() error {
return nil
}

View File

@@ -1,27 +1,44 @@
package structs
import (
"errors"
"sync"
"github.com/hashicorp/nomad/nomad/structs"
)
// AllocBroadcaster implements an allocation broadcast channel.
// The zero value is a usable unbuffered channel.
const (
// listenerCap is the capacity of the listener chans. Must be exactly 1
// to prevent Sends from blocking and to allow them to pop an old pending
// update from the chan before enqueueing the latest update.
listenerCap = 1
)
// ErrAllocBroadcasterClosed is returned by Send when the broadcaster has
// been closed.
var ErrAllocBroadcasterClosed = errors.New("alloc broadcaster closed")
// AllocBroadcaster implements an allocation broadcast channel where each
// listener receives allocation updates. Pending updates are dropped and
// replaced by newer allocation updates, so listeners may not receive every
// allocation update. However, this ensures Sends never block and listeners
// only receive the latest allocation update -- never a stale version.
type AllocBroadcaster struct {
m sync.Mutex
listeners map[int]chan<- *structs.Allocation // lazy init
alloc *structs.Allocation
listeners map[int]chan *structs.Allocation // lazy init
nextId int
capacity int
closed bool
}
// NewAllocBroadcaster returns a new AllocBroadcaster with the given capacity (0 means unbuffered).
func NewAllocBroadcaster(n int) *AllocBroadcaster {
return &AllocBroadcaster{capacity: n}
// NewAllocBroadcaster returns a new AllocBroadcaster with the given initial
// allocation.
func NewAllocBroadcaster(initial *structs.Allocation) *AllocBroadcaster {
return &AllocBroadcaster{
alloc: initial,
}
}
// AllocListener implements a listening endpoint for an allocation broadcast channel.
// AllocListener implements a listening endpoint for an allocation broadcast
// channel.
type AllocListener struct {
// Ch receives the broadcast messages.
Ch <-chan *structs.Allocation
@@ -29,27 +46,33 @@ type AllocListener struct {
id int
}
// Send broadcasts a message to the channel. Send returns whether the message
// was sent to all channels.
func (b *AllocBroadcaster) Send(v *structs.Allocation) bool {
// Send broadcasts an allocation update. Any pending updates are replaced with
// this version of the allocation to prevent blocking on slow receivers.
func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
b.m.Lock()
defer b.m.Unlock()
if b.closed {
return false
return ErrAllocBroadcasterClosed
}
sent := true
// Update alloc on broadcaster to send to newly created listeners
b.alloc = v
// Send alloc to already created listeners
for _, l := range b.listeners {
select {
case l <- v:
default:
sent = false
case <-l:
// Pop pending update and replace with new update
l <- v
}
}
return sent
return nil
}
// Close closes the channel, disabling the sending of further messages.
// Close closes the channel, disabling the sending of further allocation
// updates.
func (b *AllocBroadcaster) Close() {
b.m.Lock()
defer b.m.Unlock()
@@ -57,6 +80,7 @@ func (b *AllocBroadcaster) Close() {
return
}
b.alloc = nil
b.closed = true
for _, l := range b.listeners {
close(l)
@@ -68,16 +92,25 @@ func (b *AllocBroadcaster) Listen() *AllocListener {
b.m.Lock()
defer b.m.Unlock()
if b.listeners == nil {
b.listeners = make(map[int]chan<- *structs.Allocation)
b.listeners = make(map[int]chan *structs.Allocation)
}
for b.listeners[b.nextId] != nil {
b.nextId++
}
ch := make(chan *structs.Allocation, b.capacity)
ch := make(chan *structs.Allocation, listenerCap)
if b.closed {
// Broadcaster is already closed, close this listener
close(ch)
} else {
// Send the current allocation to the listener
ch <- b.alloc
}
b.listeners[b.nextId] = ch
return &AllocListener{ch, b, b.nextId}
}
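
Not in the diff: hypothetical usage of the reworked API. A listener created after construction immediately receives the most recent allocation, and a slow consumer only ever observes the newest update. The watchAlloc helper below is a sketch.

package sketch

import (
	"fmt"

	cstructs "github.com/hashicorp/nomad/client/structs"
)

// watchAlloc drains a listener until the broadcaster is closed.
func watchAlloc(b *cstructs.AllocBroadcaster) {
	l := b.Listen()
	defer l.Close()

	// The first receive is the alloc passed to NewAllocBroadcaster (or
	// the latest Send); intermediate versions may have been dropped.
	for alloc := range l.Ch {
		fmt.Println("alloc update:", alloc.ID, alloc.ClientStatus)
	}
}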