cleanup driver eventor goroutines

This fixes few cases where driver eventor goroutines are leaked during
normal operations, but especially so in tests.

This change makes few modifications:

First, it switches drivers to use `Context`s to manage shutdown events.
Previously, it relied on callers invoking `.Shutdown()` function that is
specific to internal drivers only and require casting.  Using `Contexts`
provide a consistent idiomatic way to manage lifecycle for both internal
and external drivers.

Also, I discovered few places where we don't clean up a temporary driver
instance in the plugin catalog code, where we dispense a driver to
inspect and validate the schema config without properly cleaning it up.
This commit is contained in:
Mahmood Ali
2020-05-26 09:44:26 -04:00
parent 29eca948ad
commit d6c75e301e
24 changed files with 207 additions and 158 deletions

View File

@@ -248,10 +248,6 @@ func (i *instanceManager) cleanup() {
return
}
if internalPlugin, ok := i.plugin.Plugin().(drivers.InternalDriverPlugin); ok {
internalPlugin.Shutdown()
}
if !i.plugin.Exited() {
i.plugin.Kill()
if err := i.storeReattach(nil); err != nil {

View File

@@ -1,6 +1,8 @@
package main
import (
"context"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/devices/gpu/nvidia"
@@ -9,10 +11,10 @@ import (
func main() {
// Serve the plugin
plugins.Serve(factory)
plugins.ServeCtx(factory)
}
// factory returns a new instance of the Nvidia GPU plugin
func factory(log log.Logger) interface{} {
return nvidia.NewNvidiaDevice(log)
func factory(ctx context.Context, log log.Logger) interface{} {
return nvidia.NewNvidiaDevice(ctx, log)
}

View File

@@ -46,7 +46,7 @@ var (
// PluginConfig is the nvidia factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Factory: func(l log.Logger) interface{} { return NewNvidiaDevice(l) },
Factory: func(ctx context.Context, l log.Logger) interface{} { return NewNvidiaDevice(ctx, l) },
}
// pluginInfo describes the plugin
@@ -99,7 +99,7 @@ type NvidiaDevice struct {
}
// NewNvidiaDevice returns a new nvidia device plugin.
func NewNvidiaDevice(log log.Logger) *NvidiaDevice {
func NewNvidiaDevice(_ context.Context, log log.Logger) *NvidiaDevice {
nvmlClient, err := nvml.NewNvmlClient()
logger := log.Named(pluginName)
if err != nil && err.Error() != nvml.UnavailableLib.Error() {

View File

@@ -6,6 +6,7 @@
package main
import (
"context"
"os"
log "github.com/hashicorp/go-hclog"
@@ -42,10 +43,10 @@ func main() {
}
// Serve the plugin
plugins.Serve(factory)
plugins.ServeCtx(factory)
}
// factory returns a new instance of the docker driver plugin
func factory(log log.Logger) interface{} {
return docker.NewDockerDriver(log)
func factory(ctx context.Context, log log.Logger) interface{} {
return docker.NewDockerDriver(ctx, log)
}

View File

@@ -1,6 +1,7 @@
package docker
import (
"context"
"fmt"
"strconv"
"strings"
@@ -123,7 +124,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewDockerDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewDockerDriver(ctx, l) },
}
// pluginInfo is the response returned for the PluginInfo RPC

View File

@@ -90,10 +90,6 @@ type Driver struct {
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// tasks is the in memory datastore mapping taskIDs to taskHandles
tasks *taskStore
@@ -120,16 +116,14 @@ type Driver struct {
}
// NewDockerDriver returns a docker implementation of a driver plugin
func NewDockerDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewDockerDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}
@@ -1622,10 +1616,6 @@ func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) {
return ulimits, nil
}
func (d *Driver) Shutdown() {
d.signalShutdown()
}
func isDockerTransientError(err error) bool {
if err == nil {
return false

View File

@@ -175,7 +175,9 @@ func cleanSlate(client *docker.Client, imageID string) {
// A driver plugin interface and cleanup function is returned
func dockerDriverHarness(t *testing.T, cfg map[string]interface{}) *dtestutil.DriverHarness {
logger := testlog.HCLogger(t)
harness := dtestutil.NewDriverHarness(t, NewDockerDriver(logger))
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() { cancel() })
harness := dtestutil.NewDriverHarness(t, NewDockerDriver(ctx, logger))
if cfg == nil {
cfg = map[string]interface{}{
"gc": map[string]interface{}{
@@ -190,7 +192,7 @@ func dockerDriverHarness(t *testing.T, cfg map[string]interface{}) *dtestutil.Dr
InternalPlugins: map[loader.PluginID]*loader.InternalPluginConfig{
PluginID: {
Config: cfg,
Factory: func(hclog.Logger) interface{} {
Factory: func(context.Context, hclog.Logger) interface{} {
return harness
},
},

View File

@@ -1,6 +1,7 @@
package docker
import (
"context"
"testing"
"github.com/hashicorp/nomad/client/testutil"
@@ -20,7 +21,10 @@ func TestDockerDriver_FingerprintHealth(t *testing.T) {
}
testutil.DockerCompatible(t)
d := NewDockerDriver(testlog.HCLogger(t)).(*Driver)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewDockerDriver(ctx, testlog.HCLogger(t)).(*Driver)
fp := d.buildFingerprint()
require.Equal(t, drivers.HealthStateHealthy, fp.Health)

View File

@@ -47,7 +47,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewExecDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewExecDriver(ctx, l) },
}
// pluginInfo is the response returned for the PluginInfo RPC
@@ -107,10 +107,6 @@ type Driver struct {
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
@@ -144,15 +140,13 @@ type TaskState struct {
}
// NewExecDriver returns a new DrivePlugin implementation
func NewExecDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}
@@ -201,10 +195,6 @@ func (d *Driver) SetConfig(cfg *base.Config) error {
return nil
}
func (d *Driver) Shutdown() {
d.signalShutdown()
}
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}

View File

@@ -59,7 +59,10 @@ func TestExecDriver_Fingerprint_NonLinux(t *testing.T) {
t.Skip("Test only available not on Linux")
}
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
fingerCh, err := harness.Fingerprint(context.Background())
@@ -78,7 +81,10 @@ func TestExecDriver_Fingerprint(t *testing.T) {
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
fingerCh, err := harness.Fingerprint(context.Background())
@@ -97,7 +103,10 @@ func TestExecDriver_StartWait(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -129,7 +138,10 @@ func TestExecDriver_StartWaitStopKill(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -190,7 +202,10 @@ func TestExecDriver_StartWaitRecover(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
dctx, dcancel := context.WithCancel(context.Background())
defer dcancel()
d := NewExecDriver(dctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -262,7 +277,10 @@ func TestExecDriver_DestroyKillsAll(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()
@@ -360,7 +378,10 @@ func TestExecDriver_Stats(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
dctx, dcancel := context.WithCancel(context.Background())
defer dcancel()
d := NewExecDriver(dctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -403,7 +424,10 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -452,7 +476,10 @@ func TestExecDriver_User(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -486,7 +513,10 @@ func TestExecDriver_HandlerExec(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -574,7 +604,10 @@ func TestExecDriver_DevicesAndMounts(t *testing.T) {
err = ioutil.WriteFile(filepath.Join(tmpDir, "testfile"), []byte("from-host"), 600)
require.NoError(err)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -678,7 +711,10 @@ func TestExecDriver_NoPivotRoot(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
config := &Config{NoPivotRoot: true}

View File

@@ -23,7 +23,10 @@ func TestExecDriver_StartWaitStop(t *testing.T) {
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
@@ -82,7 +85,10 @@ func TestExec_ExecTaskStreaming(t *testing.T) {
t.Parallel()
require := require.New(t)
d := NewExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

View File

@@ -50,7 +50,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewDriver(ctx, l) },
}
// pluginInfo is the response returned for the PluginInfo RPC
@@ -135,23 +135,17 @@ type Driver struct {
// nomadConf is the client agent's configuration
nomadConfig *base.ClientDriverConfig
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
}
func NewDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}
@@ -589,7 +583,3 @@ func GetAbsolutePath(bin string) (string, error) {
return filepath.EvalSymlinks(lp)
}
func (d *Driver) Shutdown() {
d.signalShutdown()
}

View File

@@ -38,7 +38,10 @@ func TestJavaDriver_Fingerprint(t *testing.T) {
t.Parallel()
}
d := NewDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
fpCh, err := harness.Fingerprint(context.Background())
@@ -61,7 +64,10 @@ func TestJavaDriver_Jar_Start_Wait(t *testing.T) {
}
require := require.New(t)
d := NewDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
tc := &TaskConfig{
@@ -101,7 +107,10 @@ func TestJavaDriver_Jar_Stop_Wait(t *testing.T) {
}
require := require.New(t)
d := NewDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
tc := &TaskConfig{
@@ -162,7 +171,10 @@ func TestJavaDriver_Class_Start_Wait(t *testing.T) {
}
require := require.New(t)
d := NewDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
tc := &TaskConfig{
@@ -250,7 +262,10 @@ func TestJavaDriver_ExecTaskStreaming(t *testing.T) {
}
require := require.New(t)
d := NewDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

View File

@@ -45,7 +45,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewMockDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewMockDriver(ctx, l) },
}
// pluginInfo is the response returned for the PluginInfo RPC
@@ -129,10 +129,6 @@ type Driver struct {
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
shutdownFingerprintTime time.Time
// lastDriverTaskConfig is the last *drivers.TaskConfig passed to StartTask
@@ -149,8 +145,7 @@ type Driver struct {
}
// NewMockDriver returns a new DriverPlugin implementation
func NewMockDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewMockDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
capabilities := &drivers.Capabilities{
@@ -161,13 +156,12 @@ func NewMockDriver(logger hclog.Logger) drivers.DriverPlugin {
}
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
capabilities: capabilities,
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
capabilities: capabilities,
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}
@@ -676,10 +670,6 @@ func (d *Driver) GetHandle(taskID string) *taskHandle {
return h
}
func (d *Driver) Shutdown() {
d.signalShutdown()
}
func (d *Driver) CreateNetwork(allocID string) (*drivers.NetworkIsolationSpec, error) {
return nil, nil
}

View File

@@ -61,7 +61,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewQemuDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewQemuDriver(ctx, l) },
}
versionRegex = regexp.MustCompile(`version (\d[\.\d+]+)`)
@@ -142,23 +142,17 @@ type Driver struct {
// nomadConf is the client agent's configuration
nomadConfig *base.ClientDriverConfig
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
}
func NewQemuDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewQemuDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}
@@ -653,7 +647,3 @@ func sendQemuShutdown(logger hclog.Logger, monitorPath string, userPid int) erro
}
return err
}
func (d *Driver) Shutdown() {
d.signalShutdown()
}

View File

@@ -32,7 +32,10 @@ func TestQemuDriver_Start_Wait_Stop(t *testing.T) {
}
require := require.New(t)
d := NewQemuDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewQemuDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
@@ -94,7 +97,10 @@ func TestQemuDriver_GetMonitorPathOldQemu(t *testing.T) {
}
require := require.New(t)
d := NewQemuDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewQemuDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
@@ -149,7 +155,10 @@ func TestQemuDriver_GetMonitorPathNewQemu(t *testing.T) {
}
require := require.New(t)
d := NewQemuDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewQemuDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
@@ -229,7 +238,10 @@ func TestQemuDriver_User(t *testing.T) {
}
require := require.New(t)
d := NewQemuDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewQemuDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
@@ -286,7 +298,10 @@ func TestQemuDriver_Stats(t *testing.T) {
}
require := require.New(t)
d := NewQemuDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewQemuDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
@@ -363,7 +378,10 @@ func TestQemuDriver_Fingerprint(t *testing.T) {
t.Parallel()
}
d := NewQemuDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewQemuDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
fingerCh, err := harness.Fingerprint(context.Background())

View File

@@ -45,7 +45,7 @@ var (
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewRawExecDriver(l) },
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewRawExecDriver(ctx, l) },
}
errDisabledDriver = fmt.Errorf("raw_exec is disabled")
@@ -126,10 +126,6 @@ type Driver struct {
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
}
@@ -161,16 +157,14 @@ type TaskState struct {
}
// NewRawExecDriver returns a new DriverPlugin implementation
func NewRawExecDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
func NewRawExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}
@@ -197,10 +191,6 @@ func (d *Driver) SetConfig(cfg *base.Config) error {
return nil
}
func (d *Driver) Shutdown() {
d.signalShutdown()
}
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}

View File

@@ -33,7 +33,10 @@ func TestMain(m *testing.M) {
}
func newEnabledRawExecDriver(t *testing.T) *Driver {
d := NewRawExecDriver(testlog.HCLogger(t)).(*Driver)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() { cancel() })
d := NewRawExecDriver(ctx, testlog.HCLogger(t)).(*Driver)
d.config.Enabled = true
return d
}
@@ -42,7 +45,10 @@ func TestRawExecDriver_SetConfig(t *testing.T) {
t.Parallel()
require := require.New(t)
d := NewRawExecDriver(testlog.HCLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewRawExecDriver(ctx, testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

View File

@@ -1,6 +1,7 @@
package loader
import (
"context"
"fmt"
"os"
"os/exec"
@@ -85,11 +86,14 @@ func (l *PluginLoader) init(config *PluginLoaderConfig) error {
// initInternal initializes internal plugins.
func (l *PluginLoader) initInternal(plugins map[PluginID]*InternalPluginConfig, configs map[string]*config.PluginConfig) (map[PluginID]*pluginInfo, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var mErr multierror.Error
fingerprinted := make(map[PluginID]*pluginInfo, len(plugins))
for k, config := range plugins {
// Create an instance
raw := config.Factory(l.logger)
raw := config.Factory(ctx, l.logger)
base, ok := raw.(base.BasePlugin)
if !ok {
multierror.Append(&mErr, fmt.Errorf("internal plugin %s doesn't meet base plugin interface", k))

View File

@@ -31,10 +31,12 @@ type PluginInstance interface {
type internalPluginInstance struct {
instance interface{}
apiVersion string
killFn func()
}
func (p *internalPluginInstance) Internal() bool { return true }
func (p *internalPluginInstance) Kill() {}
func (p *internalPluginInstance) Internal() bool { return true }
func (p *internalPluginInstance) Kill() { p.killFn() }
func (p *internalPluginInstance) ReattachConfig() (*plugin.ReattachConfig, bool) { return nil, false }
func (p *internalPluginInstance) Plugin() interface{} { return p.instance }
func (p *internalPluginInstance) Exited() bool { return false }

View File

@@ -1,6 +1,7 @@
package loader
import (
"context"
"fmt"
"os/exec"
@@ -31,7 +32,7 @@ type PluginCatalog interface {
// InternalPluginConfig is used to configure launching an internal plugin.
type InternalPluginConfig struct {
Config map[string]interface{}
Factory plugins.PluginFactory
Factory plugins.PluginCtxFactory
}
// PluginID is a tuple identifying a plugin
@@ -92,7 +93,7 @@ type PluginLoader struct {
// pluginInfo captures the necessary information to launch and configure a
// plugin.
type pluginInfo struct {
factory plugins.PluginFactory
factory plugins.PluginCtxFactory
exePath string
args []string
@@ -153,9 +154,11 @@ func (l *PluginLoader) Dispense(name, pluginType string, config *base.AgentConfi
// If the plugin is internal, launch via the factory
var instance PluginInstance
if pinfo.factory != nil {
ctx, cancel := context.WithCancel(context.Background())
instance = &internalPluginInstance{
instance: pinfo.factory(logger),
instance: pinfo.factory(ctx, logger),
apiVersion: pinfo.apiVersion,
killFn: cancel,
}
} else {
var err error

View File

@@ -93,8 +93,8 @@ func pluginMain(name, pluginType, version string, apiVersions []string, config b
// mockFactory returns a PluginFactory method which creates the mock plugin with
// the passed parameters
func mockFactory(name, ptype, version string, apiVersions []string, configSchema bool) func(log log.Logger) interface{} {
return func(log log.Logger) interface{} {
func mockFactory(name, ptype, version string, apiVersions []string, configSchema bool) func(context.Context, log.Logger) interface{} {
return func(ctx context.Context, log log.Logger) interface{} {
return &mockPlugin{
name: name,
ptype: ptype,

View File

@@ -87,14 +87,6 @@ type DriverNetworkManager interface {
DestroyNetwork(allocID string, spec *NetworkIsolationSpec) error
}
// InternalDriverPlugin is an interface that exposes functions that are only
// implemented by internal driver plugins.
type InternalDriverPlugin interface {
// Shutdown allows the plugin to cleanup any running state to avoid leaking
// resources. It should not block.
Shutdown()
}
// DriverSignalTaskNotSupported can be embedded by drivers which don't support
// the SignalTask RPC. This satisfies the SignalTask func requirement for the
// DriverPlugin interface.

View File

@@ -1,6 +1,7 @@
package plugins
import (
"context"
"fmt"
log "github.com/hashicorp/go-hclog"
@@ -11,6 +12,9 @@ import (
// PluginFactory returns a new plugin instance
type PluginFactory func(log log.Logger) interface{}
// PluginFactory returns a new plugin instance, that takes in a context
type PluginCtxFactory func(ctx context.Context, log log.Logger) interface{}
// Serve is used to serve a new Nomad plugin
func Serve(f PluginFactory) {
logger := log.New(&log.LoggerOptions{
@@ -19,6 +23,23 @@ func Serve(f PluginFactory) {
})
plugin := f(logger)
serve(plugin, logger)
}
// Serve is used to serve a new Nomad plugin
func ServeCtx(f PluginCtxFactory) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.New(&log.LoggerOptions{
Level: log.Trace,
JSONFormat: true,
})
plugin := f(ctx, logger)
serve(plugin, logger)
}
func serve(plugin interface{}, logger log.Logger) {
switch p := plugin.(type) {
case device.DevicePlugin:
device.Serve(p, logger)