Fix server shutdown not waiting for worker run completion (#19560)

* Move group into a separate helper module for reuse

* Add shutdownCh to worker

The shutdown channel is used to signal that worker has stopped.

* Make server shutdown block on workers' shutdownCh

* Fix waiting for eval broker state change blocking indefinitely

There was a race condition in the GenericNotifier between the
Run and WaitForChange functions, where WaitForChange blocks
trying to write to a full unsubscribeCh, but the Run function never
reads from the unsubscribeCh as it has already stopped.

This commit fixes it by unblocking if the notifier has been stopped.

* Bound the amount of time server shutdown waits on worker completion

* Fix lostcancel linter error

* Fix worker test using unexpected worker constructor

* Add changelog

---------

Co-authored-by: Marvin Chin <marvinchin@users.noreply.github.com>
This commit is contained in:
Marvin Chin
2024-01-05 22:45:07 +08:00
committed by GitHub
parent 5a00440b06
commit be8575a8a2
11 changed files with 129 additions and 72 deletions

3
.changelog/19560.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
server: Fix server not waiting for workers to submit nacks for dequeued evaluations before shutting down
```

View File

@@ -52,6 +52,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/hashicorp/nomad/helper/goruntime"
"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/tlsutil"
@@ -259,7 +260,7 @@ type Client struct {
// shutdownGroup are goroutines that exit when shutdownCh is closed.
// Shutdown() blocks on Wait() after closing shutdownCh.
shutdownGroup group
shutdownGroup group.Group
// tokensClient is Nomad Client's custom Consul client for requesting Consul
// Service Identity tokens through Nomad Server.
@@ -876,7 +877,7 @@ func (c *Client) Shutdown() error {
// Stop Garbage collector
c.garbageCollector.Stop()
arGroup := group{}
arGroup := group.Group{}
if c.GetConfig().DevMode {
// In DevMode destroy all the running allocations.
for _, ar := range c.getAllocRunners() {
@@ -3390,33 +3391,6 @@ func (c *Client) GetTaskEventHandler(allocID, taskName string) drivermanager.Eve
return nil
}
// group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type group struct {
wg sync.WaitGroup
}
// Go starts f in a goroutine and must be called before Wait.
func (g *group) Go(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
func (g *group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}
// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *group) Wait() {
g.wg.Wait()
}
// pendingClientUpdates are the set of allocation updates that the client is
// waiting to send
type pendingClientUpdates struct {

View File

@@ -4,6 +4,7 @@
package broker
import (
"context"
"time"
"github.com/hashicorp/nomad/helper"
@@ -21,15 +22,18 @@ type GenericNotifier struct {
// subscription membership mapping.
subscribeCh chan chan interface{}
unsubscribeCh chan chan interface{}
ctx context.Context
}
// NewGenericNotifier returns a generic notifier which can be used by a process
// to notify many subscribers when a specific update is triggered.
func NewGenericNotifier() *GenericNotifier {
func NewGenericNotifier(ctx context.Context) *GenericNotifier {
return &GenericNotifier{
publishCh: make(chan interface{}, 1),
subscribeCh: make(chan chan interface{}, 1),
unsubscribeCh: make(chan chan interface{}, 1),
ctx: ctx,
}
}
@@ -46,7 +50,7 @@ func (g *GenericNotifier) Notify(msg interface{}) {
// Run is a long-lived process which handles updating subscribers as well as
// ensuring any update is sent to them. The passed stopCh is used to coordinate
// shutdown.
func (g *GenericNotifier) Run(stopCh <-chan struct{}) {
func (g *GenericNotifier) Run() {
// Store our subscribers inline with a map. This map can only be accessed
// via a single channel update at a time, meaning we can manage without
@@ -55,7 +59,7 @@ func (g *GenericNotifier) Run(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
case <-g.ctx.Done():
return
case msgCh := <-g.subscribeCh:
subscribers[msgCh] = struct{}{}
@@ -83,7 +87,11 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// Create a channel and subscribe to any update. This channel is buffered
// to ensure we do not block the main broker process.
updateCh := make(chan interface{}, 1)
g.subscribeCh <- updateCh
select {
case <-g.ctx.Done():
return "shutting down"
case g.subscribeCh <- updateCh:
}
// Create a timeout timer and use the helper to ensure this routine doesn't
// panic and making the stop call clear.
@@ -93,7 +101,10 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// subscriber once it has been notified of a change, or reached its wait
// timeout.
defer func() {
g.unsubscribeCh <- updateCh
select {
case <-g.ctx.Done():
case g.unsubscribeCh <- updateCh:
}
close(updateCh)
timeoutStop()
}()
@@ -101,6 +112,8 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// Enter the main loop which listens for an update or timeout and returns
// this information to the subscriber.
select {
case <-g.ctx.Done():
return "shutting down"
case <-timeoutTimer.C:
return "wait timed out after " + timeout.String()
case update := <-updateCh:

View File

@@ -4,6 +4,7 @@
package broker
import (
"context"
"sync"
"testing"
"time"
@@ -16,11 +17,11 @@ func TestGenericNotifier(t *testing.T) {
ci.Parallel(t)
// Create the new notifier.
stopChan := make(chan struct{})
defer close(stopChan)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
notifier := NewGenericNotifier()
go notifier.Run(stopChan)
notifier := NewGenericNotifier(ctx)
go notifier.Run()
// Ensure we have buffered channels.
require.Equal(t, 1, cap(notifier.publishCh))

50
helper/group/group.go Normal file
View File

@@ -0,0 +1,50 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package group
import (
"context"
"sync"
)
// group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type Group struct {
wg sync.WaitGroup
}
// Go starts f in a goroutine and must be called before Wait.
func (g *Group) Go(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
func (g *Group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}
// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *Group) Wait() {
g.wg.Wait()
}
// Wait for all goroutines to exit, or for the context to finish.
// Must be called after all calls to Go complete.
func (g *Group) WaitWithContext(ctx context.Context) {
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
g.Wait()
}()
select {
case <-doneCh:
case <-ctx.Done():
}
}

View File

@@ -4,6 +4,7 @@
package raftutil
import (
"context"
"fmt"
"io"
"path/filepath"
@@ -79,7 +80,7 @@ func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
// use dummy non-enabled FSM dependencies
periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
blockedEvals := nomad.NewBlockedEvals(nil, logger)
evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
evalBroker, err := nomad.NewEvalBroker(context.Background(), 1, 1, 1, 1)
if err != nil {
return nil, err
}

View File

@@ -137,7 +137,7 @@ type PendingEvaluations []*structs.Evaluation
// initialNackDelay is the delay before making a Nacked evaluation available
// again for the first Nack and subsequentNackDelay is the compounding delay
// after the first Nack.
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
func NewEvalBroker(ctx context.Context, timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
if timeout < 0 {
return nil, fmt.Errorf("timeout cannot be negative")
}
@@ -145,7 +145,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
enabledNotifier: broker.NewGenericNotifier(),
enabledNotifier: broker.NewGenericNotifier(ctx),
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),

View File

@@ -5,6 +5,7 @@ package nomad
import (
"container/heap"
"context"
"encoding/json"
"errors"
"fmt"
@@ -54,7 +55,7 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
}
func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
b, err := NewEvalBroker(context.Background(), c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@@ -35,6 +35,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/goruntime"
"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/iterator"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/tlsutil"
@@ -86,6 +87,10 @@ const (
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
// workerShutdownGracePeriod is the maximum time we will wait for workers to stop
// gracefully when the server shuts down
workerShutdownGracePeriod = 5 * time.Second
// defaultConsulDiscoveryInterval is how often to poll Consul for new
// servers if there is no leader.
defaultConsulDiscoveryInterval time.Duration = 3 * time.Second
@@ -263,6 +268,10 @@ type Server struct {
workerConfigLock sync.RWMutex
workersEventCh chan interface{}
// workerShutdownGroup tracks the running worker goroutines so that Shutdown()
// can wait on their completion
workerShutdownGroup group.Group
// oidcProviderCache maintains a cache of OIDC providers. This is useful as
// the provider performs background HTTP requests. When the Nomad server is
// shutting down, the oidcProviderCache.Shutdown() function must be called.
@@ -315,17 +324,6 @@ type Server struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc consul.ConfigAPIFunc, consulACLs consul.ACLsAPI) (*Server, error) {
// Create an eval broker
evalBroker, err := NewEvalBroker(
config.EvalNackTimeout,
config.EvalNackInitialReenqueueDelay,
config.EvalNackSubsequentReenqueueDelay,
config.EvalDeliveryLimit)
if err != nil {
return nil, err
}
// Configure TLS
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true)
if err != nil {
@@ -361,9 +359,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
reconcileCh: make(chan serf.Member, 32),
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
reapCancelableEvalsCh: make(chan struct{}),
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
workersEventCh: make(chan interface{}, 1),
lockTTLTimer: lock.NewTTLTimer(),
@@ -373,6 +369,21 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()
// Create an eval broker
evalBroker, err := NewEvalBroker(
s.shutdownCtx,
config.EvalNackTimeout,
config.EvalNackInitialReenqueueDelay,
config.EvalNackSubsequentReenqueueDelay,
config.EvalDeliveryLimit)
if err != nil {
return nil, err
}
s.evalBroker = evalBroker
// Create the blocked evals
s.blockedEvals = NewBlockedEvals(s.evalBroker, s.logger)
// Create the RPC handler
s.rpcHandler = newRpcHandler(s)
@@ -494,7 +505,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
// Start the eval broker notification system so any subscribers can get
// updates when the processes SetEnabled is triggered.
go s.evalBroker.enabledNotifier.Run(s.shutdownCh)
go s.evalBroker.enabledNotifier.Run()
// Setup the node drainer.
s.setupNodeDrainer()
@@ -711,6 +722,13 @@ func (s *Server) Shutdown() error {
s.shutdown = true
s.shutdownCancel()
s.workerLock.Lock()
defer s.workerLock.Unlock()
s.stopOldWorkers(s.workers)
workerShutdownTimeoutCtx, cancelWorkerShutdownTimeoutCtx := context.WithTimeout(context.Background(), workerShutdownGracePeriod)
defer cancelWorkerShutdownTimeoutCtx()
s.workerShutdownGroup.WaitWithContext(workerShutdownTimeoutCtx)
if s.serf != nil {
s.serf.Shutdown()
}
@@ -1788,7 +1806,7 @@ func (s *Server) setupWorkersLocked(ctx context.Context, poolArgs SchedulerWorke
return err
} else {
s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers)
s.workerShutdownGroup.AddCh(w.ShutdownCh())
s.workers = append(s.workers, w)
}
}

View File

@@ -98,6 +98,9 @@ type Worker struct {
workloadStatus SchedulerWorkerStatus
statusLock sync.RWMutex
// shutdownCh is closed when the run function has exited
shutdownCh chan struct{}
pauseFlag bool
pauseLock sync.Mutex
pauseCond *sync.Cond
@@ -134,6 +137,7 @@ func newWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) *
srv: srv,
start: time.Now(),
status: WorkerStarting,
shutdownCh: make(chan struct{}),
enabledSchedulers: make([]string, len(args.EnabledSchedulers)),
failureBackoff: time.Duration(0),
}
@@ -393,6 +397,7 @@ func (w *Worker) workerShuttingDown() bool {
func (w *Worker) run(raftSyncLimit time.Duration) {
defer func() {
w.markStopped()
close(w.shutdownCh)
}()
w.setStatuses(WorkerStarted, WorkloadRunning)
w.logger.Debug("running")
@@ -894,3 +899,7 @@ func (w *Worker) backoffErr(base, limit time.Duration) bool {
func (w *Worker) backoffReset() {
w.failures = 0
}
func (w *Worker) ShutdownCh() <-chan struct{} {
return w.shutdownCh
}

View File

@@ -57,20 +57,6 @@ func init() {
}
}
// NewTestWorker returns the worker without calling it's run method.
func NewTestWorker(shutdownCtx context.Context, srv *Server) *Worker {
w := &Worker{
srv: srv,
start: time.Now(),
id: uuid.Generate(),
enabledSchedulers: srv.config.EnabledSchedulers,
}
w.logger = srv.logger.ResetNamed("worker").With("worker_id", w.id)
w.pauseCond = sync.NewCond(&w.pauseLock)
w.ctx, w.cancelFn = context.WithCancel(shutdownCtx)
return w
}
func TestWorker_dequeueEvaluation(t *testing.T) {
ci.Parallel(t)
@@ -364,7 +350,8 @@ func TestWorker_runBackoff(t *testing.T) {
workerCtx, workerCancel := context.WithCancel(srv.shutdownCtx)
defer workerCancel()
w := NewTestWorker(workerCtx, srv)
poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(srv.config).Copy()
w := newWorker(workerCtx, srv, poolArgs)
doneCh := make(chan struct{})
go func() {