diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 80fc1d6a1..6c1fbf1fd 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -17,13 +17,13 @@ import ( "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" - "github.com/hashicorp/nomad/plugins/shared/loader" ) // allocRunner is used to run all the tasks in a given allocation @@ -131,13 +131,13 @@ type allocRunner struct { // prevAllocMigrator allows the migration of a previous allocations alloc dir. prevAllocMigrator allocwatcher.PrevAllocMigrator - // pluginSingletonLoader is a plugin loader that will returns singleton - // instances of the plugins. - pluginSingletonLoader loader.PluginCatalog - // devicemanager is used to mount devices as well as lookup device // statistics devicemanager devicemanager.Manager + + // driverManager is responsible for dispensing driver plugins and registering + // event handlers + driverManager drivermanager.Manager } // NewAllocRunner returns a new allocation runner. 
@@ -167,8 +167,8 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { deviceStatsReporter: config.DeviceStatsReporter, prevAllocWatcher: config.PrevAllocWatcher, prevAllocMigrator: config.PrevAllocMigrator, - pluginSingletonLoader: config.PluginSingletonLoader, devicemanager: config.DeviceManager, + driverManager: config.DriverManager, } // Create the logger based on the allocation ID @@ -195,18 +195,18 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { for _, task := range tasks { config := &taskrunner.Config{ - Alloc: ar.alloc, - ClientConfig: ar.clientConfig, - Task: task, - TaskDir: ar.allocDir.NewTaskDir(task.Name), - Logger: ar.logger, - StateDB: ar.stateDB, - StateUpdater: ar, - Consul: ar.consulClient, - Vault: ar.vaultClient, - PluginSingletonLoader: ar.pluginSingletonLoader, - DeviceStatsReporter: ar.deviceStatsReporter, - DeviceManager: ar.devicemanager, + Alloc: ar.alloc, + ClientConfig: ar.clientConfig, + Task: task, + TaskDir: ar.allocDir.NewTaskDir(task.Name), + Logger: ar.logger, + StateDB: ar.stateDB, + StateUpdater: ar, + Consul: ar.consulClient, + Vault: ar.vaultClient, + DeviceStatsReporter: ar.deviceStatsReporter, + DeviceManager: ar.devicemanager, + DriverManager: ar.driverManager, } // Create, but do not Run, the task runner diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go index 6406d2a5a..16f3e27db 100644 --- a/client/allocrunner/config.go +++ b/client/allocrunner/config.go @@ -7,10 +7,10 @@ import ( "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" cstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/shared/loader" ) // Config holds the configuration 
for creating an allocation runner. @@ -45,14 +45,10 @@ type Config struct { // PrevAllocMigrator allows the migration of a previous allocations alloc dir PrevAllocMigrator allocwatcher.PrevAllocMigrator - // PluginLoader is used to load plugins. - PluginLoader loader.PluginCatalog - - // PluginSingletonLoader is a plugin loader that will returns singleton - // instances of the plugins. - PluginSingletonLoader loader.PluginCatalog - // DeviceManager is used to mount devices as well as lookup device // statistics DeviceManager devicemanager.Manager + + // DriverManager handles dispensing of driver plugins + DriverManager drivermanager.Manager } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 92ef7b067..b1749e350 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -21,17 +21,16 @@ import ( "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" - "github.com/hashicorp/nomad/plugins/shared/loader" ) const ( @@ -174,13 +173,13 @@ type TaskRunner struct { // deviceStatsReporter is used to lookup resource usage for alloc devices deviceStatsReporter cinterfaces.DeviceStatsReporter - // PluginSingletonLoader is a plugin loader that will returns singleton - // instances of the plugins. 
- pluginSingletonLoader loader.PluginCatalog - // devicemanager is used to mount devices as well as lookup device // statistics devicemanager devicemanager.Manager + + // driverManager is used to dispense driver plugins and register event + // handlers + driverManager drivermanager.Manager } type Config struct { @@ -203,13 +202,13 @@ type Config struct { // deviceStatsReporter is used to lookup resource usage for alloc devices DeviceStatsReporter cinterfaces.DeviceStatsReporter - // PluginSingletonLoader is a plugin loader that will returns singleton - // instances of the plugins. - PluginSingletonLoader loader.PluginCatalog - // DeviceManager is used to mount devices as well as lookup device // statistics DeviceManager devicemanager.Manager + + // DriverManager is used to dispense driver plugins and register event + // handlers + DriverManager drivermanager.Manager } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -234,29 +233,29 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { } tr := &TaskRunner{ - alloc: config.Alloc, - allocID: config.Alloc.ID, - clientConfig: config.ClientConfig, - task: config.Task, - taskDir: config.TaskDir, - taskName: config.Task.Name, - taskLeader: config.Task.Leader, - envBuilder: envBuilder, - consulClient: config.Consul, - vaultClient: config.Vault, - state: tstate, - localState: state.NewLocalState(), - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - deviceStatsReporter: config.DeviceStatsReporter, - killCtx: killCtx, - killCtxCancel: killCancel, - ctx: trCtx, - ctxCancel: trCancel, - triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), - waitCh: make(chan struct{}), - pluginSingletonLoader: config.PluginSingletonLoader, - devicemanager: config.DeviceManager, + alloc: config.Alloc, + allocID: config.Alloc.ID, + clientConfig: config.ClientConfig, + task: config.Task, + taskDir: config.TaskDir, + taskName: config.Task.Name, + taskLeader: config.Task.Leader, + envBuilder: envBuilder, + 
consulClient: config.Consul, + vaultClient: config.Vault, + state: tstate, + localState: state.NewLocalState(), + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + deviceStatsReporter: config.DeviceStatsReporter, + killCtx: killCtx, + killCtxCancel: killCancel, + ctx: trCtx, + ctxCancel: trCancel, + triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + waitCh: make(chan struct{}), + devicemanager: config.DeviceManager, + driverManager: config.DriverManager, } // Create the logger based on the allocation ID @@ -568,6 +567,9 @@ func (tr *TaskRunner) runDriver() error { //XXX Evaluate and encode driver config + // Register an event handler with the driver manager to emit task events + tr.driverManager.RegisterEventHandler(tr.Task().Driver, taskConfig.ID, tr.handleTaskEvent) + // If there's already a task handle (eg from a Restore) there's nothing // to do except update state. if tr.getDriverHandle() != nil { @@ -606,6 +608,15 @@ func (tr *TaskRunner) runDriver() error { return nil } +func (tr *TaskRunner) handleTaskEvent(ev *drivers.TaskEvent) { + tr.EmitEvent(&structs.TaskEvent{ + Type: structs.TaskDriverMessage, + Time: ev.Timestamp.UnixNano(), + Details: ev.Annotations, + DriverMessage: ev.Message, + }) +} + // initDriver creates the driver for the task /*func (tr *TaskRunner) initDriver() error { // Create a task-specific event emitter callback to expose minimal @@ -639,17 +650,10 @@ func (tr *TaskRunner) runDriver() error { // initDriver retrives the DriverPlugin from the plugin loader for this task func (tr *TaskRunner) initDriver() error { - plugin, err := tr.pluginSingletonLoader.Dispense(tr.Task().Driver, base.PluginTypeDriver, tr.clientConfig.NomadPluginConfig(), tr.logger) + driver, err := tr.driverManager.Dispense(tr.Task().Driver) if err != nil { return err } - - // XXX need to be able to reattach to running drivers - driver, ok := plugin.Plugin().(drivers.DriverPlugin) - if !ok { - return fmt.Errorf("plugin loaded for driver %s does
not implement DriverPlugin interface", tr.task.Driver) - } - tr.driver = driver schema, err := tr.driver.TaskConfigSchema() diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 19a42aa71..54037dcbe 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/client/config" consulapi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" cstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" mockdriver "github.com/hashicorp/nomad/drivers/mock" @@ -19,8 +20,6 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" - "github.com/hashicorp/nomad/plugins/shared/catalog" - "github.com/hashicorp/nomad/plugins/shared/singleton" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -47,7 +46,6 @@ func (m *MockTaskStateUpdater) TaskStateUpdated() { // plus a cleanup func. 
func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName string) (*Config, func()) { logger := testlog.HCLogger(t) - pluginLoader := catalog.TestPluginLoader(t) clientConf, cleanup := config.TestClientConfig(t) // Find the task @@ -85,17 +83,17 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri } conf := &Config{ - Alloc: alloc, - ClientConfig: clientConf, - Consul: consulapi.NewMockConsulServiceClient(t, logger), - Task: thisTask, - TaskDir: taskDir, - Logger: clientConf.Logger, - Vault: vaultclient.NewMockVaultClient(), - StateDB: cstate.NoopDB{}, - StateUpdater: NewMockTaskStateUpdater(), - PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader), - DeviceManager: devicemanager.NoopMockManager(), + Alloc: alloc, + ClientConfig: clientConf, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + Task: thisTask, + TaskDir: taskDir, + Logger: clientConf.Logger, + Vault: vaultclient.NewMockVaultClient(), + StateDB: cstate.NoopDB{}, + StateUpdater: NewMockTaskStateUpdater(), + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), } return conf, trCleanup } @@ -196,14 +194,9 @@ func TestTaskRunner_TaskEnv(t *testing.T) { } // Get the mock driver plugin - driverPlugin, err := conf.PluginSingletonLoader.Dispense( - mockdriver.PluginID.Name, - mockdriver.PluginID.PluginType, - nil, - conf.Logger, - ) + driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) require.NoError(err) - mockDriver := driverPlugin.Plugin().(*mockdriver.Driver) + mockDriver := driverPlugin.(*mockdriver.Driver) // Assert its config has been properly interpolated driverCfg, mockCfg := mockDriver.GetTaskConfig() diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 0a3851b1d..7c8e4c296 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -8,11 +8,10 @@ import ( clientconfig 
"github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/shared/catalog" - "github.com/hashicorp/nomad/plugins/shared/singleton" "github.com/stretchr/testify/require" ) @@ -51,21 +50,20 @@ func (m *MockStateUpdater) Reset() { } func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) { - pluginLoader := catalog.TestPluginLoader(t) clientConf, cleanup := clientconfig.TestClientConfig(t) conf := &Config{ // Copy the alloc in case the caller edits and reuses it - Alloc: alloc.Copy(), - Logger: clientConf.Logger, - ClientConfig: clientConf, - StateDB: state.NoopDB{}, - Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger), - Vault: vaultclient.NewMockVaultClient(), - StateUpdater: &MockStateUpdater{}, - PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, - PrevAllocMigrator: allocwatcher.NoopPrevAlloc{}, - PluginSingletonLoader: singleton.NewSingletonLoader(clientConf.Logger, pluginLoader), - DeviceManager: devicemanager.NoopMockManager(), + Alloc: alloc.Copy(), + Logger: clientConf.Logger, + ClientConfig: clientConf, + StateDB: state.NoopDB{}, + Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger), + Vault: vaultclient.NewMockVaultClient(), + StateUpdater: &MockStateUpdater{}, + PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, + PrevAllocMigrator: allocwatcher.NoopPrevAlloc{}, + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), } return conf, cleanup } diff --git a/client/client.go b/client/client.go index 882b7d42e..15acbb31b 100644 --- a/client/client.go +++ b/client/client.go @@ -28,6 +28,8 @@ import ( consulApi "github.com/hashicorp/nomad/client/consul" 
"github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/client/pluginmanager" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/servers" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" @@ -219,9 +221,15 @@ type Client struct { endpoints rpcEndpoints streamingRpcs *structs.StreamingRpcRegistry + // pluginManagers is the set of PluginManagers registered by the client + pluginManagers *pluginmanager.PluginGroup + // devicemanger is responsible for managing device plugins. devicemanager devicemanager.Manager + // drivermanager is responsible for managing driver plugins + drivermanager drivermanager.Manager + // baseLabels are used when emitting tagged metrics. All client metrics will // have these tags, and optionally more. baseLabels []metrics.Label @@ -298,15 +306,35 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic c.configCopy = c.config.Copy() c.configLock.Unlock() - fingerprintManager := NewFingerprintManager(c.configCopy.PluginSingletonLoader, c.GetConfig, c.configCopy.Node, - c.shutdownCh, c.updateNodeFromFingerprint, c.updateNodeFromDriver, - c.logger) + fingerprintManager := NewFingerprintManager( + c.configCopy.PluginSingletonLoader, c.GetConfig, c.configCopy.Node, + c.shutdownCh, c.updateNodeFromFingerprint, c.logger) + + c.pluginManagers = pluginmanager.New(c.logger) // Fingerprint the node and scan for drivers if err := fingerprintManager.Run(); err != nil { return nil, fmt.Errorf("fingerprinting failed: %v", err) } + // Build the white/blacklists of drivers. 
+ allowlistDrivers := cfg.ReadStringListToMap("driver.whitelist") + blocklistDrivers := cfg.ReadStringListToMap("driver.blacklist") + + // Setup the driver manager + driverConfig := &drivermanager.Config{ + Logger: c.logger, + Loader: c.configCopy.PluginSingletonLoader, + PluginConfig: c.configCopy.NomadPluginConfig(), + Updater: c.updateNodeFromDriver, + State: c.stateDB, + AllowedDrivers: allowlistDrivers, + BlockedDrivers: blocklistDrivers, + } + drvManager := drivermanager.New(driverConfig) + c.drivermanager = drvManager + c.pluginManagers.RegisterAndRun(drvManager) + // Setup the device manager devConfig := &devicemanager.Config{ Logger: c.logger, @@ -316,8 +344,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic StatsInterval: c.configCopy.StatsCollectionInterval, State: c.stateDB, } - c.devicemanager = devicemanager.New(devConfig) - go c.devicemanager.Run() + devManager := devicemanager.New(devConfig) + c.devicemanager = devManager + c.pluginManagers.RegisterAndRun(devManager) // Add the stats collector statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir, c.devicemanager.AllStats) @@ -358,6 +387,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic return nil, fmt.Errorf("failed to setup vault client: %v", err) } + // Wait for plugin managers to initialize + pluginReadyCh, err := c.pluginManagers.Ready(pluginmanager.DefaultManagerReadyTimeout) + if err != nil { + return nil, err + } + <-pluginReadyCh + // Restore the state if err := c.restoreState(); err != nil { logger.Error("failed to restore state", "error", err) @@ -558,8 +594,8 @@ func (c *Client) Shutdown() error { } c.logger.Info("shutting down") - // Shutdown the device manager - c.devicemanager.Shutdown() + // Shutdown the plugin managers + c.pluginManagers.Shutdown() // Stop renewing tokens and secrets if c.vaultClient != nil { @@ -866,19 +902,18 @@ func (c *Client) restoreState() error { c.configLock.RLock()
arConf := &allocrunner.Config{ - Alloc: alloc, - Logger: c.logger, - ClientConfig: c.configCopy, - StateDB: c.stateDB, - StateUpdater: c, - DeviceStatsReporter: c, - Consul: c.consulService, - Vault: c.vaultClient, - PrevAllocWatcher: prevAllocWatcher, - PrevAllocMigrator: prevAllocMigrator, - PluginLoader: c.config.PluginLoader, - PluginSingletonLoader: c.config.PluginSingletonLoader, - DeviceManager: c.devicemanager, + Alloc: alloc, + Logger: c.logger, + ClientConfig: c.config, + StateDB: c.stateDB, + StateUpdater: c, + DeviceStatsReporter: c, + Consul: c.consulService, + Vault: c.vaultClient, + PrevAllocWatcher: prevAllocWatcher, + PrevAllocMigrator: prevAllocMigrator, + DeviceManager: c.devicemanager, + DriverManager: c.drivermanager, } c.configLock.RUnlock() @@ -2056,19 +2091,18 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error // we don't have to do a copy. c.configLock.RLock() arConf := &allocrunner.Config{ - Alloc: alloc, - Logger: c.logger, - ClientConfig: c.configCopy, - StateDB: c.stateDB, - Consul: c.consulService, - Vault: c.vaultClient, - StateUpdater: c, - DeviceStatsReporter: c, - PrevAllocWatcher: prevAllocWatcher, - PrevAllocMigrator: prevAllocMigrator, - PluginLoader: c.config.PluginLoader, - PluginSingletonLoader: c.config.PluginSingletonLoader, - DeviceManager: c.devicemanager, + Alloc: alloc, + Logger: c.logger, + ClientConfig: c.config, + StateDB: c.stateDB, + Consul: c.consulService, + Vault: c.vaultClient, + StateUpdater: c, + DeviceStatsReporter: c, + PrevAllocWatcher: prevAllocWatcher, + PrevAllocMigrator: prevAllocMigrator, + DeviceManager: c.devicemanager, + DriverManager: c.drivermanager, } c.configLock.RUnlock() diff --git a/client/devicemanager/manager.go b/client/devicemanager/manager.go index 2709b2230..8fb0abc79 100644 --- a/client/devicemanager/manager.go +++ b/client/devicemanager/manager.go @@ -18,14 +18,8 @@ import ( "github.com/hashicorp/nomad/plugins/shared/loader" ) -// Manaager is the 
interface used to manage device plugins +// Manager is the interface used to manage device plugins type Manager interface { - // Run starts the device manager - Run() - - // Shutdown shutsdown the manager and all launched plugins - Shutdown() - // Reserve is used to reserve a set of devices Reserve(d *structs.AllocatedDeviceResource) (*device.ContainerReservation, error) @@ -127,6 +121,9 @@ func New(c *Config) *manager { } } +// PluginType identifies this manager to the plugin manager and satisfies the PluginManager interface. +func (*manager) PluginType() string { return base.PluginTypeDevice } + // Run starts thed device manager. The manager will shutdown any previously // launched plugin and then begin fingerprinting and stats collection on all new // device plugins. @@ -161,16 +158,6 @@ func (m *manager) Run() { }) } - // XXX we should eventually remove this and have it be done in the client - // Give all the fingerprinters a chance to run at least once before we - // update the node. This prevents initial fingerprinting from causing too - // many server side updates. - ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second) - for _, i := range m.instances { - i.WaitForFirstFingerprint(ctx) - } - cancel() - // Now start the fingerprint handler for { select { @@ -207,6 +194,19 @@ func (m *manager) Shutdown() { } } +func (m *manager) Ready() <-chan struct{} { + ret := make(chan struct{}) + go func() { + ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second) + for _, i := range m.instances { + i.WaitForFirstFingerprint(ctx) + } + cancel() + close(ret) + }() + return ret +} + // Reserve reserves the given allocated device. If the device is unknown, an // UnknownDeviceErr is returned. 
func (m *manager) Reserve(d *structs.AllocatedDeviceResource) (*device.ContainerReservation, error) { diff --git a/client/fingerprint_manager.go b/client/fingerprint_manager.go index 4b856d9da..03a2b3ca9 100644 --- a/client/fingerprint_manager.go +++ b/client/fingerprint_manager.go @@ -1,8 +1,6 @@ package client import ( - "context" - "fmt" "sync" "time" @@ -10,20 +8,7 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/shared/loader" - pstructs "github.com/hashicorp/nomad/plugins/shared/structs" -) - -const ( - // driverFPBackoffBaseline is the baseline time for exponential backoff while - // fingerprinting a driver. - driverFPBackoffBaseline = 5 * time.Second - - // driverFPBackoffLimit is the limit of the exponential backoff for fingerprinting - // a driver. - driverFPBackoffLimit = 2 * time.Minute ) // FingerprintManager runs a client fingerprinters on a continuous basis, and @@ -39,10 +24,7 @@ type FingerprintManager struct { // associated node updateNodeAttributes func(*fingerprint.FingerprintResponse) *structs.Node - // updateNodeFromDriver is a callback to the client to update the state of a - // specific driver for the node - updateNodeFromDriver func(string, *structs.DriverInfo) *structs.Node - logger log.Logger + logger log.Logger } // NewFingerprintManager is a constructor that creates and returns an instance @@ -53,14 +35,12 @@ func NewFingerprintManager( node *structs.Node, shutdownCh chan struct{}, updateNodeAttributes func(*fingerprint.FingerprintResponse) *structs.Node, - updateNodeFromDriver func(string, *structs.DriverInfo) *structs.Node, logger log.Logger) *FingerprintManager { return &FingerprintManager{ singletonLoader: singletonLoader, getConfig: getConfig, updateNodeAttributes: updateNodeAttributes, - updateNodeFromDriver: 
updateNodeFromDriver, node: node, shutdownCh: shutdownCh, logger: logger.Named("fingerprint_mgr"), @@ -120,39 +100,6 @@ func (fp *FingerprintManager) Run() error { "skipped_fingerprinters", skippedFingerprints) } - // Next, set up drivers - // Build the white/blacklists of drivers. - whitelistDrivers := cfg.ReadStringListToMap("driver.whitelist") - whitelistDriversEnabled := len(whitelistDrivers) > 0 - blacklistDrivers := cfg.ReadStringListToMap("driver.blacklist") - - var availDrivers []string - var skippedDrivers []string - - for _, pl := range fp.singletonLoader.Catalog()[base.PluginTypeDriver] { - name := pl.Name - // Skip fingerprinting drivers that are not in the whitelist if it is - // enabled. - if _, ok := whitelistDrivers[name]; whitelistDriversEnabled && !ok { - skippedDrivers = append(skippedDrivers, name) - continue - } - // Skip fingerprinting drivers that are in the blacklist - if _, ok := blacklistDrivers[name]; ok { - skippedDrivers = append(skippedDrivers, name) - continue - } - - availDrivers = append(availDrivers, name) - } - - if err := fp.setupDrivers(availDrivers); err != nil { - return err - } - - if len(skippedDrivers) > 0 { - fp.logger.Debug("drivers skipped due to white/blacklist", "skipped_drivers", skippedDrivers) - } return nil } @@ -189,38 +136,6 @@ func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error { return nil } -// setupDrivers is used to fingerprint the node to see if these drivers are -// supported -func (fm *FingerprintManager) setupDrivers(driverNames []string) error { - //TODO(alex,hclog) Update fingerprinters to hclog - var availDrivers []string - for _, name := range driverNames { - // TODO: driver reattach - fingerCh, cancel, err := fm.dispenseDriverFingerprint(name) - if err != nil { - return err - } - - finger := <-fingerCh - - // Start a periodic watcher to detect changes to a drivers health and - // attributes. 
- go fm.watchDriverFingerprint(fingerCh, name, cancel) - - if fm.logger.IsTrace() { - fm.logger.Trace("initial driver fingerprint", "driver", name, "fingerprint", finger) - } - // Log the fingerprinters which have been applied - if finger.Health != drivers.HealthStateUndetected { - availDrivers = append(availDrivers, name) - } - fm.processDriverFingerprint(finger, name) - } - - fm.logger.Debug("detected drivers", "drivers", availDrivers) - return nil -} - // runFingerprint runs each fingerprinter individually on an ongoing basis func (fm *FingerprintManager) runFingerprint(f fingerprint.Fingerprint, period time.Duration, name string) { fm.logger.Debug("fingerprinting periodically", "fingerprinter", name, "period", period) @@ -266,99 +181,3 @@ func (fm *FingerprintManager) fingerprint(name string, f fingerprint.Fingerprint return response.Detected, nil } - -// watchDrivers facilitates the different periods between fingerprint and -// health checking a driver -func (fm *FingerprintManager) watchDriverFingerprint(fpChan <-chan *drivers.Fingerprint, name string, cancel context.CancelFunc) { - var backoff time.Duration - var retry int - for { - if backoff > 0 { - time.Sleep(backoff) - } - select { - case <-fm.shutdownCh: - cancel() - return - case fp, ok := <-fpChan: - if ok && fp.Err == nil { - fm.processDriverFingerprint(fp, name) - continue - } - // if the channel is closed attempt to open a new one - newFpChan, newCancel, err := fm.dispenseDriverFingerprint(name) - if err != nil { - fm.logger.Warn("failed to fingerprint driver, retrying in 30s", "error", err, "retry", retry) - di := &structs.DriverInfo{ - Healthy: false, - HealthDescription: "failed to fingerprint driver", - UpdateTime: time.Now(), - } - if n := fm.updateNodeFromDriver(name, di); n != nil { - fm.setNode(n) - } - - // Calculate the new backoff - backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline - if backoff > driverFPBackoffLimit { - backoff = driverFPBackoffLimit - } - retry++ - 
continue - } - cancel() - fpChan = newFpChan - cancel = newCancel - - // Reset backoff - backoff = 0 - retry = 0 - } - } -} - -// processDriverFringerprint converts a Fingerprint from a driver into a DriverInfo -// struct and updates the Node with it -func (fm *FingerprintManager) processDriverFingerprint(fp *drivers.Fingerprint, driverName string) { - di := &structs.DriverInfo{ - Attributes: stringify(fp.Attributes), - Detected: fp.Health != drivers.HealthStateUndetected, - Healthy: fp.Health == drivers.HealthStateHealthy, - HealthDescription: fp.HealthDescription, - UpdateTime: time.Now(), - } - if n := fm.updateNodeFromDriver(driverName, di); n != nil { - fm.setNode(n) - } -} -func stringify(attributes map[string]*pstructs.Attribute) map[string]string { - ret := make(map[string]string, len(attributes)) - for key, attribute := range attributes { - ret[key] = attribute.GoString() - } - return ret -} - -// dispenseDriverFingerprint dispenses a driver plugin for the given driver name -// and requests a fingerprint channel. 
The channel and a context cancel function -// is returned to the caller -func (fm *FingerprintManager) dispenseDriverFingerprint(driverName string) (<-chan *drivers.Fingerprint, context.CancelFunc, error) { - plug, err := fm.singletonLoader.Dispense(driverName, base.PluginTypeDriver, fm.getConfig().NomadPluginConfig(), fm.logger) - if err != nil { - return nil, nil, err - } - - driver, ok := plug.Plugin().(drivers.DriverPlugin) - if !ok { - return nil, nil, fmt.Errorf("registered driver plugin %q does not implement DriverPlugin interface", driverName) - } - - ctx, cancel := context.WithCancel(context.Background()) - fingerCh, err := driver.Fingerprint(ctx) - if err != nil { - cancel() - return nil, nil, err - } - - return fingerCh, cancel, nil -} diff --git a/client/fingerprint_manager_test.go b/client/fingerprint_manager_test.go index f9095a0b2..a70000954 100644 --- a/client/fingerprint_manager_test.go +++ b/client/fingerprint_manager_test.go @@ -1,50 +1,12 @@ package client import ( - "fmt" "testing" - log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/helper/testlog" - "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/shared/loader" - "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" - - // registering raw_exec driver plugin used in testing - _ "github.com/hashicorp/nomad/drivers/rawexec" ) -func TestFingerprintManager_Run_MockDriver(t *testing.T) { - t.Skip("missing mock driver plugin implementation") - t.Parallel() - require := require.New(t) - testClient, cleanup := TestClient(t, nil) - - testClient.logger = testlog.HCLogger(t) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testlog.HCLogger(t), - ) - - err := fm.Run() - require.Nil(err) - - node := 
testClient.config.Node - - require.NotNil(node.Drivers["mock_driver"]) - require.True(node.Drivers["mock_driver"].Detected) - require.True(node.Drivers["mock_driver"].Healthy) -} - func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) { t.Parallel() require := require.New(t) @@ -57,7 +19,6 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) { testClient.config.Node, testClient.shutdownCh, testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, testClient.logger, ) @@ -71,295 +32,7 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) { require.NotZero(node.Resources.DiskMB) } -func TestFingerprintManager_Fingerprint_Run(t *testing.T) { - t.Parallel() - require := require.New(t) - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "driver.raw_exec.enable": "true", - } - }) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - node := testClient.config.Node - - require.NotNil(node.Drivers["raw_exec"]) - require.True(node.Drivers["raw_exec"].Detected) - require.True(node.Drivers["raw_exec"].Healthy) -} - -func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) { - t.Skip("missing mock driver plugin implementation") - t.Parallel() - require := require.New(t) - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "test.shutdown_periodic_after": "true", - "test.shutdown_periodic_duration": "2", - } - }) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - 
testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - { - // Ensure the mock driver is registered and healthy on the client - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - defer fm.nodeLock.Unlock() - node := fm.node - dinfo, ok := node.Drivers["mock_driver"] - if !ok || !dinfo.Detected || !dinfo.Healthy { - return false, fmt.Errorf("mock driver should be detected and healthy: %+v", dinfo) - } - - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - } - // Ensure that the client fingerprinter eventually removes this attribute and - // marks the driver as unhealthy - { - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - defer fm.nodeLock.Unlock() - node := fm.node - dinfo, ok := node.Drivers["mock_driver"] - if !ok || dinfo.Detected || dinfo.Healthy { - return false, fmt.Errorf("mock driver should not be detected and healthy") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - } -} - -// This is a temporary measure to check that a driver has both attributes on a -// node set as well as DriverInfo. 
-func TestFingerprintManager_HealthCheck_Driver(t *testing.T) { - t.Skip("missing mock driver plugin implementation") - t.Parallel() - require := require.New(t) - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "driver.raw_exec.enable": "1", - "test.shutdown_periodic_after": "true", - "test.shutdown_periodic_duration": "2", - } - }) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - // Ensure the mock driver is registered and healthy on the client - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - node := fm.node - defer fm.nodeLock.Unlock() - - mockDriverAttribute := node.Attributes["driver.mock_driver"] - if mockDriverAttribute == "" { - return false, fmt.Errorf("mock driver info should be set on the client attributes") - } - mockDriverInfo := node.Drivers["mock_driver"] - if mockDriverInfo == nil { - return false, fmt.Errorf("mock driver info should be set on the client") - } - if !mockDriverInfo.Healthy { - return false, fmt.Errorf("mock driver info should be healthy") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Ensure that a default driver without health checks enabled is registered and healthy on the client - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - node := fm.node - defer fm.nodeLock.Unlock() - - rawExecAttribute := node.Attributes["driver.raw_exec"] - if rawExecAttribute == "" { - return false, fmt.Errorf("raw exec info should be set on the client attributes") - } - rawExecInfo := node.Drivers["raw_exec"] - if rawExecInfo == nil { - return false, fmt.Errorf("raw exec driver info should be set on the client") - } - if !rawExecInfo.Detected { - return false, 
fmt.Errorf("raw exec driver should be detected") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Ensure the mock driver is registered - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - node := fm.node - defer fm.nodeLock.Unlock() - - mockDriverAttribute := node.Attributes["driver.mock_driver"] - if mockDriverAttribute == "" { - return false, fmt.Errorf("mock driver info should set on the client attributes") - } - mockDriverInfo := node.Drivers["mock_driver"] - if mockDriverInfo == nil { - return false, fmt.Errorf("mock driver info should be set on the client") - } - if !mockDriverInfo.Healthy { - return false, fmt.Errorf("mock driver info should not be healthy") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Ensure that we don't duplicate health check information on the driver - // health information - fm.nodeLock.Lock() - node := fm.node - fm.nodeLock.Unlock() - mockDriverAttributes := node.Drivers["mock_driver"].Attributes - require.NotContains(mockDriverAttributes, "driver.mock_driver") -} - -func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) { - t.Skip("missing mock driver plugin implementation") - t.Parallel() - require := require.New(t) - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "test.shutdown_periodic_after": "true", - "test.shutdown_periodic_duration": "2", - } - }) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - { - // Ensure the mock driver is registered and healthy on the client - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - node := fm.node - defer fm.nodeLock.Unlock() - - mockDriverInfo := 
node.Drivers["mock_driver"] - if mockDriverInfo == nil { - return false, fmt.Errorf("mock driver info should be set on the client") - } - if !mockDriverInfo.Detected { - return false, fmt.Errorf("mock driver info should be detected") - } - if !mockDriverInfo.Healthy { - return false, fmt.Errorf("mock driver info should be healthy") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - } - { - // Ensure that the client health check eventually removes this attribute and - // marks the driver as unhealthy - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - node := fm.node - defer fm.nodeLock.Unlock() - - mockDriverInfo := node.Drivers["mock_driver"] - if mockDriverInfo == nil { - return false, fmt.Errorf("mock driver info should be set on the client") - } - if !mockDriverInfo.Detected { - return false, fmt.Errorf("mock driver info should be detected") - } - if !mockDriverInfo.Healthy { - return false, fmt.Errorf("mock driver info should be healthy") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - } - { - // Ensure that the client health check eventually removes this attribute and - // marks the driver as unhealthy - testutil.WaitForResult(func() (bool, error) { - fm.nodeLock.Lock() - node := fm.node - defer fm.nodeLock.Unlock() - - mockDriverInfo := node.Drivers["mock_driver"] - if mockDriverInfo == nil { - return false, fmt.Errorf("mock driver info should be set on the client") - } - if mockDriverInfo.Detected { - return false, fmt.Errorf("mock driver should be detected") - } - if mockDriverInfo.Healthy { - return false, fmt.Errorf("mock driver should not be healthy") - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - } -} - func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { - t.Skip("missing mock driver plugin implementation") t.Parallel() require := require.New(t) @@ -377,7 +50,6 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { 
testClient.config.Node, testClient.shutdownCh, testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, testClient.logger, ) @@ -406,7 +78,6 @@ func TestFingerprintManager_Run_InBlacklist(t *testing.T) { testClient.config.Node, testClient.shutdownCh, testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, testClient.logger, ) @@ -437,7 +108,6 @@ func TestFingerprintManager_Run_Combination(t *testing.T) { testClient.config.Node, testClient.shutdownCh, testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, testClient.logger, ) @@ -451,187 +121,3 @@ func TestFingerprintManager_Run_Combination(t *testing.T) { require.NotContains(node.Attributes, "memory.totalbytes") require.NotContains(node.Attributes, "os.name") } - -func TestFingerprintManager_Run_WhitelistDrivers(t *testing.T) { - t.Parallel() - require := require.New(t) - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "driver.raw_exec.enable": "1", - "driver.whitelist": " raw_exec , foo ", - } - }) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - node := testClient.config.Node - require.NotNil(node.Drivers["raw_exec"]) - require.NotContains(node.Drivers, "java") -} - -func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) { - t.Parallel() - require := require.New(t) - - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "driver.raw_exec.enable": "1", - "driver.whitelist": " foo,bar,baz ", - } - }) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - 
testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - node := testClient.config.Node - - require.NotContains(node.Attributes, "driver.raw_exec") - // TODO(nickethier): uncomment after missing driver implementations added - //require.NotContains(node.Attributes, "driver.exec") - //require.NotContains(node.Attributes, "driver.docker") -} - -func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.T) { - t.Parallel() - require := require.New(t) - - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "driver.raw_exec.enable": "1", - "driver.whitelist": " raw_exec,exec,foo,bar,baz ", - "driver.blacklist": " exec,foo,bar,baz ", - } - }) - - testClient.logger = testlog.HCLogger(t) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - node := testClient.config.Node - - require.NotNil(node.Drivers["raw_exec"]) - require.NotContains(node.Drivers, "exec") -} - -func TestFingerprintManager_Run_DriverFailure(t *testing.T) { - t.Parallel() - require := require.New(t) - - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "driver.raw_exec.enable": "1", - } - }) - - testClient.logger = testlog.HCLogger(t) - defer cleanup() - - singLoader := testClient.config.PluginSingletonLoader - - dispenseCalls := 0 - loader := &loader.MockCatalog{ - DispenseF: func(name, pluginType string, cfg *base.AgentConfig, logger log.Logger) (loader.PluginInstance, error) { - if pluginType == base.PluginTypeDriver && name == "raw_exec" { - dispenseCalls++ - } - return singLoader.Dispense(name, pluginType, cfg, logger) - }, - ReattachF: 
singLoader.Reattach, - CatalogF: singLoader.Catalog, - } - - fm := NewFingerprintManager( - loader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - fpChan, cancel, err := fm.dispenseDriverFingerprint("raw_exec") - require.NoError(err) - require.Equal(1, dispenseCalls) - - cancel() - go fm.watchDriverFingerprint(fpChan, "raw_exec", cancel) - - testutil.WaitForResult(func() (bool, error) { - if 2 != dispenseCalls { - return false, fmt.Errorf("expected dispenseCalls to be 2 but was %d", dispenseCalls) - } - return true, nil - }, func(err error) { - require.NoError(err) - }) -} - -func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) { - t.Parallel() - require := require.New(t) - - testClient, cleanup := TestClient(t, func(c *config.Config) { - c.Options = map[string]string{ - "driver.raw_exec.enable": "1", - "driver.whitelist": " raw_exec,foo,bar,baz ", - "driver.blacklist": " exec,foo,bar,baz ", - } - }) - - testClient.logger = testlog.HCLogger(t) - defer cleanup() - - fm := NewFingerprintManager( - testClient.config.PluginSingletonLoader, - testClient.GetConfig, - testClient.config.Node, - testClient.shutdownCh, - testClient.updateNodeFromFingerprint, - testClient.updateNodeFromDriver, - testClient.logger, - ) - - err := fm.Run() - require.Nil(err) - - node := testClient.config.Node - - require.NotNil(node.Drivers["raw_exec"]) - require.NotContains(node.Drivers, "exec") -} diff --git a/client/pluginmanager/drivermanager/instance.go b/client/pluginmanager/drivermanager/instance.go new file mode 100644 index 000000000..af467f16e --- /dev/null +++ b/client/pluginmanager/drivermanager/instance.go @@ -0,0 +1,474 @@ +package drivermanager + +import ( + "context" + "fmt" + "sync" + "time" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/base" + 
	"github.com/hashicorp/nomad/plugins/drivers"
	"github.com/hashicorp/nomad/plugins/shared/loader"
	"github.com/hashicorp/nomad/plugins/shared/singleton"
)

