mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 01:15:43 +03:00
client: artifact errors are retry-able
0.9.0beta2 contains a regression where artifact download errors would not cause a task restart and instead immediately fail the task. This restores the pre-0.9 behavior of retrying all artifact errors and adds missing tests.
This commit is contained in:
@@ -42,7 +42,10 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
|
||||
for _, artifact := range req.Task.Artifacts {
|
||||
//XXX add ctx to GetArtifact to allow cancelling long downloads
|
||||
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil {
|
||||
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
|
||||
wrapped := structs.NewRecoverableError(
|
||||
fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err),
|
||||
true,
|
||||
)
|
||||
herr := NewHookError(wrapped, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))
|
||||
|
||||
return herr
|
||||
|
||||
57
client/allocrunner/taskrunner/artifact_hook_test.go
Normal file
57
client/allocrunner/taskrunner/artifact_hook_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/taskenv"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Statically assert the artifact hook implements the expected interface
|
||||
var _ interfaces.TaskPrestartHook = (*artifactHook)(nil)
|
||||
|
||||
type mockEmitter struct {
|
||||
events []*structs.TaskEvent
|
||||
}
|
||||
|
||||
func (m *mockEmitter) EmitEvent(ev *structs.TaskEvent) {
|
||||
m.events = append(m.events, ev)
|
||||
}
|
||||
|
||||
// TestTaskRunner_ArtifactHook_Recoverable asserts that failures to download
|
||||
// artifacts are a recoverable error.
|
||||
func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
me := &mockEmitter{}
|
||||
artifactHook := newArtifactHook(me, testlog.HCLogger(t))
|
||||
|
||||
req := &interfaces.TaskPrestartRequest{
|
||||
TaskEnv: taskenv.NewEmptyTaskEnv(),
|
||||
TaskDir: &allocdir.TaskDir{Dir: os.TempDir()},
|
||||
Task: &structs.Task{
|
||||
Artifacts: []*structs.TaskArtifact{
|
||||
{
|
||||
GetterSource: "http://127.0.0.1:0",
|
||||
GetterMode: structs.GetterModeAny,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp := interfaces.TaskPrestartResponse{}
|
||||
|
||||
err := artifactHook.Prestart(context.Background(), req, &resp)
|
||||
|
||||
require.False(t, resp.Done)
|
||||
require.NotNil(t, err)
|
||||
require.True(t, structs.IsRecoverable(err))
|
||||
require.Len(t, me.events, 1)
|
||||
require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type)
|
||||
}
|
||||
@@ -14,10 +14,7 @@ var (
|
||||
ErrTaskNotRunning = errors.New(errTaskNotRunning)
|
||||
)
|
||||
|
||||
// NewHookError returns an implementation of a HookError with an underlying err
|
||||
// and a pre-formatted task event.
|
||||
// If the taskEvent is nil, then we won't attempt to generate one during error
|
||||
// handling.
|
||||
// NewHookError contains an underlying err and a pre-formatted task event.
|
||||
func NewHookError(err error, taskEvent *structs.TaskEvent) error {
|
||||
return &hookError{
|
||||
err: err,
|
||||
@@ -33,3 +30,8 @@ type hookError struct {
|
||||
func (h *hookError) Error() string {
|
||||
return h.err.Error()
|
||||
}
|
||||
|
||||
// Recoverable is true if the underlying error is recoverable.
|
||||
func (h *hookError) IsRecoverable() bool {
|
||||
return structs.IsRecoverable(h.err)
|
||||
}
|
||||
|
||||
52
client/allocrunner/taskrunner/errors_test.go
Normal file
52
client/allocrunner/taskrunner/errors_test.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Statically assert error implements the expected interfaces
|
||||
var _ structs.Recoverable = (*hookError)(nil)
|
||||
|
||||
// TestHookError_Recoverable asserts that a NewHookError is recoverable if
|
||||
// passed a recoverable error.
|
||||
func TestHookError_Recoverable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create root error
|
||||
root := errors.New("test error")
|
||||
|
||||
// Make it recoverable
|
||||
recov := structs.NewRecoverableError(root, true)
|
||||
|
||||
// Create a fake task event
|
||||
ev := structs.NewTaskEvent("test event")
|
||||
|
||||
herr := NewHookError(recov, ev)
|
||||
|
||||
require.Equal(t, ev, herr.(*hookError).taskEvent)
|
||||
require.True(t, structs.IsRecoverable(herr))
|
||||
require.Equal(t, root.Error(), herr.Error())
|
||||
require.Equal(t, recov.Error(), herr.Error())
|
||||
}
|
||||
|
||||
// TestHookError_Unrecoverable asserts that a NewHookError is not recoverable
|
||||
// unless it is passed a recoverable error.
|
||||
func TestHookError_Unrecoverable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create error
|
||||
err := errors.New("test error")
|
||||
|
||||
// Create a fake task event
|
||||
ev := structs.NewTaskEvent("test event")
|
||||
|
||||
herr := NewHookError(err, ev)
|
||||
|
||||
require.Equal(t, ev, herr.(*hookError).taskEvent)
|
||||
require.False(t, structs.IsRecoverable(herr))
|
||||
require.Equal(t, err.Error(), herr.Error())
|
||||
}
|
||||
@@ -113,11 +113,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) {
|
||||
taskEvent = structs.NewTaskEvent(structs.TaskHookFailed).SetMessage(message)
|
||||
}
|
||||
|
||||
// The TaskEvent returned by a HookError may be nil if the hook chooses to opt
|
||||
// out of sending a task event.
|
||||
if taskEvent != nil {
|
||||
tr.EmitEvent(taskEvent)
|
||||
}
|
||||
tr.EmitEvent(taskEvent)
|
||||
}
|
||||
|
||||
// prestart is used to run the runners prestart hooks.
|
||||
|
||||
@@ -953,6 +953,50 @@ func TestTaskRunner_Download_List(t *testing.T) {
|
||||
require.NoErrorf(t, err, "%v not downloaded", f2)
|
||||
}
|
||||
|
||||
// TestTaskRunner_Download_Retries asserts that failed artifact downloads are
|
||||
// retried according to the task's restart policy.
|
||||
func TestTaskRunner_Download_Retries(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create an allocation that has a task with bad artifacts.
|
||||
alloc := mock.BatchAlloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
artifact := structs.TaskArtifact{
|
||||
GetterSource: "http://127.0.0.1:0/foo/bar/baz",
|
||||
}
|
||||
task.Artifacts = []*structs.TaskArtifact{&artifact}
|
||||
|
||||
// Make the restart policy retry once
|
||||
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
|
||||
Attempts: 1,
|
||||
Interval: 10 * time.Minute,
|
||||
Delay: 1 * time.Second,
|
||||
Mode: structs.RestartPolicyModeFail,
|
||||
}
|
||||
|
||||
tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name)
|
||||
defer cleanup()
|
||||
|
||||
select {
|
||||
case <-tr.WaitCh():
|
||||
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||
require.Fail(t, "timed out waiting for task to exit")
|
||||
}
|
||||
|
||||
state := tr.TaskState()
|
||||
require.Equal(t, structs.TaskStateDead, state.State)
|
||||
require.True(t, state.Failed)
|
||||
require.Len(t, state.Events, 8, pretty.Sprint(state.Events))
|
||||
require.Equal(t, structs.TaskReceived, state.Events[0].Type)
|
||||
require.Equal(t, structs.TaskSetup, state.Events[1].Type)
|
||||
require.Equal(t, structs.TaskDownloadingArtifacts, state.Events[2].Type)
|
||||
require.Equal(t, structs.TaskArtifactDownloadFailed, state.Events[3].Type)
|
||||
require.Equal(t, structs.TaskRestarting, state.Events[4].Type)
|
||||
require.Equal(t, structs.TaskDownloadingArtifacts, state.Events[5].Type)
|
||||
require.Equal(t, structs.TaskArtifactDownloadFailed, state.Events[6].Type)
|
||||
require.Equal(t, structs.TaskNotRestarting, state.Events[7].Type)
|
||||
}
|
||||
|
||||
// testWaitForTaskToStart waits for the task to be running or fails the test
|
||||
func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
|
||||
Reference in New Issue
Block a user