csi: add dynamicplugins registry to client state store (#7330)

In order to correctly fingerprint dynamic plugins on client restarts,
we need to persist a handle to the plugin (that is, connection info)
to the client state store.

The dynamic registry will sync automatically to the client state
whenever it receives a register/deregister call.
This commit is contained in:
Tim Gross
2020-03-12 16:24:58 -04:00
committed by Tim Gross
parent 13dea1e5bb
commit 42323c41d9
10 changed files with 231 additions and 17 deletions

View File

@@ -341,14 +341,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
invalidAllocs: make(map[string]struct{}),
serversContactedCh: make(chan struct{}),
serversContactedOnce: sync.Once{},
dynamicRegistry: dynamicplugins.NewRegistry(map[string]dynamicplugins.PluginDispenser{
dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller"))
},
dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client"))
},
}),
}
c.batchNodeUpdates = newBatchNodeUpdates(
@@ -363,11 +355,22 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Start server manager rebalancing go routine
go c.servers.Start()
// Initialize the client
// initialize the client
if err := c.init(); err != nil {
return nil, fmt.Errorf("failed to initialize client: %v", err)
}
// initialize the dynamic registry (needs to happen after init)
c.dynamicRegistry =
dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{
dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller"))
},
dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client"))
}, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
})
// Setup the clients RPC server
c.setupClientRpc()

View File

