mirror of
https://github.com/kemko/nomad.git
synced 2026-01-02 16:35:44 +03:00
Noticed that `detected drivers` log line was misleading - when a driver doesn't fingerprint before timeout, their health status is empty string `""` which we would mark as detected. Now, we log all drivers along with their state to ease driver fingerprint debugging.
390 lines
11 KiB
Go
390 lines
11 KiB
Go
package drivermanager
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
plugin "github.com/hashicorp/go-plugin"
|
|
"github.com/hashicorp/nomad/client/pluginmanager"
|
|
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
|
|
"github.com/hashicorp/nomad/helper/pluginutils/loader"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/plugins/base"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
|
|
)
|
|
|
|
// ErrDriverNotFound is returned during Dispense when the requested driver
|
|
// plugin is not found in the plugin catalog
|
|
var ErrDriverNotFound = fmt.Errorf("driver not found")
|
|
|
|
// Manager is the interface used to manage driver plugins
|
|
type Manager interface {
|
|
pluginmanager.PluginManager
|
|
|
|
// Dispense returns a drivers.DriverPlugin for the given driver plugin name
|
|
// handling reattaching to an existing driver if available
|
|
Dispense(driver string) (drivers.DriverPlugin, error)
|
|
}
|
|
|
|
// EventHandler is a callback to be called for a task.
|
|
// The handler should not block execution.
|
|
type EventHandler func(*drivers.TaskEvent)
|
|
|
|
// TaskEventHandlerFactory returns an event handler for a given allocID/task name
|
|
type TaskEventHandlerFactory func(allocID, taskName string) EventHandler
|
|
|
|
// StateStorage is used to persist the driver managers state across
|
|
// agent restarts.
|
|
type StateStorage interface {
|
|
// GetDevicePluginState is used to retrieve the device manager's plugin
|
|
// state.
|
|
GetDriverPluginState() (*state.PluginState, error)
|
|
|
|
// PutDevicePluginState is used to store the device manager's plugin
|
|
// state.
|
|
PutDriverPluginState(state *state.PluginState) error
|
|
}
|
|
|
|
// UpdateNodeDriverInfoFn is the callback used to update the node from
|
|
// fingerprinting
|
|
type UpdateNodeDriverInfoFn func(string, *structs.DriverInfo)
|
|
|
|
// StorePluginReattachFn is used to store plugin reattachment configurations.
|
|
type StorePluginReattachFn func(*plugin.ReattachConfig) error
|
|
|
|
// FetchPluginReattachFn is used to retrieve the stored plugin reattachment
|
|
// configuration.
|
|
type FetchPluginReattachFn func() (*plugin.ReattachConfig, bool)
|
|
|
|
// Config is used to configure a driver manager
|
|
type Config struct {
|
|
// Logger is the logger used by the device manager
|
|
Logger log.Logger
|
|
|
|
// Loader is the plugin loader
|
|
Loader loader.PluginCatalog
|
|
|
|
// PluginConfig is the config passed to the launched plugins
|
|
PluginConfig *base.AgentConfig
|
|
|
|
// Updater is used to update the node when driver information changes
|
|
Updater UpdateNodeDriverInfoFn
|
|
|
|
// EventHandlerFactory is used to retrieve a task event handler
|
|
EventHandlerFactory TaskEventHandlerFactory
|
|
|
|
// State is used to manage the device managers state
|
|
State StateStorage
|
|
|
|
// AllowedDrivers if set will only start driver plugins for the given
|
|
// drivers
|
|
AllowedDrivers map[string]struct{}
|
|
|
|
// BlockedDrivers if set will not allow the given driver plugins to start
|
|
BlockedDrivers map[string]struct{}
|
|
}
|
|
|
|
// manager is used to manage a set of driver plugins
|
|
type manager struct {
|
|
// logger is the logger used by the device manager
|
|
logger log.Logger
|
|
|
|
// state is used to manage the device managers state
|
|
state StateStorage
|
|
|
|
// ctx is used to shutdown the device manager
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
// loader is the plugin loader
|
|
loader loader.PluginCatalog
|
|
|
|
// pluginConfig is the config passed to the launched plugins
|
|
pluginConfig *base.AgentConfig
|
|
|
|
// updater is used to update the node when device information changes
|
|
updater UpdateNodeDriverInfoFn
|
|
|
|
// eventHandlerFactory is passed to the instance managers and used to forward
|
|
// task events
|
|
eventHandlerFactory TaskEventHandlerFactory
|
|
|
|
// instances is the list of managed devices, access is serialized by instanceMu
|
|
instances map[string]*instanceManager
|
|
instancesMu sync.RWMutex
|
|
|
|
// reattachConfigs stores the plugin reattach configs
|
|
reattachConfigs map[loader.PluginID]*pstructs.ReattachConfig
|
|
reattachConfigLock sync.Mutex
|
|
|
|
// allows/block lists
|
|
allowedDrivers map[string]struct{}
|
|
blockedDrivers map[string]struct{}
|
|
|
|
// readyCh is ticked once at the end of Run()
|
|
readyCh chan struct{}
|
|
}
|
|
|
|
// New returns a new driver manager
|
|
func New(c *Config) *manager {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &manager{
|
|
logger: c.Logger.Named("driver_mgr"),
|
|
state: c.State,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
loader: c.Loader,
|
|
pluginConfig: c.PluginConfig,
|
|
updater: c.Updater,
|
|
eventHandlerFactory: c.EventHandlerFactory,
|
|
instances: make(map[string]*instanceManager),
|
|
reattachConfigs: make(map[loader.PluginID]*pstructs.ReattachConfig),
|
|
allowedDrivers: c.AllowedDrivers,
|
|
blockedDrivers: c.BlockedDrivers,
|
|
readyCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// PluginType returns the type of plugin this manager mananges
|
|
func (*manager) PluginType() string { return base.PluginTypeDriver }
|
|
|
|
// Run starts the manager, initializes driver plugins and blocks until Shutdown
|
|
// is called.
|
|
func (m *manager) Run() {
|
|
// Load any previous plugin reattach configuration
|
|
if err := m.loadReattachConfigs(); err != nil {
|
|
m.logger.Warn("unable to load driver plugin reattach configs, a driver process may have been leaked",
|
|
"error", err)
|
|
}
|
|
|
|
// Get driver plugins
|
|
driversPlugins := m.loader.Catalog()[base.PluginTypeDriver]
|
|
if len(driversPlugins) == 0 {
|
|
m.logger.Debug("exiting since there are no driver plugins")
|
|
m.cancel()
|
|
return
|
|
}
|
|
|
|
var skippedDrivers []string
|
|
for _, d := range driversPlugins {
|
|
id := loader.PluginInfoID(d)
|
|
if m.isDriverBlocked(id.Name) {
|
|
skippedDrivers = append(skippedDrivers, id.Name)
|
|
continue
|
|
}
|
|
|
|
storeFn := func(c *plugin.ReattachConfig) error {
|
|
return m.storePluginReattachConfig(id, c)
|
|
}
|
|
fetchFn := func() (*plugin.ReattachConfig, bool) {
|
|
return m.fetchPluginReattachConfig(id)
|
|
}
|
|
|
|
instance := newInstanceManager(&instanceManagerConfig{
|
|
Logger: m.logger,
|
|
Ctx: m.ctx,
|
|
Loader: m.loader,
|
|
StoreReattach: storeFn,
|
|
FetchReattach: fetchFn,
|
|
PluginConfig: m.pluginConfig,
|
|
ID: &id,
|
|
UpdateNodeFromDriver: m.updater,
|
|
EventHandlerFactory: m.eventHandlerFactory,
|
|
})
|
|
|
|
m.instancesMu.Lock()
|
|
m.instances[id.Name] = instance
|
|
m.instancesMu.Unlock()
|
|
}
|
|
|
|
if len(skippedDrivers) > 0 {
|
|
m.logger.Debug("drivers skipped due to allow/block list", "skipped_drivers", skippedDrivers)
|
|
}
|
|
|
|
// signal ready
|
|
close(m.readyCh)
|
|
}
|
|
|
|
// Shutdown cleans up all the plugins
|
|
func (m *manager) Shutdown() {
|
|
// Cancel the context to stop any requests
|
|
m.cancel()
|
|
|
|
m.instancesMu.RLock()
|
|
defer m.instancesMu.RUnlock()
|
|
|
|
// Go through and shut everything down
|
|
for _, i := range m.instances {
|
|
i.cleanup()
|
|
}
|
|
}
|
|
|
|
func (m *manager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
go m.waitForFirstFingerprint(ctx, cancel)
|
|
return ctx.Done()
|
|
}
|
|
|
|
func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.CancelFunc) {
|
|
defer cancel()
|
|
// We don't want to start initial fingerprint wait until Run loop has
|
|
// finished
|
|
select {
|
|
case <-m.readyCh:
|
|
case <-ctx.Done():
|
|
// parent context canceled or timedout
|
|
return
|
|
case <-m.ctx.Done():
|
|
// shutdown called
|
|
return
|
|
}
|
|
|
|
var mu sync.Mutex
|
|
driversByStatus := map[drivers.HealthState][]string{}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
recordDriver := func(name string, lastHeath drivers.HealthState) {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
updated := append(driversByStatus[lastHeath], name)
|
|
driversByStatus[lastHeath] = updated
|
|
}
|
|
|
|
// loop through instances and wait for each to finish initial fingerprint
|
|
m.instancesMu.RLock()
|
|
for n, i := range m.instances {
|
|
wg.Add(1)
|
|
go func(name string, instance *instanceManager) {
|
|
defer wg.Done()
|
|
instance.WaitForFirstFingerprint(ctx)
|
|
recordDriver(name, instance.getLastHealth())
|
|
}(n, i)
|
|
}
|
|
m.instancesMu.RUnlock()
|
|
wg.Wait()
|
|
|
|
m.logger.Debug("detected drivers", "drivers", driversByStatus)
|
|
}
|
|
|
|
func (m *manager) loadReattachConfigs() error {
|
|
m.reattachConfigLock.Lock()
|
|
defer m.reattachConfigLock.Unlock()
|
|
|
|
s, err := m.state.GetDriverPluginState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if s != nil {
|
|
for name, c := range s.ReattachConfigs {
|
|
if m.isDriverBlocked(name) {
|
|
m.logger.Warn("reattach config for driver plugin found but driver is blocked due to allow/block list, killing plugin",
|
|
"driver", name)
|
|
m.shutdownBlockedDriver(name, c)
|
|
continue
|
|
}
|
|
|
|
id := loader.PluginID{
|
|
PluginType: base.PluginTypeDriver,
|
|
Name: name,
|
|
}
|
|
|
|
m.reattachConfigs[id] = c
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// shutdownBlockedDriver is used to forcefully shutdown a running driver plugin
|
|
// when it has been blocked due to allow/block lists
|
|
func (m *manager) shutdownBlockedDriver(name string, reattach *pstructs.ReattachConfig) {
|
|
c, err := pstructs.ReattachConfigToGoPlugin(reattach)
|
|
if err != nil {
|
|
m.logger.Warn("failed to reattach and kill blocked driver plugin",
|
|
"driver", name, "error", err)
|
|
return
|
|
|
|
}
|
|
pluginInstance, err := m.loader.Reattach(name, base.PluginTypeDriver, c)
|
|
if err != nil {
|
|
m.logger.Warn("failed to reattach and kill blocked driver plugin",
|
|
"driver", name, "error", err)
|
|
return
|
|
}
|
|
|
|
if !pluginInstance.Exited() {
|
|
pluginInstance.Kill()
|
|
}
|
|
}
|
|
|
|
// storePluginReattachConfig is used as a callback to the instance managers and
|
|
// persists thhe plugin reattach configurations.
|
|
func (m *manager) storePluginReattachConfig(id loader.PluginID, c *plugin.ReattachConfig) error {
|
|
m.reattachConfigLock.Lock()
|
|
defer m.reattachConfigLock.Unlock()
|
|
|
|
if c == nil {
|
|
delete(m.reattachConfigs, id)
|
|
} else {
|
|
// Store the new reattach config
|
|
m.reattachConfigs[id] = pstructs.ReattachConfigFromGoPlugin(c)
|
|
}
|
|
// Persist the state
|
|
s := &state.PluginState{
|
|
ReattachConfigs: make(map[string]*pstructs.ReattachConfig, len(m.reattachConfigs)),
|
|
}
|
|
|
|
for id, c := range m.reattachConfigs {
|
|
s.ReattachConfigs[id.Name] = c
|
|
}
|
|
|
|
return m.state.PutDriverPluginState(s)
|
|
}
|
|
|
|
// fetchPluginReattachConfig is used as a callback to the instance managers and
|
|
// retrieves the plugin reattach config. If it has not been stored it will
|
|
// return nil
|
|
func (m *manager) fetchPluginReattachConfig(id loader.PluginID) (*plugin.ReattachConfig, bool) {
|
|
m.reattachConfigLock.Lock()
|
|
defer m.reattachConfigLock.Unlock()
|
|
|
|
if cfg, ok := m.reattachConfigs[id]; ok {
|
|
c, err := pstructs.ReattachConfigToGoPlugin(cfg)
|
|
if err != nil {
|
|
m.logger.Warn("failed to read plugin reattach config", "config", cfg, "error", err)
|
|
delete(m.reattachConfigs, id)
|
|
return nil, false
|
|
}
|
|
return c, true
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
func (m *manager) Dispense(d string) (drivers.DriverPlugin, error) {
|
|
m.instancesMu.RLock()
|
|
defer m.instancesMu.RUnlock()
|
|
if instance, ok := m.instances[d]; ok {
|
|
return instance.dispense()
|
|
}
|
|
|
|
return nil, ErrDriverNotFound
|
|
}
|
|
|
|
func (m *manager) isDriverBlocked(name string) bool {
|
|
// Block drivers that are not in the allowed list if it is set.
|
|
if _, ok := m.allowedDrivers[name]; len(m.allowedDrivers) > 0 && !ok {
|
|
return true
|
|
}
|
|
|
|
// Block drivers that are in the blocked list
|
|
if _, ok := m.blockedDrivers[name]; ok {
|
|
return true
|
|
}
|
|
return false
|
|
}
|