From 3576f489932ede9b12640d5bbfbf44324f0cc527 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 2 Nov 2015 20:28:37 -0800 Subject: [PATCH] Create Spawn pkg that handles IPC with the spawn-daemon and update exec_linux to use that --- client/driver/exec_test.go | 2 +- client/executor/exec_linux.go | 221 +++++------------------ client/spawn/spawn.go | 322 ++++++++++++++++++++++++++++++++++ client/spawn/spawn_test.go | 252 ++++++++++++++++++++++++++ command/spawn_daemon.go | 47 +++-- helper/discover/discover.go | 10 +- 6 files changed, 657 insertions(+), 197 deletions(-) create mode 100644 client/spawn/spawn.go create mode 100644 client/spawn/spawn_test.go diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 488847c5c..1bb4adf36 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -293,7 +293,7 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { if err == nil { t.Fatal("should err") } - case <-time.After(2 * time.Second): + case <-time.After(8 * time.Second): t.Fatalf("timeout") } } diff --git a/client/executor/exec_linux.go b/client/executor/exec_linux.go index 9c4bcd9a4..be70379d2 100644 --- a/client/executor/exec_linux.go +++ b/client/executor/exec_linux.go @@ -5,12 +5,9 @@ import ( "encoding/json" "errors" "fmt" - "io" "os" - "os/exec" "os/user" "path/filepath" - "strconv" "strings" "syscall" @@ -18,8 +15,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/client/driver/environment" - "github.com/hashicorp/nomad/command" - "github.com/hashicorp/nomad/helper/discover" + "github.com/hashicorp/nomad/client/spawn" "github.com/hashicorp/nomad/nomad/structs" "github.com/opencontainers/runc/libcontainer/cgroups" @@ -53,18 +49,13 @@ type LinuxExecutor struct { // Isolation configurations. groups *cgroupConfig.Cgroup - alloc *allocdir.AllocDir taskName string taskDir string + allocDir string - // Tracking of spawn process. - spawnChild *os.Process - spawnOutputWriter *os.File - spawnOutputReader *os.File - - // Tracking of user process. - exitStatusFile string - userPid int + // Spawn process. + spawn *spawn.Spawner + spawnState string } func (e *LinuxExecutor) Command() *cmd { @@ -82,11 +73,9 @@ func (e *LinuxExecutor) Limit(resources *structs.Resources) error { // execLinuxID contains the necessary information to reattach to an executed // process and cleanup the created cgroups. type ExecLinuxID struct { - Groups *cgroupConfig.Cgroup - SpawnPid int - UserPid int - ExitStatusFile string - TaskDir string + Groups *cgroupConfig.Cgroup + Spawn *spawn.Spawner + TaskDir string } func (e *LinuxExecutor) Open(id string) error { @@ -99,30 +88,22 @@ func (e *LinuxExecutor) Open(id string) error { // Setup the executor. e.groups = execID.Groups - e.exitStatusFile = execID.ExitStatusFile - e.userPid = execID.UserPid + e.spawn = execID.Spawn e.taskDir = execID.TaskDir - proc, err := os.FindProcess(execID.SpawnPid) - if proc != nil && err == nil { - e.spawnChild = proc - } - return nil } func (e *LinuxExecutor) ID() (string, error) { - if e.spawnChild == nil { - return "", fmt.Errorf("Process has finished or was never started") + if e.groups == nil || e.spawn == nil || e.taskDir == "" { + return "", fmt.Errorf("LinuxExecutor not properly initialized.") } // Build the ID. id := ExecLinuxID{ - Groups: e.groups, - SpawnPid: e.spawnChild.Pid, - UserPid: e.userPid, - ExitStatusFile: e.exitStatusFile, - TaskDir: e.taskDir, + Groups: e.groups, + Spawn: e.spawn, + TaskDir: e.taskDir, } var buffer bytes.Buffer @@ -170,10 +151,6 @@ func (e *LinuxExecutor) Start() error { e.cmd.SetGID(e.user.Gid) } - if e.alloc == nil { - return errors.New("ConfigureTaskDir() must be called before Start()") - } - // Parse the commands arguments and replace instances of Nomad environment // variables. envVars, err := environment.ParseFromList(e.Cmd.Env) @@ -196,129 +173,42 @@ func (e *LinuxExecutor) Start() error { } e.Cmd.Args = parsed - return e.spawnDaemon() -} + spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status")) + e.spawn = spawn.NewSpawner(spawnState) + e.spawn.SetCommand(&e.cmd.Cmd) + e.spawn.SetChroot(e.taskDir) + e.spawn.SetLogs(&spawn.Logs{ + Stdout: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)), + Stderr: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)), + Stdin: "/dev/null", + }) -// spawnDaemon executes a double fork to start the user command with proper -// isolation. Stores the child process for use in Wait. -func (e *LinuxExecutor) spawnDaemon() error { - bin, err := discover.NomadExecutable() - if err != nil { - return fmt.Errorf("Failed to determine the nomad executable: %v", err) - } + enterCgroup := func(pid int) error { + // Join the spawn-daemon to the cgroup. + manager := e.getCgroupManager(e.groups) - c := command.DaemonConfig{ - Cmd: e.cmd.Cmd, - Chroot: e.taskDir, - StdoutFile: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)), - StderrFile: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)), - StdinFile: "/dev/null", - ExitStatusFile: e.exitStatusFile, - } - - // Serialize the cmd and the cgroup configuration so it can be passed to the - // sub-process. - var buffer bytes.Buffer - enc := json.NewEncoder(&buffer) - if err := enc.Encode(c); err != nil { - return fmt.Errorf("Failed to serialize daemon configuration: %v", err) - } - - // Create a pipe to capture stdout. - if e.spawnOutputReader, e.spawnOutputWriter, err = os.Pipe(); err != nil { - return err - } - - // Call ourselves using a hidden flag. The new instance of nomad will join - // the passed cgroup, forkExec the cmd, and return statuses through stdout. - escaped := strconv.Quote(buffer.String()) - spawn := exec.Command(bin, "spawn-daemon", escaped) - spawn.Stdout = e.spawnOutputWriter - - // Capture its Stdin. - spawnStdIn, err := spawn.StdinPipe() - if err != nil { - return err - } - - if err := spawn.Start(); err != nil { - fmt.Errorf("Failed to call spawn-daemon on nomad executable: %v", err) - } - - // Join the spawn-daemon to the cgroup. - manager := e.getCgroupManager(e.groups) - - // Apply will place the spawn dameon into the created cgroups. - if err := manager.Apply(spawn.Process.Pid); err != nil { - errs := new(multierror.Error) - errs = multierror.Append(errs, - fmt.Errorf("Failed to join spawn-daemon to the cgroup (%+v): %v", e.groups, err)) - - if err := sendAbortCommand(spawnStdIn); err != nil { - errs = multierror.Append(errs, err) + // Apply will place the spawn dameon into the created cgroups. + if err := manager.Apply(pid); err != nil { + return fmt.Errorf("Failed to join spawn-daemon to the cgroup (%+v): %v", e.groups, err) } - return errs + return nil } - // Tell it to start. - if err := sendStartCommand(spawnStdIn); err != nil { - return err - } - - // Parse the response. - dec := json.NewDecoder(e.spawnOutputReader) - var resp command.SpawnStartStatus - if err := dec.Decode(&resp); err != nil { - return fmt.Errorf("Failed to parse spawn-daemon start response: %v", err) - } - - if resp.ErrorMsg != "" { - return fmt.Errorf("Failed to execute user command: %s", resp.ErrorMsg) - } - - e.userPid = resp.UserPID - e.spawnChild = spawn.Process - return nil -} - -// sendStartCommand sends the necessary command to the spawn-daemon to have it -// start the user process. -func sendStartCommand(w io.Writer) error { - enc := json.NewEncoder(w) - if err := enc.Encode(true); err != nil { - return fmt.Errorf("Failed to serialize start command: %v", err) - } - - return nil -} - -// sendAbortCommand sends the necessary command to the spawn-daemon to have it -// abort starting the user process. This should be invoked if the spawn-daemon -// could not be isolated into a cgroup. -func sendAbortCommand(w io.Writer) error { - enc := json.NewEncoder(w) - if err := enc.Encode(false); err != nil { - return fmt.Errorf("Failed to serialize abort command: %v", err) - } - - return nil + return e.spawn.Spawn(enterCgroup) } // Wait waits til the user process exits and returns an error on non-zero exit // codes. Wait also cleans up the task directory and created cgroups. func (e *LinuxExecutor) Wait() error { - if e.spawnOutputReader != nil { - e.spawnOutputReader.Close() - } - - if e.spawnOutputWriter != nil { - e.spawnOutputWriter.Close() - } - errs := new(multierror.Error) - if err := e.spawnWait(); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Wait failed on pid %v: %v", e.spawnChild.Pid, err)) + code, err := e.spawn.Wait() + if err != nil { + errs = multierror.Append(errs, err) + } + + if code != 0 { + errs = multierror.Append(errs, fmt.Errorf("Task exited with code: %d", code)) } if err := e.destroyCgroup(); err != nil { @@ -332,20 +222,6 @@ func (e *LinuxExecutor) Wait() error { return errs.ErrorOrNil() } -// spawnWait waits on the spawn-daemon and can handle the spawn-daemon not being -// a child of this process. -func (e *LinuxExecutor) spawnWait() error { - // TODO: This needs to be able to wait on non-child processes. - state, err := e.spawnChild.Wait() - if err != nil { - return err - } else if !state.Success() { - return fmt.Errorf("exited with non-zero code") - } - - return nil -} - func (e *LinuxExecutor) Shutdown() error { return e.ForceStop() } @@ -353,19 +229,9 @@ func (e *LinuxExecutor) Shutdown() error { // ForceStop immediately exits the user process and cleans up both the task // directory and the cgroups. func (e *LinuxExecutor) ForceStop() error { - if e.spawnOutputReader != nil { - e.spawnOutputReader.Close() - } - - if e.spawnOutputWriter != nil { - e.spawnOutputWriter.Close() - } - errs := new(multierror.Error) - if e.groups != nil { - if err := e.destroyCgroup(); err != nil { - errs = multierror.Append(errs, err) - } + if err := e.destroyCgroup(); err != nil { + errs = multierror.Append(errs, err) } if err := e.cleanTaskDir(); err != nil { @@ -381,6 +247,8 @@ func (e *LinuxExecutor) ForceStop() error { // chroot. cleanTaskDir should be called after. func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error { e.taskName = taskName + e.allocDir = alloc.AllocDir + taskDir, ok := alloc.TaskDirs[taskName] if !ok { fmt.Errorf("Couldn't find task directory for task %v", taskName) @@ -424,10 +292,6 @@ func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocD env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)) e.Cmd.Env = env.List() - // Store the file path to save the exit status to. - e.exitStatusFile = filepath.Join(alloc.AllocDir, fmt.Sprintf("%s_%s", taskName, "exit_status")) - - e.alloc = alloc return nil } @@ -445,6 +309,7 @@ func (e *LinuxExecutor) pathExists(path string) bool { // should be called when tearing down the task. func (e *LinuxExecutor) cleanTaskDir() error { // Unmount dev. + // TODO: This should check if it is a mount. errs := new(multierror.Error) dev := filepath.Join(e.taskDir, "dev") if e.pathExists(dev) { diff --git a/client/spawn/spawn.go b/client/spawn/spawn.go new file mode 100644 index 000000000..fa75b3940 --- /dev/null +++ b/client/spawn/spawn.go @@ -0,0 +1,322 @@ +package spawn + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "time" + + "github.com/docker/docker/vendor/src/gopkg.in/fsnotify.v1" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/command" + "github.com/hashicorp/nomad/helper/discover" +) + +// Spawner is used to start a user command in an isolated fashion that is +// resistent to Nomad agent failure. +type Spawner struct { + spawn *os.Process + SpawnPid int + SpawnPpid int + StateFile string + + // User configuration + UserCmd *exec.Cmd + Logs *Logs + Chroot string +} + +// Logs is used to define the filepaths the user command's logs should be +// redirected to. The files do not need to exist. +type Logs struct { + Stdin, Stdout, Stderr string +} + +// NewSpawner takes a path to a state file. This state file can be used to +// create a new Spawner that can be used to wait on the exit status of a +// process even through Nomad restarts. +func NewSpawner(stateFile string) *Spawner { + return &Spawner{StateFile: stateFile} +} + +// SetCommand sets the user command to spawn. +func (s *Spawner) SetCommand(cmd *exec.Cmd) { + s.UserCmd = cmd +} + +// SetLogs sets the redirection of user command log files. +func (s *Spawner) SetLogs(l *Logs) { + s.Logs = l +} + +// SetChroot puts the user command into a chroot. +func (s *Spawner) SetChroot(root string) { + s.Chroot = root +} + +// Spawn does a double-fork to start and isolate the user command. It takes a +// call-back that is invoked with the pid of the intermediary process. If the +// call back returns an error, the user command is not started and the spawn is +// cancelled. This can be used to put the process into a cgroup or jail and +// cancel starting the user process if that was not successful. An error is +// returned if the call-back returns an error or the user-command couldn't be +// started. +func (s *Spawner) Spawn(cb func(pid int) error) error { + bin, err := discover.NomadExecutable() + if err != nil { + return fmt.Errorf("Failed to determine the nomad executable: %v", err) + } + + exitFile, err := os.OpenFile(s.StateFile, os.O_CREATE|os.O_WRONLY, 0666) + defer exitFile.Close() + if err != nil { + return fmt.Errorf("Error opening file to store exit status: %v", err) + } + + config, err := s.spawnConfig() + if err != nil { + return err + } + + spawn := exec.Command(bin, "spawn-daemon", config) + + // Capture stdout + spawnStdout, err := spawn.StdoutPipe() + defer spawnStdout.Close() + if err != nil { + return fmt.Errorf("Failed to capture spawn-daemon stdout: %v", err) + } + + // Capture stdin. + spawnStdin, err := spawn.StdinPipe() + defer spawnStdin.Close() + if err != nil { + return fmt.Errorf("Failed to capture spawn-daemon stdin: %v", err) + } + + if err := spawn.Start(); err != nil { + return fmt.Errorf("Failed to call spawn-daemon on nomad executable: %v", err) + } + + if cb != nil { + cbErr := cb(spawn.Process.Pid) + if cbErr != nil { + errs := new(multierror.Error) + errs = multierror.Append(errs, cbErr) + if err := s.sendAbortCommand(spawnStdin); err != nil { + errs = multierror.Append(errs, err) + } + + return errs + } + } + + if err := s.sendStartCommand(spawnStdin); err != nil { + return err + } + + respCh := make(chan command.SpawnStartStatus, 1) + errCh := make(chan error, 1) + + go func() { + var resp command.SpawnStartStatus + dec := json.NewDecoder(spawnStdout) + if err := dec.Decode(&resp); err != nil { + errCh <- fmt.Errorf("Failed to parse spawn-daemon start response: %v", err) + } + respCh <- resp + }() + + select { + case err := <-errCh: + return err + case resp := <-respCh: + if resp.ErrorMsg != "" { + return fmt.Errorf("Failed to execute user command: %s", resp.ErrorMsg) + } + case <-time.After(5 * time.Second): + return fmt.Errorf("timed out waiting for response") + } + + // Store the spawn process. + s.spawn = spawn.Process + s.SpawnPpid = os.Getpid() + return nil +} + +// spawnConfig returns a serialized config to pass to the Nomad spawn-daemon +// command. +func (s *Spawner) spawnConfig() (string, error) { + if s.UserCmd == nil { + return "", fmt.Errorf("Must specify user command") + } + + config := command.DaemonConfig{ + Cmd: *s.UserCmd, + Chroot: s.Chroot, + ExitStatusFile: s.StateFile, + } + + if s.Logs != nil { + config.StdoutFile = s.Logs.Stdout + config.StdinFile = s.Logs.Stdin + config.StderrFile = s.Logs.Stderr + } + + var buffer bytes.Buffer + enc := json.NewEncoder(&buffer) + if err := enc.Encode(config); err != nil { + return "", fmt.Errorf("Failed to serialize configuration: %v", err) + } + + return strconv.Quote(buffer.String()), nil +} + +// sendStartCommand sends the necessary command to the spawn-daemon to have it +// start the user process. +func (s *Spawner) sendStartCommand(w io.Writer) error { + enc := json.NewEncoder(w) + if err := enc.Encode(true); err != nil { + return fmt.Errorf("Failed to serialize start command: %v", err) + } + + return nil +} + +// sendAbortCommand sends the necessary command to the spawn-daemon to have it +// abort starting the user process. This should be invoked if the spawn-daemon +// could not be isolated into a cgroup. +func (s *Spawner) sendAbortCommand(w io.Writer) error { + enc := json.NewEncoder(w) + if err := enc.Encode(false); err != nil { + return fmt.Errorf("Failed to serialize abort command: %v", err) + } + + return nil +} + +// Wait returns the exit code of the user process or an error if the wait +// failed. +func (s *Spawner) Wait() (int, error) { + if os.Getpid() == s.SpawnPpid { + return s.waitAsParent() + } + + return s.waitOnStatusFile() +} + +// waitAsParent waits on the process if the current process was the spawner. +func (s *Spawner) waitAsParent() (int, error) { + if s.SpawnPpid != os.Getpid() { + return -1, fmt.Errorf("not the parent. Spawner parent is %v; current pid is %v", s.SpawnPpid, os.Getpid()) + } + + // Try to reattach to the spawn. + if s.spawn == nil { + // If it can't be reattached, it means the spawn process has exited so + // we should just read its exit file. + var err error + if s.spawn, err = os.FindProcess(s.SpawnPid); err != nil { + return s.waitOnStatusFile() + } + } + + if state, err := s.spawn.Wait(); err != nil { + return -1, err + } else if !state.Exited() { + return -1, fmt.Errorf("Task was killed or crashed") + } + + return s.waitOnStatusFile() +} + +// waitOnStatusFile uses OS level file watching APIs to wait on the status file +// and returns the exit code and possibly an error. +func (s *Spawner) waitOnStatusFile() (int, error) { + // Set up a watcher for the exit status file. + watcher, err := fsnotify.NewWatcher() + if err != nil { + return -1, fmt.Errorf("Failed to create file watcher to read exit code: %v", err) + } + + if err := watcher.Add(s.StateFile); err != nil { + return -1, fmt.Errorf("Failed to watch %v to read exit code: %v", s.StateFile, err) + } + + // Stat to check if it is there to avoid a race condition. + stat, err := os.Stat(s.StateFile) + if err != nil { + return -1, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err) + } + + // If there is data it means that the file has already been written. + if stat.Size() > 0 { + return s.readExitCode() + } + + // Store the mod time as a way to heartbeat. If the file doesn't get touched + // then we know the spawner has died. This avoids an infinite loop. + prevModTime := stat.ModTime() + + // Wait on watcher. + for { + select { + case event := <-watcher.Events: + if event.Op&fsnotify.Write == fsnotify.Write { + stat, err := os.Stat(s.StateFile) + if err != nil { + return -1, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err) + } + + if stat.Size() > 0 { + return s.readExitCode() + } + } + case err := <-watcher.Errors: + return -1, fmt.Errorf("Failed to watch %v for an exit code: %v", s.StateFile, err) + case <-time.After(5 * time.Second): + stat, err := os.Stat(s.StateFile) + if err != nil { + return -1, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err) + } + + modTime := stat.ModTime() + if modTime.Equal(prevModTime) { + return -1, fmt.Errorf("Task is dead and exit code unreadable") + } + + prevModTime = modTime + } + } +} + +// readExitCode parses the state file and returns the exit code of the task. It +// returns an error if the file can't be read. +func (s *Spawner) readExitCode() (int, error) { + f, err := os.Open(s.StateFile) + defer f.Close() + if err != nil { + return -1, fmt.Errorf("Failed to open %v to read exit code: %v", s.StateFile, err) + } + + stat, err := f.Stat() + if err != nil { + return -1, fmt.Errorf("Failed to stat file %v: %v", s.StateFile, err) + } + + if stat.Size() == 0 { + return -1, fmt.Errorf("Empty state file: %v", s.StateFile) + } + + var exitStatus command.SpawnExitStatus + dec := json.NewDecoder(f) + if err := dec.Decode(&exitStatus); err != nil { + return -1, fmt.Errorf("Failed to parse exit status from %v: %v", s.StateFile, err) + } + + return exitStatus.ExitCode, nil +} diff --git a/client/spawn/spawn_test.go b/client/spawn/spawn_test.go new file mode 100644 index 000000000..d624f9d9f --- /dev/null +++ b/client/spawn/spawn_test.go @@ -0,0 +1,252 @@ +package spawn + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "runtime" + "strings" + "testing" + "time" +) + +func TestSpawn_NoCmd(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + if err := spawn.Spawn(nil); err == nil { + t.Fatalf("Spawn() with no user command should fail") + } +} + +func TestSpawn_InvalidCmd(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + spawn.SetCommand(exec.Command("foo")) + if err := spawn.Spawn(nil); err == nil { + t.Fatalf("Spawn() with no invalid command should fail") + } +} + +func TestSpawn_SetsLogs(t *testing.T) { + // TODO: Figure out why this test fails. If the spawn-daemon directly writes + // to the opened stdout file it works but not the user command. Maybe a + // flush issue? + if runtime.GOOS == "windows" { + t.Skip("Test fails on windows; unknown reason. Skipping") + } + + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + exp := "foo" + spawn.SetCommand(exec.Command("echo", exp)) + + // Create file for stdout. + stdout, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(stdout.Name()) + spawn.SetLogs(&Logs{Stdout: stdout.Name()}) + + if err := spawn.Spawn(nil); err != nil { + t.Fatalf("Spawn() failed: %v", err) + } + + if code, err := spawn.Wait(); code != 0 && err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", code, err) + } + + stdout2, err := os.Open(stdout.Name()) + if err != nil { + t.Fatalf("Open() failed: %v", err) + } + + data, err := ioutil.ReadAll(stdout2) + if err != nil { + t.Fatalf("ReadAll() failed: %v", err) + } + + act := strings.TrimSpace(string(data)) + if act != exp { + t.Fatalf("Unexpected data written to stdout; got %v; want %v", act, exp) + } +} + +func TestSpawn_Callback(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + spawn.SetCommand(exec.Command("sleep", "1")) + + called := false + cbErr := fmt.Errorf("ERROR CB") + cb := func(_ int) error { + called = true + return cbErr + } + + if err := spawn.Spawn(cb); err == nil { + t.Fatalf("Spawn(%#v) should have errored; want %v", cb, err, cbErr) + } + + if !called { + t.Fatalf("Spawn(%#v) didn't call callback", cb) + } +} + +func TestSpawn_ParentWaitExited(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + spawn.SetCommand(exec.Command("echo", "foo")) + if err := spawn.Spawn(nil); err != nil { + t.Fatalf("Spawn() failed %v", err) + } + + time.Sleep(1 * time.Second) + + code, err := spawn.Wait() + if err != nil { + t.Fatalf("Wait() failed %v", err) + } + + if code != 0 { + t.Fatalf("Wait() returned %v; want 0", code) + } +} + +func TestSpawn_ParentWait(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + spawn.SetCommand(exec.Command("sleep", "2")) + if err := spawn.Spawn(nil); err != nil { + t.Fatalf("Spawn() failed %v", err) + } + + code, err := spawn.Wait() + if err != nil { + t.Fatalf("Wait() failed %v", err) + } + + if code != 0 { + t.Fatalf("Wait() returned %v; want 0", code) + } +} + +func TestSpawn_NonParentWaitExited(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + spawn.SetCommand(exec.Command("echo", "foo")) + if err := spawn.Spawn(nil); err != nil { + t.Fatalf("Spawn() failed %v", err) + } + + time.Sleep(1 * time.Second) + + // Force the wait to assume non-parent. + spawn.SpawnPpid = 0 + code, err := spawn.Wait() + if err != nil { + t.Fatalf("Wait() failed %v", err) + } + + if code != 0 { + t.Fatalf("Wait() returned %v; want 0", code) + } +} + +func TestSpawn_NonParentWait(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + spawn := NewSpawner(f.Name()) + spawn.SetCommand(exec.Command("sleep", "2")) + if err := spawn.Spawn(nil); err != nil { + t.Fatalf("Spawn() failed %v", err) + } + + // Force the wait to assume non-parent. + spawn.SpawnPpid = 0 + code, err := spawn.Wait() + if err != nil { + t.Fatalf("Wait() failed %v", err) + } + + if code != 0 { + t.Fatalf("Wait() returned %v; want 0", code) + } +} + +func TestSpawn_DeadSpawnDaemon(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("TempFile() failed") + } + defer os.Remove(f.Name()) + + var spawnPid int + cb := func(pid int) error { + spawnPid = pid + return nil + } + + spawn := NewSpawner(f.Name()) + spawn.SetCommand(exec.Command("sleep", "5")) + if err := spawn.Spawn(cb); err != nil { + t.Fatalf("Spawn() errored: %v", err) + } + + proc, err := os.FindProcess(spawnPid) + if err != nil { + t.FailNow() + } + + if err := proc.Kill(); err != nil { + t.FailNow() + } + + if _, err := proc.Wait(); err != nil { + t.FailNow() + } + + if _, err := spawn.Wait(); err == nil { + t.Fatalf("Wait() should have failed: %v", err) + } +} diff --git a/command/spawn_daemon.go b/command/spawn_daemon.go index 81117ce2e..81f5ca2ca 100644 --- a/command/spawn_daemon.go +++ b/command/spawn_daemon.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "syscall" + "time" ) type SpawnDaemonCommand struct { @@ -108,24 +109,31 @@ func (c *SpawnDaemonCommand) parseConfig(args []string) (*DaemonConfig, error) { // configureLogs creates the log files and redirects the process // stdin/stderr/stdout to them. If unsuccessful, an error is returned. func (c *SpawnDaemonCommand) configureLogs() error { - stdo, err := os.OpenFile(c.config.StdoutFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) - if err != nil { - return fmt.Errorf("Error opening file to redirect stdout: %v", err) + if len(c.config.StdoutFile) != 0 { + stdo, err := os.OpenFile(c.config.StdoutFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return fmt.Errorf("Error opening file to redirect stdout: %v", err) + } + + c.config.Cmd.Stdout = stdo } - stde, err := os.OpenFile(c.config.StderrFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) - if err != nil { - return fmt.Errorf("Error opening file to redirect stderr: %v", err) + if len(c.config.StderrFile) != 0 { + stde, err := os.OpenFile(c.config.StderrFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return fmt.Errorf("Error opening file to redirect stderr: %v", err) + } + c.config.Cmd.Stderr = stde } - stdi, err := os.OpenFile(c.config.StdinFile, os.O_CREATE|os.O_RDONLY, 0666) - if err != nil { - return fmt.Errorf("Error opening file to redirect stdin: %v", err) + if len(c.config.StdinFile) != 0 { + stdi, err := os.OpenFile(c.config.StdinFile, os.O_CREATE|os.O_RDONLY, 0666) + if err != nil { + return fmt.Errorf("Error opening file to redirect stdin: %v", err) + } + c.config.Cmd.Stdin = stdi } - c.config.Cmd.Stdout = stdo - c.config.Cmd.Stderr = stde - c.config.Cmd.Stdin = stdi return nil } @@ -139,7 +147,7 @@ func (c *SpawnDaemonCommand) Run(args []string) int { // Open the file we will be using to write exit codes to. We do this early // to ensure that we don't start the user process when we can't capture its // exit status. - c.exitFile, err = os.OpenFile(c.config.ExitStatusFile, os.O_CREATE|os.O_RDWR, 0666) + c.exitFile, err = os.OpenFile(c.config.ExitStatusFile, os.O_WRONLY, 0666) if err != nil { return c.outputStartStatus(fmt.Errorf("Error opening file to store exit status: %v", err), 1) } @@ -177,6 +185,17 @@ func (c *SpawnDaemonCommand) Run(args []string) int { // Indicate that the command was started successfully. c.outputStartStatus(nil, 0) + // Start a go routine that touches the exit file periodically. + go func() { + for { + select { + case <-time.After(2 * time.Second): + now := time.Now() + os.Chtimes(c.config.ExitStatusFile, now, now) + } + } + }() + // Wait and then output the exit status. return c.writeExitStatus(c.config.Cmd.Wait()) } @@ -192,7 +211,7 @@ func (c *SpawnDaemonCommand) outputStartStatus(err error, status int) int { startStatus.ErrorMsg = err.Error() } - if c.config != nil && c.config.Process != nil { + if c.config != nil && c.config.Cmd.Process != nil { startStatus.UserPID = c.config.Process.Pid } diff --git a/helper/discover/discover.go b/helper/discover/discover.go index d90ddb4cc..d172970f7 100644 --- a/helper/discover/discover.go +++ b/helper/discover/discover.go @@ -4,17 +4,19 @@ import ( "fmt" "os" "path/filepath" + "runtime" "github.com/kardianos/osext" ) -const ( - nomadExe = "nomad" -) - // Checks the current executable, then $GOPATH/bin, and finally the CWD, in that // order. If it can't be found, an error is returned. func NomadExecutable() (string, error) { + nomadExe := "nomad" + if runtime.GOOS == "windows" { + nomadExe = "nomad.exe" + } + // Check the current executable. bin, err := osext.Executable() if err != nil {