From 1a3585b5724b39567e462c44cc18e54f7dbbbc47 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 7 Oct 2016 12:37:52 -0700 Subject: [PATCH 1/5] Docker + Qemu --- client/driver/docker.go | 18 ++++++++ client/driver/docker_test.go | 79 ++++++++++++++++++++++++++++++++++++ client/driver/driver.go | 4 ++ client/driver/exec.go | 5 +++ client/driver/java.go | 5 +++ client/driver/mock_driver.go | 6 +++ client/driver/qemu.go | 5 +++ client/driver/qemu_test.go | 6 +++ client/driver/raw_exec.go | 5 +++ client/driver/rkt.go | 5 +++ 10 files changed, 138 insertions(+) diff --git a/client/driver/docker.go b/client/driver/docker.go index d4f21088d..f5047a7a2 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" "sync" + "syscall" "time" docker "github.com/fsouza/go-dockerclient" @@ -1003,6 +1004,23 @@ func (h *DockerHandle) Update(task *structs.Task) error { return nil } +func (h *DockerHandle) Signal(s os.Signal) error { + // Convert types + sysSig, ok := s.(syscall.Signal) + if !ok { + return fmt.Errorf("Failed to determine signal number") + } + + dockerSignal := docker.Signal(sysSig) + opts := docker.KillContainerOptions{ + ID: h.containerID, + Signal: dockerSignal, + } + h.logger.Printf("Sending: %v", dockerSignal) + return h.client.KillContainer(opts) + +} + // Kill is used to terminate the task. This uses `docker stop -t killTimeout` func (h *DockerHandle) Kill() error { // Stop the container diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 390652f1d..059c079cf 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -11,6 +11,7 @@ import ( "runtime/debug" "strconv" "strings" + "syscall" "testing" "time" @@ -861,7 +862,85 @@ func TestDockerDriver_Stats(t *testing.T) { case <-time.After(time.Duration(tu.TestMultiplier()*10) * time.Second): t.Fatalf("timeout") } +} +func TestDockerDriver_Signal(t *testing.T) { + task := &structs.Task{ + Name: "redis-demo", + Config: map[string]interface{}{ + "image": "busybox", + "load": []string{"busybox.tar"}, + "command": "/bin/sh", + "args": []string{"local/test.sh"}, + }, + Resources: &structs.Resources{ + MemoryMB: 256, + CPU: 512, + }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, + } + + driverCtx, execCtx := testDriverContexts(task) + //defer execCtx.AllocDir.Destroy() + d := NewDockerDriver(driverCtx) + + // Copy the image into the task's directory + copyImage(execCtx, task, "busybox.tar", t) + + testFile := filepath.Join(execCtx.AllocDir.TaskDirs["redis-demo"], "test.sh") + testData := []byte(` +at_term() { + echo 'Terminated.' + exit 3 +} +trap at_term USR1 +while true; do + sleep 1 +done + `) + if err := ioutil.WriteFile(testFile, testData, 0777); err != nil { + fmt.Errorf("Failed to write data") + } + + handle, err := d.Start(execCtx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + defer handle.Kill() + + waitForExist(t, handle.(*DockerHandle).client, handle.(*DockerHandle)) + + time.Sleep(1 * time.Second) + if err := handle.Signal(syscall.SIGUSR1); err != nil { + t.Fatalf("Signal returned an error: %v", err) + } + + select { + case res := <-handle.WaitCh(): + if res.Successful() { + t.Fatalf("should err: %v", res) + } + case <-time.After(time.Duration(tu.TestMultiplier()*5) * time.Second): + t.Fatalf("timeout") + } + + // Check the log file to see it exited because of the signal + outputFile := filepath.Join(execCtx.AllocDir.LogDir(), "redis-demo.stdout.0") + act, err := ioutil.ReadFile(outputFile) + if err != nil { + t.Fatalf("Couldn't read expected output: %v", err) + } + + exp := "Terminated." + if strings.TrimSpace(string(act)) != exp { + t.Fatalf("Command outputted %v; want %v", act, exp) + } } func setupDockerVolumes(t *testing.T, cfg *config.Config) (*structs.Task, Driver, *ExecContext, string, func()) { diff --git a/client/driver/driver.go b/client/driver/driver.go index ab31c5f8a..170be6055 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -3,6 +3,7 @@ package driver import ( "fmt" "log" + "os" "path/filepath" "github.com/hashicorp/nomad/client/allocdir" @@ -116,6 +117,9 @@ type DriverHandle interface { // Stats returns aggregated stats of the driver Stats() (*cstructs.TaskResourceUsage, error) + + // Signal is used to send a signal to the task + Signal(s os.Signal) error } // ExecContext is shared between drivers within an allocation diff --git a/client/driver/exec.go b/client/driver/exec.go index 728ed3f5f..db1301c88 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "os" "os/exec" "path/filepath" "strings" @@ -258,6 +259,10 @@ func (h *execHandle) Update(task *structs.Task) error { return nil } +func (h *execHandle) Signal(s os.Signal) error { + return nil +} + func (h *execHandle) Kill() error { if err := h.executor.ShutDown(); err != nil { if h.pluginClient.Exited() { diff --git a/client/driver/java.go b/client/driver/java.go index 24e6ce6a8..2e869cbfe 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "os" "os/exec" "path/filepath" "runtime" @@ -358,6 +359,10 @@ func (h *javaHandle) Update(task *structs.Task) error { return nil } +func (h *javaHandle) Signal(s os.Signal) error { + return nil +} + func (h *javaHandle) Kill() error { if err := h.executor.ShutDown(); err != nil { if h.pluginClient.Exited() { diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index 5ff2219be..7cd1260a5 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "log" + "os" "time" "github.com/mitchellh/mapstructure" @@ -178,6 +179,11 @@ func (h *mockDriverHandle) Update(task *structs.Task) error { return nil } +// TODO Implement when we need it. +func (h *mockDriverHandle) Signal(s os.Signal) error { + return nil +} + // Kill kills a mock task func (h *mockDriverHandle) Kill() error { h.logger.Printf("[DEBUG] driver.mock: killing task %q after kill timeout: %v", h.taskName, h.killTimeout) diff --git a/client/driver/qemu.go b/client/driver/qemu.go index d109f3912..2e99606c8 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "os" "os/exec" "path/filepath" "regexp" @@ -360,6 +361,10 @@ func (h *qemuHandle) Update(task *structs.Task) error { return nil } +func (h *qemuHandle) Signal(s os.Signal) error { + return fmt.Errorf("Qemu driver can't send signals") +} + // TODO: allow a 'shutdown_command' that can be executed over a ssh connection // to the VM func (h *qemuHandle) Kill() error { diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go index 4f3122080..73fc786c5 100644 --- a/client/driver/qemu_test.go +++ b/client/driver/qemu_test.go @@ -4,6 +4,7 @@ import ( "fmt" "path/filepath" "strings" + "syscall" "testing" "github.com/hashicorp/nomad/client/config" @@ -84,6 +85,11 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) { t.Fatalf("missing handle") } + // Ensure that sending a Signal returns an error + if err := handle.Signal(syscall.SIGINT); err == nil { + t.Fatalf("Expect an error when signalling") + } + // Attempt to open handle2, err := d.Open(execCtx, handle.ID()) if err != nil { diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 684632b06..2995b964e 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "os" "os/exec" "path/filepath" "strings" @@ -255,6 +256,10 @@ func (h *rawExecHandle) Update(task *structs.Task) error { return nil } +func (h *rawExecHandle) Signal(s os.Signal) error { + return nil +} + func (h *rawExecHandle) Kill() error { if err := h.executor.ShutDown(); err != nil { if h.pluginClient.Exited() { diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 875372394..6c67fc25d 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "net" + "os" "os/exec" "path/filepath" "regexp" @@ -395,6 +396,10 @@ func (h *rktHandle) Update(task *structs.Task) error { return nil } +func (h *rktHandle) Signal(s os.Signal) error { + return nil +} + // Kill is used to terminate the task. We send an Interrupt // and then provide a 5 second grace period before doing a Kill. func (h *rktHandle) Kill() error { From 1de16a3847c9c89d22449813c57fb22d41eda065 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 7 Oct 2016 16:49:00 -0700 Subject: [PATCH 2/5] Rkt --- client/driver/rkt.go | 2 +- client/driver/rkt_test.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 6c67fc25d..ea3ed21c8 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -397,7 +397,7 @@ func (h *rktHandle) Update(task *structs.Task) error { } func (h *rktHandle) Signal(s os.Signal) error { - return nil + return fmt.Errorf("Rkt does not support signals") } // Kill is used to terminate the task. We send an Interrupt diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index fc68e4780..6418320e3 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "reflect" "strings" + "syscall" "testing" "time" @@ -160,6 +161,11 @@ func TestRktDriver_Start_Wait(t *testing.T) { t.Fatalf("err: %v", err) } + // Signal should be an error + if err = handle.Signal(syscall.SIGTERM); err == nil { + t.Fatalf("err: %v", err) + } + select { case res := <-handle.WaitCh(): if !res.Successful() { From 0b65a5f95d35f2e39e288daa4e0f87e6890b51c9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 10 Oct 2016 11:46:27 -0700 Subject: [PATCH 3/5] Executor + Java/Raw Exec/Exec --- client/driver/exec.go | 2 +- client/driver/exec_test.go | 75 +++++++++++++++++++ client/driver/executor/executor.go | 21 ++++++ client/driver/executor_plugin.go | 12 +++ client/driver/java.go | 2 +- client/driver/java_test.go | 59 +++++++++++++++ client/driver/raw_exec.go | 2 +- client/driver/raw_exec_test.go | 75 +++++++++++++++++++ .../driver/test-resources/java/demoapp.java | 2 +- 9 files changed, 246 insertions(+), 4 deletions(-) diff --git a/client/driver/exec.go b/client/driver/exec.go index db1301c88..1d3fded29 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -260,7 +260,7 @@ func (h *execHandle) Update(task *structs.Task) error { } func (h *execHandle) Signal(s os.Signal) error { - return nil + return h.executor.Signal(s) } func (h *execHandle) Kill() error { diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index f2e72637f..125ff1de6 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -292,6 +292,81 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { } } +func TestExecDriver_Signal(t *testing.T) { + ctestutils.ExecCompatible(t) + task := &structs.Task{ + Name: "signal", + Config: map[string]interface{}{ + "command": "/bin/bash", + "args": []string{"test.sh"}, + }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, + Resources: basicResources, + KillTimeout: 10 * time.Second, + } + + driverCtx, execCtx := testDriverContexts(task) + defer execCtx.AllocDir.Destroy() + d := NewExecDriver(driverCtx) + + testFile := filepath.Join(execCtx.AllocDir.TaskDirs["signal"], "test.sh") + testData := []byte(` +at_term() { + echo 'Terminated.' + exit 3 +} +trap at_term USR1 +while true; do + sleep 1 +done + `) + if err := ioutil.WriteFile(testFile, testData, 0777); err != nil { + fmt.Errorf("Failed to write data") + } + + handle, err := d.Start(execCtx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + go func() { + time.Sleep(100 * time.Millisecond) + err := handle.Signal(syscall.SIGUSR1) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Task should terminate quickly + select { + case res := <-handle.WaitCh(): + if res.Successful() { + t.Fatal("should err") + } + case <-time.After(time.Duration(testutil.TestMultiplier()*6) * time.Second): + t.Fatalf("timeout") + } + + // Check the log file to see it exited because of the signal + outputFile := filepath.Join(execCtx.AllocDir.LogDir(), "signal.stdout.0") + act, err := ioutil.ReadFile(outputFile) + if err != nil { + t.Fatalf("Couldn't read expected output: %v", err) + } + + exp := "Terminated." + if strings.TrimSpace(string(act)) != exp { + t.Logf("Read from %v", outputFile) + t.Fatalf("Command outputted %v; want %v", act, exp) + } +} + func TestExecDriverUser(t *testing.T) { ctestutils.ExecCompatible(t) task := &structs.Task{ diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 9341f39ce..de99933d7 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -59,6 +59,7 @@ type Executor interface { DeregisterServices() error Version() (*ExecutorVersion, error) Stats() (*cstructs.TaskResourceUsage, error) + Signal(s os.Signal) error } // ConsulContext holds context to configure the Consul client and run checks @@ -396,6 +397,10 @@ func (e *UniversalExecutor) wait() { e.exitState = &ProcessState{Pid: 0, ExitCode: 0, IsolationConfig: ic, Time: time.Now()} return } + + e.lre.Close() + e.lro.Close() + exitCode := 1 var signal int if exitErr, ok := err.(*exec.ExitError); ok { @@ -856,3 +861,19 @@ func (e *UniversalExecutor) aggregatedResourceUsage(pidStats map[string]*cstruct Pids: pidStats, } } + +// Signal sends the passed signal to the task +func (e *UniversalExecutor) Signal(s os.Signal) error { + if e.cmd.Process == nil { + return fmt.Errorf("Task not yet run") + } + + e.logger.Printf("[DEBUG] executor: sending signal %s", s) + err := e.cmd.Process.Signal(s) + if err != nil { + e.logger.Printf("[ERR] executor: sending signal %s failed: %v", err) + return err + } + + return nil +} diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 1fc9d7e45..526abcbf4 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -4,6 +4,8 @@ import ( "encoding/gob" "log" "net/rpc" + "os" + "syscall" "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/driver/executor" @@ -18,6 +20,8 @@ func init() { gob.Register(map[string]interface{}{}) gob.Register([]map[string]string{}) gob.Register([]map[string]int{}) + gob.Register(new(os.Signal)) + gob.Register(syscall.Signal(0x1)) } type ExecutorRPC struct { @@ -95,6 +99,10 @@ func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) { return &resourceUsage, err } +func (e *ExecutorRPC) Signal(s os.Signal) error { + return e.client.Call("Plugin.Signal", &s, new(interface{})) +} + type ExecutorRPCServer struct { Impl executor.Executor logger *log.Logger @@ -164,6 +172,10 @@ func (e *ExecutorRPCServer) Stats(args interface{}, resourceUsage *cstructs.Task return err } +func (e *ExecutorRPCServer) Signal(args os.Signal, resp *interface{}) error { + return e.Impl.Signal(args) +} + type ExecutorPlugin struct { logger *log.Logger Impl *ExecutorRPCServer diff --git a/client/driver/java.go b/client/driver/java.go index 2e869cbfe..e61c05839 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -360,7 +360,7 @@ func (h *javaHandle) Update(task *structs.Task) error { } func (h *javaHandle) Signal(s os.Signal) error { - return nil + return h.executor.Signal(s) } func (h *javaHandle) Kill() error { diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 40e7b46c6..466589715 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "runtime" "strings" + "syscall" "testing" "time" @@ -235,6 +236,64 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { } } +func TestJavaDriver_Signal(t *testing.T) { + if !javaLocated() { + t.Skip("Java not found; skipping") + } + + ctestutils.JavaCompatible(t) + task := &structs.Task{ + Name: "demo-app", + Config: map[string]interface{}{ + "jar_path": "demoapp.jar", + }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, + Resources: basicResources, + } + + driverCtx, execCtx := testDriverContexts(task) + defer execCtx.AllocDir.Destroy() + d := NewJavaDriver(driverCtx) + + // Copy the test jar into the task's directory + dst, _ := execCtx.AllocDir.TaskDirs[task.Name] + copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + + handle, err := d.Start(execCtx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + go func() { + time.Sleep(100 * time.Millisecond) + err := handle.Signal(syscall.SIGHUP) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Task should terminate quickly + select { + case res := <-handle.WaitCh(): + if res.Successful() { + t.Fatal("should err") + } + case <-time.After(time.Duration(testutil.TestMultiplier()*10) * time.Second): + t.Fatalf("timeout") + + // Need to kill long lived process + if err = handle.Kill(); err != nil { + t.Fatalf("Error: %s", err) + } + } +} + func TestJavaDriverUser(t *testing.T) { if !javaLocated() { t.Skip("Java not found; skipping") diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 2995b964e..4bae42b6a 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -257,7 +257,7 @@ func (h *rawExecHandle) Update(task *structs.Task) error { } func (h *rawExecHandle) Signal(s os.Signal) error { - return nil + return h.executor.Signal(s) } func (h *rawExecHandle) Kill() error { diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 17023393f..0103a7007 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "reflect" "strings" + "syscall" "testing" "time" @@ -277,3 +278,77 @@ func TestRawExecDriverUser(t *testing.T) { t.Fatalf("Expecting '%v' in '%v'", msg, err) } } + +func TestRawExecDriver_Signal(t *testing.T) { + task := &structs.Task{ + Name: "signal", + Config: map[string]interface{}{ + "command": "/bin/bash", + "args": []string{"test.sh"}, + }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, + Resources: basicResources, + KillTimeout: 10 * time.Second, + } + + driverCtx, execCtx := testDriverContexts(task) + defer execCtx.AllocDir.Destroy() + d := NewExecDriver(driverCtx) + + testFile := filepath.Join(execCtx.AllocDir.TaskDirs["signal"], "test.sh") + testData := []byte(` +at_term() { + echo 'Terminated.' + exit 3 +} +trap at_term USR1 +while true; do + sleep 1 +done + `) + if err := ioutil.WriteFile(testFile, testData, 0777); err != nil { + fmt.Errorf("Failed to write data") + } + + handle, err := d.Start(execCtx, task) + if err != nil { + t.Fatalf("err: %v", err) + } + if handle == nil { + t.Fatalf("missing handle") + } + + go func() { + time.Sleep(100 * time.Millisecond) + err := handle.Signal(syscall.SIGUSR1) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Task should terminate quickly + select { + case res := <-handle.WaitCh(): + if res.Successful() { + t.Fatal("should err") + } + case <-time.After(time.Duration(testutil.TestMultiplier()*6) * time.Second): + t.Fatalf("timeout") + } + + // Check the log file to see it exited because of the signal + outputFile := filepath.Join(execCtx.AllocDir.LogDir(), "signal.stdout.0") + act, err := ioutil.ReadFile(outputFile) + if err != nil { + t.Fatalf("Couldn't read expected output: %v", err) + } + + exp := "Terminated." + if strings.TrimSpace(string(act)) != exp { + t.Logf("Read from %v", outputFile) + t.Fatalf("Command outputted %v; want %v", act, exp) + } +} diff --git a/client/driver/test-resources/java/demoapp.java b/client/driver/test-resources/java/demoapp.java index 66eca76fd..3f168f959 100644 --- a/client/driver/test-resources/java/demoapp.java +++ b/client/driver/test-resources/java/demoapp.java @@ -9,4 +9,4 @@ public class Hello { } } } -} \ No newline at end of file +} From af517b3d37f31905ccb20b349e1d6faa774f9147 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 10 Oct 2016 14:49:37 -0700 Subject: [PATCH 4/5] Task runner sends signals --- api/tasks.go | 2 + client/consul_template.go | 17 +++++++- client/consul_template_test.go | 73 ++++++++++++++++++++++++++------ client/driver/executor_plugin.go | 1 - client/driver/mock_driver.go | 12 +++++- client/task_runner.go | 25 ++++++++--- client/task_runner_test.go | 23 ++++++++++ command/alloc_status.go | 10 ++++- nomad/structs/structs.go | 16 ++++++- 9 files changed, 155 insertions(+), 24 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 8466c8cb3..aab17ec4d 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -234,6 +234,7 @@ type TaskState struct { } const ( + TaskSetupFailure = "Setup Failure" TaskDriverFailure = "Driver Failure" TaskReceived = "Received" TaskFailedValidation = "Failed Validation" @@ -258,6 +259,7 @@ type TaskEvent struct { Type string Time int64 RestartReason string + SetupError string DriverError string ExitCode int Signal int diff --git a/client/consul_template.go b/client/consul_template.go index f306306ba..08126d218 100644 --- a/client/consul_template.go +++ b/client/consul_template.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul-template/manager" "github.com/hashicorp/consul-template/signals" "github.com/hashicorp/consul-template/watch" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/env" "github.com/hashicorp/nomad/nomad/structs" @@ -29,7 +30,7 @@ type TaskHooks interface { Restart(source, reason string) // Signal is used to signal the task - Signal(source, reason string, s os.Signal) + Signal(source, reason string, s os.Signal) error // UnblockStart is used to unblock the starting of the task. This should be // called after prestart work is completed @@ -277,8 +278,20 @@ func (tm *TaskTemplateManager) run() { if restart { tm.hook.Restart("consul-template", "template with change_mode restart re-rendered") } else if len(signals) != 0 { + var mErr multierror.Error for signal := range signals { - tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal]) + err := tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal]) + if err != nil { + multierror.Append(&mErr, err) + } + } + + if err := mErr.ErrorOrNil(); err != nil { + flat := make([]os.Signal, 0, len(signals)) + for signal := range signals { + flat = append(flat, tm.signals[signal]) + } + tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err)) } } } diff --git a/client/consul_template_test.go b/client/consul_template_test.go index 31ea8a7b8..4d04f8093 100644 --- a/client/consul_template_test.go +++ b/client/consul_template_test.go @@ -26,10 +26,14 @@ type MockTaskHooks struct { Signals []os.Signal SignalCh chan struct{} + // SignalError is returned when Signal is called on the mock hook + SignalError error + UnblockCh chan struct{} Unblocked bool KillReason string + KillCh chan struct{} } func NewMockTaskHooks() *MockTaskHooks { @@ -37,6 +41,7 @@ func NewMockTaskHooks() *MockTaskHooks { UnblockCh: make(chan struct{}, 1), RestartCh: make(chan struct{}, 1), SignalCh: make(chan struct{}, 1), + KillCh: make(chan struct{}, 1), } } func (m *MockTaskHooks) Restart(source, reason string) { @@ -47,15 +52,24 @@ func (m *MockTaskHooks) Restart(source, reason string) { } } -func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) { +func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) error { m.Signals = append(m.Signals, s) select { case m.SignalCh <- struct{}{}: default: } + + return m.SignalError +} + +func (m *MockTaskHooks) Kill(source, reason string) { + m.KillReason = reason + select { + case m.KillCh <- struct{}{}: + default: + } } -func (m *MockTaskHooks) Kill(source, reason string) { m.KillReason = reason } func (m *MockTaskHooks) UnblockStart(source string) { if !m.Unblocked { close(m.UnblockCh) @@ -673,17 +687,13 @@ func TestTaskTemplateManager_AllRendered_Signal(t *testing.T) { harness.consul.SetKV(key1, []byte(content1_1)) // Wait for restart - timeout := time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second) -OUTER: - for { - select { - case <-harness.mockHooks.RestartCh: - t.Fatalf("Restart with signal policy: %+v", harness.mockHooks) - case <-harness.mockHooks.SignalCh: - break OUTER - case <-timeout: - t.Fatalf("Should have received a signals: %+v", harness.mockHooks) - } + select { + case <-harness.mockHooks.RestartCh: + t.Fatalf("Restart with signal policy: %+v", harness.mockHooks) + case <-harness.mockHooks.SignalCh: + break + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Should have received a signals: %+v", harness.mockHooks) } // Check the files have been updated @@ -697,3 +707,40 @@ OUTER: t.Fatalf("Unexpected template data; got %q, want %q", s, content1_1) } } + +func TestTaskTemplateManager_Signal_Error(t *testing.T) { + // Make a template that renders based on a key in Consul and sends SIGALRM + key1 := "foo" + content1_1 := "bar" + embedded1 := fmt.Sprintf(`{{key "%s"}}`, key1) + file1 := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded1, + DestPath: file1, + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: "SIGALRM", + } + + // Drop the retry rate + testRetryRate = 10 * time.Millisecond + + harness := newTestHarness(t, []*structs.Template{template}, true, true, false) + defer harness.stop() + + harness.mockHooks.SignalError = fmt.Errorf("test error") + + // Write the key to Consul + harness.consul.SetKV(key1, []byte(content1_1)) + + // Wait for kill channel + select { + case <-harness.mockHooks.KillCh: + break + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("Should have received a signals: %+v", harness.mockHooks) + } + + if !strings.Contains(harness.mockHooks.KillReason, "Sending signals") { + t.Fatalf("Unexpected error", harness.mockHooks.KillReason) + } +} diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 526abcbf4..5c6b58be4 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -20,7 +20,6 @@ func init() { gob.Register(map[string]interface{}{}) gob.Register([]map[string]string{}) gob.Register([]map[string]int{}) - gob.Register(new(os.Signal)) gob.Register(syscall.Signal(0x1)) } diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index 7cd1260a5..e1ef41998 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -46,6 +46,9 @@ type MockDriverConfig struct { // ExitErrMsg is the error message that the task returns while exiting ExitErrMsg string `mapstructure:"exit_err_msg"` + + // SignalErr is the error message that the task returns if signalled + SignalErr string `mapstructure:"signal_error"` } // MockDriver is a driver which is used for testing purposes @@ -88,6 +91,9 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, if driverConfig.ExitErrMsg != "" { h.exitErr = errors.New(driverConfig.ExitErrMsg) } + if driverConfig.SignalErr != "" { + h.signalErr = fmt.Errorf(driverConfig.SignalErr) + } m.logger.Printf("[DEBUG] driver.mock: starting task %q", task.Name) go h.run() return &h, nil @@ -113,6 +119,7 @@ type mockDriverHandle struct { exitCode int exitSignal int exitErr error + signalErr error logger *log.Logger waitCh chan *dstructs.WaitResult doneCh chan struct{} @@ -126,6 +133,7 @@ type mockDriverID struct { ExitCode int ExitSignal int ExitErr error + SignalErr error } func (h *mockDriverHandle) ID() string { @@ -137,6 +145,7 @@ func (h *mockDriverHandle) ID() string { ExitCode: h.exitCode, ExitSignal: h.exitSignal, ExitErr: h.exitErr, + SignalErr: h.signalErr, } data, err := json.Marshal(id) @@ -161,6 +170,7 @@ func (m *MockDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro exitCode: id.ExitCode, exitSignal: id.ExitSignal, exitErr: id.ExitErr, + signalErr: id.SignalErr, logger: m.logger, doneCh: make(chan struct{}), waitCh: make(chan *dstructs.WaitResult, 1), @@ -181,7 +191,7 @@ func (h *mockDriverHandle) Update(task *structs.Task) error { // TODO Implement when we need it. func (h *mockDriverHandle) Signal(s os.Signal) error { - return nil + return h.signalErr } // Kill kills a mock task diff --git a/client/task_runner.go b/client/task_runner.go index 4367e8dab..e6880929f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -115,8 +115,14 @@ type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent) // SignalEvent is a tuple of the signal and the event generating it type SignalEvent struct { + // s is the signal to be sent s os.Signal + + // e is the task event generating the signal e *structs.TaskEvent + + // result should be used to send back the result of the signal + result chan<- error } // NewTaskRunner is used to create a new task context @@ -358,7 +364,7 @@ func (r *TaskRunner) prestart(taskDir string) (success bool) { r.config, r.vaultToken, taskDir, r.taskEnv) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err)) + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err)) r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err) return } @@ -512,7 +518,8 @@ func (r *TaskRunner) run() { r.logger.Printf("[DEBUG] client: task being signalled with %v: %s", se.s, se.e.TaskSignalReason) r.setState(structs.TaskStateRunning, se.e) - // TODO need an interface on the driver + res := r.handle.Signal(se.s) + se.result <- res case event := <-r.restartCh: r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason) @@ -803,7 +810,7 @@ func (r *TaskRunner) Restart(source, reason string) { } // Signal will send a signal to the task -func (r *TaskRunner) Signal(source, reason string, s os.Signal) { +func (r *TaskRunner) Signal(source, reason string, s os.Signal) error { reasonStr := fmt.Sprintf("%s: %s", source, reason) event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetTaskSignalReason(reasonStr) @@ -817,13 +824,21 @@ func (r *TaskRunner) Signal(source, reason string, s os.Signal) { // Drop the restart event if !running { r.logger.Printf("[DEBUG] client: skipping signal since task isn't running") - return + return nil } + resCh := make(chan error) + se := SignalEvent{ + s: s, + e: event, + result: resCh, + } select { - case r.signalCh <- SignalEvent{s: s, e: event}: + case r.signalCh <- se: case <-r.waitCh: } + + return <-resCh } // Kill will kill a task and store the error, no longer restarting the task diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 29d8f0f2d..d28d5ee2e 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "syscall" "testing" "time" @@ -595,3 +596,25 @@ func TestTaskRunner_KillTask(t *testing.T) { t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilled) } } + +func TestTaskRunner_SignalFailure(t *testing.T) { + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "exit_code": "0", + "run_for": "10s", + "signal_error": "test forcing failure", + } + + _, tr := testTaskRunnerFromAlloc(false, alloc) + tr.MarkReceived() + go tr.Run() + defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled)) + defer tr.ctx.AllocDir.Destroy() + + time.Sleep(100 * time.Millisecond) + if err := tr.Signal("test", "test", syscall.SIGINT); err == nil { + t.Fatalf("Didn't receive error") + } +} diff --git a/command/alloc_status.go b/command/alloc_status.go index 4e2de1b9f..566743e48 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -284,6 +284,12 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { } else { desc = "Validation of task failed" } + case api.TaskSetupFailure: + if event.SetupError != "" { + desc = event.SetupError + } else { + desc = "Task setup failed" + } case api.TaskDriverFailure: if event.DriverError != "" { desc = event.DriverError @@ -299,7 +305,9 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { desc = "Failed to download artifacts" } case api.TaskKilling: - if event.KillTimeout != 0 { + if event.KillReason != "" { + desc = fmt.Sprintf("Killing task: %v", event.KillReason) + } else if event.KillTimeout != 0 { desc = fmt.Sprintf("Sent interrupt. Waiting %v before force killing", event.KillTimeout) } else { desc = "Sent interrupt" diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fff8b3e82..842576224 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2339,7 +2339,7 @@ func (ts *TaskState) Failed() bool { switch ts.Events[l-1].Type { case TaskDiskExceeded, TaskNotRestarting, TaskArtifactDownloadFailed, - TaskFailedValidation, TaskVaultRenewalFailed: + TaskFailedValidation, TaskVaultRenewalFailed, TaskSetupFailure: return true default: return false @@ -2362,6 +2362,10 @@ func (ts *TaskState) Successful() bool { } const ( + // TaskSetupFailure indicates that the task could not be started due to a + // a setup failure. + TaskSetupFailure = "Setup Failure" + // TaskDriveFailure indicates that the task could not be started due to a // failure in the driver. TaskDriverFailure = "Driver Failure" @@ -2430,6 +2434,9 @@ type TaskEvent struct { // Restart fields. RestartReason string + // Setup Failure fields. + SetupError string + // Driver Failure fields. DriverError string // A driver error occurred while starting the task. @@ -2496,6 +2503,13 @@ func NewTaskEvent(event string) *TaskEvent { } } +func (e *TaskEvent) SetSetupError(err error) *TaskEvent { + if err != nil { + e.SetupError = err.Error() + } + return e +} + func (e *TaskEvent) SetDriverError(err error) *TaskEvent { if err != nil { e.DriverError = err.Error() From cf85948b12474bb73b98c4ff7b8ee768b22cda3f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 18 Oct 2016 11:23:27 -0700 Subject: [PATCH 5/5] comment --- nomad/structs/structs.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 842576224..c49ee6808 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2503,6 +2503,8 @@ func NewTaskEvent(event string) *TaskEvent { } } +// SetSetupError is used to store an error that occured while setting up the +// task func (e *TaskEvent) SetSetupError(err error) *TaskEvent { if err != nil { e.SetupError = err.Error()