Merge pull request #5325 from hashicorp/test-port-08-2
Do not wrap errors from StartTask as that loses Recoverable
@@ -656,7 +656,9 @@ func (tr *TaskRunner) runDriver() error {
 				return fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err)
 			}
 		} else {
-			return fmt.Errorf("driver start failed: %v", err)
+			// Do *NOT* wrap the error here without maintaining
+			// whether or not is Recoverable.
+			return err
 		}
 	}
 
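Why the unwrapped return matters: the client's restart tracking only retries a failed start if the error still satisfies Nomad's Recoverable interface, and fmt.Errorf produces a plain error value that no longer does. A minimal sketch of the difference, assuming the NewRecoverableError and IsRecoverable helpers from the nomad/structs package:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A driver start error explicitly marked as recoverable.
	startErr := structs.NewRecoverableError(fmt.Errorf("driver start failed"), true)
	fmt.Println(structs.IsRecoverable(startErr)) // true

	// Wrapping with fmt.Errorf yields a plain error, so the Recoverable
	// marker is lost and the failure would be treated as fatal.
	wrapped := fmt.Errorf("driver start failed: %v", startErr)
	fmt.Println(structs.IsRecoverable(wrapped)) // false
}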
@@ -8,6 +8,7 @@ import (
 	"net/http/httptest"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
 	"time"
 
@@ -997,6 +998,246 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
 	require.Equal(t, structs.TaskNotRestarting, state.Events[7].Type)
 }
 
+// TestTaskRunner_DriverNetwork asserts that a driver's network is properly
+// used in services and checks.
+func TestTaskRunner_DriverNetwork(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"run_for":         "100s",
+		"driver_ip":       "10.1.2.3",
+		"driver_port_map": "http:80",
+	}
+
+	// Create services and checks with custom address modes to exercise
+	// address detection logic
+	task.Services = []*structs.Service{
+		{
+			Name:        "host-service",
+			PortLabel:   "http",
+			AddressMode: "host",
+			Checks: []*structs.ServiceCheck{
+				{
+					Name:        "driver-check",
+					Type:        "tcp",
+					PortLabel:   "1234",
+					AddressMode: "driver",
+				},
+			},
+		},
+		{
+			Name:        "driver-service",
+			PortLabel:   "5678",
+			AddressMode: "driver",
+			Checks: []*structs.ServiceCheck{
+				{
+					Name:      "host-check",
+					Type:      "tcp",
+					PortLabel: "http",
+				},
+				{
+					Name:        "driver-label-check",
+					Type:        "tcp",
+					PortLabel:   "http",
+					AddressMode: "driver",
+				},
+			},
+		},
+	}
+
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+
+	// Use a mock agent to test for services
+	consulAgent := agentconsul.NewMockAgent()
+	consulClient := agentconsul.NewServiceClient(consulAgent, conf.Logger, true)
+	defer consulClient.Shutdown()
+	go consulClient.Run()
+
+	conf.Consul = consulClient
+
+	tr, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+	go tr.Run()
+
+	// Wait for the task to start
+	testWaitForTaskToStart(t, tr)
+
+	testutil.WaitForResult(func() (bool, error) {
+		services, _ := consulAgent.Services()
+		if n := len(services); n != 2 {
+			return false, fmt.Errorf("expected 2 services, but found %d", n)
+		}
+		for _, s := range services {
+			switch s.Service {
+			case "host-service":
+				if expected := "192.168.0.100"; s.Address != expected {
+					return false, fmt.Errorf("expected host-service to have IP=%s but found %s",
+						expected, s.Address)
+				}
+			case "driver-service":
+				if expected := "10.1.2.3"; s.Address != expected {
+					return false, fmt.Errorf("expected driver-service to have IP=%s but found %s",
+						expected, s.Address)
+				}
+				if expected := 5678; s.Port != expected {
+					return false, fmt.Errorf("expected driver-service to have port=%d but found %d",
+						expected, s.Port)
+				}
+			default:
+				return false, fmt.Errorf("unexpected service: %q", s.Service)
+			}
+
+		}
+
+		checks := consulAgent.CheckRegs()
+		if n := len(checks); n != 3 {
+			return false, fmt.Errorf("expected 3 checks, but found %d", n)
+		}
+		for _, check := range checks {
+			switch check.Name {
+			case "driver-check":
+				if expected := "10.1.2.3:1234"; check.TCP != expected {
+					return false, fmt.Errorf("expected driver-check to have address %q but found %q", expected, check.TCP)
+				}
+			case "driver-label-check":
+				if expected := "10.1.2.3:80"; check.TCP != expected {
+					return false, fmt.Errorf("expected driver-label-check to have address %q but found %q", expected, check.TCP)
+				}
+			case "host-check":
+				if expected := "192.168.0.100:"; !strings.HasPrefix(check.TCP, expected) {
+					return false, fmt.Errorf("expected host-check to have address start with %q but found %q", expected, check.TCP)
+				}
+			default:
+				return false, fmt.Errorf("unexpected check: %q", check.Name)
+			}
+		}
+
+		return true, nil
+	}, func(err error) {
+		services, _ := consulAgent.Services()
+		for _, s := range services {
+			t.Logf(pretty.Sprint("Service: ", s))
+		}
+		for _, c := range consulAgent.CheckRegs() {
+			t.Logf(pretty.Sprint("Check: ", c))
+		}
+		require.NoError(t, err)
+	})
+}
+
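The expected addresses above follow from the address modes: a "host" service advertises the allocation's host network address (192.168.0.100 in mock.Alloc()) and the allocated host port for its label, while "driver" services and checks advertise the driver-supplied IP (10.1.2.3) with either a literal port label or the driver's port map. A simplified, hypothetical sketch of that selection, not Nomad's actual implementation:

package main

import (
	"fmt"
	"strconv"
)

// selectAddress illustrates the address detection the test exercises.
// It is a deliberately reduced model; Nomad's real logic lives in its
// Consul service-registration code and covers more cases.
func selectAddress(mode, portLabel, hostIP string, hostPorts map[string]int,
	driverIP string, driverPortMap map[string]int) (string, int) {
	if mode == "driver" {
		// Prefer the driver's port map ("http" -> 80); otherwise treat
		// the label as a literal port number ("5678", "1234").
		if port, ok := driverPortMap[portLabel]; ok {
			return driverIP, port
		}
		port, _ := strconv.Atoi(portLabel)
		return driverIP, port
	}
	// "host" (and the default) use the allocated host network.
	return hostIP, hostPorts[portLabel]
}

func main() {
	hostPorts := map[string]int{"http": 23456} // hypothetical allocated host port
	driverPorts := map[string]int{"http": 80}  // from driver_port_map in the test
	fmt.Println(selectAddress("driver", "5678", "192.168.0.100", hostPorts, "10.1.2.3", driverPorts)) // 10.1.2.3 5678
	fmt.Println(selectAddress("driver", "http", "192.168.0.100", hostPorts, "10.1.2.3", driverPorts)) // 10.1.2.3 80
	fmt.Println(selectAddress("host", "http", "192.168.0.100", hostPorts, "10.1.2.3", driverPorts))   // 192.168.0.100 23456
}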
+// TestTaskRunner_RestartSignalTask_NotRunning asserts resilience to failures
+// when a restart or signal is triggered and the task is not running.
+func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.BatchAlloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"run_for": "0s",
+	}
+
+	// Use vault to block the start
+	task.Vault = &structs.Vault{Policies: []string{"default"}}
+
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+
+	// Control when we get a Vault token
+	waitCh := make(chan struct{}, 1)
+	defer close(waitCh)
+	handler := func(*structs.Allocation, []string) (map[string]string, error) {
+		<-waitCh
+		return map[string]string{task.Name: "1234"}, nil
+	}
+	vaultClient := conf.Vault.(*vaultclient.MockVaultClient)
+	vaultClient.DeriveTokenFn = handler
+
+	tr, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+	go tr.Run()
+
+	select {
+	case <-tr.WaitCh():
+		require.Fail(t, "unexpected exit")
+	case <-time.After(1 * time.Second):
+	}
+
+	// Send a signal and restart
+	err = tr.Signal(structs.NewTaskEvent("don't panic"), "QUIT")
+	require.EqualError(t, err, ErrTaskNotRunning.Error())
+
+	// Send a restart
+	err = tr.Restart(context.Background(), structs.NewTaskEvent("don't panic"), false)
+	require.EqualError(t, err, ErrTaskNotRunning.Error())
+
+	// Unblock and let it finish
+	waitCh <- struct{}{}
+
+	select {
+	case <-tr.WaitCh():
+	case <-time.After(10 * time.Second):
+		require.Fail(t, "timed out waiting for task to complete")
+	}
+
+	// Assert the task ran and never restarted
+	state := tr.TaskState()
+	require.Equal(t, structs.TaskStateDead, state.State)
+	require.False(t, state.Failed)
+	require.Len(t, state.Events, 4, pretty.Sprint(state.Events))
+	require.Equal(t, structs.TaskReceived, state.Events[0].Type)
+	require.Equal(t, structs.TaskSetup, state.Events[1].Type)
+	require.Equal(t, structs.TaskStarted, state.Events[2].Type)
+	require.Equal(t, structs.TaskTerminated, state.Events[3].Type)
+}
+
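The assertions above rely on Signal and Restart returning ErrTaskNotRunning rather than blocking or panicking when the task has not started (or has already exited). A caller inside the taskrunner package can therefore treat that error as a benign no-op; a minimal sketch using only names that appear in the test above (the helper itself is hypothetical):

// signalIfRunning treats a signal that races with task startup or shutdown
// as a no-op rather than a failure.
func signalIfRunning(tr *TaskRunner, signal string) error {
	err := tr.Signal(structs.NewTaskEvent("operator signal"), signal)
	if err == ErrTaskNotRunning {
		// Nothing is running to receive the signal; not a failure.
		return nil
	}
	return err
}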
+// TestTaskRunner_Run_RecoverableStartError asserts tasks are restarted if they
+// return a recoverable error from StartTask.
+func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.BatchAlloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Config = map[string]interface{}{
+		"start_error":             "driver failure",
+		"start_error_recoverable": true,
+	}
+
+	// Make the restart policy retry once
+	alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
+		Attempts: 1,
+		Interval: 10 * time.Minute,
+		Delay:    0,
+		Mode:     structs.RestartPolicyModeFail,
+	}
+
+	tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name)
+	defer cleanup()
+
+	select {
+	case <-tr.WaitCh():
+	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
+		require.Fail(t, "timed out waiting for task to exit")
+	}
+
+	state := tr.TaskState()
+	require.Equal(t, structs.TaskStateDead, state.State)
+	require.True(t, state.Failed)
+	require.Len(t, state.Events, 6, pretty.Sprint(state.Events))
+	require.Equal(t, structs.TaskReceived, state.Events[0].Type)
+	require.Equal(t, structs.TaskSetup, state.Events[1].Type)
+	require.Equal(t, structs.TaskDriverFailure, state.Events[2].Type)
+	require.Equal(t, structs.TaskRestarting, state.Events[3].Type)
+	require.Equal(t, structs.TaskDriverFailure, state.Events[4].Type)
+	require.Equal(t, structs.TaskNotRestarting, state.Events[5].Type)
+}
+
+// testWaitForTaskToStart waits for the task to be running or fails the test
+func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
+	testutil.WaitForResult(func() (bool, error) {
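A wait-for-running helper of this shape typically polls the runner's TaskState until it reports running and fails the test otherwise; a sketch under that assumption (hypothetical, not necessarily the body from the commit):

// waitForTaskRunning mirrors what testWaitForTaskToStart presumably does:
// poll TaskState until the task reports running, failing the test on timeout.
func waitForTaskRunning(t *testing.T, tr *TaskRunner) {
	testutil.WaitForResult(func() (bool, error) {
		ts := tr.TaskState()
		return ts.State == structs.TaskStateRunning, fmt.Errorf("task state: %v", ts.State)
	}, func(err error) {
		require.NoError(t, err)
	})
}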