Merge pull request #759 from hashicorp/r-executor-plugin

Implement executor plugin
Diptanu Choudhury
2016-02-09 12:10:43 -08:00
48 changed files with 1735 additions and 2271 deletions

View File

@@ -53,7 +53,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus == structs.AllocClientStatusDead {
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
return true, nil
@@ -77,7 +77,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus == structs.AllocClientStatusRunning {
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil

View File

@@ -8,6 +8,7 @@ import (
"path/filepath"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -37,9 +38,6 @@ type AllocDir struct {
// TaskDirs is a mapping of task names to their non-shared directory.
TaskDirs map[string]string
// A list of locations the shared alloc has been mounted to.
Mounted []string
}
// AllocFileInfo holds information about a file inside the AllocDir
@@ -67,13 +65,39 @@ func NewAllocDir(allocDir string) *AllocDir {
// Tears down the previously built directory structure.
func (d *AllocDir) Destroy() error {
// Unmount all mounted shared alloc dirs.
for _, m := range d.Mounted {
if err := d.unmountSharedDir(m); err != nil {
return fmt.Errorf("Failed to unmount shared directory: %v", err)
}
var mErr multierror.Error
if err := d.UnmountAll(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return os.RemoveAll(d.AllocDir)
if err := os.RemoveAll(d.AllocDir); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
}
func (d *AllocDir) UnmountAll() error {
var mErr multierror.Error
for _, dir := range d.TaskDirs {
// Check if the directory has the shared alloc mounted.
taskAlloc := filepath.Join(dir, SharedAllocName)
if d.pathExists(taskAlloc) {
if err := d.unmountSharedDir(taskAlloc); err != nil {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("failed to unmount shared alloc dir %q: %v", taskAlloc, err))
}
if err := os.RemoveAll(taskAlloc); err != nil {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("failed to delete shared alloc dir %q: %v", taskAlloc, err))
}
}
// Unmount dev/ and proc/ if they have been mounted.
d.unmountSpecialDirs(dir)
}
return mErr.ErrorOrNil()
}
// Given a list of tasks, build the correct alloc structure.
@@ -248,7 +272,6 @@ func (d *AllocDir) MountSharedDir(task string) error {
return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err)
}
d.Mounted = append(d.Mounted, taskLoc)
return nil
}
@@ -325,3 +348,13 @@ func fileCopy(src, dst string, perm os.FileMode) error {
return nil
}
// pathExists is a helper function to check if the path exists.
func (d *AllocDir) pathExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}

View File

@@ -13,3 +13,14 @@ func (d *AllocDir) mountSharedDir(dir string) error {
func (d *AllocDir) unmountSharedDir(dir string) error {
return syscall.Unlink(dir)
}
// MountSpecialDirs mounts the dev and proc file system on the chroot of the
// task. It's a no-op on darwin.
func (d *AllocDir) MountSpecialDirs(taskDir string) error {
return nil
}
// unmountSpecialDirs unmounts the dev and proc file system from the chroot
func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
return nil
}

View File

@@ -1,8 +1,12 @@
package allocdir
import (
"fmt"
"os"
"path/filepath"
"syscall"
"github.com/hashicorp/go-multierror"
)
// Bind mounts the shared directory into the task directory. Must be root to
@@ -18,3 +22,62 @@ func (d *AllocDir) mountSharedDir(taskDir string) error {
func (d *AllocDir) unmountSharedDir(dir string) error {
return syscall.Unmount(dir, 0)
}
// MountSpecialDirs mounts the dev and proc file system from the host to the
// chroot
func (d *AllocDir) MountSpecialDirs(taskDir string) error {
// Mount dev
dev := filepath.Join(taskDir, "dev")
if !d.pathExists(dev) {
if err := os.Mkdir(dev, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", dev, err)
}
if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err)
}
}
// Mount proc
proc := filepath.Join(taskDir, "proc")
if !d.pathExists(proc) {
if err := os.Mkdir(proc, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", proc, err)
}
if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err)
}
}
return nil
}
// unmountSpecialDirs unmounts the dev and proc file system from the chroot
func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
errs := new(multierror.Error)
dev := filepath.Join(taskDir, "dev")
if d.pathExists(dev) {
if err := syscall.Unmount(dev, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err))
}
if err := os.RemoveAll(dev); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err))
}
}
// Unmount proc.
proc := filepath.Join(taskDir, "proc")
if d.pathExists(proc) {
if err := syscall.Unmount(proc, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err))
}
if err := os.RemoveAll(proc); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", dev, err))
}
}
return errs.ErrorOrNil()
}

View File

@@ -23,3 +23,14 @@ func (d *AllocDir) dropDirPermissions(path string) error {
func (d *AllocDir) unmountSharedDir(dir string) error {
return nil
}
// MountSpecialDirs mounts the dev and proc file system on the chroot of the
// task. It's a no-op on windows.
func (d *AllocDir) MountSpecialDirs(taskDir string) error {
return nil
}
// unmountSpecialDirs unmounts the dev and proc file system from the chroot
func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
return nil
}

View File

@@ -57,6 +57,14 @@ type Config struct {
// Node provides the base node
Node *structs.Node
// ClientMaxPort is the upper bound of the port range that the client uses for
// communicating with plugin subsystems
ClientMaxPort uint
// ClientMinPort is the lower bound of the port range that the client uses for
// communicating with plugin subsystems
ClientMinPort uint
// Options provides arbitrary key-value configuration for nomad internals,
// like fingerprinters and drivers. The format is:
//

View File

@@ -89,8 +89,14 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
max := d.config.MaxKillTimeout.Nanoseconds()
desired := task.KillTimeout.Nanoseconds()
// Enforce a minimum of one second between the signal and the kill.
if desired == 0 {
desired = (1 * time.Second).Nanoseconds()
}
if desired < max {
return task.KillTimeout
return time.Duration(desired)
}
return d.config.MaxKillTimeout

View File

@@ -46,6 +46,7 @@ func testConfig() *config.Config {
conf := &config.Config{}
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
conf.MaxKillTimeout = 10 * time.Second
return conf
}

View File

@@ -55,26 +55,26 @@ const (
// TaskEnvironment is used to expose information to a task via environment
// variables and provide interpolation of Nomad variables.
type TaskEnvironment struct {
env map[string]string
meta map[string]string
allocDir string
taskDir string
cpuLimit int
memLimit int
node *structs.Node
networks []*structs.NetworkResource
portMap map[string]int
Env map[string]string
Meta map[string]string
AllocDir string
TaskDir string
CpuLimit int
MemLimit int
Node *structs.Node
Networks []*structs.NetworkResource
PortMap map[string]int
// taskEnv holds the variables that will be set in the task's environment
taskEnv map[string]string
TaskEnv map[string]string
// nodeValues holds the values that are allowed for interpolation from the
// node.
nodeValues map[string]string
NodeValues map[string]string
}
func NewTaskEnvironment(node *structs.Node) *TaskEnvironment {
return &TaskEnvironment{node: node}
return &TaskEnvironment{Node: node}
}
// ParseAndReplace takes the user supplied args replaces any instance of an
@@ -82,7 +82,7 @@ func NewTaskEnvironment(node *structs.Node) *TaskEnvironment {
func (t *TaskEnvironment) ParseAndReplace(args []string) []string {
replaced := make([]string, len(args))
for i, arg := range args {
replaced[i] = hargs.ReplaceEnv(arg, t.taskEnv, t.nodeValues)
replaced[i] = hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
}
return replaced
@@ -92,75 +92,75 @@ func (t *TaskEnvironment) ParseAndReplace(args []string) []string {
// and nomad variables. If the variable is found in the passed map it is
// replaced, otherwise the original string is returned.
func (t *TaskEnvironment) ReplaceEnv(arg string) string {
return hargs.ReplaceEnv(arg, t.taskEnv, t.nodeValues)
return hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
}
// Build must be called after all the tasks environment values have been set.
func (t *TaskEnvironment) Build() *TaskEnvironment {
t.nodeValues = make(map[string]string)
t.taskEnv = make(map[string]string)
t.NodeValues = make(map[string]string)
t.TaskEnv = make(map[string]string)
// Build the task metadata
for k, v := range t.meta {
t.taskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
for k, v := range t.Meta {
t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
}
// Build the ports
for _, network := range t.networks {
for label, value := range network.MapLabelToValues(t.portMap) {
for _, network := range t.Networks {
for label, value := range network.MapLabelToValues(t.PortMap) {
IPPort := fmt.Sprintf("%s:%d", network.IP, value)
t.taskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort
t.TaskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort
// Pass an explicit port mapping to the environment
if port, ok := t.portMap[label]; ok {
t.taskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(port)
if port, ok := t.PortMap[label]; ok {
t.TaskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(port)
}
}
}
// Build the directories
if t.allocDir != "" {
t.taskEnv[AllocDir] = t.allocDir
if t.AllocDir != "" {
t.TaskEnv[AllocDir] = t.AllocDir
}
if t.taskDir != "" {
t.taskEnv[TaskLocalDir] = t.taskDir
if t.TaskDir != "" {
t.TaskEnv[TaskLocalDir] = t.TaskDir
}
// Build the resource limits
if t.memLimit != 0 {
t.taskEnv[MemLimit] = strconv.Itoa(t.memLimit)
if t.MemLimit != 0 {
t.TaskEnv[MemLimit] = strconv.Itoa(t.MemLimit)
}
if t.cpuLimit != 0 {
t.taskEnv[CpuLimit] = strconv.Itoa(t.cpuLimit)
if t.CpuLimit != 0 {
t.TaskEnv[CpuLimit] = strconv.Itoa(t.CpuLimit)
}
// Build the node
if t.node != nil {
if t.Node != nil {
// Set up the node values.
t.nodeValues[nodeIdKey] = t.node.ID
t.nodeValues[nodeDcKey] = t.node.Datacenter
t.nodeValues[nodeNameKey] = t.node.Name
t.nodeValues[nodeClassKey] = t.node.NodeClass
t.NodeValues[nodeIdKey] = t.Node.ID
t.NodeValues[nodeDcKey] = t.Node.Datacenter
t.NodeValues[nodeNameKey] = t.Node.Name
t.NodeValues[nodeClassKey] = t.Node.NodeClass
// Set up the attributes.
for k, v := range t.node.Attributes {
t.nodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
for k, v := range t.Node.Attributes {
t.NodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
}
// Set up the meta.
for k, v := range t.node.Meta {
t.nodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
for k, v := range t.Node.Meta {
t.NodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
}
}
// Interpret the environment variables
interpreted := make(map[string]string, len(t.env))
for k, v := range t.env {
interpreted[k] = hargs.ReplaceEnv(v, t.nodeValues, t.taskEnv)
interpreted := make(map[string]string, len(t.Env))
for k, v := range t.Env {
interpreted[k] = hargs.ReplaceEnv(v, t.NodeValues, t.TaskEnv)
}
for k, v := range interpreted {
t.taskEnv[k] = v
t.TaskEnv[k] = v
}
return t
@@ -169,7 +169,7 @@ func (t *TaskEnvironment) Build() *TaskEnvironment {
// EnvList returns a list of strings with NAME=value pairs.
func (t *TaskEnvironment) EnvList() []string {
env := []string{}
for k, v := range t.taskEnv {
for k, v := range t.TaskEnv {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
@@ -178,8 +178,8 @@ func (t *TaskEnvironment) EnvList() []string {
// EnvMap returns a copy of the tasks environment variables.
func (t *TaskEnvironment) EnvMap() map[string]string {
m := make(map[string]string, len(t.taskEnv))
for k, v := range t.taskEnv {
m := make(map[string]string, len(t.TaskEnv))
for k, v := range t.TaskEnv {
m[k] = v
}
@@ -188,95 +188,95 @@ func (t *TaskEnvironment) EnvMap() map[string]string {
// Builder methods to build the TaskEnvironment
func (t *TaskEnvironment) SetAllocDir(dir string) *TaskEnvironment {
t.allocDir = dir
t.AllocDir = dir
return t
}
func (t *TaskEnvironment) ClearAllocDir() *TaskEnvironment {
t.allocDir = ""
t.AllocDir = ""
return t
}
func (t *TaskEnvironment) SetTaskLocalDir(dir string) *TaskEnvironment {
t.taskDir = dir
t.TaskDir = dir
return t
}
func (t *TaskEnvironment) ClearTaskLocalDir() *TaskEnvironment {
t.taskDir = ""
t.TaskDir = ""
return t
}
func (t *TaskEnvironment) SetMemLimit(limit int) *TaskEnvironment {
t.memLimit = limit
t.MemLimit = limit
return t
}
func (t *TaskEnvironment) ClearMemLimit() *TaskEnvironment {
t.memLimit = 0
t.MemLimit = 0
return t
}
func (t *TaskEnvironment) SetCpuLimit(limit int) *TaskEnvironment {
t.cpuLimit = limit
t.CpuLimit = limit
return t
}
func (t *TaskEnvironment) ClearCpuLimit() *TaskEnvironment {
t.cpuLimit = 0
t.CpuLimit = 0
return t
}
func (t *TaskEnvironment) SetNetworks(networks []*structs.NetworkResource) *TaskEnvironment {
t.networks = networks
t.Networks = networks
return t
}
func (t *TaskEnvironment) clearNetworks() *TaskEnvironment {
t.networks = nil
t.Networks = nil
return t
}
func (t *TaskEnvironment) SetPortMap(portMap map[string]int) *TaskEnvironment {
t.portMap = portMap
t.PortMap = portMap
return t
}
func (t *TaskEnvironment) clearPortMap() *TaskEnvironment {
t.portMap = nil
t.PortMap = nil
return t
}
// Takes a map of meta values to be passed to the task. The keys are capitalized
// when the environment variable is set.
func (t *TaskEnvironment) SetMeta(m map[string]string) *TaskEnvironment {
t.meta = m
t.Meta = m
return t
}
func (t *TaskEnvironment) ClearMeta() *TaskEnvironment {
t.meta = nil
t.Meta = nil
return t
}
func (t *TaskEnvironment) SetEnvvars(m map[string]string) *TaskEnvironment {
t.env = m
t.Env = m
return t
}
// Appends the given environment variables.
func (t *TaskEnvironment) AppendEnvvars(m map[string]string) *TaskEnvironment {
if t.env == nil {
t.env = make(map[string]string, len(m))
if t.Env == nil {
t.Env = make(map[string]string, len(m))
}
for k, v := range m {
t.env[k] = v
t.Env[k] = v
}
return t
}
func (t *TaskEnvironment) ClearEnvvars() *TaskEnvironment {
t.env = nil
t.Env = nil
return t
}

View File

@@ -4,13 +4,19 @@ import (
"encoding/json"
"fmt"
"log"
"os/exec"
"path/filepath"
"syscall"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
@@ -30,11 +36,15 @@ type ExecDriverConfig struct {
// execHandle is returned from Start/Open as a handle to the PID
type execHandle struct {
cmd executor.Executor
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
pluginClient *plugin.Client
executor executor.Executor
isolationConfig *executor.IsolationConfig
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
// NewExecDriver is used to create a new exec driver
@@ -92,39 +102,58 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
}
// Setup the command
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd := executor.Command(execCtx, command, driverConfig.Args...)
if err := cmd.Limit(task.Resources); err != nil {
return nil, fmt.Errorf("failed to constrain resources: %s", err)
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
// Populate environment variables
cmd.Command().Env = d.taskEnv.EnvList()
if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil {
return nil, fmt.Errorf("failed to configure task directory: %v", err)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start command: %v", err)
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
ResourceLimits: true,
FSIsolation: true,
UnprivilegedUser: true,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
}
d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)
// Return a driver handle
h := &execHandle{
cmd: cmd,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
userPid: ps.Pid,
executor: exec,
allocDir: ctx.AllocDir,
isolationConfig: ps.IsolationConfig,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}
type execId struct {
ExecutorId string
KillTimeout time.Duration
KillTimeout time.Duration
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
IsolationConfig *executor.IsolationConfig
PluginConfig *ExecutorReattachConfig
}
func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -133,30 +162,51 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}
// Find the process
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd, err := executor.OpenId(execCtx, id.ExecutorId)
pluginConfig := &plugin.ClientConfig{
Reattach: id.PluginConfig.PluginConfig(),
}
exec, client, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
merrs := new(multierror.Error)
merrs.Errors = append(merrs.Errors, err)
d.logger.Println("[ERROR] driver.exec: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e))
}
if id.IsolationConfig != nil {
if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e))
}
}
if e := ctx.AllocDir.UnmountAll(); e != nil {
merrs.Errors = append(merrs.Errors, e)
}
return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil())
}
// Return a driver handle
h := &execHandle{
cmd: cmd,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: client,
executor: exec,
userPid: id.UserPid,
allocDir: id.AllocDir,
isolationConfig: id.IsolationConfig,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}
func (h *execHandle) ID() string {
executorId, _ := h.cmd.ID()
id := execId{
ExecutorId: executorId,
KillTimeout: h.killTimeout,
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
IsolationConfig: h.isolationConfig,
}
data, err := json.Marshal(id)
@@ -179,18 +229,46 @@ func (h *execHandle) Update(task *structs.Task) error {
}
func (h *execHandle) Kill() error {
h.cmd.Shutdown()
if err := h.executor.ShutDown(); err != nil {
return fmt.Errorf("executor Shutdown failed: %v", err)
}
select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.cmd.ForceStop()
if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}
return nil
}
}
func (h *execHandle) run() {
res := h.cmd.Wait()
ps, err := h.executor.Wait()
close(h.doneCh)
h.waitCh <- res
// If the exit code is 0 and we had an error, the plugin didn't connect and
// doesn't know the state of the user process, so we kill the user process.
// That way a new executor created on restart doesn't collide with resources
// that the old user pid might still be holding onto.
if ps.ExitCode == 0 && err != nil {
if h.isolationConfig != nil {
if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil {
h.logger.Printf("[ERROR] driver.exec: destroying cgroup failed while killing cgroup: %v", e)
}
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0,
Err: err}
close(h.waitCh)
h.pluginClient.Kill()
}

View File

@@ -1,10 +1,13 @@
package driver
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"syscall"
"testing"
"time"
@@ -70,6 +73,66 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
if handle2 == nil {
t.Fatalf("missing handle")
}
handle.Kill()
handle2.Kill()
}
func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
t.Parallel()
ctestutils.ExecCompatible(t)
task := &structs.Task{
Name: "sleep",
Config: map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"1000000"},
},
Resources: basicResources,
}
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
defer handle.Kill()
id := &execId{}
if err := json.Unmarshal([]byte(handle.ID()), id); err != nil {
t.Fatalf("Failed to parse handle '%s': %v", handle.ID(), err)
}
pluginPid := id.PluginConfig.Pid
proc, err := os.FindProcess(pluginPid)
if err != nil {
t.Fatalf("can't find plugin pid: %v", pluginPid)
}
if err := proc.Kill(); err != nil {
t.Fatalf("can't kill plugin pid: %v", err)
}
// Attempt to open
handle2, err := d.Open(execCtx, handle.ID())
if err == nil {
t.Fatalf("expected error")
}
if handle2 != nil {
handle2.Kill()
t.Fatalf("expected handle2 to be nil")
}
// Verify that the user process was killed when reattachment failed
userProc, err := os.FindProcess(id.UserPid)
err = userProc.Signal(syscall.Signal(0))
if err == nil {
t.Fatalf("expected user process to die")
}
}
func TestExecDriver_Start_Wait(t *testing.T) {
@@ -259,9 +322,10 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
Name: "sleep",
Config: map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"1"},
"args": []string{"100"},
},
Resources: basicResources,
Resources: basicResources,
KillTimeout: 10 * time.Second,
}
driverCtx, execCtx := testDriverContexts(task)

View File

@@ -1,123 +0,0 @@
// Package executor is used to invoke child processes across various operating
// systems in a way that provides the following features:
//
// - Least privilege
// - Resource constraints
// - Process isolation
//
// An operating system may be something like "windows" or "linux with systemd".
// Executors allow drivers like `exec` and `java` to share an implementation
// for isolation capabilities on a particular operating system.
//
// For example:
//
// - `exec` and `java` on Linux use a cgroups executor
// - `exec` and `java` on FreeBSD use a jails executor
//
// However, drivers that provide their own isolation should not use executors.
// For example, using an executor to start QEMU means that the QEMU call is
// run inside a chroot+cgroup, even though the VM already provides isolation for
// the task running inside it. This is an extraneous level of indirection.
package executor
import (
"fmt"
"os/exec"
"path/filepath"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
var errNoResources = fmt.Errorf("No resources are associated with this task")
// Executor is an interface that any platform- or capability-specific exec
// wrapper must implement. You should not need to implement a Java executor.
// Rather, you would implement a cgroups executor that the Java driver will use.
type Executor interface {
// Limit must be called before Start and restricts the amount of resources
// the process can use. Note that an error may be returned ONLY IF the
// executor implements resource limiting. Otherwise Limit is ignored.
Limit(*structs.Resources) error
// ConfigureTaskDir must be called before Start and ensures that the tasks
// directory is properly configured.
ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error
// Start the process. This may wrap the actual process in another command,
// depending on the capabilities in this environment. Errors that arise from
// Limits or Runas may bubble through Start()
Start() error
// Open should be called to restore a previous execution. This might be needed if
// nomad is restarted.
Open(string) error
// Wait waits till the user's command is completed.
Wait() *cstructs.WaitResult
// Returns a handle that is executor specific for use in reopening.
ID() (string, error)
// Shutdown should use a graceful stop mechanism so the application can
// perform checkpointing or cleanup, if such a mechanism is available.
// If such a mechanism is not available, Shutdown() should call ForceStop().
Shutdown() error
// ForceStop will terminate the process without waiting for cleanup. Every
// implementations must provide this.
ForceStop() error
// Command provides access the underlying Cmd struct in case the Executor
// interface doesn't expose the functionality you need.
Command() *exec.Cmd
}
// ExecutorContext is a means to inject dependencies such as loggers, configs, and
// node attributes into a Driver without having to change the Driver interface
// each time we do it. Used in conjection with Factory, above.
type ExecutorContext struct {
taskEnv *env.TaskEnvironment
}
// NewExecutorContext initializes a new DriverContext with the specified fields.
func NewExecutorContext(taskEnv *env.TaskEnvironment) *ExecutorContext {
return &ExecutorContext{
taskEnv: taskEnv,
}
}
// Command returns a platform-specific Executor
func Command(ctx *ExecutorContext, name string, args ...string) Executor {
executor := NewExecutor(ctx)
SetCommand(executor, name, args)
return executor
}
func SetCommand(e Executor, name string, args []string) {
cmd := e.Command()
cmd.Path = name
cmd.Args = append([]string{name}, args...)
if filepath.Base(name) == name {
if lp, err := exec.LookPath(name); err != nil {
// cmd.lookPathErr = err
} else {
cmd.Path = lp
}
}
}
// OpenId is similar to executor.Command but will attempt to reopen with the
// passed ID.
func OpenId(ctx *ExecutorContext, id string) (Executor, error) {
executor := NewExecutor(ctx)
err := executor.Open(id)
if err != nil {
return nil, err
}
return executor, nil
}

View File

@@ -1,131 +0,0 @@
package executor
import (
"bytes"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/spawn"
"github.com/hashicorp/nomad/nomad/structs"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
// BasicExecutor should work everywhere, and as a result does not include
// any resource restrictions or runas capabilities.
type BasicExecutor struct {
*ExecutorContext
cmd exec.Cmd
spawn *spawn.Spawner
taskName string
taskDir string
allocDir string
}
func NewBasicExecutor(ctx *ExecutorContext) Executor {
return &BasicExecutor{ExecutorContext: ctx}
}
func (e *BasicExecutor) Limit(resources *structs.Resources) error {
if resources == nil {
return errNoResources
}
return nil
}
func (e *BasicExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error {
taskDir, ok := alloc.TaskDirs[taskName]
if !ok {
return fmt.Errorf("Couldn't find task directory for task %v", taskName)
}
e.cmd.Dir = taskDir
e.taskDir = taskDir
e.taskName = taskName
e.allocDir = alloc.AllocDir
return nil
}
func (e *BasicExecutor) Start() error {
// Parse the commands arguments and replace instances of Nomad environment
// variables.
e.cmd.Path = e.taskEnv.ReplaceEnv(e.cmd.Path)
e.cmd.Args = e.taskEnv.ParseAndReplace(e.cmd.Args)
e.cmd.Env = e.taskEnv.Build().EnvList()
spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
e.spawn = spawn.NewSpawner(spawnState)
e.spawn.SetCommand(&e.cmd)
e.spawn.SetLogs(&spawn.Logs{
Stdout: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)),
Stderr: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)),
Stdin: os.DevNull,
})
return e.spawn.Spawn(nil)
}
func (e *BasicExecutor) Open(id string) error {
var spawn spawn.Spawner
dec := json.NewDecoder(strings.NewReader(id))
if err := dec.Decode(&spawn); err != nil {
return fmt.Errorf("Failed to parse id: %v", err)
}
// Setup the executor.
e.spawn = &spawn
return e.spawn.Valid()
}
func (e *BasicExecutor) Wait() *cstructs.WaitResult {
return e.spawn.Wait()
}
func (e *BasicExecutor) ID() (string, error) {
if e.spawn == nil {
return "", fmt.Errorf("Process was never started")
}
var buffer bytes.Buffer
enc := json.NewEncoder(&buffer)
if err := enc.Encode(e.spawn); err != nil {
return "", fmt.Errorf("Failed to serialize id: %v", err)
}
return buffer.String(), nil
}
func (e *BasicExecutor) Shutdown() error {
proc, err := os.FindProcess(e.spawn.UserPid)
if err != nil {
return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
}
if runtime.GOOS == "windows" {
return proc.Kill()
}
return proc.Signal(os.Interrupt)
}
func (e *BasicExecutor) ForceStop() error {
proc, err := os.FindProcess(e.spawn.UserPid)
if err != nil {
return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
}
if err := proc.Kill(); err != nil && err.Error() != "os: process already finished" {
return err
}
return nil
}
func (e *BasicExecutor) Command() *exec.Cmd {
return &e.cmd
}

View File

@@ -1,8 +0,0 @@
package executor
import "testing"
func TestExecutorBasic(t *testing.T) {
t.Parallel()
testExecutor(t, NewBasicExecutor, nil)
}

View File

@@ -1,426 +0,0 @@
package executor
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"
"os/user"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"github.com/hashicorp/go-multierror"
"github.com/opencontainers/runc/libcontainer/cgroups"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/cgroups/systemd"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/spawn"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
// A mapping of directories on the host OS to attempt to embed inside each
// task's chroot.
chrootEnv = map[string]string{
"/bin": "/bin",
"/etc": "/etc",
"/lib": "/lib",
"/lib32": "/lib32",
"/lib64": "/lib64",
"/usr/bin": "/usr/bin",
"/usr/lib": "/usr/lib",
"/usr/share": "/usr/share",
}
)
func NewExecutor(ctx *ExecutorContext) Executor {
return NewLinuxExecutor(ctx)
}
func NewLinuxExecutor(ctx *ExecutorContext) Executor {
return &LinuxExecutor{ExecutorContext: ctx}
}
// Linux executor is designed to run on linux kernel 2.8+.
type LinuxExecutor struct {
*ExecutorContext
cmd exec.Cmd
user *user.User
l sync.Mutex
// Isolation configurations.
groups *cgroupConfig.Cgroup
taskName string
taskDir string
allocDir string
// Spawn process.
spawn *spawn.Spawner
}
func (e *LinuxExecutor) Command() *exec.Cmd {
return &e.cmd
}
func (e *LinuxExecutor) Limit(resources *structs.Resources) error {
if resources == nil {
return errNoResources
}
return e.configureCgroups(resources)
}
// execLinuxID contains the necessary information to reattach to an executed
// process and cleanup the created cgroups.
type ExecLinuxID struct {
Groups *cgroupConfig.Cgroup
Spawn *spawn.Spawner
TaskDir string
}
func (e *LinuxExecutor) Open(id string) error {
// De-serialize the ID.
dec := json.NewDecoder(strings.NewReader(id))
var execID ExecLinuxID
if err := dec.Decode(&execID); err != nil {
return fmt.Errorf("Failed to parse id: %v", err)
}
// Setup the executor.
e.groups = execID.Groups
e.spawn = execID.Spawn
e.taskDir = execID.TaskDir
return e.spawn.Valid()
}
func (e *LinuxExecutor) ID() (string, error) {
if e.groups == nil || e.spawn == nil || e.taskDir == "" {
return "", fmt.Errorf("LinuxExecutor not properly initialized.")
}
// Build the ID.
id := ExecLinuxID{
Groups: e.groups,
Spawn: e.spawn,
TaskDir: e.taskDir,
}
var buffer bytes.Buffer
enc := json.NewEncoder(&buffer)
if err := enc.Encode(id); err != nil {
return "", fmt.Errorf("Failed to serialize id: %v", err)
}
return buffer.String(), nil
}
// runAs takes a user id as a string and looks up the user, and sets the command
// to execute as that user.
func (e *LinuxExecutor) runAs(userid string) error {
u, err := user.Lookup(userid)
if err != nil {
return fmt.Errorf("Failed to identify user %v: %v", userid, err)
}
// Convert the uid and gid
uid, err := strconv.ParseUint(u.Uid, 10, 32)
if err != nil {
return fmt.Errorf("Unable to convert userid to uint32: %s", err)
}
gid, err := strconv.ParseUint(u.Gid, 10, 32)
if err != nil {
return fmt.Errorf("Unable to convert groupid to uint32: %s", err)
}
// Set the command to run as that user and group.
if e.cmd.SysProcAttr == nil {
e.cmd.SysProcAttr = &syscall.SysProcAttr{}
}
if e.cmd.SysProcAttr.Credential == nil {
e.cmd.SysProcAttr.Credential = &syscall.Credential{}
}
e.cmd.SysProcAttr.Credential.Uid = uint32(uid)
e.cmd.SysProcAttr.Credential.Gid = uint32(gid)
return nil
}
func (e *LinuxExecutor) Start() error {
// Run as "nobody" user so we don't leak root privilege to the spawned
// process.
if err := e.runAs("nobody"); err != nil {
return err
}
// Parse the commands arguments and replace instances of Nomad environment
// variables.
e.cmd.Path = e.taskEnv.ReplaceEnv(e.cmd.Path)
e.cmd.Args = e.taskEnv.ParseAndReplace(e.cmd.Args)
e.cmd.Env = e.taskEnv.EnvList()
spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status"))
e.spawn = spawn.NewSpawner(spawnState)
e.spawn.SetCommand(&e.cmd)
e.spawn.SetChroot(e.taskDir)
e.spawn.SetLogs(&spawn.Logs{
Stdout: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)),
Stderr: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)),
Stdin: os.DevNull,
})
enterCgroup := func(pid int) error {
// Join the spawn-daemon to the cgroup.
manager := e.getCgroupManager(e.groups)
// Apply will place the spawn dameon into the created cgroups.
if err := manager.Apply(pid); err != nil {
return fmt.Errorf("Failed to join spawn-daemon to the cgroup (%+v): %v", e.groups, err)
}
return nil
}
return e.spawn.Spawn(enterCgroup)
}
// Wait waits til the user process exits and returns an error on non-zero exit
// codes. Wait also cleans up the task directory and created cgroups.
func (e *LinuxExecutor) Wait() *cstructs.WaitResult {
errs := new(multierror.Error)
res := e.spawn.Wait()
if res.Err != nil {
errs = multierror.Append(errs, res.Err)
}
if err := e.destroyCgroup(); err != nil {
errs = multierror.Append(errs, err)
}
if err := e.cleanTaskDir(); err != nil {
errs = multierror.Append(errs, err)
}
res.Err = errs.ErrorOrNil()
return res
}
// Shutdown sends the user process an interrupt signal indicating that it is
// about to be forcefully shutdown in sometime
func (e *LinuxExecutor) Shutdown() error {
proc, err := os.FindProcess(e.spawn.UserPid)
if err != nil {
return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err)
}
return proc.Signal(os.Interrupt)
}
// ForceStop immediately exits the user process and cleans up both the task
// directory and the cgroups.
func (e *LinuxExecutor) ForceStop() error {
errs := new(multierror.Error)
if err := e.destroyCgroup(); err != nil {
errs = multierror.Append(errs, err)
}
if err := e.cleanTaskDir(); err != nil {
errs = multierror.Append(errs, err)
}
return errs.ErrorOrNil()
}
// Task Directory related functions.
// ConfigureTaskDir creates the necessary directory structure for a proper
// chroot. cleanTaskDir should be called after.
func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error {
e.taskName = taskName
e.allocDir = alloc.AllocDir
taskDir, ok := alloc.TaskDirs[taskName]
if !ok {
fmt.Errorf("Couldn't find task directory for task %v", taskName)
}
e.taskDir = taskDir
if err := alloc.MountSharedDir(taskName); err != nil {
return err
}
if err := alloc.Embed(taskName, chrootEnv); err != nil {
return err
}
// Mount dev
dev := filepath.Join(taskDir, "dev")
if !e.pathExists(dev) {
if err := os.Mkdir(dev, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", dev, err)
}
if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err)
}
}
// Mount proc
proc := filepath.Join(taskDir, "proc")
if !e.pathExists(proc) {
if err := os.Mkdir(proc, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", proc, err)
}
if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err)
}
}
// Set the tasks AllocDir environment variable.
e.taskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)).SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).Build()
return nil
}
// pathExists is a helper function to check if the path exists.
func (e *LinuxExecutor) pathExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// cleanTaskDir is an idempotent operation to clean the task directory and
// should be called when tearing down the task.
func (e *LinuxExecutor) cleanTaskDir() error {
// Prevent a race between Wait/ForceStop
e.l.Lock()
defer e.l.Unlock()
// Unmount dev.
errs := new(multierror.Error)
dev := filepath.Join(e.taskDir, "dev")
if e.pathExists(dev) {
if err := syscall.Unmount(dev, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err))
}
if err := os.RemoveAll(dev); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err))
}
}
// Unmount proc.
proc := filepath.Join(e.taskDir, "proc")
if e.pathExists(proc) {
if err := syscall.Unmount(proc, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err))
}
if err := os.RemoveAll(proc); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", dev, err))
}
}
return errs.ErrorOrNil()
}
// Cgroup related functions.
// configureCgroups converts a Nomad Resources specification into the equivalent
// cgroup configuration. It returns an error if the resources are invalid.
func (e *LinuxExecutor) configureCgroups(resources *structs.Resources) error {
e.groups = &cgroupConfig.Cgroup{}
e.groups.Resources = &cgroupConfig.Resources{}
e.groups.Name = structs.GenerateUUID()
// TODO: verify this is needed for things like network access
e.groups.Resources.AllowAllDevices = true
if resources.MemoryMB > 0 {
// Total amount of memory allowed to consume
e.groups.Resources.Memory = int64(resources.MemoryMB * 1024 * 1024)
// Disable swap to avoid issues on the machine
e.groups.Resources.MemorySwap = int64(-1)
}
if resources.CPU < 2 {
return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", resources.CPU)
}
// Set the relative CPU shares for this cgroup.
e.groups.Resources.CpuShares = int64(resources.CPU)
if resources.IOPS != 0 {
// Validate it is in an acceptable range.
if resources.IOPS < 10 || resources.IOPS > 1000 {
return fmt.Errorf("resources.IOPS must be between 10 and 1000: %d", resources.IOPS)
}
e.groups.Resources.BlkioWeight = uint16(resources.IOPS)
}
return nil
}
// destroyCgroup kills all processes in the cgroup and removes the cgroup
// configuration from the host.
func (e *LinuxExecutor) destroyCgroup() error {
if e.groups == nil {
return errors.New("Can't destroy: cgroup configuration empty")
}
// Prevent a race between Wait/ForceStop
e.l.Lock()
defer e.l.Unlock()
manager := e.getCgroupManager(e.groups)
pids, err := manager.GetPids()
if err != nil {
return fmt.Errorf("Failed to get pids in the cgroup %v: %v", e.groups.Name, err)
}
errs := new(multierror.Error)
for _, pid := range pids {
process, err := os.FindProcess(pid)
if err != nil {
multierror.Append(errs, fmt.Errorf("Failed to find Pid %v: %v", pid, err))
continue
}
if err := process.Kill(); err != nil && err.Error() != "os: process already finished" {
multierror.Append(errs, fmt.Errorf("Failed to kill Pid %v: %v", pid, err))
continue
}
}
// Remove the cgroup.
if err := manager.Destroy(); err != nil {
multierror.Append(errs, fmt.Errorf("Failed to delete the cgroup directories: %v", err))
}
if len(errs.Errors) != 0 {
return fmt.Errorf("Failed to destroy cgroup: %v", errs)
}
return nil
}
// getCgroupManager returns the correct libcontainer cgroup manager.
func (e *LinuxExecutor) getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager {
var manager cgroups.Manager
manager = &cgroupFs.Manager{Cgroups: groups}
if systemd.UseSystemd() {
manager = &systemd.Manager{Cgroups: groups}
}
return manager
}

View File

@@ -1,18 +0,0 @@
package executor
import (
"testing"
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testtask"
)
func init() {
// Add test binary to chroot during test run.
chrootEnv[testtask.Path()] = testtask.Path()
}
func TestExecutorLinux(t *testing.T) {
t.Parallel()
testExecutor(t, NewLinuxExecutor, ctestutil.ExecCompatible)
}

View File

@@ -1,14 +0,0 @@
// +build !linux
package executor
func NewExecutor(ctx *ExecutorContext) Executor {
return &UniversalExecutor{
BasicExecutor: NewBasicExecutor(ctx).(*BasicExecutor),
}
}
// UniversalExecutor wraps the BasicExecutor
type UniversalExecutor struct {
*BasicExecutor
}

View File

@@ -0,0 +1,259 @@
package executor
import (
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"syscall"
"time"
"github.com/hashicorp/go-multierror"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/nomad/structs"
)
// ExecutorContext holds the context needed to configure and isolate the
// command that the user wants to run
type ExecutorContext struct {
// TaskEnv holds information about the environment of a Task
TaskEnv *env.TaskEnvironment
// AllocDir is the handle to do operations on the alloc dir of
// the task
AllocDir *allocdir.AllocDir
// TaskName is the name of the Task
TaskName string
// TaskResources are the resource constraints for the Task
TaskResources *structs.Resources
// FSIsolation is a flag for drivers to impose file system
// isolation on certain platforms
FSIsolation bool
// ResourceLimits is a flag for drivers to impose resource
// constraints on a Task on certain platforms
ResourceLimits bool
// UnprivilegedUser is a flag for drivers to make the process
// run as nobody
UnprivilegedUser bool
}
// ExecCommand holds the user command and args. It's a lightweight replacement
// of exec.Cmd for serialization purposes.
type ExecCommand struct {
Cmd string
Args []string
}
// IsolationConfig has information about the isolation mechanism the executor
// uses to put resource constraints and isolation on the user process
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}
// ProcessState holds information about the state of a user process.
type ProcessState struct {
Pid int
ExitCode int
Signal int
IsolationConfig *IsolationConfig
Time time.Time
}
// Executor is the interface which allows a driver to launch and supervise
// a process
type Executor interface {
LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error)
Wait() (*ProcessState, error)
ShutDown() error
Exit() error
}
// UniversalExecutor is an implementation of the Executor which launches and
// supervises processes. In addition to process supervision it provides resource
// and file system isolation
type UniversalExecutor struct {
cmd exec.Cmd
ctx *ExecutorContext
taskDir string
groups *cgroupConfig.Cgroup
exitState *ProcessState
processExited chan interface{}
logger *log.Logger
lock sync.Mutex
}
// NewExecutor returns an Executor
func NewExecutor(logger *log.Logger) Executor {
return &UniversalExecutor{logger: logger, processExited: make(chan interface{})}
}
// LaunchCmd launches a process and returns its state. It also configures and
// applies isolation on certain platforms.
func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) {
e.logger.Printf("[DEBUG] executor: launching command %v %v", command.Cmd, strings.Join(command.Args, ""))
e.ctx = ctx
// configuring the task dir
if err := e.configureTaskDir(); err != nil {
return nil, err
}
// configuring the chroot and cgroups; this also places the plugin process in
// the cgroup
if err := e.configureIsolation(); err != nil {
return nil, err
}
// setting the user of the process
if e.ctx.UnprivilegedUser {
if err := e.runAs("nobody"); err != nil {
return nil, err
}
}
// opening the task's stdout and stderr files
stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.TaskName))
stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
e.cmd.Stdout = stdo
stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.TaskName))
stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
e.cmd.Stderr = stde
// setting the env, path and args for the command
e.ctx.TaskEnv.Build()
e.cmd.Env = ctx.TaskEnv.EnvList()
e.cmd.Path = ctx.TaskEnv.ReplaceEnv(command.Cmd)
e.cmd.Args = append([]string{e.cmd.Path}, ctx.TaskEnv.ParseAndReplace(command.Args)...)
if filepath.Base(command.Cmd) == command.Cmd {
if lp, err := exec.LookPath(command.Cmd); err == nil {
e.cmd.Path = lp
}
}
// starting the process
if err := e.cmd.Start(); err != nil {
return nil, fmt.Errorf("error starting command: %v", err)
}
go e.wait()
ic := &IsolationConfig{Cgroup: e.groups}
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
}
// Wait waits until the process has exited and returns its exit code and errors
func (e *UniversalExecutor) Wait() (*ProcessState, error) {
<-e.processExited
return e.exitState, nil
}
func (e *UniversalExecutor) wait() {
defer close(e.processExited)
err := e.cmd.Wait()
if err == nil {
e.exitState = &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}
return
}
exitCode := 1
if exitErr, ok := err.(*exec.ExitError); ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
exitCode = status.ExitStatus()
}
}
if e.ctx.FSIsolation {
e.removeChrootMounts()
}
if e.ctx.ResourceLimits {
e.lock.Lock()
DestroyCgroup(e.groups)
e.lock.Unlock()
}
e.exitState = &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()}
}
var (
// finishedErr is the error message received when trying to kill an already
// exited process.
finishedErr = "os: process already finished"
)
// Exit cleans up the alloc directory, destroys cgroups and kills the user
// process
func (e *UniversalExecutor) Exit() error {
var merr multierror.Error
if e.cmd.Process != nil {
proc, err := os.FindProcess(e.cmd.Process.Pid)
if err != nil {
e.logger.Printf("[ERROR] executor: can't find process with pid: %v, err: %v",
e.cmd.Process.Pid, err)
} else if err := proc.Kill(); err != nil && err.Error() != finishedErr {
merr.Errors = append(merr.Errors,
fmt.Errorf("can't kill process with pid: %v, err: %v", e.cmd.Process.Pid, err))
}
}
if e.ctx.FSIsolation {
if err := e.removeChrootMounts(); err != nil {
merr.Errors = append(merr.Errors, err)
}
}
if e.ctx.ResourceLimits {
e.lock.Lock()
if err := DestroyCgroup(e.groups); err != nil {
merr.Errors = append(merr.Errors, err)
}
e.lock.Unlock()
}
return merr.ErrorOrNil()
}
// ShutDown sends an interrupt signal to the user process
func (e *UniversalExecutor) ShutDown() error {
if e.cmd.Process == nil {
return fmt.Errorf("executor.shutdown error: no process found")
}
proc, err := os.FindProcess(e.cmd.Process.Pid)
if err != nil {
return fmt.Errorf("executor.shutdown error: %v", err)
}
if runtime.GOOS == "windows" {
return proc.Kill()
}
if err = proc.Signal(os.Interrupt); err != nil {
return fmt.Errorf("executor.shutdown error: %v", err)
}
return nil
}
// configureTaskDir sets the task dir in the executor
func (e *UniversalExecutor) configureTaskDir() error {
taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName]
if !ok {
return fmt.Errorf("couldn't find task directory for task %v", e.ctx.TaskName)
}
e.taskDir = taskDir
e.cmd.Dir = taskDir
return nil
}

View File

@@ -0,0 +1,31 @@
// +build !linux
package executor
import (
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
func (e *UniversalExecutor) configureChroot() error {
return nil
}
func DestroyCgroup(groups *cgroupConfig.Cgroup) error {
return nil
}
func (e *UniversalExecutor) removeChrootMounts() error {
return nil
}
func (e *UniversalExecutor) runAs(userid string) error {
return nil
}
func (e *UniversalExecutor) applyLimits(pid int) error {
return nil
}
func (e *UniversalExecutor) configureIsolation() error {
return nil
}

View File

@@ -0,0 +1,224 @@
package executor
import (
"fmt"
"os"
"os/user"
"path/filepath"
"strconv"
"syscall"
"github.com/hashicorp/go-multierror"
"github.com/opencontainers/runc/libcontainer/cgroups"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/cgroups/systemd"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
// A mapping of directories on the host OS to attempt to embed inside each
// task's chroot.
chrootEnv = map[string]string{
"/bin": "/bin",
"/etc": "/etc",
"/lib": "/lib",
"/lib32": "/lib32",
"/lib64": "/lib64",
"/usr/bin": "/usr/bin",
"/usr/lib": "/usr/lib",
"/usr/share": "/usr/share",
}
)
// configureIsolation configures chroot and creates cgroups
func (e *UniversalExecutor) configureIsolation() error {
if e.ctx.FSIsolation {
if err := e.configureChroot(); err != nil {
return err
}
}
if e.ctx.ResourceLimits {
if err := e.configureCgroups(e.ctx.TaskResources); err != nil {
return fmt.Errorf("error creating cgroups: %v", err)
}
if err := e.applyLimits(os.Getpid()); err != nil {
if er := DestroyCgroup(e.groups); er != nil {
e.logger.Printf("[ERROR] executor: error destroying cgroup: %v", er)
}
if er := e.removeChrootMounts(); er != nil {
e.logger.Printf("[ERROR] executor: error removing chroot: %v", er)
}
return fmt.Errorf("error entering the plugin process in the cgroup: %v:", err)
}
}
return nil
}
// applyLimits puts a process in a pre-configured cgroup
func (e *UniversalExecutor) applyLimits(pid int) error {
if !e.ctx.ResourceLimits {
return nil
}
// Entering the process in the cgroup
manager := getCgroupManager(e.groups)
if err := manager.Apply(pid); err != nil {
e.logger.Printf("[ERROR] executor: unable to join cgroup: %v", err)
if err := e.Exit(); err != nil {
e.logger.Printf("[ERROR] executor: unable to kill process: %v", err)
}
return err
}
return nil
}
// configureCgroups converts a Nomad Resources specification into the equivalent
// cgroup configuration. It returns an error if the resources are invalid.
func (e *UniversalExecutor) configureCgroups(resources *structs.Resources) error {
e.groups = &cgroupConfig.Cgroup{}
e.groups.Resources = &cgroupConfig.Resources{}
e.groups.Name = structs.GenerateUUID()
// TODO: verify this is needed for things like network access
e.groups.Resources.AllowAllDevices = true
if resources.MemoryMB > 0 {
// Total amount of memory allowed to consume
e.groups.Resources.Memory = int64(resources.MemoryMB * 1024 * 1024)
// Disable swap to avoid issues on the machine
e.groups.Resources.MemorySwap = int64(-1)
}
if resources.CPU < 2 {
return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", resources.CPU)
}
// Set the relative CPU shares for this cgroup.
e.groups.Resources.CpuShares = int64(resources.CPU)
if resources.IOPS != 0 {
// Validate it is in an acceptable range.
if resources.IOPS < 10 || resources.IOPS > 1000 {
return fmt.Errorf("resources.IOPS must be between 10 and 1000: %d", resources.IOPS)
}
e.groups.Resources.BlkioWeight = uint16(resources.IOPS)
}
return nil
}
// runAs takes a user id as a string and looks up the user, and sets the command
// to execute as that user.
func (e *UniversalExecutor) runAs(userid string) error {
u, err := user.Lookup(userid)
if err != nil {
return fmt.Errorf("Failed to identify user %v: %v", userid, err)
}
// Convert the uid and gid
uid, err := strconv.ParseUint(u.Uid, 10, 32)
if err != nil {
return fmt.Errorf("Unable to convert userid to uint32: %s", err)
}
gid, err := strconv.ParseUint(u.Gid, 10, 32)
if err != nil {
return fmt.Errorf("Unable to convert groupid to uint32: %s", err)
}
// Set the command to run as that user and group.
if e.cmd.SysProcAttr == nil {
e.cmd.SysProcAttr = &syscall.SysProcAttr{}
}
if e.cmd.SysProcAttr.Credential == nil {
e.cmd.SysProcAttr.Credential = &syscall.Credential{}
}
e.cmd.SysProcAttr.Credential.Uid = uint32(uid)
e.cmd.SysProcAttr.Credential.Gid = uint32(gid)
return nil
}
// configureChroot configures a chroot
func (e *UniversalExecutor) configureChroot() error {
allocDir := e.ctx.AllocDir
if err := allocDir.MountSharedDir(e.ctx.TaskName); err != nil {
return err
}
if err := allocDir.Embed(e.ctx.TaskName, chrootEnv); err != nil {
return err
}
// Set the tasks AllocDir environment variable.
e.ctx.TaskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)).SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).Build()
if e.cmd.SysProcAttr == nil {
e.cmd.SysProcAttr = &syscall.SysProcAttr{}
}
e.cmd.SysProcAttr.Chroot = e.taskDir
e.cmd.Dir = "/"
if err := allocDir.MountSpecialDirs(e.taskDir); err != nil {
return err
}
return nil
}
// removeChrootMounts is an idempotent operation to clean up the chroot mounts
// and should be called when tearing down the task.
func (e *UniversalExecutor) removeChrootMounts() error {
// Prevent a race between wait and Exit
e.lock.Lock()
defer e.lock.Unlock()
return e.ctx.AllocDir.UnmountAll()
}
// DestroyCgroup kills all processes in the cgroup and removes the cgroup
// configuration from the host.
func DestroyCgroup(groups *cgroupConfig.Cgroup) error {
merrs := new(multierror.Error)
if groups == nil {
return fmt.Errorf("Can't destroy: cgroup configuration empty")
}
manager := getCgroupManager(groups)
if pids, perr := manager.GetPids(); perr == nil {
for _, pid := range pids {
proc, err := os.FindProcess(pid)
if err != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("error finding process %v: %v", pid, err))
} else {
if e := proc.Kill(); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("error killing process %v: %v", pid, e))
}
}
}
}
// Remove the cgroup.
if err := manager.Destroy(); err != nil {
multierror.Append(merrs, fmt.Errorf("Failed to delete the cgroup directories: %v", err))
}
if len(merrs.Errors) != 0 {
return fmt.Errorf("errors while destroying cgroup: %v", merrs)
}
return nil
}
// getCgroupManager returns the correct libcontainer cgroup manager.
func getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager {
var manager cgroups.Manager
manager = &cgroupFs.Manager{Cgroups: groups}
if systemd.UseSystemd() {
manager = &systemd.Manager{Cgroups: groups}
}
return manager
}
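To make the resource-to-cgroup mapping above concrete, here is a minimal, self-contained sketch of the values a hypothetical task would end up with. The struct is a simplified stand-in for the libcontainer resources type, and the numbers (512 MB, 500 CPU shares, 500 IOPS) are illustrative, not taken from this change:
package main

import "fmt"

// cgroupResources is a simplified stand-in for the libcontainer resources
// struct configured above (illustrative only, not the real type).
type cgroupResources struct {
	Memory      int64  // bytes
	MemorySwap  int64  // -1 disables swap
	CpuShares   int64  // relative CPU weight
	BlkioWeight uint16 // 10-1000, driven by IOPS
}

func main() {
	// Hypothetical task resources: 512 MB memory, 500 CPU shares, 500 IOPS.
	memoryMB, cpu, iops := 512, 500, 500

	r := cgroupResources{
		Memory:      int64(memoryMB) * 1024 * 1024,
		MemorySwap:  -1,
		CpuShares:   int64(cpu),
		BlkioWeight: uint16(iops),
	}
	fmt.Printf("%+v\n", r)
}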

View File

@@ -0,0 +1,203 @@
package executor
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
tu "github.com/hashicorp/nomad/testutil"
)
var (
constraint = &structs.Resources{
CPU: 250,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
}
)
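// mockAllocDir builds a real alloc dir under the OS temp dir for a mock task
// and returns the task name and the alloc dir; callers must Destroy it.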
func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID))
if err := allocDir.Build([]*structs.Task{task}); err != nil {
log.Panicf("allocDir.Build() failed: %v", err)
}
return task.Name, allocDir
}
func testExecutorContext(t *testing.T) *ExecutorContext {
taskEnv := env.NewTaskEnvironment(mock.Node())
taskName, allocDir := mockAllocDir(t)
ctx := &ExecutorContext{
TaskEnv: taskEnv,
TaskName: taskName,
AllocDir: allocDir,
TaskResources: constraint,
}
return ctx
}
func TestExecutor_Start_Invalid(t *testing.T) {
invalid := "/bin/foobar"
execCmd := ExecCommand{Cmd: invalid, Args: []string{"1"}}
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
_, err := executor.LaunchCmd(&execCmd, ctx)
if err == nil {
t.Fatalf("Expected error")
}
}
func TestExecutor_Start_Wait_Failure_Code(t *testing.T) {
execCmd := ExecCommand{Cmd: "/bin/sleep", Args: []string{"fail"}}
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}
if ps.Pid == 0 {
t.Fatalf("expected process to start and have non zero pid")
}
ps, _ = executor.Wait()
if ps.ExitCode < 1 {
t.Fatalf("expected exit code to be non zero, actual: %v", ps.ExitCode)
}
}
func TestExecutor_Start_Wait(t *testing.T) {
execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}}
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}
if ps.Pid == 0 {
t.Fatalf("expected process to start and have non zero pid")
}
ps, err = executor.Wait()
if err != nil {
t.Fatalf("error in waiting for command: %v", err)
}
task := "web"
taskDir, ok := ctx.AllocDir.TaskDirs[task]
if !ok {
log.Panicf("No task directory found for task %v", task)
}
expected := "hello world"
file := filepath.Join(allocdir.TaskLocal, "web.stdout")
absFilePath := filepath.Join(taskDir, file)
output, err := ioutil.ReadFile(absFilePath)
if err != nil {
t.Fatalf("Couldn't read file %v", absFilePath)
}
act := strings.TrimSpace(string(output))
if act != expected {
t.Fatalf("Command output incorrectly: want %v; got %v", expected, act)
}
}
func TestExecutor_IsolationAndConstraints(t *testing.T) {
testutil.ExecCompatible(t)
execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}}
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
ctx.FSIsolation = true
ctx.ResourceLimits = true
ctx.UnprivilegedUser = true
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}
if ps.Pid == 0 {
t.Fatalf("expected process to start and have non zero pid")
}
ps, err = executor.Wait()
if err != nil {
t.Fatalf("error in waiting for command: %v", err)
}
task := "web"
taskDir, ok := ctx.AllocDir.TaskDirs[task]
if !ok {
log.Panicf("No task directory found for task %v", task)
}
expected := "hello world"
file := filepath.Join(allocdir.TaskLocal, "web.stdout")
absFilePath := filepath.Join(taskDir, file)
output, err := ioutil.ReadFile(absFilePath)
if err != nil {
t.Fatalf("Couldn't read file %v", absFilePath)
}
act := strings.TrimSpace(string(output))
if act != expected {
t.Fatalf("Command output incorrectly: want %v; got %v", expected, act)
}
}
func TestExecutor_Start_Kill(t *testing.T) {
execCmd := ExecCommand{Cmd: "/bin/sleep", Args: []string{"10 && hello world"}}
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err != nil {
t.Fatalf("error in launching command: %v", err)
}
if ps.Pid == 0 {
t.Fatalf("expected process to start and have non zero pid")
}
ps, err = executor.Wait()
if err != nil {
t.Fatalf("error in waiting for command: %v", err)
}
task := "web"
taskDir, ok := ctx.AllocDir.TaskDirs[task]
if !ok {
t.Fatalf("No task directory found for task %v", task)
}
file := filepath.Join(allocdir.TaskLocal, "web.stdout")
absFilePath := filepath.Join(taskDir, file)
time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second)
output, err := ioutil.ReadFile(absFilePath)
if err != nil {
t.Fatalf("Couldn't read file %v", absFilePath)
}
expected := ""
act := strings.TrimSpace(string(output))
if act != expected {
t.Fatalf("Command output incorrectly: want %v; got %v", expected, act)
}
}

View File

@@ -1,286 +0,0 @@
package executor
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
func TestMain(m *testing.M) {
if !testtask.Run() {
os.Exit(m.Run())
}
}
var (
constraint = &structs.Resources{
CPU: 250,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
}
)
func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID))
if err := allocDir.Build([]*structs.Task{task}); err != nil {
log.Panicf("allocDir.Build() failed: %v", err)
}
return task.Name, allocDir
}
func testExecutorContext() *ExecutorContext {
taskEnv := env.NewTaskEnvironment(mock.Node())
return &ExecutorContext{taskEnv: taskEnv}
}
func testExecutor(t *testing.T, buildExecutor func(*ExecutorContext) Executor, compatible func(*testing.T)) {
if compatible != nil {
compatible(t)
}
command := func(name string, args ...string) Executor {
ctx := testExecutorContext()
e := buildExecutor(ctx)
SetCommand(e, name, args)
testtask.SetEnv(ctx.taskEnv)
return e
}
Executor_Start_Invalid(t, command)
Executor_Start_Wait_Failure_Code(t, command)
Executor_Start_Wait(t, command)
Executor_Start_Kill(t, command)
Executor_Open(t, command, buildExecutor)
Executor_Open_Invalid(t, command, buildExecutor)
}
type buildExecCommand func(name string, args ...string) Executor
func Executor_Start_Invalid(t *testing.T, command buildExecCommand) {
invalid := "/bin/foobar"
e := command(invalid, "1")
if err := e.Limit(constraint); err != nil {
log.Panicf("Limit() failed: %v", err)
}
task, alloc := mockAllocDir(t)
defer alloc.Destroy()
if err := e.ConfigureTaskDir(task, alloc); err != nil {
log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err)
}
if err := e.Start(); err == nil {
log.Panicf("Start(%v) should have failed", invalid)
}
}
func Executor_Start_Wait_Failure_Code(t *testing.T, command buildExecCommand) {
e := command(testtask.Path(), "fail")
if err := e.Limit(constraint); err != nil {
log.Panicf("Limit() failed: %v", err)
}
task, alloc := mockAllocDir(t)
defer alloc.Destroy()
if err := e.ConfigureTaskDir(task, alloc); err != nil {
log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err)
}
if err := e.Start(); err != nil {
log.Panicf("Start() failed: %v", err)
}
if err := e.Wait(); err == nil {
log.Panicf("Wait() should have failed")
}
}
func Executor_Start_Wait(t *testing.T, command buildExecCommand) {
task, alloc := mockAllocDir(t)
defer alloc.Destroy()
taskDir, ok := alloc.TaskDirs[task]
if !ok {
log.Panicf("No task directory found for task %v", task)
}
expected := "hello world"
file := filepath.Join(allocdir.TaskLocal, "output.txt")
absFilePath := filepath.Join(taskDir, file)
e := command(testtask.Path(), "sleep", "1s", "write", expected, file)
if err := e.Limit(constraint); err != nil {
log.Panicf("Limit() failed: %v", err)
}
if err := e.ConfigureTaskDir(task, alloc); err != nil {
log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err)
}
if err := e.Start(); err != nil {
log.Panicf("Start() failed: %v", err)
}
if res := e.Wait(); !res.Successful() {
log.Panicf("Wait() failed: %v", res)
}
output, err := ioutil.ReadFile(absFilePath)
if err != nil {
log.Panicf("Couldn't read file %v", absFilePath)
}
act := string(output)
if act != expected {
log.Panicf("Command output incorrectly: want %v; got %v", expected, act)
}
}
func Executor_Start_Kill(t *testing.T, command buildExecCommand) {
task, alloc := mockAllocDir(t)
defer alloc.Destroy()
taskDir, ok := alloc.TaskDirs[task]
if !ok {
log.Panicf("No task directory found for task %v", task)
}
filePath := filepath.Join(taskDir, "output")
e := command(testtask.Path(), "sleep", "1s", "write", "failure", filePath)
if err := e.Limit(constraint); err != nil {
log.Panicf("Limit() failed: %v", err)
}
if err := e.ConfigureTaskDir(task, alloc); err != nil {
log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err)
}
if err := e.Start(); err != nil {
log.Panicf("Start() failed: %v", err)
}
if err := e.Shutdown(); err != nil {
log.Panicf("Shutdown() failed: %v", err)
}
time.Sleep(time.Duration(testutil.TestMultiplier()*2) * time.Second)
// Check that the file doesn't exist.
if _, err := os.Stat(filePath); err == nil {
log.Panicf("Stat(%v) should have failed: task not killed", filePath)
}
}
func Executor_Open(t *testing.T, command buildExecCommand, newExecutor func(*ExecutorContext) Executor) {
task, alloc := mockAllocDir(t)
defer alloc.Destroy()
taskDir, ok := alloc.TaskDirs[task]
if !ok {
log.Panicf("No task directory found for task %v", task)
}
expected := "hello world"
file := filepath.Join(allocdir.TaskLocal, "output.txt")
absFilePath := filepath.Join(taskDir, file)
e := command(testtask.Path(), "sleep", "1s", "write", expected, file)
if err := e.Limit(constraint); err != nil {
log.Panicf("Limit() failed: %v", err)
}
if err := e.ConfigureTaskDir(task, alloc); err != nil {
log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err)
}
if err := e.Start(); err != nil {
log.Panicf("Start() failed: %v", err)
}
id, err := e.ID()
if err != nil {
log.Panicf("ID() failed: %v", err)
}
e2 := newExecutor(testExecutorContext())
if err := e2.Open(id); err != nil {
log.Panicf("Open(%v) failed: %v", id, err)
}
if res := e2.Wait(); !res.Successful() {
log.Panicf("Wait() failed: %v", res)
}
output, err := ioutil.ReadFile(absFilePath)
if err != nil {
log.Panicf("Couldn't read file %v", absFilePath)
}
act := string(output)
if act != expected {
log.Panicf("Command output incorrectly: want %v; got %v", expected, act)
}
}
func Executor_Open_Invalid(t *testing.T, command buildExecCommand, newExecutor func(*ExecutorContext) Executor) {
task, alloc := mockAllocDir(t)
e := command(testtask.Path(), "echo", "foo")
if err := e.Limit(constraint); err != nil {
log.Panicf("Limit() failed: %v", err)
}
if err := e.ConfigureTaskDir(task, alloc); err != nil {
log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err)
}
if err := e.Start(); err != nil {
log.Panicf("Start() failed: %v", err)
}
id, err := e.ID()
if err != nil {
log.Panicf("ID() failed: %v", err)
}
// Kill the task because some OSes (windows) will not let us destroy the
// alloc (below) if the task is still running.
if err := e.ForceStop(); err != nil {
log.Panicf("e.ForceStop() failed: %v", err)
}
// Wait until process is actually gone, we don't care what the result was.
e.Wait()
// Destroy the allocdir which removes the exit code.
if err := alloc.Destroy(); err != nil {
log.Panicf("alloc.Destroy() failed: %v", err)
}
e2 := newExecutor(testExecutorContext())
if err := e2.Open(id); err == nil {
log.Panicf("Open(%v) should have failed", id)
}
}

View File

@@ -0,0 +1,121 @@
package driver
import (
"io"
"log"
"net"
"net/rpc"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/executor"
)
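// HandshakeConfig is used by the plugin client and server to perform a basic
// handshake and verify compatibility before any executor RPCs are exchanged.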
var HandshakeConfig = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE",
MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255",
}
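// GetPluginMap returns the plugins served over the executor plugin connection,
// keyed by plugin name.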
func GetPluginMap(w io.Writer) map[string]plugin.Plugin {
p := new(ExecutorPlugin)
p.logger = log.New(w, "", log.LstdFlags)
return map[string]plugin.Plugin{"executor": p}
}
// ExecutorReattachConfig is the config that we serialize, de-serialize, and
// store on disk so that drivers can reattach to a running executor plugin
type ExecutorReattachConfig struct {
Pid int
AddrNet string
AddrName string
}
// PluginConfig rebuilds a plugin.ReattachConfig from an ExecutorReattachConfig
func (c *ExecutorReattachConfig) PluginConfig() *plugin.ReattachConfig {
var addr net.Addr
switch c.AddrNet {
case "unix", "unixgram", "unixpacket":
addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName)
case "tcp", "tcp4", "tcp6":
addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName)
}
return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr}
}
func NewExecutorReattachConfig(c *plugin.ReattachConfig) *ExecutorReattachConfig {
return &ExecutorReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()}
}
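// ExecutorRPC is the client-side shim that forwards Executor calls over the
// plugin RPC connection.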
type ExecutorRPC struct {
client *rpc.Client
}
// LaunchCmdArgs wraps a user command and the args for the purposes of RPC
type LaunchCmdArgs struct {
Cmd *executor.ExecCommand
Ctx *executor.ExecutorContext
}
func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand, ctx *executor.ExecutorContext) (*executor.ProcessState, error) {
var ps *executor.ProcessState
err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps)
return ps, err
}
func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) {
var ps executor.ProcessState
err := e.client.Call("Plugin.Wait", new(interface{}), &ps)
return &ps, err
}
func (e *ExecutorRPC) ShutDown() error {
return e.client.Call("Plugin.ShutDown", new(interface{}), new(interface{}))
}
func (e *ExecutorRPC) Exit() error {
return e.client.Call("Plugin.Exit", new(interface{}), new(interface{}))
}
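// ExecutorRPCServer is the server-side shim that dispatches incoming RPC calls
// to the real executor implementation.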
type ExecutorRPCServer struct {
Impl executor.Executor
}
func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *executor.ProcessState) error {
state, err := e.Impl.LaunchCmd(args.Cmd, args.Ctx)
if state != nil {
*ps = *state
}
return err
}
func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error {
state, err := e.Impl.Wait()
if state != nil {
*ps = *state
}
return err
}
func (e *ExecutorRPCServer) ShutDown(args interface{}, resp *interface{}) error {
return e.Impl.ShutDown()
}
func (e *ExecutorRPCServer) Exit(args interface{}, resp *interface{}) error {
return e.Impl.Exit()
}
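// ExecutorPlugin implements plugin.Plugin, wiring the RPC server and client
// shims above into go-plugin.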
type ExecutorPlugin struct {
logger *log.Logger
Impl *ExecutorRPCServer
}
func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
if p.Impl == nil {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger)}
}
return p.Impl, nil
}
func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &ExecutorRPC{client: c}, nil
}
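A short aside on the (Pid, AddrNet, AddrName) shape used above: a net.Addr does not survive JSON serialization directly, whereas the flattened string pair does and can be resolved back into an address, which is what PluginConfig does. The sketch below is a self-contained illustration using a local copy of the struct and a hypothetical address and pid; it is not code from this change:
package main

import (
	"encoding/json"
	"fmt"
	"net"
)

// reattach mirrors the ExecutorReattachConfig fields above; this standalone
// copy exists only so the sketch compiles on its own.
type reattach struct {
	Pid      int
	AddrNet  string
	AddrName string
}

func main() {
	// Hypothetical plugin address; go-plugin normally hands this to us.
	addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:4647")
	if err != nil {
		panic(err)
	}
	in := reattach{Pid: 1234, AddrNet: addr.Network(), AddrName: addr.String()}

	// The flattened form survives a JSON round trip.
	b, _ := json.Marshal(in)
	var out reattach
	_ = json.Unmarshal(b, &out)

	// Rebuild the net.Addr from the stored strings, as PluginConfig does.
	rebuilt, _ := net.ResolveTCPAddr(out.AddrNet, out.AddrName)
	fmt.Println(string(b), rebuilt)
}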

View File

@@ -12,13 +12,18 @@ import (
"syscall"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
// JavaDriver is a simple driver to execute applications packaged in Jars.
@@ -37,7 +42,13 @@ type JavaDriverConfig struct {
// javaHandle is returned from Start/Open as a handle to the PID
type javaHandle struct {
cmd executor.Executor
pluginClient *plugin.Client
userPid int
executor executor.Executor
isolationConfig *executor.IsolationConfig
taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
@@ -137,33 +148,48 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
args = append(args, driverConfig.Args...)
}
// Setup the command
// Assumes Java is in the $PATH, but could probably be detected
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd := executor.Command(execCtx, "java", args...)
// Populate environment variables
cmd.Command().Env = d.taskEnv.EnvList()
if err := cmd.Limit(task.Resources); err != nil {
return nil, fmt.Errorf("failed to constrain resources: %s", err)
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil {
return nil, fmt.Errorf("failed to configure task directory: %v", err)
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start source: %v", err)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
FSIsolation: true,
ResourceLimits: true,
UnprivilegedUser: true,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
}
d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid)
// Return a driver handle
h := &javaHandle{
cmd: cmd,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
@@ -171,8 +197,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
type javaId struct {
ExecutorId string
KillTimeout time.Duration
KillTimeout time.Duration
PluginConfig *ExecutorReattachConfig
IsolationConfig *executor.IsolationConfig
TaskDir string
AllocDir *allocdir.AllocDir
UserPid int
}
func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -181,20 +211,41 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}
// Find the process
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd, err := executor.OpenId(execCtx, id.ExecutorId)
pluginConfig := &plugin.ClientConfig{
Reattach: id.PluginConfig.PluginConfig(),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
merrs := new(multierror.Error)
merrs.Errors = append(merrs.Errors, err)
d.logger.Println("[ERROR] driver.java: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e))
}
if id.IsolationConfig != nil {
if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e))
}
}
if e := ctx.AllocDir.UnmountAll(); e != nil {
merrs.Errors = append(merrs.Errors, e)
}
return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil())
}
// Return a driver handle
h := &javaHandle{
cmd: cmd,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: exec,
userPid: id.UserPid,
isolationConfig: id.IsolationConfig,
taskDir: id.TaskDir,
allocDir: id.AllocDir,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
@@ -202,10 +253,13 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
}
func (h *javaHandle) ID() string {
executorId, _ := h.cmd.ID()
id := javaId{
ExecutorId: executorId,
KillTimeout: h.killTimeout,
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
TaskDir: h.taskDir,
AllocDir: h.allocDir,
IsolationConfig: h.isolationConfig,
}
data, err := json.Marshal(id)
@@ -228,18 +282,33 @@ func (h *javaHandle) Update(task *structs.Task) error {
}
func (h *javaHandle) Kill() error {
h.cmd.Shutdown()
h.executor.ShutDown()
select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.cmd.ForceStop()
return h.executor.Exit()
}
}
func (h *javaHandle) run() {
res := h.cmd.Wait()
ps, err := h.executor.Wait()
close(h.doneCh)
h.waitCh <- res
if ps.ExitCode == 0 && err != nil {
if h.isolationConfig != nil {
if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil {
h.logger.Printf("[ERROR] driver.java: destroying cgroup failed while killing cgroup: %v", e)
}
} else {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERROR] driver.java: error killing user process: %v", e)
}
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.java: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
close(h.waitCh)
h.pluginClient.Kill()
}

View File

@@ -87,6 +87,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
// There is a race condition between the handle waiting and killing. One
// will return an error.
handle.Kill()
handle2.Kill()
}
func TestJavaDriver_Start_Wait(t *testing.T) {

View File

@@ -11,11 +11,14 @@ import (
"strings"
"time"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
@@ -41,11 +44,14 @@ type QemuDriverConfig struct {
// qemuHandle is returned from Start/Open as a handle to the PID
type qemuHandle struct {
cmd executor.Executor
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
pluginClient *plugin.Client
userPid int
executor executor.Executor
allocDir *allocdir.AllocDir
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
// NewQemuDriver is used to create a new exec driver
@@ -182,30 +188,44 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
)
}
// Setup the command
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd := executor.Command(execCtx, args[0], args[1:]...)
if err := cmd.Limit(task.Resources); err != nil {
return nil, fmt.Errorf("failed to constrain resources: %s", err)
}
if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil {
return nil, fmt.Errorf("failed to configure task directory: %v", err)
}
d.logger.Printf("[DEBUG] Starting QemuVM command: %q", strings.Join(args, " "))
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start command: %v", err)
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
}
d.logger.Printf("[INFO] Started new QemuVM: %s", vmID)
// Create and Return Handle
h := &qemuHandle{
cmd: cmd,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
@@ -213,8 +233,10 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
type qemuId struct {
ExecutorId string
KillTimeout time.Duration
KillTimeout time.Duration
UserPid int
PluginConfig *ExecutorReattachConfig
AllocDir *allocdir.AllocDir
}
func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -223,30 +245,40 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}
// Find the process
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd, err := executor.OpenId(execCtx, id.ExecutorId)
pluginConfig := &plugin.ClientConfig{
Reattach: id.PluginConfig.PluginConfig(),
}
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
d.logger.Println("[ERROR] driver.qemu: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
d.logger.Printf("[ERROR] driver.qemu: error destroying plugin and userpid: %v", e)
}
return nil, fmt.Errorf("error connecting to plugin: %v", err)
}
// Return a driver handle
h := &execHandle{
cmd: cmd,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
h := &qemuHandle{
pluginClient: pluginClient,
executor: executor,
userPid: id.UserPid,
allocDir: id.AllocDir,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}
func (h *qemuHandle) ID() string {
executorId, _ := h.cmd.ID()
id := qemuId{
ExecutorId: executorId,
KillTimeout: h.killTimeout,
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
}
data, err := json.Marshal(id)
@@ -271,18 +303,27 @@ func (h *qemuHandle) Update(task *structs.Task) error {
// TODO: allow a 'shutdown_command' that can be executed over a ssh connection
// to the VM
func (h *qemuHandle) Kill() error {
h.cmd.Shutdown()
h.executor.ShutDown()
select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.cmd.ForceStop()
return h.executor.Exit()
}
}
func (h *qemuHandle) run() {
res := h.cmd.Wait()
ps, err := h.executor.Wait()
if ps.ExitCode == 0 && err != nil {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERROR] driver.qemu: error killing user process: %v", e)
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.qemu: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
close(h.doneCh)
h.waitCh <- res
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
close(h.waitCh)
h.pluginClient.Kill()
}

View File

@@ -4,13 +4,18 @@ import (
"encoding/json"
"fmt"
"log"
"os/exec"
"path/filepath"
"time"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
@@ -30,11 +35,14 @@ type RawExecDriver struct {
// rawExecHandle is returned from Start/Open as a handle to the PID
type rawExecHandle struct {
cmd executor.Executor
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
pluginClient *plugin.Client
userPid int
executor executor.Executor
killTimeout time.Duration
allocDir *allocdir.AllocDir
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
// NewRawExecDriver is used to create a new raw exec driver
@@ -88,40 +96,52 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
}
}
// Setup the command
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd := executor.NewBasicExecutor(execCtx)
executor.SetCommand(cmd, command, driverConfig.Args)
if err := cmd.Limit(task.Resources); err != nil {
return nil, fmt.Errorf("failed to constrain resources: %s", err)
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
// Populate environment variables
cmd.Command().Env = d.taskEnv.EnvList()
if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil {
return nil, fmt.Errorf("failed to configure task directory: %v", err)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start command: %v", err)
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
}
d.logger.Printf("[DEBUG] driver.raw_exec: started process with pid: %v", ps.Pid)
// Return a driver handle
h := &execHandle{
cmd: cmd,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
h := &rawExecHandle{
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
killTimeout: d.DriverContext.KillTimeout(task),
allocDir: ctx.AllocDir,
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}
type rawExecId struct {
ExecutorId string
KillTimeout time.Duration
KillTimeout time.Duration
UserPid int
PluginConfig *ExecutorReattachConfig
AllocDir *allocdir.AllocDir
}
func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -130,30 +150,39 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}
// Find the process
execCtx := executor.NewExecutorContext(d.taskEnv)
cmd := executor.NewBasicExecutor(execCtx)
if err := cmd.Open(id.ExecutorId); err != nil {
return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err)
pluginConfig := &plugin.ClientConfig{
Reattach: id.PluginConfig.PluginConfig(),
}
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
d.logger.Println("[ERROR] driver.raw_exec: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
d.logger.Printf("[ERROR] driver.raw_exec: error destroying plugin and userpid: %v", e)
}
return nil, fmt.Errorf("error connecting to plugin: %v", err)
}
// Return a driver handle
h := &execHandle{
cmd: cmd,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
h := &rawExecHandle{
pluginClient: pluginClient,
executor: executor,
userPid: id.UserPid,
logger: d.logger,
killTimeout: id.KillTimeout,
allocDir: id.AllocDir,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}
func (h *rawExecHandle) ID() string {
executorId, _ := h.cmd.ID()
id := rawExecId{
ExecutorId: executorId,
KillTimeout: h.killTimeout,
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
}
data, err := json.Marshal(id)
@@ -176,18 +205,27 @@ func (h *rawExecHandle) Update(task *structs.Task) error {
}
func (h *rawExecHandle) Kill() error {
h.cmd.Shutdown()
h.executor.ShutDown()
select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.cmd.ForceStop()
return h.executor.Exit()
}
}
func (h *rawExecHandle) run() {
res := h.cmd.Wait()
ps, err := h.executor.Wait()
close(h.doneCh)
h.waitCh <- res
if ps.ExitCode == 0 && err != nil {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERROR] driver.raw_exec: error killing user process: %v", e)
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.raw_exec: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
close(h.waitCh)
h.pluginClient.Kill()
}

View File

@@ -1,10 +1,12 @@
package driver
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"testing"
@@ -91,6 +93,8 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
t.Fatalf("timeout")
}
handle.Kill()
handle2.Kill()
}
func TestRawExecDriver_Start_Artifact_basic(t *testing.T) {
@@ -285,7 +289,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
Name: "sleep",
Config: map[string]interface{}{
"command": testtask.Path(),
"args": []string{"sleep", "15s"},
"args": []string{"sleep", "45s"},
},
Resources: basicResources,
}

View File

@@ -1,308 +0,0 @@
package spawn
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/command"
"github.com/hashicorp/nomad/helper/discover"
)
// Spawner is used to start a user command in an isolated fashion that is
// resistent to Nomad agent failure.
type Spawner struct {
spawn *os.Process
SpawnPid int
SpawnPpid int
StateFile string
UserPid int
// User configuration
UserCmd *exec.Cmd
Logs *Logs
Chroot string
}
// Logs is used to define the filepaths the user command's logs should be
// redirected to. The files do not need to exist.
type Logs struct {
Stdin, Stdout, Stderr string
}
// NewSpawner takes a path to a state file. This state file can be used to
// create a new Spawner that can be used to wait on the exit status of a
// process even through Nomad restarts.
func NewSpawner(stateFile string) *Spawner {
return &Spawner{StateFile: stateFile}
}
// SetCommand sets the user command to spawn.
func (s *Spawner) SetCommand(cmd *exec.Cmd) {
s.UserCmd = cmd
}
// SetLogs sets the redirection of user command log files.
func (s *Spawner) SetLogs(l *Logs) {
s.Logs = l
}
// SetChroot puts the user command into a chroot.
func (s *Spawner) SetChroot(root string) {
s.Chroot = root
}
// Spawn does a double-fork to start and isolate the user command. It takes a
// call-back that is invoked with the pid of the intermediary process. If the
// call back returns an error, the user command is not started and the spawn is
// cancelled. This can be used to put the process into a cgroup or jail and
// cancel starting the user process if that was not successful. An error is
// returned if the call-back returns an error or the user-command couldn't be
// started.
func (s *Spawner) Spawn(cb func(pid int) error) error {
bin, err := discover.NomadExecutable()
if err != nil {
return fmt.Errorf("Failed to determine the nomad executable: %v", err)
}
exitFile, err := os.OpenFile(s.StateFile, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return fmt.Errorf("Error opening file to store exit status: %v", err)
}
defer exitFile.Close()
config, err := s.spawnConfig()
if err != nil {
return err
}
spawn := exec.Command(bin, "spawn-daemon", config)
// Capture stdout
spawnStdout, err := spawn.StdoutPipe()
if err != nil {
return fmt.Errorf("Failed to capture spawn-daemon stdout: %v", err)
}
defer spawnStdout.Close()
// Capture stdin.
spawnStdin, err := spawn.StdinPipe()
if err != nil {
return fmt.Errorf("Failed to capture spawn-daemon stdin: %v", err)
}
defer spawnStdin.Close()
if err := spawn.Start(); err != nil {
return fmt.Errorf("Failed to call spawn-daemon on nomad executable: %v", err)
}
if cb != nil {
cbErr := cb(spawn.Process.Pid)
if cbErr != nil {
errs := new(multierror.Error)
errs = multierror.Append(errs, cbErr)
if err := s.sendAbortCommand(spawnStdin); err != nil {
errs = multierror.Append(errs, err)
}
return errs
}
}
if err := s.sendStartCommand(spawnStdin); err != nil {
return err
}
respCh := make(chan command.SpawnStartStatus, 1)
errCh := make(chan error, 1)
go func() {
var resp command.SpawnStartStatus
dec := json.NewDecoder(spawnStdout)
if err := dec.Decode(&resp); err != nil {
errCh <- fmt.Errorf("Failed to parse spawn-daemon start response: %v", err)
}
respCh <- resp
}()
select {
case err := <-errCh:
return err
case resp := <-respCh:
if resp.ErrorMsg != "" {
return fmt.Errorf("Failed to execute user command: %s", resp.ErrorMsg)
}
s.UserPid = resp.UserPID
case <-time.After(5 * time.Second):
return fmt.Errorf("timed out waiting for response")
}
// Store the spawn process.
s.spawn = spawn.Process
s.SpawnPid = s.spawn.Pid
s.SpawnPpid = os.Getpid()
return nil
}
// spawnConfig returns a serialized config to pass to the Nomad spawn-daemon
// command.
func (s *Spawner) spawnConfig() (string, error) {
if s.UserCmd == nil {
return "", fmt.Errorf("Must specify user command")
}
config := command.DaemonConfig{
Cmd: *s.UserCmd,
Chroot: s.Chroot,
ExitStatusFile: s.StateFile,
}
if s.Logs != nil {
config.StdoutFile = s.Logs.Stdout
config.StdinFile = s.Logs.Stdin
config.StderrFile = s.Logs.Stderr
}
var buffer bytes.Buffer
enc := json.NewEncoder(&buffer)
if err := enc.Encode(config); err != nil {
return "", fmt.Errorf("Failed to serialize configuration: %v", err)
}
return strconv.Quote(buffer.String()), nil
}
// sendStartCommand sends the necessary command to the spawn-daemon to have it
// start the user process.
func (s *Spawner) sendStartCommand(w io.Writer) error {
enc := json.NewEncoder(w)
if err := enc.Encode(true); err != nil {
return fmt.Errorf("Failed to serialize start command: %v", err)
}
return nil
}
// sendAbortCommand sends the necessary command to the spawn-daemon to have it
// abort starting the user process. This should be invoked if the spawn-daemon
// could not be isolated into a cgroup.
func (s *Spawner) sendAbortCommand(w io.Writer) error {
enc := json.NewEncoder(w)
if err := enc.Encode(false); err != nil {
return fmt.Errorf("Failed to serialize abort command: %v", err)
}
return nil
}
// Wait returns the exit code of the user process or an error if the wait
// failed.
func (s *Spawner) Wait() *structs.WaitResult {
if os.Getpid() == s.SpawnPpid {
return s.waitAsParent()
}
return s.pollWait()
}
// waitAsParent waits on the process if the current process was the spawner.
func (s *Spawner) waitAsParent() *structs.WaitResult {
if s.SpawnPpid != os.Getpid() {
return structs.NewWaitResult(-1, 0, fmt.Errorf("not the parent. Spawner parent is %v; current pid is %v", s.SpawnPpid, os.Getpid()))
}
// Try to reattach to the spawn.
if s.spawn == nil {
// If it can't be reattached, it means the spawn process has exited so
// we should just read its exit file.
var err error
if s.spawn, err = os.FindProcess(s.SpawnPid); err != nil {
return s.pollWait()
}
}
if _, err := s.spawn.Wait(); err != nil {
return structs.NewWaitResult(-1, 0, err)
}
return s.pollWait()
}
// pollWait polls on the spawn daemon to determine when it exits. After it
// exits, it reads the state file and returns the exit code and possibly an
// error.
func (s *Spawner) pollWait() *structs.WaitResult {
// Stat to check if it is there to avoid a race condition.
stat, err := os.Stat(s.StateFile)
if err != nil {
return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err))
}
// If there is data it means that the file has already been written.
if stat.Size() > 0 {
return s.readExitCode()
}
// Read after the process exits.
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !s.Alive() {
break
}
}
return s.readExitCode()
}
// readExitCode parses the state file and returns the exit code of the task. It
// returns an error if the file can't be read.
func (s *Spawner) readExitCode() *structs.WaitResult {
f, err := os.Open(s.StateFile)
if err != nil {
return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to open %v to read exit code: %v", s.StateFile, err))
}
defer f.Close()
stat, err := f.Stat()
if err != nil {
return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to stat file %v: %v", s.StateFile, err))
}
if stat.Size() == 0 {
return structs.NewWaitResult(-1, 0, fmt.Errorf("Empty state file: %v", s.StateFile))
}
var exitStatus command.SpawnExitStatus
dec := json.NewDecoder(f)
if err := dec.Decode(&exitStatus); err != nil {
return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to parse exit status from %v: %v", s.StateFile, err))
}
return structs.NewWaitResult(exitStatus.ExitCode, 0, nil)
}
// Valid checks that the state of the Spawner is valid and that a subsequent
// Wait could be called. This is useful to call when reopening a Spawner
// through client restarts. If Valid a nil error is returned.
func (s *Spawner) Valid() error {
// If the spawner is still alive, then the task is running and we can wait
// on it.
if s.Alive() {
return nil
}
// The task isn't alive so check that there is a valid exit code file.
if res := s.readExitCode(); res.Err == nil {
return nil
}
return fmt.Errorf("Spawner not alive and exit code not written")
}

View File

@@ -1,20 +0,0 @@
// +build !windows
package spawn
import (
"os"
"syscall"
)
func (s *Spawner) Alive() bool {
if s.spawn == nil {
var err error
if s.spawn, err = os.FindProcess(s.SpawnPid); err != nil {
return false
}
}
err := s.spawn.Signal(syscall.Signal(0))
return err == nil
}

View File

@@ -1,335 +0,0 @@
package spawn
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/helper/testtask"
)
func TestMain(m *testing.M) {
if !testtask.Run() {
os.Exit(m.Run())
}
}
func TestSpawn_NoCmd(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
if err := spawn.Spawn(nil); err == nil {
t.Fatalf("Spawn() with no user command should fail")
}
}
func TestSpawn_InvalidCmd(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(exec.Command("foo")) // non-existent command
if err := spawn.Spawn(nil); err == nil {
t.Fatalf("Spawn() with an invalid command should fail")
}
}
func TestSpawn_SetsLogs(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
exp := "foo"
spawn.SetCommand(testCommand("echo", exp))
// Create file for stdout.
stdout := tempFileName(t)
defer os.Remove(stdout)
spawn.SetLogs(&Logs{Stdout: stdout})
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed: %v", err)
}
if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil {
t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err)
}
stdout2, err := os.Open(stdout)
if err != nil {
t.Fatalf("Open() failed: %v", err)
}
data, err := ioutil.ReadAll(stdout2)
if err != nil {
t.Fatalf("ReadAll() failed: %v", err)
}
act := strings.TrimSpace(string(data))
if act != exp {
t.Fatalf("Unexpected data written to stdout; got %v; want %v", act, exp)
}
}
func TestSpawn_Callback(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("sleep", "1s"))
called := false
cbErr := fmt.Errorf("ERROR CB")
cb := func(_ int) error {
called = true
return cbErr
}
if err := spawn.Spawn(cb); err == nil {
t.Fatalf("Spawn(%#v) should have errored; want %v", cb, cbErr)
}
if !called {
t.Fatalf("Spawn(%#v) didn't call callback", cb)
}
}
func TestSpawn_ParentWaitExited(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("echo", "foo"))
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed %v", err)
}
time.Sleep(1 * time.Second)
if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil {
t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err)
}
}
func TestSpawn_ParentWait(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("sleep", "2s"))
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed %v", err)
}
if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil {
t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err)
}
}
func TestSpawn_NonParentWaitExited(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("echo", "foo"))
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed %v", err)
}
time.Sleep(1 * time.Second)
// Force the wait to assume non-parent.
spawn.SpawnPpid = 0
if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil {
t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err)
}
}
func TestSpawn_NonParentWait(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("sleep", "2s"))
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed %v", err)
}
// Need to wait on the spawner, otherwise it becomes a zombie and the test
// only finishes after the init process cleans it. This speeds that up.
go func() {
time.Sleep(3 * time.Second)
if _, err := spawn.spawn.Wait(); err != nil {
t.FailNow()
}
}()
// Force the wait to assume non-parent.
spawn.SpawnPpid = 0
if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil {
t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err)
}
}
func TestSpawn_DeadSpawnDaemon_Parent(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
var spawnPid int
cb := func(pid int) error {
spawnPid = pid
return nil
}
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("sleep", "5s"))
if err := spawn.Spawn(cb); err != nil {
t.Fatalf("Spawn() errored: %v", err)
}
proc, err := os.FindProcess(spawnPid)
if err != nil {
t.FailNow()
}
if err := proc.Kill(); err != nil {
t.FailNow()
}
if _, err := proc.Wait(); err != nil {
t.FailNow()
}
if res := spawn.Wait(); res.Err == nil {
t.Fatalf("Wait() should have failed: %v", res.Err)
}
}
func TestSpawn_DeadSpawnDaemon_NonParent(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
var spawnPid int
cb := func(pid int) error {
spawnPid = pid
return nil
}
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("sleep", "2s"))
if err := spawn.Spawn(cb); err != nil {
t.Fatalf("Spawn() errored: %v", err)
}
proc, err := os.FindProcess(spawnPid)
if err != nil {
t.FailNow()
}
if err := proc.Kill(); err != nil {
t.FailNow()
}
if _, err := proc.Wait(); err != nil {
t.FailNow()
}
// Force the wait to assume non-parent.
spawn.SpawnPpid = 0
if res := spawn.Wait(); res.Err == nil {
t.Fatalf("Wait() should have failed: %v", res.Err)
}
}
func TestSpawn_Valid_TaskRunning(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("sleep", "2s"))
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed %v", err)
}
if err := spawn.Valid(); err != nil {
t.Fatalf("Valid() failed: %v", err)
}
if res := spawn.Wait(); res.Err != nil {
t.Fatalf("Wait() failed: %v", res.Err)
}
}
func TestSpawn_Valid_TaskExit_ExitCode(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("echo", "foo"))
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed %v", err)
}
if res := spawn.Wait(); res.Err != nil {
t.Fatalf("Wait() failed: %v", res.Err)
}
if err := spawn.Valid(); err != nil {
t.Fatalf("Valid() failed: %v", err)
}
}
func TestSpawn_Valid_TaskExit_NoExitCode(t *testing.T) {
t.Parallel()
tempFile := tempFileName(t)
defer os.Remove(tempFile)
spawn := NewSpawner(tempFile)
spawn.SetCommand(testCommand("echo", "foo"))
if err := spawn.Spawn(nil); err != nil {
t.Fatalf("Spawn() failed %v", err)
}
if res := spawn.Wait(); res.Err != nil {
t.Fatalf("Wait() failed: %v", res.Err)
}
// Delete the file so that it can't find the exit code.
os.Remove(tempFile)
if err := spawn.Valid(); err == nil {
t.Fatalf("Valid() should have failed")
}
}
func tempFileName(t *testing.T) string {
f, err := ioutil.TempFile("", "")
if err != nil {
t.Fatalf("TempFile() failed")
}
defer f.Close()
return f.Name()
}
func testCommand(args ...string) *exec.Cmd {
cmd := exec.Command(testtask.Path(), args...)
testtask.SetCmdEnv(cmd)
return cmd
}

View File

@@ -1,21 +0,0 @@
package spawn
import "syscall"
const STILL_ACTIVE = 259
func (s *Spawner) Alive() bool {
const da = syscall.STANDARD_RIGHTS_READ | syscall.PROCESS_QUERY_INFORMATION | syscall.SYNCHRONIZE
h, e := syscall.OpenProcess(da, false, uint32(s.SpawnPid))
if e != nil {
return false
}
var ec uint32
e = syscall.GetExitCodeProcess(h, &ec)
if e != nil {
return false
}
return ec == STILL_ACTIVE
}

63
client/driver/utils.go Normal file
View File

@@ -0,0 +1,63 @@
package driver
import (
"fmt"
"io"
"os"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
)
// createExecutor launches an executor plugin and returns an instance of the
// Executor interface
func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w)
config.MaxPort = clientConfig.ClientMaxPort
config.MinPort = clientConfig.ClientMinPort
// Start the plugin process in its own session (setsid) so that it doesn't
// receive signals sent to the Nomad client.
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(executor.Executor)
return executorPlugin, executorClient, nil
}
// killProcess kills a process with the given pid
func killProcess(pid int) error {
proc, err := os.FindProcess(pid)
if err != nil {
return err
}
return proc.Kill()
}
// destroyPlugin kills the plugin with the given pid and also kills the user
// process
func destroyPlugin(pluginPid int, userPid int) error {
var merr error
if err := killProcess(pluginPid); err != nil {
merr = multierror.Append(merr, err)
}
if err := killProcess(userPid); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}

View File

@@ -0,0 +1,16 @@
package driver
import (
"os/exec"
"syscall"
)
// isolateCommand sets the setsid flag on the exec.Cmd so that the process
// becomes the leader of a new session and doesn't receive signals that are
// sent to the parent process.
func isolateCommand(cmd *exec.Cmd) {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.Setsid = true
}

View File

@@ -0,0 +1,18 @@
// +build !linux
package driver
import (
"os/exec"
"syscall"
)
// isolateCommand sets the setsid flag on the exec.Cmd so that the process
// becomes the leader of a new session and doesn't receive signals that are
// sent to the parent process.
func isolateCommand(cmd *exec.Cmd) {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.Setsid = true
}

View File

@@ -0,0 +1,9 @@
package driver
import (
"os/exec"
)
// TODO: Figure out if this is needed on Windows
func isolateCommand(cmd *exec.Cmd) {
}

View File

@@ -7,6 +7,7 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"sync"
"time"
@@ -215,6 +216,8 @@ func (a *Agent) setupClient() error {
}
conf.MaxKillTimeout = dur
}
conf.ClientMaxPort = a.config.Client.ClientMaxPort
conf.ClientMinPort = a.config.Client.ClientMinPort
// Setup the node
conf.Node = new(structs.Node)
@@ -238,6 +241,29 @@ func (a *Agent) setupClient() error {
}
conf.Node.HTTPAddr = httpAddr
// Reserve some ports for the plugins
if runtime.GOOS == "windows" {
deviceName, err := a.findLoopbackDevice()
if err != nil {
return fmt.Errorf("error finding the device name for loopback: %v", err)
}
var nr *structs.NetworkResource
for _, n := range conf.Node.Reserved.Networks {
if n.Device == deviceName {
nr = n
}
}
if nr == nil {
nr = &structs.NetworkResource{
Device: deviceName,
ReservedPorts: make([]structs.Port, 0),
}
}
for i := conf.ClientMinPort; i <= conf.ClientMaxPort; i++ {
nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: int(i)})
}
}
// Create the client
client, err := client.NewClient(conf)
if err != nil {
@@ -247,6 +273,28 @@ func (a *Agent) setupClient() error {
return nil
}
func (a *Agent) findLoopbackDevice() (string, error) {
var ifcs []net.Interface
var err error
var deviceName string
ifcs, err = net.Interfaces()
if err != nil {
return "", err
}
for _, ifc := range ifcs {
addrs, err := ifc.Addrs()
if err != nil {
return deviceName, err
}
for _, addr := range addrs {
// addr is typically a *net.IPNet whose String() includes the mask, so
// net.ParseIP(addr.String()) would fail; inspect the IP directly instead.
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.IsLoopback() {
return ifc.Name, nil
}
}
}
return deviceName, err
}
// Leave is used to gracefully exit. Clients will inform servers
// of their departure so that allocations can be rescheduled.
func (a *Agent) Leave() error {

View File

@@ -159,6 +159,14 @@ type ClientConfig struct {
// MaxKillTimeout allows capping the user-specifiable KillTimeout.
MaxKillTimeout string `hcl:"max_kill_timeout"`
// ClientMaxPort is the upper range of the ports that the client uses for
// communicating with plugin subsystems
ClientMaxPort uint `hcl:"client_max_port"`
// ClientMinPort is the lower range of the ports that the client uses for
// communicating with plugin subsystems
ClientMinPort uint `hcl:"client_min_port"`
}
// ServerConfig is configuration specific to the server mode
@@ -288,6 +296,8 @@ func DefaultConfig() *Config {
Enabled: false,
NetworkSpeed: 100,
MaxKillTimeout: "30s",
ClientMinPort: 14000,
ClientMaxPort: 19000,
},
Server: &ServerConfig{
Enabled: false,
@@ -505,6 +515,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
if b.MaxKillTimeout != "" {
result.MaxKillTimeout = b.MaxKillTimeout
}
if b.ClientMaxPort != 0 {
result.ClientMaxPort = b.ClientMaxPort
}
if b.ClientMinPort != 0 {
result.ClientMinPort = b.ClientMinPort
}
// Add the servers
result.Servers = append(result.Servers, b.Servers...)

View File

@@ -104,6 +104,8 @@ func TestConfig_Merge(t *testing.T) {
"foo": "bar",
"baz": "zip",
},
ClientMaxPort: 20000,
ClientMinPort: 22000,
NetworkSpeed: 105,
MaxKillTimeout: "50s",
},

View File

@@ -0,0 +1,43 @@
package command
import (
"os"
"strings"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver"
)
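// ExecutorPluginCommand is the internal command that the drivers launch (as
// "nomad executor <logfile>", using the discovered nomad binary) to start an
// executor plugin process that serves the plugin map over RPC.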
type ExecutorPluginCommand struct {
Meta
}
func (e *ExecutorPluginCommand) Help() string {
helpText := `
This is a command used by Nomad internally to launch an executor plugin.
`
return strings.TrimSpace(helpText)
}
func (e *ExecutorPluginCommand) Synopsis() string {
return "internal - launch an executor plugin"
}
func (e *ExecutorPluginCommand) Run(args []string) int {
if len(args) == 0 {
e.Ui.Error("log output file isn't provided")
return 1
}
logFileName := args[0]
stdo, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
e.Ui.Error(err.Error())
return 1
}
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: driver.HandshakeConfig,
Plugins: driver.GetPluginMap(stdo),
})
return 0
}
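
On the driver side of this handshake, the agent is expected to launch "nomad executor <log-file>" and dispense the executor over go-plugin. A minimal sketch of that client side, assuming the plugin is registered under the name "executor", that GetPluginMap accepts an io.Writer, and that the reserved port range is passed through ClientConfig.MinPort/MaxPort (none of which is shown in this diff):

// Sketch only; assumed imports: "io", "os/exec",
// plugin "github.com/hashicorp/go-plugin",
// "github.com/hashicorp/nomad/client/driver". Paths are hypothetical.
func dispenseExecutor(logOutput io.Writer) (interface{}, error) {
	pluginClient := plugin.NewClient(&plugin.ClientConfig{
		HandshakeConfig: driver.HandshakeConfig,
		Plugins:         driver.GetPluginMap(logOutput),
		Cmd:             exec.Command("/usr/local/bin/nomad", "executor", "/tmp/executor.out"),
		MinPort:         14000, // keep the listener inside the reserved plugin port range
		MaxPort:         19000,
	})
	rpcClient, err := pluginClient.Client()
	if err != nil {
		return nil, err
	}
	return rpcClient.Dispense("executor") // plugin name is an assumption
}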

View File

@@ -1,234 +0,0 @@
package command
import (
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"syscall"
)
type SpawnDaemonCommand struct {
Meta
config *DaemonConfig
exitFile io.WriteCloser
}
func (c *SpawnDaemonCommand) Help() string {
helpText := `
Usage: nomad spawn-daemon [options] <daemon_config>
INTERNAL ONLY
Spawns a daemon process by double forking. The required daemon_config is a
json encoding of the DaemonConfig struct containing the isolation
configuration and command to run. SpawnStartStatus is json serialized to
stdout upon running the user command or if any error prevents its execution.
If there is no error, the process waits on the user's command. Once the user
command exits, the exit code is written to a file specified in the
daemon_config and this process exits with the same exit status as the user
command.
`
return strings.TrimSpace(helpText)
}
func (c *SpawnDaemonCommand) Synopsis() string {
return "Spawn a daemon command with configurable isolation."
}
// Status of executing the user's command.
type SpawnStartStatus struct {
// The PID of the user's command.
UserPID int
// ErrorMsg will be empty if the user command was started successfully.
// Otherwise it will have an error message.
ErrorMsg string
}
// Exit status of the user's command.
type SpawnExitStatus struct {
// The exit code of the user's command.
ExitCode int
}
// Configuration for the command to start as a daemon.
type DaemonConfig struct {
exec.Cmd
// The filepath to write the exit status to.
ExitStatusFile string
// The paths, if not /dev/null, must be either in the task's root directory
// or in the shared alloc directory.
StdoutFile string
StdinFile string
StderrFile string
// An optional path specifying the directory to chroot the process in.
Chroot string
}
// Whether to start the user command or abort.
type TaskStart bool
// parseConfig reads the DaemonConfig from the passed arguments. If not
// successful, an error is returned.
func (c *SpawnDaemonCommand) parseConfig(args []string) (*DaemonConfig, error) {
flags := c.Meta.FlagSet("spawn-daemon", FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
return nil, fmt.Errorf("failed to parse args: %v", err)
}
// Check that we got json input.
args = flags.Args()
if len(args) != 1 {
return nil, fmt.Errorf("incorrect number of args; got %v; want 1", len(args))
}
jsonInput, err := strconv.Unquote(args[0])
if err != nil {
return nil, fmt.Errorf("Failed to unquote json input: %v", err)
}
// De-serialize the passed command.
var config DaemonConfig
dec := json.NewDecoder(strings.NewReader(jsonInput))
if err := dec.Decode(&config); err != nil {
return nil, err
}
return &config, nil
}
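
Since this file is being removed, a brief illustration of what it used to consume: the caller JSON-encoded a DaemonConfig, quoted it, and passed it as the single argument. A sketch (values made up; assumed import "strconv") of building an argument that parseConfig above would accept:

// Sketch only; the field names come from the DaemonConfig struct above.
payload := `{"Path":"/bin/sleep","Args":["sleep","30"],` +
	`"ExitStatusFile":"/alloc/task/exit-status",` +
	`"StdoutFile":"/alloc/logs/task.stdout",` +
	`"StderrFile":"/alloc/logs/task.stderr"}`
arg := strconv.Quote(payload) // parseConfig runs strconv.Unquote on args[0] before decoding
// The result was passed as: nomad spawn-daemon <arg>
_ = arg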
// configureLogs creates the log files and redirects the process
// stdin/stderr/stdout to them. If unsuccessful, an error is returned.
func (c *SpawnDaemonCommand) configureLogs() error {
if len(c.config.StdoutFile) != 0 {
stdo, err := os.OpenFile(c.config.StdoutFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("Error opening file to redirect stdout: %v", err)
}
c.config.Cmd.Stdout = stdo
}
if len(c.config.StderrFile) != 0 {
stde, err := os.OpenFile(c.config.StderrFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("Error opening file to redirect stderr: %v", err)
}
c.config.Cmd.Stderr = stde
}
if len(c.config.StdinFile) != 0 {
stdi, err := os.OpenFile(c.config.StdinFile, os.O_CREATE|os.O_RDONLY, 0666)
if err != nil {
return fmt.Errorf("Error opening file to redirect stdin: %v", err)
}
c.config.Cmd.Stdin = stdi
}
return nil
}
func (c *SpawnDaemonCommand) Run(args []string) int {
var err error
c.config, err = c.parseConfig(args)
if err != nil {
return c.outputStartStatus(err, 1)
}
// Open the file we will be using to write exit codes to. We do this early
// to ensure that we don't start the user process when we can't capture its
// exit status.
c.exitFile, err = os.OpenFile(c.config.ExitStatusFile, os.O_WRONLY, 0666)
if err != nil {
return c.outputStartStatus(fmt.Errorf("Error opening file to store exit status: %v", err), 1)
}
// Isolate the user process.
if err := c.isolateCmd(); err != nil {
return c.outputStartStatus(err, 1)
}
// Redirect logs.
if err := c.configureLogs(); err != nil {
return c.outputStartStatus(err, 1)
}
// Chroot jail the process and set its working directory.
c.configureChroot()
// Wait to get the start command.
var start TaskStart
dec := json.NewDecoder(os.Stdin)
if err := dec.Decode(&start); err != nil {
return c.outputStartStatus(err, 1)
}
// Aborted by Nomad process.
if !start {
return 0
}
// Spawn the user process.
if err := c.config.Cmd.Start(); err != nil {
return c.outputStartStatus(fmt.Errorf("Error starting user command: %v", err), 1)
}
// Indicate that the command was started successfully.
c.outputStartStatus(nil, 0)
// Wait and then output the exit status.
return c.writeExitStatus(c.config.Cmd.Wait())
}
// outputStartStatus is a helper function that outputs a SpawnStartStatus to
// Stdout with the passed error, which may be nil to indicate no error. It
// returns the passed status.
func (c *SpawnDaemonCommand) outputStartStatus(err error, status int) int {
startStatus := &SpawnStartStatus{}
enc := json.NewEncoder(os.Stdout)
if err != nil {
startStatus.ErrorMsg = err.Error()
}
if c.config != nil && c.config.Cmd.Process != nil {
startStatus.UserPID = c.config.Process.Pid
}
enc.Encode(startStatus)
return status
}
// writeExitStatus takes in the error result from calling wait and writes out
// the exit status to a file. It returns the same exit status as the user
// command.
func (c *SpawnDaemonCommand) writeExitStatus(exit error) int {
// Parse the exit code.
exitStatus := &SpawnExitStatus{}
if exit != nil {
// Default to exit code 1 if we can not get the actual exit code.
exitStatus.ExitCode = 1
if exiterr, ok := exit.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
exitStatus.ExitCode = status.ExitStatus()
}
}
}
if c.exitFile != nil {
enc := json.NewEncoder(c.exitFile)
enc.Encode(exitStatus)
c.exitFile.Close()
}
return exitStatus.ExitCode
}

View File

@@ -1,4 +0,0 @@
package command
// No chroot on darwin.
func (c *SpawnDaemonCommand) configureChroot() {}

View File

@@ -1,16 +0,0 @@
package command
import "syscall"
// configureChroot enters the user command into a chroot if specified in the
// config and on an OS that supports Chroots.
func (c *SpawnDaemonCommand) configureChroot() {
if len(c.config.Chroot) != 0 {
if c.config.Cmd.SysProcAttr == nil {
c.config.Cmd.SysProcAttr = &syscall.SysProcAttr{}
}
c.config.Cmd.SysProcAttr.Chroot = c.config.Chroot
c.config.Cmd.Dir = "/"
}
}

View File

@@ -1,48 +0,0 @@
package command
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os/exec"
"testing"
)
type nopCloser struct {
io.ReadWriter
}
func (n *nopCloser) Close() error {
return nil
}
func TestSpawnDaemon_WriteExitStatus(t *testing.T) {
// Check if there is python.
path, err := exec.LookPath("python")
if err != nil {
t.Skip("python not detected")
}
var b bytes.Buffer
daemon := &SpawnDaemonCommand{exitFile: &nopCloser{&b}}
code := 3
cmd := exec.Command(path, "./test-resources/exiter.py", fmt.Sprintf("%d", code))
err = cmd.Run()
actual := daemon.writeExitStatus(err)
if actual != code {
t.Fatalf("writeExitStatus(%v) returned %v; want %v", err, actual, code)
}
// De-serialize the passed command.
var exitStatus SpawnExitStatus
dec := json.NewDecoder(&b)
if err := dec.Decode(&exitStatus); err != nil {
t.Fatalf("failed to decode exit status: %v", err)
}
if exitStatus.ExitCode != code {
t.Fatalf("writeExitStatus(%v) wrote exit status %v; want %v", err, exitStatus.ExitCode, code)
}
}

View File

@@ -1,16 +0,0 @@
// +build !windows
package command
import "syscall"
// isolateCmd sets the session id for the process and the umask.
func (c *SpawnDaemonCommand) isolateCmd() error {
if c.config.Cmd.SysProcAttr == nil {
c.config.Cmd.SysProcAttr = &syscall.SysProcAttr{}
}
c.config.Cmd.SysProcAttr.Setsid = true
syscall.Umask(0)
return nil
}

View File

@@ -1,7 +0,0 @@
// build !linux !darwin
package command
// No isolation on Windows.
func (c *SpawnDaemonCommand) isolateCmd() error { return nil }
func (c *SpawnDaemonCommand) configureChroot() {}

View File

@@ -57,6 +57,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"executor": func() (cli.Command, error) {
return &command.ExecutorPluginCommand{
Meta: meta,
}, nil
},
"fs": func() (cli.Command, error) {
return &command.FSCommand{
Meta: meta,
@@ -118,13 +123,6 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"spawn-daemon": func() (cli.Command, error) {
return &command.SpawnDaemonCommand{
Meta: meta,
}, nil
},
"status": func() (cli.Command, error) {
return &command.StatusCommand{
Meta: meta,

View File

@@ -32,7 +32,7 @@ func RunCustom(args []string, commands map[string]cli.CommandFactory) int {
commandsInclude := make([]string, 0, len(commands))
for k, _ := range commands {
switch k {
case "spawn-daemon":
case "executor":
default:
commandsInclude = append(commandsInclude, k)
}