From 42323c41d93ba30fc2d82e0385f441878ff33cc4 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 12 Mar 2020 16:24:58 -0400 Subject: [PATCH] csi: add dynamicplugins registry to client state store (#7330) In order to correctly fingerprint dynamic plugins on client restarts, we need to persist a handle to the plugin (that is, connection info) to the client state store. The dynamic registry will sync automatically to the client state whenever it receives a register/deregister call. --- client/client.go | 21 ++++--- client/dynamicplugins/registry.go | 50 ++++++++++++++-- client/dynamicplugins/registry_test.go | 55 ++++++++++++++++- client/pluginmanager/csimanager/manager.go | 3 + .../pluginmanager/csimanager/manager_test.go | 1 + client/state/db_test.go | 26 ++++++++ client/state/interface.go | 7 +++ client/state/memdb.go | 17 ++++++ client/state/noopdb.go | 9 +++ client/state/state_database.go | 59 ++++++++++++++++++- 10 files changed, 231 insertions(+), 17 deletions(-) diff --git a/client/client.go b/client/client.go index 05ce5dd6a..96100fa44 100644 --- a/client/client.go +++ b/client/client.go @@ -341,14 +341,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic invalidAllocs: make(map[string]struct{}), serversContactedCh: make(chan struct{}), serversContactedOnce: sync.Once{}, - dynamicRegistry: dynamicplugins.NewRegistry(map[string]dynamicplugins.PluginDispenser{ - dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) { - return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")) - }, - dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) { - return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")) - }, - }), } c.batchNodeUpdates = newBatchNodeUpdates( @@ -363,11 +355,22 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic // Start server manager rebalancing go routine go c.servers.Start() - // Initialize the client + // initialize the client if err := c.init(); err != nil { return nil, fmt.Errorf("failed to initialize client: %v", err) } + // initialize the dynamic registry (needs to happen after init) + c.dynamicRegistry = + dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{ + dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) { + return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")) + }, + dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) { + return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")) + }, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up + }) + // Setup the clients RPC server c.setupClientRpc() diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index e9025e73a..b0739f0b8 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -31,16 +31,48 @@ type Registry interface { StubDispenserForType(ptype string, dispenser PluginDispenser) } +// RegistryState is what we persist in the client state store. It contains +// a map of plugin types to maps of plugin name -> PluginInfo. +type RegistryState struct { + Plugins map[string]map[string]*PluginInfo +} + type PluginDispenser func(info *PluginInfo) (interface{}, error) // NewRegistry takes a map of `plugintype` to PluginDispenser functions // that should be used to vend clients for plugins to be used. -func NewRegistry(dispensers map[string]PluginDispenser) Registry { - return &dynamicRegistry{ +func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry { + + registry := &dynamicRegistry{ plugins: make(map[string]map[string]*PluginInfo), broadcasters: make(map[string]*pluginEventBroadcaster), dispensers: dispensers, + state: state, } + + // populate the state and initial broadcasters if we have an + // existing state DB to restore + if state != nil { + storedState, err := state.GetDynamicPluginRegistryState() + if err == nil && storedState != nil { + registry.plugins = storedState.Plugins + for ptype := range registry.plugins { + registry.broadcasterForPluginType(ptype) + } + } + } + + return registry +} + +// StateStorage is used to persist the dynamic plugin registry's state +// across agent restarts. +type StateStorage interface { + // GetDynamicPluginRegistryState is used to restore the registry state + GetDynamicPluginRegistryState() (*RegistryState, error) + + // PutDynamicPluginRegistryState is used to store the registry state + PutDynamicPluginRegistryState(state *RegistryState) error } // PluginInfo is the metadata that is stored by the registry for a given plugin. @@ -98,6 +130,8 @@ type dynamicRegistry struct { dispensers map[string]PluginDispenser stubDispensers map[string]PluginDispenser + + state StateStorage } // StubDispenserForType allows test functions to provide alternative plugin @@ -159,7 +193,7 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error { } broadcaster.broadcast(event) - return nil + return d.sync() } func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBroadcaster { @@ -210,7 +244,7 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error { } broadcaster.broadcast(event) - return nil + return d.sync() } func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo { @@ -296,6 +330,14 @@ func (d *dynamicRegistry) PluginsUpdatedCh(ctx context.Context, ptype string) <- return ch } +func (d *dynamicRegistry) sync() error { + if d.state != nil { + storedState := &RegistryState{Plugins: d.plugins} + return d.state.PutDynamicPluginRegistryState(storedState) + } + return nil +} + func (d *dynamicRegistry) Shutdown() { for _, b := range d.broadcasters { b.shutdown() diff --git a/client/dynamicplugins/registry_test.go b/client/dynamicplugins/registry_test.go index a3feaaac5..3e6513d18 100644 --- a/client/dynamicplugins/registry_test.go +++ b/client/dynamicplugins/registry_test.go @@ -2,6 +2,7 @@ package dynamicplugins import ( "context" + "sync" "testing" "time" @@ -65,7 +66,7 @@ func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) { func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) { t.Parallel() - r := NewRegistry(nil) + r := NewRegistry(nil, nil) ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() @@ -103,7 +104,7 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) { func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) { t.Parallel() - r := NewRegistry(nil) + r := NewRegistry(nil, nil) ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() @@ -148,7 +149,7 @@ func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) { return struct{}{}, nil } - registry := NewRegistry(map[string]PluginDispenser{"csi": dispenseFn}) + registry := NewRegistry(nil, map[string]PluginDispenser{"csi": dispenseFn}) err := registry.RegisterPlugin(&PluginInfo{ Type: "csi", @@ -169,3 +170,51 @@ func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) { require.NotNil(t, result) require.NoError(t, err) } + +func TestDynamicRegistry_StateStore(t *testing.T) { + t.Parallel() + dispenseFn := func(i *PluginInfo) (interface{}, error) { + return i, nil + } + + memdb := &MemDB{} + oldR := NewRegistry(memdb, map[string]PluginDispenser{"csi": dispenseFn}) + + err := oldR.RegisterPlugin(&PluginInfo{ + Type: "csi", + Name: "my-plugin", + ConnectionInfo: &PluginConnectionInfo{}, + }) + require.NoError(t, err) + result, err := oldR.DispensePlugin("csi", "my-plugin") + require.NotNil(t, result) + require.NoError(t, err) + + // recreate the registry from the state store and query again + newR := NewRegistry(memdb, map[string]PluginDispenser{"csi": dispenseFn}) + result, err = newR.DispensePlugin("csi", "my-plugin") + require.NotNil(t, result) + require.NoError(t, err) +} + +// MemDB implements a StateDB that stores data in memory and should only be +// used for testing. All methods are safe for concurrent use. This is a +// partial implementation of the MemDB in the client/state package, copied +// here to avoid circular dependencies. +type MemDB struct { + dynamicManagerPs *RegistryState + mu sync.RWMutex +} + +func (m *MemDB) GetDynamicPluginRegistryState() (*RegistryState, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.dynamicManagerPs, nil +} + +func (m *MemDB) PutDynamicPluginRegistryState(ps *RegistryState) error { + m.mu.Lock() + defer m.mu.Unlock() + m.dynamicManagerPs = ps + return nil +} diff --git a/client/pluginmanager/csimanager/manager.go b/client/pluginmanager/csimanager/manager.go index 9f932e384..d79b1b339 100644 --- a/client/pluginmanager/csimanager/manager.go +++ b/client/pluginmanager/csimanager/manager.go @@ -85,6 +85,9 @@ func (c *csiManager) MounterForVolume(ctx context.Context, vol *structs.CSIVolum // Run starts a plugin manager and should return early func (c *csiManager) Run() { + // Ensure we have at least one full sync before starting + c.resyncPluginsFromRegistry("csi-controller") + c.resyncPluginsFromRegistry("csi-node") go c.runLoop() } diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index 176dc6003..24b854ab5 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -21,6 +21,7 @@ var fakePlugin = &dynamicplugins.PluginInfo{ func setupRegistry() dynamicplugins.Registry { return dynamicplugins.NewRegistry( + nil, map[string]dynamicplugins.PluginDispenser{ "csi-controller": func(*dynamicplugins.PluginInfo) (interface{}, error) { return nil, nil diff --git a/client/state/db_test.go b/client/state/db_test.go index c37ed8bd2..bb63507a2 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -8,6 +8,7 @@ import ( trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" @@ -238,6 +239,31 @@ func TestStateDB_DriverManager(t *testing.T) { }) } +// TestStateDB_DynamicRegistry asserts the behavior of dynamic registry state related StateDB +// methods. +func TestStateDB_DynamicRegistry(t *testing.T) { + t.Parallel() + + testDB(t, func(t *testing.T, db StateDB) { + require := require.New(t) + + // Getting nonexistent state should return nils + ps, err := db.GetDynamicPluginRegistryState() + require.NoError(err) + require.Nil(ps) + + // Putting PluginState should work + state := &dynamicplugins.RegistryState{} + require.NoError(db.PutDynamicPluginRegistryState(state)) + + // Getting should return the available state + ps, err = db.GetDynamicPluginRegistryState() + require.NoError(err) + require.NotNil(ps) + require.Equal(state, ps) + }) +} + // TestStateDB_Upgrade asserts calling Upgrade on new databases always // succeeds. func TestStateDB_Upgrade(t *testing.T) { diff --git a/client/state/interface.go b/client/state/interface.go index 2624b46ea..dc492d5ec 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -3,6 +3,7 @@ package state import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -69,6 +70,12 @@ type StateDB interface { // state. PutDriverPluginState(state *driverstate.PluginState) error + // GetDynamicPluginRegistryState is used to retrieve a dynamic plugin manager's state. + GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) + + // PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state. + PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error + // Close the database. Unsafe for further use after calling regardless // of return value. Close() error diff --git a/client/state/memdb.go b/client/state/memdb.go index 5d64870e1..63e967e45 100644 --- a/client/state/memdb.go +++ b/client/state/memdb.go @@ -6,6 +6,7 @@ import ( hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -29,6 +30,9 @@ type MemDB struct { // drivermanager -> plugin-state driverManagerPs *driverstate.PluginState + // dynamicmanager -> registry-state + dynamicManagerPs *dynamicplugins.RegistryState + logger hclog.Logger mu sync.RWMutex @@ -193,6 +197,19 @@ func (m *MemDB) PutDriverPluginState(ps *driverstate.PluginState) error { return nil } +func (m *MemDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.dynamicManagerPs, nil +} + +func (m *MemDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { + m.mu.Lock() + defer m.mu.Unlock() + m.dynamicManagerPs = ps + return nil +} + func (m *MemDB) Close() error { m.mu.Lock() defer m.mu.Unlock() diff --git a/client/state/noopdb.go b/client/state/noopdb.go index 53364ecba..28fbd2c15 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -3,6 +3,7 @@ package state import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -70,6 +71,14 @@ func (n NoopDB) GetDriverPluginState() (*driverstate.PluginState, error) { return nil, nil } +func (n NoopDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { + return nil +} + +func (n NoopDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + return nil, nil +} + func (n NoopDB) Close() error { return nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index 12d2c083d..a9a958f5f 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -11,6 +11,7 @@ import ( hclog "github.com/hashicorp/go-hclog" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/helper/boltdd" "github.com/hashicorp/nomad/nomad/structs" @@ -35,6 +36,9 @@ devicemanager/ drivermanager/ |--> plugin_state -> *driverstate.PluginState + +dynamicplugins/ +|--> registry_state -> *dynamicplugins.RegistryState */ var ( @@ -73,13 +77,20 @@ var ( // data devManagerBucket = []byte("devicemanager") - // driverManagerBucket is the bucket name container all driver manager + // driverManagerBucket is the bucket name containing all driver manager // related data driverManagerBucket = []byte("drivermanager") // managerPluginStateKey is the key by which plugin manager plugin state is // stored at managerPluginStateKey = []byte("plugin_state") + + // dynamicPluginBucket is the bucket name containing all dynamic plugin + // registry data. each dynamic plugin registry will have its own subbucket. + dynamicPluginBucket = []byte("dynamicplugins") + + // registryStateKey is the key at which dynamic plugin registry state is stored + registryStateKey = []byte("registry_state") ) // taskBucketName returns the bucket name for the given task name. @@ -598,6 +609,52 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) { return ps, nil } +// PutDynamicPluginRegistryState stores the dynamic plugin registry's +// state or returns an error. +func (s *BoltStateDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { + return s.db.Update(func(tx *boltdd.Tx) error { + // Retrieve the root dynamic plugin manager bucket + dynamicBkt, err := tx.CreateBucketIfNotExists(dynamicPluginBucket) + if err != nil { + return err + } + return dynamicBkt.Put(registryStateKey, ps) + }) +} + +// GetDynamicPluginRegistryState stores the dynamic plugin registry's +// registry state or returns an error. +func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + var ps *dynamicplugins.RegistryState + + err := s.db.View(func(tx *boltdd.Tx) error { + dynamicBkt := tx.Bucket(dynamicPluginBucket) + if dynamicBkt == nil { + // No state, return + return nil + } + + // Restore Plugin State if it exists + ps = &dynamicplugins.RegistryState{} + if err := dynamicBkt.Get(registryStateKey, ps); err != nil { + if !boltdd.IsErrNotFound(err) { + return fmt.Errorf("failed to read dynamic plugin registry state: %v", err) + } + + // Key not found, reset ps to nil + ps = nil + } + + return nil + }) + + if err != nil { + return nil, err + } + + return ps, nil +} + // init initializes metadata entries in a newly created state database. func (s *BoltStateDB) init() error { return s.db.Update(func(tx *boltdd.Tx) error {