From 4308c69bccd49128a8fa95de5a17c8cb749c7036 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 29 Aug 2015 16:20:07 -0700 Subject: [PATCH] driver/exec: basic implementation --- client/driver/exec.go | 90 ++++++++++++++++++++++++++++++--- client/driver/exec_test.go | 101 +++++++++++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 6 deletions(-) diff --git a/client/driver/exec.go b/client/driver/exec.go index dc98b088c..d5060e58b 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -1,7 +1,13 @@ package driver import ( + "fmt" "log" + "os" + "os/exec" + "strconv" + "strings" + "time" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" @@ -16,7 +22,9 @@ type ExecDriver struct { // execHandle is returned from Start/Open as a handle to the PID type execHandle struct { + proc *os.Process waitCh chan error + doneCh chan struct{} } // NewExecDriver is used to create a new exec driver @@ -34,23 +42,93 @@ func (d *ExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, } func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { - // TODO - return nil, nil + // Get the command + command, ok := task.Config["command"] + if !ok || command == "" { + return nil, fmt.Errorf("missing command for exec driver") + } + + // Look for arguments + argRaw, ok := task.Config["args"] + var args []string + if ok { + args = strings.Split(argRaw, " ") + } + + // Setup the command + cmd := exec.Command(command, args...) + err := cmd.Start() + if err != nil { + return nil, fmt.Errorf("failed to start command: %v", err) + } + + // Return a driver handle + h := &execHandle{ + proc: cmd.Process, + doneCh: make(chan struct{}), + waitCh: make(chan error, 1), + } + go h.run() + return h, nil } func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { - // TODO - return nil, nil + // Split the handle + pidStr := strings.TrimPrefix(handleID, "PID:") + pid, err := strconv.Atoi(pidStr) + if err != nil { + return nil, fmt.Errorf("failed to parse handle '%s': %v", handleID, err) + } + + // Find the process + proc, err := os.FindProcess(pid) + if proc == nil || err != nil { + return nil, fmt.Errorf("failed to find PID %d: %v", pid, err) + } + + // Return a driver handle + h := &execHandle{ + proc: proc, + doneCh: make(chan struct{}), + waitCh: make(chan error, 1), + } + go h.run() + return h, nil } func (h *execHandle) ID() string { - return "test" + // Return a handle to the PID + return fmt.Sprintf("PID:%d", h.proc.Pid) } func (h *execHandle) WaitCh() chan error { return h.waitCh } -func (h *execHandle) Kill() error { +func (h *execHandle) Update(task *structs.Task) error { + // Update is not possible return nil } + +// Kill is used to terminate the task. We send an Interrupt +// and then provide a 5 second grace period before doing a Kill. +func (h *execHandle) Kill() error { + h.proc.Signal(os.Interrupt) + select { + case <-h.doneCh: + return nil + case <-time.After(5 * time.Second): + return h.proc.Kill() + } +} + +func (h *execHandle) run() { + ps, err := h.proc.Wait() + close(h.doneCh) + if err != nil { + h.waitCh <- err + } else if !ps.Success() { + h.waitCh <- fmt.Errorf("task exited with error") + } + close(h.waitCh) +} diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 88cc21e20..d2638408e 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -4,6 +4,7 @@ import ( "log" "os" "testing" + "time" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" @@ -29,3 +30,103 @@ func TestExecDriver_Fingerprint(t *testing.T) { t.Fatalf("missing driver") } } + +func TestExecDriver_StartOpen_Wait(t *testing.T) { + ctx := NewExecContext() + d := NewExecDriver(testLogger()) + + task := &structs.Task{ + Config: map[string]string{ + "command": "/bin/sleep", + "args": "1", + }, + } + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Attempt to open + handle2, err := d.Open(ctx, handle.ID()) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle2 == nil { + t.Fatalf("missing handle") + } +} + +func TestExecDriver_Start_Wait(t *testing.T) { + ctx := NewExecContext() + d := NewExecDriver(testLogger()) + + task := &structs.Task{ + Config: map[string]string{ + "command": "/bin/sleep", + "args": "1", + }, + } + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Update should be a no-op + err = handle.Update(task) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout") + } +} + +func TestExecDriver_Start_Kill_Wait(t *testing.T) { + ctx := NewExecContext() + d := NewExecDriver(testLogger()) + + task := &structs.Task{ + Config: map[string]string{ + "command": "/bin/sleep", + "args": "10", + }, + } + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + go func() { + time.Sleep(100 * time.Millisecond) + err := handle.Kill() + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err == nil { + t.Fatalf("should err: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout") + } +}