From 6aaaa396f5dbbcf7318b2d5f8eae02c3df849312 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 18 Sep 2018 10:08:46 -0700 Subject: [PATCH] singleton wrapper --- plugins/shared/loader/instance.go | 54 +++++ plugins/shared/loader/loader.go | 51 +---- plugins/shared/loader/loader_test.go | 2 +- plugins/shared/loader/testing.go | 41 ++++ plugins/shared/singleton/future.go | 54 +++++ plugins/shared/singleton/singleton.go | 116 ++++++++++ plugins/shared/singleton/singleton_test.go | 249 +++++++++++++++++++++ 7 files changed, 523 insertions(+), 44 deletions(-) create mode 100644 plugins/shared/loader/instance.go create mode 100644 plugins/shared/loader/testing.go create mode 100644 plugins/shared/singleton/future.go create mode 100644 plugins/shared/singleton/singleton.go create mode 100644 plugins/shared/singleton/singleton_test.go diff --git a/plugins/shared/loader/instance.go b/plugins/shared/loader/instance.go new file mode 100644 index 000000000..f879f5231 --- /dev/null +++ b/plugins/shared/loader/instance.go @@ -0,0 +1,54 @@ +package loader + +import plugin "github.com/hashicorp/go-plugin" + +// PluginInstance wraps an instance of a plugin. If the plugin is external, it +// provides methods to retrieve the ReattachConfig and to kill the plugin. +type PluginInstance interface { + // Internal returns if the plugin is internal + Internal() bool + + // Kill kills the plugin if it is external. It is safe to call on internal + // plugins. + Kill() + + // ReattachConfig returns the ReattachConfig and whether the plugin is internal + // or not. If the second return value is true, no ReattachConfig is possible to + // return. + ReattachConfig() (*plugin.ReattachConfig, bool) + + // Plugin returns the wrapped plugin instance. + Plugin() interface{} + + // Exited returns whether the plugin has exited + Exited() bool +} + +// internalPluginInstance wraps an internal plugin +type internalPluginInstance struct { + instance interface{} +} + +func (p *internalPluginInstance) Internal() bool { return true } +func (p *internalPluginInstance) Kill() {} +func (p *internalPluginInstance) ReattachConfig() (*plugin.ReattachConfig, bool) { return nil, false } +func (p *internalPluginInstance) Plugin() interface{} { return p.instance } +func (p *internalPluginInstance) Exited() bool { return false } + +// externalPluginInstance wraps an external plugin +type externalPluginInstance struct { + client *plugin.Client + instance interface{} +} + +func (p *externalPluginInstance) Internal() bool { return false } +func (p *externalPluginInstance) Plugin() interface{} { return p.instance } +func (p *externalPluginInstance) Exited() bool { return p.client.Exited() } + +func (p *externalPluginInstance) ReattachConfig() (*plugin.ReattachConfig, bool) { + return p.client.ReattachConfig(), true +} + +func (p *externalPluginInstance) Kill() { + p.client.Kill() +} diff --git a/plugins/shared/loader/loader.go b/plugins/shared/loader/loader.go index 514bce532..7e0f8a620 100644 --- a/plugins/shared/loader/loader.go +++ b/plugins/shared/loader/loader.go @@ -18,50 +18,15 @@ import ( type PluginCatalog interface { // Dispense returns the plugin given its name and type. This will also // configure the plugin - Dispense(name, pluginType string, logger log.Logger) (*PluginInstance, error) + Dispense(name, pluginType string, logger log.Logger) (PluginInstance, error) // Reattach is used to reattach to a previously launched external plugin. - Reattach(pluginType string, config *plugin.ReattachConfig) (*PluginInstance, error) + Reattach(name, pluginType string, config *plugin.ReattachConfig) (PluginInstance, error) // Catalog returns the catalog of all plugins keyed by plugin type Catalog() map[string][]*base.PluginInfoResponse } -// PluginInstance wraps an instance of a plugin. If the plugin is external, it -// provides methods to retrieve the ReattachConfig and to kill the plugin. -type PluginInstance struct { - client *plugin.Client - instance interface{} -} - -// Internal returns if the plugin is internal -func (p *PluginInstance) Internal() bool { - return p.client == nil -} - -// Kill kills the plugin if it is external. It is safe to call on internal -// plugins. -func (p *PluginInstance) Kill() { - if p.client != nil { - p.client.Kill() - } -} - -// ReattachConfig returns the ReattachConfig and whether the plugin is internal -// or not. If the second return value is true, no ReattachConfig is possible to -// return. -func (p *PluginInstance) ReattachConfig() (*plugin.ReattachConfig, bool) { - if p.client == nil { - return nil, false - } - return p.client.ReattachConfig(), true -} - -// Plugin returns the wrapped plugin instance. -func (p *PluginInstance) Plugin() interface{} { - return p.instance -} - // PluginLoader is used to retrieve plugins either externally or from internal // factories. type PluginLoader struct { @@ -149,7 +114,7 @@ func NewPluginLoader(config *PluginLoaderConfig) (*PluginLoader, error) { // Dispense returns a plugin instance, loading it either internally or by // launching an external plugin. -func (l *PluginLoader) Dispense(name, pluginType string, logger log.Logger) (*PluginInstance, error) { +func (l *PluginLoader) Dispense(name, pluginType string, logger log.Logger) (PluginInstance, error) { id := PluginID{ Name: name, PluginType: pluginType, @@ -160,9 +125,9 @@ func (l *PluginLoader) Dispense(name, pluginType string, logger log.Logger) (*Pl } // If the plugin is internal, launch via the factory - var instance *PluginInstance + var instance PluginInstance if pinfo.factory != nil { - instance = &PluginInstance{ + instance = &internalPluginInstance{ instance: pinfo.factory(logger), } } else { @@ -189,14 +154,14 @@ func (l *PluginLoader) Dispense(name, pluginType string, logger log.Logger) (*Pl } // Reattach reattaches to a previously launched external plugin. -func (l *PluginLoader) Reattach(pluginType string, config *plugin.ReattachConfig) (*PluginInstance, error) { +func (l *PluginLoader) Reattach(name, pluginType string, config *plugin.ReattachConfig) (PluginInstance, error) { return l.dispensePlugin(pluginType, "", nil, config, l.logger) } // dispensePlugin is used to launch or reattach to an external plugin. func (l *PluginLoader) dispensePlugin( pluginType, cmd string, args []string, reattach *plugin.ReattachConfig, - logger log.Logger) (*PluginInstance, error) { + logger log.Logger) (PluginInstance, error) { var pluginCmd *exec.Cmd if cmd != "" && reattach != nil { @@ -230,7 +195,7 @@ func (l *PluginLoader) dispensePlugin( return nil, err } - instance := &PluginInstance{ + instance := &externalPluginInstance{ client: client, instance: raw, } diff --git a/plugins/shared/loader/loader_test.go b/plugins/shared/loader/loader_test.go index 0b8db21c2..e06bfc881 100644 --- a/plugins/shared/loader/loader_test.go +++ b/plugins/shared/loader/loader_test.go @@ -809,7 +809,7 @@ func TestPluginLoader_Reattach_External(t *testing.T) { reattach, ok := p.ReattachConfig() require.True(ok) - p2, err := l.Reattach(base.PluginTypeDevice, reattach) + p2, err := l.Reattach(plugin, base.PluginTypeDevice, reattach) require.NoError(err) // Get the reattached plugin and ensure its the same diff --git a/plugins/shared/loader/testing.go b/plugins/shared/loader/testing.go new file mode 100644 index 000000000..55f369b07 --- /dev/null +++ b/plugins/shared/loader/testing.go @@ -0,0 +1,41 @@ +package loader + +import ( + log "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/plugins/base" +) + +// MockCatalog provides a mock PluginCatalog to be used for testing +type MockCatalog struct { + DispenseF func(name, pluginType string, logger log.Logger) (PluginInstance, error) + ReattachF func(name, pluginType string, config *plugin.ReattachConfig) (PluginInstance, error) + CatalogF func() map[string][]*base.PluginInfoResponse +} + +func (m *MockCatalog) Dispense(name, pluginType string, logger log.Logger) (PluginInstance, error) { + return m.DispenseF(name, pluginType, logger) +} + +func (m *MockCatalog) Reattach(name, pluginType string, config *plugin.ReattachConfig) (PluginInstance, error) { + return m.ReattachF(name, pluginType, config) +} + +func (m *MockCatalog) Catalog() map[string][]*base.PluginInfoResponse { + return m.CatalogF() +} + +// MockInstance provides a mock PluginInstance to be used for testing +type MockInstance struct { + InternalPlugin bool + KillF func() + ReattachConfigF func() (*plugin.ReattachConfig, bool) + PluginF func() interface{} + ExitedF func() bool +} + +func (m *MockInstance) Internal() bool { return m.InternalPlugin } +func (m *MockInstance) Kill() { m.KillF() } +func (m *MockInstance) ReattachConfig() (*plugin.ReattachConfig, bool) { return m.ReattachConfigF() } +func (m *MockInstance) Plugin() interface{} { return m.PluginF() } +func (m *MockInstance) Exited() bool { return m.ExitedF() } diff --git a/plugins/shared/singleton/future.go b/plugins/shared/singleton/future.go new file mode 100644 index 000000000..10b8a7b65 --- /dev/null +++ b/plugins/shared/singleton/future.go @@ -0,0 +1,54 @@ +package singleton + +import ( + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/plugins/shared/loader" +) + +// future is a sharable future for retrieving a plugin instance or any error +// that may have occurred during the creation. +type future struct { + waitCh chan struct{} + id string + + err error + instance loader.PluginInstance +} + +// newFuture returns a new pull future +func newFuture() *future { + return &future{ + waitCh: make(chan struct{}), + id: uuid.Generate(), + } +} + +func (f *future) equal(o *future) bool { + if f == nil && o == nil { + return true + } else if f != nil && o != nil { + return f.id == o.id + } else { + return false + } +} + +// wait waits till the future has a result +func (f *future) wait() *future { + <-f.waitCh + return f +} + +// result returns the results of the future and should only ever be called after +// wait returns. +func (f *future) result() (loader.PluginInstance, error) { + return f.instance, f.err +} + +// set is used to set the results and unblock any waiter. This may only be +// called once. +func (f *future) set(instance loader.PluginInstance, err error) { + f.instance = instance + f.err = err + close(f.waitCh) +} diff --git a/plugins/shared/singleton/singleton.go b/plugins/shared/singleton/singleton.go new file mode 100644 index 000000000..4e3d02106 --- /dev/null +++ b/plugins/shared/singleton/singleton.go @@ -0,0 +1,116 @@ +package singleton + +import ( + "fmt" + "sync" + + log "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/shared/loader" +) + +// SingletonLoader is used to only load a single external plugin at a time. +type SingletonLoader struct { + // Loader is the underlying plugin loader that we wrap to give a singleton + // behavior. + loader loader.PluginCatalog + + // instances is a mapping of the plugin to a future which holds a plugin + // instance + instances map[loader.PluginID]*future + instanceLock sync.Mutex + + // logger is the logger used by the singleton + logger log.Logger +} + +// NewSingletonLoader wraps a plugin catalog and provides singleton behavior on +// top by caching running instances. +func NewSingletonLoader(logger log.Logger, catalog loader.PluginCatalog) *SingletonLoader { + return &SingletonLoader{ + loader: catalog, + logger: logger.Named("singleton_plugin_loader"), + instances: make(map[loader.PluginID]*future, 4), + } +} + +// Catalog returns the catalog of all plugins keyed by plugin type +func (s *SingletonLoader) Catalog() map[string][]*base.PluginInfoResponse { + return s.loader.Catalog() +} + +// Dispense returns the plugin given its name and type. This will also +// configure the plugin. If there is an instance of an already running plugin, +// this is used. +func (s *SingletonLoader) Dispense(name, pluginType string, logger log.Logger) (loader.PluginInstance, error) { + return s.getPlugin(false, name, pluginType, logger, nil) +} + +// Reattach is used to reattach to a previously launched external plugin. +func (s *SingletonLoader) Reattach(name, pluginType string, config *plugin.ReattachConfig) (loader.PluginInstance, error) { + return s.getPlugin(true, name, pluginType, nil, config) +} + +// getPlugin is a helper that either dispenses or reattaches to a plugin using +// futures to ensure only a single instance is retrieved +func (s *SingletonLoader) getPlugin(reattach bool, name, pluginType string, logger log.Logger, config *plugin.ReattachConfig) (loader.PluginInstance, error) { + // Lock the instance map to prevent races + s.instanceLock.Lock() + + // Check if there is a future already + id := loader.PluginID{Name: name, PluginType: pluginType} + f, ok := s.instances[id] + + // Create the future and go get a plugin + if !ok { + f = newFuture() + s.instances[id] = f + + if reattach { + go s.reattach(f, name, pluginType, config) + } else { + go s.dispense(f, name, pluginType, logger) + } + } + + // Unlock so that the created future can be shared + s.instanceLock.Unlock() + + i, err := f.wait().result() + if err != nil { + s.clearFuture(id, f) + return nil, err + } + + if i.Exited() { + s.clearFuture(id, f) + return nil, fmt.Errorf("plugin %q exited", id) + } + + return i, nil +} + +// dispense should be called in a go routine to not block and creates the +// desired plugin, setting the results in the future. +func (s *SingletonLoader) dispense(f *future, name, pluginType string, logger log.Logger) { + i, err := s.loader.Dispense(name, pluginType, logger) + f.set(i, err) +} + +// reattach should be called in a go routine to not block and reattaches to the +// desired plugin, setting the results in the future. +func (s *SingletonLoader) reattach(f *future, name, pluginType string, config *plugin.ReattachConfig) { + i, err := s.loader.Reattach(name, pluginType, config) + f.set(i, err) +} + +// clearFuture clears the future from the instances map only if the futures +// match. This prevents clearing the unintented instance. +func (s *SingletonLoader) clearFuture(id loader.PluginID, f *future) { + s.instanceLock.Lock() + defer s.instanceLock.Unlock() + if f.equal(s.instances[id]) { + delete(s.instances, id) + } +} diff --git a/plugins/shared/singleton/singleton_test.go b/plugins/shared/singleton/singleton_test.go new file mode 100644 index 000000000..71dd204c5 --- /dev/null +++ b/plugins/shared/singleton/singleton_test.go @@ -0,0 +1,249 @@ +package singleton + +import ( + "fmt" + "sync" + "testing" + "time" + + log "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/shared/loader" + "github.com/stretchr/testify/require" +) + +func harness(t *testing.T) (*SingletonLoader, *loader.MockCatalog) { + c := &loader.MockCatalog{} + s := NewSingletonLoader(testlog.HCLogger(t), c) + return s, c +} + +// Test that multiple dispenses return the same instance +func TestSingleton_Dispense(t *testing.T) { + t.Parallel() + require := require.New(t) + + dispenseCalled := 0 + s, c := harness(t) + c.DispenseF = func(_, _ string, _ log.Logger) (loader.PluginInstance, error) { + p := &base.MockPlugin{} + i := &loader.MockInstance{ + ExitedF: func() bool { return false }, + PluginF: func() interface{} { return p }, + } + dispenseCalled++ + return i, nil + } + + // Retrieve the plugin many times in parallel + const count = 128 + var l sync.Mutex + var wg sync.WaitGroup + plugins := make(map[interface{}]struct{}, 1) + waitCh := make(chan struct{}) + for i := 0; i < count; i++ { + wg.Add(1) + go func() { + // Wait for unblock + <-waitCh + + // Retrieve the plugin + p1, err := s.Dispense("foo", "bar", testlog.HCLogger(t)) + require.NotNil(p1) + require.NoError(err) + i1 := p1.Plugin() + require.NotNil(i1) + l.Lock() + plugins[i1] = struct{}{} + l.Unlock() + wg.Done() + }() + } + time.Sleep(10 * time.Millisecond) + close(waitCh) + wg.Wait() + require.Len(plugins, 1) + require.Equal(1, dispenseCalled) +} + +// Test that after a plugin is dispensed, if it exits, an error is returned on +// the next dispense +func TestSingleton_Dispense_Exit_Dispense(t *testing.T) { + t.Parallel() + require := require.New(t) + + exited := false + dispenseCalled := 0 + s, c := harness(t) + c.DispenseF = func(_, _ string, _ log.Logger) (loader.PluginInstance, error) { + p := &base.MockPlugin{} + i := &loader.MockInstance{ + ExitedF: func() bool { return exited }, + PluginF: func() interface{} { return p }, + } + dispenseCalled++ + return i, nil + } + + // Retrieve the plugin + logger := testlog.HCLogger(t) + p1, err := s.Dispense("foo", "bar", logger) + require.NotNil(p1) + require.NoError(err) + + i1 := p1.Plugin() + require.NotNil(i1) + require.Equal(1, dispenseCalled) + + // Mark the plugin as exited and retrieve again + exited = true + _, err = s.Dispense("foo", "bar", logger) + require.Error(err) + require.Contains(err.Error(), "exited") + require.Equal(1, dispenseCalled) + + // Mark the plugin as non-exited and retrieve again + exited = false + p2, err := s.Dispense("foo", "bar", logger) + require.NotNil(p2) + require.NoError(err) + require.Equal(2, dispenseCalled) + + i2 := p2.Plugin() + require.NotNil(i2) + if i1 == i2 { + t.Fatalf("i1 and i2 shouldn't be the same instance: %p vs %p", i1, i2) + } +} + +// Test that if a plugin errors while being dispensed, the error is returned but +// not saved +func TestSingleton_DispenseError_Dispense(t *testing.T) { + t.Parallel() + require := require.New(t) + + dispenseCalled := 0 + good := func(_, _ string, _ log.Logger) (loader.PluginInstance, error) { + p := &base.MockPlugin{} + i := &loader.MockInstance{ + ExitedF: func() bool { return false }, + PluginF: func() interface{} { return p }, + } + dispenseCalled++ + return i, nil + } + + bad := func(_, _ string, _ log.Logger) (loader.PluginInstance, error) { + dispenseCalled++ + return nil, fmt.Errorf("bad") + } + + s, c := harness(t) + c.DispenseF = bad + + // Retrieve the plugin + logger := testlog.HCLogger(t) + p1, err := s.Dispense("foo", "bar", logger) + require.Nil(p1) + require.Error(err) + require.Equal(1, dispenseCalled) + + // Dispense again and ensure the same error isn't saved + c.DispenseF = good + p2, err := s.Dispense("foo", "bar", logger) + require.NotNil(p2) + require.NoError(err) + require.Equal(2, dispenseCalled) + + i2 := p2.Plugin() + require.NotNil(i2) +} + +// Test that if a plugin errors while being reattached, the error is returned but +// not saved +func TestSingleton_ReattachError_Dispense(t *testing.T) { + t.Parallel() + require := require.New(t) + + dispenseCalled, reattachCalled := 0, 0 + s, c := harness(t) + c.DispenseF = func(_, _ string, _ log.Logger) (loader.PluginInstance, error) { + p := &base.MockPlugin{} + i := &loader.MockInstance{ + ExitedF: func() bool { return false }, + PluginF: func() interface{} { return p }, + } + dispenseCalled++ + return i, nil + } + c.ReattachF = func(_, _ string, _ *plugin.ReattachConfig) (loader.PluginInstance, error) { + reattachCalled++ + return nil, fmt.Errorf("bad") + } + + // Retrieve the plugin + logger := testlog.HCLogger(t) + p1, err := s.Reattach("foo", "bar", nil) + require.Nil(p1) + require.Error(err) + require.Equal(0, dispenseCalled) + require.Equal(1, reattachCalled) + + // Dispense and ensure the same error isn't saved + p2, err := s.Dispense("foo", "bar", logger) + require.NotNil(p2) + require.NoError(err) + require.Equal(1, dispenseCalled) + require.Equal(1, reattachCalled) + + i2 := p2.Plugin() + require.NotNil(i2) +} + +// Test that after reattaching, dispense returns the same instance +func TestSingleton_Reattach_Dispense(t *testing.T) { + t.Parallel() + require := require.New(t) + + dispenseCalled, reattachCalled := 0, 0 + s, c := harness(t) + c.DispenseF = func(_, _ string, _ log.Logger) (loader.PluginInstance, error) { + dispenseCalled++ + return nil, fmt.Errorf("bad") + } + c.ReattachF = func(_, _ string, _ *plugin.ReattachConfig) (loader.PluginInstance, error) { + p := &base.MockPlugin{} + i := &loader.MockInstance{ + ExitedF: func() bool { return false }, + PluginF: func() interface{} { return p }, + } + reattachCalled++ + return i, nil + } + + // Retrieve the plugin + logger := testlog.HCLogger(t) + p1, err := s.Reattach("foo", "bar", nil) + require.NotNil(p1) + require.NoError(err) + require.Equal(0, dispenseCalled) + require.Equal(1, reattachCalled) + + i1 := p1.Plugin() + require.NotNil(i1) + + // Dispense and ensure the same instance returned + p2, err := s.Dispense("foo", "bar", logger) + require.NotNil(p2) + require.NoError(err) + require.Equal(0, dispenseCalled) + require.Equal(1, reattachCalled) + + i2 := p2.Plugin() + require.NotNil(i2) + if i1 != i2 { + t.Fatalf("i1 and i2 should be the same instance: %p vs %p", i1, i2) + } +}