client: Unify handling of previous and preempted allocs

This commit is contained in:
Danielle Tomlinson
2018-12-06 12:15:59 +01:00
parent a4cf83d00c
commit d44d4b57de
10 changed files with 124 additions and 117 deletions

View File

@@ -98,12 +98,12 @@ type allocRunner struct {
// allocBroadcaster sends client allocation updates to all listeners
allocBroadcaster *cstructs.AllocBroadcaster
// prevAllocWatcher allows waiting for a previous allocation to exit
// and if necessary migrate its alloc dir.
// prevAllocWatcher allows waiting for any previous or preempted allocations
// to exit
prevAllocWatcher allocwatcher.PrevAllocWatcher
// preemptedAllocWatcher allows waiting for preempted allocations to exit
preemptedAllocWatcher allocwatcher.PrevAllocWatcher
// prevAllocMigrator allows the migration of a previous allocations alloc dir.
prevAllocMigrator allocwatcher.PrevAllocMigrator
// pluginSingletonLoader is a plugin loader that will returns singleton
// instances of the plugins.
@@ -137,7 +137,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
taskStateUpdateHandlerCh: make(chan struct{}),
deviceStatsReporter: config.DeviceStatsReporter,
prevAllocWatcher: config.PrevAllocWatcher,
preemptedAllocWatcher: config.PreemptedAllocWatcher,
prevAllocMigrator: config.PrevAllocMigrator,
pluginSingletonLoader: config.PluginSingletonLoader,
devicemanager: config.DeviceManager,
}
@@ -672,7 +672,7 @@ func (ar *allocRunner) IsDestroyed() bool {
//
// This method is safe for calling concurrently with Run().
func (ar *allocRunner) IsWaiting() bool {
return ar.preemptedAllocWatcher.IsWaiting() || ar.prevAllocWatcher.IsWaiting()
return ar.prevAllocWatcher.IsWaiting()
}
// Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners.
@@ -717,7 +717,7 @@ func (ar *allocRunner) Shutdown() {
//
// This method is safe for calling concurrently with Run().
func (ar *allocRunner) IsMigrating() bool {
return ar.prevAllocWatcher.IsMigrating()
return ar.prevAllocMigrator.IsMigrating()
}
func (ar *allocRunner) StatsReporter() interfaces.AllocStatsReporter {

View File

@@ -77,8 +77,8 @@ func (ar *allocRunner) initRunnerHooks() {
// directory path exists for other hooks.
ar.runnerHooks = []interfaces.RunnerHook{
newAllocDirHook(hookLogger, ar.allocDir),
newPreemptionHook(hookLogger, ar.preemptedAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient),
}
}

View File

@@ -36,15 +36,14 @@ type Config struct {
// StateUpdater is used to emit updated task state
StateUpdater interfaces.AllocStateHandler
// deviceStatsReporter is used to lookup resource usage for alloc devices
// DeviceStatsReporter is used to lookup resource usage for alloc devices
DeviceStatsReporter interfaces.DeviceStatsReporter
// PrevAllocWatcher handles waiting on previous allocations and
// migrating their ephemeral disk when necessary.
// PrevAllocWatcher handles waiting on previous or preempted allocations
PrevAllocWatcher allocwatcher.PrevAllocWatcher
// PreemptedAllocWatcher allows waiting for preempted allocations to exit
PreemptedAllocWatcher allocwatcher.PrevAllocWatcher
// PrevAllocMigrator allows the migration of a previous allocations alloc dir
PrevAllocMigrator allocwatcher.PrevAllocMigrator
// PluginLoader is used to load plugins.
PluginLoader loader.PluginCatalog

View File

@@ -13,11 +13,11 @@ import (
// being built but must be run before anything else manipulates the alloc dir.
type diskMigrationHook struct {
allocDir *allocdir.AllocDir
allocWatcher allocwatcher.PrevAllocWatcher
allocWatcher allocwatcher.PrevAllocMigrator
logger log.Logger
}
func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher, allocDir *allocdir.AllocDir) *diskMigrationHook {
func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocMigrator, allocDir *allocdir.AllocDir) *diskMigrationHook {
h := &diskMigrationHook{
allocDir: allocDir,
allocWatcher: allocWatcher,

View File

@@ -7,26 +7,26 @@ import (
"github.com/hashicorp/nomad/client/allocwatcher"
)
// preemptionWatchingHook waits for a PrevAllocWatcher to exit before allowing
// upstreamAllocsHook waits for a PrevAllocWatcher to exit before allowing
// an allocation to be executed
type preemptionWatchingHook struct {
type upstreamAllocsHook struct {
allocWatcher allocwatcher.PrevAllocWatcher
logger log.Logger
}
func newPreemptionHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *preemptionWatchingHook {
h := &preemptionWatchingHook{
func newUpstreamAllocsHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *upstreamAllocsHook {
h := &upstreamAllocsHook{
allocWatcher: allocWatcher,
}
h.logger = logger.Named(h.Name())
return h
}
func (h *preemptionWatchingHook) Name() string {
return "await_preemptions"
func (h *upstreamAllocsHook) Name() string {
return "await_previous_allocations"
}
func (h *preemptionWatchingHook) Prerun(ctx context.Context) error {
func (h *upstreamAllocsHook) Prerun(ctx context.Context) error {
// Wait for a previous alloc - if any - to terminate
return h.allocWatcher.Wait(ctx)
}

View File

@@ -47,20 +47,26 @@ type AllocRunnerMeta interface {
}
// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to
// terminate and migrate its data whether or not the previous allocation is
// local or remote.
// terminate whether or not the previous allocation is local or remote.
// See `PrevAllocMigrator` for migrating workloads.
type PrevAllocWatcher interface {
// Wait for previous alloc to terminate
Wait(context.Context) error
// Migrate data from previous alloc
Migrate(ctx context.Context, dest *allocdir.AllocDir) error
// IsWaiting returns true if a concurrent caller is blocked in Wait
IsWaiting() bool
}
// PrevAllocMigrator allows AllocRunners to migrate a previous allocation
// whether or not the previous allocation is local or remote.
type PrevAllocMigrator interface {
PrevAllocWatcher
// IsMigrating returns true if a concurrent caller is in Migrate
IsMigrating() bool
// Migrate data from previous alloc
Migrate(ctx context.Context, dest *allocdir.AllocDir) error
}
type Config struct {
@@ -68,10 +74,13 @@ type Config struct {
// previous allocation stopping.
Alloc *structs.Allocation
// PreviousRunner is non-nil iff All has a PreviousAllocation and it is
// PreviousRunner is non-nil if Alloc has a PreviousAllocation and it is
// running locally.
PreviousRunner AllocRunnerMeta
// PreemptedRunners is non-nil if Alloc has one or more PreemptedAllocations.
PreemptedRunners map[string]AllocRunnerMeta
// RPC allows the alloc watcher to monitor remote allocations.
RPC RPCer
@@ -85,31 +94,30 @@ type Config struct {
Logger hclog.Logger
}
// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func NewAllocWatcher(c Config) PrevAllocWatcher {
if c.Alloc.PreviousAllocation == "" {
// No previous allocation, use noop transitioner
return NoopPrevAlloc{}
}
func newWatcherForAlloc(c Config, tg *structs.TaskGroup, watchedAllocID string, m AllocRunnerMeta) PrevAllocMigrator {
logger := c.Logger.Named("alloc_watcher")
logger = logger.With("alloc_id", c.Alloc.ID)
logger = logger.With("previous_alloc", c.Alloc.PreviousAllocation)
logger = logger.With("previous_alloc", watchedAllocID)
tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup)
var tasks []*structs.Task
var sticky bool
var migrate bool
if tg != nil {
tasks = tg.Tasks
sticky = tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky
migrate = tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate
}
if c.PreviousRunner != nil {
// Previous allocation is local, use local transitioner
if m != nil {
// Local Allocation because there's no meta
return &localPrevAlloc{
allocID: c.Alloc.ID,
prevAllocID: c.Alloc.PreviousAllocation,
tasks: tg.Tasks,
sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky,
prevAllocDir: c.PreviousRunner.GetAllocDir(),
prevListener: c.PreviousRunner.Listener(),
prevStatus: c.PreviousRunner.Alloc(),
prevAllocID: watchedAllocID,
tasks: tasks,
sticky: sticky,
prevAllocDir: m.GetAllocDir(),
prevListener: m.Listener(),
prevStatus: m.Alloc(),
logger: logger,
}
}
@@ -117,15 +125,50 @@ func NewAllocWatcher(c Config) PrevAllocWatcher {
return &remotePrevAlloc{
allocID: c.Alloc.ID,
prevAllocID: c.Alloc.PreviousAllocation,
tasks: tg.Tasks,
tasks: tasks,
config: c.Config,
migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
migrate: migrate,
rpc: c.RPC,
migrateToken: c.MigrateToken,
logger: logger,
}
}
// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func NewAllocWatcher(c Config) (PrevAllocWatcher, PrevAllocMigrator) {
if c.Alloc.PreviousAllocation == "" && c.PreemptedRunners == nil {
return NoopPrevAlloc{}, NoopPrevAlloc{}
}
var prevAllocWatchers []PrevAllocWatcher
var prevAllocMigrator PrevAllocMigrator = NoopPrevAlloc{}
// We have a previous allocation, add its listener to the watchers, and
// use a migrator.
if c.Alloc.PreviousAllocation != "" {
tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup)
w := newWatcherForAlloc(c, tg, c.Alloc.PreviousAllocation, c.PreviousRunner)
prevAllocWatchers = append(prevAllocWatchers, w)
prevAllocMigrator = w
}
// We are preempting allocations, add their listeners to the watchers.
if c.PreemptedRunners != nil {
for aid, r := range c.PreemptedRunners {
w := newWatcherForAlloc(c, nil, aid, r)
prevAllocWatchers = append(prevAllocWatchers, w)
}
}
groupWatcher := &groupPrevAllocWatcher{
prevAllocs: prevAllocWatchers,
}
return groupWatcher, prevAllocMigrator
}
// localPrevAlloc is a prevAllocWatcher for previous allocations on the same
// node as an updated allocation.
type localPrevAlloc struct {

View File

@@ -97,20 +97,20 @@ func TestPrevAlloc_Noop(t *testing.T) {
conf.Alloc.PreviousAllocation = ""
watcher := NewAllocWatcher(conf)
watcher, migrator := NewAllocWatcher(conf)
require.NotNil(t, watcher)
_, ok := watcher.(NoopPrevAlloc)
require.True(t, ok, "expected watcher to be NoopPrevAlloc")
_, ok := migrator.(NoopPrevAlloc)
require.True(t, ok, "expected migrator to be NoopPrevAlloc")
done := make(chan int, 2)
go func() {
watcher.Wait(context.Background())
done <- 1
watcher.Migrate(context.Background(), nil)
migrator.Migrate(context.Background(), nil)
done <- 1
}()
require.False(t, watcher.IsWaiting())
require.False(t, watcher.IsMigrating())
require.False(t, migrator.IsMigrating())
<-done
<-done
}
@@ -127,7 +127,7 @@ func TestPrevAlloc_LocalPrevAlloc_Block(t *testing.T) {
"run_for": "500ms",
}
waiter := NewAllocWatcher(conf)
_, waiter := NewAllocWatcher(conf)
// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
@@ -191,7 +191,7 @@ func TestPrevAlloc_LocalPrevAlloc_Terminated(t *testing.T) {
conf.PreviousRunner.Alloc().ClientStatus = structs.AllocClientStatusComplete
waiter := NewAllocWatcher(conf)
waiter, _ := NewAllocWatcher(conf)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

View File

@@ -2,11 +2,9 @@ package allocwatcher
import (
"context"
"errors"
"sync"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
)
type groupPrevAllocWatcher struct {
@@ -20,17 +18,10 @@ type groupPrevAllocWatcher struct {
waitingLock sync.RWMutex
}
func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) (PrevAllocWatcher, error) {
for _, watcher := range watchers {
_, ok := watcher.(*localPrevAlloc)
if !ok {
return nil, errors.New("PrevAllocWatchers must all be local watchers")
}
}
func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) PrevAllocWatcher {
return &groupPrevAllocWatcher{
prevAllocs: watchers,
}, nil
}
}
// Wait on the previous allocs to become terminal, exit, or, return due to
@@ -76,17 +67,9 @@ func (g *groupPrevAllocWatcher) Wait(ctx context.Context) error {
return merr.ErrorOrNil()
}
func (g *groupPrevAllocWatcher) Migrate(ctx context.Context, dest *allocdir.AllocDir) error {
return errors.New("Migration unimplemented for a groupPrevAllocWatcher")
}
func (g *groupPrevAllocWatcher) IsWaiting() bool {
g.waitingLock.RLock()
defer g.waitingLock.RUnlock()
return g.waiting
}
func (g *groupPrevAllocWatcher) IsMigrating() bool {
return false
}

View File

@@ -22,7 +22,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_Block(t *testing.T) {
"run_for": "500ms",
}
waiter := NewAllocWatcher(conf)
waiter, _ := NewAllocWatcher(conf)
groupWaiter := &groupPrevAllocWatcher{prevAllocs: []PrevAllocWatcher{waiter}}
@@ -70,10 +70,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_Block(t *testing.T) {
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
if groupWaiter.IsWaiting() {
return false, fmt.Errorf("did not expect watcher to be waiting")
}
return !groupWaiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating")
return !groupWaiter.IsWaiting(), fmt.Errorf("did not expect watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
@@ -96,8 +93,8 @@ func TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti(t *testing.T) {
"run_for": "500ms",
}
waiter1 := NewAllocWatcher(conf1)
waiter2 := NewAllocWatcher(conf2)
waiter1, _ := NewAllocWatcher(conf1)
waiter2, _ := NewAllocWatcher(conf2)
groupWaiter := &groupPrevAllocWatcher{
prevAllocs: []PrevAllocWatcher{
@@ -147,10 +144,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti(t *testing.T) {
terminalBroadcastFn(conf2)
testutil.WaitForResult(func() (bool, error) {
if groupWaiter.IsWaiting() {
return false, fmt.Errorf("did not expect watcher to be waiting")
}
return !groupWaiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating")
return !groupWaiter.IsWaiting(), fmt.Errorf("did not expect watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})

View File

@@ -2028,39 +2028,27 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
return err
}
// Collect any preempted allocations to pass into the previous alloc watcher
var preemptedAllocs map[string]allocwatcher.AllocRunnerMeta
if len(alloc.PreemptedAllocations) > 0 {
preemptedAllocs = make(map[string]allocwatcher.AllocRunnerMeta)
for _, palloc := range alloc.PreemptedAllocations {
preemptedAllocs[palloc] = c.allocs[palloc]
}
}
// Since only the Client has access to other AllocRunners and the RPC
// client, create the previous allocation watcher here.
watcherConfig := allocwatcher.Config{
Alloc: alloc,
PreviousRunner: c.allocs[alloc.PreviousAllocation],
RPC: c,
Config: c.configCopy,
MigrateToken: migrateToken,
Logger: c.logger,
}
prevAllocWatcher := allocwatcher.NewAllocWatcher(watcherConfig)
var preemptedAllocWatchers []allocwatcher.PrevAllocWatcher
for _, palloc := range alloc.PreemptedAllocations {
cfg := allocwatcher.Config{
Alloc: alloc,
PreviousRunner: c.allocs[palloc],
RPC: c,
Config: c.configCopy,
Logger: c.logger,
}
w := allocwatcher.NewAllocWatcher(cfg)
preemptedAllocWatchers = append(preemptedAllocWatchers, w)
}
var preemptedAllocWatcher allocwatcher.PrevAllocWatcher = allocwatcher.NoopPrevAlloc{}
if len(preemptedAllocWatchers) > 0 {
var err error
preemptedAllocWatcher, err = allocwatcher.NewGroupAllocWatcher(preemptedAllocWatchers...)
if err != nil {
return err
}
Alloc: alloc,
PreviousRunner: c.allocs[alloc.PreviousAllocation],
PreemptedRunners: preemptedAllocs,
RPC: c,
Config: c.configCopy,
MigrateToken: migrateToken,
Logger: c.logger,
}
prevAllocWatcher, prevAllocMigrator := allocwatcher.NewAllocWatcher(watcherConfig)
// Copy the config since the node can be swapped out as it is being updated.
// The long term fix is to pass in the config and node separately and then
@@ -2076,7 +2064,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
StateUpdater: c,
DeviceStatsReporter: c,
PrevAllocWatcher: prevAllocWatcher,
PreemptedAllocWatcher: preemptedAllocWatcher,
PrevAllocMigrator: prevAllocMigrator,
PluginLoader: c.config.PluginLoader,
PluginSingletonLoader: c.config.PluginSingletonLoader,
DeviceManager: c.devicemanager,