Merge pull request #1796 from hashicorp/f-task-runner
Task runner integrates with TaskTemplateManager
api/tasks.go | 51
@@ -167,13 +167,13 @@ type TaskArtifact struct {
 }
 
 type Template struct {
-	SourcePath    string
-	DestPath      string
-	EmbededTmpl   string
-	ChangeMode    string
-	RestartSignal string
-	Splay         time.Duration
-	Once          bool
+	SourcePath   string
+	DestPath     string
+	EmbeddedTmpl string
+	ChangeMode   string
+	ChangeSignal string
+	Splay        time.Duration
+	Once         bool
 }
 
 type Vault struct {
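For orientation, a minimal sketch of populating the renamed fields from job-submission code; the program and all values are hypothetical, only the field names come from the struct above:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Hypothetical values; the field names match the new api.Template above.
	tmpl := &api.Template{
		SourcePath:   "local/app.ctmpl",
		DestPath:     "local/app.conf",
		ChangeMode:   "signal",
		ChangeSignal: "SIGHUP", // replaces the old RestartSignal field
		Splay:        5 * time.Second,
	}
	fmt.Printf("%+v\n", tmpl)
}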
@@ -248,25 +248,30 @@ const (
 	TaskDiskExceeded       = "Disk Exceeded"
 	TaskVaultRenewalFailed = "Vault token renewal failed"
 	TaskSiblingFailed      = "Sibling task failed"
+	TaskSignaling          = "Signaling"
+	TaskRestartSignal      = "Restart Signaled"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
 // appropriate to the events type.
 type TaskEvent struct {
 	Type            string
 	Time            int64
 	RestartReason   string
 	DriverError     string
 	ExitCode        int
 	Signal          int
 	Message         string
+	KillReason      string
 	KillTimeout     time.Duration
 	KillError       string
 	StartDelay      int64
 	DownloadError   string
 	ValidationError string
 	DiskLimit       int64
 	DiskSize        int64
 	FailedSibling   string
 	VaultError      string
+	TaskSignalReason string
+	TaskSignal       string
 }
@@ -26,17 +26,17 @@ var (
 // TaskHooks is an interface which provides hooks into the tasks life-cycle
 type TaskHooks interface {
 	// Restart is used to restart the task
-	Restart()
+	Restart(source, reason string)
 
 	// Signal is used to signal the task
-	Signal(os.Signal)
+	Signal(source, reason string, s os.Signal)
 
 	// UnblockStart is used to unblock the starting of the task. This should be
 	// called after prestart work is completed
-	UnblockStart()
+	UnblockStart(source string)
 
 	// Kill is used to kill the task because of the passed error.
-	Kill(error)
+	Kill(source, reason string)
 }
 
 // TaskTemplateManager is used to run a set of templates for a given task
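To make the new hook signatures concrete, here is a hypothetical do-nothing implementation (not part of the commit; the commit's own example is the MockTaskHooks further down):

// noopHooks is a hypothetical TaskHooks implementation illustrating the
// new source/reason-carrying signatures.
type noopHooks struct{}

func (noopHooks) Restart(source, reason string)             {}
func (noopHooks) Signal(source, reason string, s os.Signal) {}
func (noopHooks) UnblockStart(source string)                {}
func (noopHooks) Kill(source, reason string)                {}

// Compile-time check that noopHooks satisfies TaskHooks.
var _ TaskHooks = noopHooks{}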
@@ -141,6 +141,11 @@ func (tm *TaskTemplateManager) Stop() {
 // run is the long lived loop that handles errors and templates being rendered
 func (tm *TaskTemplateManager) run() {
 	if tm.runner == nil {
+		// Unblock the start if there is nothing to do
+		if !tm.allRendered {
+			tm.hook.UnblockStart("consul-template")
+		}
+
 		return
 	}
@@ -164,7 +169,7 @@ func (tm *TaskTemplateManager) run() {
 				continue
 			}
 
-			tm.hook.Kill(err)
+			tm.hook.Kill("consul-template", err.Error())
 		case <-tm.runner.TemplateRenderedCh():
 			// A template has been rendered, figure out what to do
 			events := tm.runner.RenderEvents()
@@ -186,7 +191,7 @@ func (tm *TaskTemplateManager) run() {
 			}
 
 			allRenderedTime = time.Now()
-			tm.hook.UnblockStart()
+			tm.hook.UnblockStart("consul-template")
 		}
 
 		// If all our templates are change mode no-op, then we can exit here
@@ -207,7 +212,7 @@ func (tm *TaskTemplateManager) run() {
 				continue
 			}
 
-			tm.hook.Kill(err)
+			tm.hook.Kill("consul-template", err.Error())
 		case <-tm.runner.TemplateRenderedCh():
 			// A template has been rendered, figure out what to do
 			var handling []string
@@ -232,7 +237,7 @@ func (tm *TaskTemplateManager) run() {
 				// Lookup the template and determine what to do
 				tmpls, ok := tm.lookup[id]
 				if !ok {
-					tm.hook.Kill(fmt.Errorf("consul-template runner returned unknown template id %q", id))
+					tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id))
 					return
 				}
@@ -270,10 +275,10 @@ func (tm *TaskTemplateManager) run() {
 			}
 
 			if restart {
-				tm.hook.Restart()
+				tm.hook.Restart("consul-template", "template with change_mode restart re-rendered")
 			} else if len(signals) != 0 {
 				for signal := range signals {
-					tm.hook.Signal(tm.signals[signal])
+					tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal])
 				}
 			}
 		}
@@ -29,7 +29,7 @@ type MockTaskHooks struct {
 	UnblockCh chan struct{}
 	Unblocked bool
 
-	KillError  error
+	KillReason string
 }
 
 func NewMockTaskHooks() *MockTaskHooks {
@@ -39,7 +39,7 @@ func NewMockTaskHooks() *MockTaskHooks {
 		SignalCh:  make(chan struct{}, 1),
 	}
 }
 
-func (m *MockTaskHooks) Restart() {
+func (m *MockTaskHooks) Restart(source, reason string) {
 	m.Restarts++
 	select {
 	case m.RestartCh <- struct{}{}:
@@ -47,7 +47,7 @@ func (m *MockTaskHooks) Restart() {
 	}
 }
 
-func (m *MockTaskHooks) Signal(s os.Signal) {
+func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) {
 	m.Signals = append(m.Signals, s)
 	select {
 	case m.SignalCh <- struct{}{}:
@@ -55,8 +55,8 @@ func (m *MockTaskHooks) Signal(s os.Signal) {
 	}
 }
 
-func (m *MockTaskHooks) Kill(e error) { m.KillError = e }
-func (m *MockTaskHooks) UnblockStart() {
+func (m *MockTaskHooks) Kill(source, reason string) { m.KillReason = reason }
+func (m *MockTaskHooks) UnblockStart(source string) {
 	if !m.Unblocked {
 		close(m.UnblockCh)
 	}
@@ -34,15 +34,16 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTr
 }
 
 type RestartTracker struct {
-	waitRes   *cstructs.WaitResult
-	startErr  error
-	count     int       // Current number of attempts.
-	onSuccess bool      // Whether to restart on successful exit code.
-	startTime time.Time // When the interval began
-	reason    string    // The reason for the last state
-	policy    *structs.RestartPolicy
-	rand      *rand.Rand
-	lock      sync.Mutex
+	waitRes          *cstructs.WaitResult
+	startErr         error
+	restartTriggered bool      // Whether the task has been signalled to be restarted
+	count            int       // Current number of attempts.
+	onSuccess        bool      // Whether to restart on successful exit code.
+	startTime        time.Time // When the interval began
+	reason           string    // The reason for the last state
+	policy           *structs.RestartPolicy
+	rand             *rand.Rand
+	lock             sync.Mutex
 }
 
 // SetPolicy updates the policy used to determine restarts.
@@ -69,6 +70,15 @@ func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker
 	return r
 }
 
+// SetRestartTriggered is used to mark that the task has been signalled to be
+// restarted
+func (r *RestartTracker) SetRestartTriggered() *RestartTracker {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.restartTriggered = true
+	return r
+}
+
 // GetReason returns a human-readable description for the last state returned by
 // GetState.
 func (r *RestartTracker) GetReason() string {
@@ -111,13 +121,25 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
 		r.startTime = now
 	}
 
+	var state string
+	var dur time.Duration
 	if r.startErr != nil {
-		return r.handleStartError()
+		state, dur = r.handleStartError()
 	} else if r.waitRes != nil {
-		return r.handleWaitResult()
+		state, dur = r.handleWaitResult()
+	} else if r.restartTriggered {
+		state, dur = structs.TaskRestarting, 0
+		r.reason = ""
 	} else {
-		return "", 0
+		state, dur = "", 0
 	}
 
+	// Clear out the existing state
+	r.startErr = nil
+	r.waitRes = nil
+	r.restartTriggered = false
+
+	return state, dur
 }
 
 // handleStartError returns the new state and potential wait duration for
@@ -94,6 +94,16 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
 	}
 }
 
+func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
+	t.Parallel()
+	p := testPolicy(true, structs.RestartPolicyModeFail)
+	p.Attempts = 0
+	rt := newRestartTracker(p, structs.JobTypeService)
+	if state, when := rt.SetRestartTriggered().GetState(); state != structs.TaskRestarting && when != 0 {
+		t.Fatalf("expect restart immediately, got %v %v", state, when)
+	}
+}
+
 func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
 	t.Parallel()
 	p := testPolicy(true, structs.RestartPolicyModeFail)
@@ -69,6 +69,28 @@ type TaskRunner struct {
 	vaultToken     string
 	vaultRenewalCh <-chan error
 
+	// templateManager is used to manage any consul-templates this task may have
+	templateManager *TaskTemplateManager
+
+	// templatesRendered mark whether the templates have been rendered
+	templatesRendered bool
+
+	// unblockCh is used to unblock the starting of the task
+	unblockCh   chan struct{}
+	unblocked   bool
+	unblockLock sync.Mutex
+
+	// restartCh is used to restart a task
+	restartCh chan *structs.TaskEvent
+
+	// signalCh is used to send a signal to a task
+	signalCh chan SignalEvent
+
+	// killCh is used to kill a task
+	killCh   chan *structs.TaskEvent
+	killed   bool
+	killLock sync.Mutex
+
 	destroy     bool
 	destroyCh   chan struct{}
 	destroyLock sync.Mutex
@@ -85,11 +107,18 @@ type taskRunnerState struct {
 	Task               *structs.Task
 	HandleID           string
 	ArtifactDownloaded bool
+	TemplatesRendered  bool
 }
 
 // TaskStateUpdater is used to signal that tasks state has changed.
 type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
 
+// SignalEvent is a tuple of the signal and the event generating it
+type SignalEvent struct {
+	s os.Signal
+	e *structs.TaskEvent
+}
+
 // NewTaskRunner is used to create a new task context
 func NewTaskRunner(logger *log.Logger, config *config.Config,
 	updater TaskStateUpdater, ctx *driver.ExecContext,
@@ -117,6 +146,10 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		updateCh:  make(chan *structs.Allocation, 64),
 		destroyCh: make(chan struct{}),
 		waitCh:    make(chan struct{}),
+		unblockCh: make(chan struct{}),
+		restartCh: make(chan *structs.TaskEvent),
+		signalCh:  make(chan SignalEvent),
+		killCh:    make(chan *structs.TaskEvent),
 	}
 
 	return tc
@@ -167,6 +200,7 @@ func (r *TaskRunner) RestoreState() error {
 		r.task = snap.Task
 	}
 	r.artifactsDownloaded = snap.ArtifactDownloaded
+	r.templatesRendered = snap.TemplatesRendered
 
 	if err := r.setTaskEnv(); err != nil {
 		return fmt.Errorf("client: failed to create task environment for task %q in allocation %q: %v",
@@ -208,6 +242,7 @@ func (r *TaskRunner) SaveState() error {
 		Task:               r.task,
 		Version:            r.config.Version,
 		ArtifactDownloaded: r.artifactsDownloaded,
+		TemplatesRendered:  r.templatesRendered,
 	}
 	r.handleLock.Lock()
 	if r.handle != nil {
@@ -315,24 +350,23 @@ func (r *TaskRunner) validateTask() error {
 	return mErr.ErrorOrNil()
 }
 
-func (r *TaskRunner) run() {
-	// Predeclare things so we can jump to the RESTART
-	var handleEmpty bool
-	var stopCollection chan struct{}
+// prestart handles life-cycle tasks that occur before the task has started.
+func (r *TaskRunner) prestart(taskDir string) (success bool) {
+	// Build the template manager
+	var err error
+	r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, r.templatesRendered,
+		r.config, r.vaultToken, taskDir, r.taskEnv)
+	if err != nil {
+		err := fmt.Errorf("failed to build task's template manager: %v", err)
+		r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
+		r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
+		return
+	}
 
 	for {
 		// Download the task's artifacts
 		if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 {
 			r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
-			taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
-			if !ok {
-				err := fmt.Errorf("task directory couldn't be found")
-				r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
-				r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
-				r.restartTracker.SetStartError(err)
-				goto RESTART
-			}
-
 			for _, artifact := range r.task.Artifacts {
 				if err := getter.GetArtifact(r.taskEnv, artifact, taskDir); err != nil {
 					r.setState(structs.TaskStatePending,
@@ -345,6 +379,67 @@ func (r *TaskRunner) run() {
 			r.artifactsDownloaded = true
 		}
 
+		// We don't have to wait
+		if r.templatesRendered {
+			return true
+		}
+
+		// Block for consul-template
+		select {
+		case <-r.unblockCh:
+			r.templatesRendered = true
+			return true
+		case event := <-r.killCh:
+			r.setState(structs.TaskStateDead, event)
+			r.logger.Printf("[ERR] client: task killed: %v", event)
+			return false
+		case update := <-r.updateCh:
+			if err := r.handleUpdate(update); err != nil {
+				r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
+			}
+		case err := <-r.vaultRenewalCh:
+			if err == nil {
+				continue // Only handle once.
+			}
+
+			// This is a fatal error as the task is not valid if it requested a
+			// Vault token and the token has now expired.
+			r.logger.Printf("[WARN] client: vault token for task %q not renewed: %v", r.task.Name, err)
+			r.Destroy(structs.NewTaskEvent(structs.TaskVaultRenewalFailed).SetVaultRenewalError(err))
+
+		case <-r.destroyCh:
+			r.setState(structs.TaskStateDead, r.destroyEvent)
+			return false
+		}
+
+	RESTART:
+		restart := r.shouldRestart()
+		if !restart {
+			return false
+		}
+	}
+}
+
+func (r *TaskRunner) run() {
+	// Predeclare things so we can jump to the RESTART
+	var handleEmpty bool
+	var stopCollection chan struct{}
+
+	// Get the task directory
+	taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
+	if !ok {
+		err := fmt.Errorf("task directory couldn't be found")
+		r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
+		r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
+		return
+	}
+
+	// Do all prestart events first
+	if success := r.prestart(taskDir); !success {
+		return
+	}
+
 	for {
 		// Start the task if not yet started or it is being forced. This logic
 		// is necessary because in the case of a restore the handle already
 		// exists.
@@ -413,75 +508,41 @@ func (r *TaskRunner) run() {
 			r.logger.Printf("[WARN] client: vault token for task %q not renewed: %v", r.task.Name, err)
 			r.Destroy(structs.NewTaskEvent(structs.TaskVaultRenewalFailed).SetVaultRenewalError(err))
 
+		case se := <-r.signalCh:
+			r.logger.Printf("[DEBUG] client: task being signalled with %v: %s", se.s, se.e.TaskSignalReason)
+			r.setState(structs.TaskStateRunning, se.e)
+
+			// TODO need an interface on the driver
+
+		case event := <-r.restartCh:
+			r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason)
+			r.setState(structs.TaskStateRunning, event)
+			r.killTask(event.RestartReason, stopCollection)
+
+			// Since the restart isn't from a failure, restart immediately
+			// and don't count against the restart policy
+			r.restartTracker.SetRestartTriggered()
+			break WAIT
+
+		case event := <-r.killCh:
+			r.logger.Printf("[ERR] client: task being killed: %s", event.KillReason)
+			r.killTask(event.KillReason, stopCollection)
+			return
+
 		case <-r.destroyCh:
-			// Mark that we received the kill event
-			timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
-			r.setState(structs.TaskStateRunning,
-				structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout))
-
-			// Kill the task using an exponential backoff in-case of failures.
-			destroySuccess, err := r.handleDestroy()
-			if !destroySuccess {
-				// We couldn't successfully destroy the resource created.
-				r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
-			}
-
-			// Stop collection of the task's resource usage
-			close(stopCollection)
-
-			// Store that the task has been destroyed and any associated error.
-			r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
-
-			r.runningLock.Lock()
-			r.running = false
-			r.runningLock.Unlock()
-
+			// Store the task event that provides context on the task destroy.
+			if r.destroyEvent.Type != structs.TaskKilled {
+				r.setState(structs.TaskStateRunning, r.destroyEvent)
+			}
+
+			r.killTask("", stopCollection)
 			return
 		}
 	}
 
 RESTART:
-	state, when := r.restartTracker.GetState()
-	r.restartTracker.SetStartError(nil).SetWaitResult(nil)
-	reason := r.restartTracker.GetReason()
-	switch state {
-	case structs.TaskNotRestarting, structs.TaskTerminated:
-		r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
-		if state == structs.TaskNotRestarting {
-			r.setState(structs.TaskStateDead,
-				structs.NewTaskEvent(structs.TaskNotRestarting).
-					SetRestartReason(reason))
-		}
-		return
-	case structs.TaskRestarting:
-		r.logger.Printf("[INFO] client: Restarting task %q for alloc %q in %v", r.task.Name, r.alloc.ID, when)
-		r.setState(structs.TaskStatePending,
-			structs.NewTaskEvent(structs.TaskRestarting).
-				SetRestartDelay(when).
-				SetRestartReason(reason))
-	default:
-		r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state)
-		return
-	}
-
-	// Sleep but watch for destroy events.
-	select {
-	case <-time.After(when):
-	case <-r.destroyCh:
-	}
-
-	// Destroyed while we were waiting to restart, so abort.
-	r.destroyLock.Lock()
-	destroyed := r.destroy
-	r.destroyLock.Unlock()
-	if destroyed {
-		r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed due to: %s", r.task.Name, r.destroyEvent.Message)
-		r.setState(structs.TaskStateDead, r.destroyEvent)
+	restart := r.shouldRestart()
+	if !restart {
 		return
 	}
@@ -493,6 +554,85 @@ func (r *TaskRunner) run() {
 	}
 }
 
+// shouldRestart returns if the task should restart. If the return value is
+// true, the task's restart policy has already been considered and any wait time
+// between restarts has been applied.
+func (r *TaskRunner) shouldRestart() bool {
+	state, when := r.restartTracker.GetState()
+	reason := r.restartTracker.GetReason()
+	switch state {
+	case structs.TaskNotRestarting, structs.TaskTerminated:
+		r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
+		if state == structs.TaskNotRestarting {
+			r.setState(structs.TaskStateDead,
+				structs.NewTaskEvent(structs.TaskNotRestarting).
+					SetRestartReason(reason))
+		}
+		return false
+	case structs.TaskRestarting:
+		r.logger.Printf("[INFO] client: Restarting task %q for alloc %q in %v", r.task.Name, r.alloc.ID, when)
+		r.setState(structs.TaskStatePending,
+			structs.NewTaskEvent(structs.TaskRestarting).
+				SetRestartDelay(when).
+				SetRestartReason(reason))
+	default:
+		r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state)
+		return false
+	}
+
+	// Sleep but watch for destroy events.
+	select {
+	case <-time.After(when):
+	case <-r.destroyCh:
+	}
+
+	// Destroyed while we were waiting to restart, so abort.
+	r.destroyLock.Lock()
+	destroyed := r.destroy
+	r.destroyLock.Unlock()
+	if destroyed {
+		r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed due to: %s", r.task.Name, r.destroyEvent.Message)
+		r.setState(structs.TaskStateDead, r.destroyEvent)
+		return false
+	}
+
+	return true
+}
+
+// killTask kills the running task, storing the reason in the Killing TaskEvent.
+// The associated stats collection channel is also closed once the task is
+// successfully killed.
+func (r *TaskRunner) killTask(reason string, statsCh chan struct{}) {
+	r.runningLock.Lock()
+	running := r.running
+	r.runningLock.Unlock()
+	if !running {
+		return
+	}
+
+	// Mark that we received the kill event
+	timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
+	r.setState(structs.TaskStateRunning,
+		structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout).SetKillReason(reason))
+
+	// Kill the task using an exponential backoff in-case of failures.
+	destroySuccess, err := r.handleDestroy()
+	if !destroySuccess {
+		// We couldn't successfully destroy the resource created.
+		r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
+	}
+
+	r.runningLock.Lock()
+	r.running = false
+	r.runningLock.Unlock()
+
+	// Stop collection of the task's resource usage
+	close(statsCh)
+
+	// Store that the task has been destroyed and any associated error.
+	r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
+}
+
 // startTask creates the driver and starts the task.
 func (r *TaskRunner) startTask() error {
 	// Create a driver
@@ -637,6 +777,88 @@ func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
 	return
 }
 
+// Restart will restart the task
+func (r *TaskRunner) Restart(source, reason string) {
+	reasonStr := fmt.Sprintf("%s: %s", source, reason)
+	event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reasonStr)
+
+	r.logger.Printf("[DEBUG] client: restarting task %v for alloc %q: %v",
+		r.task.Name, r.alloc.ID, reasonStr)
+
+	r.runningLock.Lock()
+	running := r.running
+	r.runningLock.Unlock()
+
+	// Drop the restart event
+	if !running {
+		r.logger.Printf("[DEBUG] client: skipping restart since task isn't running")
+		return
+	}
+
+	select {
+	case r.restartCh <- event:
+	case <-r.waitCh:
+	}
+}
+
+// Signal will send a signal to the task
+func (r *TaskRunner) Signal(source, reason string, s os.Signal) {
+	reasonStr := fmt.Sprintf("%s: %s", source, reason)
+	event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetTaskSignalReason(reasonStr)
+
+	r.logger.Printf("[DEBUG] client: sending signal %v to task %v for alloc %q", s, r.task.Name, r.alloc.ID)
+
+	r.runningLock.Lock()
+	running := r.running
+	r.runningLock.Unlock()
+
+	// Drop the signal event
+	if !running {
+		r.logger.Printf("[DEBUG] client: skipping signal since task isn't running")
+		return
+	}
+
+	select {
+	case r.signalCh <- SignalEvent{s: s, e: event}:
+	case <-r.waitCh:
+	}
+}
+
+// Kill will kill a task and store the error, no longer restarting the task
+func (r *TaskRunner) Kill(source, reason string) {
+	r.killLock.Lock()
+	defer r.killLock.Unlock()
+	if r.killed {
+		return
+	}
+
+	reasonStr := fmt.Sprintf("%s: %s", source, reason)
+	event := structs.NewTaskEvent(structs.TaskKilling).SetKillReason(reasonStr)
+
+	r.logger.Printf("[DEBUG] client: killing task %v for alloc %q: %v", r.task.Name, r.alloc.ID, reasonStr)
+
+	select {
+	case r.killCh <- event:
+		close(r.killCh)
+	case <-r.waitCh:
+	}
+}
+
+// UnblockStart unblocks the starting of the task. It currently assumes only
+// consul-template will unblock
+func (r *TaskRunner) UnblockStart(source string) {
+	r.unblockLock.Lock()
+	defer r.unblockLock.Unlock()
+	if r.unblocked {
+		return
+	}
+
+	r.logger.Printf("[DEBUG] client: unblocking task %v for alloc %q: %v", r.task.Name, r.alloc.ID, source)
+	close(r.unblockCh)
+}
+
 // Helper function for converting a WaitResult into a TaskTerminated event.
 func (r *TaskRunner) waitErrorToEvent(res *dstructs.WaitResult) *structs.TaskEvent {
 	return structs.NewTaskEvent(structs.TaskTerminated).
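These four exported methods mirror the TaskHooks interface from the template manager, which is presumably why prestart can pass the runner itself to NewTaskTemplateManager; expressed as a hypothetical compile-time assertion:

// Assumption inferred from the diff rather than stated in it:
// *TaskRunner satisfies TaskHooks via Restart, Signal, Kill and UnblockStart.
var _ TaskHooks = (*TaskRunner)(nil)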
@@ -471,3 +471,127 @@ func TestTaskRunner_VaultTokenRenewal(t *testing.T) {
 		t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
 	}
 }
 
+func TestTaskRunner_RestartTask(t *testing.T) {
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"exit_code": "0",
+		"run_for":   "10s",
+	}
+
+	upd, tr := testTaskRunnerFromAlloc(true, alloc)
+	tr.MarkReceived()
+	go tr.Run()
+	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
+	defer tr.ctx.AllocDir.Destroy()
+
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		tr.Restart("test", "restart")
+		time.Sleep(100 * time.Millisecond)
+		tr.Kill("test", "restart")
+	}()
+
+	select {
+	case <-tr.WaitCh():
+	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
+		t.Fatalf("timeout")
+	}
+
+	if len(upd.events) != 9 {
+		t.Fatalf("should have 9 updates: %#v", upd.events)
+	}
+
+	if upd.state != structs.TaskStateDead {
+		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
+	}
+
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
+	}
+
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
+	}
+
+	if upd.events[2].Type != structs.TaskRestartSignal {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskRestartSignal)
+	}
+
+	if upd.events[3].Type != structs.TaskKilling {
+		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilling)
+	}
+
+	if upd.events[4].Type != structs.TaskKilled {
+		t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
+	}
+
+	t.Logf("%+v", upd.events[5])
+	if upd.events[5].Type != structs.TaskRestarting {
+		t.Fatalf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskRestarting)
+	}
+
+	if upd.events[6].Type != structs.TaskStarted {
+		t.Fatalf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskStarted)
+	}
+
+	if upd.events[7].Type != structs.TaskKilling {
+		t.Fatalf("Eighth Event was %v; want %v", upd.events[7].Type, structs.TaskKilling)
+	}
+
+	if upd.events[8].Type != structs.TaskKilled {
+		t.Fatalf("Ninth Event was %v; want %v", upd.events[8].Type, structs.TaskKilled)
+	}
+}
+
+func TestTaskRunner_KillTask(t *testing.T) {
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"exit_code": "0",
+		"run_for":   "10s",
+	}
+
+	upd, tr := testTaskRunnerFromAlloc(false, alloc)
+	tr.MarkReceived()
+	go tr.Run()
+	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
+	defer tr.ctx.AllocDir.Destroy()
+
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		tr.Kill("test", "kill")
+	}()
+
+	select {
+	case <-tr.WaitCh():
+	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
+		t.Fatalf("timeout")
+	}
+
+	if len(upd.events) != 4 {
+		t.Fatalf("should have 4 updates: %#v", upd.events)
+	}
+
+	if upd.state != structs.TaskStateDead {
+		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
+	}
+
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
+	}
+
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
+	}
+
+	if upd.events[2].Type != structs.TaskKilling {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilling)
+	}
+
+	if upd.events[3].Type != structs.TaskKilled {
+		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilled)
+	}
+}
@@ -353,6 +353,25 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
 		} else {
 			desc = "Task's sibling failed"
 		}
+	case api.TaskSignaling:
+		sig := event.TaskSignal
+		reason := event.TaskSignalReason
+
+		if sig == "" && reason == "" {
+			desc = "Task being sent a signal"
+		} else if sig == "" {
+			desc = reason
+		} else if reason == "" {
+			desc = fmt.Sprintf("Task being sent signal %v", sig)
+		} else {
+			desc = fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
+		}
+	case api.TaskRestartSignal:
+		if event.RestartReason != "" {
+			desc = event.RestartReason
+		} else {
+			desc = "Task signaled to restart"
+		}
 	}
 
 	// Reverse order so we are sorted by time
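A worked example of the branch logic above, with hypothetical event values:

// With both fields set (values are illustrative only):
sig, reason := "SIGHUP", "consul-template: template re-rendered"
desc := fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
// desc == "Task being sent signal SIGHUP: consul-template: template re-rendered".
// If only one of the two is set, the shorter branches above apply.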
@@ -3287,6 +3287,12 @@ func TestTaskDiff(t *testing.T) {
 					Old:  "",
 					New:  "bam3",
 				},
+				{
+					Type: DiffTypeAdded,
+					Name: "ChangeSignal",
+					Old:  "",
+					New:  "SIGHUP3",
+				},
 				{
 					Type: DiffTypeAdded,
 					Name: "DestPath",
@@ -3295,22 +3301,10 @@ func TestTaskDiff(t *testing.T) {
 				},
 				{
 					Type: DiffTypeAdded,
-					Name: "EmbededTmpl",
+					Name: "EmbeddedTmpl",
 					Old:  "",
 					New:  "baz3",
 				},
 				{
 					Type: DiffTypeAdded,
 					Name: "Once",
 					Old:  "",
 					New:  "true",
 				},
-				{
-					Type: DiffTypeAdded,
-					Name: "RestartSignal",
-					Old:  "",
-					New:  "SIGHUP3",
-				},
 				{
 					Type: DiffTypeAdded,
 					Name: "SourcePath",
@@ -3335,6 +3329,12 @@ func TestTaskDiff(t *testing.T) {
 					Old:  "bam2",
 					New:  "",
 				},
+				{
+					Type: DiffTypeDeleted,
+					Name: "ChangeSignal",
+					Old:  "SIGHUP2",
+					New:  "",
+				},
 				{
 					Type: DiffTypeDeleted,
 					Name: "DestPath",
@@ -3343,22 +3343,10 @@ func TestTaskDiff(t *testing.T) {
 				},
 				{
 					Type: DiffTypeDeleted,
-					Name: "EmbededTmpl",
+					Name: "EmbeddedTmpl",
 					Old:  "baz2",
 					New:  "",
 				},
 				{
 					Type: DiffTypeDeleted,
 					Name: "Once",
 					Old:  "false",
 					New:  "",
 				},
-				{
-					Type: DiffTypeDeleted,
-					Name: "RestartSignal",
-					Old:  "SIGHUP2",
-					New:  "",
-				},
 				{
 					Type: DiffTypeDeleted,
 					Name: "SourcePath",
@@ -10,6 +10,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"os"
 	"path/filepath"
 	"reflect"
 	"regexp"
@@ -2223,7 +2224,7 @@ type Template struct {
 	DestPath string `mapstructure:"destination"`
 
 	// EmbeddedTmpl store the raw template. This is useful for smaller templates
-	// where they are embeded in the job file rather than sent as an artificat
+	// where they are embedded in the job file rather than sent as an artificat
 	EmbeddedTmpl string `mapstructure:"data"`
 
 	// ChangeMode indicates what should be done if the template is re-rendered
@@ -2261,7 +2262,7 @@ func (t *Template) Validate() error {
 
 	// Verify we have something to render
 	if t.SourcePath == "" && t.EmbeddedTmpl == "" {
-		multierror.Append(&mErr, fmt.Errorf("Must specify a source path or have an embeded template"))
+		multierror.Append(&mErr, fmt.Errorf("Must specify a source path or have an embedded template"))
 	}
 
 	// Verify we can render somewhere
@@ -2393,6 +2394,13 @@ const (
 	// restarted because it has exceeded its restart policy.
 	TaskNotRestarting = "Not Restarting"
 
+	// TaskRestartSignal indicates that the task has been signalled to be
+	// restarted
+	TaskRestartSignal = "Restart Signaled"
+
+	// TaskSignaling indicates that the task is being signalled.
+	TaskSignaling = "Signaling"
+
 	// TaskDownloadingArtifacts means the task is downloading the artifacts
 	// specified in the task.
 	TaskDownloadingArtifacts = "Downloading Artifacts"
@@ -2436,6 +2444,9 @@ type TaskEvent struct {
 	// Task Killed Fields.
 	KillError string // Error killing the task.
 
+	// KillReason is the reason the task was killed
+	KillReason string
+
 	// TaskRestarting fields.
 	StartDelay int64 // The sleep period before restarting the task in unix nanoseconds.
@@ -2457,6 +2468,12 @@ type TaskEvent struct {
 	// VaultError is the error from token renewal
 	VaultError string
 
+	// TaskSignalReason indicates the reason the task is being signalled.
+	TaskSignalReason string
+
+	// TaskSignal is the signal that was sent to the task
+	TaskSignal string
 }
 
 func (te *TaskEvent) GoString() string {
@@ -2510,6 +2527,11 @@ func (e *TaskEvent) SetKillError(err error) *TaskEvent {
 	return e
 }
 
+func (e *TaskEvent) SetKillReason(r string) *TaskEvent {
+	e.KillReason = r
+	return e
+}
+
 func (e *TaskEvent) SetRestartDelay(delay time.Duration) *TaskEvent {
 	e.StartDelay = int64(delay)
 	return e
@@ -2520,6 +2542,16 @@ func (e *TaskEvent) SetRestartReason(reason string) *TaskEvent {
 	return e
 }
 
+func (e *TaskEvent) SetTaskSignalReason(r string) *TaskEvent {
+	e.TaskSignalReason = r
+	return e
+}
+
+func (e *TaskEvent) SetTaskSignal(s os.Signal) *TaskEvent {
+	e.TaskSignal = s.String()
+	return e
+}
+
 func (e *TaskEvent) SetDownloadError(err error) *TaskEvent {
 	if err != nil {
 		e.DownloadError = err.Error()
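The new setters chain like the existing ones; a sketch of building a signaling event the way TaskRunner.Signal does earlier in this diff (syscall.SIGHUP stands in for any os.Signal):

// Hypothetical usage of the new setters; the reason string is illustrative.
event := structs.NewTaskEvent(structs.TaskSignaling).
	SetTaskSignal(syscall.SIGHUP).
	SetTaskSignalReason("consul-template: template re-rendered")
_ = event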