From d4a17ae71fdd284d02ac9a665314e195a93762b9 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 13 Feb 2019 14:51:05 -0800 Subject: [PATCH 1/3] test: port TestTaskRunner_DriverNetwork from 0.8 --- .../taskrunner/task_runner_test.go | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 19c9b264d..2a43f12c8 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "strings" "testing" "time" @@ -997,6 +998,138 @@ func TestTaskRunner_Download_Retries(t *testing.T) { require.Equal(t, structs.TaskNotRestarting, state.Events[7].Type) } +// TestTaskRunner_DriverNetwork asserts that a driver's network is properly +// used in services and checks. +func TestTaskRunner_DriverNetwork(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "100s", + "driver_ip": "10.1.2.3", + "driver_port_map": "http:80", + } + + // Create services and checks with custom address modes to exercise + // address detection logic + task.Services = []*structs.Service{ + { + Name: "host-service", + PortLabel: "http", + AddressMode: "host", + Checks: []*structs.ServiceCheck{ + { + Name: "driver-check", + Type: "tcp", + PortLabel: "1234", + AddressMode: "driver", + }, + }, + }, + { + Name: "driver-service", + PortLabel: "5678", + AddressMode: "driver", + Checks: []*structs.ServiceCheck{ + { + Name: "host-check", + Type: "tcp", + PortLabel: "http", + }, + { + Name: "driver-label-check", + Type: "tcp", + PortLabel: "http", + AddressMode: "driver", + }, + }, + }, + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + // Use a mock agent to test for services + consulAgent := 
agentconsul.NewMockAgent() + consulClient := agentconsul.NewServiceClient(consulAgent, conf.Logger, true) + defer consulClient.Shutdown() + go consulClient.Run() + + conf.Consul = consulClient + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + go tr.Run() + + // Wait for the task to start + testWaitForTaskToStart(t, tr) + + testutil.WaitForResult(func() (bool, error) { + services, _ := consulAgent.Services() + if n := len(services); n != 2 { + return false, fmt.Errorf("expected 2 services, but found %d", n) + } + for _, s := range services { + switch s.Service { + case "host-service": + if expected := "192.168.0.100"; s.Address != expected { + return false, fmt.Errorf("expected host-service to have IP=%s but found %s", + expected, s.Address) + } + case "driver-service": + if expected := "10.1.2.3"; s.Address != expected { + return false, fmt.Errorf("expected driver-service to have IP=%s but found %s", + expected, s.Address) + } + if expected := 5678; s.Port != expected { + return false, fmt.Errorf("expected driver-service to have port=%d but found %d", + expected, s.Port) + } + default: + return false, fmt.Errorf("unexpected service: %q", s.Service) + } + + } + + checks := consulAgent.CheckRegs() + if n := len(checks); n != 3 { + return false, fmt.Errorf("expected 3 checks, but found %d", n) + } + for _, check := range checks { + switch check.Name { + case "driver-check": + if expected := "10.1.2.3:1234"; check.TCP != expected { + return false, fmt.Errorf("expected driver-check to have address %q but found %q", expected, check.TCP) + } + case "driver-label-check": + if expected := "10.1.2.3:80"; check.TCP != expected { + return false, fmt.Errorf("expected driver-label-check to have address %q but found %q", expected, check.TCP) + } + case "host-check": + if expected := "192.168.0.100:"; !strings.HasPrefix(check.TCP, expected) { + return false, fmt.Errorf("expected host-check to have 
address start with %q but found %q", expected, check.TCP) + } + default: + return false, fmt.Errorf("unexpected check: %q", check.Name) + } + } + + return true, nil + }, func(err error) { + services, _ := consulAgent.Services() + for _, s := range services { + t.Logf(pretty.Sprint("Service: ", s)) + } + for _, c := range consulAgent.CheckRegs() { + t.Logf(pretty.Sprint("Check: ", c)) + } + require.NoError(t, err) + }) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { From 414532adab23aa84a20f3fee8c46c256ac767d27 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 13 Feb 2019 15:19:33 -0800 Subject: [PATCH 2/3] test: port TestTaskRunner_RestartSignalTask_NotRunning from 0.8 --- .../taskrunner/task_runner_test.go | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 2a43f12c8..3fe650070 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -1130,6 +1130,73 @@ func TestTaskRunner_DriverNetwork(t *testing.T) { }) } +// TestTaskRunner_RestartSignalTask_NotRunning asserts resilience to failures +// when a restart or signal is triggered and the task is not running. 
+func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "0s", + } + + // Use vault to block the start + task.Vault = &structs.Vault{Policies: []string{"default"}} + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + // Control when we get a Vault token + waitCh := make(chan struct{}, 1) + defer close(waitCh) + handler := func(*structs.Allocation, []string) (map[string]string, error) { + <-waitCh + return map[string]string{task.Name: "1234"}, nil + } + vaultClient := conf.Vault.(*vaultclient.MockVaultClient) + vaultClient.DeriveTokenFn = handler + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + go tr.Run() + + select { + case <-tr.WaitCh(): + require.Fail(t, "unexpected exit") + case <-time.After(1 * time.Second): + } + + // Send a signal + err = tr.Signal(structs.NewTaskEvent("don't panic"), "QUIT") + require.EqualError(t, err, ErrTaskNotRunning.Error()) + + // Send a restart + err = tr.Restart(context.Background(), structs.NewTaskEvent("don't panic"), false) + require.EqualError(t, err, ErrTaskNotRunning.Error()) + + // Unblock and let it finish + waitCh <- struct{}{} + + select { + case <-tr.WaitCh(): + case <-time.After(10 * time.Second): + require.Fail(t, "timed out waiting for task to complete") + } + + // Assert the task ran and never restarted + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.False(t, state.Failed) + require.Len(t, state.Events, 4, pretty.Sprint(state.Events)) + require.Equal(t, structs.TaskReceived, state.Events[0].Type) + require.Equal(t, structs.TaskSetup, state.Events[1].Type) + require.Equal(t, structs.TaskStarted, state.Events[2].Type) + require.Equal(t, structs.TaskTerminated, 
state.Events[3].Type) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { From cf66e25e57e41260305ab9876fd17df56117759f Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 13 Feb 2019 15:34:17 -0800 Subject: [PATCH 3/3] client: restart on recoverable StartTask errors Fixes restarting on recoverable errors from StartTask. Ports TestTaskRunner_Run_RecoverableStartError from 0.8 which discovered the bug. --- client/allocrunner/taskrunner/task_runner.go | 4 +- .../taskrunner/task_runner_test.go | 41 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 38fb488db..10a7fac5e 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -656,7 +656,9 @@ func (tr *TaskRunner) runDriver() error { return fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err) } } else { - return fmt.Errorf("driver start failed: %v", err) + // Do *NOT* wrap the error here without maintaining + // whether or not it is Recoverable. + return err } } diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 3fe650070..2dedc011f 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -1197,6 +1197,47 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { require.Equal(t, structs.TaskTerminated, state.Events[3].Type) } +// TestTaskRunner_Run_RecoverableStartError asserts tasks are restarted if they +// return a recoverable error from StartTask. 
+func TestTaskRunner_Run_RecoverableStartError(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Config = map[string]interface{}{ + "start_error": "driver failure", + "start_error_recoverable": true, + } + + // Make the restart policy retry once + alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: 0, + Mode: structs.RestartPolicyModeFail, + } + + tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) + defer cleanup() + + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + require.Fail(t, "timed out waiting for task to exit") + } + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.True(t, state.Failed) + require.Len(t, state.Events, 6, pretty.Sprint(state.Events)) + require.Equal(t, structs.TaskReceived, state.Events[0].Type) + require.Equal(t, structs.TaskSetup, state.Events[1].Type) + require.Equal(t, structs.TaskDriverFailure, state.Events[2].Type) + require.Equal(t, structs.TaskRestarting, state.Events[3].Type) + require.Equal(t, structs.TaskDriverFailure, state.Events[4].Type) + require.Equal(t, structs.TaskNotRestarting, state.Events[5].Type) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) {