mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
exec_basic uses Spawner; create shared test harness for executors
This commit is contained in:
@@ -1,22 +1,29 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/driver/args"
|
||||
"github.com/hashicorp/nomad/client/driver/environment"
|
||||
"github.com/hashicorp/nomad/client/driver/spawn"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// BasicExecutor should work everywhere, and as a result does not include
|
||||
// any resource restrictions or runas capabilities.
|
||||
type BasicExecutor struct {
|
||||
cmd exec.Cmd
|
||||
cmd exec.Cmd
|
||||
spawn *spawn.Spawner
|
||||
taskName string
|
||||
taskDir string
|
||||
allocDir string
|
||||
}
|
||||
|
||||
// TODO: Have raw_exec use this as well.
|
||||
@@ -34,9 +41,13 @@ func (e *BasicExecutor) Limit(resources *structs.Resources) error {
|
||||
func (e *BasicExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error {
|
||||
taskDir, ok := alloc.TaskDirs[taskName]
|
||||
if !ok {
|
||||
return fmt.Errorf("Error finding task dir for (%s)", taskName)
|
||||
fmt.Errorf("Couldn't find task directory for task %v", taskName)
|
||||
}
|
||||
e.cmd.Dir = taskDir
|
||||
|
||||
e.taskDir = taskDir
|
||||
e.taskName = taskName
|
||||
e.allocDir = alloc.AllocDir
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -64,42 +75,73 @@ func (e *BasicExecutor) Start() error {
|
||||
e.cmd.Args = parsed
|
||||
|
||||
// We don't want to call ourself. We want to call Start on our embedded Cmd
|
||||
return e.cmd.Start()
|
||||
spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
|
||||
e.spawn = spawn.NewSpawner(spawnState)
|
||||
e.spawn.SetCommand(&e.cmd)
|
||||
e.spawn.SetLogs(&spawn.Logs{
|
||||
Stdout: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)),
|
||||
Stderr: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)),
|
||||
Stdin: os.DevNull,
|
||||
})
|
||||
|
||||
return e.spawn.Spawn(nil)
|
||||
}
|
||||
|
||||
func (e *BasicExecutor) Open(pid string) error {
|
||||
pidNum, err := strconv.Atoi(pid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse pid %v: %v", pid, err)
|
||||
func (e *BasicExecutor) Open(id string) error {
|
||||
var spawn spawn.Spawner
|
||||
dec := json.NewDecoder(strings.NewReader(id))
|
||||
if err := dec.Decode(&spawn); err != nil {
|
||||
return fmt.Errorf("Failed to parse id: %v", err)
|
||||
}
|
||||
|
||||
process, err := os.FindProcess(pidNum)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to reopen pid %d: %v", pidNum, err)
|
||||
}
|
||||
e.cmd.Process = process
|
||||
// Setup the executor.
|
||||
e.spawn = &spawn
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *BasicExecutor) Wait() error {
|
||||
// We don't want to call ourself. We want to call Start on our embedded Cmd
|
||||
return e.cmd.Wait()
|
||||
code, err := e.spawn.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if code != 0 {
|
||||
return fmt.Errorf("Task exited with code: %d", code)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *BasicExecutor) ID() (string, error) {
|
||||
if e.cmd.Process != nil {
|
||||
return strconv.Itoa(e.cmd.Process.Pid), nil
|
||||
} else {
|
||||
return "", fmt.Errorf("Process has finished or was never started")
|
||||
if e.spawn == nil {
|
||||
return "", fmt.Errorf("Process was never started")
|
||||
}
|
||||
|
||||
var buffer bytes.Buffer
|
||||
enc := json.NewEncoder(&buffer)
|
||||
if err := enc.Encode(e.spawn); err != nil {
|
||||
return "", fmt.Errorf("Failed to serialize id: %v", err)
|
||||
}
|
||||
|
||||
return buffer.String(), nil
|
||||
}
|
||||
|
||||
func (e *BasicExecutor) Shutdown() error {
|
||||
return e.ForceStop()
|
||||
proc, err := os.FindProcess(e.spawn.UserPid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
|
||||
}
|
||||
|
||||
return proc.Signal(os.Interrupt)
|
||||
}
|
||||
|
||||
func (e *BasicExecutor) ForceStop() error {
|
||||
return e.cmd.Process.Kill()
|
||||
proc, err := os.FindProcess(e.spawn.UserPid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
|
||||
}
|
||||
|
||||
return proc.Kill()
|
||||
}
|
||||
|
||||
func (e *BasicExecutor) Command() *exec.Cmd {
|
||||
|
||||
Reference in New Issue
Block a user