Hook renames

This commit is contained in:
Alex Dadgar
2018-07-16 17:19:56 -07:00
committed by Michael Schurter
parent 05a0565040
commit 912d3613ec
10 changed files with 193 additions and 166 deletions

View File

@@ -94,8 +94,8 @@ func (ar *allocRunner) Run() {
var err error
var taskWaitCh <-chan struct{}
// Run the prerun hooks
// XXX Equivalent to TR.Prerun hook
// Run the prestart hooks
// XXX Equivalent to TR.Prestart hook
if err := ar.prerun(); err != nil {
ar.logger.Error("prerun failed", "error", err)
goto POST

View File

@@ -40,7 +40,7 @@ func (ar *allocRunner) prerun() error {
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running pre-run hook", "name", name, "start", start)
ar.logger.Trace("running prestart hook", "name", name, "start", start)
}
if err := pre.Prerun(); err != nil {

View File

@@ -8,10 +8,13 @@ import (
)
/*
pre-run post-run pre-stop post-stop
prestart poststart exited stop
| | | |
| | | |
--------> run ------> exited ----------> not restart ---------> garbage collect
|
|
kill -> exited -> stop
*/
@@ -20,7 +23,7 @@ type TaskHook interface {
Name() string
}
type TaskPrerunRequest struct {
type TaskPrestartRequest struct {
// HookData is previously set data by the hook
HookData map[string]string
@@ -37,7 +40,7 @@ type TaskPrerunRequest struct {
TaskEnv *env.TaskEnv
}
type TaskPrerunResponse struct {
type TaskPrestartResponse struct {
// Env is the environment variables to set for the task
Env map[string]string
@@ -49,39 +52,35 @@ type TaskPrerunResponse struct {
Done bool
}
type TaskPrerunHook interface {
type TaskPrestartHook interface {
TaskHook
Prerun(context.Context, *TaskPrerunRequest, *TaskPrerunResponse) error
Prestart(context.Context, *TaskPrestartRequest, *TaskPrestartResponse) error
}
// XXX If we want consul style hooks, need to have something that runs after the
// tasks starts
type TaskPostrunRequest struct {
type TaskPoststartRequest struct {
// Network info
}
type TaskPostrunResponse struct{}
type TaskPoststartResponse struct{}
type TaskPostrunHook interface {
type TaskPoststartHook interface {
TaskHook
Postrun() error
//Postrun(context.Context, *TaskPostrunRequest, *TaskPostrunResponse) error
Poststart(context.Context, *TaskPoststartRequest, *TaskPoststartResponse) error
}
type TaskPoststopRequest struct{}
type TaskPoststopResponse struct{}
type TaskKillRequest struct{}
type TaskKillResponse struct{}
type TaskPoststopHook interface {
type TaskKillHook interface {
TaskHook
Postrun(context.Context, *TaskPostrunRequest, *TaskPostrunResponse) error
Kill(context.Context, *TaskKillRequest, *TaskKillResponse) error
}
type TaskDestroyRequest struct{}
type TaskDestroyResponse struct{}
type TaskExitedRequest struct{}
type TaskExitedResponse struct{}
type TaskDestroyHook interface {
type TaskExitedHook interface {
TaskHook
Destroy() error
//Destroy(context.Context, *TaskDestroyRequest, *TaskDestroyResponse) error
Exited(context.Context, *TaskExitedRequest, *TaskExitedResponse) error
}
type TaskUpdateRequest struct {
@@ -93,3 +92,11 @@ type TaskUpdateHook interface {
TaskHook
Update(context.Context, *TaskUpdateRequest, *TaskUpdateResponse) error
}
type TaskStopRequest struct{}
type TaskStopResponse struct{}
type TaskStopHook interface {
TaskHook
Stop(context.Context, *TaskStopRequest, *TaskStopResponse) error
}

View File

@@ -0,0 +1,48 @@
package taskrunner
import (
"context"
"fmt"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/getter"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
ti "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
)
// artifactHook downloads artifacts for a task.
type artifactHook struct {
eventEmitter ti.EventEmitter
logger log.Logger
}
func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook {
h := &artifactHook{
eventEmitter: e,
}
h.logger = logger.Named(h.Name())
return h
}
func (*artifactHook) Name() string {
return "artifacts"
}
func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
h.eventEmitter.SetState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
for _, artifact := range req.Task.Artifacts {
//XXX add ctx to GetArtifact to allow cancelling long downloads
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
h.logger.Debug(wrapped.Error())
h.eventEmitter.SetState(structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))
return wrapped
}
}
resp.Done = true
return nil
}

View File

@@ -38,9 +38,9 @@ func (s *LocalState) Copy() *LocalState {
}
type HookState struct {
// PrerunDone is true if the hook has run Prerun successfully and does
// Prestart is true if the hook has run Prestart successfully and does
// not need to run again
PrerunDone bool
PrestartDone bool
Data map[string]string
}
@@ -56,7 +56,7 @@ func (h *HookState) Equal(o *HookState) bool {
return h == o
}
if h.PrerunDone != o.PrerunDone {
if h.PrestartDone != o.PrestartDone {
return false
}

View File

@@ -0,0 +1,51 @@
package taskrunner
import (
"context"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
cconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
type taskDirHook struct {
runner *TaskRunner
logger log.Logger
}
func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook {
td := &taskDirHook{
runner: runner,
}
td.logger = logger.Named(td.Name())
return td
}
func (h *taskDirHook) Name() string {
return "task_dir"
}
func (h *taskDirHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
cc := h.runner.clientConfig
chroot := cconfig.DefaultChrootEnv
if len(cc.ChrootEnv) > 0 {
chroot = cc.ChrootEnv
}
// Emit the event that we are going to be building the task directory
h.runner.SetState("", structs.NewTaskEvent(structs.TaskSetup).SetMessage(structs.TaskBuildingTaskDir))
// Build the task directory structure
fsi := h.runner.driver.FSIsolation()
err := h.runner.taskDir.Build(false, chroot, fsi)
if err != nil {
return err
}
// Update the environment variables based on the built task directory
driver.SetEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.clientConfig)
resp.Done = true
return nil
}

View File

@@ -229,9 +229,9 @@ func (tr *TaskRunner) Run() {
MAIN:
for {
// Run the prerun hooks
if err := tr.prerun(); err != nil {
tr.logger.Error("prerun failed", "error", err)
// Run the prestart hooks
if err := tr.prestart(); err != nil {
tr.logger.Error("prestart failed", "error", err)
tr.restartTracker.SetStartError(err)
goto RESTART
}
@@ -243,9 +243,9 @@ MAIN:
goto RESTART
}
// Run the postrun hooks
if err := tr.postrun(); err != nil {
tr.logger.Error("postrun failed", "error", err)
// Run the poststart hooks
if err := tr.poststart(); err != nil {
tr.logger.Error("poststart failed", "error", err)
}
WAIT:
@@ -284,9 +284,9 @@ MAIN:
timer.Stop()
}
// Run the shutdown hooks
if err := tr.shutdown(); err != nil {
tr.logger.Error("postrun failed", "error", err)
// Run the stop hooks
if err := tr.stop(); err != nil {
tr.logger.Error("stop failed", "error", err)
}
tr.logger.Debug("task run loop exiting")

View File

@@ -1,18 +1,11 @@
package taskrunner
import (
"context"
"fmt"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/getter"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
ti "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
cconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -54,37 +47,37 @@ func (tr *TaskRunner) initHooks() {
}
}
// prerun is used to run the runners prerun hooks.
func (tr *TaskRunner) prerun() error {
// prestart is used to run the runners prestart hooks.
func (tr *TaskRunner) prestart() error {
//XXX is this necessary? maybe we should have a generic cancelletion
// method instead of peeking into the alloc
// Determine if the allocation is terminaland we should avoid running
// pre-run hooks.
// prestart hooks.
alloc := tr.Alloc()
if alloc.TerminalStatus() {
tr.logger.Trace("skipping pre-run hooks since allocation is terminal")
tr.logger.Trace("skipping prestart hooks since allocation is terminal")
return nil
}
if tr.logger.IsTrace() {
start := time.Now()
tr.logger.Trace("running pre-run hooks", "start", start)
tr.logger.Trace("running prestart hooks", "start", start)
defer func() {
end := time.Now()
tr.logger.Trace("finished pre-run hooks", "end", end, "duration", end.Sub(start))
tr.logger.Trace("finished prestart hooks", "end", end, "duration", end.Sub(start))
}()
}
for _, hook := range tr.runnerHooks {
pre, ok := hook.(interfaces.TaskPrerunHook)
pre, ok := hook.(interfaces.TaskPrestartHook)
if !ok {
tr.logger.Trace("skipping non-prerun hook", "name", hook.Name())
tr.logger.Trace("skipping non-prestart hook", "name", hook.Name())
continue
}
name := pre.Name()
// Build the request
req := interfaces.TaskPrerunRequest{
req := interfaces.TaskPrestartRequest{
Task: tr.Task(),
TaskDir: tr.taskDir.Dir,
TaskEnv: tr.envBuilder.Build(),
@@ -93,24 +86,24 @@ func (tr *TaskRunner) prerun() error {
tr.localStateLock.RLock()
origHookState := tr.localState.Hooks[name]
tr.localStateLock.RUnlock()
if origHookState != nil && origHookState.PrerunDone {
tr.logger.Trace("skipping done prerun hook", "name", pre.Name())
if origHookState != nil && origHookState.PrestartDone {
tr.logger.Trace("skipping done prestart hook", "name", pre.Name())
continue
}
req.VaultToken = tr.getVaultToken()
// Time the prerun hook
// Time the prestart hook
var start time.Time
if tr.logger.IsTrace() {
start = time.Now()
tr.logger.Trace("running pre-run hook", "name", name, "start", start)
tr.logger.Trace("running prestart hook", "name", name, "start", start)
}
// Run the pre-run hook
var resp interfaces.TaskPrerunResponse
if err := pre.Prerun(tr.ctx, &req, &resp); err != nil {
return structs.WrapRecoverable(fmt.Sprintf("pre-run hook %q failed: %v", name, err), err)
// Run the prestart hook
var resp interfaces.TaskPrestartResponse
if err := pre.Prestart(tr.ctx, &req, &resp); err != nil {
return structs.WrapRecoverable(fmt.Sprintf("prestart hook %q failed: %v", name, err), err)
}
// Store the hook state
@@ -124,7 +117,7 @@ func (tr *TaskRunner) prerun() error {
if resp.HookData != nil {
hookState.Data = resp.HookData
hookState.PrerunDone = resp.Done
hookState.PrestartDone = resp.Done
}
tr.localStateLock.Unlock()
@@ -143,26 +136,26 @@ func (tr *TaskRunner) prerun() error {
if tr.logger.IsTrace() {
end := time.Now()
tr.logger.Trace("finished pre-run hooks", "name", name, "end", end, "duration", end.Sub(start))
tr.logger.Trace("finished prestart hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return nil
}
// postrun is used to run the runners postrun hooks.
func (tr *TaskRunner) postrun() error {
// poststart is used to run the runners poststart hooks.
func (tr *TaskRunner) poststart() error {
if tr.logger.IsTrace() {
start := time.Now()
tr.logger.Trace("running post-run hooks", "start", start)
tr.logger.Trace("running poststart hooks", "start", start)
defer func() {
end := time.Now()
tr.logger.Trace("finished post-run hooks", "end", end, "duration", end.Sub(start))
tr.logger.Trace("finished poststart hooks", "end", end, "duration", end.Sub(start))
}()
}
for _, hook := range tr.runnerHooks {
post, ok := hook.(interfaces.TaskPostrunHook)
post, ok := hook.(interfaces.TaskPoststartHook)
if !ok {
continue
}
@@ -171,36 +164,38 @@ func (tr *TaskRunner) postrun() error {
var start time.Time
if tr.logger.IsTrace() {
start = time.Now()
tr.logger.Trace("running post-run hook", "name", name, "start", start)
tr.logger.Trace("running poststart hook", "name", name, "start", start)
}
req := interfaces.TaskPoststartRequest{}
var resp interfaces.TaskPoststartResponse
// XXX We shouldn't exit on the first one
if err := post.Postrun(); err != nil {
return fmt.Errorf("post-run hook %q failed: %v", name, err)
if err := post.Poststart(tr.ctx, &req, &resp); err != nil {
return fmt.Errorf("poststart hook %q failed: %v", name, err)
}
if tr.logger.IsTrace() {
end := time.Now()
tr.logger.Trace("finished post-run hooks", "name", name, "end", end, "duration", end.Sub(start))
tr.logger.Trace("finished poststart hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return nil
}
// shutdown is used to run the shutdown hooks.
func (tr *TaskRunner) shutdown() error {
// stop is used to run the stop hooks.
func (tr *TaskRunner) stop() error {
if tr.logger.IsTrace() {
start := time.Now()
tr.logger.Trace("running poststop hooks", "start", start)
tr.logger.Trace("running stop hooks", "start", start)
defer func() {
end := time.Now()
tr.logger.Trace("finished poststop hooks", "end", end, "duration", end.Sub(start))
tr.logger.Trace("finished stop hooks", "end", end, "duration", end.Sub(start))
}()
}
for _, hook := range tr.runnerHooks {
post, ok := hook.(interfaces.TaskDestroyHook)
post, ok := hook.(interfaces.TaskStopHook)
if !ok {
continue
}
@@ -209,17 +204,19 @@ func (tr *TaskRunner) shutdown() error {
var start time.Time
if tr.logger.IsTrace() {
start = time.Now()
tr.logger.Trace("running destroy hook", "name", name, "start", start)
tr.logger.Trace("running stop hook", "name", name, "start", start)
}
req := interfaces.TaskStopRequest{}
var resp interfaces.TaskStopResponse
// XXX We shouldn't exit on the first one
if err := post.Destroy(); err != nil {
return fmt.Errorf("destroy hook %q failed: %v", name, err)
if err := post.Stop(tr.ctx, &req, &resp); err != nil {
return fmt.Errorf("stop hook %q failed: %v", name, err)
}
if tr.logger.IsTrace() {
end := time.Now()
tr.logger.Trace("finished destroy hooks", "name", name, "end", end, "duration", end.Sub(start))
tr.logger.Trace("finished stop hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
@@ -251,7 +248,7 @@ func (tr *TaskRunner) updateHooks() {
VaultToken: tr.getVaultToken(),
}
// Time the prerun hook
// Time the update hook
var start time.Time
if tr.logger.IsTrace() {
start = time.Now()
@@ -271,82 +268,6 @@ func (tr *TaskRunner) updateHooks() {
}
}
type taskDirHook struct {
runner *TaskRunner
logger log.Logger
}
func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook {
td := &taskDirHook{
runner: runner,
}
td.logger = logger.Named(td.Name())
return td
}
func (h *taskDirHook) Name() string {
return "task_dir"
}
func (h *taskDirHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error {
cc := h.runner.clientConfig
chroot := cconfig.DefaultChrootEnv
if len(cc.ChrootEnv) > 0 {
chroot = cc.ChrootEnv
}
// Emit the event that we are going to be building the task directory
h.runner.SetState("", structs.NewTaskEvent(structs.TaskSetup).SetMessage(structs.TaskBuildingTaskDir))
// Build the task directory structure
fsi := h.runner.driver.FSIsolation()
err := h.runner.taskDir.Build(false, chroot, fsi)
if err != nil {
return err
}
// Update the environment variables based on the built task directory
driver.SetEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.clientConfig)
resp.Done = true
return nil
}
// artifactHook downloads artifacts for a task.
type artifactHook struct {
eventEmitter ti.EventEmitter
logger log.Logger
}
func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook {
h := &artifactHook{
eventEmitter: e,
}
h.logger = logger.Named(h.Name())
return h
}
func (*artifactHook) Name() string {
return "artifacts"
}
func (h *artifactHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error {
h.eventEmitter.SetState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
for _, artifact := range req.Task.Artifacts {
//XXX add ctx to GetArtifact to allow cancelling long downloads
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
h.logger.Debug(wrapped.Error())
h.eventEmitter.SetState(structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))
return wrapped
}
}
resp.Done = true
return nil
}
/*
TR Hooks:
@@ -377,17 +298,17 @@ Implement: Prestart and Update (for new Vault token) and Destroy
Consul Service Reg:
Require: Task, interpolation/ENV
Return: error
Implement: Postrun, Update, Prestop
Implement: Poststart, Update, Kill, Exited
> @alex
Dispatch Payload:
Require: Alloc
Return error
Implement: Prerun
Implement: Prestart
> @schmichael
Artifacts:
Require: Folder structure, task, interpolation/ENV
Return: error
Implement: Prerun and Destroy
Implement: Prestart
*/

View File

@@ -63,7 +63,7 @@ func (*templateHook) Name() string {
return "template"
}
func (h *templateHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error {
func (h *templateHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
h.managerLock.Lock()
defer h.managerLock.Unlock()
@@ -111,7 +111,7 @@ func (h *templateHook) newManager() (unblock chan struct{}, err error) {
return unblock, nil
}
func (h *templateHook) Poststop(ctx context.Context, req *interfaces.TaskPoststopRequest, resp *interfaces.TaskPoststopResponse) error {
func (h *templateHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
h.managerLock.Lock()
defer h.managerLock.Unlock()

View File

@@ -121,9 +121,9 @@ func (*vaultHook) Name() string {
return "vault"
}
func (h *vaultHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunRequest, resp *interfaces.TaskPrerunResponse) error {
// If we have already run prerun before exit early. We do not use the
// PrerunDone value because we want to recover the token on restoration.
func (h *vaultHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
// If we have already run prestart before exit early. We do not use the
// PrestartDone value because we want to recover the token on restoration.
first := h.firstRun
h.firstRun = false
if !first {
@@ -160,7 +160,7 @@ func (h *vaultHook) Prerun(ctx context.Context, req *interfaces.TaskPrerunReques
return nil
}
func (h *vaultHook) Poststop(ctx context.Context, req *interfaces.TaskPoststopRequest, resp *interfaces.TaskPoststopResponse) error {
func (h *vaultHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error {
// Shutdown any created manager
h.cancel()
return nil