@@ -31,16 +31,48 @@ type Registry interface {
StubDispenserForType(ptype string, dispenser PluginDispenser)
}
// RegistryState is what we persist in the client state store. It contains
// a map of plugin types to maps of plugin name -> PluginInfo.
type RegistryState struct {
Plugins map[string]map[string]*PluginInfo
}
type PluginDispenser func(info *PluginInfo) (interface{}, error)
// NewRegistry takes a map of `plugintype` to PluginDispenser functions
// that should be used to vend clients for plugins to be used.
func NewRegistry(dispensers map[string]PluginDispenser) Registry {
return &dynamicRegistry{
func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry {
registry := &dynamicRegistry{
plugins: make(map[string]map[string]*PluginInfo),
broadcasters: make(map[string]*pluginEventBroadcaster),
dispensers: dispensers,
state: state,
}
// populate the state and initial broadcasters if we have an
// existing state DB to restore
if state != nil {
storedState, err := state.GetDynamicPluginRegistryState()
if err == nil && storedState != nil {
registry.plugins = storedState.Plugins
for ptype := range registry.plugins {
registry.broadcasterForPluginType(ptype)
}
}
}
return registry
}
// StateStorage is used to persist the dynamic plugin registry's state
// across agent restarts.
type StateStorage interface {
// GetDynamicPluginRegistryState is used to restore the registry state
GetDynamicPluginRegistryState() (*RegistryState, error)
// PutDynamicPluginRegistryState is used to store the registry state
PutDynamicPluginRegistryState(state *RegistryState) error
}
// PluginInfo is the metadata that is stored by the registry for a given plugin.
@@ -98,6 +130,8 @@ type dynamicRegistry struct {
dispensers map[string]PluginDispenser
stubDispensers map[string]PluginDispenser
state StateStorage
}
// StubDispenserForType allows test functions to provide alternative plugin
@@ -159,7 +193,7 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error {
}
broadcaster.broadcast(event)
return nil
return d.sync()
}
func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBroadcaster {
@@ -210,7 +244,7 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
}
broadcaster.broadcast(event)
return nil
return d.sync()
}
func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
@@ -296,6 +330,14 @@ func (d *dynamicRegistry) PluginsUpdatedCh(ctx context.Context, ptype string) <-
return ch
}
func (d *dynamicRegistry) sync() error {
if d.state != nil {
storedState := &RegistryState{Plugins: d.plugins}
return d.state.PutDynamicPluginRegistryState(storedState)
}
return nil
}
func (d *dynamicRegistry) Shutdown() {
for _, b := range d.broadcasters {
b.shutdown()

View File

@@ -2,6 +2,7 @@ package dynamicplugins
import (
"context"
"sync"
"testing"
"time"
@@ -65,7 +66,7 @@ func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) {
func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
t.Parallel()
r := NewRegistry(nil)
r := NewRegistry(nil, nil)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
@@ -103,7 +104,7 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
t.Parallel()
r := NewRegistry(nil)
r := NewRegistry(nil, nil)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
@@ -148,7 +149,7 @@ func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) {
return struct{}{}, nil
}
registry := NewRegistry(map[string]PluginDispenser{"csi": dispenseFn})
registry := NewRegistry(nil, map[string]PluginDispenser{"csi": dispenseFn})
err := registry.RegisterPlugin(&PluginInfo{
Type: "csi",
@@ -169,3 +170,51 @@ func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) {
require.NotNil(t, result)
require.NoError(t, err)
}
func TestDynamicRegistry_StateStore(t *testing.T) {
t.Parallel()
dispenseFn := func(i *PluginInfo) (interface{}, error) {
return i, nil
}
memdb := &MemDB{}
oldR := NewRegistry(memdb, map[string]PluginDispenser{"csi": dispenseFn})
err := oldR.RegisterPlugin(&PluginInfo{
Type: "csi",
Name: "my-plugin",
ConnectionInfo: &PluginConnectionInfo{},
})
require.NoError(t, err)
result, err := oldR.DispensePlugin("csi", "my-plugin")
require.NotNil(t, result)
require.NoError(t, err)
// recreate the registry from the state store and query again
newR := NewRegistry(memdb, map[string]PluginDispenser{"csi": dispenseFn})
result, err = newR.DispensePlugin("csi", "my-plugin")
require.NotNil(t, result)
require.NoError(t, err)
}
// MemDB implements a StateDB that stores data in memory and should only be
// used for testing. All methods are safe for concurrent use. This is a
// partial implementation of the MemDB in the client/state package, copied
// here to avoid circular dependencies.
type MemDB struct {
dynamicManagerPs *RegistryState
mu sync.RWMutex
}
func (m *MemDB) GetDynamicPluginRegistryState() (*RegistryState, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.dynamicManagerPs, nil
}
func (m *MemDB) PutDynamicPluginRegistryState(ps *RegistryState) error {
m.mu.Lock()
defer m.mu.Unlock()
m.dynamicManagerPs = ps
return nil
}

View File

@@ -85,6 +85,9 @@ func (c *csiManager) MounterForVolume(ctx context.Context, vol *structs.CSIVolum
// Run starts a plugin manager and should return early
func (c *csiManager) Run() {
// Ensure we have at least one full sync before starting
c.resyncPluginsFromRegistry("csi-controller")
c.resyncPluginsFromRegistry("csi-node")
go c.runLoop()
}

View File

@@ -21,6 +21,7 @@ var fakePlugin = &dynamicplugins.PluginInfo{
func setupRegistry() dynamicplugins.Registry {
return dynamicplugins.NewRegistry(
nil,
map[string]dynamicplugins.PluginDispenser{
"csi-controller": func(*dynamicplugins.PluginInfo) (interface{}, error) {
return nil, nil

View File

@@ -8,6 +8,7 @@ import (
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
@@ -238,6 +239,31 @@ func TestStateDB_DriverManager(t *testing.T) {
})
}
// TestStateDB_DynamicRegistry asserts the behavior of dynamic registry state related StateDB
// methods.
func TestStateDB_DynamicRegistry(t *testing.T) {
t.Parallel()
testDB(t, func(t *testing.T, db StateDB) {
require := require.New(t)
// Getting nonexistent state should return nils
ps, err := db.GetDynamicPluginRegistryState()
require.NoError(err)
require.Nil(ps)
// Putting PluginState should work
state := &dynamicplugins.RegistryState{}
require.NoError(db.PutDynamicPluginRegistryState(state))
// Getting should return the available state
ps, err = db.GetDynamicPluginRegistryState()
require.NoError(err)
require.NotNil(ps)
require.Equal(state, ps)
})
}
// TestStateDB_Upgrade asserts calling Upgrade on new databases always
// succeeds.
func TestStateDB_Upgrade(t *testing.T) {

View File

@@ -3,6 +3,7 @@ package state
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -69,6 +70,12 @@ type StateDB interface {
// state.
PutDriverPluginState(state *driverstate.PluginState) error
// GetDynamicPluginRegistryState is used to retrieve a dynamic plugin manager's state.
GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error)
// PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state.
PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error
// Close the database. Unsafe for further use after calling regardless
// of return value.
Close() error

View File

@@ -6,6 +6,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -29,6 +30,9 @@ type MemDB struct {
// drivermanager -> plugin-state
driverManagerPs *driverstate.PluginState
// dynamicmanager -> registry-state
dynamicManagerPs *dynamicplugins.RegistryState
logger hclog.Logger
mu sync.RWMutex
@@ -193,6 +197,19 @@ func (m *MemDB) PutDriverPluginState(ps *driverstate.PluginState) error {
return nil
}
func (m *MemDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.dynamicManagerPs, nil
}
func (m *MemDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error {
m.mu.Lock()
defer m.mu.Unlock()
m.dynamicManagerPs = ps
return nil
}
func (m *MemDB) Close() error {
m.mu.Lock()
defer m.mu.Unlock()

View File

@@ -3,6 +3,7 @@ package state
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -70,6 +71,14 @@ func (n NoopDB) GetDriverPluginState() (*driverstate.PluginState, error) {
return nil, nil
}
func (n NoopDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error {
return nil
}
func (n NoopDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) {
return nil, nil
}
func (n NoopDB) Close() error {
return nil
}

View File

@@ -11,6 +11,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/nomad/structs"
@@ -35,6 +36,9 @@ devicemanager/
drivermanager/
|--> plugin_state -> *driverstate.PluginState
dynamicplugins/
|--> registry_state -> *dynamicplugins.RegistryState
*/
var (
@@ -73,13 +77,20 @@ var (
// data
devManagerBucket = []byte("devicemanager")
// driverManagerBucket is the bucket name container all driver manager
// driverManagerBucket is the bucket name containing all driver manager
// related data
driverManagerBucket = []byte("drivermanager")
// managerPluginStateKey is the key by which plugin manager plugin state is
// stored at
managerPluginStateKey = []byte("plugin_state")
// dynamicPluginBucket is the bucket name containing all dynamic plugin
// registry data. each dynamic plugin registry will have its own subbucket.
dynamicPluginBucket = []byte("dynamicplugins")
// registryStateKey is the key at which dynamic plugin registry state is stored
registryStateKey = []byte("registry_state")
)
// taskBucketName returns the bucket name for the given task name.
@@ -598,6 +609,52 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) {
return ps, nil
}
// PutDynamicPluginRegistryState stores the dynamic plugin registry's
// state or returns an error.
func (s *BoltStateDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error {
return s.db.Update(func(tx *boltdd.Tx) error {
// Retrieve the root dynamic plugin manager bucket
dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucket)
if err != nil {
return err
}
return dynamicBkt.Put(registryStateKey, ps)
})
}
// GetDynamicPluginRegistryState stores the dynamic plugin registry's
// registry state or returns an error.
func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) {
var ps *dynamicplugins.RegistryState
err := s.db.View(func(tx *boltdd.Tx) error {
dynamicBkt := tx.Bucket(dynamicPluginBucket)
if dynamicBkt == nil {
// No state, return
return nil
}
// Restore Plugin State if it exists
ps = &dynamicplugins.RegistryState{}
if err := dynamicBkt.Get(registryStateKey, ps); err != nil {
if !boltdd.IsErrNotFound(err) {
return fmt.Errorf("failed to read dynamic plugin registry state: %v", err)
}
// Key not found, reset ps to nil
ps = nil
}
return nil
})
if err != nil {
return nil, err
}
return ps, nil
}
// init initializes metadata entries in a newly created state database.
func (s *BoltStateDB) init() error {
return s.db.Update(func(tx *boltdd.Tx) error {