Fixed a test

This commit is contained in:
Diptanu Choudhury
2016-02-04 10:09:52 -08:00
parent 837b462f5c
commit fedf3c791b
8 changed files with 53 additions and 27 deletions

View File

@@ -115,7 +115,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
executorCtx := &plugins.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
Task: task,
TaskName: task.Name,
TaskResources: task.Resources,
ResourceLimits: true,
FSIsolation: true,
UnprivilegedUser: false,
@@ -143,7 +144,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
type execId struct {
KillTimeout time.Duration
PluginConfig *plugin.ReattachConfig
PluginConfig *plugins.ExecutorReattachConfig
}
func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -152,16 +153,12 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
reattachConfig := id.PluginConfig.PluginConfig()
pluginConfig := &plugin.ClientConfig{
HandshakeConfig: plugins.HandshakeConfig,
Plugins: plugins.PluginMap,
Cmd: exec.Command(bin, "executor"),
Reattach: id.PluginConfig,
Reattach: reattachConfig,
}
executor, client, err := d.executor(pluginConfig)
if err != nil {
@@ -199,7 +196,7 @@ func (d *ExecDriver) executor(config *plugin.ClientConfig) (plugins.Executor, *p
func (h *execHandle) ID() string {
id := execId{
KillTimeout: h.killTimeout,
PluginConfig: h.pluginClient.ReattachConfig(),
PluginConfig: plugins.NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
}
data, err := json.Marshal(id)

View File

@@ -70,6 +70,9 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
if handle2 == nil {
t.Fatalf("missing handle")
}
handle.Kill()
handle2.Kill()
}
func TestExecDriver_Start_Wait(t *testing.T) {

View File

@@ -161,9 +161,10 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &plugins.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
Task: task,
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
}
ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: "java", Args: args}, executorCtx)
if err != nil {

View File

@@ -21,7 +21,8 @@ import (
type ExecutorContext struct {
TaskEnv *env.TaskEnvironment
AllocDir *allocdir.AllocDir
Task *structs.Task
TaskName string
TaskResources *structs.Resources
FSIsolation bool
ResourceLimits bool
UnprivilegedUser bool
@@ -85,14 +86,14 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
}
}
stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.Task.Name))
stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.TaskName))
stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
e.cmd.Stdout = stdo
stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.Task.Name))
stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.TaskName))
stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
@@ -138,7 +139,7 @@ func (e *UniversalExecutor) wait() {
}
func (e *UniversalExecutor) Exit() error {
e.logger.Printf("[INFO] Exiting plugin for task %q", e.ctx.Task.Name)
e.logger.Printf("[INFO] Exiting plugin for task %q", e.ctx.TaskName)
proc, err := os.FindProcess(e.cmd.Process.Pid)
if err != nil {
return fmt.Errorf("failied to find user process %v: %v", e.cmd.Process.Pid, err)
@@ -164,10 +165,10 @@ func (e *UniversalExecutor) ShutDown() error {
}
func (e *UniversalExecutor) configureTaskDir() error {
taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.Task.Name]
taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName]
e.taskDir = taskDir
if !ok {
return fmt.Errorf("Couldn't find task directory for task %v", e.ctx.Task.Name)
return fmt.Errorf("Couldn't find task directory for task %v", e.ctx.TaskName)
}
e.cmd.Dir = taskDir
return nil

View File

@@ -41,7 +41,7 @@ func (e *UniversalExecutor) configureIsolation() error {
}
if e.ctx.ResourceLimits {
if err := e.configureCgroups(e.ctx.Task.Resources); err != nil {
if err := e.configureCgroups(e.ctx.TaskResources); err != nil {
return fmt.Errorf("error creating cgroups: %v", err)
}
}
@@ -142,11 +142,11 @@ func (e *UniversalExecutor) pathExists(path string) bool {
func (e *UniversalExecutor) configureChroot() error {
allocDir := e.ctx.AllocDir
if err := allocDir.MountSharedDir(e.ctx.Task.Name); err != nil {
if err := allocDir.MountSharedDir(e.ctx.TaskName); err != nil {
return err
}
if err := allocDir.Embed(e.ctx.Task.Name, chrootEnv); err != nil {
if err := allocDir.Embed(e.ctx.TaskName, chrootEnv); err != nil {
return err
}

View File

@@ -2,6 +2,7 @@ package plugins
import (
"log"
"net"
"net/rpc"
"os"
@@ -18,6 +19,27 @@ var PluginMap = map[string]plugin.Plugin{
"executor": new(ExecutorPlugin),
}
type ExecutorReattachConfig struct {
Pid int
AddrNet string
AddrName string
}
func (c *ExecutorReattachConfig) PluginConfig() *plugin.ReattachConfig {
var addr net.Addr
switch c.AddrNet {
case "unix", "unixgram", "unixpacket":
addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName)
case "tcp", "tcp4", "tcp6":
addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName)
}
return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr}
}
func NewExecutorReattachConfig(c *plugin.ReattachConfig) *ExecutorReattachConfig {
return &ExecutorReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()}
}
type ExecutorRPC struct {
client *rpc.Client
}

View File

@@ -205,9 +205,10 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &plugins.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
Task: task,
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
}
ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx)
if err != nil {

View File

@@ -112,9 +112,10 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return nil, err
}
executorCtx := &plugins.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
Task: task,
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
}
ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
if err != nil {