Merge pull request #1807 from hashicorp/f-executor-ctx

Send Executor Ctx separately from Launch commands
This commit is contained in:
Alex Dadgar
2016-10-12 13:25:58 -07:00
committed by GitHub
12 changed files with 137 additions and 42 deletions

View File

@@ -783,12 +783,17 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
PortLowerBound: d.config.ClientMinPort,
PortUpperBound: d.config.ClientMaxPort,
}
if err := exec.SetContext(executorCtx); err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
// Only launch syslog server if we're going to use it!
syslogAddr := ""
if len(driverConfig.Logging) == 0 || driverConfig.Logging[0].Type == "syslog" {
ss, err := exec.LaunchSyslogServer(executorCtx)
ss, err := exec.LaunchSyslogServer()
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
}
syslogAddr = ss.Addr

View File

@@ -128,18 +128,25 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
ChrootEnv: d.config.ChrootEnv,
Task: task,
}
if err := exec.SetContext(executorCtx); err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{
execCmd := &executor.ExecCommand{
Cmd: command,
Args: driverConfig.Args,
FSIsolation: true,
ResourceLimits: true,
User: getExecutorUser(task),
}, executorCtx)
}
ps, err := exec.LaunchCmd(execCmd)
if err != nil {
pluginClient.Kill()
return nil, err
}
d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)
// Return a driver handle

View File

@@ -49,7 +49,12 @@ func TestExecScriptCheckWithIsolation(t *testing.T) {
execCmd.User = cstructs.DefaultUnpriviledgedUser
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
_, err := executor.LaunchCmd(&execCmd, ctx)
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
_, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}

View File

