mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
core: propagate remote task handles
Add a new driver capability: RemoteTasks. When a task is run by a driver with RemoteTasks set, its TaskHandle will be propagated to the server in its allocation's TaskState. If the task is replaced due to a down node or draining, its TaskHandle will be propagated to its replacement allocation. This allows tasks to be scheduled in remote systems whose lifecycles are disconnected from the Nomad node's lifecycle. See https://github.com/hashicorp/nomad-driver-ecs for an example ECS remote task driver.
This commit is contained in:
@@ -13,20 +13,22 @@ import (
|
||||
// NewDriverHandle returns a handle for task operations on a specific task
|
||||
func NewDriverHandle(driver drivers.DriverPlugin, taskID string, task *structs.Task, net *drivers.DriverNetwork) *DriverHandle {
|
||||
return &DriverHandle{
|
||||
driver: driver,
|
||||
net: net,
|
||||
taskID: taskID,
|
||||
task: task,
|
||||
driver: driver,
|
||||
net: net,
|
||||
taskID: taskID,
|
||||
killSignal: task.KillSignal,
|
||||
killTimeout: task.KillTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
// DriverHandle encapsulates a driver plugin client and task identifier and exposes
|
||||
// an api to perform driver operations on the task
|
||||
type DriverHandle struct {
|
||||
driver drivers.DriverPlugin
|
||||
net *drivers.DriverNetwork
|
||||
task *structs.Task
|
||||
taskID string
|
||||
driver drivers.DriverPlugin
|
||||
net *drivers.DriverNetwork
|
||||
taskID string
|
||||
killSignal string
|
||||
killTimeout time.Duration
|
||||
}
|
||||
|
||||
func (h *DriverHandle) ID() string {
|
||||
@@ -37,12 +39,13 @@ func (h *DriverHandle) WaitCh(ctx context.Context) (<-chan *drivers.ExitResult,
|
||||
return h.driver.WaitTask(ctx, h.taskID)
|
||||
}
|
||||
|
||||
func (h *DriverHandle) Update(task *structs.Task) error {
|
||||
return nil
|
||||
// SetKillSignal allows overriding the signal sent to kill the task.
|
||||
func (h *DriverHandle) SetKillSignal(signal string) {
|
||||
h.killSignal = signal
|
||||
}
|
||||
|
||||
func (h *DriverHandle) Kill() error {
|
||||
return h.driver.StopTask(h.taskID, h.task.KillTimeout, h.task.KillSignal)
|
||||
return h.driver.StopTask(h.taskID, h.killTimeout, h.killSignal)
|
||||
}
|
||||
|
||||
func (h *DriverHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
|
||||
|
||||
124
client/allocrunner/taskrunner/remotetask_hook.go
Normal file
124
client/allocrunner/taskrunner/remotetask_hook.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
)
|
||||
|
||||
var _ interfaces.TaskPrestartHook = (*remoteTaskHook)(nil)
|
||||
var _ interfaces.TaskPreKillHook = (*remoteTaskHook)(nil)
|
||||
|
||||
// remoteTaskHook reattaches to remotely executing tasks.
|
||||
type remoteTaskHook struct {
|
||||
tr *TaskRunner
|
||||
|
||||
logger hclog.Logger
|
||||
}
|
||||
|
||||
func newRemoteTaskHook(tr *TaskRunner, logger hclog.Logger) interfaces.TaskHook {
|
||||
h := &remoteTaskHook{
|
||||
tr: tr,
|
||||
}
|
||||
h.logger = logger.Named(h.Name())
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *remoteTaskHook) Name() string {
|
||||
return "remote_task"
|
||||
}
|
||||
|
||||
// Prestart performs 2 remote task driver related tasks:
|
||||
// 1. If there is no local handle, see if there is a handle propagated from a
|
||||
// previous alloc to be restored.
|
||||
// 2. If the alloc is lost make sure the task signal is set to detach instead
|
||||
// of kill.
|
||||
func (h *remoteTaskHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
|
||||
if h.tr.getDriverHandle() != nil {
|
||||
// Driver handle already exists so don't try to load remote
|
||||
// task handle
|
||||
return nil
|
||||
}
|
||||
|
||||
h.tr.stateLock.Lock()
|
||||
th := drivers.NewTaskHandleFromState(h.tr.state)
|
||||
h.tr.stateLock.Unlock()
|
||||
|
||||
// Task handle will be nil if there was no previous allocation or if
|
||||
// this is a destructive update
|
||||
if th == nil {
|
||||
resp.Done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// The task config is unique per invocation so recreate it here
|
||||
th.Config = h.tr.buildTaskConfig()
|
||||
|
||||
if err := h.tr.driver.RecoverTask(th); err != nil {
|
||||
// Soft error here to let a new instance get started instead of
|
||||
// failing the task since retrying is unlikely to help.
|
||||
h.logger.Error("error recovering task state", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
taskInfo, err := h.tr.driver.InspectTask(th.Config.ID)
|
||||
if err != nil {
|
||||
// Soft error here to let a new instance get started instead of
|
||||
// failing the task since retrying is unlikely to help.
|
||||
h.logger.Error("error inspecting recovered task state", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
h.tr.setDriverHandle(NewDriverHandle(h.tr.driver, th.Config.ID, h.tr.Task(), taskInfo.NetworkOverride))
|
||||
|
||||
h.tr.stateLock.Lock()
|
||||
h.tr.localState.TaskHandle = th
|
||||
h.tr.localState.DriverNetwork = taskInfo.NetworkOverride
|
||||
h.tr.stateLock.Unlock()
|
||||
|
||||
// Ensure the signal is set according to the allocation's state
|
||||
h.setSignal(h.tr.Alloc())
|
||||
|
||||
// Emit TaskStarted manually since the normal task runner logic will
|
||||
// treat this task like a restored task and skip emitting started.
|
||||
h.tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreKilling tells the remote task driver to detach a remote task instead of
|
||||
// stopping it.
|
||||
func (h *remoteTaskHook) PreKilling(ctx context.Context, req *interfaces.TaskPreKillRequest, resp *interfaces.TaskPreKillResponse) error {
|
||||
alloc := h.tr.Alloc()
|
||||
h.setSignal(alloc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// setSignal to detach if the allocation is lost or draining. Safe to call
|
||||
// multiple times as it only transitions to using detach -- never back to kill.
|
||||
func (h *remoteTaskHook) setSignal(alloc *structs.Allocation) {
|
||||
driverHandle := h.tr.getDriverHandle()
|
||||
if driverHandle == nil {
|
||||
// Nothing to do exit early
|
||||
return
|
||||
}
|
||||
|
||||
switch {
|
||||
case alloc.ClientStatus == structs.AllocClientStatusLost:
|
||||
// Continue on; lost allocs should just detach
|
||||
h.logger.Debug("detaching from remote task since alloc was lost")
|
||||
case alloc.DesiredTransition.ShouldMigrate():
|
||||
// Continue on; migrating allocs should just detach
|
||||
h.logger.Debug("detaching from remote task since alloc was drained")
|
||||
default:
|
||||
// Nothing to do exit early
|
||||
return
|
||||
}
|
||||
|
||||
// Set DetachSignal to indicate to the remote task driver that it
|
||||
// should detach this remote task and ignore it.
|
||||
driverHandle.SetKillSignal(drivers.DetachSignal)
|
||||
}
|
||||
@@ -19,6 +19,7 @@ var _ interfaces.TaskPoststartHook = &serviceHook{}
|
||||
var _ interfaces.TaskPreKillHook = &serviceHook{}
|
||||
var _ interfaces.TaskExitedHook = &serviceHook{}
|
||||
var _ interfaces.TaskStopHook = &serviceHook{}
|
||||
var _ interfaces.TaskUpdateHook = &serviceHook{}
|
||||
|
||||
type serviceHookConfig struct {
|
||||
alloc *structs.Allocation
|
||||
|
||||
@@ -377,7 +377,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize the runners hooks.
|
||||
// Initialize the runners hooks. Must come after initDriver so hooks
|
||||
// can use tr.driverCapabilities
|
||||
tr.initHooks()
|
||||
|
||||
// Initialize base labels
|
||||
@@ -496,6 +497,7 @@ func (tr *TaskRunner) Run() {
|
||||
tr.logger.Info("task failed to restore; waiting to contact server before restarting")
|
||||
select {
|
||||
case <-tr.killCtx.Done():
|
||||
tr.logger.Info("task killed while waiting for server contact")
|
||||
case <-tr.shutdownCtx.Done():
|
||||
return
|
||||
case <-tr.serversContactedCh:
|
||||
@@ -637,11 +639,12 @@ MAIN:
|
||||
}
|
||||
|
||||
func (tr *TaskRunner) shouldShutdown() bool {
|
||||
if tr.alloc.ClientTerminalStatus() {
|
||||
alloc := tr.Alloc()
|
||||
if alloc.ClientTerminalStatus() {
|
||||
return true
|
||||
}
|
||||
|
||||
if !tr.IsPoststopTask() && tr.alloc.ServerTerminalStatus() {
|
||||
if !tr.IsPoststopTask() && alloc.ServerTerminalStatus() {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -1142,6 +1145,12 @@ func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
|
||||
tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
|
||||
}
|
||||
|
||||
// Store task handle for remote tasks
|
||||
if tr.driverCapabilities != nil && tr.driverCapabilities.RemoteTasks {
|
||||
tr.logger.Trace("storing remote task handle state")
|
||||
tr.localState.TaskHandle.Store(tr.state)
|
||||
}
|
||||
|
||||
// Notify the alloc runner of the transition
|
||||
tr.stateUpdater.TaskStateUpdated()
|
||||
}
|
||||
|
||||
@@ -152,6 +152,12 @@ func (tr *TaskRunner) initHooks() {
|
||||
consul: tr.consulServiceClient,
|
||||
logger: hookLogger,
|
||||
}))
|
||||
|
||||
// If this task driver has remote capabilities, add the remote task
|
||||
// hook.
|
||||
if tr.driverCapabilities.RemoteTasks {
|
||||
tr.runnerHooks = append(tr.runnerHooks, newRemoteTaskHook(tr, hookLogger))
|
||||
}
|
||||
}
|
||||
|
||||
func (tr *TaskRunner) emitHookError(err error, hookName string) {
|
||||
|
||||
@@ -2239,7 +2239,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
|
||||
|
||||
// Update the existing allocations
|
||||
for _, update := range diff.updated {
|
||||
c.logger.Trace("updating alloc", "alloc_id", update.ID, "index", update.AllocModifyIndex)
|
||||
c.updateAlloc(update)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user