mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Review comments
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -39,16 +40,20 @@ type LazyHandle struct {
|
||||
// h is the current handle and may be nil
|
||||
h *DriverHandle
|
||||
|
||||
// shutdownCtx is used to cancel retries if the agent is shutting down
|
||||
shutdownCtx context.Context
|
||||
|
||||
logger log.Logger
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// NewLazyHandle takes the agent shutdown context, the function to retrieve the latest handle, and a logger
|
||||
// and returns a LazyHandle
|
||||
func NewLazyHandle(fn retrieveHandleFn, logger log.Logger) *LazyHandle {
|
||||
func NewLazyHandle(shutdownCtx context.Context, fn retrieveHandleFn, logger log.Logger) *LazyHandle {
|
||||
return &LazyHandle{
|
||||
retrieveHandle: fn,
|
||||
h: fn(),
|
||||
shutdownCtx: shutdownCtx,
|
||||
logger: logger.Named("lazy_handle"),
|
||||
}
|
||||
}
|
||||
@@ -89,7 +94,12 @@ func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) {
|
||||
}
|
||||
|
||||
l.logger.Debug("failed to retrieve handle", "backoff", backoff)
|
||||
time.Sleep(backoff)
|
||||
|
||||
select {
|
||||
case <-l.shutdownCtx.Done():
|
||||
return nil, l.shutdownCtx.Err()
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no driver handle")
|
||||
|
||||
@@ -99,12 +99,9 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto
|
||||
return
|
||||
}
|
||||
|
||||
// We do not log when the plugin is shutdown as this is either:
|
||||
// - A race between the stopCollection channel being closed and
|
||||
// calling Stats on the handle.
|
||||
// - The driver plugin has unexpectedly exited
|
||||
//
|
||||
// In either case sleeping and trying again or returning based
|
||||
// We do not log when the plugin is shutdown since this is
|
||||
// likely because the driver plugin has unexpectedly exited,
|
||||
// in which case sleeping and trying again or returning based
|
||||
// on the stop channel is the correct behavior
|
||||
if err != bstructs.ErrPluginShutdown {
|
||||
h.logger.Debug("error fetching stats of task", "error", err)
|
||||
|
||||
@@ -483,11 +483,12 @@ func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWai
|
||||
}
|
||||
|
||||
if result.Err == bstructs.ErrPluginShutdown {
|
||||
tr.logger.Warn("driver plugin has shutdown; attempting to recover task")
|
||||
dn := tr.Task().Driver
|
||||
tr.logger.Debug("driver plugin has shutdown; attempting to recover task", "driver", dn)
|
||||
|
||||
// Initialize a new driver handle
|
||||
if err := tr.initDriver(); err != nil {
|
||||
tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err)
|
||||
tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err, "driver", dn)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -497,11 +498,11 @@ func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWai
|
||||
net := tr.localState.DriverNetwork
|
||||
tr.stateLock.RUnlock()
|
||||
if !tr.restoreHandle(h, net) {
|
||||
tr.logger.Error("failed to restore handle on driver after it exited unexpectedly")
|
||||
tr.logger.Error("failed to restore handle on driver after it exited unexpectedly", "driver", dn)
|
||||
return false
|
||||
}
|
||||
|
||||
tr.logger.Info("task successfully recovered on driver")
|
||||
tr.logger.Debug("task successfully recovered on driver", "driver", dn)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -622,14 +623,12 @@ func (tr *TaskRunner) runDriver() error {
|
||||
if err == bstructs.ErrPluginShutdown {
|
||||
tr.logger.Info("failed to start task because plugin shutdown unexpectedly; attempting to recover")
|
||||
if err := tr.initDriver(); err != nil {
|
||||
tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err)
|
||||
return fmt.Errorf("driver exited and couldn't be started again: %v", err)
|
||||
return fmt.Errorf("failed to initialize driver after it exited unexpectedly: %v", err)
|
||||
}
|
||||
|
||||
handle, net, err = tr.driver.StartTask(taskConfig)
|
||||
if err != nil {
|
||||
tr.logger.Error("failed to start task after driver exited unexpectedly", "error", err)
|
||||
return fmt.Errorf("driver start failed: %v", err)
|
||||
return fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("driver start failed: %v", err)
|
||||
|
||||
@@ -256,7 +256,7 @@ func (tr *TaskRunner) poststart() error {
|
||||
|
||||
// Pass the lazy handle to the hooks so even if the driver exits and we
|
||||
// launch a new one (external plugin), the handle will refresh.
|
||||
lazyHandle := NewLazyHandle(tr.getDriverHandle, tr.logger)
|
||||
lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger)
|
||||
|
||||
var merr multierror.Error
|
||||
for _, hook := range tr.runnerHooks {
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
// This package provides a mechanism to build the Docker driver plugin as an
|
||||
// external binary. The binary has two entry points; the docker driver and the
|
||||
// docker plugin's logging child binary. An example of using this is `go build
|
||||
// -o <nomad-plugin-dir>/docker`. When the Nomad agent is then launched, the
|
||||
// external docker plugin will be used.
|
||||
package main
|
||||
|
||||
import (
|
||||
@@ -33,7 +38,7 @@ func main() {
|
||||
plugins.Serve(factory)
|
||||
}
|
||||
|
||||
// factory returns a new instance of the Nvidia GPU plugin
|
||||
// factory returns a new instance of the docker driver plugin
|
||||
func factory(log log.Logger) interface{} {
|
||||
return docker.NewDockerDriver(log)
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@@ -196,6 +197,7 @@ type Driver struct {
|
||||
|
||||
// hasFingerprinted is used to store whether we have fingerprinted before
|
||||
hasFingerprinted bool
|
||||
fingerprintLock sync.Mutex
|
||||
}
|
||||
|
||||
func NewRktDriver(logger hclog.Logger) drivers.DriverPlugin {
|
||||
@@ -264,9 +266,23 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerp
|
||||
}
|
||||
}
|
||||
|
||||
// setFingerprinted marks the driver as having fingerprinted once before
|
||||
func (d *Driver) setFingerprinted() {
|
||||
d.fingerprintLock.Lock()
|
||||
d.hasFingerprinted = true
|
||||
d.fingerprintLock.Unlock()
|
||||
}
|
||||
|
||||
// fingerprinted returns whether the driver has fingerprinted before
|
||||
func (d *Driver) fingerprinted() bool {
|
||||
d.fingerprintLock.Lock()
|
||||
defer d.fingerprintLock.Unlock()
|
||||
return d.hasFingerprinted
|
||||
}
|
||||
|
||||
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
|
||||
defer func() {
|
||||
d.hasFingerprinted = true
|
||||
d.setFingerprinted()
|
||||
}()
|
||||
|
||||
fingerprint := &drivers.Fingerprint{
|
||||
@@ -277,7 +293,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
|
||||
|
||||
// Only enable if we are root
|
||||
if syscall.Geteuid() != 0 {
|
||||
if !d.hasFingerprinted {
|
||||
if !d.fingerprinted() {
|
||||
d.logger.Debug("must run as root user, disabling")
|
||||
}
|
||||
fingerprint.Health = drivers.HealthStateUndetected
|
||||
@@ -307,7 +323,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
|
||||
// Do not allow ancient rkt versions
|
||||
fingerprint.Health = drivers.HealthStateUndetected
|
||||
fingerprint.HealthDescription = fmt.Sprintf("Unsuported rkt version %s", currentVersion)
|
||||
if !d.hasFingerprinted {
|
||||
if !d.fingerprinted() {
|
||||
d.logger.Warn("unsupported rkt version please upgrade to >= "+minVersion.String(),
|
||||
"rkt_version", currentVersion)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user