diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index f0e09b5cd..6f67e0f1f 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/shared/loader" ) // allocRunner is used to run all the tasks in a given allocation @@ -78,6 +79,13 @@ type allocRunner struct { // prevAllocWatcher allows waiting for a previous allocation to exit // and if necessary migrate its alloc dir. prevAllocWatcher allocwatcher.PrevAllocWatcher + + // pluginLoader is used to load plugins. + pluginLoader loader.PluginCatalog + + // pluginSingletonLoader is a plugin loader that will returns singleton + // instances of the plugins. + pluginSingletonLoader loader.PluginCatalog } // NewAllocRunner returns a new allocation runner. @@ -89,18 +97,20 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { } ar := &allocRunner{ - id: alloc.ID, - alloc: alloc, - clientConfig: config.ClientConfig, - consulClient: config.Consul, - vaultClient: config.Vault, - tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), - waitCh: make(chan struct{}), - state: &state.State{}, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - allocBroadcaster: cstructs.NewAllocBroadcaster(), - prevAllocWatcher: config.PrevAllocWatcher, + id: alloc.ID, + alloc: alloc, + clientConfig: config.ClientConfig, + consulClient: config.Consul, + vaultClient: config.Vault, + tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), + waitCh: make(chan struct{}), + state: &state.State{}, + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + allocBroadcaster: cstructs.NewAllocBroadcaster(), + prevAllocWatcher: config.PrevAllocWatcher, + pluginLoader: config.PluginLoader, + pluginSingletonLoader: config.PluginSingletonLoader, } // Create the logger based on the allocation ID @@ -124,15 +134,17 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { for _, task := range tasks { config := &taskrunner.Config{ - Alloc: ar.alloc, - ClientConfig: ar.clientConfig, - Task: task, - TaskDir: ar.allocDir.NewTaskDir(task.Name), - Logger: ar.logger, - StateDB: ar.stateDB, - StateUpdater: ar, - Consul: ar.consulClient, - VaultClient: ar.vaultClient, + Alloc: ar.alloc, + ClientConfig: ar.clientConfig, + Task: task, + TaskDir: ar.allocDir.NewTaskDir(task.Name), + Logger: ar.logger, + StateDB: ar.stateDB, + StateUpdater: ar, + Consul: ar.consulClient, + VaultClient: ar.vaultClient, + PluginLoader: ar.pluginLoader, + PluginSingletonLoader: ar.pluginSingletonLoader, } // Create, but do not Run, the task runner diff --git a/client/allocrunner/interfaces/task_lifecycle.go b/client/allocrunner/interfaces/task_lifecycle.go index 9fe16097a..6b6314a69 100644 --- a/client/allocrunner/interfaces/task_lifecycle.go +++ b/client/allocrunner/interfaces/task_lifecycle.go @@ -3,7 +3,7 @@ package interfaces import ( "context" - "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/client/driver/env" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" diff --git a/client/allocrunner/taskrunner/driver_handle.go b/client/allocrunner/taskrunner/driver_handle.go index 
708209b45..9aea784d9 100644 --- a/client/allocrunner/taskrunner/driver_handle.go +++ b/client/allocrunner/taskrunner/driver_handle.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 3f93313ca..87f060e22 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -2,25 +2,33 @@ package taskrunner import ( "context" + "errors" "fmt" "sync" "time" metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/hcl2/hcl" + "github.com/hashicorp/hcl2/hcldec" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" - "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/driver/env" - dstructs "github.com/hashicorp/nomad/client/driver/structs" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/shared" + "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/loader" + "github.com/zclconf/go-cty/cty" ) const ( @@ -69,6 +77,10 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB + // persistedHash is the hash of the last persisted state for skipping + // unnecessary writes + persistedHash []byte + // ctx is the task runner's context representing the tasks's lifecycle. // Canceling the context will cause the task to be destroyed. ctx context.Context @@ -90,16 +102,22 @@ type TaskRunner struct { waitCh chan struct{} // driver is the driver for the task. - driver driver.Driver + driver drivers.DriverPlugin + + // driverCapabilities is the set capabilities the driver supports + driverCapabilities *drivers.Capabilities + + // taskSchema is the hcl spec for the task driver configuration + taskSchema hcldec.Spec // handleLock guards access to handle and handleResult handleLock sync.Mutex // handle to the running driver - handle driver.DriverHandle + handle tinterfaces.DriverHandle - // handleResult proxies wait results from drivers - handleResult *handleResult + // network is the configuration for the driver network if one was created + network *cstructs.DriverNetwork // task is the task being run task *structs.Task @@ -142,6 +160,13 @@ type TaskRunner struct { // LatestResourceUsage. May be nil at all times. resourceUsage *cstructs.TaskResourceUsage resourceUsageLock sync.Mutex + + // PluginLoader is used to load plugins. + pluginLoader loader.PluginCatalog + + // PluginSingletonLoader is a plugin loader that will returns singleton + // instances of the plugins. 
+ pluginSingletonLoader loader.PluginCatalog } type Config struct { @@ -163,6 +188,13 @@ type Config struct { // StateUpdater is used to emit updated task state StateUpdater interfaces.TaskStateHandler + + // PluginLoader is used to load plugins. + PluginLoader loader.PluginCatalog + + // PluginSingletonLoader is a plugin loader that will returns singleton + // instances of the plugins. + PluginSingletonLoader loader.PluginCatalog } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -178,23 +210,25 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { ) tr := &TaskRunner{ - alloc: config.Alloc, - allocID: config.Alloc.ID, - clientConfig: config.ClientConfig, - task: config.Task, - taskDir: config.TaskDir, - taskName: config.Task.Name, - envBuilder: envBuilder, - consulClient: config.Consul, - vaultClient: config.VaultClient, - state: config.Alloc.TaskStates[config.Task.Name].Copy(), - localState: config.LocalState, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - ctx: trCtx, - ctxCancel: trCancel, - triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), - waitCh: make(chan struct{}), + alloc: config.Alloc, + allocID: config.Alloc.ID, + clientConfig: config.ClientConfig, + task: config.Task, + taskDir: config.TaskDir, + taskName: config.Task.Name, + envBuilder: envBuilder, + consulClient: config.Consul, + vaultClient: config.VaultClient, + state: config.Alloc.TaskStates[config.Task.Name].Copy(), + localState: config.LocalState, + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + ctx: trCtx, + ctxCancel: trCancel, + triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + waitCh: make(chan struct{}), + pluginLoader: config.PluginLoader, + pluginSingletonLoader: config.PluginSingletonLoader, } // Create the logger based on the allocation ID @@ -261,7 +295,7 @@ func (tr *TaskRunner) initLabels() { func (tr *TaskRunner) Run() { defer close(tr.waitCh) - var waitRes *dstructs.WaitResult + var result *drivers.ExitResult // Updates are handled asynchronously with the other hooks but each // triggered update - whether due to alloc updates or a new vault token @@ -295,18 +329,22 @@ MAIN: // Grab the result proxy and wait for task to exit { - _, result := tr.getDriverHandle() + handle := tr.getDriverHandle() // Do *not* use tr.ctx here as it would cause Wait() to // unblock before the task exits when Kill() is called. - waitRes = result.Wait(context.Background()) + if resultCh, err := handle.WaitCh(context.Background()); err != nil { + tr.logger.Error("wait task failed", "error", err) + } else { + result = <-resultCh + } } // Clear the handle tr.clearDriverHandle() // Store the wait result on the restart tracker - tr.restartTracker.SetWaitResult(waitRes) + tr.restartTracker.SetExitResult(result) if err := tr.exited(); err != nil { tr.logger.Error("exited hooks failed", "error", err) @@ -329,11 +367,11 @@ MAIN: // If task terminated, update server. All other exit conditions (eg // killed or out of restarts) will perform their own server updates. - if waitRes != nil { + if result != nil { event := structs.NewTaskEvent(structs.TaskTerminated). - SetExitCode(waitRes.ExitCode). - SetSignal(waitRes.Signal). - SetExitMessage(waitRes.Err) + SetExitCode(result.ExitCode). + SetSignal(result.Signal). 
+ SetExitMessage(result.Err) tr.UpdateState(structs.TaskStateDead, event) } @@ -398,37 +436,67 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { // runDriver runs the driver and waits for it to exit func (tr *TaskRunner) runDriver() error { - // Run prestart - ctx := driver.NewExecContext(tr.taskDir, tr.envBuilder.Build()) - _, err := tr.driver.Prestart(ctx, tr.task) - if err != nil { - tr.logger.Error("driver pre-start failed", "error", err) + + taskConfig := drivers.NewTaskConfig(tr.task, tr.taskDir, tr.envBuilder.Build()) + taskConfig.ID = tr.buildID() + taskConfig.StdoutPath = tr.logmonHookConfig.stdoutFifo + taskConfig.StderrPath = tr.logmonHookConfig.stderrFifo + + evalCtx := &hcl.EvalContext{ + Functions: shared.GetStdlibFuncs(), + Variables: map[string]cty.Value{ + "NOMAD_ENV_bin": cty.StringVal("/bin/consul"), + }, + } + + val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx) + if diag.HasErrors() { + errStr := "failed to parse config" + for _, err := range diag.Errs() { + errStr = fmt.Sprintf("%s\n* %s", errStr, err.Error()) + } + return errors.New(errStr) + } + + if err := taskConfig.EncodeDriverConfig(val); err != nil { + tr.logger.Warn("failed to encode driver config", "error", err) return err } - // Create a new context for Start since the environment may have been updated. - ctx = driver.NewExecContext(tr.taskDir, tr.envBuilder.Build()) - - ctx.StdoutFifo = tr.logmonHookConfig.stdoutFifo - ctx.StderrFifo = tr.logmonHookConfig.stderrFifo + //TODO mounts and devices + //XXX Evaluate and encode driver config // Start the job - sresp, err := tr.driver.Start(ctx, tr.task) + handle, net, err := tr.driver.StartTask(taskConfig) if err != nil { tr.logger.Warn("driver start failed", "error", err) return err } + tr.network = net - // Store the driver handle and associated metadata - tr.setDriverHandle(sresp.Handle) + tr.localStateLock.Lock() + tr.localState.TaskHandle = handle + tr.localStateLock.Unlock() + tr.updateDriverHandle(taskConfig.ID) // Emit an event that we started tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) return nil } +func (tr *TaskRunner) updateDriverHandle(taskID string) { + tr.handleLock.Lock() + defer tr.handleLock.Unlock() + tr.handle = &driverHandleImpl{ + driver: tr.driver, + net: tr.network, + taskID: taskID, + task: tr.Task(), + } +} + // initDriver creates the driver for the task -func (tr *TaskRunner) initDriver() error { +/*func (tr *TaskRunner) initDriver() error { // Create a task-specific event emitter callback to expose minimal // state to drivers //XXX Replace with EmitEvent -- no need for a shim @@ -455,6 +523,37 @@ func (tr *TaskRunner) initDriver() error { } tr.driver = driver + return nil +}*/ + +// initDriver retrives the DriverPlugin from the plugin loader for this task +func (tr *TaskRunner) initDriver() error { + plugin, err := tr.pluginSingletonLoader.Dispense(tr.Task().Driver, base.PluginTypeDriver, tr.logger) + if err != nil { + return err + } + + // XXX need to be able to reattach to running drivers + driver, ok := plugin.Plugin().(drivers.DriverPlugin) + if !ok { + return fmt.Errorf("plugin loaded for driver %s does not implement DriverPlugin interface", tr.task.Driver) + } + + tr.driver = driver + + schema, err := tr.driver.TaskConfigSchema() + if err != nil { + return err + } + spec, _ := hclspec.Convert(schema) + tr.taskSchema = spec + + caps, err := tr.driver.Capabilities() + if err != nil { + return err + } + tr.driverCapabilities = caps + return nil } @@ 
-462,10 +561,14 @@ func (tr *TaskRunner) initDriver() error { // handleDestroy will retry with an exponential backoff and will give up at a // given limit. It returns whether the task was destroyed and the error // associated with the last kill attempt. -func (tr *TaskRunner) handleDestroy(handle driver.DriverHandle) (destroyed bool, err error) { +func (tr *TaskRunner) handleDestroy(handle tinterfaces.DriverHandle) (destroyed bool, err error) { // Cap the number of times we attempt to kill the task. for i := 0; i < killFailureLimit; i++ { if err = handle.Kill(); err != nil { + if err == drivers.ErrTaskNotFound { + tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID()) + return true, nil + } // Calculate the new backoff backoff := (1 << (2 * uint64(i))) * killBackoffBaseline if backoff > killBackoffLimit { @@ -490,6 +593,11 @@ func (tr *TaskRunner) persistLocalState() error { return tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState) } +// buildID builds a consistent unique ID for the task from the alloc ID, task name and restart attempt +func (tr *TaskRunner) buildID() string { + return fmt.Sprintf("%s/%s/%d", tr.allocID, tr.taskName, tr.restartTracker.GetCount()) +} + // XXX If the objects don't exists since the client shutdown before the task // runner ever saved state, then we should treat it as a new task runner and not // return an error @@ -628,7 +736,7 @@ func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error { // Ensure the event is populated with human readable strings event.PopulateEventDisplayMessage() - // Propagate failure from event to task state + // Propogate failure from event to task state if event.FailsTask { tr.state.Failed = true } diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go index 67d3a4810..2affbcc3e 100644 --- a/client/allocrunner/taskrunner/task_runner_getters.go +++ b/client/allocrunner/taskrunner/task_runner_getters.go @@ -1,7 +1,7 @@ package taskrunner import ( - "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/nomad/structs" ) diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go deleted file mode 100644 index 171d6b8c7..000000000 --- a/client/allocrunnerv2/alloc_runner.go +++ /dev/null @@ -1,550 +0,0 @@ -package allocrunnerv2 - -import ( - "context" - "fmt" - "path/filepath" - "sync" - "time" - - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/allocrunner" - "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" - "github.com/hashicorp/nomad/client/allocrunnerv2/state" - "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner" - "github.com/hashicorp/nomad/client/allocwatcher" - "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/consul" - cinterfaces "github.com/hashicorp/nomad/client/interfaces" - cstate "github.com/hashicorp/nomad/client/state" - cstructs "github.com/hashicorp/nomad/client/structs" - "github.com/hashicorp/nomad/client/vaultclient" - "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/shared/loader" -) - -// allocRunner is used to run all the tasks in a given allocation -type allocRunner struct { - // id is the ID of the allocation. 
Can be accessed without a lock - id string - - // Logger is the logger for the alloc runner. - logger log.Logger - - clientConfig *config.Config - - // stateUpdater is used to emit updated task state - stateUpdater cinterfaces.AllocStateHandler - - // consulClient is the client used by the consul service hook for - // registering services and checks - consulClient consul.ConsulServiceAPI - - // vaultClient is the used to manage Vault tokens - vaultClient vaultclient.VaultClient - - // waitCh is closed when the Run() loop has exited - waitCh chan struct{} - - // destroyed is true when the Run() loop has exited, postrun hooks have - // run, and alloc runner has been destroyed - destroyed bool - destroyedLock sync.Mutex - - // Alloc captures the allocation being run. - alloc *structs.Allocation - allocLock sync.RWMutex - - // state is the alloc runner's state - state *state.State - stateLock sync.RWMutex - - stateDB cstate.StateDB - - // allocDir is used to build the allocations directory structure. - allocDir *allocdir.AllocDir - - // runnerHooks are alloc runner lifecycle hooks that should be run on state - // transistions. - runnerHooks []interfaces.RunnerHook - - // tasks are the set of task runners - tasks map[string]*taskrunner.TaskRunner - tasksLock sync.RWMutex - - // allocBroadcaster sends client allocation updates to all listeners - allocBroadcaster *cstructs.AllocBroadcaster - - // prevAllocWatcher allows waiting for a previous allocation to exit - // and if necessary migrate its alloc dir. - prevAllocWatcher allocwatcher.PrevAllocWatcher - - // pluginLoader is used to load plugins. - pluginLoader loader.PluginCatalog - - // pluginSingletonLoader is a plugin loader that will returns singleton - // instances of the plugins. - pluginSingletonLoader loader.PluginCatalog -} - -// NewAllocRunner returns a new allocation runner. -func NewAllocRunner(config *Config) (*allocRunner, error) { - alloc := config.Alloc - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg == nil { - return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup) - } - - ar := &allocRunner{ - id: alloc.ID, - alloc: alloc, - clientConfig: config.ClientConfig, - consulClient: config.Consul, - vaultClient: config.Vault, - tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), - waitCh: make(chan struct{}), - state: &state.State{}, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - allocBroadcaster: cstructs.NewAllocBroadcaster(), - prevAllocWatcher: config.PrevAllocWatcher, - pluginLoader: config.PluginLoader, - pluginSingletonLoader: config.PluginSingletonLoader, - } - - // Create the logger based on the allocation ID - ar.logger = config.Logger.Named("alloc_runner").With("alloc_id", alloc.ID) - - // Create alloc dir - ar.allocDir = allocdir.NewAllocDir(ar.logger, filepath.Join(config.ClientConfig.AllocDir, alloc.ID)) - - // Initialize the runners hooks. - ar.initRunnerHooks() - - // Create the TaskRunners - if err := ar.initTaskRunners(tg.Tasks); err != nil { - return nil, err - } - - return ar, nil -} - -// initTaskRunners creates task runners but does *not* run them. 
-func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { - for _, task := range tasks { - config := &taskrunner.Config{ - Alloc: ar.alloc, - ClientConfig: ar.clientConfig, - Task: task, - TaskDir: ar.allocDir.NewTaskDir(task.Name), - Logger: ar.logger, - StateDB: ar.stateDB, - StateUpdater: ar, - Consul: ar.consulClient, - VaultClient: ar.vaultClient, - PluginLoader: ar.pluginLoader, - PluginSingletonLoader: ar.pluginSingletonLoader, - } - - // Create, but do not Run, the task runner - tr, err := taskrunner.NewTaskRunner(config) - if err != nil { - return fmt.Errorf("failed creating runner for task %q: %v", task.Name, err) - } - - ar.tasks[task.Name] = tr - } - return nil -} - -func (ar *allocRunner) WaitCh() <-chan struct{} { - return ar.waitCh -} - -// XXX How does alloc Restart work -// Run is the main goroutine that executes all the tasks. -func (ar *allocRunner) Run() { - // Close the wait channel - defer close(ar.waitCh) - - var taskWaitCh <-chan struct{} - - // Run the prestart hooks - // XXX Equivalent to TR.Prestart hook - if err := ar.prerun(); err != nil { - ar.logger.Error("prerun failed", "error", err) - goto POST - } - - // Run the runners - taskWaitCh = ar.runImpl() - -MAIN: - for { - select { - case <-taskWaitCh: - // TaskRunners have all exited - break MAIN - } - } - -POST: - // Run the postrun hooks - // XXX Equivalent to TR.Poststop hook - if err := ar.postrun(); err != nil { - ar.logger.Error("postrun failed", "error", err) - } -} - -// runImpl is used to run the runners. -func (ar *allocRunner) runImpl() <-chan struct{} { - for _, task := range ar.tasks { - go task.Run() - } - - // Return a combined WaitCh that is closed when all task runners have - // exited. - waitCh := make(chan struct{}) - go func() { - defer close(waitCh) - for _, task := range ar.tasks { - <-task.WaitCh() - } - }() - - return waitCh -} - -// Alloc returns the current allocation being run by this runner as sent by the -// server. This view of the allocation does not have updated task states. -func (ar *allocRunner) Alloc() *structs.Allocation { - ar.allocLock.RLock() - defer ar.allocLock.RUnlock() - return ar.alloc -} - -func (ar *allocRunner) setAlloc(updated *structs.Allocation) { - ar.allocLock.Lock() - ar.alloc = updated - ar.allocLock.Unlock() -} - -// GetAllocDir returns the alloc dir which is safe for concurrent use. -func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir { - return ar.allocDir -} - -// Restore state from database. Must be called after NewAllocRunner but before -// Run. -func (ar *allocRunner) Restore() error { - // Restore task runners - for _, tr := range ar.tasks { - if err := tr.Restore(); err != nil { - return err - } - } - - return nil -} - -// TaskStateUpdated is called by TaskRunner when a task's state has been -// updated. This hook is used to compute changes to the alloc's ClientStatus -// and to update the server with the new state. 
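The runImpl fan-in above — one goroutine per task runner, plus a goroutine that closes a combined wait channel once every task's WaitCh has closed — reduces to the standalone sketch below; the placeholder task goroutines and sleep durations are illustrative only, not Nomad code.

package main

import (
	"fmt"
	"time"
)

// combinedWaitCh mirrors runImpl: it returns a channel that is closed only
// once every per-task wait channel has closed.
func combinedWaitCh(taskWaitChs []<-chan struct{}) <-chan struct{} {
	waitCh := make(chan struct{})
	go func() {
		defer close(waitCh)
		for _, ch := range taskWaitChs {
			<-ch
		}
	}()
	return waitCh
}

func main() {
	// Placeholder "task runners" that finish at different times.
	var chs []<-chan struct{}
	for i := 1; i <= 3; i++ {
		done := make(chan struct{})
		go func(d time.Duration, done chan struct{}) {
			time.Sleep(d)
			close(done)
		}(time.Duration(i)*10*time.Millisecond, done)
		chs = append(chs, done)
	}

	// Blocks until all three placeholder tasks have exited.
	<-combinedWaitCh(chs)
	fmt.Println("all task runners exited")
}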
-func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskState) { - // If a task is dead, we potentially want to kill other tasks in the group - if state.State == structs.TaskStateDead { - // Find all tasks that are not the one that is dead and check if the one - // that is dead is a leader - var otherTaskRunners []*taskrunner.TaskRunner - var otherTaskNames []string - leader := false - for name, tr := range ar.tasks { - if name != taskName { - otherTaskRunners = append(otherTaskRunners, tr) - otherTaskNames = append(otherTaskNames, name) - } else if tr.Task().Leader { - leader = true - } - } - - // If the task failed, we should kill all the other tasks in the task group. - if state.Failed { - if len(otherTaskRunners) > 0 { - ar.logger.Debug("task failure, destroying all tasks", "failed_task", taskName, "destroying", otherTaskNames) - } - for _, tr := range otherTaskRunners { - tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName)) - } - } else if leader { - if len(otherTaskRunners) > 0 { - ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", taskName, "destroying", otherTaskNames) - } - // If the task was a leader task we should kill all the other tasks. - for _, tr := range otherTaskRunners { - tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskLeaderDead)) - } - } - } - - // Gather the state of the other tasks - ar.tasksLock.RLock() - states := make(map[string]*structs.TaskState, len(ar.tasks)) - for name, tr := range ar.tasks { - if name == taskName { - states[name] = state - } else { - states[name] = tr.TaskState() - } - } - ar.tasksLock.RUnlock() - - // Get the client allocation - calloc := ar.clientAlloc(states) - - // Update the server - ar.stateUpdater.AllocStateUpdated(calloc) - - // Broadcast client alloc to listeners - ar.allocBroadcaster.Send(calloc) -} - -// clientAlloc takes in the task states and returns an Allocation populated -// with Client specific fields -func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation { - ar.stateLock.RLock() - defer ar.stateLock.RUnlock() - - // store task states for AllocState to expose - ar.state.TaskStates = taskStates - - a := &structs.Allocation{ - ID: ar.id, - TaskStates: taskStates, - } - - if d := ar.state.DeploymentStatus; d != nil { - a.DeploymentStatus = d.Copy() - } - - // Compute the ClientStatus - if ar.state.ClientStatus != "" { - // The client status is being forced - a.ClientStatus, a.ClientDescription = ar.state.ClientStatus, ar.state.ClientDescription - } else { - a.ClientStatus, a.ClientDescription = getClientStatus(taskStates) - } - - // If the allocation is terminal, make sure all required fields are properly - // set. - if a.ClientTerminalStatus() { - alloc := ar.Alloc() - - // If we are part of a deployment and the task has failed, mark the - // alloc as unhealthy. This guards against the watcher not be started. - if a.ClientStatus == structs.AllocClientStatusFailed && - alloc.DeploymentID != "" && !a.DeploymentStatus.IsUnhealthy() { - a.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(false), - } - } - - // Make sure we have marked the finished at for every task. This is used - // to calculate the reschedule time for failed allocations. 
- now := time.Now() - for _, task := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks { - ts, ok := a.TaskStates[task.Name] - if !ok { - ts = &structs.TaskState{} - a.TaskStates[task.Name] = ts - } - if ts.FinishedAt.IsZero() { - ts.FinishedAt = now - } - } - } - - return a -} - -// getClientStatus takes in the task states for a given allocation and computes -// the client status and description -func getClientStatus(taskStates map[string]*structs.TaskState) (status, description string) { - var pending, running, dead, failed bool - for _, state := range taskStates { - switch state.State { - case structs.TaskStateRunning: - running = true - case structs.TaskStatePending: - pending = true - case structs.TaskStateDead: - if state.Failed { - failed = true - } else { - dead = true - } - } - } - - // Determine the alloc status - if failed { - return structs.AllocClientStatusFailed, "Failed tasks" - } else if running { - return structs.AllocClientStatusRunning, "Tasks are running" - } else if pending { - return structs.AllocClientStatusPending, "No tasks have started" - } else if dead { - return structs.AllocClientStatusComplete, "All tasks have completed" - } - - return "", "" -} - -// AllocState returns a copy of allocation state including a snapshot of task -// states. -func (ar *allocRunner) AllocState() *state.State { - ar.stateLock.RLock() - state := ar.state.Copy() - ar.stateLock.RUnlock() - - // If TaskStateUpdated has not been called yet, ar.state.TaskStates - // won't be set as it is not the canonical source of TaskStates. - if len(state.TaskStates) == 0 { - ar.tasksLock.RLock() - ar.state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks)) - for k, tr := range ar.tasks { - state.TaskStates[k] = tr.TaskState() - } - ar.tasksLock.RUnlock() - } - - return state -} - -// Update the running allocation with a new version received from the server. -// -// This method sends the updated alloc to Run for serially processing updates. -// If there is already a pending update it will be discarded and replaced by -// the latest update. -func (ar *allocRunner) Update(update *structs.Allocation) { - // Update ar.alloc - ar.setAlloc(update) - - // Run hooks - if err := ar.update(update); err != nil { - ar.logger.Error("error running update hooks", "error", err) - } - - // Update task runners - for _, tr := range ar.tasks { - tr.Update(update) - } -} - -func (ar *allocRunner) Listener() *cstructs.AllocListener { - return ar.allocBroadcaster.Listen() -} - -// Destroy the alloc runner by synchronously stopping it if it is still running -// and cleaning up all of its resources. -// -// This method is safe for calling concurrently with Run() and will cause it to -// exit (thus closing WaitCh). 
-func (ar *allocRunner) Destroy() { - // Stop tasks - ar.tasksLock.RLock() - for name, tr := range ar.tasks { - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - if err == taskrunner.ErrTaskNotRunning { - ar.logger.Trace("task not running", "task_name", name) - } else { - ar.logger.Warn("failed to kill task", "error", err, "task_name", name) - } - } - } - ar.tasksLock.RUnlock() - - // Wait for tasks to exit and postrun hooks to finish - <-ar.waitCh - - // Run destroy hooks - if err := ar.destroy(); err != nil { - ar.logger.Warn("error running destroy hooks", "error", err) - } - - // Cleanup state db - if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil { - ar.logger.Warn("failed to delete allocation state", "error", err) - } - - // Mark alloc as destroyed - ar.destroyedLock.Lock() - ar.destroyed = true - ar.destroyedLock.Unlock() -} - -// IsDestroyed returns true if the alloc runner has been destroyed (stopped and -// garbage collected). -// -// This method is safe for calling concurrently with Run(). Callers must -// receive on WaitCh() to block until alloc runner has stopped and been -// destroyed. -func (ar *allocRunner) IsDestroyed() bool { - ar.destroyedLock.Lock() - defer ar.destroyedLock.Unlock() - return ar.destroyed -} - -// IsWaiting returns true if the alloc runner is waiting for its previous -// allocation to terminate. -// -// This method is safe for calling concurrently with Run(). -func (ar *allocRunner) IsWaiting() bool { - return ar.prevAllocWatcher.IsWaiting() -} - -// IsMigrating returns true if the alloc runner is migrating data from its -// previous allocation. -// -// This method is safe for calling concurrently with Run(). -func (ar *allocRunner) IsMigrating() bool { - return ar.prevAllocWatcher.IsMigrating() -} - -func (ar *allocRunner) StatsReporter() allocrunner.AllocStatsReporter { - return ar -} - -// LatestAllocStats returns the latest stats for an allocation. If taskFilter -// is set, only stats for that task -- if it exists -- are returned. -func (ar *allocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) { - ar.tasksLock.RLock() - defer ar.tasksLock.RUnlock() - - astat := &cstructs.AllocResourceUsage{ - Tasks: make(map[string]*cstructs.TaskResourceUsage, len(ar.tasks)), - ResourceUsage: &cstructs.ResourceUsage{ - MemoryStats: &cstructs.MemoryStats{}, - CpuStats: &cstructs.CpuStats{}, - }, - } - - for name, tr := range ar.tasks { - if taskFilter != "" && taskFilter != name { - // Getting stats for a particular task and its not this one! 
- continue - } - - if usage := tr.LatestResourceUsage(); usage != nil { - astat.Tasks[name] = usage - astat.ResourceUsage.Add(usage.ResourceUsage) - if usage.Timestamp > astat.Timestamp { - astat.Timestamp = usage.Timestamp - } - } - } - - return astat, nil -} diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go deleted file mode 100644 index 14f64789e..000000000 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ /dev/null @@ -1,903 +0,0 @@ -package taskrunner - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/hcl2/hcl" - "github.com/hashicorp/hcl2/hcldec" - "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" - "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" - tinterfaces "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces" - "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state" - "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/consul" - "github.com/hashicorp/nomad/client/driver/env" - cstate "github.com/hashicorp/nomad/client/state" - cstructs "github.com/hashicorp/nomad/client/structs" - "github.com/hashicorp/nomad/client/vaultclient" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared" - "github.com/hashicorp/nomad/plugins/shared/hclspec" - "github.com/hashicorp/nomad/plugins/shared/loader" - "github.com/zclconf/go-cty/cty" -) - -const ( - // killBackoffBaseline is the baseline time for exponential backoff while - // killing a task. - killBackoffBaseline = 5 * time.Second - - // killBackoffLimit is the limit of the exponential backoff for killing - // the task. - killBackoffLimit = 2 * time.Minute - - // killFailureLimit is how many times we will attempt to kill a task before - // giving up and potentially leaking resources. - killFailureLimit = 5 - - // triggerUpdatechCap is the capacity for the triggerUpdateCh used for - // triggering updates. It should be exactly 1 as even if multiple - // updates have come in since the last one was handled, we only need to - // handle the last one. - triggerUpdateChCap = 1 -) - -type TaskRunner struct { - // allocID and taskName are immutable so these fields may be accessed - // without locks - allocID string - taskName string - - alloc *structs.Allocation - allocLock sync.Mutex - - clientConfig *config.Config - - // stateUpdater is used to emit updated task state - stateUpdater interfaces.TaskStateHandler - - // state captures the state of the task for updating the allocation - state *structs.TaskState - stateLock sync.Mutex - - // localState captures the node-local state of the task for when the - // Nomad agent restarts - localState *state.LocalState - localStateLock sync.RWMutex - - // stateDB is for persisting localState and taskState - stateDB cstate.StateDB - - // persistedHash is the hash of the last persisted state for skipping - // unnecessary writes - persistedHash []byte - - // ctx is the task runner's context representing the tasks's lifecycle. - // Canceling the context will cause the task to be destroyed. - ctx context.Context - - // ctxCancel is used to exit the task runner's Run loop without - // stopping the task. Shutdown hooks are run. 
- ctxCancel context.CancelFunc - - // Logger is the logger for the task runner. - logger log.Logger - - // triggerUpdateCh is ticked whenever update hooks need to be run and - // must be created with cap=1 to signal a pending update and prevent - // callers from deadlocking if the receiver has exited. - triggerUpdateCh chan struct{} - - // waitCh is closed when the task runner has transitioned to a terminal - // state - waitCh chan struct{} - - // driver is the driver for the task. - driver drivers.DriverPlugin - - // driverCapabilities is the set capabilities the driver supports - driverCapabilities *drivers.Capabilities - - // taskSchema is the hcl spec for the task driver configuration - taskSchema hcldec.Spec - - // handleLock guards access to handle and handleResult - handleLock sync.Mutex - - // handle to the running driver - handle tinterfaces.DriverHandle - - // network is the configuration for the driver network if one was created - network *cstructs.DriverNetwork - - // task is the task being run - task *structs.Task - taskLock sync.RWMutex - - // taskDir is the directory structure for this task. - taskDir *allocdir.TaskDir - - // envBuilder is used to build the task's environment - envBuilder *env.Builder - - // restartTracker is used to decide if the task should be restarted. - restartTracker *restarts.RestartTracker - - // runnerHooks are task runner lifecycle hooks that should be run on state - // transistions. - runnerHooks []interfaces.TaskHook - - // consulClient is the client used by the consul service hook for - // registering services and checks - consulClient consul.ConsulServiceAPI - - // vaultClient is the client to use to derive and renew Vault tokens - vaultClient vaultclient.VaultClient - - // vaultToken is the current Vault token. It should be accessed with the - // getter. - vaultToken string - vaultTokenLock sync.Mutex - - // baseLabels are used when emitting tagged metrics. All task runner metrics - // will have these tags, and optionally more. - baseLabels []metrics.Label - - // logmonHookConfig is used to get the paths to the stdout and stderr fifos - // to be passed to the driver for task logging - logmonHookConfig *logmonHookConfig - - // resourceUsage is written via UpdateStats and read via - // LatestResourceUsage. May be nil at all times. - resourceUsage *cstructs.TaskResourceUsage - resourceUsageLock sync.Mutex - - // PluginLoader is used to load plugins. - pluginLoader loader.PluginCatalog - - // PluginSingletonLoader is a plugin loader that will returns singleton - // instances of the plugins. - pluginSingletonLoader loader.PluginCatalog -} - -type Config struct { - Alloc *structs.Allocation - ClientConfig *config.Config - Consul consul.ConsulServiceAPI - Task *structs.Task - TaskDir *allocdir.TaskDir - Logger log.Logger - - // VaultClient is the client to use to derive and renew Vault tokens - VaultClient vaultclient.VaultClient - - // LocalState is optionally restored task state - LocalState *state.LocalState - - // StateDB is used to store and restore state. - StateDB cstate.StateDB - - // StateUpdater is used to emit updated task state - StateUpdater interfaces.TaskStateHandler - - // PluginLoader is used to load plugins. - PluginLoader loader.PluginCatalog - - // PluginSingletonLoader is a plugin loader that will returns singleton - // instances of the plugins. 
- PluginSingletonLoader loader.PluginCatalog -} - -func NewTaskRunner(config *Config) (*TaskRunner, error) { - // Create a context for the runner - trCtx, trCancel := context.WithCancel(context.Background()) - - // Initialize the environment builder - envBuilder := env.NewBuilder( - config.ClientConfig.Node, - config.Alloc, - config.Task, - config.ClientConfig.Region, - ) - - tr := &TaskRunner{ - alloc: config.Alloc, - allocID: config.Alloc.ID, - clientConfig: config.ClientConfig, - task: config.Task, - taskDir: config.TaskDir, - taskName: config.Task.Name, - envBuilder: envBuilder, - consulClient: config.Consul, - vaultClient: config.VaultClient, - state: config.Alloc.TaskStates[config.Task.Name].Copy(), - localState: config.LocalState, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - ctx: trCtx, - ctxCancel: trCancel, - triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), - waitCh: make(chan struct{}), - pluginLoader: config.PluginLoader, - pluginSingletonLoader: config.PluginSingletonLoader, - } - - // Create the logger based on the allocation ID - tr.logger = config.Logger.Named("task_runner").With("task", config.Task.Name) - - // Build the restart tracker. - tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup) - if tg == nil { - tr.logger.Error("alloc missing task group") - return nil, fmt.Errorf("alloc missing task group") - } - tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type) - - // Initialize the task state - tr.initState() - - // Get the driver - if err := tr.initDriver(); err != nil { - tr.logger.Error("failed to create driver", "error", err) - return nil, err - } - - // Initialize the runners hooks. - tr.initHooks() - - // Initialize base labels - tr.initLabels() - - return tr, nil -} - -func (tr *TaskRunner) initState() { - if tr.state == nil { - tr.state = &structs.TaskState{ - State: structs.TaskStatePending, - } - } - if tr.localState == nil { - tr.localState = state.NewLocalState() - } -} - -func (tr *TaskRunner) initLabels() { - alloc := tr.Alloc() - tr.baseLabels = []metrics.Label{ - { - Name: "job", - Value: alloc.Job.Name, - }, - { - Name: "task_group", - Value: alloc.TaskGroup, - }, - { - Name: "alloc_id", - Value: tr.allocID, - }, - { - Name: "task", - Value: tr.taskName, - }, - } -} - -func (tr *TaskRunner) Run() { - defer close(tr.waitCh) - var result *drivers.ExitResult - - // Updates are handled asynchronously with the other hooks but each - // triggered update - whether due to alloc updates or a new vault token - // - should be handled serially. - go tr.handleUpdates() - -MAIN: - for tr.ctx.Err() == nil { - // Run the prestart hooks - if err := tr.prestart(); err != nil { - tr.logger.Error("prestart failed", "error", err) - tr.restartTracker.SetStartError(err) - goto RESTART - } - - if tr.ctx.Err() != nil { - break MAIN - } - - // Run the task - if err := tr.runDriver(); err != nil { - tr.logger.Error("running driver failed", "error", err) - tr.restartTracker.SetStartError(err) - goto RESTART - } - - // Run the poststart hooks - if err := tr.poststart(); err != nil { - tr.logger.Error("poststart failed", "error", err) - } - - // Grab the result proxy and wait for task to exit - { - handle := tr.getDriverHandle() - - // Do *not* use tr.ctx here as it would cause Wait() to - // unblock before the task exits when Kill() is called. 
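A minimal standalone sketch of that wait step, assuming simplified stand-in types: ExitResult, DriverHandle, and fakeHandle below are illustrative, not the real plugins/drivers or taskrunner interfaces.

package main

import (
	"context"
	"fmt"
	"time"
)

// ExitResult mirrors the fields Run reads from the driver's exit result.
type ExitResult struct {
	ExitCode int
	Signal   int
	Err      error
}

// DriverHandle is a stand-in for the handle returned by getDriverHandle.
type DriverHandle interface {
	WaitCh(ctx context.Context) (<-chan *ExitResult, error)
}

// fakeHandle pretends a task runs briefly and then exits cleanly.
type fakeHandle struct{}

func (fakeHandle) WaitCh(ctx context.Context) (<-chan *ExitResult, error) {
	ch := make(chan *ExitResult, 1)
	go func() {
		time.Sleep(10 * time.Millisecond)
		ch <- &ExitResult{ExitCode: 0}
	}()
	return ch, nil
}

func main() {
	var handle DriverHandle = fakeHandle{}

	// Wait with context.Background(), not the runner's context: canceling
	// the runner must not unblock this before the task actually exits.
	resultCh, err := handle.WaitCh(context.Background())
	if err != nil {
		fmt.Println("wait task failed:", err)
		return
	}
	result := <-resultCh
	fmt.Printf("task terminated: exit=%d signal=%d err=%v\n",
		result.ExitCode, result.Signal, result.Err)
}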
- if resultCh, err := handle.WaitCh(context.Background()); err != nil { - tr.logger.Error("wait task failed", "error", err) - } else { - result = <-resultCh - } - } - - // Clear the handle - tr.clearDriverHandle() - - // Store the wait result on the restart tracker - tr.restartTracker.SetExitResult(result) - - if err := tr.exited(); err != nil { - tr.logger.Error("exited hooks failed", "error", err) - } - - RESTART: - restart, restartDelay := tr.shouldRestart() - if !restart { - break MAIN - } - - // Actually restart by sleeping and also watching for destroy events - select { - case <-time.After(restartDelay): - case <-tr.ctx.Done(): - tr.logger.Trace("task killed between restarts", "delay", restartDelay) - break MAIN - } - } - - // If task terminated, update server. All other exit conditions (eg - // killed or out of restarts) will perform their own server updates. - if result != nil { - event := structs.NewTaskEvent(structs.TaskTerminated). - SetExitCode(result.ExitCode). - SetSignal(result.Signal). - SetExitMessage(result.Err) - tr.UpdateState(structs.TaskStateDead, event) - } - - // Run the stop hooks - if err := tr.stop(); err != nil { - tr.logger.Error("stop failed", "error", err) - } - - tr.logger.Debug("task run loop exiting") -} - -// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits -// when Run has returned. Should only be run in a goroutine from Run. -func (tr *TaskRunner) handleUpdates() { - for { - select { - case <-tr.triggerUpdateCh: - case <-tr.waitCh: - return - } - - if tr.Alloc().TerminalStatus() { - // Terminal update: kill TaskRunner and let Run execute postrun hooks - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - tr.logger.Warn("error stopping task", "error", err) - } - continue - } - - // Non-terminal update; run hooks - tr.updateHooks() - } -} - -// shouldRestart determines whether the task should be restarted and updates -// the task state unless the task is killed or terminated. -func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { - // Determine if we should restart - state, when := tr.restartTracker.GetState() - reason := tr.restartTracker.GetReason() - switch state { - case structs.TaskKilled: - // Never restart an explicitly killed task. Kill method handles - // updating the server. 
- return false, 0 - case structs.TaskNotRestarting, structs.TaskTerminated: - tr.logger.Info("not restarting task", "reason", reason) - if state == structs.TaskNotRestarting { - tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask()) - } - return false, 0 - case structs.TaskRestarting: - tr.logger.Info("restarting task", "reason", reason, "delay", when) - tr.UpdateState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason)) - return true, 0 - default: - tr.logger.Error("restart tracker returned unknown state", "state", state) - return true, when - } -} - -// runDriver runs the driver and waits for it to exit -func (tr *TaskRunner) runDriver() error { - - taskConfig := drivers.NewTaskConfig(tr.task, tr.taskDir, tr.envBuilder.Build()) - taskConfig.ID = tr.buildID() - taskConfig.StdoutPath = tr.logmonHookConfig.stdoutFifo - taskConfig.StderrPath = tr.logmonHookConfig.stderrFifo - - evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), - Variables: map[string]cty.Value{ - "NOMAD_ENV_bin": cty.StringVal("/bin/consul"), - }, - } - - val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx) - if diag.HasErrors() { - errStr := "failed to parse config" - for _, err := range diag.Errs() { - errStr = fmt.Sprintf("%s\n* %s", errStr, err.Error()) - } - return errors.New(errStr) - } - - if err := taskConfig.EncodeDriverConfig(val); err != nil { - tr.logger.Warn("failed to encode driver config", "error", err) - return err - } - - //TODO mounts and devices - //XXX Evaluate and encode driver config - - // Start the job - handle, net, err := tr.driver.StartTask(taskConfig) - if err != nil { - tr.logger.Warn("driver start failed", "error", err) - return err - } - tr.network = net - - tr.localStateLock.Lock() - tr.localState.TaskHandle = handle - tr.localStateLock.Unlock() - - tr.updateDriverHandle(taskConfig.ID) - // Emit an event that we started - tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) - return nil -} - -func (tr *TaskRunner) updateDriverHandle(taskID string) { - tr.handleLock.Lock() - defer tr.handleLock.Unlock() - tr.handle = &driverHandleImpl{ - driver: tr.driver, - net: tr.network, - taskID: taskID, - task: tr.Task(), - } -} - -// initDriver creates the driver for the task -/*func (tr *TaskRunner) initDriver() error { - // Create a task-specific event emitter callback to expose minimal - // state to drivers - //XXX Replace with EmitEvent -- no need for a shim - eventEmitter := func(m string, args ...interface{}) { - msg := fmt.Sprintf(m, args...) 
- tr.logger.Debug("driver event", "event", msg) - tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg)) - } - - alloc := tr.Alloc() - driverCtx := driver.NewDriverContext( - alloc.Job.Name, - alloc.TaskGroup, - tr.taskName, - tr.allocID, - tr.clientConfig, // XXX Why does it need this - tr.clientConfig.Node, // XXX THIS I NEED TO FIX - tr.logger.StandardLogger(nil), // XXX Should pass this through - eventEmitter) - - driver, err := driver.NewDriver(tr.task.Driver, driverCtx) - if err != nil { - return err - } - - tr.driver = driver - return nil -}*/ - -// initDriver retrives the DriverPlugin from the plugin loader for this task -func (tr *TaskRunner) initDriver() error { - plugin, err := tr.pluginSingletonLoader.Dispense(tr.Task().Driver, base.PluginTypeDriver, tr.logger) - if err != nil { - return err - } - - // XXX need to be able to reattach to running drivers - driver, ok := plugin.Plugin().(drivers.DriverPlugin) - if !ok { - return fmt.Errorf("plugin loaded for driver %s does not implement DriverPlugin interface", tr.task.Driver) - } - - tr.driver = driver - - schema, err := tr.driver.TaskConfigSchema() - if err != nil { - return err - } - spec, _ := hclspec.Convert(schema) - tr.taskSchema = spec - - caps, err := tr.driver.Capabilities() - if err != nil { - return err - } - tr.driverCapabilities = caps - - return nil -} - -// handleDestroy kills the task handle. In the case that killing fails, -// handleDestroy will retry with an exponential backoff and will give up at a -// given limit. It returns whether the task was destroyed and the error -// associated with the last kill attempt. -func (tr *TaskRunner) handleDestroy(handle tinterfaces.DriverHandle) (destroyed bool, err error) { - // Cap the number of times we attempt to kill the task. - for i := 0; i < killFailureLimit; i++ { - if err = handle.Kill(); err != nil { - if err == drivers.ErrTaskNotFound { - tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID()) - return true, nil - } - // Calculate the new backoff - backoff := (1 << (2 * uint64(i))) * killBackoffBaseline - if backoff > killBackoffLimit { - backoff = killBackoffLimit - } - - tr.logger.Error("failed to kill task", "backoff", backoff, "error", err) - time.Sleep(backoff) - } else { - // Kill was successful - return true, nil - } - } - return -} - -// persistLocalState persists local state to disk synchronously. -func (tr *TaskRunner) persistLocalState() error { - tr.localStateLock.Lock() - defer tr.localStateLock.Unlock() - - return tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState) -} - -// buildID builds a consistent unique ID for the task from the alloc ID, task name and restart attempt -func (tr *TaskRunner) buildID() string { - return fmt.Sprintf("%s/%s/%d", tr.allocID, tr.taskName, tr.restartTracker.GetCount()) -} - -// XXX If the objects don't exists since the client shutdown before the task -// runner ever saved state, then we should treat it as a new task runner and not -// return an error -// -// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner -// but before Run so no locks need to be acquired. -func (tr *TaskRunner) Restore() error { - ls, ts, err := tr.stateDB.GetTaskRunnerState(tr.allocID, tr.taskName) - if err != nil { - return err - } - - tr.localState = ls - tr.state = ts - return nil -} - -// UpdateState sets the task runners allocation state and triggers a server -// update. 
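The dispense-and-assert flow in initDriver reduces to roughly the sketch below; PluginCatalog, PluginInstance, DriverPlugin, and the raw_exec stand-ins are simplified illustrations, not the real plugins/shared/loader or plugins/drivers types.

package main

import (
	"errors"
	"fmt"
)

// DriverPlugin is a stand-in for the interface a dispensed driver must satisfy.
type DriverPlugin interface {
	StartTask(taskID string) error
}

// PluginInstance wraps a dispensed plugin and exposes the underlying value.
type PluginInstance interface {
	Plugin() interface{}
}

// PluginCatalog is a stand-in for the singleton loader: repeated Dispense
// calls for the same name hand back the same instance.
type PluginCatalog struct {
	plugins map[string]PluginInstance
}

func (c *PluginCatalog) Dispense(name, pluginType string) (PluginInstance, error) {
	p, ok := c.plugins[name]
	if !ok {
		return nil, fmt.Errorf("unknown %s plugin %q", pluginType, name)
	}
	return p, nil
}

type rawExecInstance struct{}

func (rawExecInstance) Plugin() interface{} { return rawExecDriver{} }

type rawExecDriver struct{}

func (rawExecDriver) StartTask(taskID string) error { return nil }

// initDriver mirrors the task runner's flow: dispense by driver name, then
// type-assert the opaque plugin to the driver interface before using it.
func initDriver(catalog *PluginCatalog, driverName string) (DriverPlugin, error) {
	inst, err := catalog.Dispense(driverName, "driver")
	if err != nil {
		return nil, err
	}
	drv, ok := inst.Plugin().(DriverPlugin)
	if !ok {
		return nil, errors.New("plugin does not implement DriverPlugin")
	}
	return drv, nil
}

func main() {
	catalog := &PluginCatalog{plugins: map[string]PluginInstance{
		"raw_exec": rawExecInstance{},
	}}
	drv, err := initDriver(catalog, "raw_exec")
	fmt.Printf("dispensed driver: %T, err: %v\n", drv, err)
}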
-func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
-	tr.logger.Debug("setting task state", "state", state, "event", event.Type)
-	// Update the local state
-	stateCopy := tr.setStateLocal(state, event)
-
-	// Notify the alloc runner of the transition
-	tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy)
-}
-
-// setStateLocal updates the local in-memory state, persists a copy to disk and returns a
-// copy of the task's state.
-func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *structs.TaskState {
-	tr.stateLock.Lock()
-	defer tr.stateLock.Unlock()
-
-	//XXX REMOVE ME AFTER TESTING
-	if state == "" {
-		panic("UpdateState must not be called with an empty state")
-	}
-
-	// Update the task state
-	oldState := tr.state.State
-	taskState := tr.state
-	taskState.State = state
-
-	// Append the event
-	tr.appendEvent(event)
-
-	// Handle the state transition.
-	switch state {
-	case structs.TaskStateRunning:
-		// Capture the start time if it is just starting
-		if oldState != structs.TaskStateRunning {
-			taskState.StartedAt = time.Now().UTC()
-			if !tr.clientConfig.DisableTaggedMetrics {
-				metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"}, 1, tr.baseLabels)
-			}
-			//if r.config.BackwardsCompatibleMetrics {
-			//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "running"}, 1)
-			//}
-		}
-	case structs.TaskStateDead:
-		// Capture the finished time if not already set
-		if taskState.FinishedAt.IsZero() {
-			taskState.FinishedAt = time.Now().UTC()
-		}
-
-		// Emitting metrics to indicate task complete and failures
-		if taskState.Failed {
-			if !tr.clientConfig.DisableTaggedMetrics {
-				metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"}, 1, tr.baseLabels)
-			}
-			//if r.config.BackwardsCompatibleMetrics {
-			//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "failed"}, 1)
-			//}
-		} else {
-			if !tr.clientConfig.DisableTaggedMetrics {
-				metrics.IncrCounterWithLabels([]string{"client", "allocs", "complete"}, 1, tr.baseLabels)
-			}
-			//if r.config.BackwardsCompatibleMetrics {
-			//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "complete"}, 1)
-			//}
-		}
-	}
-
-	// Persist the state and event
-	if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState); err != nil {
-		// Only a warning because the next event/state-transition will
-		// try to persist it again.
-		tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
-	}
-
-	return tr.state.Copy()
-}
-
-// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
-// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
-// transition states.
-// Events are persisted locally and sent to the server, but errors are simply
-// logged. Use AppendEvent to simply add a new event.
-func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
-	tr.stateLock.Lock()
-	defer tr.stateLock.Unlock()
-
-	tr.appendEvent(event)
-
-	if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
-		// Only a warning because the next event/state-transition will
-		// try to persist it again.
-		tr.logger.Warn("error persisting event", "error", err, "event", event)
-	}
-
-	// Notify the alloc runner of the event
-	tr.stateUpdater.TaskStateUpdated(tr.taskName, tr.state.Copy())
-}
-
-// AppendEvent appends a new TaskEvent to this task's TaskState. The actual
-// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
-// transition states.
-// Events are persisted locally and errors are simply logged. Use EmitEvent
-// also update AllocRunner.
-func (tr *TaskRunner) AppendEvent(event *structs.TaskEvent) {
-	tr.stateLock.Lock()
-	defer tr.stateLock.Unlock()
-
-	tr.appendEvent(event)
-
-	if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
-		// Only a warning because the next event/state-transition will
-		// try to persist it again.
-		tr.logger.Warn("error persisting event", "error", err, "event", event)
-	}
-}
-
-// appendEvent to task's event slice. Caller must acquire stateLock.
-func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
-	// Ensure the event is populated with human readable strings
-	event.PopulateEventDisplayMessage()
-
-	// Propogate failure from event to task state
-	if event.FailsTask {
-		tr.state.Failed = true
-	}
-
-	// XXX This seems like a super awkward spot for this? Why not shouldRestart?
-	// Update restart metrics
-	if event.Type == structs.TaskRestarting {
-		if !tr.clientConfig.DisableTaggedMetrics {
-			metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"}, 1, tr.baseLabels)
-		}
-		//if r.config.BackwardsCompatibleMetrics {
-		//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "restart"}, 1)
-		//}
-		tr.state.Restarts++
-		tr.state.LastRestart = time.Unix(0, event.Time)
-	}
-
-	// Append event to slice
-	appendTaskEvent(tr.state, event)
-
-	return nil
-}
-
-// WaitCh is closed when TaskRunner.Run exits.
-func (tr *TaskRunner) WaitCh() <-chan struct{} {
-	return tr.waitCh
-}
-
-// Update the running allocation with a new version received from the server.
-// Calls Update hooks asynchronously with Run().
-//
-// This method is safe for calling concurrently with Run() and does not modify
-// the passed in allocation.
-func (tr *TaskRunner) Update(update *structs.Allocation) {
-	// Update tr.alloc
-	tr.setAlloc(update)
-
-	// Trigger update hooks
-	tr.triggerUpdateHooks()
-}
-
-// triggerUpdate if there isn't already an update pending. Should be called
-// instead of calling updateHooks directly to serialize runs of update hooks.
-// TaskRunner state should be updated prior to triggering update hooks.
-//
-// Does not block.
-func (tr *TaskRunner) triggerUpdateHooks() {
-	select {
-	case tr.triggerUpdateCh <- struct{}{}:
-	default:
-		// already an update hook pending
-	}
-}
-
-// LatestResourceUsage returns the last resource utilization datapoint
-// collected. May return nil if the task is not running or no resource
-// utilization has been collected yet.
-func (tr *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
-	tr.resourceUsageLock.Lock()
-	ru := tr.resourceUsage
-	tr.resourceUsageLock.Unlock()
-	return ru
-}
-
-// UpdateStats updates and emits the latest stats from the driver.
-func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
-	tr.resourceUsageLock.Lock()
-	tr.resourceUsage = ru
-	tr.resourceUsageLock.Unlock()
-	if ru != nil {
-		tr.emitStats(ru)
-	}
-}
-
-//TODO Remove Backwardscompat or use tr.Alloc()?
-func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
-	if !tr.clientConfig.DisableTaggedMetrics {
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
-			float32(ru.ResourceUsage.MemoryStats.RSS), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
-			float32(ru.ResourceUsage.MemoryStats.RSS), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "cache"},
-			float32(ru.ResourceUsage.MemoryStats.Cache), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "swap"},
-			float32(ru.ResourceUsage.MemoryStats.Swap), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "max_usage"},
-			float32(ru.ResourceUsage.MemoryStats.MaxUsage), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_usage"},
-			float32(ru.ResourceUsage.MemoryStats.KernelUsage), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_max_usage"},
-			float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage), tr.baseLabels)
-	}
-
-	if tr.clientConfig.BackwardsCompatibleMetrics {
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "rss"}, float32(ru.ResourceUsage.MemoryStats.RSS))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "cache"}, float32(ru.ResourceUsage.MemoryStats.Cache))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "swap"}, float32(ru.ResourceUsage.MemoryStats.Swap))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "max_usage"}, float32(ru.ResourceUsage.MemoryStats.MaxUsage))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelUsage))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_max_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage))
-	}
-}
-
-//TODO Remove Backwardscompat or use tr.Alloc()?
-func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
-	if !tr.clientConfig.DisableTaggedMetrics {
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"},
-			float32(ru.ResourceUsage.CpuStats.Percent), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"},
-			float32(ru.ResourceUsage.CpuStats.SystemMode), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"},
-			float32(ru.ResourceUsage.CpuStats.UserMode), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"},
-			float32(ru.ResourceUsage.CpuStats.ThrottledTime), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"},
-			float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), tr.baseLabels)
-		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"},
-			float32(ru.ResourceUsage.CpuStats.TotalTicks), tr.baseLabels)
-	}
-
-	if tr.clientConfig.BackwardsCompatibleMetrics {
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_percent"}, float32(ru.ResourceUsage.CpuStats.Percent))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "system"}, float32(ru.ResourceUsage.CpuStats.SystemMode))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "user"}, float32(ru.ResourceUsage.CpuStats.UserMode))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_time"}, float32(ru.ResourceUsage.CpuStats.ThrottledTime))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_periods"}, float32(ru.ResourceUsage.CpuStats.ThrottledPeriods))
-		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_ticks"}, float32(ru.ResourceUsage.CpuStats.TotalTicks))
-	}
-}
-
-// emitStats emits resource usage stats of tasks to remote metrics collector
-// sinks
-func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
-	if !tr.clientConfig.PublishAllocationMetrics {
-		return
-	}
-
-	if ru.ResourceUsage.MemoryStats != nil {
-		tr.setGaugeForMemory(ru)
-	}
-
-	if ru.ResourceUsage.CpuStats != nil {
-		tr.setGaugeForCPU(ru)
-	}
-}
-
-// appendTaskEvent updates the task status by appending the new event.
-func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
-	const capacity = 10
-	if state.Events == nil {
-		state.Events = make([]*structs.TaskEvent, 1, capacity)
-		state.Events[0] = event
-		return
-	}
-
-	// If we hit capacity, then shift it.
-	if len(state.Events) == capacity {
-		old := state.Events
-		state.Events = make([]*structs.TaskEvent, 0, capacity)
-		state.Events = append(state.Events, old[1:]...)
-	}
-
-	state.Events = append(state.Events, event)
-}
diff --git a/client/client.go b/client/client.go
index 491045dab..fd9935de9 100644
--- a/client/client.go
+++ b/client/client.go
@@ -761,7 +761,7 @@ func (c *Client) restoreState() error {
 	for _, alloc := range allocs {
 		c.configLock.RLock()
-		arConf := &allocrunnerv2.Config{
+		arConf := &allocrunner.Config{
 			Alloc:        alloc,
 			Logger:       c.logger,
 			ClientConfig: c.config,
@@ -1911,7 +1911,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
 	// The long term fix is to pass in the config and node separately and then
 	// we don't have to do a copy.
 	c.configLock.RLock()
-	arConf := &allocrunnerv2.Config{
+	arConf := &allocrunner.Config{
 		Alloc:        alloc,
 		Logger:       c.logger,
 		ClientConfig: c.config,
diff --git a/command/agent/consul/script.go b/command/agent/consul/script.go
index 0caa3ef1e..4a24d8f99 100644
--- a/command/agent/consul/script.go
+++ b/command/agent/consul/script.go
@@ -8,7 +8,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/consul/api"
-	"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces"
+	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
diff --git a/command/agent/consul/structs.go b/command/agent/consul/structs.go
index 729134523..a5d070d8f 100644
--- a/command/agent/consul/structs.go
+++ b/command/agent/consul/structs.go
@@ -1,7 +1,7 @@
 package consul
 import (
-	"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/interfaces"
+	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/nomad/structs"
 )