loader and singleton

This commit is contained in:
Alex Dadgar
2019-01-14 16:50:05 -08:00
committed by Michael Schurter
parent b9f36134dc
commit c19cd2e5cf
35 changed files with 31 additions and 31 deletions

View File

@@ -0,0 +1,54 @@
package singleton
import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/pluginutils/loader"
)
// future is a sharable future for retrieving a plugin instance or any error
// that may have occurred during the creation.
type future struct {
waitCh chan struct{}
id string
err error
instance loader.PluginInstance
}
// newFuture returns a new pull future
func newFuture() *future {
return &future{
waitCh: make(chan struct{}),
id: uuid.Generate(),
}
}
func (f *future) equal(o *future) bool {
if f == nil && o == nil {
return true
} else if f != nil && o != nil {
return f.id == o.id
} else {
return false
}
}
// wait waits till the future has a result
func (f *future) wait() *future {
<-f.waitCh
return f
}
// result returns the results of the future and should only ever be called after
// wait returns.
func (f *future) result() (loader.PluginInstance, error) {
return f.instance, f.err
}
// set is used to set the results and unblock any waiter. This may only be
// called once.
func (f *future) set(instance loader.PluginInstance, err error) {
f.instance = instance
f.err = err
close(f.waitCh)
}

View File

@@ -0,0 +1,125 @@
package singleton
import (
"fmt"
"sync"
log "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/pluginutils/loader"
)
var (
// SingletonPluginExited is returned when the dispense is called and the
// existing plugin has exited. The caller should retry, and this will issue
// a new plugin instance.
SingletonPluginExited = fmt.Errorf("singleton plugin exited")
)
// SingletonLoader is used to only load a single external plugin at a time.
type SingletonLoader struct {
// Loader is the underlying plugin loader that we wrap to give a singleton
// behavior.
loader loader.PluginCatalog
// instances is a mapping of the plugin to a future which holds a plugin
// instance
instances map[loader.PluginID]*future
instanceLock sync.Mutex
// logger is the logger used by the singleton
logger log.Logger
}
// NewSingletonLoader wraps a plugin catalog and provides singleton behavior on
// top by caching running instances.
func NewSingletonLoader(logger log.Logger, catalog loader.PluginCatalog) *SingletonLoader {
return &SingletonLoader{
loader: catalog,
logger: logger.Named("singleton_plugin_loader"),
instances: make(map[loader.PluginID]*future, 4),
}
}
// Catalog returns the catalog of all plugins keyed by plugin type
func (s *SingletonLoader) Catalog() map[string][]*base.PluginInfoResponse {
return s.loader.Catalog()
}
// Dispense returns the plugin given its name and type. This will also
// configure the plugin. If there is an instance of an already running plugin,
// this is used.
func (s *SingletonLoader) Dispense(name, pluginType string, config *base.AgentConfig, logger log.Logger) (loader.PluginInstance, error) {
return s.getPlugin(false, name, pluginType, logger, config, nil)
}
// Reattach is used to reattach to a previously launched external plugin.
func (s *SingletonLoader) Reattach(name, pluginType string, config *plugin.ReattachConfig) (loader.PluginInstance, error) {
return s.getPlugin(true, name, pluginType, nil, nil, config)
}
// getPlugin is a helper that either dispenses or reattaches to a plugin using
// futures to ensure only a single instance is retrieved
func (s *SingletonLoader) getPlugin(reattach bool, name, pluginType string, logger log.Logger,
nomadConfig *base.AgentConfig, config *plugin.ReattachConfig) (loader.PluginInstance, error) {
// Lock the instance map to prevent races
s.instanceLock.Lock()
// Check if there is a future already
id := loader.PluginID{Name: name, PluginType: pluginType}
f, ok := s.instances[id]
// Create the future and go get a plugin
if !ok {
f = newFuture()
s.instances[id] = f
if reattach {
go s.reattach(f, name, pluginType, config)
} else {
go s.dispense(f, name, pluginType, nomadConfig, logger)
}
}
// Unlock so that the created future can be shared
s.instanceLock.Unlock()
i, err := f.wait().result()
if err != nil {
s.clearFuture(id, f)
return nil, err
}
if i.Exited() {
s.clearFuture(id, f)
return nil, SingletonPluginExited
}
return i, nil
}
// dispense should be called in a go routine to not block and creates the
// desired plugin, setting the results in the future.
func (s *SingletonLoader) dispense(f *future, name, pluginType string, config *base.AgentConfig, logger log.Logger) {
i, err := s.loader.Dispense(name, pluginType, config, logger)
f.set(i, err)
}
// reattach should be called in a go routine to not block and reattaches to the
// desired plugin, setting the results in the future.
func (s *SingletonLoader) reattach(f *future, name, pluginType string, config *plugin.ReattachConfig) {
i, err := s.loader.Reattach(name, pluginType, config)
f.set(i, err)
}
// clearFuture clears the future from the instances map only if the futures
// match. This prevents clearing the unintented instance.
func (s *SingletonLoader) clearFuture(id loader.PluginID, f *future) {
s.instanceLock.Lock()
defer s.instanceLock.Unlock()
if f.equal(s.instances[id]) {
delete(s.instances, id)
}
}

