client: refactor post allocrunnerv2 finalization

Nick Ethier
2018-10-05 21:42:15 -04:00
committed by Michael Schurter
parent d335a82859
commit ea9ed2282e
10 changed files with 195 additions and 1528 deletions


@@ -2,25 +2,33 @@ package taskrunner
import (
"context"
"errors"
"fmt"
"sync"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/hcl2/hcl"
"github.com/hashicorp/hcl2/hcldec"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
"github.com/zclconf/go-cty/cty"
)
const (
@@ -69,6 +77,10 @@ type TaskRunner struct {
// stateDB is for persisting localState and taskState
stateDB cstate.StateDB
// persistedHash is the hash of the last persisted state for skipping
// unnecessary writes
persistedHash []byte
// ctx is the task runner's context representing the tasks's lifecycle.
// Canceling the context will cause the task to be destroyed.
ctx context.Context
@@ -90,16 +102,22 @@ type TaskRunner struct {
waitCh chan struct{}
// driver is the driver for the task.
driver driver.Driver
driver drivers.DriverPlugin
// driverCapabilities is the set of capabilities the driver supports
driverCapabilities *drivers.Capabilities
// taskSchema is the hcl spec for the task driver configuration
taskSchema hcldec.Spec
// handleLock guards access to handle and handleResult
handleLock sync.Mutex
// handle to the running driver
handle driver.DriverHandle
handle tinterfaces.DriverHandle
// handleResult proxies wait results from drivers
handleResult *handleResult
// network is the configuration for the driver network if one was created
network *cstructs.DriverNetwork
// task is the task being run
task *structs.Task
@@ -142,6 +160,13 @@ type TaskRunner struct {
// LatestResourceUsage. May be nil at all times.
resourceUsage *cstructs.TaskResourceUsage
resourceUsageLock sync.Mutex
// PluginLoader is used to load plugins.
pluginLoader loader.PluginCatalog
// PluginSingletonLoader is a plugin loader that returns singleton
// instances of the plugins.
pluginSingletonLoader loader.PluginCatalog
}
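
Aside: the new persistedHash field enables hash-guarded persistence, where state is only written when its serialized form actually changes. A minimal, self-contained sketch of that pattern; the persister type, persistIfChanged helper, and JSON/SHA-256 encoding are illustrative assumptions, not code from this commit:

    package main

    import (
        "bytes"
        "crypto/sha256"
        "encoding/json"
    )

    type persister struct {
        persistedHash []byte // hash of the last successfully persisted state
    }

    // persistIfChanged serializes state, compares its hash against the last
    // persisted hash, and only calls write when something changed.
    func (p *persister) persistIfChanged(state interface{}, write func([]byte) error) error {
        buf, err := json.Marshal(state)
        if err != nil {
            return err
        }
        sum := sha256.Sum256(buf)
        if bytes.Equal(sum[:], p.persistedHash) {
            return nil // state unchanged, skip the write
        }
        if err := write(buf); err != nil {
            return err
        }
        p.persistedHash = sum[:]
        return nil
    }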
type Config struct {
@@ -163,6 +188,13 @@ type Config struct {
// StateUpdater is used to emit updated task state
StateUpdater interfaces.TaskStateHandler
// PluginLoader is used to load plugins.
PluginLoader loader.PluginCatalog
// PluginSingletonLoader is a plugin loader that returns singleton
// instances of the plugins.
PluginSingletonLoader loader.PluginCatalog
}
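
For context, a hedged sketch of how a caller might wire the two new loader fields when building this Config. The surrounding variables are placeholders and not taken from this commit; only the field names visible in the constructor below are assumed:

    // Most fields elided; only the plugin wiring added by this commit is shown.
    trConfig := &taskrunner.Config{
        Alloc:                 alloc,
        Task:                  task,
        StateDB:               stateDB,
        StateUpdater:          stateUpdater,          // implements interfaces.TaskStateHandler
        PluginLoader:          pluginLoader,          // loader.PluginCatalog
        PluginSingletonLoader: pluginSingletonLoader, // catalog returning singleton instances
    }
    tr, err := taskrunner.NewTaskRunner(trConfig)
    if err != nil {
        return err
    }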
func NewTaskRunner(config *Config) (*TaskRunner, error) {
@@ -178,23 +210,25 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
)
tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
envBuilder: envBuilder,
consulClient: config.Consul,
vaultClient: config.VaultClient,
state: config.Alloc.TaskStates[config.Task.Name].Copy(),
localState: config.LocalState,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
ctx: trCtx,
ctxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
envBuilder: envBuilder,
consulClient: config.Consul,
vaultClient: config.VaultClient,
state: config.Alloc.TaskStates[config.Task.Name].Copy(),
localState: config.LocalState,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
ctx: trCtx,
ctxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
pluginLoader: config.PluginLoader,
pluginSingletonLoader: config.PluginSingletonLoader,
}
// Create the logger based on the allocation ID
@@ -261,7 +295,7 @@ func (tr *TaskRunner) initLabels() {
func (tr *TaskRunner) Run() {
defer close(tr.waitCh)
var waitRes *dstructs.WaitResult
var result *drivers.ExitResult
// Updates are handled asynchronously with the other hooks but each
// triggered update - whether due to alloc updates or a new vault token
@@ -295,18 +329,22 @@ MAIN:
// Grab the result proxy and wait for task to exit
{
_, result := tr.getDriverHandle()
handle := tr.getDriverHandle()
// Do *not* use tr.ctx here as it would cause Wait() to
// unblock before the task exits when Kill() is called.
waitRes = result.Wait(context.Background())
if resultCh, err := handle.WaitCh(context.Background()); err != nil {
tr.logger.Error("wait task failed", "error", err)
} else {
result = <-resultCh
}
}
// Clear the handle
tr.clearDriverHandle()
// Store the wait result on the restart tracker
tr.restartTracker.SetWaitResult(waitRes)
tr.restartTracker.SetExitResult(result)
if err := tr.exited(); err != nil {
tr.logger.Error("exited hooks failed", "error", err)
@@ -329,11 +367,11 @@ MAIN:
// If task terminated, update server. All other exit conditions (eg
// killed or out of restarts) will perform their own server updates.
if waitRes != nil {
if result != nil {
event := structs.NewTaskEvent(structs.TaskTerminated).
SetExitCode(waitRes.ExitCode).
SetSignal(waitRes.Signal).
SetExitMessage(waitRes.Err)
SetExitCode(result.ExitCode).
SetSignal(result.Signal).
SetExitMessage(result.Err)
tr.UpdateState(structs.TaskStateDead, event)
}
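
The switch from a blocking Wait() to WaitCh() means the exit result now arrives on a channel. A hedged sketch of consuming that channel, assuming the WaitCh(ctx) (<-chan *drivers.ExitResult, error) shape used above; the awaitExit helper, shutdownCh parameter, and log output are illustrative:

    func awaitExit(handle tinterfaces.DriverHandle, shutdownCh <-chan struct{}) {
        // context.Background() mirrors the runner above: the wait should not
        // unblock just because the task runner's own context is canceled.
        resultCh, err := handle.WaitCh(context.Background())
        if err != nil {
            log.Printf("wait task failed: %v", err)
            return
        }
        select {
        case result := <-resultCh:
            log.Printf("task exited: code=%d signal=%d err=%v",
                result.ExitCode, result.Signal, result.Err)
        case <-shutdownCh:
            log.Printf("shutting down before task exit")
        }
    }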
@@ -398,37 +436,67 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
// runDriver runs the driver and waits for it to exit
func (tr *TaskRunner) runDriver() error {
// Run prestart
ctx := driver.NewExecContext(tr.taskDir, tr.envBuilder.Build())
_, err := tr.driver.Prestart(ctx, tr.task)
if err != nil {
tr.logger.Error("driver pre-start failed", "error", err)
taskConfig := drivers.NewTaskConfig(tr.task, tr.taskDir, tr.envBuilder.Build())
taskConfig.ID = tr.buildID()
taskConfig.StdoutPath = tr.logmonHookConfig.stdoutFifo
taskConfig.StderrPath = tr.logmonHookConfig.stderrFifo
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
Variables: map[string]cty.Value{
"NOMAD_ENV_bin": cty.StringVal("/bin/consul"),
},
}
val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx)
if diag.HasErrors() {
errStr := "failed to parse config"
for _, err := range diag.Errs() {
errStr = fmt.Sprintf("%s\n* %s", errStr, err.Error())
}
return errors.New(errStr)
}
if err := taskConfig.EncodeDriverConfig(val); err != nil {
tr.logger.Warn("failed to encode driver config", "error", err)
return err
}
// Create a new context for Start since the environment may have been updated.
ctx = driver.NewExecContext(tr.taskDir, tr.envBuilder.Build())
ctx.StdoutFifo = tr.logmonHookConfig.stdoutFifo
ctx.StderrFifo = tr.logmonHookConfig.stderrFifo
//TODO mounts and devices
//XXX Evaluate and encode driver config
// Start the job
sresp, err := tr.driver.Start(ctx, tr.task)
handle, net, err := tr.driver.StartTask(taskConfig)
if err != nil {
tr.logger.Warn("driver start failed", "error", err)
return err
}
tr.network = net
// Store the driver handle and associated metadata
tr.setDriverHandle(sresp.Handle)
tr.localStateLock.Lock()
tr.localState.TaskHandle = handle
tr.localStateLock.Unlock()
tr.updateDriverHandle(taskConfig.ID)
// Emit an event that we started
tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
return nil
}
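
The hardcoded NOMAD_ENV_bin variable above is clearly a placeholder; presumably the intent is to expose the task environment to HCL interpolation in the driver config. A hedged sketch of building that variable map — the NOMAD_ENV_ prefix comes from the placeholder, while the envVariables helper itself is an assumption:

    // envVariables converts a task's environment map into cty values suitable
    // for hcl.EvalContext.Variables when evaluating driver config.
    func envVariables(envMap map[string]string) map[string]cty.Value {
        vars := make(map[string]cty.Value, len(envMap))
        for k, v := range envMap {
            vars["NOMAD_ENV_"+k] = cty.StringVal(v)
        }
        return vars
    }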
func (tr *TaskRunner) updateDriverHandle(taskID string) {
tr.handleLock.Lock()
defer tr.handleLock.Unlock()
tr.handle = &driverHandleImpl{
driver: tr.driver,
net: tr.network,
taskID: taskID,
task: tr.Task(),
}
}
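
driverHandleImpl itself is not shown in this hunk. A heavily hedged sketch of what such a shim might look like, limited to the ID and Kill methods used in this file; the StopTask delegation, its signature, and the empty signal are assumptions:

    type driverHandleImpl struct {
        driver drivers.DriverPlugin
        net    *cstructs.DriverNetwork
        taskID string
        task   *structs.Task
    }

    func (h *driverHandleImpl) ID() string { return h.taskID }

    func (h *driverHandleImpl) Kill() error {
        // Assumed delegation to the driver plugin; illustrative only.
        return h.driver.StopTask(h.taskID, h.task.KillTimeout, "")
    }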
// initDriver creates the driver for the task
func (tr *TaskRunner) initDriver() error {
/*func (tr *TaskRunner) initDriver() error {
// Create a task-specific event emitter callback to expose minimal
// state to drivers
//XXX Replace with EmitEvent -- no need for a shim
@@ -455,6 +523,37 @@ func (tr *TaskRunner) initDriver() error {
}
tr.driver = driver
return nil
}*/
// initDriver retrieves the DriverPlugin from the plugin loader for this task
func (tr *TaskRunner) initDriver() error {
plugin, err := tr.pluginSingletonLoader.Dispense(tr.Task().Driver, base.PluginTypeDriver, tr.logger)
if err != nil {
return err
}
// XXX need to be able to reattach to running drivers
driver, ok := plugin.Plugin().(drivers.DriverPlugin)
if !ok {
return fmt.Errorf("plugin loaded for driver %s does not implement DriverPlugin interface", tr.task.Driver)
}
tr.driver = driver
schema, err := tr.driver.TaskConfigSchema()
if err != nil {
return err
}
spec, _ := hclspec.Convert(schema)
tr.taskSchema = spec
caps, err := tr.driver.Capabilities()
if err != nil {
return err
}
tr.driverCapabilities = caps
return nil
}
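
For a sense of what flows through TaskConfigSchema and hclspec.Convert, a hedged sketch of a toy driver schema; the hclspec.NewObject/NewAttr constructors are assumptions about the hclspec package, not code from this commit:

    // A toy schema with one required and one optional string attribute, of the
    // shape a driver plugin might return from TaskConfigSchema.
    func exampleTaskConfigSchema() *hclspec.Spec {
        return hclspec.NewObject(map[string]*hclspec.Spec{
            "image":   hclspec.NewAttr("image", "string", true),
            "command": hclspec.NewAttr("command", "string", false),
        })
    }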
@@ -462,10 +561,14 @@ func (tr *TaskRunner) initDriver() error {
// handleDestroy will retry with an exponential backoff and will give up at a
// given limit. It returns whether the task was destroyed and the error
// associated with the last kill attempt.
func (tr *TaskRunner) handleDestroy(handle driver.DriverHandle) (destroyed bool, err error) {
func (tr *TaskRunner) handleDestroy(handle tinterfaces.DriverHandle) (destroyed bool, err error) {
// Cap the number of times we attempt to kill the task.
for i := 0; i < killFailureLimit; i++ {
if err = handle.Kill(); err != nil {
if err == drivers.ErrTaskNotFound {
tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
return true, nil
}
// Calculate the new backoff
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
if backoff > killBackoffLimit {
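
A worked sketch of the kill backoff schedule this loop produces, assuming killBackoffBaseline = 5s, killBackoffLimit = 2m, and killFailureLimit = 5; the real constants are defined elsewhere in this file and may differ:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        base, limit := 5*time.Second, 2*time.Minute // assumed values
        for i := 0; i < 5; i++ {
            backoff := (1 << (2 * uint64(i))) * base
            if backoff > limit {
                backoff = limit
            }
            fmt.Println(i, backoff) // 0 5s, 1 20s, 2 1m20s, then capped at 2m0s
        }
    }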
@@ -490,6 +593,11 @@ func (tr *TaskRunner) persistLocalState() error {
return tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState)
}
// buildID builds a consistent unique ID for the task from the alloc ID, task name and restart attempt
func (tr *TaskRunner) buildID() string {
return fmt.Sprintf("%s/%s/%d", tr.allocID, tr.taskName, tr.restartTracker.GetCount())
}
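
For illustration only: a task "redis" in a hypothetical allocation "c0a2e3f4-example" that has restarted twice would get the ID "c0a2e3f4-example/redis/2", so every restart attempt hands the driver a fresh task ID.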
// XXX If the objects don't exist because the client shut down before the task
// runner ever saved state, then we should treat it as a new task runner and not
// return an error
@@ -628,7 +736,7 @@ func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
// Ensure the event is populated with human readable strings
event.PopulateEventDisplayMessage()
// Propagate failure from event to task state
// Propagate failure from event to task state
if event.FailsTask {
tr.state.Failed = true
}