executor v2 (#4656)

* client/executor: refactor client to remove interpolation

* executor: POC libcontainer based executor

* vendor: use hashicorp libcontainer fork

* vendor: add libcontainer/nsenter dep

* executor: updated executor interface to simplify operations

* executor: implement logging pipe

* logmon: new logmon plugin to manage task logs

* driver/executor: use logmon for log management

* executor: fix tests and windows build

* executor: fix logging key names

* executor: fix test failures

* executor: add config field to toggle between using libcontainer and standard executors

* logmon: use discover utility to discover nomad executable

* executor: only call libcontainer-shim on main in linux

* logmon: use separate path configs for stdout/stderr fifos

* executor: windows fixes

* executor: created reusable pid stats collection utility that can be used in an executor

* executor: update fifo.Open calls

* executor: fix build

* remove executor from docker driver

* executor: Shutdown func to kill and cleanup executor and its children

* executor: move linux specific universal executor funcs to separate file

* move logmon initialization to a task runner hook

* client: doc fixes and renaming from code review


* taskrunner: use shared config struct for logmon fifo fields

* taskrunner: logmon only needs to be started once per task
This commit is contained in:
Nick Ethier
2018-09-24 14:37:45 -04:00
committed by Michael Schurter
parent da8f053a0d
commit 5b14d24bf4
68 changed files with 4186 additions and 2048 deletions

View File

@@ -55,14 +55,14 @@ type JavaDriverConfig struct {
// javaHandle is returned from Start/Open as a handle to the PID
type javaHandle struct {
pluginClient *plugin.Client
userPid int
executor executor.Executor
isolationConfig *dstructs.IsolationConfig
taskDir string
pluginClient *plugin.Client
userPid int
executor executor.Executor
taskDir string
killTimeout time.Duration
maxKillTimeout time.Duration
shutdownSignal string
version string
logger *log.Logger
waitCh chan *dstructs.WaitResult
@@ -239,8 +239,9 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
pluginLogFile := filepath.Join(ctx.TaskDir.Dir, "executor.out")
executorConfig := &dstructs.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: d.config.LogLevel,
LogFile: pluginLogFile,
LogLevel: d.config.LogLevel,
FSIsolation: true,
}
execIntf, pluginClient, err := createExecutor(d.config.LogOutput, d.config, executorConfig)
@@ -248,25 +249,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
return nil, err
}
// Set the context
executorCtx := &executor.ExecutorContext{
TaskEnv: ctx.TaskEnv,
Driver: "java",
Task: task,
TaskDir: ctx.TaskDir.Dir,
LogDir: ctx.TaskDir.LogDir,
}
if err := execIntf.SetContext(executorCtx); err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
absPath, err := GetAbsolutePath("java")
if err != nil {
return nil, err
}
taskKillSignal, err := getTaskKillSignal(task.KillSignal)
_, err = getTaskKillSignal(task.KillSignal)
if err != nil {
return nil, err
}
@@ -274,12 +262,20 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
execCmd := &executor.ExecCommand{
Cmd: absPath,
Args: args,
FSIsolation: true,
ResourceLimits: true,
User: getExecutorUser(task),
TaskKillSignal: taskKillSignal,
Resources: &executor.Resources{
CPU: task.Resources.CPU,
MemoryMB: task.Resources.MemoryMB,
IOPS: task.Resources.IOPS,
DiskMB: task.Resources.DiskMB,
},
Env: ctx.TaskEnv.List(),
TaskDir: ctx.TaskDir.Dir,
StdoutPath: ctx.StdoutFifo,
StderrPath: ctx.StderrFifo,
}
ps, err := execIntf.LaunchCmd(execCmd)
ps, err := execIntf.Launch(execCmd)
if err != nil {
pluginClient.Kill()
return nil, err
@@ -289,17 +285,17 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &javaHandle{
pluginClient: pluginClient,
executor: execIntf,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: ctx.TaskDir.Dir,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
version: d.config.Version.VersionNumber(),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: execIntf,
userPid: ps.Pid,
shutdownSignal: task.KillSignal,
taskDir: ctx.TaskDir.Dir,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
version: d.config.Version.VersionNumber(),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
go h.run()
return &StartResponse{Handle: h}, nil
@@ -308,13 +304,13 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
// Cleanup is a no-op for the Java driver: it ignores both arguments and
// always returns nil.
func (d *JavaDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
type javaId struct {
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
IsolationConfig *dstructs.IsolationConfig
TaskDir string
UserPid int
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
TaskDir string
UserPid int
ShutdownSignal string
}
func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -334,12 +330,6 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e))
}
if id.IsolationConfig != nil {
ePid := pluginConfig.Reattach.Pid
if e := executor.ClientCleanup(id.IsolationConfig, ePid); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying resource container failed: %v", e))
}
}
return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil())
}
@@ -349,16 +339,16 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
// Return a driver handle
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
userPid: id.UserPid,
isolationConfig: id.IsolationConfig,
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: exec,
userPid: id.UserPid,
shutdownSignal: id.ShutdownSignal,
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
go h.run()
return h, nil
@@ -366,13 +356,13 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
func (h *javaHandle) ID() string {
id := javaId{
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
IsolationConfig: h.isolationConfig,
TaskDir: h.taskDir,
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
TaskDir: h.taskDir,
ShutdownSignal: h.shutdownSignal,
}
data, err := json.Marshal(id)
@@ -389,7 +379,12 @@ func (h *javaHandle) WaitCh() chan *dstructs.WaitResult {
func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateTask(task)
h.executor.UpdateResources(&executor.Resources{
CPU: task.Resources.CPU,
MemoryMB: task.Resources.MemoryMB,
IOPS: task.Resources.IOPS,
DiskMB: task.Resources.DiskMB,
})
// Update is not possible
return nil
@@ -413,11 +408,11 @@ func (d *javaHandle) Network() *cstructs.DriverNetwork {
}
func (h *javaHandle) Kill() error {
if err := h.executor.ShutDown(); err != nil {
if err := h.executor.Shutdown(h.shutdownSignal, h.killTimeout); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
return fmt.Errorf("executor Kill failed: %v", err)
}
select {
@@ -426,8 +421,8 @@ func (h *javaHandle) Kill() error {
if h.pluginClient.Exited() {
break
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
if err := h.executor.Shutdown(h.shutdownSignal, h.killTimeout); err != nil {
return fmt.Errorf("executor Destroy failed: %v", err)
}
}
@@ -442,20 +437,13 @@ func (h *javaHandle) run() {
ps, werr := h.executor.Wait()
close(h.doneCh)
if ps.ExitCode == 0 && werr != nil {
if h.isolationConfig != nil {
ePid := h.pluginClient.ReattachConfig().Pid
if e := executor.ClientCleanup(h.isolationConfig, ePid); e != nil {
h.logger.Printf("[ERR] driver.java: destroying resource container failed: %v", e)
}
} else {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERR] driver.java: error killing user process: %v", e)
}
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERR] driver.java: error killing user process: %v", e)
}
}
// Exit the executor
h.executor.Exit()
// Destroy the executor
h.executor.Shutdown(h.shutdownSignal, 0)
h.pluginClient.Kill()
// Send the results