diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 2179dcf2d..d4941b43a 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -157,7 +157,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { StateDB: ar.stateDB, StateUpdater: ar, Consul: ar.consulClient, - VaultClient: ar.vaultClient, + Vault: ar.vaultClient, PluginSingletonLoader: ar.pluginSingletonLoader, } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 48b248f4a..1eb9e7873 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -11,7 +11,6 @@ import ( consulapi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" - "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/shared/catalog" @@ -57,13 +56,12 @@ func (m *MockStateUpdater) Reset() { // testAllocRunnerConfig returns a new allocrunner.Config with mocks and noop // versions of dependencies along with a cleanup func. func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) { - logger := testlog.HCLogger(t) pluginLoader := catalog.TestPluginLoader(t) clientConf, cleanup := config.TestClientConfig(t) conf := &Config{ // Copy the alloc in case the caller edits and reuses it Alloc: alloc.Copy(), - Logger: logger, + Logger: clientConf.Logger, ClientConfig: clientConf, StateDB: state.NoopDB{}, Consul: consulapi.NewMockConsulServiceClient(t, logger), diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index 7a3146480..17353044e 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -61,7 +61,7 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { // Cancel the task runner to break out of restart delay or the main run // loop. - tr.ctxCancel() + tr.killCtxCancel() // Grab the handle handle := tr.getDriverHandle() diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 8f29d7c00..5a45bdb71 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -78,12 +78,18 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB - // ctx is the task runner's context representing the tasks's lifecycle. - // Canceling the context will cause the task to be destroyed. + // killCtx is the task runner's context representing the tasks's lifecycle. + // The context is canceled when the task is killed. + killCtx context.Context + + // killCtxCancel is called when killing a task. + killCtxCancel context.CancelFunc + + // ctx is used to exit the TaskRunner *without* affecting task state. ctx context.Context - // ctxCancel is used to exit the task runner's Run loop without - // stopping the task. Shutdown hooks are run. + // ctxCancel causes the TaskRunner to exit immediately without + // affecting task state. Useful for testing or graceful agent shutdown. ctxCancel context.CancelFunc // Logger is the logger for the task runner. @@ -168,8 +174,8 @@ type Config struct { TaskDir *allocdir.TaskDir Logger log.Logger - // VaultClient is the client to use to derive and renew Vault tokens - VaultClient vaultclient.VaultClient + // Vault is the client to use to derive and renew Vault tokens + Vault vaultclient.VaultClient // StateDB is used to store and restore state. StateDB cstate.StateDB @@ -183,9 +189,12 @@ type Config struct { } func NewTaskRunner(config *Config) (*TaskRunner, error) { - // Create a context for the runner + // Create a context for causing the runner to exit trCtx, trCancel := context.WithCancel(context.Background()) + // Create a context for killing the runner + killCtx, killCancel := context.WithCancel(context.Background()) + // Initialize the environment builder envBuilder := env.NewBuilder( config.ClientConfig.Node, @@ -210,11 +219,13 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { taskLeader: config.Task.Leader, envBuilder: envBuilder, consulClient: config.Consul, - vaultClient: config.VaultClient, + vaultClient: config.Vault, state: tstate, localState: state.NewLocalState(), stateDB: config.StateDB, stateUpdater: config.StateUpdater, + killCtx: killCtx, + killCtxCancel: killCancel, ctx: trCtx, ctxCancel: trCancel, triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), @@ -299,7 +310,16 @@ func (tr *TaskRunner) Run() { go tr.handleUpdates() MAIN: - for tr.ctx.Err() == nil { + for { + select { + case <-tr.killCtx.Done(): + break MAIN + case <-tr.ctx.Done(): + // TaskRunner was told to exit immediately + return + default: + } + // Run the prestart hooks if err := tr.prestart(); err != nil { tr.logger.Error("prestart failed", "error", err) @@ -307,8 +327,13 @@ MAIN: goto RESTART } - if tr.ctx.Err() != nil { + select { + case <-tr.killCtx.Done(): break MAIN + case <-tr.ctx.Done(): + // TaskRunner was told to exit immediately + return + default: } // Run the task @@ -327,12 +352,19 @@ MAIN: { handle := tr.getDriverHandle() - // Do *not* use tr.ctx here as it would cause Wait() to - // unblock before the task exits when Kill() is called. + // Do *not* use tr.killCtx here as it would cause + // Wait() to unblock before the task exits when Kill() + // is called. if resultCh, err := handle.WaitCh(context.Background()); err != nil { tr.logger.Error("wait task failed", "error", err) } else { - result = <-resultCh + select { + case result = <-resultCh: + // WaitCh returned a result + case <-tr.ctx.Done(): + // TaskRunner was told to exit immediately + return + } } } @@ -355,9 +387,12 @@ MAIN: // Actually restart by sleeping and also watching for destroy events select { case <-time.After(restartDelay): - case <-tr.ctx.Done(): + case <-tr.killCtx.Done(): tr.logger.Trace("task killed between restarts", "delay", restartDelay) break MAIN + case <-tr.ctx.Done(): + // TaskRunner was told to exit immediately + return } } @@ -444,17 +479,76 @@ func (tr *TaskRunner) runDriver() error { //TODO mounts and devices //XXX Evaluate and encode driver config - // Start the job - handle, net, err := tr.driver.StartTask(taskConfig) + var handle *drivers.TaskHandle + var net *cstructs.DriverNetwork + var err error + + // Check to see if a task handle was restored + tr.localStateLock.RLock() + handle = tr.localState.TaskHandle + net = tr.localState.DriverNetwork + tr.localStateLock.RUnlock() + + if handle != nil { + tr.logger.Trace("restored handle; recovering task", "task_id", handle.Config.ID) + if err := tr.driver.RecoverTask(handle); err != nil { + tr.logger.Error("error recovering task; destroying and restarting", + "error", err, "task_id", handle.Config.ID) + + // Clear invalid task state + tr.localStateLock.Lock() + tr.localState.TaskHandle = nil + tr.localState.DriverNetwork = nil + tr.localStateLock.Unlock() + + // Try to cleanup any existing task state in the plugin before restarting + if err := tr.driver.DestroyTask(handle.Config.ID, true); err != nil { + // Ignore ErrTaskNotFound errors as ideally + // this task has already been stopped and + // therefore doesn't exist. + if err != drivers.ErrTaskNotFound { + tr.logger.Warn("error destroying unrecoverable task", + "error", err, "task_id", handle.Config.ID) + } + + } + + goto START + } + + // Update driver handle on task runner + tr.setDriverHandle(NewDriverHandle(tr.driver, handle.Config.ID, tr.Task(), net)) + + // Ensure running state is persisted but do *not* append a new + // task event as restoring is a client event and not relevant + // to a task's lifecycle. + if err := tr.updateStateImpl(structs.TaskStateRunning); err != nil { + tr.logger.Warn("error persisting task state", "error", err) + } + return nil + } + +START: + // Start the job if there's no existing handle (or if RecoverTask failed) + handle, net, err = tr.driver.StartTask(taskConfig) if err != nil { return fmt.Errorf("driver start failed: %v", err) } tr.localStateLock.Lock() tr.localState.TaskHandle = handle + tr.localState.DriverNetwork = net + if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil { + //TODO Nomad will be unable to restore this task; try to kill + // it now and fail? In general we prefer to leave running + // tasks running even if the agent encounters an error. + tr.logger.Warn("error persisting local task state; may be unable to restore after a Nomad restart", + "error", err, "task_id", handle.Config.ID) + } tr.localStateLock.Unlock() tr.setDriverHandle(NewDriverHandle(tr.driver, taskConfig.ID, tr.Task(), net)) + // Emit an event that we started tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) return nil @@ -601,29 +695,33 @@ func (tr *TaskRunner) Restore() error { // UpdateState sets the task runners allocation state and triggers a server // update. func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { + tr.stateLock.Lock() + defer tr.stateLock.Unlock() + tr.logger.Trace("setting task state", "state", state, "event", event.Type) - // Update the local state - tr.setStateLocal(state, event) + // Append the event + tr.appendEvent(event) + + // Update the state + if err := tr.updateStateImpl(state); err != nil { + // Only log the error as we persistence errors should not + // affect task state. + tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state) + } // Notify the alloc runner of the transition tr.stateUpdater.TaskStateUpdated() } -// setStateLocal updates the local in-memory state, persists a copy to disk and returns a -// copy of the task's state. -func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) { - tr.stateLock.Lock() - defer tr.stateLock.Unlock() +// updateStateImpl updates the in-memory task state and persists to disk. +func (tr *TaskRunner) updateStateImpl(state string) error { // Update the task state oldState := tr.state.State taskState := tr.state taskState.State = state - // Append the event - tr.appendEvent(event) - // Handle the state transition. switch state { case structs.TaskStateRunning: @@ -662,11 +760,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) { } // Persist the state and event - if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState); err != nil { - // Only a warning because the next event/state-transition will - // try to persist it again. - tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state) - } + return tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState) } // EmitEvent appends a new TaskEvent to this task's TaskState. The actual diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 4eaecc81b..e64683107 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -123,7 +123,7 @@ func (tr *TaskRunner) prestart() error { // Run the prestart hook var resp interfaces.TaskPrestartResponse - if err := pre.Prestart(tr.ctx, &req, &resp); err != nil { + if err := pre.Prestart(tr.killCtx, &req, &resp); err != nil { return structs.WrapRecoverable(fmt.Sprintf("prestart hook %q failed: %v", name, err), err) } @@ -195,7 +195,7 @@ func (tr *TaskRunner) poststart() error { TaskEnv: tr.envBuilder.Build(), } var resp interfaces.TaskPoststartResponse - if err := post.Poststart(tr.ctx, &req, &resp); err != nil { + if err := post.Poststart(tr.killCtx, &req, &resp); err != nil { merr.Errors = append(merr.Errors, fmt.Errorf("poststart hook %q failed: %v", name, err)) } @@ -237,7 +237,7 @@ func (tr *TaskRunner) exited() error { req := interfaces.TaskExitedRequest{} var resp interfaces.TaskExitedResponse - if err := post.Exited(tr.ctx, &req, &resp); err != nil { + if err := post.Exited(tr.killCtx, &req, &resp); err != nil { merr.Errors = append(merr.Errors, fmt.Errorf("exited hook %q failed: %v", name, err)) } @@ -280,7 +280,7 @@ func (tr *TaskRunner) stop() error { req := interfaces.TaskStopRequest{} var resp interfaces.TaskStopResponse - if err := post.Stop(tr.ctx, &req, &resp); err != nil { + if err := post.Stop(tr.killCtx, &req, &resp); err != nil { merr.Errors = append(merr.Errors, fmt.Errorf("stop hook %q failed: %v", name, err)) } @@ -336,7 +336,7 @@ func (tr *TaskRunner) updateHooks() { // Run the update hook var resp interfaces.TaskUpdateResponse - if err := upd.Update(tr.ctx, &req, &resp); err != nil { + if err := upd.Update(tr.killCtx, &req, &resp); err != nil { tr.logger.Error("update hook failed", "name", name, "error", err) } diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go new file mode 100644 index 000000000..c55b7a74b --- /dev/null +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -0,0 +1,163 @@ +package taskrunner + +import ( + "context" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/config" + consulapi "github.com/hashicorp/nomad/client/consul" + cstate "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/shared/catalog" + "github.com/hashicorp/nomad/plugins/shared/singleton" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type MockTaskStateUpdater struct { + ch chan struct{} +} + +func NewMockTaskStateUpdater() *MockTaskStateUpdater { + return &MockTaskStateUpdater{ + ch: make(chan struct{}, 1), + } +} + +func (m *MockTaskStateUpdater) TaskStateUpdated() { + select { + case m.ch <- struct{}{}: + default: + } +} + +// testTaskRunnerConfig returns a taskrunner.Config for the given alloc+task +// plus a cleanup func. +func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName string) (*Config, func()) { + logger := testlog.HCLogger(t) + pluginLoader := catalog.TestPluginLoader(t) + clientConf, cleanup := config.TestClientConfig(t) + + // Find the task + var thisTask *structs.Task + for _, tg := range alloc.Job.TaskGroups { + for _, task := range tg.Tasks { + if task.Name == taskName { + if thisTask != nil { + cleanup() + t.Fatalf("multiple tasks named %q; cannot use this helper", taskName) + } + thisTask = task + } + } + } + if thisTask == nil { + cleanup() + t.Fatalf("could not find task %q", taskName) + } + + // Create the alloc dir + task dir + allocPath := filepath.Join(clientConf.AllocDir, alloc.ID) + allocDir := allocdir.NewAllocDir(logger, allocPath) + if err := allocDir.Build(); err != nil { + cleanup() + t.Fatalf("error building alloc dir: %v", err) + } + taskDir := allocDir.NewTaskDir(taskName) + + trCleanup := func() { + if err := allocDir.Destroy(); err != nil { + t.Logf("error destroying alloc dir: %v", err) + } + cleanup() + } + + conf := &Config{ + Alloc: alloc, + ClientConfig: clientConf, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + Task: thisTask, + TaskDir: taskDir, + Logger: clientConf.Logger, + Vault: vaultclient.NewMockVaultClient(), + StateDB: cstate.NoopDB{}, + StateUpdater: NewMockTaskStateUpdater(), + PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader), + } + return conf, trCleanup +} + +// TestTaskRunner_Restore asserts restoring a running task does not rerun the +// task. +func TestTaskRunner_Restore_Running(t *testing.T) { + t.Parallel() + require := require.New(t) + + alloc := mock.BatchAlloc() + alloc.Job.TaskGroups[0].Count = 1 + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Name = "testtask" + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": 2 * time.Second, + } + conf, cleanup := testTaskRunnerConfig(t, alloc, "testtask") + conf.StateDB = cstate.NewMemDB() // "persist" state between task runners + defer cleanup() + + // Run the first TaskRunner + origTR, err := NewTaskRunner(conf) + require.NoError(err) + go origTR.Run() + defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for it to be running + testutil.WaitForResult(func() (bool, error) { + ts := origTR.TaskState() + return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State) + }, func(err error) { + t.Fatalf("expected running; got: %v", err) + }) + + // Cause TR to exit without shutting down task + origTR.ctxCancel() + <-origTR.WaitCh() + + // Start a new TaskRunner and make sure it does not rerun the task + newTR, err := NewTaskRunner(conf) + require.NoError(err) + + // Do the Restore + require.NoError(newTR.Restore()) + + go newTR.Run() + defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for new task runner to exit when the process does + <-newTR.WaitCh() + + // Assert that the process was only started once, and only restored once + started := 0 + restored := 0 + state := newTR.TaskState() + require.Equal(structs.TaskStateDead, state.State) + for _, ev := range state.Events { + t.Logf("task event: %s %s", ev.Type, ev.Message) + switch ev.Type { + case structs.TaskStarted: + started++ + case structs.TaskRestored: + restored++ + } + } + assert.Equal(t, 1, started) + assert.Equal(t, 1, restored) +} diff --git a/client/config/testing.go b/client/config/testing.go index 8281938ff..73ab82d2d 100644 --- a/client/config/testing.go +++ b/client/config/testing.go @@ -6,6 +6,7 @@ import ( "path/filepath" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/go-testing-interface" ) @@ -14,6 +15,7 @@ import ( // a cleanup func to remove the state and alloc dirs when finished. func TestClientConfig(t testing.T) (*Config, func()) { conf := DefaultConfig() + conf.Logger = testlog.HCLogger(t) // Create a tempdir to hold state and alloc subdirs parent, err := ioutil.TempDir("", "nomadtest") diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go index 04a0c91cb..734ae61f8 100644 --- a/drivers/mock/driver.go +++ b/drivers/mock/driver.go @@ -273,9 +273,23 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } -func (d *Driver) RecoverTask(*drivers.TaskHandle) error { - //TODO is there anything to do here? - return nil +func (d *Driver) RecoverTask(h *drivers.TaskHandle) error { + if h == nil { + return fmt.Errorf("handle cannot be nil") + } + + if _, ok := d.tasks.Get(h.Config.ID); ok { + d.logger.Debug("nothing to recover; task already exists", + "task_id", h.Config.ID, + "task_name", h.Config.Name, + ) + return nil + } + + // Recovering a task requires the task to be running external to the + // plugin. Since the mock_driver runs all tasks in process it cannot + // recover tasks. + return fmt.Errorf("%s cannot recover tasks", pluginName) } func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) { diff --git a/drivers/qemu/driver.go b/drivers/qemu/driver.go index fde8db8e1..9c9a3a3fe 100644 --- a/drivers/qemu/driver.go +++ b/drivers/qemu/driver.go @@ -244,6 +244,15 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("error: handle cannot be nil") } + // If already attached to handle there's nothing to recover. + if _, ok := d.tasks.Get(handle.Config.ID); ok { + d.logger.Trace("nothing to recover; task already exists", + "task_id", handle.Config.ID, + "task_name", handle.Config.Name, + ) + return nil + } + var taskState TaskState if err := handle.GetDriverState(&taskState); err != nil { d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID) diff --git a/drivers/rawexec/driver.go b/drivers/rawexec/driver.go index 6c75a79de..fefd3190d 100644 --- a/drivers/rawexec/driver.go +++ b/drivers/rawexec/driver.go @@ -242,9 +242,19 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { if handle == nil { - return fmt.Errorf("error: handle cannot be nil") + return fmt.Errorf("handle cannot be nil") } + // If already attached to handle there's nothing to recover. + if _, ok := d.tasks.Get(handle.Config.ID); ok { + d.logger.Trace("nothing to recover; task already exists", + "task_id", handle.Config.ID, + "task_name", handle.Config.Name, + ) + return nil + } + + // Handle doesn't already exist, try to reattach var taskState TaskState if err := handle.GetDriverState(&taskState); err != nil { d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) @@ -261,6 +271,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { Reattach: plugRC, } + // Create client for reattached executor exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr) if err != nil { d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) diff --git a/drivers/rkt/driver.go b/drivers/rkt/driver.go index cd5919e06..e7f89c758 100644 --- a/drivers/rkt/driver.go +++ b/drivers/rkt/driver.go @@ -317,6 +317,15 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("error: handle cannot be nil") } + // If already attached to handle there's nothing to recover. + if _, ok := d.tasks.Get(handle.Config.ID); ok { + d.logger.Trace("nothing to recover; task already exists", + "task_id", handle.Config.ID, + "task_name", handle.Config.Name, + ) + return nil + } + var taskState TaskState if err := handle.GetDriverState(&taskState); err != nil { d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID) diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index f37a10ff0..2bb7267c4 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -46,7 +46,7 @@ type DriverPlugin interface { // DriverPlugin interface. type DriverSignalTaskNotSupported struct{} -func (_ DriverSignalTaskNotSupported) SignalTask(taskID, signal string) error { +func (DriverSignalTaskNotSupported) SignalTask(taskID, signal string) error { return fmt.Errorf("SignalTask is not supported by this driver") } diff --git a/plugins/drivers/task_handle.go b/plugins/drivers/task_handle.go index 749c10314..ea538c577 100644 --- a/plugins/drivers/task_handle.go +++ b/plugins/drivers/task_handle.go @@ -11,7 +11,7 @@ type TaskHandle struct { Driver string Config *TaskConfig State TaskState - driverState []byte + DriverState []byte } func NewTaskHandle(driver string) *TaskHandle { @@ -19,12 +19,12 @@ func NewTaskHandle(driver string) *TaskHandle { } func (h *TaskHandle) SetDriverState(v interface{}) error { - h.driverState = []byte{} - return base.MsgPackEncode(&h.driverState, v) + h.DriverState = []byte{} + return base.MsgPackEncode(&h.DriverState, v) } func (h *TaskHandle) GetDriverState(v interface{}) error { - return base.MsgPackDecode(h.driverState, v) + return base.MsgPackDecode(h.DriverState, v) } @@ -34,7 +34,10 @@ func (h *TaskHandle) Copy() *TaskHandle { } handle := new(TaskHandle) - *handle = *h + handle.Driver = h.Driver handle.Config = h.Config.Copy() + handle.State = h.State + handle.DriverState = make([]byte, len(h.DriverState)) + copy(handle.DriverState, h.DriverState) return handle } diff --git a/plugins/drivers/utils.go b/plugins/drivers/utils.go index db3756187..4caf3da7f 100644 --- a/plugins/drivers/utils.go +++ b/plugins/drivers/utils.go @@ -194,7 +194,7 @@ func taskHandleFromProto(pb *proto.TaskHandle) *TaskHandle { return &TaskHandle{ Config: taskConfigFromProto(pb.Config), State: taskStateFromProtoMap[pb.State], - driverState: pb.DriverState, + DriverState: pb.DriverState, } } @@ -202,7 +202,7 @@ func taskHandleToProto(handle *TaskHandle) *proto.TaskHandle { return &proto.TaskHandle{ Config: taskConfigToProto(handle.Config), State: taskStateToProtoMap[handle.State], - DriverState: handle.driverState, + DriverState: handle.DriverState, } }