Files
nomad/client/allocwatcher/group_alloc_watcher.go
Danielle Tomlinson c6d1981955 allocwatcher: Add Group AllocWatcher
The Group Alloc watcher is an implementation of a PrevAllocWatcher that
can wait for multiple previous allocs before terminating.

This is to be used when running an allocation that is preempting upstream
allocations, and thus only supports being ran with a local alloc watcher.

It also currently requires all of its child watchers to correctly handle
context cancellation. Should this be a problem, it should be fairly easy
to implement a replacement using channels rather than a waitgroup.

It obeys the PrevAllocWatcher interface for convenience, but it may be
better to extract Migration capabilities into a seperate interface for
greater clarity.
2018-12-11 00:58:27 +01:00

93 lines
2.2 KiB
Go

package allocwatcher
import (
"context"
"errors"
"sync"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
)
type groupPrevAllocWatcher struct {
prevAllocs []PrevAllocWatcher
wg sync.WaitGroup
// waiting and migrating are true when alloc runner is waiting on the
// prevAllocWatcher. Writers must acquire the waitingLock and readers
// should use the helper methods IsWaiting and IsMigrating.
waiting bool
waitingLock sync.RWMutex
}
func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) (PrevAllocWatcher, error) {
for _, watcher := range watchers {
_, ok := watcher.(*localPrevAlloc)
if !ok {
return nil, errors.New("PrevAllocWatchers must all be local watchers")
}
}
return &groupPrevAllocWatcher{
prevAllocs: watchers,
}, nil
}
// Wait on the previous allocs to become terminal, exit, or, return due to
// context termination. Usage of the groupPrevAllocWatcher requires that all
// sub-watchers correctly handle context cancellation.
// We may need to adjust this to use channels rather than a wait group, if we
// wish to more strictly enforce timeouts.
func (g *groupPrevAllocWatcher) Wait(ctx context.Context) error {
g.waitingLock.Lock()
g.waiting = true
g.waitingLock.Unlock()
defer func() {
g.waitingLock.Lock()
g.waiting = false
g.waitingLock.Unlock()
}()
var merr multierror.Error
var errmu sync.Mutex
g.wg.Add(len(g.prevAllocs))
for _, alloc := range g.prevAllocs {
go func(ctx context.Context, alloc PrevAllocWatcher) {
defer g.wg.Done()
err := alloc.Wait(ctx)
if err != nil {
errmu.Lock()
merr.Errors = append(merr.Errors, err)
errmu.Unlock()
}
}(ctx, alloc)
}
g.wg.Wait()
// Check ctx.Err first, to avoid returning an mErr of ctx.Err from prevAlloc
// Wait routines.
if err := ctx.Err(); err != nil {
return err
}
return merr.ErrorOrNil()
}
func (g *groupPrevAllocWatcher) Migrate(ctx context.Context, dest *allocdir.AllocDir) error {
return errors.New("Migration unimplemented for a groupPrevAllocWatcher")
}
func (g *groupPrevAllocWatcher) IsWaiting() bool {
g.waitingLock.RLock()
defer g.waitingLock.RUnlock()
return g.waiting
}
func (g *groupPrevAllocWatcher) IsMigrating() bool {
return false
}