mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
client/drivermanager: add driver manager
The driver manager is modeled after the device manager and is started by the client. It's responsible for handling driver lifecycle and reattachment state, as well as processing the incoming fingerprint and task events from each driver. The manager exposes a method for registering event handlers for task events, which the task runner uses to update the server when a task has been updated with an event. Since driver fingerprinting is now implemented by the driver manager, it is no longer needed in the fingerprint manager and has been removed.
This commit is contained in:
@@ -17,13 +17,13 @@ import (
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
cstate "github.com/hashicorp/nomad/client/state"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/device"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
)
|
||||
|
||||
// allocRunner is used to run all the tasks in a given allocation
|
||||
@@ -131,13 +131,13 @@ type allocRunner struct {
|
||||
// prevAllocMigrator allows the migration of a previous allocations alloc dir.
|
||||
prevAllocMigrator allocwatcher.PrevAllocMigrator
|
||||
|
||||
// pluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
pluginSingletonLoader loader.PluginCatalog
|
||||
|
||||
// devicemanager is used to mount devices as well as lookup device
|
||||
// statistics
|
||||
devicemanager devicemanager.Manager
|
||||
|
||||
// driverManager is responsible for dispensing driver plugins and registering
|
||||
// event handlers
|
||||
driverManager drivermanager.Manager
|
||||
}
|
||||
|
||||
// NewAllocRunner returns a new allocation runner.
|
||||
@@ -167,8 +167,8 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
|
||||
deviceStatsReporter: config.DeviceStatsReporter,
|
||||
prevAllocWatcher: config.PrevAllocWatcher,
|
||||
prevAllocMigrator: config.PrevAllocMigrator,
|
||||
pluginSingletonLoader: config.PluginSingletonLoader,
|
||||
devicemanager: config.DeviceManager,
|
||||
driverManager: config.DriverManager,
|
||||
}
|
||||
|
||||
// Create the logger based on the allocation ID
|
||||
@@ -195,18 +195,18 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
|
||||
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
|
||||
for _, task := range tasks {
|
||||
config := &taskrunner.Config{
|
||||
Alloc: ar.alloc,
|
||||
ClientConfig: ar.clientConfig,
|
||||
Task: task,
|
||||
TaskDir: ar.allocDir.NewTaskDir(task.Name),
|
||||
Logger: ar.logger,
|
||||
StateDB: ar.stateDB,
|
||||
StateUpdater: ar,
|
||||
Consul: ar.consulClient,
|
||||
Vault: ar.vaultClient,
|
||||
PluginSingletonLoader: ar.pluginSingletonLoader,
|
||||
DeviceStatsReporter: ar.deviceStatsReporter,
|
||||
DeviceManager: ar.devicemanager,
|
||||
Alloc: ar.alloc,
|
||||
ClientConfig: ar.clientConfig,
|
||||
Task: task,
|
||||
TaskDir: ar.allocDir.NewTaskDir(task.Name),
|
||||
Logger: ar.logger,
|
||||
StateDB: ar.stateDB,
|
||||
StateUpdater: ar,
|
||||
Consul: ar.consulClient,
|
||||
Vault: ar.vaultClient,
|
||||
DeviceStatsReporter: ar.deviceStatsReporter,
|
||||
DeviceManager: ar.devicemanager,
|
||||
DriverManager: ar.driverManager,
|
||||
}
|
||||
|
||||
// Create, but do not Run, the task runner
|
||||
|
||||
@@ -7,10 +7,10 @@ import (
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/interfaces"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
cstate "github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
)
|
||||
|
||||
// Config holds the configuration for creating an allocation runner.
|
||||
@@ -45,14 +45,10 @@ type Config struct {
|
||||
// PrevAllocMigrator allows the migration of a previous allocations alloc dir
|
||||
PrevAllocMigrator allocwatcher.PrevAllocMigrator
|
||||
|
||||
// PluginLoader is used to load plugins.
|
||||
PluginLoader loader.PluginCatalog
|
||||
|
||||
// PluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
PluginSingletonLoader loader.PluginCatalog
|
||||
|
||||
// DeviceManager is used to mount devices as well as lookup device
|
||||
// statistics
|
||||
DeviceManager devicemanager.Manager
|
||||
|
||||
// DriverManager handles dispensing of driver plugins
|
||||
DriverManager drivermanager.Manager
|
||||
}
|
||||
|
||||
@@ -21,17 +21,16 @@ import (
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
cstate "github.com/hashicorp/nomad/client/state"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/hashicorp/nomad/plugins/shared"
|
||||
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -174,13 +173,13 @@ type TaskRunner struct {
|
||||
// deviceStatsReporter is used to lookup resource usage for alloc devices
|
||||
deviceStatsReporter cinterfaces.DeviceStatsReporter
|
||||
|
||||
// PluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
pluginSingletonLoader loader.PluginCatalog
|
||||
|
||||
// devicemanager is used to mount devices as well as lookup device
|
||||
// statistics
|
||||
devicemanager devicemanager.Manager
|
||||
|
||||
// driverManager is used to dispense driver plugins and register event
|
||||
// handlers
|
||||
driverManager drivermanager.Manager
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@@ -203,13 +202,13 @@ type Config struct {
|
||||
// deviceStatsReporter is used to lookup resource usage for alloc devices
|
||||
DeviceStatsReporter cinterfaces.DeviceStatsReporter
|
||||
|
||||
// PluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
PluginSingletonLoader loader.PluginCatalog
|
||||
|
||||
// DeviceManager is used to mount devices as well as lookup device
|
||||
// statistics
|
||||
DeviceManager devicemanager.Manager
|
||||
|
||||
// DriverManager is used to dispense driver plugins and register event
|
||||
// handlers
|
||||
DriverManager drivermanager.Manager
|
||||
}
|
||||
|
||||
func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
||||
@@ -234,29 +233,29 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
||||
}
|
||||
|
||||
tr := &TaskRunner{
|
||||
alloc: config.Alloc,
|
||||
allocID: config.Alloc.ID,
|
||||
clientConfig: config.ClientConfig,
|
||||
task: config.Task,
|
||||
taskDir: config.TaskDir,
|
||||
taskName: config.Task.Name,
|
||||
taskLeader: config.Task.Leader,
|
||||
envBuilder: envBuilder,
|
||||
consulClient: config.Consul,
|
||||
vaultClient: config.Vault,
|
||||
state: tstate,
|
||||
localState: state.NewLocalState(),
|
||||
stateDB: config.StateDB,
|
||||
stateUpdater: config.StateUpdater,
|
||||
deviceStatsReporter: config.DeviceStatsReporter,
|
||||
killCtx: killCtx,
|
||||
killCtxCancel: killCancel,
|
||||
ctx: trCtx,
|
||||
ctxCancel: trCancel,
|
||||
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
|
||||
waitCh: make(chan struct{}),
|
||||
pluginSingletonLoader: config.PluginSingletonLoader,
|
||||
devicemanager: config.DeviceManager,
|
||||
alloc: config.Alloc,
|
||||
allocID: config.Alloc.ID,
|
||||
clientConfig: config.ClientConfig,
|
||||
task: config.Task,
|
||||
taskDir: config.TaskDir,
|
||||
taskName: config.Task.Name,
|
||||
taskLeader: config.Task.Leader,
|
||||
envBuilder: envBuilder,
|
||||
consulClient: config.Consul,
|
||||
vaultClient: config.Vault,
|
||||
state: tstate,
|
||||
localState: state.NewLocalState(),
|
||||
stateDB: config.StateDB,
|
||||
stateUpdater: config.StateUpdater,
|
||||
deviceStatsReporter: config.DeviceStatsReporter,
|
||||
killCtx: killCtx,
|
||||
killCtxCancel: killCancel,
|
||||
ctx: trCtx,
|
||||
ctxCancel: trCancel,
|
||||
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
|
||||
waitCh: make(chan struct{}),
|
||||
devicemanager: config.DeviceManager,
|
||||
driverManager: config.DriverManager,
|
||||
}
|
||||
|
||||
// Create the logger based on the allocation ID
|
||||
@@ -568,6 +567,9 @@ func (tr *TaskRunner) runDriver() error {
|
||||
|
||||
//XXX Evaluate and encode driver config
|
||||
|
||||
// Register an event handler with the driver manager to emit task events
|
||||
tr.driverManager.RegisterEventHandler(tr.Task().Driver, taskConfig.ID, tr.handleTaskEvent)
|
||||
|
||||
// If there's already a task handle (eg from a Restore) there's nothing
|
||||
// to do except update state.
|
||||
if tr.getDriverHandle() != nil {
|
||||
@@ -606,6 +608,15 @@ func (tr *TaskRunner) runDriver() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tr *TaskRunner) handleTaskEvent(ev *drivers.TaskEvent) {
|
||||
tr.EmitEvent(&structs.TaskEvent{
|
||||
Type: structs.TaskDriverMessage,
|
||||
Time: ev.Timestamp.UnixNano(),
|
||||
Details: ev.Annotations,
|
||||
DriverMessage: ev.Message,
|
||||
})
|
||||
}
|
||||
|
||||
// initDriver creates the driver for the task
|
||||
/*func (tr *TaskRunner) initDriver() error {
|
||||
// Create a task-specific event emitter callback to expose minimal
|
||||
@@ -639,17 +650,10 @@ func (tr *TaskRunner) runDriver() error {
|
||||
|
||||
// initDriver retrives the DriverPlugin from the plugin loader for this task
|
||||
func (tr *TaskRunner) initDriver() error {
|
||||
plugin, err := tr.pluginSingletonLoader.Dispense(tr.Task().Driver, base.PluginTypeDriver, tr.clientConfig.NomadPluginConfig(), tr.logger)
|
||||
driver, err := tr.driverManager.Dispense(tr.Task().Driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// XXX need to be able to reattach to running drivers
|
||||
driver, ok := plugin.Plugin().(drivers.DriverPlugin)
|
||||
if !ok {
|
||||
return fmt.Errorf("plugin loaded for driver %s does not implement DriverPlugin interface", tr.task.Driver)
|
||||
}
|
||||
|
||||
tr.driver = driver
|
||||
|
||||
schema, err := tr.driver.TaskConfigSchema()
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulapi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
cstate "github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
mockdriver "github.com/hashicorp/nomad/drivers/mock"
|
||||
@@ -19,8 +20,6 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/device"
|
||||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -47,7 +46,6 @@ func (m *MockTaskStateUpdater) TaskStateUpdated() {
|
||||
// plus a cleanup func.
|
||||
func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName string) (*Config, func()) {
|
||||
logger := testlog.HCLogger(t)
|
||||
pluginLoader := catalog.TestPluginLoader(t)
|
||||
clientConf, cleanup := config.TestClientConfig(t)
|
||||
|
||||
// Find the task
|
||||
@@ -85,17 +83,17 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
|
||||
}
|
||||
|
||||
conf := &Config{
|
||||
Alloc: alloc,
|
||||
ClientConfig: clientConf,
|
||||
Consul: consulapi.NewMockConsulServiceClient(t, logger),
|
||||
Task: thisTask,
|
||||
TaskDir: taskDir,
|
||||
Logger: clientConf.Logger,
|
||||
Vault: vaultclient.NewMockVaultClient(),
|
||||
StateDB: cstate.NoopDB{},
|
||||
StateUpdater: NewMockTaskStateUpdater(),
|
||||
PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader),
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
Alloc: alloc,
|
||||
ClientConfig: clientConf,
|
||||
Consul: consulapi.NewMockConsulServiceClient(t, logger),
|
||||
Task: thisTask,
|
||||
TaskDir: taskDir,
|
||||
Logger: clientConf.Logger,
|
||||
Vault: vaultclient.NewMockVaultClient(),
|
||||
StateDB: cstate.NoopDB{},
|
||||
StateUpdater: NewMockTaskStateUpdater(),
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
DriverManager: drivermanager.TestDriverManager(t),
|
||||
}
|
||||
return conf, trCleanup
|
||||
}
|
||||
@@ -196,14 +194,9 @@ func TestTaskRunner_TaskEnv(t *testing.T) {
|
||||
}
|
||||
|
||||
// Get the mock driver plugin
|
||||
driverPlugin, err := conf.PluginSingletonLoader.Dispense(
|
||||
mockdriver.PluginID.Name,
|
||||
mockdriver.PluginID.PluginType,
|
||||
nil,
|
||||
conf.Logger,
|
||||
)
|
||||
driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name)
|
||||
require.NoError(err)
|
||||
mockDriver := driverPlugin.Plugin().(*mockdriver.Driver)
|
||||
mockDriver := driverPlugin.(*mockdriver.Driver)
|
||||
|
||||
// Assert its config has been properly interpolated
|
||||
driverCfg, mockCfg := mockDriver.GetTaskConfig()
|
||||
|
||||
@@ -8,11 +8,10 @@ import (
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -51,21 +50,20 @@ func (m *MockStateUpdater) Reset() {
|
||||
}
|
||||
|
||||
func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) {
|
||||
pluginLoader := catalog.TestPluginLoader(t)
|
||||
clientConf, cleanup := clientconfig.TestClientConfig(t)
|
||||
conf := &Config{
|
||||
// Copy the alloc in case the caller edits and reuses it
|
||||
Alloc: alloc.Copy(),
|
||||
Logger: clientConf.Logger,
|
||||
ClientConfig: clientConf,
|
||||
StateDB: state.NoopDB{},
|
||||
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
|
||||
Vault: vaultclient.NewMockVaultClient(),
|
||||
StateUpdater: &MockStateUpdater{},
|
||||
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
|
||||
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
|
||||
PluginSingletonLoader: singleton.NewSingletonLoader(clientConf.Logger, pluginLoader),
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
Alloc: alloc.Copy(),
|
||||
Logger: clientConf.Logger,
|
||||
ClientConfig: clientConf,
|
||||
StateDB: state.NoopDB{},
|
||||
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
|
||||
Vault: vaultclient.NewMockVaultClient(),
|
||||
StateUpdater: &MockStateUpdater{},
|
||||
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
|
||||
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
DriverManager: drivermanager.TestDriverManager(t),
|
||||
}
|
||||
return conf, cleanup
|
||||
}
|
||||
|
||||
100
client/client.go
100
client/client.go
@@ -28,6 +28,8 @@ import (
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
"github.com/hashicorp/nomad/client/servers"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
@@ -219,9 +221,15 @@ type Client struct {
|
||||
endpoints rpcEndpoints
|
||||
streamingRpcs *structs.StreamingRpcRegistry
|
||||
|
||||
// pluginManagers is the set of PluginManagers registered by the client
|
||||
pluginManagers *pluginmanager.PluginGroup
|
||||
|
||||
// devicemanager is responsible for managing device plugins.
|
||||
devicemanager devicemanager.Manager
|
||||
|
||||
// drivermanager is responsible for managing driver plugins
|
||||
drivermanager drivermanager.Manager
|
||||
|
||||
// baseLabels are used when emitting tagged metrics. All client metrics will
|
||||
// have these tags, and optionally more.
|
||||
baseLabels []metrics.Label
|
||||
@@ -298,15 +306,35 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
c.configCopy = c.config.Copy()
|
||||
c.configLock.Unlock()
|
||||
|
||||
fingerprintManager := NewFingerprintManager(c.configCopy.PluginSingletonLoader, c.GetConfig, c.configCopy.Node,
|
||||
c.shutdownCh, c.updateNodeFromFingerprint, c.updateNodeFromDriver,
|
||||
c.logger)
|
||||
fingerprintManager := NewFingerprintManager(
|
||||
c.configCopy.PluginSingletonLoader, c.GetConfig, c.configCopy.Node,
|
||||
c.shutdownCh, c.updateNodeFromFingerprint, c.logger)
|
||||
|
||||
c.pluginManagers = pluginmanager.New(c.logger)
|
||||
|
||||
// Fingerprint the node and scan for drivers
|
||||
if err := fingerprintManager.Run(); err != nil {
|
||||
return nil, fmt.Errorf("fingerprinting failed: %v", err)
|
||||
}
|
||||
|
||||
// Build the white/blacklists of drivers.
|
||||
allowlistDrivers := cfg.ReadStringListToMap("driver.whitelist")
|
||||
blocklistDrivers := cfg.ReadStringListToMap("driver.blacklist")
|
||||
|
||||
// Setup the driver manager
|
||||
driverConfig := &drivermanager.Config{
|
||||
Logger: c.logger,
|
||||
Loader: c.configCopy.PluginSingletonLoader,
|
||||
PluginConfig: c.configCopy.NomadPluginConfig(),
|
||||
Updater: c.updateNodeFromDriver,
|
||||
State: c.stateDB,
|
||||
AllowedDrivers: allowlistDrivers,
|
||||
BlockedDrivers: blocklistDrivers,
|
||||
}
|
||||
drvManager := drivermanager.New(driverConfig)
|
||||
c.drivermanager = drvManager
|
||||
c.pluginManagers.RegisterAndRun(drvManager)
|
||||
|
||||
// Setup the device manager
|
||||
devConfig := &devicemanager.Config{
|
||||
Logger: c.logger,
|
||||
@@ -316,8 +344,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
StatsInterval: c.configCopy.StatsCollectionInterval,
|
||||
State: c.stateDB,
|
||||
}
|
||||
c.devicemanager = devicemanager.New(devConfig)
|
||||
go c.devicemanager.Run()
|
||||
devManager := devicemanager.New(devConfig)
|
||||
c.devicemanager = devManager
|
||||
c.pluginManagers.RegisterAndRun(devManager)
|
||||
|
||||
// Add the stats collector
|
||||
statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir, c.devicemanager.AllStats)
|
||||
@@ -358,6 +387,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
return nil, fmt.Errorf("failed to setup vault client: %v", err)
|
||||
}
|
||||
|
||||
// Wait for plugin managers to initialize
|
||||
pluginReadyCh, err := c.pluginManagers.Ready(pluginmanager.DefaultManagerReadyTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
<-pluginReadyCh
|
||||
|
||||
// Restore the state
|
||||
if err := c.restoreState(); err != nil {
|
||||
logger.Error("failed to restore state", "error", err)
|
||||
@@ -558,8 +594,8 @@ func (c *Client) Shutdown() error {
|
||||
}
|
||||
c.logger.Info("shutting down")
|
||||
|
||||
// Shutdown the device manager
|
||||
c.devicemanager.Shutdown()
|
||||
// Shutdown the plugin managers
|
||||
c.pluginManagers.Shutdown()
|
||||
|
||||
// Stop renewing tokens and secrets
|
||||
if c.vaultClient != nil {
|
||||
@@ -866,19 +902,18 @@ func (c *Client) restoreState() error {
|
||||
|
||||
c.configLock.RLock()
|
||||
arConf := &allocrunner.Config{
|
||||
Alloc: alloc,
|
||||
Logger: c.logger,
|
||||
ClientConfig: c.configCopy,
|
||||
StateDB: c.stateDB,
|
||||
StateUpdater: c,
|
||||
DeviceStatsReporter: c,
|
||||
Consul: c.consulService,
|
||||
Vault: c.vaultClient,
|
||||
PrevAllocWatcher: prevAllocWatcher,
|
||||
PrevAllocMigrator: prevAllocMigrator,
|
||||
PluginLoader: c.config.PluginLoader,
|
||||
PluginSingletonLoader: c.config.PluginSingletonLoader,
|
||||
DeviceManager: c.devicemanager,
|
||||
Alloc: alloc,
|
||||
Logger: c.logger,
|
||||
ClientConfig: c.config,
|
||||
StateDB: c.stateDB,
|
||||
StateUpdater: c,
|
||||
DeviceStatsReporter: c,
|
||||
Consul: c.consulService,
|
||||
Vault: c.vaultClient,
|
||||
PrevAllocWatcher: prevAllocWatcher,
|
||||
PrevAllocMigrator: prevAllocMigrator,
|
||||
DeviceManager: c.devicemanager,
|
||||
DriverManager: c.drivermanager,
|
||||
}
|
||||
c.configLock.RUnlock()
|
||||
|
||||
@@ -2056,19 +2091,18 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
||||
// we don't have to do a copy.
|
||||
c.configLock.RLock()
|
||||
arConf := &allocrunner.Config{
|
||||
Alloc: alloc,
|
||||
Logger: c.logger,
|
||||
ClientConfig: c.configCopy,
|
||||
StateDB: c.stateDB,
|
||||
Consul: c.consulService,
|
||||
Vault: c.vaultClient,
|
||||
StateUpdater: c,
|
||||
DeviceStatsReporter: c,
|
||||
PrevAllocWatcher: prevAllocWatcher,
|
||||
PrevAllocMigrator: prevAllocMigrator,
|
||||
PluginLoader: c.config.PluginLoader,
|
||||
PluginSingletonLoader: c.config.PluginSingletonLoader,
|
||||
DeviceManager: c.devicemanager,
|
||||
Alloc: alloc,
|
||||
Logger: c.logger,
|
||||
ClientConfig: c.config,
|
||||
StateDB: c.stateDB,
|
||||
Consul: c.consulService,
|
||||
Vault: c.vaultClient,
|
||||
StateUpdater: c,
|
||||
DeviceStatsReporter: c,
|
||||
PrevAllocWatcher: prevAllocWatcher,
|
||||
PrevAllocMigrator: prevAllocMigrator,
|
||||
DeviceManager: c.devicemanager,
|
||||
DriverManager: c.drivermanager,
|
||||
}
|
||||
c.configLock.RUnlock()
|
||||
|
||||
|
||||
@@ -18,14 +18,8 @@ import (
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
)
|
||||
|
||||
// Manaager is the interface used to manage device plugins
|
||||
// Manager is the interface used to manage device plugins
|
||||
type Manager interface {
|
||||
// Run starts the device manager
|
||||
Run()
|
||||
|
||||
// Shutdown shutsdown the manager and all launched plugins
|
||||
Shutdown()
|
||||
|
||||
// Reserve is used to reserve a set of devices
|
||||
Reserve(d *structs.AllocatedDeviceResource) (*device.ContainerReservation, error)
|
||||
|
||||
@@ -127,6 +121,9 @@ func New(c *Config) *manager {
|
||||
}
|
||||
}
|
||||
|
||||
// PluginType identifies this manager to the plugin manager and satisfies the PluginManager interface.
|
||||
func (*manager) PluginType() string { return base.PluginTypeDevice }
|
||||
|
||||
// Run starts the device manager. The manager will shut down any previously
|
||||
// launched plugin and then begin fingerprinting and stats collection on all new
|
||||
// device plugins.
|
||||
@@ -161,16 +158,6 @@ func (m *manager) Run() {
|
||||
})
|
||||
}
|
||||
|
||||
// XXX we should eventually remove this and have it be done in the client
|
||||
// Give all the fingerprinters a chance to run at least once before we
|
||||
// update the node. This prevents initial fingerprinting from causing too
|
||||
// many server side updates.
|
||||
ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
|
||||
for _, i := range m.instances {
|
||||
i.WaitForFirstFingerprint(ctx)
|
||||
}
|
||||
cancel()
|
||||
|
||||
// Now start the fingerprint handler
|
||||
for {
|
||||
select {
|
||||
@@ -207,6 +194,19 @@ func (m *manager) Shutdown() {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *manager) Ready() <-chan struct{} {
|
||||
ret := make(chan struct{})
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
|
||||
for _, i := range m.instances {
|
||||
i.WaitForFirstFingerprint(ctx)
|
||||
}
|
||||
cancel()
|
||||
close(ret)
|
||||
}()
|
||||
return ret
|
||||
}
|
||||
|
||||
// Reserve reserves the given allocated device. If the device is unknown, an
|
||||
// UnknownDeviceErr is returned.
|
||||
func (m *manager) Reserve(d *structs.AllocatedDeviceResource) (*device.ContainerReservation, error) {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -10,20 +8,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// driverFPBackoffBaseline is the baseline time for exponential backoff while
|
||||
// fingerprinting a driver.
|
||||
driverFPBackoffBaseline = 5 * time.Second
|
||||
|
||||
// driverFPBackoffLimit is the limit of the exponential backoff for fingerprinting
|
||||
// a driver.
|
||||
driverFPBackoffLimit = 2 * time.Minute
|
||||
)
|
||||
|
||||
// FingerprintManager runs a client fingerprinters on a continuous basis, and
|
||||
@@ -39,10 +24,7 @@ type FingerprintManager struct {
|
||||
// associated node
|
||||
updateNodeAttributes func(*fingerprint.FingerprintResponse) *structs.Node
|
||||
|
||||
// updateNodeFromDriver is a callback to the client to update the state of a
|
||||
// specific driver for the node
|
||||
updateNodeFromDriver func(string, *structs.DriverInfo) *structs.Node
|
||||
logger log.Logger
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// NewFingerprintManager is a constructor that creates and returns an instance
|
||||
@@ -53,14 +35,12 @@ func NewFingerprintManager(
|
||||
node *structs.Node,
|
||||
shutdownCh chan struct{},
|
||||
updateNodeAttributes func(*fingerprint.FingerprintResponse) *structs.Node,
|
||||
updateNodeFromDriver func(string, *structs.DriverInfo) *structs.Node,
|
||||
logger log.Logger) *FingerprintManager {
|
||||
|
||||
return &FingerprintManager{
|
||||
singletonLoader: singletonLoader,
|
||||
getConfig: getConfig,
|
||||
updateNodeAttributes: updateNodeAttributes,
|
||||
updateNodeFromDriver: updateNodeFromDriver,
|
||||
node: node,
|
||||
shutdownCh: shutdownCh,
|
||||
logger: logger.Named("fingerprint_mgr"),
|
||||
@@ -120,39 +100,6 @@ func (fp *FingerprintManager) Run() error {
|
||||
"skipped_fingerprinters", skippedFingerprints)
|
||||
}
|
||||
|
||||
// Next, set up drivers
|
||||
// Build the white/blacklists of drivers.
|
||||
whitelistDrivers := cfg.ReadStringListToMap("driver.whitelist")
|
||||
whitelistDriversEnabled := len(whitelistDrivers) > 0
|
||||
blacklistDrivers := cfg.ReadStringListToMap("driver.blacklist")
|
||||
|
||||
var availDrivers []string
|
||||
var skippedDrivers []string
|
||||
|
||||
for _, pl := range fp.singletonLoader.Catalog()[base.PluginTypeDriver] {
|
||||
name := pl.Name
|
||||
// Skip fingerprinting drivers that are not in the whitelist if it is
|
||||
// enabled.
|
||||
if _, ok := whitelistDrivers[name]; whitelistDriversEnabled && !ok {
|
||||
skippedDrivers = append(skippedDrivers, name)
|
||||
continue
|
||||
}
|
||||
// Skip fingerprinting drivers that are in the blacklist
|
||||
if _, ok := blacklistDrivers[name]; ok {
|
||||
skippedDrivers = append(skippedDrivers, name)
|
||||
continue
|
||||
}
|
||||
|
||||
availDrivers = append(availDrivers, name)
|
||||
}
|
||||
|
||||
if err := fp.setupDrivers(availDrivers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(skippedDrivers) > 0 {
|
||||
fp.logger.Debug("drivers skipped due to white/blacklist", "skipped_drivers", skippedDrivers)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -189,38 +136,6 @@ func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupDrivers is used to fingerprint the node to see if these drivers are
|
||||
// supported
|
||||
func (fm *FingerprintManager) setupDrivers(driverNames []string) error {
|
||||
//TODO(alex,hclog) Update fingerprinters to hclog
|
||||
var availDrivers []string
|
||||
for _, name := range driverNames {
|
||||
// TODO: driver reattach
|
||||
fingerCh, cancel, err := fm.dispenseDriverFingerprint(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
finger := <-fingerCh
|
||||
|
||||
// Start a periodic watcher to detect changes to a drivers health and
|
||||
// attributes.
|
||||
go fm.watchDriverFingerprint(fingerCh, name, cancel)
|
||||
|
||||
if fm.logger.IsTrace() {
|
||||
fm.logger.Trace("initial driver fingerprint", "driver", name, "fingerprint", finger)
|
||||
}
|
||||
// Log the fingerprinters which have been applied
|
||||
if finger.Health != drivers.HealthStateUndetected {
|
||||
availDrivers = append(availDrivers, name)
|
||||
}
|
||||
fm.processDriverFingerprint(finger, name)
|
||||
}
|
||||
|
||||
fm.logger.Debug("detected drivers", "drivers", availDrivers)
|
||||
return nil
|
||||
}
|
||||
|
||||
// runFingerprint runs each fingerprinter individually on an ongoing basis
|
||||
func (fm *FingerprintManager) runFingerprint(f fingerprint.Fingerprint, period time.Duration, name string) {
|
||||
fm.logger.Debug("fingerprinting periodically", "fingerprinter", name, "period", period)
|
||||
@@ -266,99 +181,3 @@ func (fm *FingerprintManager) fingerprint(name string, f fingerprint.Fingerprint
|
||||
|
||||
return response.Detected, nil
|
||||
}
|
||||
|
||||
// watchDrivers facilitates the different periods between fingerprint and
|
||||
// health checking a driver
|
||||
func (fm *FingerprintManager) watchDriverFingerprint(fpChan <-chan *drivers.Fingerprint, name string, cancel context.CancelFunc) {
|
||||
var backoff time.Duration
|
||||
var retry int
|
||||
for {
|
||||
if backoff > 0 {
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
select {
|
||||
case <-fm.shutdownCh:
|
||||
cancel()
|
||||
return
|
||||
case fp, ok := <-fpChan:
|
||||
if ok && fp.Err == nil {
|
||||
fm.processDriverFingerprint(fp, name)
|
||||
continue
|
||||
}
|
||||
// if the channel is closed attempt to open a new one
|
||||
newFpChan, newCancel, err := fm.dispenseDriverFingerprint(name)
|
||||
if err != nil {
|
||||
fm.logger.Warn("failed to fingerprint driver, retrying in 30s", "error", err, "retry", retry)
|
||||
di := &structs.DriverInfo{
|
||||
Healthy: false,
|
||||
HealthDescription: "failed to fingerprint driver",
|
||||
UpdateTime: time.Now(),
|
||||
}
|
||||
if n := fm.updateNodeFromDriver(name, di); n != nil {
|
||||
fm.setNode(n)
|
||||
}
|
||||
|
||||
// Calculate the new backoff
|
||||
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
||||
if backoff > driverFPBackoffLimit {
|
||||
backoff = driverFPBackoffLimit
|
||||
}
|
||||
retry++
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
fpChan = newFpChan
|
||||
cancel = newCancel
|
||||
|
||||
// Reset backoff
|
||||
backoff = 0
|
||||
retry = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processDriverFringerprint converts a Fingerprint from a driver into a DriverInfo
|
||||
// struct and updates the Node with it
|
||||
func (fm *FingerprintManager) processDriverFingerprint(fp *drivers.Fingerprint, driverName string) {
|
||||
di := &structs.DriverInfo{
|
||||
Attributes: stringify(fp.Attributes),
|
||||
Detected: fp.Health != drivers.HealthStateUndetected,
|
||||
Healthy: fp.Health == drivers.HealthStateHealthy,
|
||||
HealthDescription: fp.HealthDescription,
|
||||
UpdateTime: time.Now(),
|
||||
}
|
||||
if n := fm.updateNodeFromDriver(driverName, di); n != nil {
|
||||
fm.setNode(n)
|
||||
}
|
||||
}
|
||||
func stringify(attributes map[string]*pstructs.Attribute) map[string]string {
|
||||
ret := make(map[string]string, len(attributes))
|
||||
for key, attribute := range attributes {
|
||||
ret[key] = attribute.GoString()
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// dispenseDriverFingerprint dispenses a driver plugin for the given driver name
|
||||
// and requests a fingerprint channel. The channel and a context cancel function
|
||||
// is returned to the caller
|
||||
func (fm *FingerprintManager) dispenseDriverFingerprint(driverName string) (<-chan *drivers.Fingerprint, context.CancelFunc, error) {
|
||||
plug, err := fm.singletonLoader.Dispense(driverName, base.PluginTypeDriver, fm.getConfig().NomadPluginConfig(), fm.logger)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
driver, ok := plug.Plugin().(drivers.DriverPlugin)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("registered driver plugin %q does not implement DriverPlugin interface", driverName)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fingerCh, err := driver.Fingerprint(ctx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return fingerCh, cancel, nil
|
||||
}
|
||||
|
||||
@@ -1,50 +1,12 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
// registering raw_exec driver plugin used in testing
|
||||
_ "github.com/hashicorp/nomad/drivers/rawexec"
|
||||
)
|
||||
|
||||
func TestFingerprintManager_Run_MockDriver(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient, cleanup := TestClient(t, nil)
|
||||
|
||||
testClient.logger = testlog.HCLogger(t)
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testlog.HCLogger(t),
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
node := testClient.config.Node
|
||||
|
||||
require.NotNil(node.Drivers["mock_driver"])
|
||||
require.True(node.Drivers["mock_driver"].Detected)
|
||||
require.True(node.Drivers["mock_driver"].Healthy)
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
@@ -57,7 +19,6 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
@@ -71,295 +32,7 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
|
||||
require.NotZero(node.Resources.DiskMB)
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Fingerprint_Run(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "true",
|
||||
}
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
node := testClient.config.Node
|
||||
|
||||
require.NotNil(node.Drivers["raw_exec"])
|
||||
require.True(node.Drivers["raw_exec"].Detected)
|
||||
require.True(node.Drivers["raw_exec"].Healthy)
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"test.shutdown_periodic_after": "true",
|
||||
"test.shutdown_periodic_duration": "2",
|
||||
}
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
{
|
||||
// Ensure the mock driver is registered and healthy on the client
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
defer fm.nodeLock.Unlock()
|
||||
node := fm.node
|
||||
dinfo, ok := node.Drivers["mock_driver"]
|
||||
if !ok || !dinfo.Detected || !dinfo.Healthy {
|
||||
return false, fmt.Errorf("mock driver should be detected and healthy: %+v", dinfo)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
// Ensure that the client fingerprinter eventually removes this attribute and
|
||||
// marks the driver as unhealthy
|
||||
{
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
defer fm.nodeLock.Unlock()
|
||||
node := fm.node
|
||||
dinfo, ok := node.Drivers["mock_driver"]
|
||||
if !ok || dinfo.Detected || dinfo.Healthy {
|
||||
return false, fmt.Errorf("mock driver should not be detected and healthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// This is a temporary measure to check that a driver has both attributes on a
|
||||
// node set as well as DriverInfo.
|
||||
func TestFingerprintManager_HealthCheck_Driver(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
"test.shutdown_periodic_after": "true",
|
||||
"test.shutdown_periodic_duration": "2",
|
||||
}
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
// Ensure the mock driver is registered and healthy on the client
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
node := fm.node
|
||||
defer fm.nodeLock.Unlock()
|
||||
|
||||
mockDriverAttribute := node.Attributes["driver.mock_driver"]
|
||||
if mockDriverAttribute == "" {
|
||||
return false, fmt.Errorf("mock driver info should be set on the client attributes")
|
||||
}
|
||||
mockDriverInfo := node.Drivers["mock_driver"]
|
||||
if mockDriverInfo == nil {
|
||||
return false, fmt.Errorf("mock driver info should be set on the client")
|
||||
}
|
||||
if !mockDriverInfo.Healthy {
|
||||
return false, fmt.Errorf("mock driver info should be healthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Ensure that a default driver without health checks enabled is registered and healthy on the client
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
node := fm.node
|
||||
defer fm.nodeLock.Unlock()
|
||||
|
||||
rawExecAttribute := node.Attributes["driver.raw_exec"]
|
||||
if rawExecAttribute == "" {
|
||||
return false, fmt.Errorf("raw exec info should be set on the client attributes")
|
||||
}
|
||||
rawExecInfo := node.Drivers["raw_exec"]
|
||||
if rawExecInfo == nil {
|
||||
return false, fmt.Errorf("raw exec driver info should be set on the client")
|
||||
}
|
||||
if !rawExecInfo.Detected {
|
||||
return false, fmt.Errorf("raw exec driver should be detected")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Ensure the mock driver is registered
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
node := fm.node
|
||||
defer fm.nodeLock.Unlock()
|
||||
|
||||
mockDriverAttribute := node.Attributes["driver.mock_driver"]
|
||||
if mockDriverAttribute == "" {
|
||||
return false, fmt.Errorf("mock driver info should set on the client attributes")
|
||||
}
|
||||
mockDriverInfo := node.Drivers["mock_driver"]
|
||||
if mockDriverInfo == nil {
|
||||
return false, fmt.Errorf("mock driver info should be set on the client")
|
||||
}
|
||||
if !mockDriverInfo.Healthy {
|
||||
return false, fmt.Errorf("mock driver info should not be healthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Ensure that we don't duplicate health check information on the driver
|
||||
// health information
|
||||
fm.nodeLock.Lock()
|
||||
node := fm.node
|
||||
fm.nodeLock.Unlock()
|
||||
mockDriverAttributes := node.Drivers["mock_driver"].Attributes
|
||||
require.NotContains(mockDriverAttributes, "driver.mock_driver")
|
||||
}
|
||||
|
||||
func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"test.shutdown_periodic_after": "true",
|
||||
"test.shutdown_periodic_duration": "2",
|
||||
}
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
{
|
||||
// Ensure the mock driver is registered and healthy on the client
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
node := fm.node
|
||||
defer fm.nodeLock.Unlock()
|
||||
|
||||
mockDriverInfo := node.Drivers["mock_driver"]
|
||||
if mockDriverInfo == nil {
|
||||
return false, fmt.Errorf("mock driver info should be set on the client")
|
||||
}
|
||||
if !mockDriverInfo.Detected {
|
||||
return false, fmt.Errorf("mock driver info should be detected")
|
||||
}
|
||||
if !mockDriverInfo.Healthy {
|
||||
return false, fmt.Errorf("mock driver info should be healthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
{
|
||||
// Ensure that the client health check eventually removes this attribute and
|
||||
// marks the driver as unhealthy
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
node := fm.node
|
||||
defer fm.nodeLock.Unlock()
|
||||
|
||||
mockDriverInfo := node.Drivers["mock_driver"]
|
||||
if mockDriverInfo == nil {
|
||||
return false, fmt.Errorf("mock driver info should be set on the client")
|
||||
}
|
||||
if !mockDriverInfo.Detected {
|
||||
return false, fmt.Errorf("mock driver info should be detected")
|
||||
}
|
||||
if !mockDriverInfo.Healthy {
|
||||
return false, fmt.Errorf("mock driver info should be healthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
{
|
||||
// Ensure that the client health check eventually removes this attribute and
|
||||
// marks the driver as unhealthy
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
fm.nodeLock.Lock()
|
||||
node := fm.node
|
||||
defer fm.nodeLock.Unlock()
|
||||
|
||||
mockDriverInfo := node.Drivers["mock_driver"]
|
||||
if mockDriverInfo == nil {
|
||||
return false, fmt.Errorf("mock driver info should be set on the client")
|
||||
}
|
||||
if mockDriverInfo.Detected {
|
||||
return false, fmt.Errorf("mock driver should be detected")
|
||||
}
|
||||
if mockDriverInfo.Healthy {
|
||||
return false, fmt.Errorf("mock driver should not be healthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFimgerprintManager_Run_InWhitelist(t *testing.T) {
|
||||
t.Skip("missing mock driver plugin implementation")
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
@@ -377,7 +50,6 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) {
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
@@ -406,7 +78,6 @@ func TestFingerprintManager_Run_InBlacklist(t *testing.T) {
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
@@ -437,7 +108,6 @@ func TestFingerprintManager_Run_Combination(t *testing.T) {
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
@@ -451,187 +121,3 @@ func TestFingerprintManager_Run_Combination(t *testing.T) {
|
||||
require.NotContains(node.Attributes, "memory.totalbytes")
|
||||
require.NotContains(node.Attributes, "os.name")
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_WhitelistDrivers(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
"driver.whitelist": " raw_exec , foo ",
|
||||
}
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
node := testClient.config.Node
|
||||
require.NotNil(node.Drivers["raw_exec"])
|
||||
require.NotContains(node.Drivers, "java")
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
"driver.whitelist": " foo,bar,baz ",
|
||||
}
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
node := testClient.config.Node
|
||||
|
||||
require.NotContains(node.Attributes, "driver.raw_exec")
|
||||
// TODO(nickethier): uncomment after missing driver implementations added
|
||||
//require.NotContains(node.Attributes, "driver.exec")
|
||||
//require.NotContains(node.Attributes, "driver.docker")
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
"driver.whitelist": " raw_exec,exec,foo,bar,baz ",
|
||||
"driver.blacklist": " exec,foo,bar,baz ",
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testlog.HCLogger(t)
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
node := testClient.config.Node
|
||||
|
||||
require.NotNil(node.Drivers["raw_exec"])
|
||||
require.NotContains(node.Drivers, "exec")
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_DriverFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testlog.HCLogger(t)
|
||||
defer cleanup()
|
||||
|
||||
singLoader := testClient.config.PluginSingletonLoader
|
||||
|
||||
dispenseCalls := 0
|
||||
loader := &loader.MockCatalog{
|
||||
DispenseF: func(name, pluginType string, cfg *base.AgentConfig, logger log.Logger) (loader.PluginInstance, error) {
|
||||
if pluginType == base.PluginTypeDriver && name == "raw_exec" {
|
||||
dispenseCalls++
|
||||
}
|
||||
return singLoader.Dispense(name, pluginType, cfg, logger)
|
||||
},
|
||||
ReattachF: singLoader.Reattach,
|
||||
CatalogF: singLoader.Catalog,
|
||||
}
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
loader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
fpChan, cancel, err := fm.dispenseDriverFingerprint("raw_exec")
|
||||
require.NoError(err)
|
||||
require.Equal(1, dispenseCalls)
|
||||
|
||||
cancel()
|
||||
go fm.watchDriverFingerprint(fpChan, "raw_exec", cancel)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if 2 != dispenseCalls {
|
||||
return false, fmt.Errorf("expected dispenseCalls to be 2 but was %d", dispenseCalls)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
testClient, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.Options = map[string]string{
|
||||
"driver.raw_exec.enable": "1",
|
||||
"driver.whitelist": " raw_exec,foo,bar,baz ",
|
||||
"driver.blacklist": " exec,foo,bar,baz ",
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testlog.HCLogger(t)
|
||||
defer cleanup()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
testClient.config.PluginSingletonLoader,
|
||||
testClient.GetConfig,
|
||||
testClient.config.Node,
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testClient.logger,
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
require.Nil(err)
|
||||
|
||||
node := testClient.config.Node
|
||||
|
||||
require.NotNil(node.Drivers["raw_exec"])
|
||||
require.NotContains(node.Drivers, "exec")
|
||||
}
|
||||
|
||||
474
client/pluginmanager/drivermanager/instance.go
Normal file
474
client/pluginmanager/drivermanager/instance.go
Normal file
@@ -0,0 +1,474 @@
|
||||
package drivermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
)
|
||||
|
||||
const (
|
||||
// driverFPBackoffBaseline is the baseline time for exponential backoff while
|
||||
// fingerprinting a driver.
|
||||
driverFPBackoffBaseline = 5 * time.Second
|
||||
|
||||
// driverFPBackoffLimit is the limit of the exponential backoff for fingerprinting
|
||||
// a driver.
|
||||
driverFPBackoffLimit = 2 * time.Minute
|
||||
)
|
||||
|
||||
// instanceManagerConfig configures a driver instance manager
|
||||
type instanceManagerConfig struct {
|
||||
// Logger is the logger used by the driver instance manager
|
||||
Logger log.Logger
|
||||
|
||||
// Ctx is used to shutdown the driver instance manager
|
||||
Ctx context.Context
|
||||
|
||||
// Loader is the plugin loader
|
||||
Loader loader.PluginCatalog
|
||||
|
||||
// StoreReattach is used to store a plugin's reattach config
|
||||
StoreReattach StorePluginReattachFn
|
||||
|
||||
// FetchReattach is used to retrieve a plugin's reattach config
|
||||
FetchReattach FetchPluginReattachFn
|
||||
|
||||
// PluginConfig is the config passed to the launched plugins
|
||||
PluginConfig *base.ClientAgentConfig
|
||||
|
||||
// ID is the ID of the plugin being managed
|
||||
ID *loader.PluginID
|
||||
|
||||
// UpdateNodeFromDriver is the callback used to update the node from fingerprinting
|
||||
UpdateNodeFromDriver UpdateNodeDriverInfoFn
|
||||
}
|
||||
|
||||
// instanceManager is used to manage a single driver plugin
|
||||
type instanceManager struct {
|
||||
// logger is the logger used by the driver instance manager
|
||||
logger log.Logger
|
||||
|
||||
// ctx is used to shutdown the driver manager
|
||||
ctx context.Context
|
||||
|
||||
// cancel is used to shutdown management of this driver plugin
|
||||
cancel context.CancelFunc
|
||||
|
||||
// loader is the plugin loader
|
||||
loader loader.PluginCatalog
|
||||
|
||||
// storeReattach is used to store a plugin's reattach config
|
||||
storeReattach StorePluginReattachFn
|
||||
|
||||
// fetchReattach is used to retrieve a plugin's reattach config
|
||||
fetchReattach FetchPluginReattachFn
|
||||
|
||||
// pluginConfig is the config passed to the launched plugins
|
||||
pluginConfig *base.ClientAgentConfig
|
||||
|
||||
// id is the ID of the plugin being managed
|
||||
id *loader.PluginID
|
||||
|
||||
// plugin is the plugin instance being managed
|
||||
plugin loader.PluginInstance
|
||||
|
||||
// driver is the driver plugin being managed
|
||||
driver drivers.DriverPlugin
|
||||
|
||||
// pluginLock locks access to the driver and plugin
|
||||
pluginLock sync.Mutex
|
||||
|
||||
// shutdownLock is used to serialize attempts to shutdown
|
||||
shutdownLock sync.Mutex
|
||||
|
||||
// updateNodeFromDriver is the callback used to update the node from fingerprinting
|
||||
updateNodeFromDriver UpdateNodeDriverInfoFn
|
||||
|
||||
// handlers is the map of taskID to handler func
|
||||
handlers map[string]EventHandler
|
||||
handlersLock sync.RWMutex
|
||||
|
||||
// firstFingerprintCh is used to trigger that we have successfully
|
||||
// fingerprinted once. It is used to gate launching the stats collection.
|
||||
firstFingerprintCh chan struct{}
|
||||
hasFingerprinted bool
|
||||
|
||||
// lastHealthState is the last known health fingerprinted by the manager
|
||||
lastHealthState drivers.HealthState
|
||||
lastHealthStateMu sync.Mutex
|
||||
}
|
||||
|
||||
// newInstanceManager returns a new driver instance manager. It is expected that
|
||||
// the context passed in the configuration is cancelled in order to shutdown
|
||||
// launched goroutines.
|
||||
func newInstanceManager(c *instanceManagerConfig) *instanceManager {
|
||||
|
||||
ctx, cancel := context.WithCancel(c.Ctx)
|
||||
i := &instanceManager{
|
||||
logger: c.Logger.With("driver", c.ID.Name),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
loader: c.Loader,
|
||||
storeReattach: c.StoreReattach,
|
||||
fetchReattach: c.FetchReattach,
|
||||
pluginConfig: c.PluginConfig,
|
||||
id: c.ID,
|
||||
updateNodeFromDriver: c.UpdateNodeFromDriver,
|
||||
handlers: map[string]EventHandler{},
|
||||
firstFingerprintCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
go i.run()
|
||||
return i
|
||||
}
|
||||
|
||||
// WaitForFirstFingerprint waits until either the plugin fingerprints, the
|
||||
// passed context is done, or the plugin instance manager is shutdown.
|
||||
func (i *instanceManager) WaitForFirstFingerprint(ctx context.Context) {
|
||||
select {
|
||||
case <-i.ctx.Done():
|
||||
case <-ctx.Done():
|
||||
case <-i.firstFingerprintCh:
|
||||
}
|
||||
}
|
||||
|
||||
// registerEventHandler registers the given handler to run for events with the
|
||||
// given taskID
|
||||
func (i *instanceManager) registerEventHandler(taskID string, handler EventHandler) {
|
||||
i.handlersLock.Lock()
|
||||
defer i.handlersLock.Unlock()
|
||||
i.handlers[taskID] = handler
|
||||
}
|
||||
|
||||
// deregisterEventHandler removed the handlers registered for the given taskID
|
||||
// and is serlialized so as to not be called concurrently with the registered
|
||||
// handler
|
||||
func (i *instanceManager) deregisterEventHandler(taskID string) {
|
||||
i.handlersLock.Lock()
|
||||
defer i.handlersLock.Unlock()
|
||||
delete(i.handlers, taskID)
|
||||
}
|
||||
|
||||
// run is a long lived goroutine that starts the fingerprinting and stats
|
||||
// collection goroutine and then shutsdown the plugin on exit.
|
||||
func (i *instanceManager) run() {
|
||||
// Dispense once to ensure we are given a valid plugin
|
||||
if _, err := i.dispense(); err != nil {
|
||||
i.logger.Error("dispensing initial plugin failed", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create a waitgroup to block on shutdown for all created goroutines to
|
||||
// exit
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start the fingerprinter
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
i.fingerprint()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// Start event handler
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
i.handleEvents()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// Do a final cleanup
|
||||
wg.Wait()
|
||||
i.cleanup()
|
||||
}
|
||||
|
||||
// dispense is used to dispense a plugin.
|
||||
func (i *instanceManager) dispense() (plugin drivers.DriverPlugin, err error) {
|
||||
i.pluginLock.Lock()
|
||||
defer i.pluginLock.Unlock()
|
||||
|
||||
// See if we already have a running instance
|
||||
if i.plugin != nil && !i.plugin.Exited() {
|
||||
return i.driver, nil
|
||||
}
|
||||
|
||||
var pluginInstance loader.PluginInstance
|
||||
|
||||
if reattach, ok := i.fetchReattach(); ok {
|
||||
// Reattach to existing plugin
|
||||
pluginInstance, err = i.loader.Reattach(i.id.Name, i.id.PluginType, reattach)
|
||||
} else {
|
||||
// Get an instance of the plugin
|
||||
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
|
||||
}
|
||||
if err != nil {
|
||||
// Retry as the error just indicates the singleton has exited
|
||||
if err == singleton.SingletonPluginExited {
|
||||
pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger)
|
||||
}
|
||||
|
||||
// If we still have an error there is a real problem
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start plugin: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to a fingerprint plugin
|
||||
driver, ok := pluginInstance.Plugin().(drivers.DriverPlugin)
|
||||
if !ok {
|
||||
pluginInstance.Kill()
|
||||
return nil, fmt.Errorf("plugin loaded does not implement the driver interface")
|
||||
}
|
||||
|
||||
// Store the plugin and driver
|
||||
i.plugin = pluginInstance
|
||||
i.driver = driver
|
||||
|
||||
// Store the reattach config
|
||||
if c, ok := pluginInstance.ReattachConfig(); ok {
|
||||
if err := i.storeReattach(c); err != nil {
|
||||
i.logger.Error("error storing driver plugin reattach config", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return driver, nil
|
||||
}
|
||||
|
||||
// cleanup shutsdown the plugin
|
||||
func (i *instanceManager) cleanup() {
|
||||
i.shutdownLock.Lock()
|
||||
i.pluginLock.Lock()
|
||||
defer i.pluginLock.Unlock()
|
||||
defer i.shutdownLock.Unlock()
|
||||
|
||||
if i.plugin != nil && !i.plugin.Exited() {
|
||||
i.plugin.Kill()
|
||||
if err := i.storeReattach(nil); err != nil {
|
||||
i.logger.Warn("error clearing plugin reattach config from state store", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dispenseFingerprintCh dispenses a driver and makes a Fingerprint RPC call
|
||||
// to the driver. The fingerprint chan is returned along with the cancel func
|
||||
// for the context used in the RPC. This cancel func should always be called
|
||||
// when the caller is finished with the channel.
|
||||
func (i *instanceManager) dispenseFingerprintCh() (<-chan *drivers.Fingerprint, context.CancelFunc, error) {
|
||||
driver, err := i.dispense()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(i.ctx)
|
||||
fingerCh, err := driver.Fingerprint(ctx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return fingerCh, cancel, nil
|
||||
}
|
||||
|
||||
// fingerprint is the main loop for fingerprinting.
|
||||
func (i *instanceManager) fingerprint() {
|
||||
fpChan, cancel, err := i.dispenseFingerprintCh()
|
||||
if err != nil {
|
||||
i.logger.Error("failed to dispense driver plugin", "error", err)
|
||||
}
|
||||
|
||||
// backoff and retry used if the RPC is closed by the other end
|
||||
var backoff time.Duration
|
||||
var retry int
|
||||
for {
|
||||
if backoff > 0 {
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-i.ctx.Done():
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-i.ctx.Done():
|
||||
cancel()
|
||||
return
|
||||
case fp, ok := <-fpChan:
|
||||
if ok {
|
||||
if fp.Err == nil {
|
||||
i.handleFingerprint(fp)
|
||||
} else {
|
||||
i.logger.Warn("received fingerprint error from driver", "error", fp.Err)
|
||||
i.handleFingerprintError()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// if the channel is closed attempt to open a new one
|
||||
newFpChan, newCancel, err := i.dispenseFingerprintCh()
|
||||
if err != nil {
|
||||
i.logger.Warn("error fingerprinting driver", "error", err, "retry", retry)
|
||||
i.handleFingerprintError()
|
||||
|
||||
// Calculate the new backoff
|
||||
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
||||
if backoff > driverFPBackoffLimit {
|
||||
backoff = driverFPBackoffLimit
|
||||
}
|
||||
// Increment retry counter
|
||||
retry++
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
fpChan = newFpChan
|
||||
cancel = newCancel
|
||||
|
||||
// Reset backoff
|
||||
backoff = 0
|
||||
retry = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleFingerprintError is called when an error occurred while fingerprinting
|
||||
// and will set the driver to unhealthy
|
||||
func (i *instanceManager) handleFingerprintError() {
|
||||
di := &structs.DriverInfo{
|
||||
Healthy: false,
|
||||
HealthDescription: "failed to fingerprint driver",
|
||||
UpdateTime: time.Now(),
|
||||
}
|
||||
i.updateNodeFromDriver(i.id.Name, di)
|
||||
}
|
||||
|
||||
// handleFingerprint updates the node with the current fingerprint status
|
||||
func (i *instanceManager) handleFingerprint(fp *drivers.Fingerprint) {
|
||||
attrs := make(map[string]string, len(fp.Attributes))
|
||||
for key, attr := range fp.Attributes {
|
||||
attrs[key] = attr.GoString()
|
||||
}
|
||||
di := &structs.DriverInfo{
|
||||
Attributes: attrs,
|
||||
Detected: fp.Health != drivers.HealthStateUndetected,
|
||||
Healthy: fp.Health == drivers.HealthStateHealthy,
|
||||
HealthDescription: fp.HealthDescription,
|
||||
UpdateTime: time.Now(),
|
||||
}
|
||||
i.updateNodeFromDriver(i.id.Name, di)
|
||||
|
||||
// log detected/undetected state changes after the initial fingerprint
|
||||
i.lastHealthStateMu.Lock()
|
||||
if i.hasFingerprinted {
|
||||
if i.lastHealthState != fp.Health {
|
||||
i.logger.Info("driver health state has changed", "previous", i.lastHealthState, "current", fp.Health, "description", fp.HealthDescription)
|
||||
}
|
||||
}
|
||||
i.lastHealthState = fp.Health
|
||||
i.lastHealthStateMu.Unlock()
|
||||
|
||||
// if this is the first fingerprint, mark that we have received it
|
||||
if !i.hasFingerprinted {
|
||||
i.logger.Trace("initial driver fingerprint", "fingerprint", fp)
|
||||
close(i.firstFingerprintCh)
|
||||
i.hasFingerprinted = true
|
||||
}
|
||||
}
|
||||
|
||||
// getLastHealth returns the most recent HealthState from fingerprinting
|
||||
func (i *instanceManager) getLastHealth() drivers.HealthState {
|
||||
i.lastHealthStateMu.Lock()
|
||||
defer i.lastHealthStateMu.Unlock()
|
||||
return i.lastHealthState
|
||||
}
|
||||
|
||||
// dispenseTaskEventsCh dispenses a driver plugin and makes a TaskEvents RPC.
|
||||
// The TaskEvent chan and cancel func for the RPC is return. The cancel func must
|
||||
// be called by the caller to properly cleanup the context
|
||||
func (i *instanceManager) dispenseTaskEventsCh() (<-chan *drivers.TaskEvent, context.CancelFunc, error) {
|
||||
driver, err := i.dispense()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(i.ctx)
|
||||
eventsCh, err := driver.TaskEvents(ctx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return eventsCh, cancel, nil
|
||||
}
|
||||
|
||||
// handleEvents is the main loop that receives task events from the driver
|
||||
func (i *instanceManager) handleEvents() {
|
||||
eventsCh, cancel, err := i.dispenseTaskEventsCh()
|
||||
if err != nil {
|
||||
i.logger.Error("failed to dispense driver", "error", err)
|
||||
}
|
||||
|
||||
var backoff time.Duration
|
||||
var retry int
|
||||
for {
|
||||
if backoff > 0 {
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-i.ctx.Done():
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-i.ctx.Done():
|
||||
cancel()
|
||||
return
|
||||
case ev, ok := <-eventsCh:
|
||||
if ok {
|
||||
i.handleEvent(ev)
|
||||
continue
|
||||
}
|
||||
|
||||
// if the channel is closed attempt to open a new one
|
||||
newEventsChan, newCancel, err := i.dispenseTaskEventsCh()
|
||||
if err != nil {
|
||||
i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry)
|
||||
|
||||
// Calculate the new backoff
|
||||
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
||||
if backoff > driverFPBackoffLimit {
|
||||
backoff = driverFPBackoffLimit
|
||||
}
|
||||
retry++
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
eventsCh = newEventsChan
|
||||
cancel = newCancel
|
||||
|
||||
// Reset backoff
|
||||
backoff = 0
|
||||
retry = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvent looks up the event handler(s) for the event and runs them
|
||||
func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) {
|
||||
i.handlersLock.RLock()
|
||||
defer i.handlersLock.RUnlock()
|
||||
if handler, ok := i.handlers[ev.TaskID]; ok {
|
||||
i.logger.Trace("task event received", "event", ev)
|
||||
handler(ev)
|
||||
return
|
||||
}
|
||||
|
||||
i.logger.Warn("no handler registered for event", "event", ev)
|
||||
}
|
||||
320
client/pluginmanager/drivermanager/manager.go
Normal file
320
client/pluginmanager/drivermanager/manager.go
Normal file
@@ -0,0 +1,320 @@
|
||||
package drivermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
plugin "github.com/hashicorp/go-plugin"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/hashicorp/nomad/plugins/shared"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
)
|
||||
|
||||
// Manager is the interface used to manage driver plugins
|
||||
type Manager interface {
|
||||
RegisterEventHandler(driver, taskID string, handler EventHandler)
|
||||
DeregisterEventHandler(driver, taskID string)
|
||||
|
||||
Dispense(driver string) (drivers.DriverPlugin, error)
|
||||
}
|
||||
|
||||
// EventHandler can be registered with a Manager to be called for a matching task.
|
||||
// The handler should not block execution.
|
||||
type EventHandler func(*drivers.TaskEvent)
|
||||
|
||||
// StateStorage is used to persist the driver managers state across
|
||||
// agent restarts.
|
||||
type StateStorage interface {
|
||||
// GetDevicePluginState is used to retrieve the device manager's plugin
|
||||
// state.
|
||||
GetDriverPluginState() (*state.PluginState, error)
|
||||
|
||||
// PutDevicePluginState is used to store the device manager's plugin
|
||||
// state.
|
||||
PutDriverPluginState(state *state.PluginState) error
|
||||
}
|
||||
|
||||
// UpdateNodeDriverInfoFn is the callback used to update the node from
|
||||
// fingerprinting
|
||||
type UpdateNodeDriverInfoFn func(string, *structs.DriverInfo) *structs.Node
|
||||
|
||||
// StorePluginReattachFn is used to store plugin reattachment configurations.
|
||||
type StorePluginReattachFn func(*plugin.ReattachConfig) error
|
||||
|
||||
// FetchPluginReattachFn is used to retrieve the stored plugin reattachment
|
||||
// configuration.
|
||||
type FetchPluginReattachFn func() (*plugin.ReattachConfig, bool)
|
||||
|
||||
// Config is used to configure a driver manager
|
||||
type Config struct {
|
||||
// Logger is the logger used by the device manager
|
||||
Logger log.Logger
|
||||
|
||||
// Loader is the plugin loader
|
||||
Loader loader.PluginCatalog
|
||||
|
||||
// PluginConfig is the config passed to the launched plugins
|
||||
PluginConfig *base.ClientAgentConfig
|
||||
|
||||
// Updater is used to update the node when driver information changes
|
||||
Updater UpdateNodeDriverInfoFn
|
||||
|
||||
// State is used to manage the device managers state
|
||||
State StateStorage
|
||||
|
||||
// AllowedDrivers if set will only start driver plugins for the given
|
||||
// drivers
|
||||
AllowedDrivers map[string]struct{}
|
||||
|
||||
// BlockedDrivers if set will not allow the given driver plugins to start
|
||||
BlockedDrivers map[string]struct{}
|
||||
}
|
||||
|
||||
// manager is used to manage a set of driver plugins
|
||||
type manager struct {
|
||||
// logger is the logger used by the device manager
|
||||
logger log.Logger
|
||||
|
||||
// state is used to manage the device managers state
|
||||
state StateStorage
|
||||
|
||||
// ctx is used to shutdown the device manager
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
// loader is the plugin loader
|
||||
loader loader.PluginCatalog
|
||||
|
||||
// pluginConfig is the config passed to the launched plugins
|
||||
pluginConfig *base.ClientAgentConfig
|
||||
|
||||
// updater is used to update the node when device information changes
|
||||
updater UpdateNodeDriverInfoFn
|
||||
|
||||
// instances is the list of managed devices, access is serialized by instanceMu
|
||||
instances map[string]*instanceManager
|
||||
instancesMu sync.Mutex
|
||||
|
||||
// reattachConfigs stores the plugin reattach configs
|
||||
reattachConfigs map[loader.PluginID]*shared.ReattachConfig
|
||||
reattachConfigLock sync.Mutex
|
||||
|
||||
// allows/block lists
|
||||
allowedDrivers map[string]struct{}
|
||||
blockedDrivers map[string]struct{}
|
||||
|
||||
// readyCh is ticked once at the end of Run()
|
||||
readyCh chan struct{}
|
||||
}
|
||||
|
||||
// New returns a new driver manager
|
||||
func New(c *Config) *manager {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &manager{
|
||||
logger: c.Logger.Named("driver_mgr"),
|
||||
state: c.State,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
loader: c.Loader,
|
||||
pluginConfig: c.PluginConfig,
|
||||
updater: c.Updater,
|
||||
instances: make(map[string]*instanceManager),
|
||||
reattachConfigs: make(map[loader.PluginID]*shared.ReattachConfig),
|
||||
allowedDrivers: c.AllowedDrivers,
|
||||
blockedDrivers: c.BlockedDrivers,
|
||||
readyCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// PluginType returns the type of plugin this mananger mananges
|
||||
func (*manager) PluginType() string { return base.PluginTypeDriver }
|
||||
|
||||
// Run starts the mananger, initializes driver plugins and blocks until Shutdown
|
||||
// is called.
|
||||
func (m *manager) Run() {
|
||||
// Load any previous plugin reattach configuration
|
||||
m.loadReattachConfigs()
|
||||
|
||||
// Get driver plugins
|
||||
driversPlugins := m.loader.Catalog()[base.PluginTypeDriver]
|
||||
if len(driversPlugins) == 0 {
|
||||
m.logger.Debug("exiting since there are no driver plugins")
|
||||
m.cancel()
|
||||
return
|
||||
}
|
||||
|
||||
var skippedDrivers []string
|
||||
for _, d := range driversPlugins {
|
||||
id := loader.PluginInfoID(d)
|
||||
// Skip drivers that are not in the allowed list if it is set.
|
||||
if _, ok := m.allowedDrivers[id.Name]; len(m.allowedDrivers) > 0 && !ok {
|
||||
skippedDrivers = append(skippedDrivers, id.Name)
|
||||
continue
|
||||
}
|
||||
// Skip fingerprinting drivers that are in the blocked list
|
||||
if _, ok := m.blockedDrivers[id.Name]; ok {
|
||||
skippedDrivers = append(skippedDrivers, id.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
storeFn := func(c *plugin.ReattachConfig) error {
|
||||
return m.storePluginReattachConfig(id, c)
|
||||
}
|
||||
fetchFn := func() (*plugin.ReattachConfig, bool) {
|
||||
return m.fetchPluginReattachConfig(id)
|
||||
}
|
||||
|
||||
instance := newInstanceManager(&instanceManagerConfig{
|
||||
Logger: m.logger,
|
||||
Ctx: m.ctx,
|
||||
Loader: m.loader,
|
||||
StoreReattach: storeFn,
|
||||
FetchReattach: fetchFn,
|
||||
PluginConfig: m.pluginConfig,
|
||||
ID: &id,
|
||||
UpdateNodeFromDriver: m.updater,
|
||||
})
|
||||
|
||||
m.instancesMu.Lock()
|
||||
m.instances[id.Name] = instance
|
||||
m.instancesMu.Unlock()
|
||||
}
|
||||
|
||||
if len(skippedDrivers) > 0 {
|
||||
m.logger.Debug("drivers skipped due to allow/block list", "skipped_drivers", skippedDrivers)
|
||||
}
|
||||
|
||||
// signal ready
|
||||
select {
|
||||
case <-m.ctx.Done():
|
||||
return
|
||||
case m.readyCh <- struct{}{}:
|
||||
}
|
||||
|
||||
// wait for shutdown
|
||||
<-m.ctx.Done()
|
||||
}
|
||||
|
||||
// Shutdown cleans up all the plugins
|
||||
func (m *manager) Shutdown() {
|
||||
// Cancel the context to stop any requests
|
||||
m.cancel()
|
||||
|
||||
m.instancesMu.Lock()
|
||||
defer m.instancesMu.Unlock()
|
||||
|
||||
// Go through and shut everything down
|
||||
for _, i := range m.instances {
|
||||
i.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *manager) Ready() <-chan struct{} {
|
||||
ret := make(chan struct{})
|
||||
go func() {
|
||||
// We don't want to start initial fingerprint wait until Run loop has
|
||||
// finished
|
||||
<-m.readyCh
|
||||
|
||||
var availDrivers []string
|
||||
ctx, cancel := context.WithTimeout(m.ctx, time.Second*10)
|
||||
for name, instance := range m.instances {
|
||||
instance.WaitForFirstFingerprint(ctx)
|
||||
if instance.lastHealthState != drivers.HealthStateUndetected {
|
||||
availDrivers = append(availDrivers, name)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
m.logger.Debug("detected drivers", "drivers", availDrivers)
|
||||
close(ret)
|
||||
}()
|
||||
return ret
|
||||
}
|
||||
|
||||
func (m *manager) loadReattachConfigs() error {
|
||||
m.reattachConfigLock.Lock()
|
||||
defer m.reattachConfigLock.Unlock()
|
||||
|
||||
s, err := m.state.GetDriverPluginState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s != nil {
|
||||
for name, c := range s.ReattachConfigs {
|
||||
id := loader.PluginID{
|
||||
PluginType: base.PluginTypeDriver,
|
||||
Name: name,
|
||||
}
|
||||
m.reattachConfigs[id] = c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// storePluginReattachConfig is used as a callback to the instance managers and
|
||||
// persists thhe plugin reattach configurations.
|
||||
func (m *manager) storePluginReattachConfig(id loader.PluginID, c *plugin.ReattachConfig) error {
|
||||
m.reattachConfigLock.Lock()
|
||||
defer m.reattachConfigLock.Unlock()
|
||||
|
||||
// Store the new reattach config
|
||||
m.reattachConfigs[id] = shared.ReattachConfigFromGoPlugin(c)
|
||||
|
||||
// Persist the state
|
||||
s := &state.PluginState{
|
||||
ReattachConfigs: make(map[string]*shared.ReattachConfig, len(m.reattachConfigs)),
|
||||
}
|
||||
|
||||
for id, c := range m.reattachConfigs {
|
||||
s.ReattachConfigs[id.Name] = c
|
||||
}
|
||||
|
||||
return m.state.PutDriverPluginState(s)
|
||||
}
|
||||
|
||||
// fetchPluginReattachConfig is used as a callback to the instance managers and
|
||||
// retrieves the plugin reattach config. If it has not been stored it will
|
||||
// return nil
|
||||
func (m *manager) fetchPluginReattachConfig(id loader.PluginID) (*plugin.ReattachConfig, bool) {
|
||||
m.reattachConfigLock.Lock()
|
||||
defer m.reattachConfigLock.Unlock()
|
||||
|
||||
if cfg, ok := m.reattachConfigs[id]; ok {
|
||||
c, err := shared.ReattachConfigToGoPlugin(cfg)
|
||||
if err != nil {
|
||||
m.logger.Warn("failed to read plugin reattach config", "config", cfg, "error", err)
|
||||
return nil, false
|
||||
}
|
||||
return c, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (m *manager) RegisterEventHandler(driver, taskID string, handler EventHandler) {
|
||||
m.instancesMu.Lock()
|
||||
m.instances[driver].registerEventHandler(taskID, handler)
|
||||
m.instancesMu.Unlock()
|
||||
}
|
||||
|
||||
func (m *manager) DeregisterEventHandler(driver, taskID string) {
|
||||
m.instancesMu.Lock()
|
||||
m.instances[driver].deregisterEventHandler(taskID)
|
||||
m.instancesMu.Unlock()
|
||||
}
|
||||
|
||||
func (m *manager) Dispense(d string) (drivers.DriverPlugin, error) {
|
||||
m.instancesMu.Lock()
|
||||
defer m.instancesMu.Unlock()
|
||||
if instance, ok := m.instances[d]; ok {
|
||||
return instance.dispense()
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("driver not found")
|
||||
}
|
||||
305
client/pluginmanager/drivermanager/manager_test.go
Normal file
305
client/pluginmanager/drivermanager/manager_test.go
Normal file
@@ -0,0 +1,305 @@
|
||||
package drivermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
plugin "github.com/hashicorp/go-plugin"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var _ Manager = (*manager)(nil)
|
||||
var _ pluginmanager.PluginManager = (*manager)(nil)
|
||||
|
||||
func testSetup(t *testing.T) (chan *drivers.Fingerprint, chan *drivers.TaskEvent, *manager) {
|
||||
fpChan := make(chan *drivers.Fingerprint)
|
||||
evChan := make(chan *drivers.TaskEvent)
|
||||
drv := mockDriver(fpChan, evChan)
|
||||
cat := mockCatalog(map[string]drivers.DriverPlugin{"mock": drv})
|
||||
cfg := &Config{
|
||||
Logger: testlog.HCLogger(t),
|
||||
Loader: cat,
|
||||
PluginConfig: &base.ClientAgentConfig{},
|
||||
Updater: noopUpdater,
|
||||
State: state.NoopDB{},
|
||||
AllowedDrivers: make(map[string]struct{}),
|
||||
BlockedDrivers: make(map[string]struct{}),
|
||||
}
|
||||
|
||||
mgr := New(cfg)
|
||||
return fpChan, evChan, mgr
|
||||
}
|
||||
|
||||
func mockDriver(fpChan chan *drivers.Fingerprint, evChan chan *drivers.TaskEvent) drivers.DriverPlugin {
|
||||
return &drivers.MockDriver{
|
||||
FingerprintF: func(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
|
||||
return fpChan, nil
|
||||
},
|
||||
TaskEventsF: func(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
|
||||
return evChan, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func mockCatalog(drivers map[string]drivers.DriverPlugin) *loader.MockCatalog {
|
||||
cat := map[string][]*base.PluginInfoResponse{
|
||||
base.PluginTypeDriver: {},
|
||||
}
|
||||
for d := range drivers {
|
||||
cat[base.PluginTypeDriver] = append(cat[base.PluginTypeDriver], &base.PluginInfoResponse{
|
||||
Name: d,
|
||||
Type: base.PluginTypeDriver,
|
||||
})
|
||||
}
|
||||
|
||||
return &loader.MockCatalog{
|
||||
DispenseF: func(name, pluginType string, cfg *base.ClientAgentConfig, logger log.Logger) (loader.PluginInstance, error) {
|
||||
d, ok := drivers[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("driver not found")
|
||||
}
|
||||
return loader.MockBasicExternalPlugin(d), nil
|
||||
},
|
||||
ReattachF: func(name, pluginType string, config *plugin.ReattachConfig) (loader.PluginInstance, error) {
|
||||
d, ok := drivers[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("driver not found")
|
||||
}
|
||||
return loader.MockBasicExternalPlugin(d), nil
|
||||
},
|
||||
CatalogF: func() map[string][]*base.PluginInfoResponse {
|
||||
return cat
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func mockTaskEvent(taskID string) *drivers.TaskEvent {
|
||||
return &drivers.TaskEvent{
|
||||
TaskID: taskID,
|
||||
Timestamp: time.Now(),
|
||||
Annotations: map[string]string{},
|
||||
Message: "event from " + taskID,
|
||||
}
|
||||
}
|
||||
|
||||
// noopUpdater is an UpdateNodeDriverInfoFn that discards its arguments and
// returns a nil node.
func noopUpdater(_ string, _ *structs.DriverInfo) *structs.Node {
	return nil
}
|
||||
|
||||
func TestMananger_Fingerprint(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
fpChan, _, mgr := testSetup(t)
|
||||
var infos []*structs.DriverInfo
|
||||
mgr.updater = func(d string, i *structs.DriverInfo) *structs.Node {
|
||||
infos = append(infos, i)
|
||||
return nil
|
||||
}
|
||||
go mgr.Run()
|
||||
defer mgr.Shutdown()
|
||||
fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if len(mgr.instances) != 1 {
|
||||
return false, fmt.Errorf("mananger should have registered an instance")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if mgr.instances["mock"].getLastHealth() != drivers.HealthStateHealthy {
|
||||
return false, fmt.Errorf("mock instance should be healthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
|
||||
fpChan <- &drivers.Fingerprint{
|
||||
Health: drivers.HealthStateUnhealthy,
|
||||
}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if mgr.instances["mock"].getLastHealth() == drivers.HealthStateHealthy {
|
||||
return false, fmt.Errorf("mock instance should be unhealthy")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
|
||||
fpChan <- &drivers.Fingerprint{
|
||||
Health: drivers.HealthStateUndetected,
|
||||
}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if mgr.instances["mock"].getLastHealth() != drivers.HealthStateUndetected {
|
||||
return false, fmt.Errorf("mock instance should be undetected")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
|
||||
require.Len(infos, 3)
|
||||
require.True(infos[0].Healthy)
|
||||
require.True(infos[0].Detected)
|
||||
require.False(infos[1].Healthy)
|
||||
require.True(infos[1].Detected)
|
||||
require.False(infos[2].Healthy)
|
||||
require.False(infos[2].Detected)
|
||||
}
|
||||
|
||||
func TestMananger_TaskEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
fpChan, evChan, mgr := testSetup(t)
|
||||
go mgr.Run()
|
||||
defer mgr.Shutdown()
|
||||
fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if len(mgr.instances) != 1 {
|
||||
return false, fmt.Errorf("mananger should have registered 1 instance")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
|
||||
event1 := mockTaskEvent("abc1")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
mgr.RegisterEventHandler("mock", "abc1", func(ev *drivers.TaskEvent) {
|
||||
defer wg.Done()
|
||||
require.Exactly(event1, ev)
|
||||
})
|
||||
|
||||
evChan <- event1
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestManager_Run_AllowedDrivers(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
fpChan, _, mgr := testSetup(t)
|
||||
mgr.allowedDrivers = map[string]struct{}{"foo": {}}
|
||||
go mgr.Run()
|
||||
defer mgr.Shutdown()
|
||||
select {
|
||||
case fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}:
|
||||
default:
|
||||
}
|
||||
testutil.AssertUntil(200*time.Millisecond, func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if len(mgr.instances) > 0 {
|
||||
return false, fmt.Errorf("mananger should have no registered instances")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestManager_Run_BlockedDrivers(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
fpChan, _, mgr := testSetup(t)
|
||||
mgr.blockedDrivers = map[string]struct{}{"mock": {}}
|
||||
go mgr.Run()
|
||||
defer mgr.Shutdown()
|
||||
select {
|
||||
case fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}:
|
||||
default:
|
||||
}
|
||||
testutil.AssertUntil(200*time.Millisecond, func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if len(mgr.instances) > 0 {
|
||||
return false, fmt.Errorf("mananger should have no registered instances")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestManager_Run_AllowedBlockedDrivers_Combined(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
drvs := map[string]drivers.DriverPlugin{}
|
||||
fpChs := map[string]chan *drivers.Fingerprint{}
|
||||
names := []string{"mock1", "mock2", "mock3", "mock4", "mock5"}
|
||||
for _, d := range names {
|
||||
ch := make(chan *drivers.Fingerprint)
|
||||
drv := mockDriver(ch, nil)
|
||||
drvs[d] = drv
|
||||
fpChs[d] = ch
|
||||
}
|
||||
cat := mockCatalog(drvs)
|
||||
cfg := &Config{
|
||||
Logger: testlog.HCLogger(t),
|
||||
Loader: cat,
|
||||
PluginConfig: nil,
|
||||
Updater: noopUpdater,
|
||||
State: state.NoopDB{},
|
||||
AllowedDrivers: map[string]struct{}{
|
||||
"mock2": {},
|
||||
"mock3": {},
|
||||
"mock4": {},
|
||||
"foo": {},
|
||||
},
|
||||
BlockedDrivers: map[string]struct{}{
|
||||
"mock2": {},
|
||||
"mock4": {},
|
||||
"bar": {},
|
||||
},
|
||||
}
|
||||
mgr := New(cfg)
|
||||
|
||||
go mgr.Run()
|
||||
defer mgr.Shutdown()
|
||||
for _, d := range names {
|
||||
go func(drv string) {
|
||||
select {
|
||||
case fpChs[drv] <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}:
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
}
|
||||
}(d)
|
||||
}
|
||||
|
||||
testutil.AssertUntil(200*time.Millisecond, func() (bool, error) {
|
||||
mgr.instancesMu.Lock()
|
||||
defer mgr.instancesMu.Unlock()
|
||||
if len(mgr.instances) > 1 {
|
||||
return false, fmt.Errorf("mananger should have 1 registered instance")
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(err)
|
||||
})
|
||||
mgr.instancesMu.Lock()
|
||||
require.Len(mgr.instances, 1)
|
||||
_, ok := mgr.instances["mock3"]
|
||||
mgr.instancesMu.Unlock()
|
||||
require.True(ok)
|
||||
}
|
||||
11
client/pluginmanager/drivermanager/state/state.go
Normal file
11
client/pluginmanager/drivermanager/state/state.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package state
|
||||
|
||||
import "github.com/hashicorp/nomad/plugins/shared"
|
||||
|
||||
// PluginState is used to store the driver managers state across restarts of the
|
||||
// agent
|
||||
type PluginState struct {
|
||||
// ReattachConfigs are the set of reattach configs for plugin's launched by
|
||||
// the driver manager
|
||||
ReattachConfigs map[string]*shared.ReattachConfig
|
||||
}
|
||||
47
client/pluginmanager/drivermanager/testing.go
Normal file
47
client/pluginmanager/drivermanager/testing.go
Normal file
@@ -0,0 +1,47 @@
|
||||
// +build !release
|
||||
|
||||
package drivermanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/loader"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
)
|
||||
|
||||
type testManager struct {
|
||||
logger log.Logger
|
||||
loader loader.PluginCatalog
|
||||
}
|
||||
|
||||
func TestDriverManager(t *testing.T) Manager {
|
||||
logger := testlog.HCLogger(t).Named("driver_mgr")
|
||||
pluginLoader := catalog.TestPluginLoader(t)
|
||||
return &testManager{
|
||||
logger: logger,
|
||||
loader: singleton.NewSingletonLoader(logger, pluginLoader),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *testManager) Dispense(driver string) (drivers.DriverPlugin, error) {
|
||||
instance, err := m.loader.Dispense(driver, base.PluginTypeDriver, nil, m.logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
d, ok := instance.Plugin().(drivers.DriverPlugin)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("plugin does not implement DriverPlugin interface")
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// RegisterEventHandler is a no-op; the test manager does not deliver events.
func (m *testManager) RegisterEventHandler(driver, taskID string, handler EventHandler) {
}
|
||||
// DeregisterEventHandler is a no-op; the test manager keeps no handlers.
func (m *testManager) DeregisterEventHandler(driver, taskID string) {
}
|
||||
@@ -3,10 +3,17 @@ package pluginmanager
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// DefaultManagerReadyTimeout is the default amount of time to wait for a
// plugin manager to become ready before logging the delay and moving on.
var DefaultManagerReadyTimeout = 5 * time.Second
|
||||
|
||||
// PluginGroup is a utility struct to manage a collectively orchestrate a
|
||||
// set of PluginManagers
|
||||
type PluginGroup struct {
|
||||
@@ -33,11 +40,12 @@ func New(logger log.Logger) *PluginGroup {
|
||||
// RegisterAndRun registers the manager and starts it in a separate goroutine
|
||||
func (m *PluginGroup) RegisterAndRun(manager PluginManager) error {
|
||||
m.mLock.Lock()
|
||||
defer m.mLock.Unlock()
|
||||
if m.shutdown {
|
||||
m.mLock.Unlock()
|
||||
return fmt.Errorf("plugin group already shutdown")
|
||||
}
|
||||
m.managers = append(m.managers, manager)
|
||||
m.mLock.Unlock()
|
||||
|
||||
go func() {
|
||||
m.logger.Info("starting plugin manager", "plugin-type", manager.PluginType())
|
||||
@@ -47,6 +55,38 @@ func (m *PluginGroup) RegisterAndRun(manager PluginManager) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ready returns a channel which will be closed once all plugin manangers are ready.
|
||||
// A timeout for waiting on each manager is given
|
||||
func (m *PluginGroup) Ready(timeout time.Duration) (<-chan struct{}, error) {
|
||||
m.mLock.Lock()
|
||||
defer m.mLock.Unlock()
|
||||
if m.shutdown {
|
||||
return nil, fmt.Errorf("plugin group already shutdown")
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := range m.managers {
|
||||
manager := m.managers[i]
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case <-manager.Ready():
|
||||
case <-time.After(timeout):
|
||||
m.logger.Warn("timeout waiting for plugin manager to be ready",
|
||||
"plugin-type", manager.PluginType())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
ret := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(ret)
|
||||
}()
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Shutdown shutsdown all registered PluginManagers in reverse order of how
|
||||
// they were started.
|
||||
func (m *PluginGroup) Shutdown() {
|
||||
|
||||
@@ -5,6 +5,10 @@ type PluginManager interface {
|
||||
// Run starts a plugin manager and must block until shutdown
|
||||
Run()
|
||||
|
||||
// Ready returns a channel that blocks until the plugin mananger has
|
||||
// initialized all plugins
|
||||
Ready() <-chan struct{}
|
||||
|
||||
// Shutdown should gracefully shutdown all plugins managed by the manager.
|
||||
// It must block until shutdown is complete
|
||||
Shutdown()
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/kr/pretty"
|
||||
@@ -216,3 +217,28 @@ func TestStateDB_DeviceManager(t *testing.T) {
|
||||
require.Equal(state, ps)
|
||||
})
|
||||
}
|
||||
|
||||
// TestStateDB_DriverManager asserts the behavior of device manager state related StateDB
|
||||
// methods.
|
||||
func TestStateDB_DriverManager(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testDB(t, func(t *testing.T, db StateDB) {
|
||||
require := require.New(t)
|
||||
|
||||
// Getting nonexistent state should return nils
|
||||
ps, err := db.GetDriverPluginState()
|
||||
require.NoError(err)
|
||||
require.Nil(ps)
|
||||
|
||||
// Putting PluginState should work
|
||||
state := &driverstate.PluginState{}
|
||||
require.NoError(db.PutDriverPluginState(state))
|
||||
|
||||
// Getting should return the available state
|
||||
ps, err = db.GetDriverPluginState()
|
||||
require.NoError(err)
|
||||
require.NotNil(ps)
|
||||
require.Equal(state, ps)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package state
|
||||
import (
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
@@ -50,6 +51,14 @@ type StateDB interface {
|
||||
// state.
|
||||
PutDevicePluginState(state *dmstate.PluginState) error
|
||||
|
||||
// GetDriverPluginState is used to retrieve the driver manager's plugin
|
||||
// state.
|
||||
GetDriverPluginState() (*driverstate.PluginState, error)
|
||||
|
||||
// PutDriverPluginState is used to store the driver manager's plugin
|
||||
// state.
|
||||
PutDriverPluginState(state *driverstate.PluginState) error
|
||||
|
||||
// Close the database. Unsafe for further use after calling regardless
|
||||
// of return value.
|
||||
Close() error
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
@@ -21,6 +22,9 @@ type MemDB struct {
|
||||
// devicemanager -> plugin-state
|
||||
devManagerPs *dmstate.PluginState
|
||||
|
||||
// drivermanager -> plugin-state
|
||||
driverManagerPs *driverstate.PluginState
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -150,6 +154,19 @@ func (m *MemDB) GetDevicePluginState() (*dmstate.PluginState, error) {
|
||||
return m.devManagerPs, nil
|
||||
}
|
||||
|
||||
func (m *MemDB) GetDriverPluginState() (*driverstate.PluginState, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.driverManagerPs, nil
|
||||
}
|
||||
|
||||
func (m *MemDB) PutDriverPluginState(ps *driverstate.PluginState) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.driverManagerPs = ps
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MemDB) Close() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
@@ -3,6 +3,7 @@ package state
|
||||
import (
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
@@ -49,6 +50,14 @@ func (n NoopDB) GetDevicePluginState() (*dmstate.PluginState, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (n NoopDB) PutDriverPluginState(ps *driverstate.PluginState) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n NoopDB) GetDriverPluginState() (*driverstate.PluginState, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (n NoopDB) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
|
||||
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
||||
"github.com/hashicorp/nomad/helper/boltdd"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
@@ -22,6 +23,9 @@ allocations/ (bucket)
|
||||
|
||||
devicemanager/
|
||||
|--> plugin-state -> *dmstate.PluginState
|
||||
|
||||
drivermanager/
|
||||
|--> plugin-state -> *driverstate.PluginState
|
||||
*/
|
||||
|
||||
var (
|
||||
@@ -45,9 +49,13 @@ var (
|
||||
// data
|
||||
devManagerBucket = []byte("devicemanager")
|
||||
|
||||
// devManagerPluginStateKey is the key serialized device manager
|
||||
// plugin state is stored at
|
||||
devManagerPluginStateKey = []byte("plugin_state")
|
||||
// driverManagerBucket is the bucket name containing all driver manager
|
||||
// related data
|
||||
driverManagerBucket = []byte("drivermanager")
|
||||
|
||||
// managerPluginStateKey is the key by which plugin manager plugin state is
|
||||
// stored
|
||||
managerPluginStateKey = []byte("plugin_state")
|
||||
)
|
||||
|
||||
// NewStateDBFunc creates a StateDB given a state directory.
|
||||
@@ -377,7 +385,7 @@ func (s *BoltStateDB) PutDevicePluginState(ps *dmstate.PluginState) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return devBkt.Put(devManagerPluginStateKey, ps)
|
||||
return devBkt.Put(managerPluginStateKey, ps)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -395,7 +403,7 @@ func (s *BoltStateDB) GetDevicePluginState() (*dmstate.PluginState, error) {
|
||||
|
||||
// Restore Plugin State if it exists
|
||||
ps = &dmstate.PluginState{}
|
||||
if err := devBkt.Get(devManagerPluginStateKey, ps); err != nil {
|
||||
if err := devBkt.Get(managerPluginStateKey, ps); err != nil {
|
||||
if !boltdd.IsErrNotFound(err) {
|
||||
return fmt.Errorf("failed to read device manager plugin state: %v", err)
|
||||
}
|
||||
@@ -413,3 +421,50 @@ func (s *BoltStateDB) GetDevicePluginState() (*dmstate.PluginState, error) {
|
||||
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
// PutDriverPluginState stores the driver manager's plugin state or returns an
|
||||
// error.
|
||||
func (s *BoltStateDB) PutDriverPluginState(ps *driverstate.PluginState) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root driver manager bucket
|
||||
driverBkt, err := tx.CreateBucketIfNotExists(driverManagerBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return driverBkt.Put(managerPluginStateKey, ps)
|
||||
})
|
||||
}
|
||||
|
||||
// GetDriverPluginState stores the driver manager's plugin state or returns an
|
||||
// error.
|
||||
func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) {
|
||||
var ps *driverstate.PluginState
|
||||
|
||||
err := s.db.View(func(tx *boltdd.Tx) error {
|
||||
driverBkt := tx.Bucket(driverManagerBucket)
|
||||
if driverBkt == nil {
|
||||
// No state, return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Restore Plugin State if it exists
|
||||
ps = &driverstate.PluginState{}
|
||||
if err := driverBkt.Get(managerPluginStateKey, ps); err != nil {
|
||||
if !boltdd.IsErrNotFound(err) {
|
||||
return fmt.Errorf("failed to read driver manager plugin state: %v", err)
|
||||
}
|
||||
|
||||
// Key not found, reset ps to nil
|
||||
ps = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
@@ -15,14 +15,13 @@ import (
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -146,19 +145,18 @@ func TestConsul_Integration(t *testing.T) {
|
||||
}()
|
||||
|
||||
// Build the config
|
||||
pluginLoader := catalog.TestPluginLoader(t)
|
||||
config := &taskrunner.Config{
|
||||
Alloc: alloc,
|
||||
ClientConfig: conf,
|
||||
Consul: serviceClient,
|
||||
Task: task,
|
||||
TaskDir: taskDir,
|
||||
Logger: logger,
|
||||
Vault: vclient,
|
||||
StateDB: state.NoopDB{},
|
||||
StateUpdater: logUpdate,
|
||||
PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader),
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
Alloc: alloc,
|
||||
ClientConfig: conf,
|
||||
Consul: serviceClient,
|
||||
Task: task,
|
||||
TaskDir: taskDir,
|
||||
Logger: logger,
|
||||
Vault: vclient,
|
||||
StateDB: state.NoopDB{},
|
||||
StateUpdater: logUpdate,
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
DriverManager: drivermanager.TestDriverManager(t),
|
||||
}
|
||||
|
||||
tr, err := taskrunner.NewTaskRunner(config)
|
||||
|
||||
Reference in New Issue
Block a user