@@ -48,8 +48,9 @@ var (
// Executor is the interface which allows a driver to launch and supervise
// a process
type Executor interface {
LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error)
LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error)
SetContext(ctx *ExecutorContext) error
LaunchCmd(command *ExecCommand) (*ProcessState, error)
LaunchSyslogServer() (*SyslogServerState, error)
Wait() (*ProcessState, error)
ShutDown() error
Exit() error
@@ -229,12 +230,23 @@ func (e *UniversalExecutor) Version() (*ExecutorVersion, error) {
return &ExecutorVersion{Version: "1.0.0"}, nil
}
// SetContext is used to set the executors context and should be the first call
// after launching the executor.
func (e *UniversalExecutor) SetContext(ctx *ExecutorContext) error {
e.ctx = ctx
return nil
}
// LaunchCmd launches a process and returns it's state. It also configures an
// applies isolation on certain platforms.
func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) {
func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, error) {
e.logger.Printf("[DEBUG] executor: launching command %v %v", command.Cmd, strings.Join(command.Args, " "))
e.ctx = ctx
// Ensure the context has been set first
if e.ctx == nil {
return nil, fmt.Errorf("SetContext must be called before launching a command")
}
e.command = command
// setting the user of the process
@@ -272,7 +284,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
e.cmd.Stderr = e.lre
// Look up the binary path and make it executable
absPath, err := e.lookupBin(ctx.TaskEnv.ReplaceEnv(command.Cmd))
absPath, err := e.lookupBin(e.ctx.TaskEnv.ReplaceEnv(command.Cmd))
if err != nil {
return nil, err
}
@@ -294,8 +306,8 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
// Set the commands arguments
e.cmd.Path = path
e.cmd.Args = append([]string{e.cmd.Path}, ctx.TaskEnv.ParseAndReplace(command.Args)...)
e.cmd.Env = ctx.TaskEnv.EnvList()
e.cmd.Args = append([]string{e.cmd.Path}, e.ctx.TaskEnv.ParseAndReplace(command.Args)...)
e.cmd.Env = e.ctx.TaskEnv.EnvList()
// Start the process
if err := e.cmd.Start(); err != nil {

View File

@@ -48,7 +48,12 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
execCmd.User = cstructs.DefaultUnpriviledgedUser
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
ps, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}

View File

@@ -62,8 +62,12 @@ func TestExecutor_Start_Invalid(t *testing.T) {
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
_, err := executor.LaunchCmd(&execCmd, ctx)
if err == nil {
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
if _, err := executor.LaunchCmd(&execCmd); err == nil {
t.Fatalf("Expected error")
}
}
@@ -73,7 +77,16 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) {
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, _ := executor.LaunchCmd(&execCmd, ctx)
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
ps, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("Unexpected error")
}
if ps.Pid == 0 {
t.Fatalf("expected process to start and have non zero pid")
}
@@ -91,7 +104,12 @@ func TestExecutor_Start_Wait(t *testing.T) {
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
ps, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}
@@ -124,7 +142,12 @@ func TestExecutor_WaitExitSignal(t *testing.T) {
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
ps, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -170,7 +193,12 @@ func TestExecutor_ClientCleanup(t *testing.T) {
execCmd.User = "nobody"
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
ps, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}
@@ -202,7 +230,12 @@ func TestExecutor_Start_Kill(t *testing.T) {
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err := executor.SetContext(ctx); err != nil {
t.Fatalf("Unexpected error")
}
ps, err := executor.LaunchCmd(&execCmd)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}

View File

@@ -11,8 +11,11 @@ import (
"github.com/hashicorp/nomad/client/driver/logging"
)
func (e *UniversalExecutor) LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) {
e.ctx = ctx
func (e *UniversalExecutor) LaunchSyslogServer() (*SyslogServerState, error) {
// Ensure the context has been set first
if e.ctx == nil {
return nil, fmt.Errorf("SetContext must be called before launching the Syslog Server")
}
// configuring the task dir
if err := e.configureTaskDir(); err != nil {

View File

@@ -28,12 +28,6 @@ type ExecutorRPC struct {
// LaunchCmdArgs wraps a user command and the args for the purposes of RPC
type LaunchCmdArgs struct {
Cmd *executor.ExecCommand
Ctx *executor.ExecutorContext
}
// LaunchSyslogServerArgs wraps the executor context for the purposes of RPC
type LaunchSyslogServerArgs struct {
Ctx *executor.ExecutorContext
}
// SyncServicesArgs wraps the consul context for the purposes of RPC
@@ -41,15 +35,15 @@ type SyncServicesArgs struct {
Ctx *executor.ConsulContext
}
func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand, ctx *executor.ExecutorContext) (*executor.ProcessState, error) {
func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand) (*executor.ProcessState, error) {
var ps *executor.ProcessState
err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps)
err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd}, &ps)
return ps, err
}
func (e *ExecutorRPC) LaunchSyslogServer(ctx *executor.ExecutorContext) (*executor.SyslogServerState, error) {
func (e *ExecutorRPC) LaunchSyslogServer() (*executor.SyslogServerState, error) {
var ss *executor.SyslogServerState
err := e.client.Call("Plugin.LaunchSyslogServer", LaunchSyslogServerArgs{Ctx: ctx}, &ss)
err := e.client.Call("Plugin.LaunchSyslogServer", new(interface{}), &ss)
return ss, err
}
@@ -67,6 +61,10 @@ func (e *ExecutorRPC) Exit() error {
return e.client.Call("Plugin.Exit", new(interface{}), new(interface{}))
}
func (e *ExecutorRPC) SetContext(ctx *executor.ExecutorContext) error {
return e.client.Call("Plugin.SetContext", ctx, new(interface{}))
}
func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error {
return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{}))
}
@@ -101,15 +99,15 @@ type ExecutorRPCServer struct {
}
func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *executor.ProcessState) error {
state, err := e.Impl.LaunchCmd(args.Cmd, args.Ctx)
state, err := e.Impl.LaunchCmd(args.Cmd)
if state != nil {
*ps = *state
}
return err
}
func (e *ExecutorRPCServer) LaunchSyslogServer(args LaunchSyslogServerArgs, ss *executor.SyslogServerState) error {
state, err := e.Impl.LaunchSyslogServer(args.Ctx)
func (e *ExecutorRPCServer) LaunchSyslogServer(args interface{}, ss *executor.SyslogServerState) error {
state, err := e.Impl.LaunchSyslogServer()
if state != nil {
*ss = *state
}
@@ -132,6 +130,10 @@ func (e *ExecutorRPCServer) Exit(args interface{}, resp *interface{}) error {
return e.Impl.Exit()
}
func (e *ExecutorRPCServer) SetContext(args *executor.ExecutorContext, resp *interface{}) error {
return e.Impl.SetContext(args)
}
func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *interface{}) error {
return e.Impl.UpdateLogConfig(args)
}

View File

@@ -202,6 +202,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
if err != nil {
return nil, err
}
// Set the context
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Driver: "java",
@@ -210,19 +212,24 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
ChrootEnv: d.config.ChrootEnv,
Task: task,
}
if err := execIntf.SetContext(executorCtx); err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
absPath, err := GetAbsolutePath("java")
if err != nil {
return nil, err
}
ps, err := execIntf.LaunchCmd(&executor.ExecCommand{
execCmd := &executor.ExecCommand{
Cmd: absPath,
Args: args,
FSIsolation: true,
ResourceLimits: true,
User: getExecutorUser(task),
}, executorCtx)
}
ps, err := execIntf.LaunchCmd(execCmd)
if err != nil {
pluginClient.Kill()
return nil, err

View File

@@ -247,11 +247,17 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
AllocID: ctx.AllocID,
Task: task,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{
if err := exec.SetContext(executorCtx); err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
execCmd := &executor.ExecCommand{
Cmd: args[0],
Args: args[1:],
User: task.User,
}, executorCtx)
}
ps, err := exec.LaunchCmd(execCmd)
if err != nil {
pluginClient.Kill()
return nil, err

View File

@@ -142,12 +142,17 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
AllocID: ctx.AllocID,
Task: task,
}
if err := exec.SetContext(executorCtx); err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{
execCmd := &executor.ExecCommand{
Cmd: command,
Args: driverConfig.Args,
User: task.User,
}, executorCtx)
}
ps, err := exec.LaunchCmd(execCmd)
if err != nil {
pluginClient.Kill()
return nil, err

View File

@@ -289,17 +289,22 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
AllocID: ctx.AllocID,
Task: task,
}
if err := execIntf.SetContext(executorCtx); err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
absPath, err := GetAbsolutePath("rkt")
if err != nil {
return nil, err
}
ps, err := execIntf.LaunchCmd(&executor.ExecCommand{
execCmd := &executor.ExecCommand{
Cmd: absPath,
Args: cmdArgs,
User: task.User,
}, executorCtx)
}
ps, err := execIntf.LaunchCmd(execCmd)
if err != nil {
pluginClient.Kill()
return nil, err