diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index 92d930255..f7d2aeabf 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -94,8 +94,8 @@ func (ar *allocRunner) Run() { var err error var taskWaitCh <-chan struct{} - // Run the prerun hooks - // XXX Equivalent to TR.Prerun hook + // Run the prestart hooks + // XXX Equivalent to TR.Prestart hook if err := ar.prerun(); err != nil { ar.logger.Error("prerun failed", "error", err) goto POST diff --git a/client/allocrunnerv2/alloc_runner_hooks.go b/client/allocrunnerv2/alloc_runner_hooks.go index e39d3fbdc..a3cac16f0 100644 --- a/client/allocrunnerv2/alloc_runner_hooks.go +++ b/client/allocrunnerv2/alloc_runner_hooks.go @@ -40,7 +40,7 @@ func (ar *allocRunner) prerun() error { var start time.Time if ar.logger.IsTrace() { start = time.Now() - ar.logger.Trace("running pre-run hook", "name", name, "start", start) + ar.logger.Trace("running prestart hook", "name", name, "start", start) } if err := pre.Prerun(); err != nil { diff --git a/client/allocrunnerv2/interfaces/task_lifecycle.go b/client/allocrunnerv2/interfaces/task_lifecycle.go index ba81544be..c0a11ee71 100644 --- a/client/allocrunnerv2/interfaces/task_lifecycle.go +++ b/client/allocrunnerv2/interfaces/task_lifecycle.go @@ -8,10 +8,13 @@ import ( ) /* - pre-run post-run pre-stop post-stop + prestart poststart exited stop | | | | | | | | --------> run ------> exited ----------> not restart ---------> garbage collect + | + | + kill -> exited -> stop */ @@ -20,7 +23,7 @@ type TaskHook interface { Name() string } -type TaskPrerunRequest struct { +type TaskPrestartRequest struct { // HookData is previously set data by the hook HookData map[string]string @@ -37,7 +40,7 @@ type TaskPrerunRequest struct { TaskEnv *env.TaskEnv } -type TaskPrerunResponse struct { +type TaskPrestartResponse struct { // Env is the environment variables to set for the task Env map[string]string @@ -49,39 
+52,35 @@ type TaskPrerunResponse struct { Done bool } -type TaskPrerunHook interface { +type TaskPrestartHook interface { TaskHook - Prerun(context.Context, *TaskPrerunRequest, *TaskPrerunResponse) error + Prestart(context.Context, *TaskPrestartRequest, *TaskPrestartResponse) error } -// XXX If we want consul style hooks, need to have something that runs after the -// tasks starts -type TaskPostrunRequest struct { +type TaskPoststartRequest struct { // Network info } -type TaskPostrunResponse struct{} +type TaskPoststartResponse struct{} -type TaskPostrunHook interface { +type TaskPoststartHook interface { TaskHook - Postrun() error - //Postrun(context.Context, *TaskPostrunRequest, *TaskPostrunResponse) error + Poststart(context.Context, *TaskPoststartRequest, *TaskPoststartResponse) error } -type TaskPoststopRequest struct{} -type TaskPoststopResponse struct{} +type TaskKillRequest struct{} +type TaskKillResponse struct{} -type TaskPoststopHook interface { +type TaskKillHook interface { TaskHook - Postrun(context.Context, *TaskPostrunRequest, *TaskPostrunResponse) error + Kill(context.Context, *TaskKillRequest, *TaskKillResponse) error } -type TaskDestroyRequest struct{} -type TaskDestroyResponse struct{} +type TaskExitedRequest struct{} +type TaskExitedResponse struct{} -type TaskDestroyHook interface { +type TaskExitedHook interface { TaskHook - Destroy() error - //Destroy(context.Context, *TaskDestroyRequest, *TaskDestroyResponse) error + Exited(context.Context, *TaskExitedRequest, *TaskExitedResponse) error } type TaskUpdateRequest struct { @@ -93,3 +92,11 @@ type TaskUpdateHook interface { TaskHook Update(context.Context, *TaskUpdateRequest, *TaskUpdateResponse) error } + +type TaskStopRequest struct{} +type TaskStopResponse struct{} + +type TaskStopHook interface { + TaskHook + Stop(context.Context, *TaskStopRequest, *TaskStopResponse) error +} diff --git a/client/allocrunnerv2/taskrunner/artifact_hook.go b/client/allocrunnerv2/taskrunner/artifact_hook.go 
new file mode 100644 index 000000000..cceea6544 --- /dev/null +++ b/client/allocrunnerv2/taskrunner/artifact_hook.go @@ -0,0 +1,48 @@ +package taskrunner + +import ( + "context" + "fmt" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/getter" + "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" + ti "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" +) + +// artifactHook downloads artifacts for a task. +type artifactHook struct { + eventEmitter ti.EventEmitter + logger log.Logger +} + +func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook { + h := &artifactHook{ + eventEmitter: e, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*artifactHook) Name() string { + return "artifacts" +} + +func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + h.eventEmitter.SetState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) + + for _, artifact := range req.Task.Artifacts { + //XXX add ctx to GetArtifact to allow cancelling long downloads + if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir); err != nil { + wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) + h.logger.Debug(wrapped.Error()) + h.eventEmitter.SetState(structs.TaskStatePending, + structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped)) + return wrapped + } + } + + resp.Done = true + return nil +} diff --git a/client/allocrunnerv2/taskrunner/state/state.go b/client/allocrunnerv2/taskrunner/state/state.go index 8abb71906..287fa5a8b 100644 --- a/client/allocrunnerv2/taskrunner/state/state.go +++ b/client/allocrunnerv2/taskrunner/state/state.go @@ -38,9 +38,9 @@ func (s *LocalState) Copy() *LocalState { } type HookState struct { - // PrerunDone is true if the hook has run Prerun 
successfully and does + // PrestartDone is true if the hook has run Prestart successfully and does // not need to run again - PrerunDone bool + PrestartDone bool Data map[string]string } @@ -56,7 +56,7 @@ func (h *HookState) Equal(o *HookState) bool { return h == o } - if h.PrerunDone != o.PrerunDone { + if h.PrestartDone != o.PrestartDone { return false } diff --git a/client/allocrunnerv2/taskrunner/task_dir_hook.go b/client/allocrunnerv2/taskrunner/task_dir_hook.go new file mode 100644 index 000000000..4d2840227 --- /dev/null +++ b/client/allocrunnerv2/taskrunner/task_dir_hook.go @@ -0,0 +1,51 @@ +package taskrunner + +import ( + "context" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" + cconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver" + "github.com/hashicorp/nomad/nomad/structs" +) + +type taskDirHook struct { + runner *TaskRunner + logger log.Logger +} + +func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook { + td := &taskDirHook{ + runner: runner, + } + td.logger = logger.Named(td.Name()) + return td +} + +func (h *taskDirHook) Name() string { + return "task_dir" +} + +func (h *taskDirHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + cc := h.runner.clientConfig + chroot := cconfig.DefaultChrootEnv + if len(cc.ChrootEnv) > 0 { + chroot = cc.ChrootEnv + } + + // Emit the event that we are going to be building the task directory + h.runner.SetState("", structs.NewTaskEvent(structs.TaskSetup).SetMessage(structs.TaskBuildingTaskDir)) + + // Build the task directory structure + fsi := h.runner.driver.FSIsolation() + err := h.runner.taskDir.Build(false, chroot, fsi) + if err != nil { + return err + } + + // Update the environment variables based on the built task directory + driver.SetEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.clientConfig) + resp.Done = true + 
return nil +} diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index f54d5d6bd..a6c57489e 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -229,9 +229,9 @@ func (tr *TaskRunner) Run() { MAIN: for { - // Run the prerun hooks - if err := tr.prerun(); err != nil { - tr.logger.Error("prerun failed", "error", err) + // Run the prestart hooks + if err := tr.prestart(); err != nil { + tr.logger.Error("prestart failed", "error", err) tr.restartTracker.SetStartError(err) goto RESTART } @@ -243,9 +243,9 @@ MAIN: goto RESTART } - // Run the postrun hooks - if err := tr.postrun(); err != nil { - tr.logger.Error("postrun failed", "error", err) + // Run the poststart hooks + if err := tr.poststart(); err != nil { + tr.logger.Error("poststart failed", "error", err) } WAIT: @@ -284,9 +284,9 @@ MAIN: timer.Stop() } - // Run the shutdown hooks - if err := tr.shutdown(); err != nil { - tr.logger.Error("postrun failed", "error", err) + // Run the stop hooks + if err := tr.stop(); err != nil { + tr.logger.Error("stop failed", "error", err) } tr.logger.Debug("task run loop exiting") diff --git a/client/allocrunnerv2/taskrunner/task_runner_hooks.go b/client/allocrunnerv2/taskrunner/task_runner_hooks.go index 5205754a1..415fb7f94 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_hooks.go +++ b/client/allocrunnerv2/taskrunner/task_runner_hooks.go @@ -1,18 +1,11 @@ package taskrunner import ( - "context" "fmt" "time" - log "github.com/hashicorp/go-hclog" - - "github.com/hashicorp/nomad/client/allocrunner/getter" "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" - ti "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state" - cconfig "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" ) @@ 
-54,37 +47,37 @@ func (tr *TaskRunner) initHooks() { } } -// prerun is used to run the runners prerun hooks. -func (tr *TaskRunner) prerun() error { +// prestart is used to run the runners prestart hooks. +func (tr *TaskRunner) prestart() error { //XXX is this necessary? maybe we should have a generic cancelletion // method instead of peeking into the alloc // Determine if the allocation is terminaland we should avoid running - // pre-run hooks. + // prestart hooks. alloc := tr.Alloc() if alloc.TerminalStatus() { - tr.logger.Trace("skipping pre-run hooks since allocation is terminal") + tr.logger.Trace("skipping prestart hooks since allocation is terminal") return nil } if tr.logger.IsTrace() { start := time.Now() - tr.logger.Trace("running pre-run hooks", "start", start) + tr.logger.Trace("running prestart hooks", "start", start) defer func() { end := time.Now() - tr.logger.Trace("finished pre-run hooks", "end", end, "duration", end.Sub(start)) + tr.logger.Trace("finished prestart hooks", "end", end, "duration", end.Sub(start)) }() } for _, hook := range tr.runnerHooks { - pre, ok := hook.(interfaces.TaskPrerunHook) + pre, ok := hook.(interfaces.TaskPrestartHook) if !ok { - tr.logger.Trace("skipping non-prerun hook", "name", hook.Name()) + tr.logger.Trace("skipping non-prestart hook", "name", hook.Name()) continue } name := pre.Name() // Build the request - req := interfaces.TaskPrerunRequest{ + req := interfaces.TaskPrestartRequest{ Task: tr.Task(), TaskDir: tr.taskDir.Dir, TaskEnv: tr.envBuilder.Build(), @@ -93,24 +86,24 @@ func (tr *TaskRunner) prerun() error { tr.localStateLock.RLock() origHookState := tr.localState.Hooks[name] tr.localStateLock.RUnlock() - if origHookState != nil && origHookState.PrerunDone { - tr.logger.Trace("skipping done prerun hook", "name", pre.Name()) + if origHookState != nil && origHookState.PrestartDone { + tr.logger.Trace("skipping done prestart hook", "name", pre.Name()) continue } req.VaultToken = tr.getVaultToken() - // Time the 
prerun hook + // Time the prestart hook var start time.Time if tr.logger.IsTrace() { start = time.Now() - tr.logger.Trace("running pre-run hook", "name", name, "start", start) + tr.logger.Trace("running prestart hook", "name", name, "start", start) } - // Run the pre-run hook - var resp interfaces.TaskPrerunResponse - if err := pre.Prerun(tr.ctx, &req, &resp); err != nil { - return structs.WrapRecoverable(fmt.Sprintf("pre-run hook %q failed: %v", name, err), err) + // Run the prestart hook + var resp interfaces.TaskPrestartResponse + if err := pre.Prestart(tr.ctx, &req, &resp); err != nil { + return structs.WrapRecoverable(fmt.Sprintf("prestart hook %q failed: %v", name, err), err) } // Store the hook state @@ -124,7 +117,7 @@ func (tr *TaskRunner) prerun() error { if resp.HookData != nil { hookState.Data = resp.HookData - hookState.PrerunDone = resp.Done + hookState.PrestartDone = resp.Done } tr.localStateLock.Unlock() @@ -143,26 +136,26 @@ func (tr *TaskRunner) prerun() error { if tr.logger.IsTrace() { end := time.Now() - tr.logger.Trace("finished pre-run hooks", "name", name, "end", end, "duration", end.Sub(start)) + tr.logger.Trace("finished prestart hooks", "name", name, "end", end, "duration", end.Sub(start)) } } return nil } -// postrun is used to run the runners postrun hooks. -func (tr *TaskRunner) postrun() error { +// poststart is used to run the runners poststart hooks. 
+func (tr *TaskRunner) poststart() error { if tr.logger.IsTrace() { start := time.Now() - tr.logger.Trace("running post-run hooks", "start", start) + tr.logger.Trace("running poststart hooks", "start", start) defer func() { end := time.Now() - tr.logger.Trace("finished post-run hooks", "end", end, "duration", end.Sub(start)) + tr.logger.Trace("finished poststart hooks", "end", end, "duration", end.Sub(start)) }() } for _, hook := range tr.runnerHooks { - post, ok := hook.(interfaces.TaskPostrunHook) + post, ok := hook.(interfaces.TaskPoststartHook) if !ok { continue } @@ -171,36 +164,38 @@ func (tr *TaskRunner) postrun() error { var start time.Time if tr.logger.IsTrace() { start = time.Now() - tr.logger.Trace("running post-run hook", "name", name, "start", start) + tr.logger.Trace("running poststart hook", "name", name, "start", start) } + req := interfaces.TaskPoststartRequest{} + var resp interfaces.TaskPoststartResponse // XXX We shouldn't exit on the first one - if err := post.Postrun(); err != nil { - return fmt.Errorf("post-run hook %q failed: %v", name, err) + if err := post.Poststart(tr.ctx, &req, &resp); err != nil { + return fmt.Errorf("poststart hook %q failed: %v", name, err) } if tr.logger.IsTrace() { end := time.Now() - tr.logger.Trace("finished post-run hooks", "name", name, "end", end, "duration", end.Sub(start)) + tr.logger.Trace("finished poststart hooks", "name", name, "end", end, "duration", end.Sub(start)) } } return nil } -// shutdown is used to run the shutdown hooks. -func (tr *TaskRunner) shutdown() error { +// stop is used to run the stop hooks. 
+func (tr *TaskRunner) stop() error { if tr.logger.IsTrace() { start := time.Now() - tr.logger.Trace("running poststop hooks", "start", start) + tr.logger.Trace("running stop hooks", "start", start) defer func() { end := time.Now() - tr.logger.Trace("finished poststop hooks", "end", end, "duration", end.Sub(start)) + tr.logger.Trace("finished stop hooks", "end", end, "duration", end.Sub(start)) }() } for _, hook := range tr.runnerHooks { - post, ok := hook.(interfaces.TaskDestroyHook) + post, ok := hook.(interfaces.TaskStopHook) if !ok { continue } @@ -209,17 +204,19 @@ func (tr *TaskRunner) shutdown() error { var start time.Time if tr.logger.IsTrace() { start = time.Now() - tr.logger.Trace("running destroy hook", "name", name, "start", start) + tr.logger.Trace("running stop hook", "name", name, "start", start) } + req := interfaces.TaskStopRequest{} + var resp interfaces.TaskStopResponse // XXX We shouldn't exit on the first one - if err := post.Destroy(); err != nil { - return fmt.Errorf("destroy hook %q failed: %v", name, err) + if err := post.Stop(tr.ctx, &req, &resp); err != nil { + return fmt.Errorf("stop hook %q failed: %v", name, err) } if tr.logger.IsTrace() { end := time.Now() - tr.logger.Trace("finished destroy hooks", "name", name, "end", end, "duration", end.Sub(start)) + tr.logger.Trace("finished stop hooks", "name", name, "end", end, "duration", end.Sub(start)) } } @@ -251,7 +248,7 @@ func (tr *TaskRunner) updateHooks() { VaultToken: tr.getVaultToken(), } - // Time the prerun hook + // Time the update hook var start time.Time if tr.logger.IsTrace() { start = time.Now() @@ -271,82 +268,6 @@ func (tr *TaskRunner) updateHooks() { } } -type taskDirHook struct { - runner *TaskRunner - logger log.Logger -} - -func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook { - td := &taskDirHook{ - runner: runner, - } - td.logger = logger.Named(td.Name()) - return td -} - -func (h *taskDirHook) Name() string { - return "task_dir" -} - -func (h 
*taskDirHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error { - cc := h.runner.clientConfig - chroot := cconfig.DefaultChrootEnv - if len(cc.ChrootEnv) > 0 { - chroot = cc.ChrootEnv - } - - // Emit the event that we are going to be building the task directory - h.runner.SetState("", structs.NewTaskEvent(structs.TaskSetup).SetMessage(structs.TaskBuildingTaskDir)) - - // Build the task directory structure - fsi := h.runner.driver.FSIsolation() - err := h.runner.taskDir.Build(false, chroot, fsi) - if err != nil { - return err - } - - // Update the environment variables based on the built task directory - driver.SetEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.clientConfig) - resp.Done = true - return nil -} - -// artifactHook downloads artifacts for a task. -type artifactHook struct { - eventEmitter ti.EventEmitter - logger log.Logger -} - -func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook { - h := &artifactHook{ - eventEmitter: e, - } - h.logger = logger.Named(h.Name()) - return h -} - -func (*artifactHook) Name() string { - return "artifacts" -} - -func (h *artifactHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error { - h.eventEmitter.SetState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) - - for _, artifact := range req.Task.Artifacts { - //XXX add ctx to GetArtifact to allow cancelling long downloads - if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir); err != nil { - wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) - h.logger.Debug(wrapped.Error()) - h.eventEmitter.SetState(structs.TaskStatePending, - structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped)) - return wrapped - } - } - - resp.Done = true - return nil -} - /* TR Hooks: @@ -377,17 +298,17 @@ Implement: Prestart and Update (for new 
Vault token) and Destroy Consul Service Reg: Require: Task, interpolation/ENV Return: error -Implement: Postrun, Update, Prestop +Implement: Poststart, Update, Kill, Exited > @alex Dispatch Payload: Require: Alloc Return error -Implement: Prerun +Implement: Prestart > @schmichael Artifacts: Require: Folder structure, task, interpolation/ENV Return: error -Implement: Prerun and Destroy +Implement: Prestart */ diff --git a/client/allocrunnerv2/taskrunner/template_hook.go b/client/allocrunnerv2/taskrunner/template_hook.go index bd5ae732b..56957aa6b 100644 --- a/client/allocrunnerv2/taskrunner/template_hook.go +++ b/client/allocrunnerv2/taskrunner/template_hook.go @@ -63,7 +63,7 @@ func (*templateHook) Name() string { return "template" } -func (h *templateHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error { +func (h *templateHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { h.managerLock.Lock() defer h.managerLock.Unlock() @@ -111,7 +111,7 @@ func (h *templateHook) newManager() (unblock chan struct{}, err error) { return unblock, nil } -func (h *templateHook) Poststop(ctx context.Context, req *interfaces.TaskPoststopRequest, resp *interfaces.TaskPoststopResponse) error { +func (h *templateHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { h.managerLock.Lock() defer h.managerLock.Unlock() diff --git a/client/allocrunnerv2/taskrunner/vault_hook.go b/client/allocrunnerv2/taskrunner/vault_hook.go index be0702ab4..4fa0b6896 100644 --- a/client/allocrunnerv2/taskrunner/vault_hook.go +++ b/client/allocrunnerv2/taskrunner/vault_hook.go @@ -121,9 +121,9 @@ func (*vaultHook) Name() string { return "vault" } -func (h *vaultHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error { - // If we have already run prerun before exit early. 
We do not use the - // PrerunDone value because we want to recover the token on restoration. +func (h *vaultHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + // If we have already run prestart before exit early. We do not use the + // PrestartDone value because we want to recover the token on restoration. first := h.firstRun h.firstRun = false if !first { @@ -160,7 +160,7 @@ func (h *vaultHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunReques return nil } -func (h *vaultHook) Poststop(ctx context.Context, req *interfaces.TaskPoststopRequest, resp *interfaces.TaskPoststopResponse) error { +func (h *vaultHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { // Shutdown any created manager h.cancel() return nil