View File

@@ -0,0 +1,249 @@
package singleton
import (
"fmt"
"sync"
"testing"
"time"
log "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/pluginutils/loader"
"github.com/stretchr/testify/require"
)
func harness(t *testing.T) (*SingletonLoader, *loader.MockCatalog) {
c := &loader.MockCatalog{}
s := NewSingletonLoader(testlog.HCLogger(t), c)
return s, c
}
// Test that multiple dispenses return the same instance
func TestSingleton_Dispense(t *testing.T) {
t.Parallel()
require := require.New(t)
dispenseCalled := 0
s, c := harness(t)
c.DispenseF = func(_, _ string, _ *base.AgentConfig, _ log.Logger) (loader.PluginInstance, error) {
p := &base.MockPlugin{}
i := &loader.MockInstance{
ExitedF: func() bool { return false },
PluginF: func() interface{} { return p },
}
dispenseCalled++
return i, nil
}
// Retrieve the plugin many times in parallel
const count = 128
var l sync.Mutex
var wg sync.WaitGroup
plugins := make(map[interface{}]struct{}, 1)
waitCh := make(chan struct{})
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
// Wait for unblock
<-waitCh
// Retrieve the plugin
p1, err := s.Dispense("foo", "bar", nil, testlog.HCLogger(t))
require.NotNil(p1)
require.NoError(err)
i1 := p1.Plugin()
require.NotNil(i1)
l.Lock()
plugins[i1] = struct{}{}
l.Unlock()
wg.Done()
}()
}
time.Sleep(10 * time.Millisecond)
close(waitCh)
wg.Wait()
require.Len(plugins, 1)
require.Equal(1, dispenseCalled)
}
// Test that after a plugin is dispensed, if it exits, an error is returned on
// the next dispense
func TestSingleton_Dispense_Exit_Dispense(t *testing.T) {
t.Parallel()
require := require.New(t)
exited := false
dispenseCalled := 0
s, c := harness(t)
c.DispenseF = func(_, _ string, _ *base.AgentConfig, _ log.Logger) (loader.PluginInstance, error) {
p := &base.MockPlugin{}
i := &loader.MockInstance{
ExitedF: func() bool { return exited },
PluginF: func() interface{} { return p },
}
dispenseCalled++
return i, nil
}
// Retrieve the plugin
logger := testlog.HCLogger(t)
p1, err := s.Dispense("foo", "bar", nil, logger)
require.NotNil(p1)
require.NoError(err)
i1 := p1.Plugin()
require.NotNil(i1)
require.Equal(1, dispenseCalled)
// Mark the plugin as exited and retrieve again
exited = true
_, err = s.Dispense("foo", "bar", nil, logger)
require.Error(err)
require.Contains(err.Error(), "exited")
require.Equal(1, dispenseCalled)
// Mark the plugin as non-exited and retrieve again
exited = false
p2, err := s.Dispense("foo", "bar", nil, logger)
require.NotNil(p2)
require.NoError(err)
require.Equal(2, dispenseCalled)
i2 := p2.Plugin()
require.NotNil(i2)
if i1 == i2 {
t.Fatalf("i1 and i2 shouldn't be the same instance: %p vs %p", i1, i2)
}
}
// Test that if a plugin errors while being dispensed, the error is returned but
// not saved
func TestSingleton_DispenseError_Dispense(t *testing.T) {
t.Parallel()
require := require.New(t)
dispenseCalled := 0
good := func(_, _ string, _ *base.AgentConfig, _ log.Logger) (loader.PluginInstance, error) {
p := &base.MockPlugin{}
i := &loader.MockInstance{
ExitedF: func() bool { return false },
PluginF: func() interface{} { return p },
}
dispenseCalled++
return i, nil
}
bad := func(_, _ string, _ *base.AgentConfig, _ log.Logger) (loader.PluginInstance, error) {
dispenseCalled++
return nil, fmt.Errorf("bad")
}
s, c := harness(t)
c.DispenseF = bad
// Retrieve the plugin
logger := testlog.HCLogger(t)
p1, err := s.Dispense("foo", "bar", nil, logger)
require.Nil(p1)
require.Error(err)
require.Equal(1, dispenseCalled)
// Dispense again and ensure the same error isn't saved
c.DispenseF = good
p2, err := s.Dispense("foo", "bar", nil, logger)
require.NotNil(p2)
require.NoError(err)
require.Equal(2, dispenseCalled)
i2 := p2.Plugin()
require.NotNil(i2)
}
// Test that if a plugin errors while being reattached, the error is returned but
// not saved
func TestSingleton_ReattachError_Dispense(t *testing.T) {
t.Parallel()
require := require.New(t)
dispenseCalled, reattachCalled := 0, 0
s, c := harness(t)
c.DispenseF = func(_, _ string, _ *base.AgentConfig, _ log.Logger) (loader.PluginInstance, error) {
p := &base.MockPlugin{}
i := &loader.MockInstance{
ExitedF: func() bool { return false },
PluginF: func() interface{} { return p },
}
dispenseCalled++
return i, nil
}
c.ReattachF = func(_, _ string, _ *plugin.ReattachConfig) (loader.PluginInstance, error) {
reattachCalled++
return nil, fmt.Errorf("bad")
}
// Retrieve the plugin
logger := testlog.HCLogger(t)
p1, err := s.Reattach("foo", "bar", nil)
require.Nil(p1)
require.Error(err)
require.Equal(0, dispenseCalled)
require.Equal(1, reattachCalled)
// Dispense and ensure the same error isn't saved
p2, err := s.Dispense("foo", "bar", nil, logger)
require.NotNil(p2)
require.NoError(err)
require.Equal(1, dispenseCalled)
require.Equal(1, reattachCalled)
i2 := p2.Plugin()
require.NotNil(i2)
}
// Test that after reattaching, dispense returns the same instance
func TestSingleton_Reattach_Dispense(t *testing.T) {
t.Parallel()
require := require.New(t)
dispenseCalled, reattachCalled := 0, 0
s, c := harness(t)
c.DispenseF = func(_, _ string, _ *base.AgentConfig, _ log.Logger) (loader.PluginInstance, error) {
dispenseCalled++
return nil, fmt.Errorf("bad")
}
c.ReattachF = func(_, _ string, _ *plugin.ReattachConfig) (loader.PluginInstance, error) {
p := &base.MockPlugin{}
i := &loader.MockInstance{
ExitedF: func() bool { return false },
PluginF: func() interface{} { return p },
}
reattachCalled++
return i, nil
}
// Retrieve the plugin
logger := testlog.HCLogger(t)
p1, err := s.Reattach("foo", "bar", nil)
require.NotNil(p1)
require.NoError(err)
require.Equal(0, dispenseCalled)
require.Equal(1, reattachCalled)
i1 := p1.Plugin()
require.NotNil(i1)
// Dispense and ensure the same instance returned
p2, err := s.Dispense("foo", "bar", nil, logger)
require.NotNil(p2)
require.NoError(err)
require.Equal(0, dispenseCalled)
require.Equal(1, reattachCalled)
i2 := p2.Plugin()
require.NotNil(i2)
if i1 != i2 {
t.Fatalf("i1 and i2 should be the same instance: %p vs %p", i1, i2)
}
}