const (
	// driverFPBackoffBaseline is the baseline time for exponential backoff while
	// fingerprinting a driver.
	driverFPBackoffBaseline = 5 * time.Second

	// driverFPBackoffLimit is the limit of the exponential backoff for fingerprinting
	// a driver.
	driverFPBackoffLimit = 2 * time.Minute
)

// instanceManagerConfig configures a driver instance manager. Every field is
// copied into the instanceManager by newInstanceManager.
type instanceManagerConfig struct {
	// Logger is the logger used by the driver instance manager
	Logger log.Logger

	// Ctx is used to shutdown the driver instance manager
	Ctx context.Context

	// Loader is the plugin loader
	Loader loader.PluginCatalog

	// StoreReattach is used to store a plugin's reattach config
	StoreReattach StorePluginReattachFn

	// FetchReattach is used to retrieve a plugin's reattach config
	FetchReattach FetchPluginReattachFn

	// PluginConfig is the config passed to the launched plugins
	PluginConfig *base.ClientAgentConfig

	// ID is the ID of the plugin being managed
	ID *loader.PluginID

	// UpdateNodeFromDriver is the callback used to update the node from fingerprinting
	UpdateNodeFromDriver UpdateNodeDriverInfoFn
}

// instanceManager is used to manage a single driver plugin
type instanceManager struct {
	// logger is the logger used by the driver instance manager
	logger log.Logger

	// ctx is used to shutdown the driver manager
	ctx context.Context

	// cancel is used to shutdown management of this driver plugin
	cancel context.CancelFunc

	// loader is the plugin loader
	loader loader.PluginCatalog

	// storeReattach is used to store a plugin's reattach config
	storeReattach StorePluginReattachFn

	// fetchReattach is used to retrieve a plugin's reattach config
	fetchReattach FetchPluginReattachFn

	// pluginConfig is the config passed to the launched plugins
	pluginConfig *base.ClientAgentConfig

	// id is the ID of the plugin being managed
	id *loader.PluginID

	// plugin is the plugin instance being managed
	plugin loader.PluginInstance

	// driver is the driver plugin being managed
	driver drivers.DriverPlugin

	// pluginLock locks access to the driver and plugin
	pluginLock sync.Mutex

	// shutdownLock is used to serialize attempts to shutdown
	shutdownLock sync.Mutex

	// updateNodeFromDriver is the callback used to update the node from fingerprinting
	updateNodeFromDriver UpdateNodeDriverInfoFn

	// handlers is the map of taskID to handler func; access is guarded by
	// handlersLock
	handlers     map[string]EventHandler
	handlersLock sync.RWMutex

	// firstFingerprintCh is used to trigger that we have successfully
	// fingerprinted once. It is used to gate launching the stats collection.
	// hasFingerprinted records whether that first fingerprint has been seen.
	firstFingerprintCh chan struct{}
	hasFingerprinted   bool

	// lastHealthState is the last known health fingerprinted by the manager;
	// access is guarded by lastHealthStateMu
	lastHealthState   drivers.HealthState
	lastHealthStateMu sync.Mutex
}

