diff --git a/client/driver/java.go b/client/driver/java.go new file mode 100644 index 000000000..856158bae --- /dev/null +++ b/client/driver/java.go @@ -0,0 +1,166 @@ +package driver + +import ( + "fmt" + "io" + "log" + "net/http" + "os" + "os/exec" + "path" + "strconv" + "strings" + "time" + + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/nomad/structs" +) + +// JavaDriver is a simple driver to execute applications packaged in Jars. +// It literally just fork/execs tasks with the java command. +type JavaDriver struct { + logger *log.Logger +} + +// javaHandle is returned from Start/Open as a handle to the PID +type javaHandle struct { + proc *os.Process + waitCh chan error + doneCh chan struct{} +} + +// NewJavaDriver is used to create a new exec driver +func NewJavaDriver(logger *log.Logger) Driver { + d := &JavaDriver{ + logger: logger, + } + return d +} + +func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) { + // We can always do a fork/exec + node.Attributes["driver.java"] = "1" + return true, nil +} + +func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { + // Get the jar source + source, ok := task.Config["jar_source"] + if !ok || source == "" { + return nil, fmt.Errorf("missing jar source for Java Jar driver") + } + + // Attempt to download the thing + // Should be extracted to some kind of Http Fetcher + // Right now, assume publicly accessible HTTP url + resp, err := http.Get(source) + if err != nil { + return nil, fmt.Errorf("Error downloading source for Java driver: %s", err) + } + + base := path.Base(source) + f, err := os.OpenFile(ctx.AllocDir+base, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return nil, fmt.Errorf("Error opening file to download too: %s", err) + } + defer f.Close() + + // Copy remote file to local AllocDir for execution + // TODO: a retry of sort if io.Copy fails, for large binaries + _, ioErr := io.Copy(f, resp.Body) + if ioErr != nil { + return nil, fmt.Errorf("Error copying jar from source: %s", ioErr) + } + + // Look for arguments + argRaw, ok := task.Config["args"] + var userArgs []string + if ok { + userArgs = strings.Split(argRaw, " ") + } + args := []string{"-jar", f.Name()} + + for _, s := range userArgs { + args = append(args, s) + } + + // Setup the command + // Assumes Java is in the $PATH, but could probably be detected + cmd := exec.Command("java", args...) + err = cmd.Start() + if err != nil { + return nil, fmt.Errorf("failed to start source: %v", err) + } + + // Return a driver handle + h := &javaHandle{ + proc: cmd.Process, + doneCh: make(chan struct{}), + waitCh: make(chan error, 1), + } + + go h.run() + return h, nil +} + +func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + // Split the handle + pidStr := strings.TrimPrefix(handleID, "PID:") + pid, err := strconv.Atoi(pidStr) + if err != nil { + return nil, fmt.Errorf("failed to parse handle '%s': %v", handleID, err) + } + + // Find the process + proc, err := os.FindProcess(pid) + if proc == nil || err != nil { + return nil, fmt.Errorf("failed to find PID %d: %v", pid, err) + } + + // Return a driver handle + h := &javaHandle{ + proc: proc, + doneCh: make(chan struct{}), + waitCh: make(chan error, 1), + } + + go h.run() + return h, nil +} + +func (h *javaHandle) ID() string { + // Return a handle to the PID + return fmt.Sprintf("PID:%d", h.proc.Pid) +} + +func (h *javaHandle) WaitCh() chan error { + return h.waitCh +} + +func (h *javaHandle) Update(task *structs.Task) error { + // Update is not possible + return nil +} + +// Kill is used to terminate the task. We send an Interrupt +// and then provide a 5 second grace period before doing a Kill. +func (h *javaHandle) Kill() error { + h.proc.Signal(os.Interrupt) + select { + case <-h.doneCh: + return nil + case <-time.After(5 * time.Second): + return h.proc.Kill() + } +} + +func (h *javaHandle) run() { + ps, err := h.proc.Wait() + close(h.doneCh) + if err != nil { + h.waitCh <- err + } else if !ps.Success() { + h.waitCh <- fmt.Errorf("task exited with error") + } + close(h.waitCh) +} diff --git a/client/driver/java_test.go b/client/driver/java_test.go new file mode 100644 index 000000000..3e095e286 --- /dev/null +++ b/client/driver/java_test.go @@ -0,0 +1,139 @@ +package driver + +import ( + "os" + "testing" + "time" + + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestJavaDriver_Fingerprint(t *testing.T) { + d := NewJavaDriver(testLogger()) + node := &structs.Node{ + Attributes: make(map[string]string), + } + apply, err := d.Fingerprint(&config.Config{}, node) + if err != nil { + t.Fatalf("err: %v", err) + } + if !apply { + t.Fatalf("should apply") + } + if node.Attributes["driver.exec"] == "" { + t.Fatalf("missing driver") + } +} + +func TestJavaDriver_StartOpen_Wait(t *testing.T) { + ctx := NewExecContext() + ctx.AllocDir = os.TempDir() + d := NewJavaDriver(testLogger()) + + task := &structs.Task{ + Config: map[string]string{ + "jar_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", + // "jar_source": "https://s3-us-west-2.amazonaws.com/java-jar-thing/demoapp.jar", + // "args": "-d64", + }, + } + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Attempt to open + handle2, err := d.Open(ctx, handle.ID()) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle2 == nil { + t.Fatalf("missing handle") + } + + time.Sleep(2 * time.Second) + // need to kill long lived process + err = handle.Kill() + if err != nil { + t.Fatalf("Error: %s", err) + } +} + +func TestJavaDriver_Start_Wait(t *testing.T) { + ctx := NewExecContext() + ctx.AllocDir = os.TempDir() + d := NewJavaDriver(testLogger()) + + task := &structs.Task{ + Config: map[string]string{ + "jar_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", + // "jar_source": "https://s3-us-west-2.amazonaws.com/java-jar-thing/demoapp.jar", + // "args": "-d64", + }, + } + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err != nil { + t.Fatalf("err: %v", err) + } + case <-time.After(2 * time.Second): + // expect the timeout b/c it's a long lived process + break + } +} + +func TestJavaDriver_Start_Kill_Wait(t *testing.T) { + ctx := NewExecContext() + ctx.AllocDir = os.TempDir() + d := NewJavaDriver(testLogger()) + + task := &structs.Task{ + Config: map[string]string{ + "jar_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", + // "jar_source": "https://s3-us-west-2.amazonaws.com/java-jar-thing/demoapp.jar", + // "args": "-d64", + }, + } + handle, err := d.Start(ctx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + go func() { + time.Sleep(100 * time.Millisecond) + err := handle.Kill() + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Task should terminate quickly + select { + case err := <-handle.WaitCh(): + if err == nil { + t.Fatalf("should err: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout") + } +} + +func cleanupFile(path string) error { + return nil +}