mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
client: batch initial fingerprinting in plugin managers
drivermanager: fix pr comments/feedback
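The core idea of the change, shown as a hedged sketch with simplified names (these are not the actual Nomad types): fingerprint updates are buffered until a one-time flush, after which they are forwarded directly to the configured callback. This is roughly what the new batchNodeUpdates type in client/node_updater.go does for driver and device fingerprints.

// Hypothetical, minimal sketch of the batch-then-forward pattern this commit adds.
package main

import (
	"fmt"
	"sync"
)

type updateFn func(name, value string)

type batcher struct {
	mu      sync.Mutex
	batched bool
	pending map[string]string
	forward updateFn
}

func newBatcher(forward updateFn) *batcher {
	return &batcher{pending: make(map[string]string), forward: forward}
}

// update buffers values until flush has run, then forwards them directly.
func (b *batcher) update(name, value string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.batched {
		b.forward(name, value)
		return
	}
	b.pending[name] = value
}

// flush sends everything collected so far in one pass and switches the
// batcher into pass-through mode.
func (b *batcher) flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.batched = true
	for name, value := range b.pending {
		b.forward(name, value)
	}
}

func main() {
	b := newBatcher(func(name, value string) { fmt.Println("update:", name, value) })
	b.update("docker", "healthy") // buffered
	b.flush()                     // emitted once, as a batch
	b.update("exec", "healthy")   // forwarded immediately from now on
}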
@@ -567,7 +567,7 @@ func (tr *TaskRunner) runDriver() error {

	//XXX Evaluate and encode driver config

-	// Register an event handler with the diver mananger to emit task events
+	// Register an event handler with the driver manager to emit task events
	tr.driverManager.RegisterEventHandler(tr.Task().Driver, taskConfig.ID, tr.handleTaskEvent)

	// If there's already a task handle (eg from a Restore) there's nothing

client/client.go (122 changed lines)
@@ -1,7 +1,6 @@
package client

import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
@@ -234,6 +233,9 @@ type Client struct {
	// baseLabels are used when emitting tagged metrics. All client metrics will
	// have these tags, and optionally more.
	baseLabels []metrics.Label
+
+	// batchNodeUpdates is used to batch initial updates to the node
+	batchNodeUpdates *batchNodeUpdates
}

var (
@@ -280,6 +282,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
		triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
	}

+	c.batchNodeUpdates = newBatchNodeUpdates(
+		c.updateNodeFromDriver,
+		c.updateNodeFromDevices,
+	)
+
	// Initialize the server manager
	c.servers = servers.New(c.logger, c.shutdownCh, c)

@@ -327,7 +334,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
		Logger:         c.logger,
		Loader:         c.configCopy.PluginSingletonLoader,
		PluginConfig:   c.configCopy.NomadPluginConfig(),
-		Updater:        c.updateNodeFromDriver,
+		Updater:        c.batchNodeUpdates.updateNodeFromDriver,
		State:          c.stateDB,
		AllowedDrivers: allowlistDrivers,
		BlockedDrivers: blocklistDrivers,
@@ -341,7 +348,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
		Logger:        c.logger,
		Loader:        c.configCopy.PluginSingletonLoader,
		PluginConfig:  c.configCopy.NomadPluginConfig(),
-		Updater:       c.updateNodeFromDevices,
+		Updater:       c.batchNodeUpdates.updateNodeFromDevices,
		StatsInterval: c.configCopy.StatsCollectionInterval,
		State:         c.stateDB,
	}
@@ -349,6 +356,10 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
	c.devicemanager = devManager
	c.pluginManagers.RegisterAndRun(devManager)

+	// Batching of initial fingerprints is done to reduce the number of node
+	// updates sent to the server on startup.
+	go c.batchFirstFingerprints()
+
	// Add the stats collector
	statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir, c.devicemanager.AllStats)
	c.hostStatsCollector = statsCollector
@@ -388,20 +399,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
		return nil, fmt.Errorf("failed to setup vault client: %v", err)
	}

-	// Wait for plugin manangers to initialize.
-	// Plugins must be initialized before restore is called otherwise restoring
-	// tasks that use uninitialized plugins will fail.
-	ctx, cancel := context.WithTimeout(context.Background(), pluginmanager.DefaultManagerReadyTimeout)
-	defer cancel()
-	pluginReadyCh, err := c.pluginManagers.Ready(ctx)
-	if err != nil {
-		return nil, err
-	}
-	select {
-	case <-pluginReadyCh:
-	case <-ctx.Done():
-	}
-
	// Restore the state
	if err := c.restoreState(); err != nil {
		logger.Error("failed to restore state", "error", err)
@@ -1185,97 +1182,6 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
	return c.configCopy.Node
}

-// updateNodeFromDriver receives a DriverInfo struct for the driver and updates
-// the node accordingly
-func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) *structs.Node {
-	c.configLock.Lock()
-	defer c.configLock.Unlock()
-	if info == nil {
-		return c.configCopy.Node
-	}
-
-	var hasChanged bool
-
-	hadDriver := c.config.Node.Drivers[name] != nil
-	if !hadDriver {
-		// If the driver info has not yet been set, do that here
-		hasChanged = true
-		for attrName, newVal := range info.Attributes {
-			c.config.Node.Attributes[attrName] = newVal
-		}
-	} else {
-		oldVal := c.config.Node.Drivers[name]
-		// The driver info has already been set, fix it up
-		if oldVal.Detected != info.Detected {
-			hasChanged = true
-		}
-
-		if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
-			hasChanged = true
-
-			if info.HealthDescription != "" {
-				event := &structs.NodeEvent{
-					Subsystem: "Driver",
-					Message:   info.HealthDescription,
-					Timestamp: time.Now(),
-					Details:   map[string]string{"driver": name},
-				}
-				c.triggerNodeEvent(event)
-			}
-		}
-
-		for attrName, newVal := range info.Attributes {
-			oldVal := c.config.Node.Drivers[name].Attributes[attrName]
-			if oldVal == newVal {
-				continue
-			}
-
-			hasChanged = true
-
-			if newVal == "" {
-				delete(c.config.Node.Attributes, attrName)
-			} else {
-				c.config.Node.Attributes[attrName] = newVal
-			}
-		}
-	}
-
-	// COMPAT Remove in Nomad 0.10
-	// We maintain the driver enabled attribute until all drivers expose
-	// their attributes as DriverInfo
-	driverName := fmt.Sprintf("driver.%s", name)
-	if info.Detected {
-		c.config.Node.Attributes[driverName] = "1"
-	} else {
-		delete(c.config.Node.Attributes, driverName)
-	}
-
-	if hasChanged {
-		c.config.Node.Drivers[name] = info
-		c.config.Node.Drivers[name].UpdateTime = time.Now()
-		c.updateNodeLocked()
-	}
-
-	return c.configCopy.Node
-}
-
-// updateNodeFromFingerprint updates the node with the result of
-// fingerprinting the node from the diff that was created
-func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
-	c.configLock.Lock()
-	defer c.configLock.Unlock()
-
-	// Not updating node.Resources: the field is deprecated and includes
-	// dispatched task resources and not appropriate for expressing
-	// node available device resources
-	if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
-		c.logger.Debug("new devices detected", "devices", len(devices))
-
-		c.config.Node.NodeResources.Devices = devices
-		c.updateNodeLocked()
-	}
-}
-
// resourcesAreEqual is a temporary function to compare whether resources are
// equal. We can use this until we change fingerprinters to set pointers on a
// return type.
@@ -159,6 +159,11 @@ func (m *manager) Run() {
	}

	// Now start the fingerprint handler
	go m.fingerprint()
}

// fingerprint is the main fingerprint loop
func (m *manager) fingerprint() {
	for {
		select {
		case <-m.ctx.Done():
@@ -194,12 +199,18 @@ func (m *manager) Shutdown() {
	}
}

-func (m *manager) Ready() <-chan struct{} {
-	ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
+func (m *manager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
+	ctx, cancel := context.WithCancel(ctx)
	go func() {
-		for _, i := range m.instances {
-			i.WaitForFirstFingerprint(ctx)
+		var wg sync.WaitGroup
+		for i := range m.instances {
+			wg.Add(1)
+			go func(instance *instanceManager) {
+				instance.WaitForFirstFingerprint(ctx)
+				wg.Done()
+			}(m.instances[i])
		}
+		wg.Wait()
		cancel()
	}()
	return ctx.Done()
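The new WaitForFirstFingerprint above follows a general Go pattern: fan the wait out over many instances with a sync.WaitGroup and expose completion as a closed channel by canceling a derived context, so callers can select on it alongside their own timeout. A minimal, self-contained sketch of that pattern (not Nomad code; names are illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitAll runs every task concurrently and returns a channel that is closed
// when all tasks have returned, or when the parent context is done.
func waitAll(ctx context.Context, tasks []func(context.Context)) <-chan struct{} {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		var wg sync.WaitGroup
		for i := range tasks {
			wg.Add(1)
			go func(task func(context.Context)) {
				defer wg.Done()
				task(ctx)
			}(tasks[i])
		}
		wg.Wait()
		cancel() // closing the context's Done channel signals completion
	}()
	return ctx.Done()
}

func main() {
	tasks := []func(context.Context){
		func(context.Context) { time.Sleep(10 * time.Millisecond) },
		func(context.Context) { time.Sleep(20 * time.Millisecond) },
	}
	select {
	case <-waitAll(context.Background(), tasks):
		fmt.Println("all instances finished their first fingerprint")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting")
	}
}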
client/node_updater.go (new file, 252 lines)
@@ -0,0 +1,252 @@
package client

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/hashicorp/nomad/client/devicemanager"
	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
	"github.com/hashicorp/nomad/nomad/structs"
)

var (
	// batchFirstFingerprintsTimeout is the maximum amount of time to wait for
	// initial fingerprinting to complete before sending a batched Node update
	batchFirstFingerprintsTimeout = 5 * time.Second
)

// batchFirstFingerprints waits for the first fingerprint response from all
// plugin managers and sends a single Node update for all fingerprints
func (c *Client) batchFirstFingerprints() {
	ctx, cancel := context.WithTimeout(context.Background(), batchFirstFingerprintsTimeout)
	defer cancel()

	ch, err := c.pluginManagers.WaitForFirstFingerprint(ctx)
	if err != nil {
		c.logger.Warn("failed to batch initial fingerprint updates, switching to incremental updates")
		goto SEND_BATCH
	}

	// Wait for fingerprinting to complete or timeout before processing batches
	select {
	case <-ch:
	case <-ctx.Done():
	}

SEND_BATCH:
	c.configLock.Lock()
	defer c.configLock.Unlock()

	// driver node updates
	var driverChanged bool
	c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) {
		if c.updateNodeFromDriverLocked(driver, info) {
			c.config.Node.Drivers[driver].UpdateTime = time.Now()
			driverChanged = true
		}
	})

	// device node updates
	var devicesChanged bool
	c.batchNodeUpdates.batchDevicesUpdates(func(devices []*structs.NodeDeviceResource) {
		if c.updateNodeFromDevicesLocked(devices) {
			devicesChanged = true
		}
	})

	// only update the node if changes occurred
	if driverChanged || devicesChanged {
		c.updateNodeLocked()
	}
}

// updateNodeFromDriver receives a DriverInfo struct for the driver and updates
// the node accordingly
func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) {
	c.configLock.Lock()
	defer c.configLock.Unlock()

	if c.updateNodeFromDriverLocked(name, info) {
		c.config.Node.Drivers[name].UpdateTime = time.Now()
		c.updateNodeLocked()
	}
}

// updateNodeFromDriverLocked makes the changes to the node from a driver update
// but does not send the update to the server. c.configLock must be held before
// calling this func
func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInfo) bool {
	var hasChanged bool

	hadDriver := c.config.Node.Drivers[name] != nil
	if !hadDriver {
		// If the driver info has not yet been set, do that here
		hasChanged = true
		c.config.Node.Drivers[name] = info
		for attrName, newVal := range info.Attributes {
			c.config.Node.Attributes[attrName] = newVal
		}
	} else {
		oldVal := c.config.Node.Drivers[name]
		// The driver info has already been set, fix it up
		if oldVal.Detected != info.Detected {
			hasChanged = true
			c.config.Node.Drivers[name].Detected = info.Detected
		}

		if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
			hasChanged = true
			if info.HealthDescription != "" {
				event := &structs.NodeEvent{
					Subsystem: "Driver",
					Message:   info.HealthDescription,
					Timestamp: time.Now(),
					Details:   map[string]string{"driver": name},
				}
				c.triggerNodeEvent(event)
			}
			// Update the node with the latest information
			c.config.Node.Drivers[name].MergeHealthCheck(info)
		}

		for attrName, newVal := range info.Attributes {
			oldVal := c.config.Node.Drivers[name].Attributes[attrName]
			if oldVal == newVal {
				continue
			}

			hasChanged = true
			if newVal == "" {
				delete(c.config.Node.Attributes, attrName)
			} else {
				c.config.Node.Attributes[attrName] = newVal
			}
		}
	}

	// COMPAT Remove in Nomad 0.10
	// We maintain the driver enabled attribute until all drivers expose
	// their attributes as DriverInfo
	driverName := fmt.Sprintf("driver.%s", name)
	if info.Detected {
		c.config.Node.Attributes[driverName] = "1"
	} else {
		delete(c.config.Node.Attributes, driverName)
	}

	return hasChanged
}

// updateNodeFromDevices updates the node with the latest device resources
// from fingerprinting
func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
	c.configLock.Lock()
	defer c.configLock.Unlock()

	// Not updating node.Resources: the field is deprecated and includes
	// dispatched task resources and not appropriate for expressing
	// node available device resources
	if c.updateNodeFromDevicesLocked(devices) {
		c.updateNodeLocked()
	}
}

// updateNodeFromDevicesLocked updates the node with the results of devices,
// but does not send the update to the server. c.configLock must be held before
// calling this func
func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResource) bool {
	if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
		c.logger.Debug("new devices detected", "devices", len(devices))
		c.config.Node.NodeResources.Devices = devices
		return true
	}

	return false
}

// batchNodeUpdates allows for batching multiple Node updates from fingerprinting.
// Once ready, the batches can be flushed and toggled to stop batching and forward
// all updates to a configured callback to be performed incrementally
type batchNodeUpdates struct {
	// access to driver fields must hold driversMu lock
	drivers        map[string]*structs.DriverInfo
	driversBatched bool
	driverCB       drivermanager.UpdateNodeDriverInfoFn
	driversMu      sync.Mutex

	// access to devices fields must hold devicesMu lock
	devices        []*structs.NodeDeviceResource
	devicesBatched bool
	devicesCB      devicemanager.UpdateNodeDevicesFn
	devicesMu      sync.Mutex
}

func newBatchNodeUpdates(
	driverCB drivermanager.UpdateNodeDriverInfoFn,
	devicesCB devicemanager.UpdateNodeDevicesFn) *batchNodeUpdates {

	return &batchNodeUpdates{
		drivers:   make(map[string]*structs.DriverInfo),
		driverCB:  driverCB,
		devices:   []*structs.NodeDeviceResource{},
		devicesCB: devicesCB,
	}
}

// updateNodeFromDriver implements drivermanager.UpdateNodeDriverInfoFn and is
// used in the driver manager to send driver fingerprints to the client
func (b *batchNodeUpdates) updateNodeFromDriver(driver string, info *structs.DriverInfo) {
	b.driversMu.Lock()
	defer b.driversMu.Unlock()
	if b.driversBatched {
		b.driverCB(driver, info)
		return
	}

	b.drivers[driver] = info
}

// batchDriverUpdates sends all of the batched driver node updates by calling f
// for each driver batched
func (b *batchNodeUpdates) batchDriverUpdates(f drivermanager.UpdateNodeDriverInfoFn) error {
	b.driversMu.Lock()
	defer b.driversMu.Unlock()
	if b.driversBatched {
		return fmt.Errorf("driver updates already batched")
	}

	b.driversBatched = true
	for driver, info := range b.drivers {
		f(driver, info)
	}
	return nil
}

// updateNodeFromDevices implements devicemanager.UpdateNodeDevicesFn and is
// used in the device manager to send device fingerprints to the client
func (b *batchNodeUpdates) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
	b.devicesMu.Lock()
	defer b.devicesMu.Unlock()
	if b.devicesBatched {
		b.devicesCB(devices)
		return
	}

	b.devices = devices
}

// batchDevicesUpdates sends the batched device node updates by calling f with
// the devices
func (b *batchNodeUpdates) batchDevicesUpdates(f devicemanager.UpdateNodeDevicesFn) error {
	b.devicesMu.Lock()
	defer b.devicesMu.Unlock()
	if b.devicesBatched {
		return fmt.Errorf("devices updates already batched")
	}

	b.devicesBatched = true
	f(b.devices)
	return nil
}
@@ -220,7 +220,7 @@ func (i *instanceManager) dispense() (plugin drivers.DriverPlugin, err error) {
		}
	}

-	// Convert to a fingerprint plugin
+	// Convert to a driver plugin
	driver, ok := pluginInstance.Plugin().(drivers.DriverPlugin)
	if !ok {
		pluginInstance.Kill()
@@ -4,7 +4,6 @@ import (
	"context"
	"fmt"
	"sync"
-	"time"

	log "github.com/hashicorp/go-hclog"
	plugin "github.com/hashicorp/go-plugin"
@@ -16,11 +15,22 @@ import (
	"github.com/hashicorp/nomad/plugins/shared/loader"
)

// ErrDriverNotFound is returned during Dispense when the requested driver
// plugin is not found in the plugin catalog
var ErrDriverNotFound = fmt.Errorf("driver not found")

// Manager is the interface used to manage driver plugins
type Manager interface {
	// RegisterEventHandler will cause the given EventHandler to be called when
	// an event is received that matches the given driver and taskID
	RegisterEventHandler(driver, taskID string, handler EventHandler)

	// DeregisterEventHandler stops the EventHandler registered for the given
	// driver and taskID to be called if exists
	DeregisterEventHandler(driver, taskID string)

	// Dispense returns a drivers.DriverPlugin for the given driver plugin name
	// handling reattaching to an existing driver if available
	Dispense(driver string) (drivers.DriverPlugin, error)
}
@@ -42,7 +52,7 @@ type StateStorage interface {

// UpdateNodeDriverInfoFn is the callback used to update the node from
// fingerprinting
-type UpdateNodeDriverInfoFn func(string, *structs.DriverInfo) *structs.Node
+type UpdateNodeDriverInfoFn func(string, *structs.DriverInfo)

// StorePluginReattachFn is used to store plugin reattachment configurations.
type StorePluginReattachFn func(*plugin.ReattachConfig) error
@@ -99,7 +109,7 @@ type manager struct {

	// instances is the list of managed devices, access is serialized by instanceMu
	instances map[string]*instanceManager
-	instancesMu sync.Mutex
+	instancesMu sync.RWMutex

	// reattachConfigs stores the plugin reattach configs
	reattachConfigs map[loader.PluginID]*shared.ReattachConfig
@@ -139,7 +149,10 @@ func (*manager) PluginType() string { return base.PluginTypeDriver }
// is called.
func (m *manager) Run() {
	// Load any previous plugin reattach configuration
-	m.loadReattachConfigs()
+	if err := m.loadReattachConfigs(); err != nil {
+		m.logger.Warn("unable to load driver plugin reattach configs, a driver process may have been leaked",
+			"error", err)
+	}

	// Get driver plugins
	driversPlugins := m.loader.Catalog()[base.PluginTypeDriver]
@@ -152,13 +165,7 @@ func (m *manager) Run() {
	var skippedDrivers []string
	for _, d := range driversPlugins {
		id := loader.PluginInfoID(d)
-		// Skip drivers that are not in the allowed list if it is set.
-		if _, ok := m.allowedDrivers[id.Name]; len(m.allowedDrivers) > 0 && !ok {
-			skippedDrivers = append(skippedDrivers, id.Name)
-			continue
-		}
-		// Skip fingerprinting drivers that are in the blocked list
-		if _, ok := m.blockedDrivers[id.Name]; ok {
+		if m.isDriverBlocked(id.Name) {
			skippedDrivers = append(skippedDrivers, id.Name)
			continue
		}
@@ -192,9 +199,6 @@ func (m *manager) Run() {

	// signal ready
	close(m.readyCh)
-
-	// wait for shutdown
-	<-m.ctx.Done()
}

// Shutdown cleans up all the plugins
@@ -202,8 +206,8 @@ func (m *manager) Shutdown() {
	// Cancel the context to stop any requests
	m.cancel()

-	m.instancesMu.Lock()
-	defer m.instancesMu.Unlock()
+	m.instancesMu.RLock()
+	defer m.instancesMu.RUnlock()

	// Go through and shut everything down
	for _, i := range m.instances {
@@ -211,28 +215,44 @@ func (m *manager) Shutdown() {
	}
}

-func (m *manager) Ready() <-chan struct{} {
-	ctx, cancel := context.WithTimeout(m.ctx, time.Second*10)
-	go func() {
-		defer cancel()
-		// We don't want to start initial fingerprint wait until Run loop has
-		// finished
-		select {
-		case <-m.readyCh:
-		case <-m.ctx.Done():
-			return
-		}
-
-		var availDrivers []string
-		for name, instance := range m.instances {
-			instance.WaitForFirstFingerprint(ctx)
-			if instance.lastHealthState != drivers.HealthStateUndetected {
-				availDrivers = append(availDrivers, name)
-			}
-		}
-		m.logger.Debug("detected drivers", "drivers", availDrivers)
-	}()
-	return ctx.Done()
+func (m *manager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
+	ctx, cancel := context.WithCancel(ctx)
+	go m.waitForFirstFingerprint(ctx, cancel)
+	return ctx.Done()
+}
+
+func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.CancelFunc) {
+	defer cancel()
+	// We don't want to start initial fingerprint wait until Run loop has
+	// finished
+	select {
+	case <-m.readyCh:
+	case <-ctx.Done():
+		// parent context canceled or timed out
+		return
+	case <-m.ctx.Done():
+		// shutdown called
+		return
+	}
+
+	var availDrivers []string
+	var wg sync.WaitGroup
+
+	// loop through instances and wait for each to finish initial fingerprint
+	m.instancesMu.RLock()
+	for n, i := range m.instances {
+		wg.Add(1)
+		go func(name string, instance *instanceManager) {
+			defer wg.Done()
+			instance.WaitForFirstFingerprint(ctx)
+			if instance.getLastHealth() != drivers.HealthStateUndetected {
+				availDrivers = append(availDrivers, name)
+			}
+		}(n, i)
+	}
+	m.instancesMu.RUnlock()
+	wg.Wait()
+	m.logger.Debug("detected drivers", "drivers", availDrivers)
}
@@ -246,10 +266,15 @@ func (m *manager) loadReattachConfigs() error {

	if s != nil {
		for name, c := range s.ReattachConfigs {
+			if m.isDriverBlocked(name) {
+				continue
+			}
+
			id := loader.PluginID{
				PluginType: base.PluginTypeDriver,
				Name:       name,
			}

			m.reattachConfigs[id] = c
		}
	}
@@ -288,6 +313,7 @@ func (m *manager) fetchPluginReattachConfig(id loader.PluginID) (*plugin.Reattac
		c, err := shared.ReattachConfigToGoPlugin(cfg)
		if err != nil {
			m.logger.Warn("failed to read plugin reattach config", "config", cfg, "error", err)
+			delete(m.reattachConfigs, id)
			return nil, false
		}
		return c, true
@@ -296,7 +322,7 @@ func (m *manager) fetchPluginReattachConfig(id loader.PluginID) (*plugin.Reattac
}

func (m *manager) RegisterEventHandler(driver, taskID string, handler EventHandler) {
-	m.instancesMu.Lock()
+	m.instancesMu.RLock()
	if d, ok := m.instances[driver]; ok {
		d.registerEventHandler(taskID, handler)
	}
@@ -304,7 +330,7 @@ func (m *manager) RegisterEventHandler(driver, taskID string, handler EventHandl
}

func (m *manager) DeregisterEventHandler(driver, taskID string) {
-	m.instancesMu.Lock()
+	m.instancesMu.RLock()
	if d, ok := m.instances[driver]; ok {
		d.deregisterEventHandler(taskID)
	}
@@ -312,11 +338,24 @@ func (m *manager) DeregisterEventHandler(driver, taskID string) {
}

func (m *manager) Dispense(d string) (drivers.DriverPlugin, error) {
-	m.instancesMu.Lock()
-	defer m.instancesMu.Unlock()
+	m.instancesMu.RLock()
+	defer m.instancesMu.RUnlock()
	if instance, ok := m.instances[d]; ok {
		return instance.dispense()
	}

-	return nil, fmt.Errorf("driver not found")
+	return nil, ErrDriverNotFound
}
+
+func (m *manager) isDriverBlocked(name string) bool {
+	// Block drivers that are not in the allowed list if it is set.
+	if _, ok := m.allowedDrivers[name]; len(m.allowedDrivers) > 0 && !ok {
+		return true
+	}
+
+	// Block drivers that are in the blocked list
+	if _, ok := m.blockedDrivers[name]; ok {
+		return true
+	}
+	return false
+}
@@ -4,17 +4,10 @@ import (
	"context"
	"fmt"
	"sync"
-	"time"

	log "github.com/hashicorp/go-hclog"
)
-
-var (
-	// DefaultManagerReadyTimeout is the default amount of time we will wait
-	// for a plugin mananger to be ready before logging it and moving on.
-	DefaultManagerReadyTimeout = time.Second * 5
-)

// PluginGroup is a utility struct used to collectively orchestrate a
// set of PluginManagers
type PluginGroup struct {
@@ -47,17 +40,14 @@ func (m *PluginGroup) RegisterAndRun(manager PluginManager) error {
	}
	m.managers = append(m.managers, manager)

-	go func() {
-		m.logger.Info("starting plugin manager", "plugin-type", manager.PluginType())
-		manager.Run()
-		m.logger.Info("plugin manager finished", "plugin-type", manager.PluginType())
-	}()
+	m.logger.Info("starting plugin manager", "plugin-type", manager.PluginType())
+	manager.Run()
	return nil
}

// Ready returns a channel which will be closed once all plugin managers are ready.
// A timeout for waiting on each manager is given
-func (m *PluginGroup) Ready(ctx context.Context) (<-chan struct{}, error) {
+func (m *PluginGroup) WaitForFirstFingerprint(ctx context.Context) (<-chan struct{}, error) {
	m.mLock.Lock()
	defer m.mLock.Unlock()
	if m.shutdown {
@@ -66,15 +56,23 @@ func (m *PluginGroup) Ready(ctx context.Context) (<-chan struct{}, error) {

	var wg sync.WaitGroup
	for i := range m.managers {
-		manager := m.managers[i]
+		manager, ok := m.managers[i].(FingerprintingPluginManager)
+		if !ok {
+			continue
+		}
+		logger := m.logger.With("plugin-type", manager.PluginType())
		wg.Add(1)
		go func() {
			defer wg.Done()
+			logger.Debug("waiting on plugin manager initial fingerprint")
			select {
-			case <-manager.Ready():
-			case <-ctx.Done():
-				m.logger.Warn("timeout waiting for plugin manager to be ready",
-					"plugin-type", manager.PluginType())
+			case <-manager.WaitForFirstFingerprint(ctx):
+				select {
+				case <-ctx.Done():
+					logger.Warn("timeout waiting for plugin manager to be ready")
+				default:
+					logger.Debug("finished plugin manager initial fingerprint")
+				}
			}
		}()
	}
@@ -95,6 +93,7 @@ func (m *PluginGroup) Shutdown() {
	for i := len(m.managers) - 1; i >= 0; i-- {
		m.logger.Info("shutting down plugin manager", "plugin-type", m.managers[i].PluginType())
		m.managers[i].Shutdown()
+		m.logger.Info("plugin manager finished", "plugin-type", m.managers[i].PluginType())
	}
	m.shutdown = true
}
@@ -1,14 +1,12 @@
package pluginmanager

+import "context"
+
// PluginManager orchestrates the lifecycle of a set of plugins
type PluginManager interface {
-	// Run starts a plugin manager and must block until shutdown
+	// Run starts a plugin manager and should return early
	Run()

-	// Ready returns a channel that blocks until the plugin mananger has
-	// initialized all plugins
-	Ready() <-chan struct{}
-
	// Shutdown should gracefully shutdown all plugins managed by the manager.
	// It must block until shutdown is complete
	Shutdown()
@@ -16,3 +14,14 @@ type PluginManager interface {
	// PluginType is the type of plugin which the manager manages
	PluginType() string
}
+
+// FingerprintingPluginManager is an interface that exposes fingerprinting
+// coordination for plugin managers
+type FingerprintingPluginManager interface {
+	PluginManager
+
+	// WaitForFirstFingerprint returns a channel that is closed once all plugin
+	// instances managed by the plugin manager have fingerprinted once. A
+	// context can be passed which when done will also close the channel
+	WaitForFirstFingerprint(context.Context) <-chan struct{}
+}
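For illustration, a hypothetical type satisfying the FingerprintingPluginManager contract above could look like the following sketch. The interfaces are re-declared locally so the example is self-contained rather than importing Nomad packages, and a real manager would only close the channel after every plugin instance has fingerprinted once.

package main

import "context"

type PluginManager interface {
	Run()
	Shutdown()
	PluginType() string
}

type FingerprintingPluginManager interface {
	PluginManager
	WaitForFirstFingerprint(context.Context) <-chan struct{}
}

type noopManager struct{}

func (noopManager) Run()               {}
func (noopManager) Shutdown()          {}
func (noopManager) PluginType() string { return "noop" }

// WaitForFirstFingerprint signals immediately since there is nothing to
// fingerprint; the channel also closes if the caller's context is canceled.
func (noopManager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
	ctx, cancel := context.WithCancel(ctx)
	cancel()
	return ctx.Done()
}

func main() {
	var m FingerprintingPluginManager = noopManager{}
	<-m.WaitForFirstFingerprint(context.Background()) // returns immediately
}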
@@ -8,8 +8,3 @@ type MockPluginManager struct {
func (m *MockPluginManager) Run()               { m.RunF() }
func (m *MockPluginManager) Shutdown()          { m.ShutdownF() }
func (m *MockPluginManager) PluginType() string { return "mock" }
-func (m *MockPluginManager) Ready() <-chan struct{} {
-	ch := make(chan struct{})
-	close(ch)
-	return ch
-}