From fedf3c791bb8ecbe8404fc8f7925f98ed40e1a84 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 4 Feb 2016 10:09:52 -0800 Subject: [PATCH] Fixed a test --- client/driver/exec.go | 15 ++++++--------- client/driver/exec_test.go | 3 +++ client/driver/java.go | 7 ++++--- client/driver/plugins/executor.go | 13 +++++++------ client/driver/plugins/executor_linux.go | 6 +++--- client/driver/plugins/executor_plugin.go | 22 ++++++++++++++++++++++ client/driver/qemu.go | 7 ++++--- client/driver/raw_exec.go | 7 ++++--- 8 files changed, 53 insertions(+), 27 deletions(-) diff --git a/client/driver/exec.go b/client/driver/exec.go index 70d93b176..da1d96a2c 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -115,7 +115,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, executorCtx := &plugins.ExecutorContext{ TaskEnv: d.taskEnv, AllocDir: ctx.AllocDir, - Task: task, + TaskName: task.Name, + TaskResources: task.Resources, ResourceLimits: true, FSIsolation: true, UnprivilegedUser: false, @@ -143,7 +144,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, type execId struct { KillTimeout time.Duration - PluginConfig *plugin.ReattachConfig + PluginConfig *plugins.ExecutorReattachConfig } func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -152,16 +153,12 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } - bin, err := discover.NomadExecutable() - if err != nil { - return nil, fmt.Errorf("unable to find the nomad binary: %v", err) - } + reattachConfig := id.PluginConfig.PluginConfig() pluginConfig := &plugin.ClientConfig{ HandshakeConfig: plugins.HandshakeConfig, Plugins: plugins.PluginMap, - Cmd: exec.Command(bin, "executor"), - Reattach: id.PluginConfig, + Reattach: reattachConfig, } executor, client, err := d.executor(pluginConfig) if err != nil { @@ -199,7 +196,7 @@ func (d *ExecDriver) executor(config *plugin.ClientConfig) (plugins.Executor, *p func (h *execHandle) ID() string { id := execId{ KillTimeout: h.killTimeout, - PluginConfig: h.pluginClient.ReattachConfig(), + PluginConfig: plugins.NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), } data, err := json.Marshal(id) diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 5b077e9d6..a1ea5948e 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -70,6 +70,9 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { if handle2 == nil { t.Fatalf("missing handle") } + + handle.Kill() + handle2.Kill() } func TestExecDriver_Start_Wait(t *testing.T) { diff --git a/client/driver/java.go b/client/driver/java.go index 79bf9454f..c455d457f 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -161,9 +161,10 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &plugins.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - Task: task, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + TaskName: task.Name, + TaskResources: task.Resources, } ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: "java", Args: args}, executorCtx) if err != nil { diff --git a/client/driver/plugins/executor.go b/client/driver/plugins/executor.go index 4c0444f4b..2106caf71 100644 --- a/client/driver/plugins/executor.go +++ b/client/driver/plugins/executor.go @@ -21,7 +21,8 @@ import ( type ExecutorContext struct { TaskEnv *env.TaskEnvironment AllocDir *allocdir.AllocDir - Task *structs.Task + TaskName string + TaskResources *structs.Resources FSIsolation bool ResourceLimits bool UnprivilegedUser bool @@ -85,14 +86,14 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } } - stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.Task.Name)) + stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.TaskName)) stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) if err != nil { return nil, err } e.cmd.Stdout = stdo - stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.Task.Name)) + stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.TaskName)) stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) if err != nil { return nil, err @@ -138,7 +139,7 @@ func (e *UniversalExecutor) wait() { } func (e *UniversalExecutor) Exit() error { - e.logger.Printf("[INFO] Exiting plugin for task %q", e.ctx.Task.Name) + e.logger.Printf("[INFO] Exiting plugin for task %q", e.ctx.TaskName) proc, err := os.FindProcess(e.cmd.Process.Pid) if err != nil { return fmt.Errorf("failied to find user process %v: %v", e.cmd.Process.Pid, err) @@ -164,10 +165,10 @@ func (e *UniversalExecutor) ShutDown() error { } func (e *UniversalExecutor) configureTaskDir() error { - taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.Task.Name] + taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName] e.taskDir = taskDir if !ok { - return fmt.Errorf("Couldn't find task directory for task %v", e.ctx.Task.Name) + return fmt.Errorf("Couldn't find task directory for task %v", e.ctx.TaskName) } e.cmd.Dir = taskDir return nil diff --git a/client/driver/plugins/executor_linux.go b/client/driver/plugins/executor_linux.go index 8ae41d685..efdd4e410 100644 --- a/client/driver/plugins/executor_linux.go +++ b/client/driver/plugins/executor_linux.go @@ -41,7 +41,7 @@ func (e *UniversalExecutor) configureIsolation() error { } if e.ctx.ResourceLimits { - if err := e.configureCgroups(e.ctx.Task.Resources); err != nil { + if err := e.configureCgroups(e.ctx.TaskResources); err != nil { return fmt.Errorf("error creating cgroups: %v", err) } } @@ -142,11 +142,11 @@ func (e *UniversalExecutor) pathExists(path string) bool { func (e *UniversalExecutor) configureChroot() error { allocDir := e.ctx.AllocDir - if err := allocDir.MountSharedDir(e.ctx.Task.Name); err != nil { + if err := allocDir.MountSharedDir(e.ctx.TaskName); err != nil { return err } - if err := allocDir.Embed(e.ctx.Task.Name, chrootEnv); err != nil { + if err := allocDir.Embed(e.ctx.TaskName, chrootEnv); err != nil { return err } diff --git a/client/driver/plugins/executor_plugin.go b/client/driver/plugins/executor_plugin.go index bc683070c..9506dea3d 100644 --- a/client/driver/plugins/executor_plugin.go +++ b/client/driver/plugins/executor_plugin.go @@ -2,6 +2,7 @@ package plugins import ( "log" + "net" "net/rpc" "os" @@ -18,6 +19,27 @@ var PluginMap = map[string]plugin.Plugin{ "executor": new(ExecutorPlugin), } +type ExecutorReattachConfig struct { + Pid int + AddrNet string + AddrName string +} + +func (c *ExecutorReattachConfig) PluginConfig() *plugin.ReattachConfig { + var addr net.Addr + switch c.AddrNet { + case "unix", "unixgram", "unixpacket": + addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName) + case "tcp", "tcp4", "tcp6": + addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName) + } + return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr} +} + +func NewExecutorReattachConfig(c *plugin.ReattachConfig) *ExecutorReattachConfig { + return &ExecutorReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()} +} + type ExecutorRPC struct { client *rpc.Client } diff --git a/client/driver/qemu.go b/client/driver/qemu.go index d9df73d36..6903d303f 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -205,9 +205,10 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &plugins.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - Task: task, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + TaskName: task.Name, + TaskResources: task.Resources, } ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx) if err != nil { diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 8819b98e4..9f42f41c8 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -112,9 +112,10 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } executorCtx := &plugins.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - Task: task, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + TaskName: task.Name, + TaskResources: task.Resources, } ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) if err != nil {