diff --git a/client/allocrunner/taskrunner/lazy_handle.go b/client/allocrunner/taskrunner/lazy_handle.go new file mode 100644 index 000000000..a3dbaed6b --- /dev/null +++ b/client/allocrunner/taskrunner/lazy_handle.go @@ -0,0 +1,152 @@ +package taskrunner + +import ( + "context" + "fmt" + "sync" + "time" + + log "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" +) + +const ( + // retrieveBackoffBaseline is the baseline time for exponential backoff while + // retrieving a handle. + retrieveBackoffBaseline = 250 * time.Millisecond + + // retrieveBackoffLimit is the limit of the exponential backoff for + // retrieving a handle. + retrieveBackoffLimit = 5 * time.Second + + // retrieveFailureLimit is how many times we will attempt to retrieve a + // new handle before giving up. + retrieveFailureLimit = 5 +) + +// retrieveHandleFn is used to retrieve the latest driver handle +type retrieveHandleFn func() *DriverHandle + +// LazyHandle is used to front calls to a DriverHandle where it is expected the +// existing handle may no longer be valid because the backing plugin has +// shutdown. LazyHandle detects the plugin shutting down and retrieves a new +// handle so that the consumer does not need to worry about whether the handle +// points to the latest driver instance. +type LazyHandle struct { + // retrieveHandle is used to retrieve the latest handle + retrieveHandle retrieveHandleFn + + // h is the current handle and may be nil + h *DriverHandle + + // shutdownCtx is used to cancel retries if the agent is shutting down + shutdownCtx context.Context + + logger log.Logger + sync.Mutex +} + +// NewLazyHandle takes a function that retrieves the latest handle and a +// logger, and returns a LazyHandle +func NewLazyHandle(shutdownCtx context.Context, fn retrieveHandleFn, logger log.Logger) *LazyHandle { + return &LazyHandle{ + retrieveHandle: fn, + h: fn(), + shutdownCtx: shutdownCtx, + logger: logger.Named("lazy_handle"), + } +} + +// getHandle returns the current handle or retrieves a new one +func (l *LazyHandle) getHandle() (*DriverHandle, error) { + l.Lock() + defer l.Unlock() + + if l.h != nil { + return l.h, nil + } + + return l.refreshHandleLocked() +} + +// refreshHandle retrieves a new handle +func (l *LazyHandle) refreshHandle() (*DriverHandle, error) { + l.Lock() + defer l.Unlock() + return l.refreshHandleLocked() +} + +// refreshHandleLocked retrieves a new handle and should be called with the lock +// held. It will retry to give the client time to restart the driver and restore +// the handle.
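// Aside: with the constants above, the retry schedule in
// refreshHandleLocked below works out to 250ms, 1s, 4s, 5s, 5s. The
// expression (1 << (2*i)) * 250ms yields 250ms, 1s, 4s, 16s, and 64s, with
// the last two clamped to the 5s retrieveBackoffLimit, for roughly 15.25s
// of total waiting across the five attempts before giving up.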
+func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) { + for i := 0; i < retrieveFailureLimit; i++ { + l.h = l.retrieveHandle() + if l.h != nil { + return l.h, nil + } + + // Calculate the new backoff + backoff := (1 << (2 * uint64(i))) * retrieveBackoffBaseline + if backoff > retrieveBackoffLimit { + backoff = retrieveBackoffLimit + } + + l.logger.Debug("failed to retrieve handle", "backoff", backoff) + + select { + case <-l.shutdownCtx.Done(): + return nil, l.shutdownCtx.Err() + case <-time.After(backoff): + } + } + + return nil, fmt.Errorf("no driver handle") +} + +func (l *LazyHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { + h, err := l.getHandle() + if err != nil { + return nil, 0, err + } + + // Only retry once + first := true + +TRY: + out, c, err := h.Exec(timeout, cmd, args) + if err == bstructs.ErrPluginShutdown && first { + first = false + + h, err = l.refreshHandle() + if err == nil { + goto TRY + } + } + + return out, c, err +} + +func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) { + h, err := l.getHandle() + if err != nil { + return nil, err + } + + // Only retry once + first := true + +TRY: + out, err := h.Stats() + if err == bstructs.ErrPluginShutdown && first { + first = false + + h, err = l.refreshHandle() + if err == nil { + goto TRY + } + } + + return out, err +} diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 01733e495..4a0362c0b 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -2,13 +2,13 @@ package taskrunner import ( "context" - "strings" "sync" "time" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" cstructs "github.com/hashicorp/nomad/client/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" ) // StatsUpdater is the interface required by the StatsHook to update stats. @@ -99,11 +99,11 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto return } - //XXX This is a net/rpc specific error - // We do not log when the plugin is shutdown as this is simply a - // race between the stopCollection channel being closed and calling - // Stats on the handle. 
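// Aside: the hunk below replaces brittle string matching on a net/rpc error
// message with an identity comparison against the exported sentinel from
// plugins/base/structs. A minimal sketch of the difference:
//
//	// before: breaks whenever the transport's message wording changes
//	if !strings.Contains(err.Error(), "connection is shut down") { ... }
//
//	// after: one stable, typed value shared by all plugin consumers
//	if err != bstructs.ErrPluginShutdown { ... }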
- if !strings.Contains(err.Error(), "connection is shut down") { + // We do not log when the plugin is shutdown; it likely means the + // driver plugin unexpectedly exited, in which case sleeping and + // retrying, or returning based on the stop channel, is the correct + // behavior. + if err != bstructs.ErrPluginShutdown { h.logger.Debug("error fetching stats of task", "error", err) } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 88eedf3c8..e120f0ca5 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -28,9 +28,10 @@ import ( "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" ) const ( @@ -408,8 +409,10 @@ MAIN: } // Grab the result proxy and wait for the task to exit + WAIT: { handle := tr.getDriverHandle() + result = nil // Do *not* use tr.killCtx here as it would cause // Wait() to unblock before the task exits when Kill() @@ -418,12 +421,15 @@ MAIN: tr.logger.Error("wait task failed", "error", err) } else { select { - case result = <-resultCh: - // WaitCh returned a result - tr.handleTaskExitResult(result) case <-tr.ctx.Done(): // TaskRunner was told to exit immediately return + case result = <-resultCh: + } + + // WaitCh returned a result + if retryWait := tr.handleTaskExitResult(result); retryWait { + goto WAIT + } } } @@ -467,9 +473,37 @@ MAIN: tr.logger.Debug("task run loop exiting") } -func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) { +// handleTaskExitResult handles the result returned when the task exits. If +// retryWait is true, the caller should attempt to wait on the task again since +// it has not actually finished running. This can happen if the driver plugin +// has exited. +func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWait bool) { if result == nil { - return + return false + } + + if result.Err == bstructs.ErrPluginShutdown { + dn := tr.Task().Driver + tr.logger.Debug("driver plugin has shutdown; attempting to recover task", "driver", dn) + + // Initialize a new driver handle + if err := tr.initDriver(); err != nil { + tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err, "driver", dn) + return false + } + + // Try to restore the handle + tr.stateLock.RLock() + h := tr.localState.TaskHandle + net := tr.localState.DriverNetwork + tr.stateLock.RUnlock() + if !tr.restoreHandle(h, net) { + tr.logger.Error("failed to restore handle on driver after it exited unexpectedly", "driver", dn) + return false + } + + tr.logger.Debug("task successfully recovered on driver", "driver", dn) + return true } event := structs.NewTaskEvent(structs.TaskTerminated).
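// Aside: a compressed view of the new wait loop above. On a normal exit
// handleTaskExitResult emits the terminal event and returns false; it
// returns true only when result.Err is bstructs.ErrPluginShutdown and both
// initDriver and restoreHandle succeed, sending control back to WAIT:
//
//	result = <-resultCh
//	if retryWait := tr.handleTaskExitResult(result); retryWait {
//		goto WAIT // plugin relaunched and task recovered; wait again
//	}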
@@ -483,6 +517,8 @@ func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) { if result.OOMKilled && !tr.clientConfig.DisableTaggedMetrics { metrics.IncrCounterWithLabels([]string{"client", "allocs", "oom_killed"}, 1, tr.baseLabels) } + + return false } // handleUpdates runs update hooks when triggerUpdateCh is ticked and exits @@ -530,7 +566,6 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { // runDriver runs the driver and waits for it to exit func (tr *TaskRunner) runDriver() error { - // TODO(nickethier): make sure this uses alloc.AllocatedResources once #4750 is rebased taskConfig := tr.buildTaskConfig() // Build hcl context variables @@ -556,10 +591,10 @@ func (tr *TaskRunner) runDriver() error { evalCtx := &hcl.EvalContext{ Variables: vars, - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } - val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx) + val, diag := hclutils.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx) if diag.HasErrors() { return multierror.Append(errors.New("failed to parse config"), diag.Errs()...) } @@ -568,8 +603,6 @@ func (tr *TaskRunner) runDriver() error { return fmt.Errorf("failed to encode driver config: %v", err) } - //XXX Evaluate and encode driver config - // If there's already a task handle (eg from a Restore) there's nothing // to do except update state. if tr.getDriverHandle() != nil { @@ -586,7 +619,20 @@ func (tr *TaskRunner) runDriver() error { // Start the job if there's no existing handle (or if RecoverTask failed) handle, net, err := tr.driver.StartTask(taskConfig) if err != nil { - return fmt.Errorf("driver start failed: %v", err) + // The plugin has died, try relaunching it + if err == bstructs.ErrPluginShutdown { + tr.logger.Info("failed to start task because plugin shutdown unexpectedly; attempting to recover") + if err := tr.initDriver(); err != nil { + return fmt.Errorf("failed to initialize driver after it exited unexpectedly: %v", err) + } + + handle, net, err = tr.driver.StartTask(taskConfig) + if err != nil { + return fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err) + } + } else { + return fmt.Errorf("driver start failed: %v", err) + } } tr.stateLock.Lock() @@ -735,16 +781,16 @@ func (tr *TaskRunner) Restore() error { // restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask // and sets the driver handle. If the TaskHandle is not valid, DestroyTask is // called. 
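// Aside on the return contract introduced below: restoreHandle reports
// success in the sense of "nothing left to retry". A handle with a nil
// Config, or a RecoverTask failure on a task that was not running, still
// returns true; only a failed RecoverTask on a running task (after the
// DestroyTask cleanup attempt) returns false, which is what makes
// handleTaskExitResult give up on recovery.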
-func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) { +func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) (success bool) { // Ensure handle is well-formed if taskHandle.Config == nil { - return + return true } if err := tr.driver.RecoverTask(taskHandle); err != nil { if tr.TaskState().State != structs.TaskStateRunning { // RecoverTask should fail if the Task wasn't running - return + return true } tr.logger.Error("error recovering task; cleaning up", @@ -760,14 +806,15 @@ func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstruct "error", err, "task_id", taskHandle.Config.ID) } + return false } - return + return true } // Update driver handle on task runner tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net)) - return + return true } // UpdateState sets the task runners allocation state and triggers a server diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 6aa0da23a..e1ec1d675 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -254,6 +254,10 @@ func (tr *TaskRunner) poststart() error { handle := tr.getDriverHandle() net := handle.Network() + // Pass the lazy handle to the hooks so even if the driver exits and we + // launch a new one (external plugin), the handle will refresh. + lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger) + var merr multierror.Error for _, hook := range tr.runnerHooks { post, ok := hook.(interfaces.TaskPoststartHook) @@ -269,9 +273,9 @@ func (tr *TaskRunner) poststart() error { } req := interfaces.TaskPoststartRequest{ - DriverExec: handle, + DriverExec: lazyHandle, DriverNetwork: net, - DriverStats: handle, + DriverStats: lazyHandle, TaskEnv: tr.envBuilder.Build(), } var resp interfaces.TaskPoststartResponse diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 0ab5be2ff..a28d9591e 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -337,3 +337,90 @@ func TestTaskRunner_Restore_HookEnv(t *testing.T) { require.Contains(env, "mock_hook") require.Equal("1", env["mock_hook"]) } + +// This test asserts that we can recover from an "external" plugin exiting by +// retrieving a new instance of the driver and recovering the task. +func TestTaskRunner_RecoverFromDriverExiting(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Create an allocation using the mock driver that exits simulating the + // driver crashing. 
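// Aside: the recovery path exercised by this test hinges on the mock
// driver's new plugin_exit_after option (added later in this diff). With
// the config below, WaitTask reports bstructs.ErrPluginShutdown after 1s
// while run_for keeps the task alive for 5s, forcing the task runner to
// re-dispense the driver, recover the handle, and wait again.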
We can then test that the task runner recovers from this + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "plugin_exit_after": "1s", + "run_for": "5s", + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + conf.StateDB = cstate.NewMemDB() // "persist" state between prestart calls + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(err) + + start := time.Now() + go tr.Run() + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for the task to be running + testWaitForTaskToStart(t, tr) + + // Get the task ID + tr.stateLock.RLock() + l := tr.localState.TaskHandle + require.NotNil(l) + require.NotNil(l.Config) + require.NotEmpty(l.Config.ID) + id := l.Config.ID + tr.stateLock.RUnlock() + + // Get the mock driver plugin + driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) + require.NoError(err) + mockDriver := driverPlugin.(*mockdriver.Driver) + + // Wait for the task handle to be recovered + testutil.WaitForResult(func() (bool, error) { + // Get the handle and check that it was recovered + handle := mockDriver.GetHandle(id) + if handle == nil { + return false, fmt.Errorf("nil handle") + } + if !handle.Recovered { + return false, fmt.Errorf("handle not recovered") + } + return true, nil + }, func(err error) { + t.Fatal(err.Error()) + }) + + // Wait for the task to complete + select { + case <-tr.WaitCh(): + case <-time.After(10 * time.Second): + } + + // Ensure that we actually let the task complete + require.True(time.Now().Sub(start) > 5*time.Second) + + // Check it finished successfully + state := tr.TaskState() + require.True(state.Successful()) +} + +// testWaitForTaskToStart waits for the task to start or fails the test +func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { + // Wait for the task to start + testutil.WaitForResult(func() (bool, error) { + tr.stateLock.RLock() + started := !tr.state.StartedAt.IsZero() + tr.stateLock.RUnlock() + + return started, nil + }, func(err error) { + t.Fatalf("not started") + }) +} diff --git a/client/client.go b/client/client.go index 7abadd425..df1b75eae 100644 --- a/client/client.go +++ b/client/client.go @@ -622,9 +622,6 @@ func (c *Client) Shutdown() error { } c.logger.Info("shutting down") - // Shutdown the plugin managers - c.pluginManagers.Shutdown() - // Stop renewing tokens and secrets if c.vaultClient != nil { c.vaultClient.Stop() @@ -649,6 +646,9 @@ func (c *Client) Shutdown() error { } arGroup.Wait() + // Shutdown the plugin managers + c.pluginManagers.Shutdown() + c.shutdown = true close(c.shutdownCh) diff --git a/client/devicemanager/instance.go b/client/devicemanager/instance.go index f834062e1..0837abf10 100644 --- a/client/devicemanager/instance.go +++ b/client/devicemanager/instance.go @@ -10,6 +10,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/shared/loader" "github.com/hashicorp/nomad/plugins/shared/singleton" @@ -363,7 +364,7 @@ START: // Handle any errors if fresp.Error != nil { - if fresp.Error == base.ErrPluginShutdown { + if fresp.Error == bstructs.ErrPluginShutdown { i.logger.Error("plugin exited unexpectedly") goto START } @@ -488,7 +489,7 @@ START: // Handle any errors if sresp.Error != nil { - if sresp.Error ==
base.ErrPluginShutdown { + if sresp.Error == bstructs.ErrPluginShutdown { i.logger.Error("plugin exited unexpectedly") goto START } diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index 92fff1972..52d804894 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "strings" + "sync" "time" hclog "github.com/hashicorp/go-hclog" @@ -84,12 +85,22 @@ type TaskLogger struct { } func (tl *TaskLogger) Close() { + var wg sync.WaitGroup if tl.lro != nil { - tl.lro.Close() + wg.Add(1) + go func() { + tl.lro.Close() + wg.Done() + }() } if tl.lre != nil { - tl.lre.Close() + wg.Add(1) + go func() { + tl.lre.Close() + wg.Done() + }() } + wg.Wait() } func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) { diff --git a/client/logmon/plugin.go b/client/logmon/plugin.go index 3e0db7317..3e4632ccf 100644 --- a/client/logmon/plugin.go +++ b/client/logmon/plugin.go @@ -15,7 +15,7 @@ import ( // LaunchLogMon launches an instance of logmon // TODO: Integrate with base plugin loader func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) { - logger = logger.Named("logmon-launcher") + logger = logger.Named("logmon") bin, err := discover.NomadExecutable() if err != nil { return nil, nil, err } @@ -24,12 +24,13 @@ func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) { client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: base.Handshake, Plugins: map[string]plugin.Plugin{ - "logmon": NewPlugin(NewLogMon(hclog.L().Named("logmon"))), + "logmon": &Plugin{}, }, Cmd: exec.Command(bin, "logmon"), AllowedProtocols: []plugin.Protocol{ plugin.ProtocolGRPC, }, + Logger: logger, }) rpcClient, err := client.Client() diff --git a/client/pluginmanager/drivermanager/instance.go b/client/pluginmanager/drivermanager/instance.go index a9f21eac6..fe42c196e 100644 --- a/client/pluginmanager/drivermanager/instance.go +++ b/client/pluginmanager/drivermanager/instance.go @@ -9,6 +9,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/shared/loader" "github.com/hashicorp/nomad/plugins/shared/singleton" @@ -447,6 +448,11 @@ func (i *instanceManager) handleEvents() { // handleEvent looks up the event handler(s) for the event and runs them func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) { + // Do not emit an event indicating the plugin has shutdown + if ev.Err != nil && ev.Err == bstructs.ErrPluginShutdown { + return + } + if handler := i.eventHandlerFactory(ev.AllocID, ev.TaskName); handler != nil { i.logger.Trace("task event received", "event", ev) handler(ev) diff --git a/client/structs/structs.go b/client/structs/structs.go index 8f6da2d79..6edcbf8d0 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -180,6 +180,10 @@ type MemoryStats struct { } func (ms *MemoryStats) Add(other *MemoryStats) { + if other == nil { + return + } + ms.RSS += other.RSS ms.Cache += other.Cache ms.Swap += other.Swap @@ -203,6 +207,10 @@ type CpuStats struct { } func (cs *CpuStats) Add(other *CpuStats) { + if other == nil { + return + } + cs.SystemMode += other.SystemMode cs.UserMode += other.UserMode cs.TotalTicks += other.TotalTicks @@ -229,7 +237,7 @@ func (ru *ResourceUsage) Add(other *ResourceUsage) { // and the resource usage of the individual pids type TaskResourceUsage struct { ResourceUsage
*ResourceUsage - Timestamp int64 + Timestamp int64 // UnixNano Pids map[string]*ResourceUsage } diff --git a/client/taskenv/util.go b/client/taskenv/util.go index 561b03709..1686a100d 100644 --- a/client/taskenv/util.go +++ b/client/taskenv/util.go @@ -14,28 +14,11 @@ var ( ErrInvalidObjectPath = errors.New("invalid object path") ) -type ErrKeyExists struct { - msg string -} - -func NewErrKeyExists(newKey, oldKey string) *ErrKeyExists { - return &ErrKeyExists{ - msg: fmt.Sprintf( - "cannot add key %q because %q already exists with a different type", - newKey, oldKey, - ), - } -} - -func (e *ErrKeyExists) Error() string { - return e.msg -} - // addNestedKey expands keys into their nested form: // // k="foo.bar", v="quux" -> {"foo": {"bar": "quux"}} // -// Existing keys are overwritten. +// Existing keys are overwritten. Map values take precedence over primitives. // // If the key has dots but cannot be converted to a valid nested data structure // (eg "foo...bar", "foo.", or non-object value exists for key), an error is @@ -64,13 +47,17 @@ func addNestedKey(dst map[string]interface{}, k, v string) error { var target map[string]interface{} if existingI, ok := dst[newKey]; ok { - existing, ok := existingI.(map[string]interface{}) - if !ok { - // Existing value is not a map, unable to support this key - cleanup() - return NewErrKeyExists(k, newKey) + if existing, ok := existingI.(map[string]interface{}); ok { + // Target already exists + target = existing + } else { + // Existing value is not a map. Maps should + // take precedence over primitive values (eg + // overwrite attr.driver.qemu = "1" with + // attr.driver.qemu.version = "...") + target = make(map[string]interface{}) + dst[newKey] = target } - target = existing } else { // Does not exist, create target = make(map[string]interface{}) @@ -96,6 +83,13 @@ func addNestedKey(dst map[string]interface{}, k, v string) error { return ErrInvalidObjectPath } + if existingI, ok := dst[newKey]; ok { + if _, ok := existingI.(map[string]interface{}); ok { + // Existing value is a map which takes precedence over + // a primitive value. Drop primitive. + return nil + } + } dst[newKey] = v return nil } diff --git a/client/taskenv/util_test.go b/client/taskenv/util_test.go index c9246713a..e97cc5716 100644 --- a/client/taskenv/util_test.go +++ b/client/taskenv/util_test.go @@ -53,7 +53,7 @@ func TestAddNestedKey_Ok(t *testing.T) { }, }, { - // Nested object b should get overwritten with "x" + // Nested object b should take precedence over values M: map[string]interface{}{ "a": map[string]interface{}{ "b": map[string]interface{}{ @@ -64,7 +64,9 @@ func TestAddNestedKey_Ok(t *testing.T) { K: "a.b", Result: map[string]interface{}{ "a": map[string]interface{}{ - "b": "x", + "b": map[string]interface{}{ + "c": "c", + }, }, }, }, @@ -123,6 +125,81 @@ func TestAddNestedKey_Ok(t *testing.T) { }, }, }, + // Regardless of whether attr.driver.qemu = "1" is added first + // or second, attr.driver.qemu.version = "..." 
should take + // precedence (nested maps take precedence over values) + { + M: map[string]interface{}{ + "attr": map[string]interface{}{ + "driver": map[string]interface{}{ + "qemu": "1", + }, + }, + }, + K: "attr.driver.qemu.version", + Result: map[string]interface{}{ + "attr": map[string]interface{}{ + "driver": map[string]interface{}{ + "qemu": map[string]interface{}{ + "version": "x", + }, + }, + }, + }, + }, + { + M: map[string]interface{}{ + "attr": map[string]interface{}{ + "driver": map[string]interface{}{ + "qemu": map[string]interface{}{ + "version": "1.2.3", + }, + }, + }, + }, + K: "attr.driver.qemu", + Result: map[string]interface{}{ + "attr": map[string]interface{}{ + "driver": map[string]interface{}{ + "qemu": map[string]interface{}{ + "version": "1.2.3", + }, + }, + }, + }, + }, + { + M: map[string]interface{}{ + "a": "a", + }, + K: "a.b", + Result: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "x", + }, + }, + }, + { + M: map[string]interface{}{ + "a": "a", + "foo": map[string]interface{}{ + "b": "b", + "bar": "quux", + }, + "c": map[string]interface{}{}, + }, + K: "foo.bar.quux", + Result: map[string]interface{}{ + "a": "a", + "foo": map[string]interface{}{ + "b": "b", + "bar": map[string]interface{}{ + "quux": "x", + }, + }, + "c": map[string]interface{}{}, + }, + }, } for i := range cases { @@ -234,40 +311,6 @@ func TestAddNestedKey_Bad(t *testing.T) { K: "foo.bar..quux", Result: ErrInvalidObjectPath, }, - { - M: func() map[string]interface{} { - return map[string]interface{}{ - "a": "a", - } - }, - K: "a.b", - Result: NewErrKeyExists("a.b", "a"), - }, - { - M: func() map[string]interface{} { - return map[string]interface{}{ - "a": "a", - "foo": map[string]interface{}{ - "b": "b", - "bar": "quux", - }, - "c": map[string]interface{}{}, - } - }, - K: "foo.bar.quux", - Result: NewErrKeyExists("foo.bar.quux", "bar"), - }, - { - M: func() map[string]interface{} { - return map[string]interface{}{ - "a": map[string]interface{}{ - "b": "b", - }, - } - }, - K: "a.b.c.", - Result: NewErrKeyExists("a.b.c.", "b"), - }, } for i := range cases { diff --git a/command/executor_plugin.go b/command/executor_plugin.go index da75450e0..7e7522bc5 100644 --- a/command/executor_plugin.go +++ b/command/executor_plugin.go @@ -6,7 +6,7 @@ import ( "strings" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" + plugin "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" diff --git a/command/logmon_plugin.go b/command/logmon_plugin.go index f9329e423..57d7eb8c4 100644 --- a/command/logmon_plugin.go +++ b/command/logmon_plugin.go @@ -25,10 +25,15 @@ func (e *LogMonPluginCommand) Synopsis() string { } func (e *LogMonPluginCommand) Run(args []string) int { + logger := hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + JSONFormat: true, + Name: "logmon", + }) plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: base.Handshake, Plugins: map[string]plugin.Plugin{ - "logmon": logmon.NewPlugin(logmon.NewLogMon(hclog.Default().Named("logmon"))), + "logmon": logmon.NewPlugin(logmon.NewLogMon(logger)), }, GRPCServer: plugin.DefaultGRPCServer, }) diff --git a/drivers/docker/cmd/main.go b/drivers/docker/cmd/main.go new file mode 100644 index 000000000..e0f898f3a --- /dev/null +++ b/drivers/docker/cmd/main.go @@ -0,0 +1,44 @@ +// This package provides a mechanism to build the Docker driver plugin as an +// external binary. 
The binary has two entry points; the docker driver and the +// docker plugin's logging child binary. An example of using this is `go build +// -o <plugin_dir>/docker`; when the Nomad agent is launched with that plugin +// directory, the external docker plugin is used in place of the built-in one. +package main + +import ( + "os" + + log "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + + "github.com/hashicorp/nomad/drivers/docker" + "github.com/hashicorp/nomad/drivers/docker/docklog" + "github.com/hashicorp/nomad/plugins" + "github.com/hashicorp/nomad/plugins/base" +) + +func main() { + if len(os.Args) > 1 { + // Detect if we are being launched as a docker logging plugin + switch os.Args[1] { + case docklog.PluginName: + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: base.Handshake, + Plugins: map[string]plugin.Plugin{ + docklog.PluginName: docklog.NewPlugin(docklog.NewDockerLogger(log.Default().Named(docklog.PluginName))), + }, + GRPCServer: plugin.DefaultGRPCServer, + }) + + return + } + } + + // Serve the plugin + plugins.Serve(factory) +} + +// factory returns a new instance of the docker driver plugin +func factory(log log.Logger) interface{} { + return docker.NewDockerDriver(log) +} diff --git a/drivers/docker/fingerprint.go b/drivers/docker/fingerprint.go index 85ca9bd5f..d20550153 100644 --- a/drivers/docker/fingerprint.go +++ b/drivers/docker/fingerprint.go @@ -34,14 +34,14 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { fp := &drivers.Fingerprint{ Attributes: map[string]*pstructs.Attribute{}, Health: drivers.HealthStateHealthy, - HealthDescription: "healthy", + HealthDescription: drivers.DriverHealthy, } client, _, err := d.dockerClients() if err != nil { d.logger.Info("failed to initialize client", "error", err) return &drivers.Fingerprint{ Health: drivers.HealthStateUndetected, - HealthDescription: "failed to initialize docker client", + HealthDescription: "Failed to initialize docker client", } } @@ -50,7 +50,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { d.logger.Debug("could not connect to docker daemon", "endpoint", client.Endpoint(), "error", err) return &drivers.Fingerprint{ Health: drivers.HealthStateUnhealthy, - HealthDescription: "failed to connect to docker daemon", + HealthDescription: "Failed to connect to docker daemon", } } diff --git a/drivers/exec/driver.go b/drivers/exec/driver.go index 4bbd337be..9f8c764a0 100644 --- a/drivers/exec/driver.go +++ b/drivers/exec/driver.go @@ -189,7 +189,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { fp := &drivers.Fingerprint{ Attributes: map[string]*pstructs.Attribute{}, Health: drivers.HealthStateHealthy, - HealthDescription: "healthy", + HealthDescription: drivers.DriverHealthy, } if !utils.IsUnixRoot() { @@ -201,14 +201,14 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { mount, err := fingerprint.FindCgroupMountpointDir() if err != nil { fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = "failed to discover cgroup mount point" + fp.HealthDescription = drivers.NoCgroupMountMessage d.logger.Warn(fp.HealthDescription, "error", err) return fp } if mount == "" { fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = "requires cgroup" + fp.HealthDescription = drivers.CgroupMountEmpty return fp } diff --git a/drivers/exec/driver_test.go b/drivers/exec/driver_test.go index eee5c426e..dec127da4 100644 --- a/drivers/exec/driver_test.go +++ b/drivers/exec/driver_test.go @@ -21,8 +21,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" @@ -606,11 +606,11 @@ touch: cannot touch '/tmp/task-path-ro/testfile-from-ro': Read-only file system` func
encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/java/driver.go b/drivers/java/driver.go index 1b561697a..b36699b16 100644 --- a/drivers/java/driver.go +++ b/drivers/java/driver.go @@ -196,7 +196,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { fp := &drivers.Fingerprint{ Attributes: map[string]*pstructs.Attribute{}, Health: drivers.HealthStateHealthy, - HealthDescription: "healthy", + HealthDescription: drivers.DriverHealthy, } if runtime.GOOS == "linux" { @@ -210,14 +210,14 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { mount, err := fingerprint.FindCgroupMountpointDir() if err != nil { fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = "failed to discover cgroup mount point" + fp.HealthDescription = drivers.NoCgroupMountMessage d.logger.Warn(fp.HealthDescription, "error", err) return fp } if mount == "" { fp.Health = drivers.HealthStateUnhealthy - fp.HealthDescription = "cgroups are unavailable" + fp.HealthDescription = drivers.CgroupMountEmpty return fp } } diff --git a/drivers/java/driver_test.go b/drivers/java/driver_test.go index 89966c571..ce0ab2221 100644 --- a/drivers/java/driver_test.go +++ b/drivers/java/driver_test.go @@ -19,8 +19,8 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) @@ -272,11 +272,11 @@ func encodeDriverHelper(t *testing.T, task *drivers.TaskConfig, taskConfig map[s t.Helper() evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(t, diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.Empty(t, diag.Errs()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(t, err) diff --git a/drivers/lxc/driver_test.go b/drivers/lxc/driver_test.go index 8a79510a1..313c9e480 100644 --- a/drivers/lxc/driver_test.go +++ b/drivers/lxc/driver_test.go @@ -19,8 +19,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" lxc "gopkg.in/lxc/go-lxc.v2" @@ -269,11 +269,11 @@ func requireLXC(t *testing.T) { func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: 
hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go index 1593e04fc..07b7344a6 100644 --- a/drivers/mock/driver.go +++ b/drivers/mock/driver.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "strconv" "strings" "sync" @@ -67,6 +68,7 @@ var ( "start_error_recoverable": hclspec.NewAttr("start_error_recoverable", "bool", false), "start_block_for": hclspec.NewAttr("start_block_for", "string", false), "kill_after": hclspec.NewAttr("kill_after", "string", false), + "plugin_exit_after": hclspec.NewAttr("plugin_exit_after", "string", false), "run_for": hclspec.NewAttr("run_for", "string", false), "exit_code": hclspec.NewAttr("exit_code", "number", false), "exit_signal": hclspec.NewAttr("exit_signal", "number", false), @@ -153,6 +155,10 @@ type Config struct { // TaskConfig is the driver configuration of a task within a job type TaskConfig struct { + // PluginExitAfter is the duration after which the mock driver indicates the + // plugin has exited via the WaitTask call. + PluginExitAfter string `codec:"plugin_exit_after"` + pluginExitAfterDuration time.Duration // StartErr specifies the error that should be returned when starting the // mock driver. @@ -213,8 +219,7 @@ type TaskConfig struct { } type MockTaskState struct { - TaskConfig *drivers.TaskConfig - StartedAt time.Time + StartedAt time.Time } func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { @@ -279,7 +284,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } else { health = drivers.HealthStateHealthy attrs["driver.mock"] = pstructs.NewBoolAttribute(true) - desc = "ready" + desc = drivers.DriverHealthy } return &drivers.Fingerprint{ @@ -289,42 +294,95 @@ } } -func (d *Driver) RecoverTask(h *drivers.TaskHandle) error { - if h == nil { +func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { + if handle == nil { return fmt.Errorf("handle cannot be nil") } - if _, ok := d.tasks.Get(h.Config.ID); ok { - d.logger.Debug("nothing to recover; task already exists", - "task_id", h.Config.ID, - "task_name", h.Config.Name, - ) - return nil + // Unmarshal the driver state and create a new handle + var taskState MockTaskState + if err := handle.GetDriverState(&taskState); err != nil { + d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to decode task state from handle: %v", err) } - // Recovering a task requires the task to be running external to the - // plugin. Since the mock_driver runs all tasks in process it cannot - // recover tasks.
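// Aside: the removal below reverses the old stance that the in-process mock
// driver cannot recover tasks. The replacement (in the next hunk) rebuilds
// a live handle from the persisted MockTaskState, roughly:
//
//	var taskState MockTaskState
//	_ = handle.GetDriverState(&taskState) // decode persisted state
//	driverCfg, _ := parseDriverConfig(handle.Config)
//	driverCfg.pluginExitAfterDuration = 0 // do not exit the plugin again
//	driverCfg.runForDuration -= time.Since(taskState.StartedAt)
//	h := newTaskHandle(handle.Config, driverCfg, d.logger)
//	h.Recovered = true
//	d.tasks.Set(handle.Config.ID, h)
//	go h.run()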
- return fmt.Errorf("%s cannot recover tasks", pluginName) + driverCfg, err := parseDriverConfig(handle.Config) + if err != nil { + d.logger.Error("failed to parse driver config from handle", "error", err, "task_id", handle.Config.ID, "config", hclog.Fmt("%+v", handle.Config)) + return fmt.Errorf("failed to parse driver config from handle: %v", err) + } + + // Remove the plugin exit time if set + driverCfg.pluginExitAfterDuration = 0 + + // Correct the run_for time based on how long it has already been running + now := time.Now() + driverCfg.runForDuration = driverCfg.runForDuration - now.Sub(taskState.StartedAt) + + h := newTaskHandle(handle.Config, driverCfg, d.logger) + h.Recovered = true + d.tasks.Set(handle.Config.ID, h) + go h.run() + return nil } -func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) { +func parseDriverConfig(cfg *drivers.TaskConfig) (*TaskConfig, error) { var driverConfig TaskConfig if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { - return nil, nil, err + return nil, err } var err error if driverConfig.startBlockForDuration, err = parseDuration(driverConfig.StartBlockFor); err != nil { - return nil, nil, fmt.Errorf("start_block_for %v not a valid duration: %v", driverConfig.StartBlockFor, err) + return nil, fmt.Errorf("start_block_for %v not a valid duration: %v", driverConfig.StartBlockFor, err) } if driverConfig.runForDuration, err = parseDuration(driverConfig.RunFor); err != nil { - return nil, nil, fmt.Errorf("run_for %v not a valid duration: %v", driverConfig.RunFor, err) + return nil, fmt.Errorf("run_for %v not a valid duration: %v", driverConfig.RunFor, err) + } + + if driverConfig.pluginExitAfterDuration, err = parseDuration(driverConfig.PluginExitAfter); err != nil { + return nil, fmt.Errorf("plugin_exit_after %v not a valid duration: %v", driverConfig.PluginExitAfter, err) } if driverConfig.stdoutRepeatDuration, err = parseDuration(driverConfig.StdoutRepeatDur); err != nil { - return nil, nil, fmt.Errorf("stdout_repeat_duration %v not a valid duration: %v", driverConfig.stdoutRepeatDuration, err) + return nil, fmt.Errorf("stdout_repeat_duration %v not a valid duration: %v", driverConfig.stdoutRepeatDuration, err) + } + + return &driverConfig, nil +} + +func newTaskHandle(cfg *drivers.TaskConfig, driverConfig *TaskConfig, logger hclog.Logger) *taskHandle { + killCtx, killCancel := context.WithCancel(context.Background()) + h := &taskHandle{ + taskConfig: cfg, + runFor: driverConfig.runForDuration, + pluginExitAfter: driverConfig.pluginExitAfterDuration, + killAfter: driverConfig.killAfterDuration, + exitCode: driverConfig.ExitCode, + exitSignal: driverConfig.ExitSignal, + stdoutString: driverConfig.StdoutString, + stdoutRepeat: driverConfig.StdoutRepeat, + stdoutRepeatDur: driverConfig.stdoutRepeatDuration, + logger: logger.With("task_name", cfg.Name), + waitCh: make(chan struct{}), + killCh: killCtx.Done(), + kill: killCancel, + startedAt: time.Now(), + } + if driverConfig.ExitErrMsg != "" { + h.exitErr = errors.New(driverConfig.ExitErrMsg) + } + if driverConfig.SignalErr != "" { + h.signalErr = fmt.Errorf(driverConfig.SignalErr) + } + return h +} + +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) { + driverConfig, err := parseDriverConfig(cfg) + if err != nil { + return nil, nil, err } if driverConfig.startBlockForDuration != 0 { @@ -334,7 +392,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru // 
Store last configs d.lastMu.Lock() d.lastDriverTaskConfig = cfg - d.lastTaskConfig = &driverConfig + d.lastTaskConfig = driverConfig d.lastMu.Unlock() if driverConfig.StartErr != "" { @@ -358,32 +416,9 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru net.PortMap = map[string]int{parts[0]: port} } - killCtx, killCancel := context.WithCancel(context.Background()) - - h := &taskHandle{ - taskConfig: cfg, - runFor: driverConfig.runForDuration, - killAfter: driverConfig.killAfterDuration, - exitCode: driverConfig.ExitCode, - exitSignal: driverConfig.ExitSignal, - stdoutString: driverConfig.StdoutString, - stdoutRepeat: driverConfig.StdoutRepeat, - stdoutRepeatDur: driverConfig.stdoutRepeatDuration, - logger: d.logger.With("task_name", cfg.Name), - waitCh: make(chan struct{}), - killCh: killCtx.Done(), - kill: killCancel, - } - if driverConfig.ExitErrMsg != "" { - h.exitErr = errors.New(driverConfig.ExitErrMsg) - } - if driverConfig.SignalErr != "" { - h.signalErr = fmt.Errorf(driverConfig.SignalErr) - } - + h := newTaskHandle(cfg, driverConfig, d.logger) driverState := MockTaskState{ - TaskConfig: cfg, - StartedAt: h.startedAt, + StartedAt: h.startedAt, } handle := drivers.NewTaskHandle(pluginName) handle.Config = cfg @@ -461,8 +496,17 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { } func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) { - //TODO return an error? - return nil, nil + // Generate random value for the memory usage + s := &cstructs.TaskResourceUsage{ + ResourceUsage: &cstructs.ResourceUsage{ + MemoryStats: &cstructs.MemoryStats{ + RSS: rand.Uint64(), + Measured: []string{"RSS"}, + }, + }, + Timestamp: time.Now().UTC().UnixNano(), + } + return s, nil } func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { @@ -499,3 +543,10 @@ func (d *Driver) GetTaskConfig() (*drivers.TaskConfig, *TaskConfig) { defer d.lastMu.Unlock() return d.lastDriverTaskConfig, d.lastTaskConfig } + +// GetHandle is unique to the mock driver and for testing purposes only. 
It +// returns the handle of the given task ID +func (d *Driver) GetHandle(taskID string) *taskHandle { + h, _ := d.tasks.Get(taskID) + return h +} diff --git a/drivers/mock/handle.go b/drivers/mock/handle.go index 44ad4932e..81d204d9d 100644 --- a/drivers/mock/handle.go +++ b/drivers/mock/handle.go @@ -8,6 +8,7 @@ import ( hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/lib/fifo" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/drivers" ) @@ -16,6 +17,7 @@ type taskHandle struct { logger hclog.Logger runFor time.Duration + pluginExitAfter time.Duration killAfter time.Duration waitCh chan struct{} exitCode int @@ -39,6 +41,9 @@ type taskHandle struct { // Calling kill closes killCh if it is not already closed kill context.CancelFunc killCh <-chan struct{} + + // Recovered is set to true if the handle was created while being recovered + Recovered bool } func (h *taskHandle) TaskStatus() *drivers.TaskStatus { @@ -79,18 +84,30 @@ func (h *taskHandle) run() { errCh := make(chan error, 1) // Setup logging output - if h.stdoutString != "" { - go h.handleLogging(errCh) - } + go h.handleLogging(errCh) timer := time.NewTimer(h.runFor) defer timer.Stop() + var pluginExitTimer <-chan time.Time + if h.pluginExitAfter != 0 { + timer := time.NewTimer(h.pluginExitAfter) + defer timer.Stop() + pluginExitTimer = timer.C + } + select { case <-timer.C: h.logger.Debug("run_for time elapsed; exiting", "run_for", h.runFor) case <-h.killCh: h.logger.Debug("killed; exiting") + case <-pluginExitTimer: + h.logger.Debug("exiting plugin") + h.exitResult = &drivers.ExitResult{ + Err: bstructs.ErrPluginShutdown, + } + + return case err := <-errCh: h.logger.Error("error running mock task; exiting", "error", err) h.exitResult = &drivers.ExitResult{ @@ -114,6 +131,18 @@ func (h *taskHandle) handleLogging(errCh chan<- error) { errCh <- err return } + stderr, err := fifo.Open(h.taskConfig.StderrPath) + if err != nil { + h.logger.Error("failed to write to stderr", "error", err) + errCh <- err + return + } + defer stderr.Close() + + if h.stdoutString == "" { + return + } + if _, err := io.WriteString(stdout, h.stdoutString); err != nil { h.logger.Error("failed to write to stdout", "error", err) errCh <- err diff --git a/drivers/qemu/driver.go b/drivers/qemu/driver.go index bcf6110a7..8514ea4f7 100644 --- a/drivers/qemu/driver.go +++ b/drivers/qemu/driver.go @@ -207,7 +207,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { fingerprint := &drivers.Fingerprint{ Attributes: map[string]*pstructs.Attribute{}, Health: drivers.HealthStateHealthy, - HealthDescription: "ready", + HealthDescription: drivers.DriverHealthy, } bin := "qemu-system-x86_64" @@ -229,7 +229,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { matches := versionRegex.FindStringSubmatch(out) if len(matches) != 2 { fingerprint.Health = drivers.HealthStateUndetected - fingerprint.HealthDescription = fmt.Sprintf("failed to parse qemu version from %v", out) + fingerprint.HealthDescription = fmt.Sprintf("Failed to parse qemu version from %v", out) return fingerprint } currentQemuVersion := matches[1] diff --git a/drivers/qemu/driver_test.go b/drivers/qemu/driver_test.go index 53901f0b7..d0e204c52 100644 --- a/drivers/qemu/driver_test.go +++ b/drivers/qemu/driver_test.go @@ -17,8 +17,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - 
"github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -203,11 +203,11 @@ func TestQemuDriver_GetMonitorPathNewQemu(t *testing.T) { //encodeDriverhelper sets up the task config spec and encodes qemu specific driver configuration func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors(), diag.Error()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors(), diag.Error()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/rawexec/driver.go b/drivers/rawexec/driver.go index 42c0db31b..d0f9fe83f 100644 --- a/drivers/rawexec/driver.go +++ b/drivers/rawexec/driver.go @@ -230,7 +230,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { attrs := map[string]*pstructs.Attribute{} if d.config.Enabled { health = drivers.HealthStateHealthy - desc = "ready" + desc = drivers.DriverHealthy attrs["driver.raw_exec"] = pstructs.NewBoolAttribute(true) } else { health = drivers.HealthStateUndetected diff --git a/drivers/rawexec/driver_test.go b/drivers/rawexec/driver_test.go index 1eb42f6dd..186475747 100644 --- a/drivers/rawexec/driver_test.go +++ b/drivers/rawexec/driver_test.go @@ -20,8 +20,8 @@ import ( basePlug "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -121,7 +121,7 @@ func TestRawExecDriver_Fingerprint(t *testing.T) { Expected: drivers.Fingerprint{ Attributes: map[string]*pstructs.Attribute{"driver.raw_exec": pstructs.NewBoolAttribute(true)}, Health: drivers.HealthStateHealthy, - HealthDescription: "ready", + HealthDescription: drivers.DriverHealthy, }, }, } @@ -500,11 +500,11 @@ func TestRawExecDriver_Exec(t *testing.T) { func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/rkt/driver.go b/drivers/rkt/driver.go index 1a823010f..189b54f47 100644 --- a/drivers/rkt/driver.go +++ b/drivers/rkt/driver.go @@ -16,14 +16,15 @@ import ( "regexp" "strconv" "strings" + "sync" "syscall" "time" appcschema "github.com/appc/spec/schema" "github.com/hashicorp/consul-template/signals" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" - 
"github.com/hashicorp/go-version" + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" @@ -193,6 +194,10 @@ type Driver struct { // logger will log to the Nomad agent logger hclog.Logger + + // hasFingerprinted is used to store whether we have fingerprinted before + hasFingerprinted bool + fingerprintLock sync.Mutex } func NewRktDriver(logger hclog.Logger) drivers.DriverPlugin { @@ -261,15 +266,36 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerp } } +// setFingerprinted marks the driver as having fingerprinted once before +func (d *Driver) setFingerprinted() { + d.fingerprintLock.Lock() + d.hasFingerprinted = true + d.fingerprintLock.Unlock() +} + +// fingerprinted returns whether the driver has fingerprinted before +func (d *Driver) fingerprinted() bool { + d.fingerprintLock.Lock() + defer d.fingerprintLock.Unlock() + return d.hasFingerprinted +} + func (d *Driver) buildFingerprint() *drivers.Fingerprint { + defer func() { + d.setFingerprinted() + }() + fingerprint := &drivers.Fingerprint{ Attributes: map[string]*pstructs.Attribute{}, Health: drivers.HealthStateHealthy, - HealthDescription: "ready", + HealthDescription: drivers.DriverHealthy, } // Only enable if we are root if syscall.Geteuid() != 0 { + if !d.fingerprinted() { + d.logger.Debug("must run as root user, disabling") + } fingerprint.Health = drivers.HealthStateUndetected fingerprint.HealthDescription = drivers.DriverRequiresRootMessage return fingerprint @@ -278,7 +304,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { outBytes, err := exec.Command(rktCmd, "version").Output() if err != nil { fingerprint.Health = drivers.HealthStateUndetected - fingerprint.HealthDescription = fmt.Sprintf("failed to executor %s version: %v", rktCmd, err) + fingerprint.HealthDescription = fmt.Sprintf("Failed to execute %s version: %v", rktCmd, err) return fingerprint } out := strings.TrimSpace(string(outBytes)) @@ -287,7 +313,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { appcMatches := reAppcVersion.FindStringSubmatch(out) if len(rktMatches) != 2 || len(appcMatches) != 2 { fingerprint.Health = drivers.HealthStateUndetected - fingerprint.HealthDescription = "unable to parse rkt version string" + fingerprint.HealthDescription = "Unable to parse rkt version string" return fingerprint } @@ -296,7 +322,11 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { if currentVersion.LessThan(minVersion) { // Do not allow ancient rkt versions fingerprint.Health = drivers.HealthStateUndetected - fingerprint.HealthDescription = fmt.Sprintf("unsuported rkt version %s", currentVersion) + fingerprint.HealthDescription = fmt.Sprintf("Unsuported rkt version %s", currentVersion) + if !d.fingerprinted() { + d.logger.Warn("unsupported rkt version please upgrade to >= "+minVersion.String(), + "rkt_version", currentVersion) + } return fingerprint } diff --git a/drivers/rkt/driver_test.go b/drivers/rkt/driver_test.go index 95a75e371..2df70aa97 100644 --- a/drivers/rkt/driver_test.go +++ b/drivers/rkt/driver_test.go @@ -22,8 +22,8 @@ import ( basePlug "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" 
"github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" @@ -874,11 +874,11 @@ func TestRktDriver_Stats(t *testing.T) { func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) if diag.HasErrors() { fmt.Println("conversion error", diag.Error()) } diff --git a/drivers/shared/executor/plugins.go b/drivers/shared/executor/plugins.go index d2478e568..55441e702 100644 --- a/drivers/shared/executor/plugins.go +++ b/drivers/shared/executor/plugins.go @@ -5,7 +5,7 @@ import ( "net" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" + plugin "github.com/hashicorp/go-plugin" ) // ExecutorConfig is the config that Nomad passes to the executor diff --git a/e2e/consultemplate/consultemplate.go b/e2e/consultemplate/consultemplate.go new file mode 100644 index 000000000..772dcc6db --- /dev/null +++ b/e2e/consultemplate/consultemplate.go @@ -0,0 +1,155 @@ +package consultemplate + +import ( + "time" + + capi "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/jobspec" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" + + . "github.com/onsi/gomega" +) + +type ConsulTemplateTest struct { + framework.TC + jobIds []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "Consul Template", + CanRunLocal: true, + Consul: true, + Cases: []framework.TestCase{ + new(ConsulTemplateTest), + }, + }) +} + +func (tc *ConsulTemplateTest) TestUpdatesRestartTasks(f *framework.F) { + require := require.New(f.T()) + g := NewGomegaWithT(f.T()) + + nomadClient := tc.Nomad() + consulClient := tc.Consul() + + // Ensure consultemplatetest does not exist + _, err := consulClient.KV().Delete("consultemplatetest", nil) + require.NoError(err) + + // Parse job + job, err := jobspec.ParseFile("consultemplate/input/docker.nomad") + require.Nil(err) + uuid := uuid.Generate() + jobId := helper.StringToPtr("cltp" + uuid[:8]) + job.ID = jobId + + tc.jobIds = append(tc.jobIds, *jobId) + + // Register job + jobs := nomadClient.Jobs() + resp, _, err := jobs.Register(job, nil) + require.Nil(err) + require.NotEmpty(resp.EvalID) + + waitForTaskState := func(taskState string) { + g.Eventually(func() string { + allocs, _, _ := jobs.Allocations(*job.ID, false, nil) + if len(allocs) != 1 { + return "" + } + first := allocs[0] + taskState := first.TaskStates["test"] + if taskState == nil { + return "" + } + + return taskState.State + }, 5*time.Second, time.Second).Should(Equal(taskState), "Incorrect task state") + } + + waitForClientAllocStatus := func(allocState string) { + g.Eventually(func() string { + allocSummaries, _, _ := jobs.Allocations(*job.ID, false, nil) + if len(allocSummaries) != 1 { + return "" + } + + alloc, _, _ := nomadClient.Allocations().Info(allocSummaries[0].ID, nil) + if alloc == nil { + return "" + } + + return alloc.ClientStatus + }, 5*time.Second, 
time.Second).Should(Equal(allocState), "Incorrect alloc state") + } + + waitForRestartCount := func(count uint64) { + g.Eventually(func() uint64 { + allocs, _, _ := jobs.Allocations(*job.ID, false, nil) + if len(allocs) != 1 { + return 0 + } + first := allocs[0] + return first.TaskStates["test"].Restarts + }, 5*time.Second, time.Second).Should(Equal(count), "Incorrect restart count") + } + + // Wrap in retry to wait until placement + waitForTaskState(structs.TaskStatePending) + + // Client should be pending + waitForClientAllocStatus(structs.AllocClientStatusPending) + + // Alloc should have a blocked event + g.Eventually(func() []string { + allocSummaries, _, _ := jobs.Allocations(*job.ID, false, nil) + events := allocSummaries[0].TaskStates["test"].Events + messages := []string{} + for _, event := range events { + messages = append(messages, event.DisplayMessage) + } + + return messages + }, 5*time.Second, time.Second).Should(ContainElement(ContainSubstring("kv.block"))) + + // Insert consultemplatetest + _, err = consulClient.KV().Put(&capi.KVPair{Key: "consultemplatetest", Value: []byte("bar")}, nil) + require.Nil(err) + + // Placement should start running + waitForClientAllocStatus(structs.AllocClientStatusRunning) + + // Ensure restart count 0 -- we should be going from blocked to running. + waitForRestartCount(0) + + // Update consultemplatetest + _, err = consulClient.KV().Put(&capi.KVPair{Key: "consultemplatetest", Value: []byte("baz")}, nil) + require.Nil(err) + + // Wrap in retry to wait until restart + // TODO(dani): FIXME: This restart counter should only be 1. This is + // likely an accounting bug in restart tracking from + // template hooks. + waitForRestartCount(2) +} + +func (tc *ConsulTemplateTest) AfterEach(f *framework.F) { + nomadClient := tc.Nomad() + consulClient := tc.Consul() + + jobs := nomadClient.Jobs() + // Stop all jobs in test + for _, id := range tc.jobIds { + jobs.Deregister(id, true, nil) + } + // Garbage collect + nomadClient.System().GarbageCollect() + + // Ensure consultemplatetest does not exist + consulClient.KV().Delete("consultemplatetest", nil) +} diff --git a/e2e/consultemplate/input/docker.nomad b/e2e/consultemplate/input/docker.nomad new file mode 100644 index 000000000..9f2c601df --- /dev/null +++ b/e2e/consultemplate/input/docker.nomad @@ -0,0 +1,22 @@ +job "test1" { + datacenters = ["dc1", "dc2"] + type = "service" + + group "test1" { + count = 1 + + task "test" { + driver = "docker" + + config { + image = "redis:3.2" + } + + template { + data = "---\nkey: {{ key \"consultemplatetest\" }}" + destination = "local/file.yml" + change_mode = "restart" + } + } + } +} diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 2101ff0a1..0b9a3e142 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -4,6 +4,7 @@ import ( "testing" _ "github.com/hashicorp/nomad/e2e/affinities" + _ "github.com/hashicorp/nomad/e2e/consultemplate" _ "github.com/hashicorp/nomad/e2e/example" _ "github.com/hashicorp/nomad/e2e/spread" ) diff --git a/e2e/framework/case.go b/e2e/framework/case.go index 62422ac27..787d27be9 100644 --- a/e2e/framework/case.go +++ b/e2e/framework/case.go @@ -3,6 +3,7 @@ package framework import ( "fmt" + capi "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/api" ) @@ -65,6 +66,11 @@ func (tc *TC) Nomad() *api.Client { return tc.cluster.NomadClient } +// Consul returns a configured consul api client +func (tc *TC) Consul() *capi.Client { + return tc.cluster.ConsulClient +} + // Name returns the name of the test case which is set to the 
name of the // implementing type. func (tc *TC) Name() string { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6a333eabc..796b497d0 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -28,7 +28,7 @@ import ( "github.com/hashicorp/consul/api" hcodec "github.com/hashicorp/go-msgpack/codec" multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/go-version" + version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/args" @@ -5743,17 +5743,7 @@ func (ts *TaskState) Copy() *TaskState { // have meaning on a non-batch allocation because a service and system // allocation should not finish. func (ts *TaskState) Successful() bool { - l := len(ts.Events) - if ts.State != TaskStateDead || l == 0 { - return false - } - - e := ts.Events[l-1] - if e.Type != TaskTerminated { - return false - } - - return e.ExitCode == 0 + return ts.State == TaskStateDead && !ts.Failed } const ( diff --git a/plugins/base/client.go b/plugins/base/client.go index 80bc7ef4b..9dadaeed4 100644 --- a/plugins/base/client.go +++ b/plugins/base/client.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/hashicorp/nomad/plugins/base/proto" + "github.com/hashicorp/nomad/plugins/shared/grpcutils" "github.com/hashicorp/nomad/plugins/shared/hclspec" ) @@ -20,7 +21,7 @@ type BasePluginClient struct { func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) { presp, err := b.Client.PluginInfo(b.DoneCtx, &proto.PluginInfoRequest{}) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, b.DoneCtx) } var ptype string @@ -46,7 +47,7 @@ func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) { func (b *BasePluginClient) ConfigSchema() (*hclspec.Spec, error) { presp, err := b.Client.ConfigSchema(b.DoneCtx, &proto.ConfigSchemaRequest{}) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, b.DoneCtx) } return presp.GetSpec(), nil @@ -60,5 +61,5 @@ func (b *BasePluginClient) SetConfig(c *Config) error { PluginApiVersion: c.ApiVersion, }) - return err + return grpcutils.HandleGrpcErr(err, b.DoneCtx) } diff --git a/plugins/base/plugin.go b/plugins/base/plugin.go index 411c79662..f511a3d45 100644 --- a/plugins/base/plugin.go +++ b/plugins/base/plugin.go @@ -3,7 +3,6 @@ package base import ( "bytes" "context" - "errors" "reflect" plugin "github.com/hashicorp/go-plugin" @@ -30,9 +29,6 @@ var ( MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", } - - // ErrPluginShutdown is returned when the plugin has shutdown. - ErrPluginShutdown = errors.New("plugin is shut down") ) // PluginBase is wraps a BasePlugin and implements go-plugins GRPCPlugin diff --git a/plugins/base/structs/errors.go b/plugins/base/structs/errors.go new file mode 100644 index 000000000..0a5a7a6d6 --- /dev/null +++ b/plugins/base/structs/errors.go @@ -0,0 +1,12 @@ +package structs + +import "errors" + +const ( + errPluginShutdown = "plugin is shut down" +) + +var ( + // ErrPluginShutdown is returned when the plugin has shutdown. 
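The message lives in a named const, separate from the errors.New value, because error identity does not survive the plugin boundary: gRPC transports only a status code and a message string, so a consumer comparing err == ErrPluginShutdown needs the receiving side to rebuild the sentinel from its text first. A minimal sketch of that recovery step, assuming a helper inside this same structs package (the function is illustrative, not part of the diff; it matches by substring because gRPC prefixes the status text):

import (
	"errors"
	"strings"
)

// errFromRemoteMessage is a hypothetical helper: an error that crossed a
// gRPC hop arrives as plain text ("rpc error: code = ... desc = ..."), so
// the shared message const is the only hook for restoring the
// ErrPluginShutdown sentinel on the client side.
func errFromRemoteMessage(msg string) error {
	if strings.Contains(msg, errPluginShutdown) {
		return ErrPluginShutdown
	}
	return errors.New(msg)
}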
+ ErrPluginShutdown = errors.New(errPluginShutdown) +) diff --git a/plugins/device/client.go b/plugins/device/client.go index ffbb80166..4dc187453 100644 --- a/plugins/device/client.go +++ b/plugins/device/client.go @@ -9,7 +9,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/device/proto" - "github.com/hashicorp/nomad/plugins/shared" + "github.com/hashicorp/nomad/plugins/shared/grpcutils" ) // devicePluginClient implements the client side of a remote device plugin, using @@ -30,12 +30,12 @@ type devicePluginClient struct { // cancelled, the error will be propogated. func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) { // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) var req proto.FingerprintRequest - stream, err := d.client.Fingerprint(ctx, &req) + stream, err := d.client.Fingerprint(joinedCtx, &req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } out := make(chan *FingerprintResponse, 1) @@ -47,7 +47,7 @@ func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri // the gRPC stream to a channel. Exits either when context is cancelled or the // stream has an error. func (d *devicePluginClient) handleFingerprint( - ctx context.Context, + reqCtx context.Context, stream proto.DevicePlugin_FingerprintClient, out chan *FingerprintResponse) { @@ -57,7 +57,7 @@ func (d *devicePluginClient) handleFingerprint( if err != nil { if err != io.EOF { out <- &FingerprintResponse{ - Error: shared.HandleStreamErr(err, ctx, d.doneCtx), + Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -70,7 +70,7 @@ func (d *devicePluginClient) handleFingerprint( Devices: convertProtoDeviceGroups(resp.GetDeviceGroup()), } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case out <- f: } @@ -86,7 +86,7 @@ func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, // Make the request resp, err := d.client.Reserve(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } // Convert the response @@ -100,14 +100,14 @@ func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, // propogated. func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration) (<-chan *StatsResponse, error) { // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) req := proto.StatsRequest{ CollectionInterval: ptypes.DurationProto(interval), } - stream, err := d.client.Stats(ctx, &req) + stream, err := d.client.Stats(joinedCtx, &req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } out := make(chan *StatsResponse, 1) @@ -119,7 +119,7 @@ func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration) // the gRPC stream to a channel. Exits either when context is cancelled or the // stream has an error. 
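The ctx→reqCtx renames in this file are more than cosmetic. Each streaming call now issues the RPC on a context joined from the caller's request context and the plugin's done context, but keeps the two originals separate so a failure can be classified: caller cancellation maps to context.Canceled, plugin death to ErrPluginShutdown. A condensed sketch of the shared shape (the wrapper name is hypothetical; the calls are the ones used in the surrounding code):

import (
	"context"

	"github.com/LK4D4/joincontext"
	"github.com/hashicorp/nomad/plugins/device/proto"
	"github.com/hashicorp/nomad/plugins/shared/grpcutils"
)

// openFingerprintStream mirrors the pattern used by Fingerprint and Stats:
// run the RPC on the joined context so either side can end it, but hand the
// two source contexts to the error classifier individually.
func openFingerprintStream(reqCtx, doneCtx context.Context,
	client proto.DevicePluginClient) (proto.DevicePlugin_FingerprintClient, error) {

	joinedCtx, _ := joincontext.Join(reqCtx, doneCtx)

	stream, err := client.Fingerprint(joinedCtx, &proto.FingerprintRequest{})
	if err != nil {
		// reqCtx done  -> context.Canceled
		// doneCtx done -> structs.ErrPluginShutdown
		return nil, grpcutils.HandleReqCtxGrpcErr(err, reqCtx, doneCtx)
	}
	return stream, nil
}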
func (d *devicePluginClient) handleStats( - ctx context.Context, + reqCtx context.Context, stream proto.DevicePlugin_StatsClient, out chan *StatsResponse) { @@ -129,7 +129,7 @@ func (d *devicePluginClient) handleStats( if err != nil { if err != io.EOF { out <- &StatsResponse{ - Error: shared.HandleStreamErr(err, ctx, d.doneCtx), + Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -142,7 +142,7 @@ func (d *devicePluginClient) handleStats( Groups: convertProtoDeviceGroupsStats(resp.GetGroups()), } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case out <- s: } diff --git a/plugins/drivers/client.go b/plugins/drivers/client.go index edefec092..98adc8632 100644 --- a/plugins/drivers/client.go +++ b/plugins/drivers/client.go @@ -8,12 +8,11 @@ import ( "github.com/LK4D4/joincontext" "github.com/golang/protobuf/ptypes" - hclog "github.com/hashicorp/go-hclog" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers/proto" - "github.com/hashicorp/nomad/plugins/shared" + "github.com/hashicorp/nomad/plugins/shared/grpcutils" "github.com/hashicorp/nomad/plugins/shared/hclspec" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" sproto "github.com/hashicorp/nomad/plugins/shared/structs/proto" @@ -26,7 +25,6 @@ type driverPluginClient struct { *base.BasePluginClient client proto.DriverClient - logger hclog.Logger // doneCtx is closed when the plugin exits doneCtx context.Context @@ -37,7 +35,7 @@ func (d *driverPluginClient) TaskConfigSchema() (*hclspec.Spec, error) { resp, err := d.client.TaskConfigSchema(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } return resp.Spec, nil @@ -48,7 +46,7 @@ func (d *driverPluginClient) Capabilities() (*Capabilities, error) { resp, err := d.client.Capabilities(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } caps := &Capabilities{} @@ -76,11 +74,11 @@ func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri req := &proto.FingerprintRequest{} // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) - stream, err := d.client.Fingerprint(ctx, req) + stream, err := d.client.Fingerprint(joinedCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } ch := make(chan *Fingerprint, 1) @@ -89,15 +87,14 @@ func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri return ch, nil } -func (d *driverPluginClient) handleFingerprint(ctx context.Context, ch chan *Fingerprint, stream proto.Driver_FingerprintClient) { +func (d *driverPluginClient) handleFingerprint(reqCtx context.Context, ch chan *Fingerprint, stream proto.Driver_FingerprintClient) { defer close(ch) for { pb, err := stream.Recv() if err != nil { if err != io.EOF { - d.logger.Error("error receiving stream from Fingerprint driver RPC", "error", err) ch <- &Fingerprint{ - Err: shared.HandleStreamErr(err, ctx, d.doneCtx), + Err: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -112,7 +109,7 @@ func (d *driverPluginClient) handleFingerprint(ctx context.Context, ch chan *Fin } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case ch <- f: } @@ -125,7 +122,7 @@ func (d *driverPluginClient) RecoverTask(h *TaskHandle) error { req := 
&proto.RecoverTaskRequest{Handle: taskHandleToProto(h)} _, err := d.client.RecoverTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // StartTask starts execution of a task with the given TaskConfig. A TaskHandle @@ -144,7 +141,7 @@ func (d *driverPluginClient) StartTask(c *TaskConfig) (*TaskHandle, *cstructs.Dr return nil, nil, structs.NewRecoverableError(err, rec.Recoverable) } } - return nil, nil, err + return nil, nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } var net *cstructs.DriverNetwork @@ -168,10 +165,6 @@ func (d *driverPluginClient) StartTask(c *TaskConfig) (*TaskHandle, *cstructs.Dr // the same task without issue. func (d *driverPluginClient) WaitTask(ctx context.Context, id string) (<-chan *ExitResult, error) { ch := make(chan *ExitResult) - - // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) - go d.handleWaitTask(ctx, id, ch) return ch, nil } @@ -183,9 +176,12 @@ func (d *driverPluginClient) handleWaitTask(ctx context.Context, id string, ch c TaskId: id, } - resp, err := d.client.WaitTask(ctx, req) + // Join the passed context and the shutdown context + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) + + resp, err := d.client.WaitTask(joinedCtx, req) if err != nil { - result.Err = err + result.Err = grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } else { result.ExitCode = int(resp.Result.ExitCode) result.Signal = int(resp.Result.Signal) @@ -209,7 +205,7 @@ func (d *driverPluginClient) StopTask(taskID string, timeout time.Duration, sign } _, err := d.client.StopTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // DestroyTask removes the task from the driver's in memory state. The task @@ -222,7 +218,7 @@ func (d *driverPluginClient) DestroyTask(taskID string, force bool) error { } _, err := d.client.DestroyTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // InspectTask returns status information for a task @@ -231,7 +227,7 @@ func (d *driverPluginClient) InspectTask(taskID string) (*TaskStatus, error) { resp, err := d.client.InspectTask(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } status, err := taskStatusFromProto(resp.Task) @@ -262,7 +258,7 @@ func (d *driverPluginClient) TaskStats(taskID string) (*cstructs.TaskResourceUsa resp, err := d.client.TaskStats(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } stats, err := TaskStatsFromProto(resp.Stats) @@ -279,11 +275,11 @@ func (d *driverPluginClient) TaskEvents(ctx context.Context) (<-chan *TaskEvent, req := &proto.TaskEventsRequest{} // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) - stream, err := d.client.TaskEvents(ctx, req) + stream, err := d.client.TaskEvents(joinedCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } ch := make(chan *TaskEvent, 1) @@ -291,15 +287,14 @@ func (d *driverPluginClient) TaskEvents(ctx context.Context) (<-chan *TaskEvent, return ch, nil } -func (d *driverPluginClient) handleTaskEvents(ctx context.Context, ch chan *TaskEvent, stream proto.Driver_TaskEventsClient) { +func (d *driverPluginClient) handleTaskEvents(reqCtx context.Context, ch chan *TaskEvent, stream proto.Driver_TaskEventsClient) { defer close(ch) for { ev, err := stream.Recv() if err != nil { if err != io.EOF { 
- d.logger.Error("error receiving stream from TaskEvents driver RPC", "error", err) ch <- &TaskEvent{ - Err: shared.HandleStreamErr(err, ctx, d.doneCtx), + Err: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -317,7 +312,7 @@ func (d *driverPluginClient) handleTaskEvents(ctx context.Context, ch chan *Task Timestamp: timestamp, } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case ch <- event: } @@ -331,7 +326,7 @@ func (d *driverPluginClient) SignalTask(taskID string, signal string) error { Signal: signal, } _, err := d.client.SignalTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // ExecTask will run the given command within the execution context of the task. @@ -347,7 +342,7 @@ func (d *driverPluginClient) ExecTask(taskID string, cmd []string, timeout time. resp, err := d.client.ExecTask(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } result := &ExecTaskResult{ diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index 474597a64..68dc5899f 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -18,6 +18,8 @@ import ( "github.com/zclconf/go-cty/cty/msgpack" ) +const DriverHealthy = "Healthy" + // DriverPlugin is the interface with drivers will implement. It is also // implemented by a plugin client which proxies the calls to go-plugin. See // the proto/driver.proto file for detailed information about each RPC and diff --git a/plugins/drivers/errors.go b/plugins/drivers/errors.go index d9cf08398..a89ffc6a9 100644 --- a/plugins/drivers/errors.go +++ b/plugins/drivers/errors.go @@ -5,3 +5,7 @@ import "fmt" var ErrTaskNotFound = fmt.Errorf("task not found for given id") var DriverRequiresRootMessage = "Driver must run as root" + +var NoCgroupMountMessage = "Failed to discover cgroup mount point" + +var CgroupMountEmpty = "Cgroup mount point unavailable" diff --git a/plugins/drivers/plugin.go b/plugins/drivers/plugin.go index 524123cba..b3fcfcfc4 100644 --- a/plugins/drivers/plugin.go +++ b/plugins/drivers/plugin.go @@ -19,10 +19,9 @@ type PluginDriver struct { logger hclog.Logger } -func NewDriverPlugin(d DriverPlugin, logger hclog.Logger) plugin.GRPCPlugin { +func NewDriverPlugin(d DriverPlugin) plugin.GRPCPlugin { return &PluginDriver{ - impl: d, - logger: logger.Named("driver_plugin"), + impl: d, } } @@ -42,7 +41,6 @@ func (p *PluginDriver) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker Client: baseproto.NewBasePluginClient(c), }, client: proto.NewDriverClient(c), - logger: p.logger, doneCtx: ctx, }, nil } diff --git a/plugins/drivers/proto/driver.pb.go b/plugins/drivers/proto/driver.pb.go index 006a18dec..6c9e3e0f9 100644 --- a/plugins/drivers/proto/driver.pb.go +++ b/plugins/drivers/proto/driver.pb.go @@ -50,7 +50,7 @@ func (x TaskState) String() string { return proto.EnumName(TaskState_name, int32(x)) } func (TaskState) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{0} } type FingerprintResponse_HealthState int32 @@ -76,7 +76,7 @@ func (x FingerprintResponse_HealthState) String() string { return proto.EnumName(FingerprintResponse_HealthState_name, int32(x)) } func (FingerprintResponse_HealthState) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{5, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{5, 0} } type StartTaskResponse_Result int32 @@ -102,7 +102,7 @@ func (x 
StartTaskResponse_Result) String() string { return proto.EnumName(StartTaskResponse_Result_name, int32(x)) } func (StartTaskResponse_Result) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{9, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{9, 0} } type DriverCapabilities_FSIsolation int32 @@ -128,7 +128,7 @@ func (x DriverCapabilities_FSIsolation) String() string { return proto.EnumName(DriverCapabilities_FSIsolation_name, int32(x)) } func (DriverCapabilities_FSIsolation) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{25, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{25, 0} } type CPUUsage_Fields int32 @@ -163,7 +163,7 @@ func (x CPUUsage_Fields) String() string { return proto.EnumName(CPUUsage_Fields_name, int32(x)) } func (CPUUsage_Fields) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{43, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{43, 0} } type MemoryUsage_Fields int32 @@ -195,7 +195,7 @@ func (x MemoryUsage_Fields) String() string { return proto.EnumName(MemoryUsage_Fields_name, int32(x)) } func (MemoryUsage_Fields) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{44, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{44, 0} } type TaskConfigSchemaRequest struct { @@ -208,7 +208,7 @@ func (m *TaskConfigSchemaRequest) Reset() { *m = TaskConfigSchemaRequest func (m *TaskConfigSchemaRequest) String() string { return proto.CompactTextString(m) } func (*TaskConfigSchemaRequest) ProtoMessage() {} func (*TaskConfigSchemaRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{0} } func (m *TaskConfigSchemaRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskConfigSchemaRequest.Unmarshal(m, b) @@ -240,7 +240,7 @@ func (m *TaskConfigSchemaResponse) Reset() { *m = TaskConfigSchemaRespon func (m *TaskConfigSchemaResponse) String() string { return proto.CompactTextString(m) } func (*TaskConfigSchemaResponse) ProtoMessage() {} func (*TaskConfigSchemaResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{1} + return fileDescriptor_driver_66cfa35dd20ec741, []int{1} } func (m *TaskConfigSchemaResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskConfigSchemaResponse.Unmarshal(m, b) @@ -277,7 +277,7 @@ func (m *CapabilitiesRequest) Reset() { *m = CapabilitiesRequest{} } func (m *CapabilitiesRequest) String() string { return proto.CompactTextString(m) } func (*CapabilitiesRequest) ProtoMessage() {} func (*CapabilitiesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{2} + return fileDescriptor_driver_66cfa35dd20ec741, []int{2} } func (m *CapabilitiesRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CapabilitiesRequest.Unmarshal(m, b) @@ -312,7 +312,7 @@ func (m *CapabilitiesResponse) Reset() { *m = CapabilitiesResponse{} } func (m *CapabilitiesResponse) String() string { return proto.CompactTextString(m) } func (*CapabilitiesResponse) ProtoMessage() {} func (*CapabilitiesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{3} + return fileDescriptor_driver_66cfa35dd20ec741, []int{3} } func (m *CapabilitiesResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CapabilitiesResponse.Unmarshal(m, b) @@ -349,7 +349,7 @@ func 
(m *FingerprintRequest) Reset() { *m = FingerprintRequest{} } func (m *FingerprintRequest) String() string { return proto.CompactTextString(m) } func (*FingerprintRequest) ProtoMessage() {} func (*FingerprintRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{4} + return fileDescriptor_driver_66cfa35dd20ec741, []int{4} } func (m *FingerprintRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FingerprintRequest.Unmarshal(m, b) @@ -392,7 +392,7 @@ func (m *FingerprintResponse) Reset() { *m = FingerprintResponse{} } func (m *FingerprintResponse) String() string { return proto.CompactTextString(m) } func (*FingerprintResponse) ProtoMessage() {} func (*FingerprintResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{5} + return fileDescriptor_driver_66cfa35dd20ec741, []int{5} } func (m *FingerprintResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FingerprintResponse.Unmarshal(m, b) @@ -447,7 +447,7 @@ func (m *RecoverTaskRequest) Reset() { *m = RecoverTaskRequest{} } func (m *RecoverTaskRequest) String() string { return proto.CompactTextString(m) } func (*RecoverTaskRequest) ProtoMessage() {} func (*RecoverTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{6} + return fileDescriptor_driver_66cfa35dd20ec741, []int{6} } func (m *RecoverTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RecoverTaskRequest.Unmarshal(m, b) @@ -491,7 +491,7 @@ func (m *RecoverTaskResponse) Reset() { *m = RecoverTaskResponse{} } func (m *RecoverTaskResponse) String() string { return proto.CompactTextString(m) } func (*RecoverTaskResponse) ProtoMessage() {} func (*RecoverTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{7} + return fileDescriptor_driver_66cfa35dd20ec741, []int{7} } func (m *RecoverTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RecoverTaskResponse.Unmarshal(m, b) @@ -523,7 +523,7 @@ func (m *StartTaskRequest) Reset() { *m = StartTaskRequest{} } func (m *StartTaskRequest) String() string { return proto.CompactTextString(m) } func (*StartTaskRequest) ProtoMessage() {} func (*StartTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{8} + return fileDescriptor_driver_66cfa35dd20ec741, []int{8} } func (m *StartTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StartTaskRequest.Unmarshal(m, b) @@ -577,7 +577,7 @@ func (m *StartTaskResponse) Reset() { *m = StartTaskResponse{} } func (m *StartTaskResponse) String() string { return proto.CompactTextString(m) } func (*StartTaskResponse) ProtoMessage() {} func (*StartTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{9} + return fileDescriptor_driver_66cfa35dd20ec741, []int{9} } func (m *StartTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StartTaskResponse.Unmarshal(m, b) @@ -637,7 +637,7 @@ func (m *WaitTaskRequest) Reset() { *m = WaitTaskRequest{} } func (m *WaitTaskRequest) String() string { return proto.CompactTextString(m) } func (*WaitTaskRequest) ProtoMessage() {} func (*WaitTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{10} + return fileDescriptor_driver_66cfa35dd20ec741, []int{10} } func (m *WaitTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_WaitTaskRequest.Unmarshal(m, b) @@ -678,7 +678,7 @@ func (m 
*WaitTaskResponse) Reset() { *m = WaitTaskResponse{} } func (m *WaitTaskResponse) String() string { return proto.CompactTextString(m) } func (*WaitTaskResponse) ProtoMessage() {} func (*WaitTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{11} + return fileDescriptor_driver_66cfa35dd20ec741, []int{11} } func (m *WaitTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_WaitTaskResponse.Unmarshal(m, b) @@ -730,7 +730,7 @@ func (m *StopTaskRequest) Reset() { *m = StopTaskRequest{} } func (m *StopTaskRequest) String() string { return proto.CompactTextString(m) } func (*StopTaskRequest) ProtoMessage() {} func (*StopTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{12} + return fileDescriptor_driver_66cfa35dd20ec741, []int{12} } func (m *StopTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StopTaskRequest.Unmarshal(m, b) @@ -781,7 +781,7 @@ func (m *StopTaskResponse) Reset() { *m = StopTaskResponse{} } func (m *StopTaskResponse) String() string { return proto.CompactTextString(m) } func (*StopTaskResponse) ProtoMessage() {} func (*StopTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{13} + return fileDescriptor_driver_66cfa35dd20ec741, []int{13} } func (m *StopTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StopTaskResponse.Unmarshal(m, b) @@ -815,7 +815,7 @@ func (m *DestroyTaskRequest) Reset() { *m = DestroyTaskRequest{} } func (m *DestroyTaskRequest) String() string { return proto.CompactTextString(m) } func (*DestroyTaskRequest) ProtoMessage() {} func (*DestroyTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{14} + return fileDescriptor_driver_66cfa35dd20ec741, []int{14} } func (m *DestroyTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroyTaskRequest.Unmarshal(m, b) @@ -859,7 +859,7 @@ func (m *DestroyTaskResponse) Reset() { *m = DestroyTaskResponse{} } func (m *DestroyTaskResponse) String() string { return proto.CompactTextString(m) } func (*DestroyTaskResponse) ProtoMessage() {} func (*DestroyTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{15} + return fileDescriptor_driver_66cfa35dd20ec741, []int{15} } func (m *DestroyTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroyTaskResponse.Unmarshal(m, b) @@ -891,7 +891,7 @@ func (m *InspectTaskRequest) Reset() { *m = InspectTaskRequest{} } func (m *InspectTaskRequest) String() string { return proto.CompactTextString(m) } func (*InspectTaskRequest) ProtoMessage() {} func (*InspectTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{16} + return fileDescriptor_driver_66cfa35dd20ec741, []int{16} } func (m *InspectTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_InspectTaskRequest.Unmarshal(m, b) @@ -934,7 +934,7 @@ func (m *InspectTaskResponse) Reset() { *m = InspectTaskResponse{} } func (m *InspectTaskResponse) String() string { return proto.CompactTextString(m) } func (*InspectTaskResponse) ProtoMessage() {} func (*InspectTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{17} + return fileDescriptor_driver_66cfa35dd20ec741, []int{17} } func (m *InspectTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_InspectTaskResponse.Unmarshal(m, b) @@ -987,7 +987,7 @@ func (m 
*TaskStatsRequest) Reset() { *m = TaskStatsRequest{} } func (m *TaskStatsRequest) String() string { return proto.CompactTextString(m) } func (*TaskStatsRequest) ProtoMessage() {} func (*TaskStatsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{18} + return fileDescriptor_driver_66cfa35dd20ec741, []int{18} } func (m *TaskStatsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskStatsRequest.Unmarshal(m, b) @@ -1026,7 +1026,7 @@ func (m *TaskStatsResponse) Reset() { *m = TaskStatsResponse{} } func (m *TaskStatsResponse) String() string { return proto.CompactTextString(m) } func (*TaskStatsResponse) ProtoMessage() {} func (*TaskStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{19} + return fileDescriptor_driver_66cfa35dd20ec741, []int{19} } func (m *TaskStatsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskStatsResponse.Unmarshal(m, b) @@ -1063,7 +1063,7 @@ func (m *TaskEventsRequest) Reset() { *m = TaskEventsRequest{} } func (m *TaskEventsRequest) String() string { return proto.CompactTextString(m) } func (*TaskEventsRequest) ProtoMessage() {} func (*TaskEventsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{20} + return fileDescriptor_driver_66cfa35dd20ec741, []int{20} } func (m *TaskEventsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskEventsRequest.Unmarshal(m, b) @@ -1097,7 +1097,7 @@ func (m *SignalTaskRequest) Reset() { *m = SignalTaskRequest{} } func (m *SignalTaskRequest) String() string { return proto.CompactTextString(m) } func (*SignalTaskRequest) ProtoMessage() {} func (*SignalTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{21} + return fileDescriptor_driver_66cfa35dd20ec741, []int{21} } func (m *SignalTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SignalTaskRequest.Unmarshal(m, b) @@ -1141,7 +1141,7 @@ func (m *SignalTaskResponse) Reset() { *m = SignalTaskResponse{} } func (m *SignalTaskResponse) String() string { return proto.CompactTextString(m) } func (*SignalTaskResponse) ProtoMessage() {} func (*SignalTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{22} + return fileDescriptor_driver_66cfa35dd20ec741, []int{22} } func (m *SignalTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SignalTaskResponse.Unmarshal(m, b) @@ -1178,7 +1178,7 @@ func (m *ExecTaskRequest) Reset() { *m = ExecTaskRequest{} } func (m *ExecTaskRequest) String() string { return proto.CompactTextString(m) } func (*ExecTaskRequest) ProtoMessage() {} func (*ExecTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{23} + return fileDescriptor_driver_66cfa35dd20ec741, []int{23} } func (m *ExecTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecTaskRequest.Unmarshal(m, b) @@ -1235,7 +1235,7 @@ func (m *ExecTaskResponse) Reset() { *m = ExecTaskResponse{} } func (m *ExecTaskResponse) String() string { return proto.CompactTextString(m) } func (*ExecTaskResponse) ProtoMessage() {} func (*ExecTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{24} + return fileDescriptor_driver_66cfa35dd20ec741, []int{24} } func (m *ExecTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecTaskResponse.Unmarshal(m, b) @@ -1294,7 +1294,7 @@ func (m *DriverCapabilities) 
Reset() { *m = DriverCapabilities{} } func (m *DriverCapabilities) String() string { return proto.CompactTextString(m) } func (*DriverCapabilities) ProtoMessage() {} func (*DriverCapabilities) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{25} + return fileDescriptor_driver_66cfa35dd20ec741, []int{25} } func (m *DriverCapabilities) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DriverCapabilities.Unmarshal(m, b) @@ -1380,7 +1380,7 @@ func (m *TaskConfig) Reset() { *m = TaskConfig{} } func (m *TaskConfig) String() string { return proto.CompactTextString(m) } func (*TaskConfig) ProtoMessage() {} func (*TaskConfig) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{26} + return fileDescriptor_driver_66cfa35dd20ec741, []int{26} } func (m *TaskConfig) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskConfig.Unmarshal(m, b) @@ -1519,7 +1519,7 @@ func (m *Resources) Reset() { *m = Resources{} } func (m *Resources) String() string { return proto.CompactTextString(m) } func (*Resources) ProtoMessage() {} func (*Resources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{27} + return fileDescriptor_driver_66cfa35dd20ec741, []int{27} } func (m *Resources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Resources.Unmarshal(m, b) @@ -1566,7 +1566,7 @@ func (m *AllocatedTaskResources) Reset() { *m = AllocatedTaskResources{} func (m *AllocatedTaskResources) String() string { return proto.CompactTextString(m) } func (*AllocatedTaskResources) ProtoMessage() {} func (*AllocatedTaskResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{28} + return fileDescriptor_driver_66cfa35dd20ec741, []int{28} } func (m *AllocatedTaskResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_AllocatedTaskResources.Unmarshal(m, b) @@ -1618,7 +1618,7 @@ func (m *AllocatedCpuResources) Reset() { *m = AllocatedCpuResources{} } func (m *AllocatedCpuResources) String() string { return proto.CompactTextString(m) } func (*AllocatedCpuResources) ProtoMessage() {} func (*AllocatedCpuResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{29} + return fileDescriptor_driver_66cfa35dd20ec741, []int{29} } func (m *AllocatedCpuResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_AllocatedCpuResources.Unmarshal(m, b) @@ -1656,7 +1656,7 @@ func (m *AllocatedMemoryResources) Reset() { *m = AllocatedMemoryResourc func (m *AllocatedMemoryResources) String() string { return proto.CompactTextString(m) } func (*AllocatedMemoryResources) ProtoMessage() {} func (*AllocatedMemoryResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{30} + return fileDescriptor_driver_66cfa35dd20ec741, []int{30} } func (m *AllocatedMemoryResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_AllocatedMemoryResources.Unmarshal(m, b) @@ -1699,7 +1699,7 @@ func (m *NetworkResource) Reset() { *m = NetworkResource{} } func (m *NetworkResource) String() string { return proto.CompactTextString(m) } func (*NetworkResource) ProtoMessage() {} func (*NetworkResource) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{31} + return fileDescriptor_driver_66cfa35dd20ec741, []int{31} } func (m *NetworkResource) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_NetworkResource.Unmarshal(m, b) @@ -1773,7 +1773,7 @@ func (m *NetworkPort) Reset() { *m = 
NetworkPort{} } func (m *NetworkPort) String() string { return proto.CompactTextString(m) } func (*NetworkPort) ProtoMessage() {} func (*NetworkPort) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{32} + return fileDescriptor_driver_66cfa35dd20ec741, []int{32} } func (m *NetworkPort) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_NetworkPort.Unmarshal(m, b) @@ -1833,7 +1833,7 @@ func (m *LinuxResources) Reset() { *m = LinuxResources{} } func (m *LinuxResources) String() string { return proto.CompactTextString(m) } func (*LinuxResources) ProtoMessage() {} func (*LinuxResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{33} + return fileDescriptor_driver_66cfa35dd20ec741, []int{33} } func (m *LinuxResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_LinuxResources.Unmarshal(m, b) @@ -1925,7 +1925,7 @@ func (m *Mount) Reset() { *m = Mount{} } func (m *Mount) String() string { return proto.CompactTextString(m) } func (*Mount) ProtoMessage() {} func (*Mount) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{34} + return fileDescriptor_driver_66cfa35dd20ec741, []int{34} } func (m *Mount) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Mount.Unmarshal(m, b) @@ -1988,7 +1988,7 @@ func (m *Device) Reset() { *m = Device{} } func (m *Device) String() string { return proto.CompactTextString(m) } func (*Device) ProtoMessage() {} func (*Device) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{35} + return fileDescriptor_driver_66cfa35dd20ec741, []int{35} } func (m *Device) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Device.Unmarshal(m, b) @@ -2046,7 +2046,7 @@ func (m *TaskHandle) Reset() { *m = TaskHandle{} } func (m *TaskHandle) String() string { return proto.CompactTextString(m) } func (*TaskHandle) ProtoMessage() {} func (*TaskHandle) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{36} + return fileDescriptor_driver_66cfa35dd20ec741, []int{36} } func (m *TaskHandle) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskHandle.Unmarshal(m, b) @@ -2106,7 +2106,7 @@ func (m *NetworkOverride) Reset() { *m = NetworkOverride{} } func (m *NetworkOverride) String() string { return proto.CompactTextString(m) } func (*NetworkOverride) ProtoMessage() {} func (*NetworkOverride) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{37} + return fileDescriptor_driver_66cfa35dd20ec741, []int{37} } func (m *NetworkOverride) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_NetworkOverride.Unmarshal(m, b) @@ -2164,7 +2164,7 @@ func (m *ExitResult) Reset() { *m = ExitResult{} } func (m *ExitResult) String() string { return proto.CompactTextString(m) } func (*ExitResult) ProtoMessage() {} func (*ExitResult) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{38} + return fileDescriptor_driver_66cfa35dd20ec741, []int{38} } func (m *ExitResult) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExitResult.Unmarshal(m, b) @@ -2227,7 +2227,7 @@ func (m *TaskStatus) Reset() { *m = TaskStatus{} } func (m *TaskStatus) String() string { return proto.CompactTextString(m) } func (*TaskStatus) ProtoMessage() {} func (*TaskStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{39} + return fileDescriptor_driver_66cfa35dd20ec741, []int{39} } func (m *TaskStatus) XXX_Unmarshal(b []byte) 
error { return xxx_messageInfo_TaskStatus.Unmarshal(m, b) @@ -2302,7 +2302,7 @@ func (m *TaskDriverStatus) Reset() { *m = TaskDriverStatus{} } func (m *TaskDriverStatus) String() string { return proto.CompactTextString(m) } func (*TaskDriverStatus) ProtoMessage() {} func (*TaskDriverStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{40} + return fileDescriptor_driver_66cfa35dd20ec741, []int{40} } func (m *TaskDriverStatus) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskDriverStatus.Unmarshal(m, b) @@ -2347,7 +2347,7 @@ func (m *TaskStats) Reset() { *m = TaskStats{} } func (m *TaskStats) String() string { return proto.CompactTextString(m) } func (*TaskStats) ProtoMessage() {} func (*TaskStats) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{41} + return fileDescriptor_driver_66cfa35dd20ec741, []int{41} } func (m *TaskStats) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskStats.Unmarshal(m, b) @@ -2409,7 +2409,7 @@ func (m *TaskResourceUsage) Reset() { *m = TaskResourceUsage{} } func (m *TaskResourceUsage) String() string { return proto.CompactTextString(m) } func (*TaskResourceUsage) ProtoMessage() {} func (*TaskResourceUsage) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{42} + return fileDescriptor_driver_66cfa35dd20ec741, []int{42} } func (m *TaskResourceUsage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskResourceUsage.Unmarshal(m, b) @@ -2461,7 +2461,7 @@ func (m *CPUUsage) Reset() { *m = CPUUsage{} } func (m *CPUUsage) String() string { return proto.CompactTextString(m) } func (*CPUUsage) ProtoMessage() {} func (*CPUUsage) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{43} + return fileDescriptor_driver_66cfa35dd20ec741, []int{43} } func (m *CPUUsage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CPUUsage.Unmarshal(m, b) @@ -2547,7 +2547,7 @@ func (m *MemoryUsage) Reset() { *m = MemoryUsage{} } func (m *MemoryUsage) String() string { return proto.CompactTextString(m) } func (*MemoryUsage) ProtoMessage() {} func (*MemoryUsage) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{44} + return fileDescriptor_driver_66cfa35dd20ec741, []int{44} } func (m *MemoryUsage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MemoryUsage.Unmarshal(m, b) @@ -2631,7 +2631,7 @@ func (m *DriverTaskEvent) Reset() { *m = DriverTaskEvent{} } func (m *DriverTaskEvent) String() string { return proto.CompactTextString(m) } func (*DriverTaskEvent) ProtoMessage() {} func (*DriverTaskEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{45} + return fileDescriptor_driver_66cfa35dd20ec741, []int{45} } func (m *DriverTaskEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DriverTaskEvent.Unmarshal(m, b) @@ -3339,10 +3339,10 @@ var _Driver_serviceDesc = grpc.ServiceDesc{ } func init() { - proto.RegisterFile("plugins/drivers/proto/driver.proto", fileDescriptor_driver_de29bfae7a3376ed) + proto.RegisterFile("plugins/drivers/proto/driver.proto", fileDescriptor_driver_66cfa35dd20ec741) } -var fileDescriptor_driver_de29bfae7a3376ed = []byte{ +var fileDescriptor_driver_66cfa35dd20ec741 = []byte{ // 2940 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x59, 0xcb, 0x6f, 0x23, 0xc7, 0xd1, 0x17, 0x9f, 0x22, 0x8b, 0x12, 0x35, 0xdb, 0xbb, 0x6b, 0xd3, 0x34, 0xbe, 0xcf, 0xeb, 0x01, diff 
--git a/plugins/drivers/testutils/testing.go b/plugins/drivers/testutils/testing.go index 2394b4f1b..f4c54702f 100644 --- a/plugins/drivers/testutils/testing.go +++ b/plugins/drivers/testutils/testing.go @@ -43,7 +43,7 @@ func (d *DriverHarness) Impl() drivers.DriverPlugin { func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness { logger := testlog.HCLogger(t).Named("driver_harness") - pd := drivers.NewDriverPlugin(d, logger).(*drivers.PluginDriver) + pd := drivers.NewDriverPlugin(d).(*drivers.PluginDriver) client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ diff --git a/plugins/shared/cmd/launcher/command/device.go b/plugins/shared/cmd/launcher/command/device.go index 01855da7b..fc8b6b7af 100644 --- a/plugins/shared/cmd/launcher/command/device.go +++ b/plugins/shared/cmd/launcher/command/device.go @@ -18,8 +18,8 @@ import ( "github.com/hashicorp/hcl2/hcldec" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/device" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/kr/pretty" "github.com/mitchellh/cli" "github.com/zclconf/go-cty/cty/msgpack" @@ -198,10 +198,10 @@ func (c *Device) setConfig(spec hcldec.Spec, apiVersion string, config []byte, n c.logger.Trace("raw hcl config", "config", hclog.Fmt("% #v", pretty.Formatter(configVal))) ctx := &hcl2.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } - val, diag := shared.ParseHclInterface(configVal, spec, ctx) + val, diag := hclutils.ParseHclInterface(configVal, spec, ctx) if diag.HasErrors() { errStr := "failed to parse config" for _, err := range diag.Errs() { diff --git a/plugins/shared/grpc_utils.go b/plugins/shared/grpc_utils.go deleted file mode 100644 index 34fb33a87..000000000 --- a/plugins/shared/grpc_utils.go +++ /dev/null @@ -1,61 +0,0 @@ -package shared - -import ( - "context" - "time" - - "github.com/hashicorp/nomad/plugins/base" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// HandleStreamErr is used to handle a non io.EOF error in a stream. It handles -// detecting if the plugin has shutdown via the passeed pluginCtx. The -// parameters are: -// - err: the error returned from the streaming RPC -// - reqCtx: the context passed to the streaming request -// - pluginCtx: the plugins done ctx used to detect the plugin dying -// -// The return values are: -// - base.ErrPluginShutdown if the error is because the plugin shutdown -// - context.Canceled if the reqCtx is canceled -// - The original error -func HandleStreamErr(err error, reqCtx, pluginCtx context.Context) error { - if err == nil { - return nil - } - - // Determine if the error is because the plugin shutdown - if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.Unavailable { - // Potentially wait a little before returning an error so we can detect - // the exit - select { - case <-pluginCtx.Done(): - err = base.ErrPluginShutdown - case <-reqCtx.Done(): - err = reqCtx.Err() - - // There is no guarantee that the select will choose the - // doneCtx first so we have to double check - select { - case <-pluginCtx.Done(): - err = base.ErrPluginShutdown - default: - } - case <-time.After(3 * time.Second): - // Its okay to wait a while since the connection isn't available and - // on local host it is likely shutting down. It is not expected for - // this to ever reach even close to 3 seconds. 
-		}
-
-		// It is an error we don't know how to handle, so return it
-		return err
-	}
-
-	// Context was cancelled
-	if errStatus := status.FromContextError(reqCtx.Err()); errStatus.Code() == codes.Canceled {
-		return context.Canceled
-	}
-
-	return err
-}
diff --git a/plugins/shared/grpcutils/utils.go b/plugins/shared/grpcutils/utils.go
new file mode 100644
index 000000000..001cf4ad3
--- /dev/null
+++ b/plugins/shared/grpcutils/utils.go
@@ -0,0 +1,105 @@
+package grpcutils
+
+import (
+	"context"
+	"time"
+
+	"github.com/hashicorp/nomad/plugins/base/structs"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// HandleReqCtxGrpcErr is used to handle a non io.EOF error in a gRPC request
+// where a user-supplied context is used. It handles detecting if the plugin
+// has shut down via the passed pluginCtx. The parameters are:
+// - err: the error returned from the streaming RPC
+// - reqCtx: the user context passed to the request
+// - pluginCtx: the plugin's done ctx used to detect the plugin dying
+//
+// The return values are:
+// - ErrPluginShutdown if the error is because the plugin shut down
+// - context.Canceled if the reqCtx is canceled
+// - The original error
+func HandleReqCtxGrpcErr(err error, reqCtx, pluginCtx context.Context) error {
+	if err == nil {
+		return nil
+	}
+
+	// Determine if the error is because the plugin shut down
+	if errStatus, ok := status.FromError(err); ok &&
+		(errStatus.Code() == codes.Unavailable || errStatus.Code() == codes.Canceled) {
+		// Potentially wait a little before returning an error so we can detect
+		// the exit
+		select {
+		case <-pluginCtx.Done():
+			err = structs.ErrPluginShutdown
+		case <-reqCtx.Done():
+			err = reqCtx.Err()
+
+			// There is no guarantee that the select will choose the
+			// doneCtx first so we have to double check
+			select {
+			case <-pluginCtx.Done():
+				err = structs.ErrPluginShutdown
+			default:
+			}
+		case <-time.After(3 * time.Second):
+			// It's okay to wait a while since the connection isn't available and
+			// on local host it is likely shutting down. It is not expected for
+			// this to ever reach even close to 3 seconds.
+		}
+
+		// It is an error we don't know how to handle, so return it
+		return err
+	}
+
+	// Context was cancelled
+	if errStatus := status.FromContextError(reqCtx.Err()); errStatus.Code() == codes.Canceled {
+		return context.Canceled
+	}
+
+	return err
+}
+
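The nested re-check of pluginCtx in the reqCtx branch above guards a real race: when both contexts are already done, Go's select chooses among ready cases pseudo-randomly, so winning the reqCtx case does not prove the plugin was still alive. A standalone sketch of that behavior (a hypothetical demo, runnable as-is):

package main

import (
	"context"
	"fmt"
)

func main() {
	pluginCtx, cancelPlugin := context.WithCancel(context.Background())
	reqCtx, cancelReq := context.WithCancel(context.Background())
	cancelPlugin()
	cancelReq()

	// Both Done channels are closed, so either case can win on any iteration;
	// a classifier that wants "plugin death beats caller cancellation" must
	// therefore re-check pluginCtx after the first select resolves.
	hits := map[string]int{}
	for i := 0; i < 1000; i++ {
		select {
		case <-pluginCtx.Done():
			hits["plugin"]++
		case <-reqCtx.Done():
			hits["request"]++
		}
	}
	fmt.Println(hits) // both counters are typically non-zero
}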
The +// parameters are: +// - err: the error returned from the streaming RPC +// - pluginCtx: the plugins done ctx used to detect the plugin dying +// +// The return values are: +// - ErrPluginShutdown if the error is because the plugin shutdown +// - The original error +func HandleGrpcErr(err error, pluginCtx context.Context) error { + if err == nil { + return nil + } + + if errStatus := status.FromContextError(pluginCtx.Err()); errStatus.Code() == codes.Canceled { + // See if the plugin shutdown + select { + case <-pluginCtx.Done(): + err = structs.ErrPluginShutdown + default: + } + } + + // Determine if the error is because the plugin shutdown + if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.Unavailable { + // Potentially wait a little before returning an error so we can detect + // the exit + select { + case <-pluginCtx.Done(): + err = structs.ErrPluginShutdown + case <-time.After(3 * time.Second): + // Its okay to wait a while since the connection isn't available and + // on local host it is likely shutting down. It is not expected for + // this to ever reach even close to 3 seconds. + } + + // It is an error we don't know how to handle, so return it + return err + } + + return err +} diff --git a/plugins/shared/util.go b/plugins/shared/hclutils/util.go similarity index 99% rename from plugins/shared/util.go rename to plugins/shared/hclutils/util.go index 9152915b4..86a8d2e6c 100644 --- a/plugins/shared/util.go +++ b/plugins/shared/hclutils/util.go @@ -1,4 +1,4 @@ -package shared +package hclutils import ( "bytes" diff --git a/plugins/shared/util_test.go b/plugins/shared/hclutils/util_test.go similarity index 99% rename from plugins/shared/util_test.go rename to plugins/shared/hclutils/util_test.go index 3dc2488ce..bfbb7c0a6 100644 --- a/plugins/shared/util_test.go +++ b/plugins/shared/hclutils/util_test.go @@ -1,4 +1,4 @@ -package shared +package hclutils import ( "testing" diff --git a/plugins/shared/loader/init.go b/plugins/shared/loader/init.go index 89f09198c..7af5c8e53 100644 --- a/plugins/shared/loader/init.go +++ b/plugins/shared/loader/init.go @@ -13,8 +13,8 @@ import ( hcl2 "github.com/hashicorp/hcl2/hcl" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/zclconf/go-cty/cty/msgpack" ) @@ -22,7 +22,7 @@ var ( // configParseCtx is the context used to parse a plugin's configuration // stanza configParseCtx = &hcl2.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } ) @@ -467,7 +467,7 @@ func (l *PluginLoader) validePluginConfig(id PluginID, info *pluginInfo) error { } // Parse the config using the spec - val, diag := shared.ParseHclInterface(info.config, spec, configParseCtx) + val, diag := hclutils.ParseHclInterface(info.config, spec, configParseCtx) if diag.HasErrors() { multierror.Append(&mErr, diag.Errs()...) return multierror.Prefix(&mErr, "failed parsing config:") diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index ba3aba4bd..0e99efd6b 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -86,14 +86,19 @@ func TestServiceSched_JobRegister(t *testing.T) { } // Ensure different ports were used. 
-	used := make(map[int]struct{})
+	used := make(map[int]map[string]struct{})
 	for _, alloc := range out {
 		for _, resource := range alloc.TaskResources {
 			for _, port := range resource.Networks[0].DynamicPorts {
-				if _, ok := used[port.Value]; ok {
-					t.Fatalf("Port collision %v", port.Value)
+				nodeMap, ok := used[port.Value]
+				if !ok {
+					nodeMap = make(map[string]struct{})
+					used[port.Value] = nodeMap
 				}
-				used[port.Value] = struct{}{}
+				if _, ok := nodeMap[alloc.NodeID]; ok {
+					t.Fatalf("Port collision on node %q %v", alloc.NodeID, port.Value)
+				}
+				nodeMap[alloc.NodeID] = struct{}{}
 			}
 		}
 	}
diff --git a/terraform/aws/env/us-east/main.tf b/terraform/aws/env/us-east/main.tf
index c33cc0060..0510dd3a5 100644
--- a/terraform/aws/env/us-east/main.tf
+++ b/terraform/aws/env/us-east/main.tf
@@ -29,7 +29,13 @@ variable "client_count" {
 
 variable "retry_join" {
   description = "Used by Consul to automatically form a cluster."
-  default     = "provider=aws tag_key=ConsulAutoJoin tag_value=auto-join"
+  type        = "map"
+
+  default = {
+    provider  = "aws"
+    tag_key   = "ConsulAutoJoin"
+    tag_value = "auto-join"
+  }
 }
 
 variable "nomad_binary" {
diff --git a/terraform/aws/modules/hashistack/hashistack.tf b/terraform/aws/modules/hashistack/hashistack.tf
index a5d590881..66426723b 100644
--- a/terraform/aws/modules/hashistack/hashistack.tf
+++ b/terraform/aws/modules/hashistack/hashistack.tf
@@ -5,9 +5,18 @@ variable "instance_type" {}
 variable "key_name" {}
 variable "server_count" {}
 variable "client_count" {}
-variable "retry_join" {}
 variable "nomad_binary" {}
 
+variable "retry_join" {
+  type = "map"
+
+  default = {
+    provider  = "aws"
+    tag_key   = "ConsulAutoJoin"
+    tag_value = "auto-join"
+  }
+}
+
 data "aws_vpc" "default" {
   default = true
 }
@@ -92,7 +101,7 @@ data "template_file" "user_data_server" {
   vars {
     server_count = "${var.server_count}"
     region       = "${var.region}"
-    retry_join   = "${var.retry_join}"
+    retry_join   = "${chomp(join(" ", formatlist("%s=%s", keys(var.retry_join), values(var.retry_join))))}"
     nomad_binary = "${var.nomad_binary}"
   }
 }
@@ -101,8 +110,8 @@ data "template_file" "user_data_client" {
   template = "${file("${path.root}/user-data-client.sh")}"
 
   vars {
-    region     = "${var.region}"
-    retry_join = "${var.retry_join}"
+    region       = "${var.region}"
+    retry_join   = "${chomp(join(" ", formatlist("%s=%s", keys(var.retry_join), values(var.retry_join))))}"
     nomad_binary = "${var.nomad_binary}"
   }
 }
@@ -114,11 +123,11 @@ resource "aws_instance" "server" {
   vpc_security_group_ids = ["${aws_security_group.primary.id}"]
   count                  = "${var.server_count}"
 
-  #Instance tags
-  tags {
-    Name           = "${var.name}-server-${count.index}"
-    ConsulAutoJoin = "auto-join"
-  }
+  # instance tags
+  tags = "${merge(
+    map("Name", "${var.name}-server-${count.index}"),
+    map(lookup(var.retry_join, "tag_key"), lookup(var.retry_join, "tag_value"))
+  )}"
 
   user_data            = "${data.template_file.user_data_server.rendered}"
   iam_instance_profile = "${aws_iam_instance_profile.instance_profile.name}"
@@ -132,17 +141,17 @@ resource "aws_instance" "client" {
   count                  = "${var.client_count}"
   depends_on             = ["aws_instance.server"]
 
-  #Instance tags
-  tags {
-    Name           = "${var.name}-client-${count.index}"
-    ConsulAutoJoin = "auto-join"
-  }
+  # instance tags
+  tags = "${merge(
+    map("Name", "${var.name}-client-${count.index}"),
+    map(lookup(var.retry_join, "tag_key"), lookup(var.retry_join, "tag_value"))
+  )}"
 
-  ebs_block_device = {
-    device_name = "/dev/xvdd"
-    volume_type = "gp2"
-    volume_size = "50"
-    delete_on_termination = "true"
+  ebs_block_device = {
+    device_name           = "/dev/xvdd"
+    volume_type           = "gp2"
+    volume_size           = "50"
+    delete_on_termination = "true"
   }
 
   user_data            = "${data.template_file.user_data_client.rendered}"
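The map form of retry_join changes only how the value is assembled; Consul still receives the flat provider=aws tag_key=ConsulAutoJoin tag_value=auto-join cloud auto-join string, which the join/formatlist interpolation above rebuilds from the map. A Go sketch of the equivalent flattening (Terraform 0.11's keys() yields keys in lexical order, mirrored here by sort.Strings):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// flattenRetryJoin reproduces join(" ", formatlist("%s=%s", keys(m), values(m))):
// sort the keys, render each pair as "k=v", and join the pairs with spaces.
func flattenRetryJoin(m map[string]string) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf("%s=%s", k, m[k]))
	}
	return strings.Join(pairs, " ")
}

func main() {
	fmt.Println(flattenRetryJoin(map[string]string{
		"provider":  "aws",
		"tag_key":   "ConsulAutoJoin",
		"tag_value": "auto-join",
	}))
	// Output: provider=aws tag_key=ConsulAutoJoin tag_value=auto-join
}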