diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index b1749e350..fbd3578ef 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -567,7 +567,7 @@ func (tr *TaskRunner) runDriver() error {
 
 	//XXX Evaluate and encode driver config
 
-	// Register an event handler with the diver mananger to emit task events
+	// Register an event handler with the driver manager to emit task events
 	tr.driverManager.RegisterEventHandler(tr.Task().Driver, taskConfig.ID, tr.handleTaskEvent)
 
 	// If there's already a task handle (eg from a Restore) there's nothing
diff --git a/client/client.go b/client/client.go
index 538835f07..1ea068887 100644
--- a/client/client.go
+++ b/client/client.go
@@ -1,7 +1,6 @@
 package client
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -234,6 +233,9 @@ type Client struct {
 	// baseLabels are used when emitting tagged metrics. All client metrics will
 	// have these tags, and optionally more.
 	baseLabels []metrics.Label
+
+	// batchNodeUpdates is used to batch initial updates to the node
+	batchNodeUpdates *batchNodeUpdates
 }
 
 var (
@@ -280,6 +282,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
 	}
 
+	c.batchNodeUpdates = newBatchNodeUpdates(
+		c.updateNodeFromDriver,
+		c.updateNodeFromDevices,
+	)
+
 	// Initialize the server manager
 	c.servers = servers.New(c.logger, c.shutdownCh, c)
 
@@ -327,7 +334,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		Logger:         c.logger,
 		Loader:         c.configCopy.PluginSingletonLoader,
 		PluginConfig:   c.configCopy.NomadPluginConfig(),
-		Updater:        c.updateNodeFromDriver,
+		Updater:        c.batchNodeUpdates.updateNodeFromDriver,
 		State:          c.stateDB,
 		AllowedDrivers: allowlistDrivers,
 		BlockedDrivers: blocklistDrivers,
@@ -341,7 +348,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		Logger:        c.logger,
 		Loader:        c.configCopy.PluginSingletonLoader,
 		PluginConfig:  c.configCopy.NomadPluginConfig(),
-		Updater:       c.updateNodeFromDevices,
+		Updater:       c.batchNodeUpdates.updateNodeFromDevices,
 		StatsInterval: c.configCopy.StatsCollectionInterval,
 		State:         c.stateDB,
 	}
@@ -349,6 +356,10 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 	c.devicemanager = devManager
 	c.pluginManagers.RegisterAndRun(devManager)
 
+	// Batching of initial fingerprints is done to reduce the number of node
+	// updates sent to the server on startup.
+	go c.batchFirstFingerprints()
+
 	// Add the stats collector
 	statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir, c.devicemanager.AllStats)
 	c.hostStatsCollector = statsCollector
@@ -388,20 +399,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		return nil, fmt.Errorf("failed to setup vault client: %v", err)
 	}
 
-	// Wait for plugin manangers to initialize.
-	// Plugins must be initialized before restore is called otherwise restoring
-	// tasks that use uninitialized plugins will fail.
-	ctx, cancel := context.WithTimeout(context.Background(), pluginmanager.DefaultManagerReadyTimeout)
-	defer cancel()
-	pluginReadyCh, err := c.pluginManagers.Ready(ctx)
-	if err != nil {
-		return nil, err
-	}
-	select {
-	case <-pluginReadyCh:
-	case <-ctx.Done():
-	}
-
 	// Restore the state
 	if err := c.restoreState(); err != nil {
 		logger.Error("failed to restore state", "error", err)
@@ -1185,97 +1182,6 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
 	return c.configCopy.Node
 }
 
-// updateNodeFromDriver receives a DriverInfo struct for the driver and updates
-// the node accordingly
-func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) *structs.Node {
-	c.configLock.Lock()
-	defer c.configLock.Unlock()
-	if info == nil {
-		return c.configCopy.Node
-	}
-
-	var hasChanged bool
-
-	hadDriver := c.config.Node.Drivers[name] != nil
-	if !hadDriver {
-		// If the driver info has not yet been set, do that here
-		hasChanged = true
-		for attrName, newVal := range info.Attributes {
-			c.config.Node.Attributes[attrName] = newVal
-		}
-	} else {
-		oldVal := c.config.Node.Drivers[name]
-		// The driver info has already been set, fix it up
-		if oldVal.Detected != info.Detected {
-			hasChanged = true
-		}
-
-		if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
-			hasChanged = true
-
-			if info.HealthDescription != "" {
-				event := &structs.NodeEvent{
-					Subsystem: "Driver",
-					Message:   info.HealthDescription,
-					Timestamp: time.Now(),
-					Details:   map[string]string{"driver": name},
-				}
-				c.triggerNodeEvent(event)
-			}
-		}
-
-		for attrName, newVal := range info.Attributes {
-			oldVal := c.config.Node.Drivers[name].Attributes[attrName]
-			if oldVal == newVal {
-				continue
-			}
-
-			hasChanged = true
-
-			if newVal == "" {
-				delete(c.config.Node.Attributes, attrName)
-			} else {
-				c.config.Node.Attributes[attrName] = newVal
-			}
-		}
-	}
-
-	// COMPAT Remove in Nomad 0.10
-	// We maintain the driver enabled attribute until all drivers expose
-	// their attributes as DriverInfo
-	driverName := fmt.Sprintf("driver.%s", name)
-	if info.Detected {
-		c.config.Node.Attributes[driverName] = "1"
-	} else {
-		delete(c.config.Node.Attributes, driverName)
-	}
-
-	if hasChanged {
-		c.config.Node.Drivers[name] = info
-		c.config.Node.Drivers[name].UpdateTime = time.Now()
-		c.updateNodeLocked()
-	}
-
-	return c.configCopy.Node
-}
-
-// updateNodeFromFingerprint updates the node with the result of
-// fingerprinting the node from the diff that was created
-func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
-	c.configLock.Lock()
-	defer c.configLock.Unlock()
-
-	// Not updating node.Resources: the field is deprecated and includes
-	// dispatched task resources and not appropriate for expressing
-	// node available device resources
-	if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
-		c.logger.Debug("new devices detected", "devices", len(devices))
-
-		c.config.Node.NodeResources.Devices = devices
-		c.updateNodeLocked()
-	}
-}
-
 // resourcesAreEqual is a temporary function to compare whether resources are
 // equal. We can use this until we change fingerprinters to set pointers on a
 // return type.
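The wiring above replaces the client's direct updater callbacks with methods on batchNodeUpdates (added in client/node_updater.go below): until the initial-fingerprint window closes, fingerprint results are buffered, and a single flush then turns the same methods into pass-throughs to the original update functions. A minimal, self-contained sketch of that buffer-then-forward toggle — hypothetical names, not Nomad's actual API:

package main

import (
	"fmt"
	"sync"
)

// updateFn is the callback updates are forwarded to once batching ends.
type updateFn func(name, value string)

// batcher buffers updates until Flush is called, then becomes a pass-through.
// It mirrors the driver half of batchNodeUpdates in miniature.
type batcher struct {
	mu      sync.Mutex
	batched bool              // true once Flush has run
	pending map[string]string // buffered updates, last write wins per key
	cb      updateFn
}

func newBatcher(cb updateFn) *batcher {
	return &batcher{pending: make(map[string]string), cb: cb}
}

// Update buffers while batching, and forwards after the batch is flushed.
func (b *batcher) Update(name, value string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.batched {
		b.cb(name, value)
		return
	}
	b.pending[name] = value
}

// Flush delivers everything buffered so far and toggles pass-through mode.
// Calling it twice is an error, as in batchDriverUpdates.
func (b *batcher) Flush() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.batched {
		return fmt.Errorf("updates already batched")
	}
	b.batched = true
	for name, value := range b.pending {
		b.cb(name, value)
	}
	return nil
}

func main() {
	b := newBatcher(func(name, value string) {
		fmt.Printf("node update: %s=%s\n", name, value)
	})

	// Early fingerprints are buffered; only the latest value per key is kept.
	b.Update("driver.docker", "undetected")
	b.Update("driver.docker", "healthy")
	b.Update("driver.exec", "healthy")

	// One flush sends a single consolidated update set...
	b.Flush() // prints two lines, not three

	// ...and later fingerprints go straight through.
	b.Update("driver.qemu", "undetected")
}

Buffering into a map keeps only the last update per driver, which is what makes the flushed batch a single consolidated node update rather than a replay of every intermediate fingerprint.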
diff --git a/client/devicemanager/manager.go b/client/devicemanager/manager.go
index 94e075b6a..38949bf20 100644
--- a/client/devicemanager/manager.go
+++ b/client/devicemanager/manager.go
@@ -159,6 +159,11 @@ func (m *manager) Run() {
 	}
 
 	// Now start the fingerprint handler
+	go m.fingerprint()
+}
+
+// fingerprint is the main fingerprint loop
+func (m *manager) fingerprint() {
 	for {
 		select {
 		case <-m.ctx.Done():
@@ -194,12 +199,18 @@ func (m *manager) Shutdown() {
 	}
 }
 
-func (m *manager) Ready() <-chan struct{} {
-	ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
+func (m *manager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
+	ctx, cancel := context.WithCancel(ctx)
 	go func() {
-		for _, i := range m.instances {
-			i.WaitForFirstFingerprint(ctx)
+		var wg sync.WaitGroup
+		for i := range m.instances {
+			wg.Add(1)
+			go func(instance *instanceManager) {
+				instance.WaitForFirstFingerprint(ctx)
+				wg.Done()
+			}(m.instances[i])
 		}
+		wg.Wait()
 		cancel()
 	}()
 	return ctx.Done()
diff --git a/client/node_updater.go b/client/node_updater.go
new file mode 100644
index 000000000..2757aa0da
--- /dev/null
+++ b/client/node_updater.go
@@ -0,0 +1,252 @@
+package client
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/nomad/client/devicemanager"
+	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+var (
+	// batchFirstFingerprintsTimeout is the maximum amount of time to wait for
+	// initial fingerprinting to complete before sending a batched Node update
+	batchFirstFingerprintsTimeout = 5 * time.Second
+)
+
+// batchFirstFingerprints waits for the first fingerprint response from all
+// plugin managers and sends a single Node update for all fingerprints
+func (c *Client) batchFirstFingerprints() {
+	ctx, cancel := context.WithTimeout(context.Background(), batchFirstFingerprintsTimeout)
+	defer cancel()
+
+	ch, err := c.pluginManagers.WaitForFirstFingerprint(ctx)
+	if err != nil {
+		c.logger.Warn("failed to batch initial fingerprint updates, switching to incremental updates")
+		goto SEND_BATCH
+	}
+
+	// Wait for fingerprinting to complete or timeout before processing batches
+	select {
+	case <-ch:
+	case <-ctx.Done():
+	}
+
+SEND_BATCH:
+	c.configLock.Lock()
+	defer c.configLock.Unlock()
+
+	// driver node updates
+	var driverChanged bool
+	c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) {
+		if c.updateNodeFromDriverLocked(driver, info) {
+			c.config.Node.Drivers[driver].UpdateTime = time.Now()
+			driverChanged = true
+		}
+	})
+
+	// device node updates
+	var devicesChanged bool
+	c.batchNodeUpdates.batchDevicesUpdates(func(devices []*structs.NodeDeviceResource) {
+		if c.updateNodeFromDevicesLocked(devices) {
+			devicesChanged = true
+		}
+	})
+
+	// only update the node if changes occurred
+	if driverChanged || devicesChanged {
+		c.updateNodeLocked()
+	}
+}
+
+// updateNodeFromDriver receives a DriverInfo struct for the driver and updates
+// the node accordingly
+func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) {
+	c.configLock.Lock()
+	defer c.configLock.Unlock()
+
+	if c.updateNodeFromDriverLocked(name, info) {
+		c.config.Node.Drivers[name].UpdateTime = time.Now()
+		c.updateNodeLocked()
+	}
+}
+
+// updateNodeFromDriverLocked makes the changes to the node from a driver update
+// but does not send the update to the server. c.configLock must be held before
+// calling this func
+func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInfo) bool {
+	var hasChanged bool
+
+	hadDriver := c.config.Node.Drivers[name] != nil
+	if !hadDriver {
+		// If the driver info has not yet been set, do that here
+		hasChanged = true
+		c.config.Node.Drivers[name] = info
+		for attrName, newVal := range info.Attributes {
+			c.config.Node.Attributes[attrName] = newVal
+		}
+	} else {
+		oldVal := c.config.Node.Drivers[name]
+		// The driver info has already been set, fix it up
+		if oldVal.Detected != info.Detected {
+			hasChanged = true
+			c.config.Node.Drivers[name].Detected = info.Detected
+		}
+
+		if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
+			hasChanged = true
+			if info.HealthDescription != "" {
+				event := &structs.NodeEvent{
+					Subsystem: "Driver",
+					Message:   info.HealthDescription,
+					Timestamp: time.Now(),
+					Details:   map[string]string{"driver": name},
+				}
+				c.triggerNodeEvent(event)
+			}
+			// Update the node with the latest information
+			c.config.Node.Drivers[name].MergeHealthCheck(info)
+		}
+
+		for attrName, newVal := range info.Attributes {
+			oldVal := c.config.Node.Drivers[name].Attributes[attrName]
+			if oldVal == newVal {
+				continue
+			}
+
+			hasChanged = true
+			if newVal == "" {
+				delete(c.config.Node.Attributes, attrName)
+			} else {
+				c.config.Node.Attributes[attrName] = newVal
+			}
+		}
+	}
+
+	// COMPAT Remove in Nomad 0.10
+	// We maintain the driver enabled attribute until all drivers expose
+	// their attributes as DriverInfo
+	driverName := fmt.Sprintf("driver.%s", name)
+	if info.Detected {
+		c.config.Node.Attributes[driverName] = "1"
+	} else {
+		delete(c.config.Node.Attributes, driverName)
+	}
+
+	return hasChanged
+}
+
+// updateNodeFromDevices receives a device fingerprint and updates the node
+// accordingly
+func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
+	c.configLock.Lock()
+	defer c.configLock.Unlock()
+
+	// Not updating node.Resources: the field is deprecated and includes
+	// dispatched task resources and not appropriate for expressing
+	// node available device resources
+	if c.updateNodeFromDevicesLocked(devices) {
+		c.updateNodeLocked()
+	}
+}
+
+// updateNodeFromDevicesLocked updates the node with the results of device
+// fingerprinting, but does not send the update to the server. c.configLock
+// must be held before calling this func
+func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResource) bool {
+	if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
+		c.logger.Debug("new devices detected", "devices", len(devices))
+		c.config.Node.NodeResources.Devices = devices
+		return true
+	}
+
+	return false
+}
+
+// batchNodeUpdates allows for batching multiple Node updates from fingerprinting.
+// Once ready, the batches can be flushed and toggled to stop batching and forward
+// all updates to a configured callback to be performed incrementally
+type batchNodeUpdates struct {
+	// access to driver fields must hold driversMu lock
+	drivers        map[string]*structs.DriverInfo
+	driversBatched bool
+	driverCB       drivermanager.UpdateNodeDriverInfoFn
+	driversMu      sync.Mutex
+
+	// access to devices fields must hold devicesMu lock
+	devices        []*structs.NodeDeviceResource
+	devicesBatched bool
+	devicesCB      devicemanager.UpdateNodeDevicesFn
+	devicesMu      sync.Mutex
+}
+
+func newBatchNodeUpdates(
+	driverCB drivermanager.UpdateNodeDriverInfoFn,
+	devicesCB devicemanager.UpdateNodeDevicesFn) *batchNodeUpdates {
+
+	return &batchNodeUpdates{
+		drivers:   make(map[string]*structs.DriverInfo),
+		driverCB:  driverCB,
+		devices:   []*structs.NodeDeviceResource{},
+		devicesCB: devicesCB,
+	}
+}
+
+// updateNodeFromDriver implements drivermanager.UpdateNodeDriverInfoFn and is
+// used by the driver manager to send driver fingerprints to the client
+func (b *batchNodeUpdates) updateNodeFromDriver(driver string, info *structs.DriverInfo) {
+	b.driversMu.Lock()
+	defer b.driversMu.Unlock()
+	if b.driversBatched {
+		b.driverCB(driver, info)
+		return
+	}
+
+	b.drivers[driver] = info
+}
+
+// batchDriverUpdates sends all of the batched driver node updates by calling f
+// for each batched driver
+func (b *batchNodeUpdates) batchDriverUpdates(f drivermanager.UpdateNodeDriverInfoFn) error {
+	b.driversMu.Lock()
+	defer b.driversMu.Unlock()
+	if b.driversBatched {
+		return fmt.Errorf("driver updates already batched")
+	}
+
+	b.driversBatched = true
+	for driver, info := range b.drivers {
+		f(driver, info)
+	}
+	return nil
+}
+
+// updateNodeFromDevices implements devicemanager.UpdateNodeDevicesFn and is
+// used by the device manager to send device fingerprints to the client
+func (b *batchNodeUpdates) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
+	b.devicesMu.Lock()
+	defer b.devicesMu.Unlock()
+	if b.devicesBatched {
+		b.devicesCB(devices)
+		return
+	}
+
+	b.devices = devices
+}
+
+// batchDevicesUpdates sends the batched device node updates by calling f with
+// the devices
+func (b *batchNodeUpdates) batchDevicesUpdates(f devicemanager.UpdateNodeDevicesFn) error {
+	b.devicesMu.Lock()
+	defer b.devicesMu.Unlock()
+	if b.devicesBatched {
+		return fmt.Errorf("devices updates already batched")
+	}
+
+	b.devicesBatched = true
+	f(b.devices)
+	return nil
+}
diff --git a/client/pluginmanager/drivermanager/instance.go b/client/pluginmanager/drivermanager/instance.go
index af467f16e..8a7770adb 100644
--- a/client/pluginmanager/drivermanager/instance.go
+++ b/client/pluginmanager/drivermanager/instance.go
@@ -220,7 +220,7 @@ func (i *instanceManager) dispense() (plugin drivers.DriverPlugin, err error) {
 		}
 	}
 
-	// Convert to a fingerprint plugin
+	// Convert to a driver plugin
 	driver, ok := pluginInstance.Plugin().(drivers.DriverPlugin)
 	if !ok {
 		pluginInstance.Kill()
diff --git a/client/pluginmanager/drivermanager/manager.go b/client/pluginmanager/drivermanager/manager.go
index 30f3cfc1c..eb7c746b3 100644
--- a/client/pluginmanager/drivermanager/manager.go
+++ b/client/pluginmanager/drivermanager/manager.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"sync"
-	"time"
 
 	log "github.com/hashicorp/go-hclog"
 	plugin "github.com/hashicorp/go-plugin"
@@ -16,11 +15,22 @@ import (
 	"github.com/hashicorp/nomad/plugins/shared/loader"
 )
 
+// ErrDriverNotFound is returned during Dispense when the requested driver
+// plugin is not found in the plugin catalog
+var ErrDriverNotFound = fmt.Errorf("driver not found")
+
 // Manager is the interface used to manage driver plugins
 type Manager interface {
+	// RegisterEventHandler will cause the given EventHandler to be called when
+	// an event is received that matches the given driver and taskID
 	RegisterEventHandler(driver, taskID string, handler EventHandler)
+
+	// DeregisterEventHandler removes the EventHandler registered for the given
+	// driver and taskID, if one exists
 	DeregisterEventHandler(driver, taskID string)
 
+	// Dispense returns a drivers.DriverPlugin for the given driver plugin name,
+	// handling reattachment to an existing driver plugin if available
 	Dispense(driver string) (drivers.DriverPlugin, error)
 }
 
@@ -42,7 +52,7 @@ type StateStorage interface {
 
 // UpdateNodeDriverInfoFn is the callback used to update the node from
 // fingerprinting
-type UpdateNodeDriverInfoFn func(string, *structs.DriverInfo) *structs.Node
+type UpdateNodeDriverInfoFn func(string, *structs.DriverInfo)
 
 // StorePluginReattachFn is used to store plugin reattachment configurations.
 type StorePluginReattachFn func(*plugin.ReattachConfig) error
@@ -99,7 +109,7 @@ type manager struct {
 
 	// instances is the list of managed devices, access is serialized by instanceMu
 	instances   map[string]*instanceManager
-	instancesMu sync.Mutex
+	instancesMu sync.RWMutex
 
 	// reattachConfigs stores the plugin reattach configs
 	reattachConfigs map[loader.PluginID]*shared.ReattachConfig
@@ -139,7 +149,10 @@ func (*manager) PluginType() string { return base.PluginTypeDriver }
 // is called.
 func (m *manager) Run() {
 	// Load any previous plugin reattach configuration
-	m.loadReattachConfigs()
+	if err := m.loadReattachConfigs(); err != nil {
+		m.logger.Warn("unable to load driver plugin reattach configs, a driver process may have been leaked",
+			"error", err)
+	}
 
 	// Get driver plugins
 	driversPlugins := m.loader.Catalog()[base.PluginTypeDriver]
@@ -152,13 +165,7 @@ func (m *manager) Run() {
 	var skippedDrivers []string
 	for _, d := range driversPlugins {
 		id := loader.PluginInfoID(d)
-		// Skip drivers that are not in the allowed list if it is set.
-		if _, ok := m.allowedDrivers[id.Name]; len(m.allowedDrivers) > 0 && !ok {
-			skippedDrivers = append(skippedDrivers, id.Name)
-			continue
-		}
-		// Skip fingerprinting drivers that are in the blocked list
-		if _, ok := m.blockedDrivers[id.Name]; ok {
+		if m.isDriverBlocked(id.Name) {
 			skippedDrivers = append(skippedDrivers, id.Name)
 			continue
 		}
@@ -192,9 +199,6 @@ func (m *manager) Run() {
 
 	// signal ready
 	close(m.readyCh)
-
-	// wait for shutdown
-	<-m.ctx.Done()
 }
 
 // Shutdown cleans up all the plugins
@@ -202,8 +206,8 @@ func (m *manager) Shutdown() {
 	// Cancel the context to stop any requests
 	m.cancel()
 
-	m.instancesMu.Lock()
-	defer m.instancesMu.Unlock()
+	m.instancesMu.RLock()
+	defer m.instancesMu.RUnlock()
 
 	// Go through and shut everything down
 	for _, i := range m.instances {
@@ -211,28 +215,47 @@
-func (m *manager) Ready() <-chan struct{} {
-	ctx, cancel := context.WithTimeout(m.ctx, time.Second*10)
-	go func() {
-		defer cancel()
-		// We don't want to start initial fingerprint wait until Run loop has
-		// finished
-		select {
-		case <-m.readyCh:
-		case <-m.ctx.Done():
-			return
-		}
+func (m *manager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
+	ctx, cancel := context.WithCancel(ctx)
+	go m.waitForFirstFingerprint(ctx, cancel)
+	return ctx.Done()
+}
 
-		var availDrivers []string
-		for name, instance := range m.instances {
+func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.CancelFunc) {
+	defer cancel()
+	// We don't want to start initial fingerprint wait until Run loop has
+	// finished
+	select {
+	case <-m.readyCh:
+	case <-ctx.Done():
+		// parent context canceled or timed out
+		return
+	case <-m.ctx.Done():
+		// shutdown called
+		return
+	}
+
+	var availDrivers []string
+	var availDriversMu sync.Mutex
+	var wg sync.WaitGroup
+
+	// loop through instances and wait for each to finish initial fingerprint
+	m.instancesMu.RLock()
+	for n, i := range m.instances {
+		wg.Add(1)
+		go func(name string, instance *instanceManager) {
+			defer wg.Done()
 			instance.WaitForFirstFingerprint(ctx)
-			if instance.lastHealthState != drivers.HealthStateUndetected {
+			if instance.getLastHealth() != drivers.HealthStateUndetected {
+				availDriversMu.Lock()
 				availDrivers = append(availDrivers, name)
+				availDriversMu.Unlock()
 			}
-		}
-		m.logger.Debug("detected drivers", "drivers", availDrivers)
-	}()
-	return ctx.Done()
+		}(n, i)
+	}
+	m.instancesMu.RUnlock()
+	wg.Wait()
+	m.logger.Debug("detected drivers", "drivers", availDrivers)
 }
 
 func (m *manager) loadReattachConfigs() error {
@@ -246,10 +266,15 @@
 
 	if s != nil {
 		for name, c := range s.ReattachConfigs {
+			if m.isDriverBlocked(name) {
+				continue
+			}
+
 			id := loader.PluginID{
 				PluginType: base.PluginTypeDriver,
 				Name:       name,
 			}
+
 			m.reattachConfigs[id] = c
 		}
 	}
@@ -288,6 +313,7 @@ func (m *manager) fetchPluginReattachConfig(id loader.PluginID) (*plugin.Reattac
 		c, err := shared.ReattachConfigToGoPlugin(cfg)
 		if err != nil {
 			m.logger.Warn("failed to read plugin reattach config", "config", cfg, "error", err)
+			delete(m.reattachConfigs, id)
 			return nil, false
 		}
 		return c, true
@@ -296,7 +322,7 @@
 }
 
 func (m *manager) RegisterEventHandler(driver, taskID string, handler EventHandler) {
-	m.instancesMu.Lock()
+	m.instancesMu.RLock()
 	if d, ok := m.instances[driver]; ok {
 		d.registerEventHandler(taskID, handler)
 	}
@@ -304,7 +330,7 @@
 func (m *manager) DeregisterEventHandler(driver, taskID string) {
-	m.instancesMu.Lock()
+	m.instancesMu.RLock()
 	if d, ok := m.instances[driver]; ok {
 		d.deregisterEventHandler(taskID)
 	}
@@ -312,11 +338,24 @@ func (m *manager) DeregisterEventHandler(driver, taskID string) {
 }
 
 func (m *manager) Dispense(d string) (drivers.DriverPlugin, error) {
-	m.instancesMu.Lock()
-	defer m.instancesMu.Unlock()
+	m.instancesMu.RLock()
+	defer m.instancesMu.RUnlock()
 	if instance, ok := m.instances[d]; ok {
 		return instance.dispense()
 	}
 
-	return nil, fmt.Errorf("driver not found")
+	return nil, ErrDriverNotFound
+}
+
+func (m *manager) isDriverBlocked(name string) bool {
+	// Block drivers that are not in the allowed list if it is set.
+	if _, ok := m.allowedDrivers[name]; len(m.allowedDrivers) > 0 && !ok {
+		return true
+	}
+
+	// Block drivers that are in the blocked list
+	if _, ok := m.blockedDrivers[name]; ok {
+		return true
+	}
+	return false
 }
diff --git a/client/pluginmanager/group.go b/client/pluginmanager/group.go
index 243a051eb..a246334b1 100644
--- a/client/pluginmanager/group.go
+++ b/client/pluginmanager/group.go
@@ -4,17 +4,10 @@ import (
 	"context"
 	"fmt"
 	"sync"
-	"time"
 
 	log "github.com/hashicorp/go-hclog"
 )
 
-var (
-	// DefaultManagerReadyTimeout is the default amount of time we will wait
-	// for a plugin mananger to be ready before logging it and moving on.
-	DefaultManagerReadyTimeout = time.Second * 5
-)
-
 // PluginGroup is a utility struct to manage a collectively orchestrate a
 // set of PluginManagers
 type PluginGroup struct {
@@ -47,17 +40,14 @@ func (m *PluginGroup) RegisterAndRun(manager PluginManager) error {
 	}
 	m.managers = append(m.managers, manager)
 
-	go func() {
-		m.logger.Info("starting plugin manager", "plugin-type", manager.PluginType())
-		manager.Run()
-		m.logger.Info("plugin manager finished", "plugin-type", manager.PluginType())
-	}()
+	m.logger.Info("starting plugin manager", "plugin-type", manager.PluginType())
+	manager.Run()
 	return nil
 }
 
-// Ready returns a channel which will be closed once all plugin manangers are ready.
-// A timeout for waiting on each manager is given
-func (m *PluginGroup) Ready(ctx context.Context) (<-chan struct{}, error) {
+// WaitForFirstFingerprint returns a channel which is closed once all
+// fingerprinting plugin managers have finished their first fingerprint.
+func (m *PluginGroup) WaitForFirstFingerprint(ctx context.Context) (<-chan struct{}, error) {
 	m.mLock.Lock()
 	defer m.mLock.Unlock()
 	if m.shutdown {
@@ -66,15 +56,23 @@
 
 	var wg sync.WaitGroup
 	for i := range m.managers {
-		manager := m.managers[i]
+		manager, ok := m.managers[i].(FingerprintingPluginManager)
+		if !ok {
+			continue
+		}
+
+		logger := m.logger.With("plugin-type", manager.PluginType())
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
+			logger.Debug("waiting on plugin manager initial fingerprint")
 			select {
-			case <-manager.Ready():
-			case <-ctx.Done():
-				m.logger.Warn("timeout waiting for plugin manager to be ready",
-					"plugin-type", manager.PluginType())
+			case <-manager.WaitForFirstFingerprint(ctx):
+				select {
+				case <-ctx.Done():
+					logger.Warn("timeout waiting for plugin manager to be ready")
+				default:
+					logger.Debug("finished plugin manager initial fingerprint")
+				}
 			}
 		}()
 	}
@@ -95,6 +93,7 @@ func (m *PluginGroup) Shutdown() {
 	for i := len(m.managers) - 1; i >= 0; i-- {
 		m.logger.Info("shutting down plugin manager", "plugin-type", m.managers[i].PluginType())
 		m.managers[i].Shutdown()
+		m.logger.Info("plugin manager finished", "plugin-type", m.managers[i].PluginType())
 	}
 	m.shutdown = true
 }
diff --git a/client/pluginmanager/manager.go b/client/pluginmanager/manager.go
index 3e67d12f6..f9d83a9ee 100644
--- a/client/pluginmanager/manager.go
+++ b/client/pluginmanager/manager.go
@@ -1,14 +1,12 @@
 package pluginmanager
 
+import "context"
+
 // PluginManager orchestrates the lifecycle of a set of plugins
 type PluginManager interface {
-	// Run starts a plugin manager and must block until shutdown
+	// Run starts a plugin manager and should return early
 	Run()
 
-	// Ready returns a channel that blocks until the plugin mananger has
-	// initialized all plugins
-	Ready() <-chan struct{}
-
 	// Shutdown should gracefully shutdown all plugins managed by the manager.
 	// It must block until shutdown is complete
 	Shutdown()
@@ -16,3 +14,14 @@ type PluginManager interface {
 	// PluginType is the type of plugin which the manager manages
 	PluginType() string
 }
+
+// FingerprintingPluginManager is an interface that exposes fingerprinting
+// coordination for plugin managers
+type FingerprintingPluginManager interface {
+	PluginManager
+
+	// WaitForFirstFingerprint returns a channel that is closed once all plugin
+	// instances managed by the plugin manager have fingerprinted once. If the
+	// passed context is done first, the channel is closed as well
+	WaitForFirstFingerprint(context.Context) <-chan struct{}
+}
diff --git a/client/pluginmanager/testing.go b/client/pluginmanager/testing.go
index 93504cdb5..acb378414 100644
--- a/client/pluginmanager/testing.go
+++ b/client/pluginmanager/testing.go
@@ -8,8 +8,3 @@ type MockPluginManager struct {
 func (m *MockPluginManager) Run()               { m.RunF() }
 func (m *MockPluginManager) Shutdown()          { m.ShutdownF() }
 func (m *MockPluginManager) PluginType() string { return "mock" }
-func (m *MockPluginManager) Ready() <-chan struct{} {
-	ch := make(chan struct{})
-	close(ch)
-	return ch
-}
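Both the device and driver managers now implement WaitForFirstFingerprint with the same shape: derive a cancellable context from the caller's, wait on a WaitGroup covering every plugin instance in a goroutine, cancel when the group finishes, and hand back ctx.Done() so the caller can select on completion or its own timeout. A standalone sketch of that WaitGroup-to-channel pattern — illustrative names only, not code from this change:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitForAll returns a channel that closes when every task signals done, or
// when the caller's ctx is cancelled, whichever happens first. This is the
// shape shared by both managers' WaitForFirstFingerprint methods.
func waitForAll(ctx context.Context, tasks []func(context.Context)) <-chan struct{} {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		var wg sync.WaitGroup
		for _, task := range tasks {
			wg.Add(1)
			go func(t func(context.Context)) {
				defer wg.Done()
				t(ctx) // each task should also respect ctx and give up early
			}(task)
		}
		wg.Wait()
		cancel() // closing ctx.Done() signals completion to the caller
	}()
	return ctx.Done()
}

func main() {
	fast := func(ctx context.Context) { time.Sleep(10 * time.Millisecond) }
	slow := func(ctx context.Context) {
		select {
		case <-time.After(time.Minute): // a plugin that never finishes
		case <-ctx.Done(): // bail out when the window closes
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	<-waitForAll(ctx, []func(context.Context){fast, slow})

	// The channel closes on completion or timeout; ctx.Err() tells the two
	// apart, just as the inner select in group.go does.
	if ctx.Err() != nil {
		fmt.Println("gave up waiting:", ctx.Err())
	} else {
		fmt.Println("all tasks finished")
	}
}

Returning a channel instead of blocking lets callers compose the wait with other events, and cancelling the derived context doubles as the completion broadcast, since Done() is closed exactly once whether the work finished or the deadline fired.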