// newInstanceManager returns a new driver instance manager. It is expected that
// the context passed in the configuration is cancelled in order to shutdown
// launched goroutines.
//
// The manager derives its own cancellable context from c.Ctx and starts its
// run loop in a new goroutine before returning.
func newInstanceManager(c *instanceManagerConfig) *instanceManager {

	ctx, cancel := context.WithCancel(c.Ctx)
	i := &instanceManager{
		logger:               c.Logger.With("driver", c.ID.Name),
		ctx:                  ctx,
		cancel:               cancel,
		loader:               c.Loader,
		storeReattach:        c.StoreReattach,
		fetchReattach:        c.FetchReattach,
		pluginConfig:         c.PluginConfig,
		id:                   c.ID,
		updateNodeFromDriver: c.UpdateNodeFromDriver,
		handlers:             map[string]EventHandler{},
		firstFingerprintCh:   make(chan struct{}),
	}

	go i.run()
	return i
}

// WaitForFirstFingerprint waits until either the plugin fingerprints, the
// passed context is done, or the plugin instance manager is shutdown.
+func (i *instanceManager) WaitForFirstFingerprint(ctx context.Context) { + select { + case <-i.ctx.Done(): + case <-ctx.Done(): + case <-i.firstFingerprintCh: + } +} + +// registerEventHandler registers the given handler to run for events with the +// given taskID +func (i *instanceManager) registerEventHandler(taskID string, handler EventHandler) { + i.handlersLock.Lock() + defer i.handlersLock.Unlock() + i.handlers[taskID] = handler +} + +// deregisterEventHandler removed the handlers registered for the given taskID +// and is serlialized so as to not be called concurrently with the registered +// handler +func (i *instanceManager) deregisterEventHandler(taskID string) { + i.handlersLock.Lock() + defer i.handlersLock.Unlock() + delete(i.handlers, taskID) +} + +// run is a long lived goroutine that starts the fingerprinting and stats +// collection goroutine and then shutsdown the plugin on exit. +func (i *instanceManager) run() { + // Dispense once to ensure we are given a valid plugin + if _, err := i.dispense(); err != nil { + i.logger.Error("dispensing initial plugin failed", "error", err) + return + } + + // Create a waitgroup to block on shutdown for all created goroutines to + // exit + var wg sync.WaitGroup + + // Start the fingerprinter + wg.Add(1) + go func() { + i.fingerprint() + wg.Done() + }() + + // Start event handler + wg.Add(1) + go func() { + i.handleEvents() + wg.Done() + }() + + // Do a final cleanup + wg.Wait() + i.cleanup() +} + +// dispense is used to dispense a plugin. 
+func (i *instanceManager) dispense() (plugin drivers.DriverPlugin, err error) { + i.pluginLock.Lock() + defer i.pluginLock.Unlock() + + // See if we already have a running instance + if i.plugin != nil && !i.plugin.Exited() { + return i.driver, nil + } + + var pluginInstance loader.PluginInstance + + if reattach, ok := i.fetchReattach(); ok { + // Reattach to existing plugin + pluginInstance, err = i.loader.Reattach(i.id.Name, i.id.PluginType, reattach) + } else { + // Get an instance of the plugin + pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger) + } + if err != nil { + // Retry as the error just indicates the singleton has exited + if err == singleton.SingletonPluginExited { + pluginInstance, err = i.loader.Dispense(i.id.Name, i.id.PluginType, i.pluginConfig, i.logger) + } + + // If we still have an error there is a real problem + if err != nil { + return nil, fmt.Errorf("failed to start plugin: %v", err) + } + } + + // Convert to a fingerprint plugin + driver, ok := pluginInstance.Plugin().(drivers.DriverPlugin) + if !ok { + pluginInstance.Kill() + return nil, fmt.Errorf("plugin loaded does not implement the driver interface") + } + + // Store the plugin and driver + i.plugin = pluginInstance + i.driver = driver + + // Store the reattach config + if c, ok := pluginInstance.ReattachConfig(); ok { + if err := i.storeReattach(c); err != nil { + i.logger.Error("error storing driver plugin reattach config", "error", err) + } + } + + return driver, nil +} + +// cleanup shutsdown the plugin +func (i *instanceManager) cleanup() { + i.shutdownLock.Lock() + i.pluginLock.Lock() + defer i.pluginLock.Unlock() + defer i.shutdownLock.Unlock() + + if i.plugin != nil && !i.plugin.Exited() { + i.plugin.Kill() + if err := i.storeReattach(nil); err != nil { + i.logger.Warn("error clearing plugin reattach config from state store", "error", err) + } + } +} + +// dispenseFingerprintCh dispenses a driver and makes a Fingerprint RPC call 
+// to the driver. The fingerprint chan is returned along with the cancel func +// for the context used in the RPC. This cancel func should always be called +// when the caller is finished with the channel. +func (i *instanceManager) dispenseFingerprintCh() (<-chan *drivers.Fingerprint, context.CancelFunc, error) { + driver, err := i.dispense() + if err != nil { + return nil, nil, err + } + + ctx, cancel := context.WithCancel(i.ctx) + fingerCh, err := driver.Fingerprint(ctx) + if err != nil { + cancel() + return nil, nil, err + } + + return fingerCh, cancel, nil +} + +// fingerprint is the main loop for fingerprinting. +func (i *instanceManager) fingerprint() { + fpChan, cancel, err := i.dispenseFingerprintCh() + if err != nil { + i.logger.Error("failed to dispense driver plugin", "error", err) + } + + // backoff and retry used if the RPC is closed by the other end + var backoff time.Duration + var retry int + for { + if backoff > 0 { + select { + case <-time.After(backoff): + case <-i.ctx.Done(): + cancel() + return + } + } + + select { + case <-i.ctx.Done(): + cancel() + return + case fp, ok := <-fpChan: + if ok { + if fp.Err == nil { + i.handleFingerprint(fp) + } else { + i.logger.Warn("received fingerprint error from driver", "error", fp.Err) + i.handleFingerprintError() + } + continue + } + + // if the channel is closed attempt to open a new one + newFpChan, newCancel, err := i.dispenseFingerprintCh() + if err != nil { + i.logger.Warn("error fingerprinting driver", "error", err, "retry", retry) + i.handleFingerprintError() + + // Calculate the new backoff + backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline + if backoff > driverFPBackoffLimit { + backoff = driverFPBackoffLimit + } + // Increment retry counter + retry++ + continue + } + cancel() + fpChan = newFpChan + cancel = newCancel + + // Reset backoff + backoff = 0 + retry = 0 + } + } +} + +// handleFingerprintError is called when an error occurred while fingerprinting +// and will set the 
driver to unhealthy +func (i *instanceManager) handleFingerprintError() { + di := &structs.DriverInfo{ + Healthy: false, + HealthDescription: "failed to fingerprint driver", + UpdateTime: time.Now(), + } + i.updateNodeFromDriver(i.id.Name, di) +} + +// handleFingerprint updates the node with the current fingerprint status +func (i *instanceManager) handleFingerprint(fp *drivers.Fingerprint) { + attrs := make(map[string]string, len(fp.Attributes)) + for key, attr := range fp.Attributes { + attrs[key] = attr.GoString() + } + di := &structs.DriverInfo{ + Attributes: attrs, + Detected: fp.Health != drivers.HealthStateUndetected, + Healthy: fp.Health == drivers.HealthStateHealthy, + HealthDescription: fp.HealthDescription, + UpdateTime: time.Now(), + } + i.updateNodeFromDriver(i.id.Name, di) + + // log detected/undetected state changes after the initial fingerprint + i.lastHealthStateMu.Lock() + if i.hasFingerprinted { + if i.lastHealthState != fp.Health { + i.logger.Info("driver health state has changed", "previous", i.lastHealthState, "current", fp.Health, "description", fp.HealthDescription) + } + } + i.lastHealthState = fp.Health + i.lastHealthStateMu.Unlock() + + // if this is the first fingerprint, mark that we have received it + if !i.hasFingerprinted { + i.logger.Trace("initial driver fingerprint", "fingerprint", fp) + close(i.firstFingerprintCh) + i.hasFingerprinted = true + } +} + +// getLastHealth returns the most recent HealthState from fingerprinting +func (i *instanceManager) getLastHealth() drivers.HealthState { + i.lastHealthStateMu.Lock() + defer i.lastHealthStateMu.Unlock() + return i.lastHealthState +} + +// dispenseTaskEventsCh dispenses a driver plugin and makes a TaskEvents RPC. +// The TaskEvent chan and cancel func for the RPC is return. 
The cancel func must +// be called by the caller to properly cleanup the context +func (i *instanceManager) dispenseTaskEventsCh() (<-chan *drivers.TaskEvent, context.CancelFunc, error) { + driver, err := i.dispense() + if err != nil { + return nil, nil, err + } + + ctx, cancel := context.WithCancel(i.ctx) + eventsCh, err := driver.TaskEvents(ctx) + if err != nil { + cancel() + return nil, nil, err + } + + return eventsCh, cancel, nil +} + +// handleEvents is the main loop that receives task events from the driver +func (i *instanceManager) handleEvents() { + eventsCh, cancel, err := i.dispenseTaskEventsCh() + if err != nil { + i.logger.Error("failed to dispense driver", "error", err) + } + + var backoff time.Duration + var retry int + for { + if backoff > 0 { + select { + case <-time.After(backoff): + case <-i.ctx.Done(): + cancel() + return + } + } + + select { + case <-i.ctx.Done(): + cancel() + return + case ev, ok := <-eventsCh: + if ok { + i.handleEvent(ev) + continue + } + + // if the channel is closed attempt to open a new one + newEventsChan, newCancel, err := i.dispenseTaskEventsCh() + if err != nil { + i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry) + + // Calculate the new backoff + backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline + if backoff > driverFPBackoffLimit { + backoff = driverFPBackoffLimit + } + retry++ + continue + } + cancel() + eventsCh = newEventsChan + cancel = newCancel + + // Reset backoff + backoff = 0 + retry = 0 + } + } +} + +// handleEvent looks up the event handler(s) for the event and runs them +func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) { + i.handlersLock.RLock() + defer i.handlersLock.RUnlock() + if handler, ok := i.handlers[ev.TaskID]; ok { + i.logger.Trace("task event received", "event", ev) + handler(ev) + return + } + + i.logger.Warn("no handler registered for event", "event", ev) +} diff --git a/client/pluginmanager/drivermanager/manager.go 
b/client/pluginmanager/drivermanager/manager.go new file mode 100644 index 000000000..0dabc47df --- /dev/null +++ b/client/pluginmanager/drivermanager/manager.go @@ -0,0 +1,320 @@
package drivermanager

import (
	"context"
	"fmt"
	"sync"
	"time"

	log "github.com/hashicorp/go-hclog"
	plugin "github.com/hashicorp/go-plugin"
	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/base"
	"github.com/hashicorp/nomad/plugins/drivers"
	"github.com/hashicorp/nomad/plugins/shared"
	"github.com/hashicorp/nomad/plugins/shared/loader"
)

// Manager is the interface used to manage driver plugins
type Manager interface {
	// RegisterEventHandler registers handler to be called for task events of
	// the given task emitted by the given driver.
	RegisterEventHandler(driver, taskID string, handler EventHandler)

	// DeregisterEventHandler removes the handler registered for the given
	// driver and task.
	DeregisterEventHandler(driver, taskID string)

	// Dispense returns the driver plugin with the given name.
	Dispense(driver string) (drivers.DriverPlugin, error)
}

// EventHandler can be registered with a Manager to be called for a matching task.
// The handler should not block execution.
type EventHandler func(*drivers.TaskEvent)

// StateStorage is used to persist the driver manager's state across
// agent restarts.
type StateStorage interface {
	// GetDriverPluginState is used to retrieve the driver manager's plugin
	// state.
	GetDriverPluginState() (*state.PluginState, error)

	// PutDriverPluginState is used to store the driver manager's plugin
	// state.
	PutDriverPluginState(state *state.PluginState) error
}

// UpdateNodeDriverInfoFn is the callback used to update the node from
// fingerprinting
type UpdateNodeDriverInfoFn func(string, *structs.DriverInfo) *structs.Node

// StorePluginReattachFn is used to store plugin reattachment configurations.
type StorePluginReattachFn func(*plugin.ReattachConfig) error

// FetchPluginReattachFn is used to retrieve the stored plugin reattachment
// configuration.
+type FetchPluginReattachFn func() (*plugin.ReattachConfig, bool) + +// Config is used to configure a driver manager +type Config struct { + // Logger is the logger used by the device manager + Logger log.Logger + + // Loader is the plugin loader + Loader loader.PluginCatalog + + // PluginConfig is the config passed to the launched plugins + PluginConfig *base.ClientAgentConfig + + // Updater is used to update the node when driver information changes + Updater UpdateNodeDriverInfoFn + + // State is used to manage the device managers state + State StateStorage + + // AllowedDrivers if set will only start driver plugins for the given + // drivers + AllowedDrivers map[string]struct{} + + // BlockedDrivers if set will not allow the given driver plugins to start + BlockedDrivers map[string]struct{} +} + +// manager is used to manage a set of driver plugins +type manager struct { + // logger is the logger used by the device manager + logger log.Logger + + // state is used to manage the device managers state + state StateStorage + + // ctx is used to shutdown the device manager + ctx context.Context + cancel context.CancelFunc + + // loader is the plugin loader + loader loader.PluginCatalog + + // pluginConfig is the config passed to the launched plugins + pluginConfig *base.ClientAgentConfig + + // updater is used to update the node when device information changes + updater UpdateNodeDriverInfoFn + + // instances is the list of managed devices, access is serialized by instanceMu + instances map[string]*instanceManager + instancesMu sync.Mutex + + // reattachConfigs stores the plugin reattach configs + reattachConfigs map[loader.PluginID]*shared.ReattachConfig + reattachConfigLock sync.Mutex + + // allows/block lists + allowedDrivers map[string]struct{} + blockedDrivers map[string]struct{} + + // readyCh is ticked once at the end of Run() + readyCh chan struct{} +} + +// New returns a new driver manager +func New(c *Config) *manager { + ctx, cancel := 
context.WithCancel(context.Background()) + return &manager{ + logger: c.Logger.Named("driver_mgr"), + state: c.State, + ctx: ctx, + cancel: cancel, + loader: c.Loader, + pluginConfig: c.PluginConfig, + updater: c.Updater, + instances: make(map[string]*instanceManager), + reattachConfigs: make(map[loader.PluginID]*shared.ReattachConfig), + allowedDrivers: c.AllowedDrivers, + blockedDrivers: c.BlockedDrivers, + readyCh: make(chan struct{}), + } +} + +// PluginType returns the type of plugin this mananger mananges +func (*manager) PluginType() string { return base.PluginTypeDriver } + +// Run starts the mananger, initializes driver plugins and blocks until Shutdown +// is called. +func (m *manager) Run() { + // Load any previous plugin reattach configuration + m.loadReattachConfigs() + + // Get driver plugins + driversPlugins := m.loader.Catalog()[base.PluginTypeDriver] + if len(driversPlugins) == 0 { + m.logger.Debug("exiting since there are no driver plugins") + m.cancel() + return + } + + var skippedDrivers []string + for _, d := range driversPlugins { + id := loader.PluginInfoID(d) + // Skip drivers that are not in the allowed list if it is set. 
+ if _, ok := m.allowedDrivers[id.Name]; len(m.allowedDrivers) > 0 && !ok { + skippedDrivers = append(skippedDrivers, id.Name) + continue + } + // Skip fingerprinting drivers that are in the blocked list + if _, ok := m.blockedDrivers[id.Name]; ok { + skippedDrivers = append(skippedDrivers, id.Name) + continue + } + + storeFn := func(c *plugin.ReattachConfig) error { + return m.storePluginReattachConfig(id, c) + } + fetchFn := func() (*plugin.ReattachConfig, bool) { + return m.fetchPluginReattachConfig(id) + } + + instance := newInstanceManager(&instanceManagerConfig{ + Logger: m.logger, + Ctx: m.ctx, + Loader: m.loader, + StoreReattach: storeFn, + FetchReattach: fetchFn, + PluginConfig: m.pluginConfig, + ID: &id, + UpdateNodeFromDriver: m.updater, + }) + + m.instancesMu.Lock() + m.instances[id.Name] = instance + m.instancesMu.Unlock() + } + + if len(skippedDrivers) > 0 { + m.logger.Debug("drivers skipped due to allow/block list", "skipped_drivers", skippedDrivers) + } + + // signal ready + select { + case <-m.ctx.Done(): + return + case m.readyCh <- struct{}{}: + } + + // wait for shutdown + <-m.ctx.Done() +} + +// Shutdown cleans up all the plugins +func (m *manager) Shutdown() { + // Cancel the context to stop any requests + m.cancel() + + m.instancesMu.Lock() + defer m.instancesMu.Unlock() + + // Go through and shut everything down + for _, i := range m.instances { + i.cleanup() + } +} + +func (m *manager) Ready() <-chan struct{} { + ret := make(chan struct{}) + go func() { + // We don't want to start initial fingerprint wait until Run loop has + // finished + <-m.readyCh + + var availDrivers []string + ctx, cancel := context.WithTimeout(m.ctx, time.Second*10) + for name, instance := range m.instances { + instance.WaitForFirstFingerprint(ctx) + if instance.lastHealthState != drivers.HealthStateUndetected { + availDrivers = append(availDrivers, name) + } + } + cancel() + m.logger.Debug("detected drivers", "drivers", availDrivers) + close(ret) + }() + return ret 
+} + +func (m *manager) loadReattachConfigs() error { + m.reattachConfigLock.Lock() + defer m.reattachConfigLock.Unlock() + + s, err := m.state.GetDriverPluginState() + if err != nil { + return err + } + + if s != nil { + for name, c := range s.ReattachConfigs { + id := loader.PluginID{ + PluginType: base.PluginTypeDriver, + Name: name, + } + m.reattachConfigs[id] = c + } + } + return nil +} + +// storePluginReattachConfig is used as a callback to the instance managers and +// persists the plugin reattach configurations. +func (m *manager) storePluginReattachConfig(id loader.PluginID, c *plugin.ReattachConfig) error { + m.reattachConfigLock.Lock() + defer m.reattachConfigLock.Unlock() + + // Store the new reattach config + m.reattachConfigs[id] = shared.ReattachConfigFromGoPlugin(c) + + // Persist the state + s := &state.PluginState{ + ReattachConfigs: make(map[string]*shared.ReattachConfig, len(m.reattachConfigs)), + } + + for id, c := range m.reattachConfigs { + s.ReattachConfigs[id.Name] = c + } + + return m.state.PutDriverPluginState(s) +} + +// fetchPluginReattachConfig is used as a callback to the instance managers and +// retrieves the plugin reattach config. 
If it has not been stored it will +// return nil +func (m *manager) fetchPluginReattachConfig(id loader.PluginID) (*plugin.ReattachConfig, bool) { + m.reattachConfigLock.Lock() + defer m.reattachConfigLock.Unlock() + + if cfg, ok := m.reattachConfigs[id]; ok { + c, err := shared.ReattachConfigToGoPlugin(cfg) + if err != nil { + m.logger.Warn("failed to read plugin reattach config", "config", cfg, "error", err) + return nil, false + } + return c, true + } + return nil, false +} + +func (m *manager) RegisterEventHandler(driver, taskID string, handler EventHandler) { + m.instancesMu.Lock() + m.instances[driver].registerEventHandler(taskID, handler) + m.instancesMu.Unlock() +} + +func (m *manager) DeregisterEventHandler(driver, taskID string) { + m.instancesMu.Lock() + m.instances[driver].deregisterEventHandler(taskID) + m.instancesMu.Unlock() +} + +func (m *manager) Dispense(d string) (drivers.DriverPlugin, error) { + m.instancesMu.Lock() + defer m.instancesMu.Unlock() + if instance, ok := m.instances[d]; ok { + return instance.dispense() + } + + return nil, fmt.Errorf("driver not found") +} diff --git a/client/pluginmanager/drivermanager/manager_test.go b/client/pluginmanager/drivermanager/manager_test.go new file mode 100644 index 000000000..d3e6e6812 --- /dev/null +++ b/client/pluginmanager/drivermanager/manager_test.go @@ -0,0 +1,305 @@ +package drivermanager + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + log "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/pluginmanager" + "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/shared/loader" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +var _ Manager = (*manager)(nil) +var _ 
pluginmanager.PluginManager = (*manager)(nil) + +func testSetup(t *testing.T) (chan *drivers.Fingerprint, chan *drivers.TaskEvent, *manager) { + fpChan := make(chan *drivers.Fingerprint) + evChan := make(chan *drivers.TaskEvent) + drv := mockDriver(fpChan, evChan) + cat := mockCatalog(map[string]drivers.DriverPlugin{"mock": drv}) + cfg := &Config{ + Logger: testlog.HCLogger(t), + Loader: cat, + PluginConfig: &base.ClientAgentConfig{}, + Updater: noopUpdater, + State: state.NoopDB{}, + AllowedDrivers: make(map[string]struct{}), + BlockedDrivers: make(map[string]struct{}), + } + + mgr := New(cfg) + return fpChan, evChan, mgr +} + +func mockDriver(fpChan chan *drivers.Fingerprint, evChan chan *drivers.TaskEvent) drivers.DriverPlugin { + return &drivers.MockDriver{ + FingerprintF: func(ctx context.Context) (<-chan *drivers.Fingerprint, error) { + return fpChan, nil + }, + TaskEventsF: func(ctx context.Context) (<-chan *drivers.TaskEvent, error) { + return evChan, nil + }, + } +} + +func mockCatalog(drivers map[string]drivers.DriverPlugin) *loader.MockCatalog { + cat := map[string][]*base.PluginInfoResponse{ + base.PluginTypeDriver: {}, + } + for d := range drivers { + cat[base.PluginTypeDriver] = append(cat[base.PluginTypeDriver], &base.PluginInfoResponse{ + Name: d, + Type: base.PluginTypeDriver, + }) + } + + return &loader.MockCatalog{ + DispenseF: func(name, pluginType string, cfg *base.ClientAgentConfig, logger log.Logger) (loader.PluginInstance, error) { + d, ok := drivers[name] + if !ok { + return nil, fmt.Errorf("driver not found") + } + return loader.MockBasicExternalPlugin(d), nil + }, + ReattachF: func(name, pluginType string, config *plugin.ReattachConfig) (loader.PluginInstance, error) { + d, ok := drivers[name] + if !ok { + return nil, fmt.Errorf("driver not found") + } + return loader.MockBasicExternalPlugin(d), nil + }, + CatalogF: func() map[string][]*base.PluginInfoResponse { + return cat + }, + } +} + +func mockTaskEvent(taskID string) 
*drivers.TaskEvent { + return &drivers.TaskEvent{ + TaskID: taskID, + Timestamp: time.Now(), + Annotations: map[string]string{}, + Message: "event from " + taskID, + } +} + +func noopUpdater(string, *structs.DriverInfo) *structs.Node { return nil } + +func TestMananger_Fingerprint(t *testing.T) { + t.Parallel() + require := require.New(t) + fpChan, _, mgr := testSetup(t) + var infos []*structs.DriverInfo + mgr.updater = func(d string, i *structs.DriverInfo) *structs.Node { + infos = append(infos, i) + return nil + } + go mgr.Run() + defer mgr.Shutdown() + fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy} + testutil.WaitForResult(func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if len(mgr.instances) != 1 { + return false, fmt.Errorf("mananger should have registered an instance") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + testutil.WaitForResult(func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if mgr.instances["mock"].getLastHealth() != drivers.HealthStateHealthy { + return false, fmt.Errorf("mock instance should be healthy") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + fpChan <- &drivers.Fingerprint{ + Health: drivers.HealthStateUnhealthy, + } + testutil.WaitForResult(func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if mgr.instances["mock"].getLastHealth() == drivers.HealthStateHealthy { + return false, fmt.Errorf("mock instance should be unhealthy") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + fpChan <- &drivers.Fingerprint{ + Health: drivers.HealthStateUndetected, + } + testutil.WaitForResult(func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if mgr.instances["mock"].getLastHealth() != drivers.HealthStateUndetected { + return false, fmt.Errorf("mock instance should be undetected") + } + return true, nil + }, func(err 
error) { + require.NoError(err) + }) + + require.Len(infos, 3) + require.True(infos[0].Healthy) + require.True(infos[0].Detected) + require.False(infos[1].Healthy) + require.True(infos[1].Detected) + require.False(infos[2].Healthy) + require.False(infos[2].Detected) +} + +func TestMananger_TaskEvents(t *testing.T) { + t.Parallel() + require := require.New(t) + fpChan, evChan, mgr := testSetup(t) + go mgr.Run() + defer mgr.Shutdown() + fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy} + testutil.WaitForResult(func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if len(mgr.instances) != 1 { + return false, fmt.Errorf("mananger should have registered 1 instance") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + event1 := mockTaskEvent("abc1") + var wg sync.WaitGroup + wg.Add(1) + mgr.RegisterEventHandler("mock", "abc1", func(ev *drivers.TaskEvent) { + defer wg.Done() + require.Exactly(event1, ev) + }) + + evChan <- event1 + wg.Wait() +} + +func TestManager_Run_AllowedDrivers(t *testing.T) { + t.Parallel() + require := require.New(t) + fpChan, _, mgr := testSetup(t) + mgr.allowedDrivers = map[string]struct{}{"foo": {}} + go mgr.Run() + defer mgr.Shutdown() + select { + case fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}: + default: + } + testutil.AssertUntil(200*time.Millisecond, func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if len(mgr.instances) > 0 { + return false, fmt.Errorf("mananger should have no registered instances") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) +} + +func TestManager_Run_BlockedDrivers(t *testing.T) { + t.Parallel() + require := require.New(t) + fpChan, _, mgr := testSetup(t) + mgr.blockedDrivers = map[string]struct{}{"mock": {}} + go mgr.Run() + defer mgr.Shutdown() + select { + case fpChan <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}: + default: + } + 
testutil.AssertUntil(200*time.Millisecond, func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if len(mgr.instances) > 0 { + return false, fmt.Errorf("mananger should have no registered instances") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) +} + +func TestManager_Run_AllowedBlockedDrivers_Combined(t *testing.T) { + t.Parallel() + require := require.New(t) + drvs := map[string]drivers.DriverPlugin{} + fpChs := map[string]chan *drivers.Fingerprint{} + names := []string{"mock1", "mock2", "mock3", "mock4", "mock5"} + for _, d := range names { + ch := make(chan *drivers.Fingerprint) + drv := mockDriver(ch, nil) + drvs[d] = drv + fpChs[d] = ch + } + cat := mockCatalog(drvs) + cfg := &Config{ + Logger: testlog.HCLogger(t), + Loader: cat, + PluginConfig: nil, + Updater: noopUpdater, + State: state.NoopDB{}, + AllowedDrivers: map[string]struct{}{ + "mock2": {}, + "mock3": {}, + "mock4": {}, + "foo": {}, + }, + BlockedDrivers: map[string]struct{}{ + "mock2": {}, + "mock4": {}, + "bar": {}, + }, + } + mgr := New(cfg) + + go mgr.Run() + defer mgr.Shutdown() + for _, d := range names { + go func(drv string) { + select { + case fpChs[drv] <- &drivers.Fingerprint{Health: drivers.HealthStateHealthy}: + case <-time.After(200 * time.Millisecond): + } + }(d) + } + + testutil.AssertUntil(200*time.Millisecond, func() (bool, error) { + mgr.instancesMu.Lock() + defer mgr.instancesMu.Unlock() + if len(mgr.instances) > 1 { + return false, fmt.Errorf("mananger should have 1 registered instance") + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + mgr.instancesMu.Lock() + require.Len(mgr.instances, 1) + _, ok := mgr.instances["mock3"] + mgr.instancesMu.Unlock() + require.True(ok) +} diff --git a/client/pluginmanager/drivermanager/state/state.go b/client/pluginmanager/drivermanager/state/state.go new file mode 100644 index 000000000..53be7616e --- /dev/null +++ 
b/client/pluginmanager/drivermanager/state/state.go @@ -0,0 +1,11 @@ +package state + +import "github.com/hashicorp/nomad/plugins/shared" + +// PluginState is used to store the driver manager's state across restarts of the +// agent +type PluginState struct { + // ReattachConfigs are the set of reattach configs for plugins launched by + // the driver manager + ReattachConfigs map[string]*shared.ReattachConfig +} diff --git a/client/pluginmanager/drivermanager/testing.go b/client/pluginmanager/drivermanager/testing.go new file mode 100644 index 000000000..07c319c93 --- /dev/null +++ b/client/pluginmanager/drivermanager/testing.go @@ -0,0 +1,47 @@ +// +build !release + +package drivermanager + +import ( + "fmt" + "testing" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/shared/catalog" + "github.com/hashicorp/nomad/plugins/shared/loader" + "github.com/hashicorp/nomad/plugins/shared/singleton" +) + +type testManager struct { + logger log.Logger + loader loader.PluginCatalog +} + +func TestDriverManager(t *testing.T) Manager { + logger := testlog.HCLogger(t).Named("driver_mgr") + pluginLoader := catalog.TestPluginLoader(t) + return &testManager{ + logger: logger, + loader: singleton.NewSingletonLoader(logger, pluginLoader), + } +} + +func (m *testManager) Dispense(driver string) (drivers.DriverPlugin, error) { + instance, err := m.loader.Dispense(driver, base.PluginTypeDriver, nil, m.logger) + if err != nil { + return nil, err + } + + d, ok := instance.Plugin().(drivers.DriverPlugin) + if !ok { + return nil, fmt.Errorf("plugin does not implement DriverPlugin interface") + } + + return d, nil +} + +func (m *testManager) RegisterEventHandler(driver, taskID string, handler EventHandler) {} +func (m *testManager) DeregisterEventHandler(driver, taskID string) {} diff --git a/client/pluginmanager/group.go 
b/client/pluginmanager/group.go index db9c75674..3f06c1d25 100644 --- a/client/pluginmanager/group.go +++ b/client/pluginmanager/group.go @@ -3,10 +3,17 @@ package pluginmanager import ( "fmt" "sync" + "time" log "github.com/hashicorp/go-hclog" ) +var ( + // DefaultManagerReadyTimeout is the default amount of time we will wait + // for a plugin manager to be ready before logging it and moving on. + DefaultManagerReadyTimeout = time.Second * 5 +) + // PluginGroup is a utility struct to manage a collectively orchestrate a // set of PluginManagers type PluginGroup struct { @@ -33,11 +40,12 @@ func New(logger log.Logger) *PluginGroup { // RegisterAndRun registers the manager and starts it in a separate goroutine func (m *PluginGroup) RegisterAndRun(manager PluginManager) error { m.mLock.Lock() + defer m.mLock.Unlock() if m.shutdown { + // mLock is released by the deferred Unlock above; do not unlock here. return fmt.Errorf("plugin group already shutdown") } m.managers = append(m.managers, manager) - m.mLock.Unlock() go func() { m.logger.Info("starting plugin manager", "plugin-type", manager.PluginType()) @@ -47,6 +55,38 @@ func (m *PluginGroup) RegisterAndRun(manager PluginManager) error { return nil } +// Ready returns a channel which will be closed once all plugin managers are ready. 
+// A timeout for waiting on each manager is given +func (m *PluginGroup) Ready(timeout time.Duration) (<-chan struct{}, error) { + m.mLock.Lock() + defer m.mLock.Unlock() + if m.shutdown { + return nil, fmt.Errorf("plugin group already shutdown") + } + + var wg sync.WaitGroup + for i := range m.managers { + manager := m.managers[i] + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-manager.Ready(): + case <-time.After(timeout): + m.logger.Warn("timeout waiting for plugin manager to be ready", + "plugin-type", manager.PluginType()) + } + }() + } + + ret := make(chan struct{}) + go func() { + wg.Wait() + close(ret) + }() + return ret, nil +} + // Shutdown shutsdown all registered PluginManagers in reverse order of how // they were started. func (m *PluginGroup) Shutdown() { diff --git a/client/pluginmanager/manager.go b/client/pluginmanager/manager.go index 0acce2ac9..3e67d12f6 100644 --- a/client/pluginmanager/manager.go +++ b/client/pluginmanager/manager.go @@ -5,6 +5,10 @@ type PluginManager interface { // Run starts a plugin manager and must block until shutdown Run() + // Ready returns a channel that blocks until the plugin manager has + // initialized all plugins + Ready() <-chan struct{} + // Shutdown should gracefully shutdown all plugins managed by the manager. 
// It must block until shutdown is complete Shutdown() diff --git a/client/state/db_test.go b/client/state/db_test.go index 9e6be8a14..c28078657 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -8,6 +8,7 @@ import ( trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" @@ -216,3 +217,28 @@ func TestStateDB_DeviceManager(t *testing.T) { require.Equal(state, ps) }) } + +// TestStateDB_DriverManager asserts the behavior of driver manager state related StateDB +// methods. +func TestStateDB_DriverManager(t *testing.T) { + t.Parallel() + + testDB(t, func(t *testing.T, db StateDB) { + require := require.New(t) + + // Getting nonexistent state should return nils + ps, err := db.GetDriverPluginState() + require.NoError(err) + require.Nil(ps) + + // Putting PluginState should work + state := &driverstate.PluginState{} + require.NoError(db.PutDriverPluginState(state)) + + // Getting should return the available state + ps, err = db.GetDriverPluginState() + require.NoError(err) + require.NotNil(ps) + require.Equal(state, ps) + }) +} diff --git a/client/state/interface.go b/client/state/interface.go index 5ccc3f0c3..7f2cac7a9 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -3,6 +3,7 @@ package state import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -50,6 +51,14 @@ type StateDB interface { // state. PutDevicePluginState(state *dmstate.PluginState) error + // GetDriverPluginState is used to retrieve the driver manager's plugin + // state. 
+ GetDriverPluginState() (*driverstate.PluginState, error) + + // PutDriverPluginState is used to store the driver manager's plugin + // state. + PutDriverPluginState(state *driverstate.PluginState) error + // Close the database. Unsafe for further use after calling regardless // of return value. Close() error diff --git a/client/state/memdb.go b/client/state/memdb.go index d16e6f232..0e01c01dc 100644 --- a/client/state/memdb.go +++ b/client/state/memdb.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -21,6 +22,9 @@ type MemDB struct { // devicemanager -> plugin-state devManagerPs *dmstate.PluginState + // drivermanager -> plugin-state + driverManagerPs *driverstate.PluginState + mu sync.RWMutex } @@ -150,6 +154,19 @@ func (m *MemDB) GetDevicePluginState() (*dmstate.PluginState, error) { return m.devManagerPs, nil } +func (m *MemDB) GetDriverPluginState() (*driverstate.PluginState, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.driverManagerPs, nil +} + +func (m *MemDB) PutDriverPluginState(ps *driverstate.PluginState) error { + m.mu.Lock() + defer m.mu.Unlock() + m.driverManagerPs = ps + return nil +} + func (m *MemDB) Close() error { m.mu.Lock() defer m.mu.Unlock() diff --git a/client/state/noopdb.go b/client/state/noopdb.go index 1d4975de2..cb03a55f5 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -3,6 +3,7 @@ package state import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -49,6 +50,14 @@ func (n NoopDB) GetDevicePluginState() (*dmstate.PluginState, error) { return nil, nil } +func 
(n NoopDB) PutDriverPluginState(ps *driverstate.PluginState) error { + return nil +} + +func (n NoopDB) GetDriverPluginState() (*driverstate.PluginState, error) { + return nil, nil +} + func (n NoopDB) Close() error { return nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index ef499b502..2fa6d88b7 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -6,6 +6,7 @@ import ( trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/helper/boltdd" "github.com/hashicorp/nomad/nomad/structs" ) @@ -22,6 +23,9 @@ allocations/ (bucket) devicemanager/ |--> plugin-state -> *dmstate.PluginState + +drivermanager/ +|--> plugin-state -> *driverstate.PluginState */ var ( @@ -45,9 +49,13 @@ var ( // data devManagerBucket = []byte("devicemanager") - // devManagerPluginStateKey is the key serialized device manager - // plugin state is stored at - devManagerPluginStateKey = []byte("plugin_state") + // driverManagerBucket is the bucket name containing all driver manager + // related data + driverManagerBucket = []byte("drivermanager") + + // managerPluginStateKey is the key at which plugin manager plugin state is + // stored + managerPluginStateKey = []byte("plugin_state") ) // NewStateDBFunc creates a StateDB given a state directory. 
@@ -377,7 +385,7 @@ func (s *BoltStateDB) PutDevicePluginState(ps *dmstate.PluginState) error { return err } - return devBkt.Put(devManagerPluginStateKey, ps) + return devBkt.Put(managerPluginStateKey, ps) }) } @@ -395,7 +403,7 @@ func (s *BoltStateDB) GetDevicePluginState() (*dmstate.PluginState, error) { // Restore Plugin State if it exists ps = &dmstate.PluginState{} - if err := devBkt.Get(devManagerPluginStateKey, ps); err != nil { + if err := devBkt.Get(managerPluginStateKey, ps); err != nil { if !boltdd.IsErrNotFound(err) { return fmt.Errorf("failed to read device manager plugin state: %v", err) } @@ -413,3 +421,50 @@ func (s *BoltStateDB) GetDevicePluginState() (*dmstate.PluginState, error) { return ps, nil } + +// PutDriverPluginState stores the driver manager's plugin state or returns an +// error. +func (s *BoltStateDB) PutDriverPluginState(ps *driverstate.PluginState) error { + return s.db.Update(func(tx *boltdd.Tx) error { + // Retrieve the root driver manager bucket + driverBkt, err := tx.CreateBucketIfNotExists(driverManagerBucket) + if err != nil { + return err + } + + return driverBkt.Put(managerPluginStateKey, ps) + }) +} + +// GetDriverPluginState retrieves the driver manager's plugin state or returns an +// error. 
+func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) { + var ps *driverstate.PluginState + + err := s.db.View(func(tx *boltdd.Tx) error { + driverBkt := tx.Bucket(driverManagerBucket) + if driverBkt == nil { + // No state, return + return nil + } + + // Restore Plugin State if it exists + ps = &driverstate.PluginState{} + if err := driverBkt.Get(managerPluginStateKey, ps); err != nil { + if !boltdd.IsErrNotFound(err) { + return fmt.Errorf("failed to read driver manager plugin state: %v", err) + } + + // Key not found, reset ps to nil + ps = nil + } + + return nil + }) + + if err != nil { + return nil, err + } + + return ps, nil +} diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index da6431ffc..816607e1e 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -15,14 +15,13 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/devicemanager" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/shared/catalog" - "github.com/hashicorp/nomad/plugins/shared/singleton" "github.com/stretchr/testify/require" ) @@ -146,19 +145,18 @@ func TestConsul_Integration(t *testing.T) { }() // Build the config - pluginLoader := catalog.TestPluginLoader(t) config := &taskrunner.Config{ - Alloc: alloc, - ClientConfig: conf, - Consul: serviceClient, - Task: task, - TaskDir: taskDir, - Logger: logger, - Vault: vclient, - StateDB: state.NoopDB{}, - StateUpdater: logUpdate, - PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader), - DeviceManager: 
devicemanager.NoopMockManager(), + Alloc: alloc, + ClientConfig: conf, + Consul: serviceClient, + Task: task, + TaskDir: taskDir, + Logger: logger, + Vault: vclient, + StateDB: state.NoopDB{}, + StateUpdater: logUpdate, + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), } tr, err := taskrunner.NewTaskRunner(config)