From d44d4b57de9d76e8179f5adebd4bd06c9d17050b Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Thu, 6 Dec 2018 12:15:59 +0100 Subject: [PATCH] client: Unify handling of previous and preempted allocs --- client/allocrunner/alloc_runner.go | 14 +-- client/allocrunner/alloc_runner_hooks.go | 4 +- client/allocrunner/config.go | 9 +- client/allocrunner/migrate_hook.go | 4 +- ...mption_hook.go => upstream_allocs_hook.go} | 14 +-- client/allocwatcher/alloc_watcher.go | 97 +++++++++++++------ client/allocwatcher/alloc_watcher_test.go | 14 +-- client/allocwatcher/group_alloc_watcher.go | 21 +--- .../allocwatcher/group_alloc_watcher_test.go | 16 +-- client/client.go | 48 ++++----- 10 files changed, 124 insertions(+), 117 deletions(-) rename client/allocrunner/{preemption_hook.go => upstream_allocs_hook.go} (50%) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 06eab7f0f..3a5ab0f6d 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -98,12 +98,12 @@ type allocRunner struct { // allocBroadcaster sends client allocation updates to all listeners allocBroadcaster *cstructs.AllocBroadcaster - // prevAllocWatcher allows waiting for a previous allocation to exit - // and if necessary migrate its alloc dir. + // prevAllocWatcher allows waiting for any previous or preempted allocations + // to exit prevAllocWatcher allocwatcher.PrevAllocWatcher - // preemptedAllocWatcher allows waiting for preempted allocations to exit - preemptedAllocWatcher allocwatcher.PrevAllocWatcher + // prevAllocMigrator allows the migration of a previous allocations alloc dir. + prevAllocMigrator allocwatcher.PrevAllocMigrator // pluginSingletonLoader is a plugin loader that will returns singleton // instances of the plugins. @@ -137,7 +137,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { taskStateUpdateHandlerCh: make(chan struct{}), deviceStatsReporter: config.DeviceStatsReporter, prevAllocWatcher: config.PrevAllocWatcher, - preemptedAllocWatcher: config.PreemptedAllocWatcher, + prevAllocMigrator: config.PrevAllocMigrator, pluginSingletonLoader: config.PluginSingletonLoader, devicemanager: config.DeviceManager, } @@ -672,7 +672,7 @@ func (ar *allocRunner) IsDestroyed() bool { // // This method is safe for calling concurrently with Run(). func (ar *allocRunner) IsWaiting() bool { - return ar.preemptedAllocWatcher.IsWaiting() || ar.prevAllocWatcher.IsWaiting() + return ar.prevAllocWatcher.IsWaiting() } // Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners. @@ -717,7 +717,7 @@ func (ar *allocRunner) Shutdown() { // // This method is safe for calling concurrently with Run(). func (ar *allocRunner) IsMigrating() bool { - return ar.prevAllocWatcher.IsMigrating() + return ar.prevAllocMigrator.IsMigrating() } func (ar *allocRunner) StatsReporter() interfaces.AllocStatsReporter { diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index c147d2590..9119663e3 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -77,8 +77,8 @@ func (ar *allocRunner) initRunnerHooks() { // directory path exists for other hooks. ar.runnerHooks = []interfaces.RunnerHook{ newAllocDirHook(hookLogger, ar.allocDir), - newPreemptionHook(hookLogger, ar.preemptedAllocWatcher), - newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir), + newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher), + newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir), newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient), } } diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go index 476644d7a..6406d2a5a 100644 --- a/client/allocrunner/config.go +++ b/client/allocrunner/config.go @@ -36,15 +36,14 @@ type Config struct { // StateUpdater is used to emit updated task state StateUpdater interfaces.AllocStateHandler - // deviceStatsReporter is used to lookup resource usage for alloc devices + // DeviceStatsReporter is used to lookup resource usage for alloc devices DeviceStatsReporter interfaces.DeviceStatsReporter - // PrevAllocWatcher handles waiting on previous allocations and - // migrating their ephemeral disk when necessary. + // PrevAllocWatcher handles waiting on previous or preempted allocations PrevAllocWatcher allocwatcher.PrevAllocWatcher - // PreemptedAllocWatcher allows waiting for preempted allocations to exit - PreemptedAllocWatcher allocwatcher.PrevAllocWatcher + // PrevAllocMigrator allows the migration of a previous allocations alloc dir + PrevAllocMigrator allocwatcher.PrevAllocMigrator // PluginLoader is used to load plugins. PluginLoader loader.PluginCatalog diff --git a/client/allocrunner/migrate_hook.go b/client/allocrunner/migrate_hook.go index ca9efcc25..3a6f5e3ee 100644 --- a/client/allocrunner/migrate_hook.go +++ b/client/allocrunner/migrate_hook.go @@ -13,11 +13,11 @@ import ( // being built but must be run before anything else manipulates the alloc dir. type diskMigrationHook struct { allocDir *allocdir.AllocDir - allocWatcher allocwatcher.PrevAllocWatcher + allocWatcher allocwatcher.PrevAllocMigrator logger log.Logger } -func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher, allocDir *allocdir.AllocDir) *diskMigrationHook { +func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocMigrator, allocDir *allocdir.AllocDir) *diskMigrationHook { h := &diskMigrationHook{ allocDir: allocDir, allocWatcher: allocWatcher, diff --git a/client/allocrunner/preemption_hook.go b/client/allocrunner/upstream_allocs_hook.go similarity index 50% rename from client/allocrunner/preemption_hook.go rename to client/allocrunner/upstream_allocs_hook.go index 112c5d8e5..eb53c4436 100644 --- a/client/allocrunner/preemption_hook.go +++ b/client/allocrunner/upstream_allocs_hook.go @@ -7,26 +7,26 @@ import ( "github.com/hashicorp/nomad/client/allocwatcher" ) -// preemptionWatchingHook waits for a PrevAllocWatcher to exit before allowing +// upstreamAllocsHook waits for a PrevAllocWatcher to exit before allowing // an allocation to be executed -type preemptionWatchingHook struct { +type upstreamAllocsHook struct { allocWatcher allocwatcher.PrevAllocWatcher logger log.Logger } -func newPreemptionHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *preemptionWatchingHook { - h := &preemptionWatchingHook{ +func newUpstreamAllocsHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *upstreamAllocsHook { + h := &upstreamAllocsHook{ allocWatcher: allocWatcher, } h.logger = logger.Named(h.Name()) return h } -func (h *preemptionWatchingHook) Name() string { - return "await_preemptions" +func (h *upstreamAllocsHook) Name() string { + return "await_previous_allocations" } -func (h *preemptionWatchingHook) Prerun(ctx context.Context) error { +func (h *upstreamAllocsHook) Prerun(ctx context.Context) error { // Wait for a previous alloc - if any - to terminate return h.allocWatcher.Wait(ctx) } diff --git a/client/allocwatcher/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go index 1ca326bca..dc24e16c6 100644 --- a/client/allocwatcher/alloc_watcher.go +++ b/client/allocwatcher/alloc_watcher.go @@ -47,20 +47,26 @@ type AllocRunnerMeta interface { } // PrevAllocWatcher allows AllocRunners to wait for a previous allocation to -// terminate and migrate its data whether or not the previous allocation is -// local or remote. +// terminate whether or not the previous allocation is local or remote. +// See `PrevAllocMigrator` for migrating workloads. type PrevAllocWatcher interface { // Wait for previous alloc to terminate Wait(context.Context) error - // Migrate data from previous alloc - Migrate(ctx context.Context, dest *allocdir.AllocDir) error - // IsWaiting returns true if a concurrent caller is blocked in Wait IsWaiting() bool +} + +// PrevAllocMigrator allows AllocRunners to migrate a previous allocation +// whether or not the previous allocation is local or remote. +type PrevAllocMigrator interface { + PrevAllocWatcher // IsMigrating returns true if a concurrent caller is in Migrate IsMigrating() bool + + // Migrate data from previous alloc + Migrate(ctx context.Context, dest *allocdir.AllocDir) error } type Config struct { @@ -68,10 +74,13 @@ type Config struct { // previous allocation stopping. Alloc *structs.Allocation - // PreviousRunner is non-nil iff All has a PreviousAllocation and it is + // PreviousRunner is non-nil if Alloc has a PreviousAllocation and it is // running locally. PreviousRunner AllocRunnerMeta + // PreemptedRunners is non-nil if Alloc has one or more PreemptedAllocations. + PreemptedRunners map[string]AllocRunnerMeta + // RPC allows the alloc watcher to monitor remote allocations. RPC RPCer @@ -85,31 +94,30 @@ type Config struct { Logger hclog.Logger } -// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this -// alloc's previous allocation was local or remote. If this alloc has no -// previous alloc then a noop implementation is returned. -func NewAllocWatcher(c Config) PrevAllocWatcher { - if c.Alloc.PreviousAllocation == "" { - // No previous allocation, use noop transitioner - return NoopPrevAlloc{} - } - +func newWatcherForAlloc(c Config, tg *structs.TaskGroup, watchedAllocID string, m AllocRunnerMeta) PrevAllocMigrator { logger := c.Logger.Named("alloc_watcher") logger = logger.With("alloc_id", c.Alloc.ID) - logger = logger.With("previous_alloc", c.Alloc.PreviousAllocation) + logger = logger.With("previous_alloc", watchedAllocID) - tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup) + var tasks []*structs.Task + var sticky bool + var migrate bool + if tg != nil { + tasks = tg.Tasks + sticky = tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky + migrate = tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate + } - if c.PreviousRunner != nil { - // Previous allocation is local, use local transitioner + if m != nil { + // Local Allocation because there's no meta return &localPrevAlloc{ allocID: c.Alloc.ID, - prevAllocID: c.Alloc.PreviousAllocation, - tasks: tg.Tasks, - sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky, - prevAllocDir: c.PreviousRunner.GetAllocDir(), - prevListener: c.PreviousRunner.Listener(), - prevStatus: c.PreviousRunner.Alloc(), + prevAllocID: watchedAllocID, + tasks: tasks, + sticky: sticky, + prevAllocDir: m.GetAllocDir(), + prevListener: m.Listener(), + prevStatus: m.Alloc(), logger: logger, } } @@ -117,15 +125,50 @@ func NewAllocWatcher(c Config) PrevAllocWatcher { return &remotePrevAlloc{ allocID: c.Alloc.ID, prevAllocID: c.Alloc.PreviousAllocation, - tasks: tg.Tasks, + tasks: tasks, config: c.Config, - migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, + migrate: migrate, rpc: c.RPC, migrateToken: c.MigrateToken, logger: logger, } } +// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this +// alloc's previous allocation was local or remote. If this alloc has no +// previous alloc then a noop implementation is returned. +func NewAllocWatcher(c Config) (PrevAllocWatcher, PrevAllocMigrator) { + if c.Alloc.PreviousAllocation == "" && c.PreemptedRunners == nil { + return NoopPrevAlloc{}, NoopPrevAlloc{} + } + + var prevAllocWatchers []PrevAllocWatcher + var prevAllocMigrator PrevAllocMigrator = NoopPrevAlloc{} + + // We have a previous allocation, add its listener to the watchers, and + // use a migrator. + if c.Alloc.PreviousAllocation != "" { + tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup) + w := newWatcherForAlloc(c, tg, c.Alloc.PreviousAllocation, c.PreviousRunner) + prevAllocWatchers = append(prevAllocWatchers, w) + prevAllocMigrator = w + } + + // We are preempting allocations, add their listeners to the watchers. + if c.PreemptedRunners != nil { + for aid, r := range c.PreemptedRunners { + w := newWatcherForAlloc(c, nil, aid, r) + prevAllocWatchers = append(prevAllocWatchers, w) + } + } + + groupWatcher := &groupPrevAllocWatcher{ + prevAllocs: prevAllocWatchers, + } + + return groupWatcher, prevAllocMigrator +} + // localPrevAlloc is a prevAllocWatcher for previous allocations on the same // node as an updated allocation. type localPrevAlloc struct { diff --git a/client/allocwatcher/alloc_watcher_test.go b/client/allocwatcher/alloc_watcher_test.go index 1eba8cd57..2f34894a2 100644 --- a/client/allocwatcher/alloc_watcher_test.go +++ b/client/allocwatcher/alloc_watcher_test.go @@ -97,20 +97,20 @@ func TestPrevAlloc_Noop(t *testing.T) { conf.Alloc.PreviousAllocation = "" - watcher := NewAllocWatcher(conf) + watcher, migrator := NewAllocWatcher(conf) require.NotNil(t, watcher) - _, ok := watcher.(NoopPrevAlloc) - require.True(t, ok, "expected watcher to be NoopPrevAlloc") + _, ok := migrator.(NoopPrevAlloc) + require.True(t, ok, "expected migrator to be NoopPrevAlloc") done := make(chan int, 2) go func() { watcher.Wait(context.Background()) done <- 1 - watcher.Migrate(context.Background(), nil) + migrator.Migrate(context.Background(), nil) done <- 1 }() require.False(t, watcher.IsWaiting()) - require.False(t, watcher.IsMigrating()) + require.False(t, migrator.IsMigrating()) <-done <-done } @@ -127,7 +127,7 @@ func TestPrevAlloc_LocalPrevAlloc_Block(t *testing.T) { "run_for": "500ms", } - waiter := NewAllocWatcher(conf) + _, waiter := NewAllocWatcher(conf) // Wait in a goroutine with a context to make sure it exits at the right time ctx, cancel := context.WithCancel(context.Background()) @@ -191,7 +191,7 @@ func TestPrevAlloc_LocalPrevAlloc_Terminated(t *testing.T) { conf.PreviousRunner.Alloc().ClientStatus = structs.AllocClientStatusComplete - waiter := NewAllocWatcher(conf) + waiter, _ := NewAllocWatcher(conf) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/client/allocwatcher/group_alloc_watcher.go b/client/allocwatcher/group_alloc_watcher.go index 22201cc55..087e6256f 100644 --- a/client/allocwatcher/group_alloc_watcher.go +++ b/client/allocwatcher/group_alloc_watcher.go @@ -2,11 +2,9 @@ package allocwatcher import ( "context" - "errors" "sync" multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/nomad/client/allocdir" ) type groupPrevAllocWatcher struct { @@ -20,17 +18,10 @@ type groupPrevAllocWatcher struct { waitingLock sync.RWMutex } -func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) (PrevAllocWatcher, error) { - for _, watcher := range watchers { - _, ok := watcher.(*localPrevAlloc) - if !ok { - return nil, errors.New("PrevAllocWatchers must all be local watchers") - } - } - +func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) PrevAllocWatcher { return &groupPrevAllocWatcher{ prevAllocs: watchers, - }, nil + } } // Wait on the previous allocs to become terminal, exit, or, return due to @@ -76,17 +67,9 @@ func (g *groupPrevAllocWatcher) Wait(ctx context.Context) error { return merr.ErrorOrNil() } -func (g *groupPrevAllocWatcher) Migrate(ctx context.Context, dest *allocdir.AllocDir) error { - return errors.New("Migration unimplemented for a groupPrevAllocWatcher") -} - func (g *groupPrevAllocWatcher) IsWaiting() bool { g.waitingLock.RLock() defer g.waitingLock.RUnlock() return g.waiting } - -func (g *groupPrevAllocWatcher) IsMigrating() bool { - return false -} diff --git a/client/allocwatcher/group_alloc_watcher_test.go b/client/allocwatcher/group_alloc_watcher_test.go index 0740fb3f1..f992f3410 100644 --- a/client/allocwatcher/group_alloc_watcher_test.go +++ b/client/allocwatcher/group_alloc_watcher_test.go @@ -22,7 +22,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_Block(t *testing.T) { "run_for": "500ms", } - waiter := NewAllocWatcher(conf) + waiter, _ := NewAllocWatcher(conf) groupWaiter := &groupPrevAllocWatcher{prevAllocs: []PrevAllocWatcher{waiter}} @@ -70,10 +70,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_Block(t *testing.T) { require.NoError(t, err) testutil.WaitForResult(func() (bool, error) { - if groupWaiter.IsWaiting() { - return false, fmt.Errorf("did not expect watcher to be waiting") - } - return !groupWaiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating") + return !groupWaiter.IsWaiting(), fmt.Errorf("did not expect watcher to be waiting") }, func(err error) { t.Fatalf("error: %v", err) }) @@ -96,8 +93,8 @@ func TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti(t *testing.T) { "run_for": "500ms", } - waiter1 := NewAllocWatcher(conf1) - waiter2 := NewAllocWatcher(conf2) + waiter1, _ := NewAllocWatcher(conf1) + waiter2, _ := NewAllocWatcher(conf2) groupWaiter := &groupPrevAllocWatcher{ prevAllocs: []PrevAllocWatcher{ @@ -147,10 +144,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti(t *testing.T) { terminalBroadcastFn(conf2) testutil.WaitForResult(func() (bool, error) { - if groupWaiter.IsWaiting() { - return false, fmt.Errorf("did not expect watcher to be waiting") - } - return !groupWaiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating") + return !groupWaiter.IsWaiting(), fmt.Errorf("did not expect watcher to be waiting") }, func(err error) { t.Fatalf("error: %v", err) }) diff --git a/client/client.go b/client/client.go index 27a91234f..eed0a8373 100644 --- a/client/client.go +++ b/client/client.go @@ -2028,39 +2028,27 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error return err } + // Collect any preempted allocations to pass into the previous alloc watcher + var preemptedAllocs map[string]allocwatcher.AllocRunnerMeta + if len(alloc.PreemptedAllocations) > 0 { + preemptedAllocs = make(map[string]allocwatcher.AllocRunnerMeta) + for _, palloc := range alloc.PreemptedAllocations { + preemptedAllocs[palloc] = c.allocs[palloc] + } + } + // Since only the Client has access to other AllocRunners and the RPC // client, create the previous allocation watcher here. watcherConfig := allocwatcher.Config{ - Alloc: alloc, - PreviousRunner: c.allocs[alloc.PreviousAllocation], - RPC: c, - Config: c.configCopy, - MigrateToken: migrateToken, - Logger: c.logger, - } - prevAllocWatcher := allocwatcher.NewAllocWatcher(watcherConfig) - - var preemptedAllocWatchers []allocwatcher.PrevAllocWatcher - for _, palloc := range alloc.PreemptedAllocations { - cfg := allocwatcher.Config{ - Alloc: alloc, - PreviousRunner: c.allocs[palloc], - RPC: c, - Config: c.configCopy, - Logger: c.logger, - } - w := allocwatcher.NewAllocWatcher(cfg) - preemptedAllocWatchers = append(preemptedAllocWatchers, w) - } - - var preemptedAllocWatcher allocwatcher.PrevAllocWatcher = allocwatcher.NoopPrevAlloc{} - if len(preemptedAllocWatchers) > 0 { - var err error - preemptedAllocWatcher, err = allocwatcher.NewGroupAllocWatcher(preemptedAllocWatchers...) - if err != nil { - return err - } + Alloc: alloc, + PreviousRunner: c.allocs[alloc.PreviousAllocation], + PreemptedRunners: preemptedAllocs, + RPC: c, + Config: c.configCopy, + MigrateToken: migrateToken, + Logger: c.logger, } + prevAllocWatcher, prevAllocMigrator := allocwatcher.NewAllocWatcher(watcherConfig) // Copy the config since the node can be swapped out as it is being updated. // The long term fix is to pass in the config and node separately and then @@ -2076,7 +2064,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error StateUpdater: c, DeviceStatsReporter: c, PrevAllocWatcher: prevAllocWatcher, - PreemptedAllocWatcher: preemptedAllocWatcher, + PrevAllocMigrator: prevAllocMigrator, PluginLoader: c.config.PluginLoader, PluginSingletonLoader: c.config.PluginSingletonLoader, DeviceManager: c.devicemanager,