From 6c01ae4ae239ddfa745d262da73a58342f74278e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 4 Jan 2019 15:19:00 -0800 Subject: [PATCH 1/2] e2e: add task events tests --- e2e/e2e_test.go | 1 + e2e/taskevents/input/completed_leader.nomad | 28 ++++ e2e/taskevents/input/failed_batch.nomad | 17 ++ e2e/taskevents/input/simple_batch.nomad | 12 ++ e2e/taskevents/taskevents.go | 167 ++++++++++++++++++++ 5 files changed, 225 insertions(+) create mode 100644 e2e/taskevents/input/completed_leader.nomad create mode 100644 e2e/taskevents/input/failed_batch.nomad create mode 100644 e2e/taskevents/input/simple_batch.nomad create mode 100644 e2e/taskevents/taskevents.go diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 0b9a3e142..f6588ed17 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -7,6 +7,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/consultemplate" _ "github.com/hashicorp/nomad/e2e/example" _ "github.com/hashicorp/nomad/e2e/spread" + _ "github.com/hashicorp/nomad/e2e/taskevents" ) func TestE2E(t *testing.T) { diff --git a/e2e/taskevents/input/completed_leader.nomad b/e2e/taskevents/input/completed_leader.nomad new file mode 100644 index 000000000..7b3d9a552 --- /dev/null +++ b/e2e/taskevents/input/completed_leader.nomad @@ -0,0 +1,28 @@ +job "completed_leader" { + type = "batch" + datacenters = ["dc1"] + + group "completed_leader" { + restart { + attempts = 0 + } + + # Only the task named the same as the job has its events tested. + task "completed_leader" { + driver = "raw_exec" + config { + command = "sleep" + args = ["1000"] + } + } + + task "leader" { + leader = true + driver = "raw_exec" + config { + command = "sleep" + args = ["1"] + } + } + } +} diff --git a/e2e/taskevents/input/failed_batch.nomad b/e2e/taskevents/input/failed_batch.nomad new file mode 100644 index 000000000..ae5d1f122 --- /dev/null +++ b/e2e/taskevents/input/failed_batch.nomad @@ -0,0 +1,17 @@ +job "failed_batch" { + type = "batch" + datacenters = ["dc1"] + + group "failed_batch" { + restart { + attempts = 0 + } + + task "failed_batch" { + driver = "raw_exec" + config { + command = "SomeInvalidCommand" + } + } + } +} diff --git a/e2e/taskevents/input/simple_batch.nomad b/e2e/taskevents/input/simple_batch.nomad new file mode 100644 index 000000000..333084f57 --- /dev/null +++ b/e2e/taskevents/input/simple_batch.nomad @@ -0,0 +1,12 @@ +job "simple_batch" { + type = "batch" + datacenters = ["dc1"] + + task "simple_batch" { + driver = "raw_exec" + config { + command = "sleep" + args = ["1"] + } + } +} diff --git a/e2e/taskevents/taskevents.go b/e2e/taskevents/taskevents.go new file mode 100644 index 000000000..c59615a0e --- /dev/null +++ b/e2e/taskevents/taskevents.go @@ -0,0 +1,167 @@ +package taskevents + +import ( + "fmt" + "strings" + "time" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/helper/uuid" +) + +type TaskEventsTest struct { + framework.TC + jobIds []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "TaskEvents", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(TaskEventsTest), + }, + }) +} + +func (tc *TaskEventsTest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) + e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 1) +} + +func (tc *TaskEventsTest) AfterEach(f *framework.F) { + nomadClient := tc.Nomad() + jobs := nomadClient.Jobs() + // Stop 
all jobs in test + for _, id := range tc.jobIds { + jobs.Deregister(id, true, nil) + } + // Garbage collect + nomadClient.System().GarbageCollect() +} + +func formatEvents(events []*api.TaskEvent) string { + estrs := make([]string, len(events)) + for i, e := range events { + estrs[i] = fmt.Sprintf("%2d %-20s fail=%t msg=> %s", i, e.Type, e.FailsTask, e.DisplayMessage) + } + return strings.Join(estrs, "\n") +} + +// waitUntilEvents submits a job and then waits until the expected number of +// events exist. +// +// The job name is used to load the job file from "input/${job}.nomad", and +// events are only inspected for tasks named the same as the job. +func (tc *TaskEventsTest) waitUntilEvents(f *framework.F, jobName string, numEvents int) *api.TaskState { + t := f.T() + nomadClient := tc.Nomad() + uuid := uuid.Generate() + uniqJobId := jobName + uuid[0:8] + tc.jobIds = append(tc.jobIds, uniqJobId) + + jobFile := fmt.Sprintf("taskevents/input/%s.nomad", jobName) + allocs := e2eutil.RegisterAndWaitForAllocs(f, nomadClient, jobFile, uniqJobId) + + require.Len(t, allocs, 1) + alloc := allocs[0] + qo := &api.QueryOptions{ + WaitTime: time.Second, + } + + // Capture state outside of wait to ease assertions once expected + // number of events have been received. + var taskState *api.TaskState + + testutil.WaitForResultRetries(10, func() (bool, error) { + a, meta, err := nomadClient.Allocations().Info(alloc.ID, qo) + if err != nil { + return false, err + } + + qo.WaitIndex = meta.LastIndex + + // Capture task state + taskState = a.TaskStates[jobName] + if taskState == nil { + return false, fmt.Errorf("task state not found for %s", jobName) + } + + // Assert expected number of task events + if len(taskState.Events) != numEvents { + return false, fmt.Errorf("expected %d task events but found %d\n%s", + numEvents, len(taskState.Events), formatEvents(taskState.Events), + ) + } + + return true, nil + }, func(err error) { + t.Fatalf("task events error: %v", err) + }) + + return taskState +} + +func (tc *TaskEventsTest) TestTaskEvents_SimpleBatch(f *framework.F) { + t := f.T() + taskState := tc.waitUntilEvents(f, "simple_batch", 4) + events := taskState.Events + + // Assert task did not fail + require.Falsef(t, taskState.Failed, "task unexpectedly failed after %d events\n%s", + len(events), formatEvents(events), + ) + + // Assert the expected type of events were emitted in a specific order + // (based on v0.8.6) + require.Equal(t, api.TaskReceived, events[0].Type) + require.Equal(t, api.TaskSetup, events[1].Type) + require.Equal(t, api.TaskStarted, events[2].Type) + require.Equal(t, api.TaskTerminated, events[3].Type) +} + +func (tc *TaskEventsTest) TestTaskEvents_FailedBatch(f *framework.F) { + t := f.T() + taskState := tc.waitUntilEvents(f, "failed_batch", 4) + events := taskState.Events + + // Assert task did fail + require.Truef(t, taskState.Failed, "task unexpectedly succeeded after %d events\n%s", + len(events), formatEvents(events), + ) + + // Assert the expected type of events were emitted in a specific order + // (based on v0.8.6) + require.Equal(t, api.TaskReceived, events[0].Type) + require.Equal(t, api.TaskSetup, events[1].Type) + require.Equal(t, api.TaskDriverFailure, events[2].Type) + require.Equal(t, api.TaskNotRestarting, events[3].Type) + require.True(t, events[3].FailsTask) +} + +// TestTaskEvents_CompletedLeader asserts the proper events are emitted for a +// non-leader task when its leader task completes. 
+func (tc *TaskEventsTest) TestTaskEvents_CompletedLeader(f *framework.F) { + t := f.T() + taskState := tc.waitUntilEvents(f, "completed_leader", 7) + events := taskState.Events + + // Assert task did not fail + require.Falsef(t, taskState.Failed, "task unexpectedly failed after %d events\n%s", + len(events), formatEvents(events), + ) + + // Assert the expected type of events were emitted in a specific order + require.Equal(t, api.TaskReceived, events[0].Type) + require.Equal(t, api.TaskSetup, events[1].Type) + require.Equal(t, api.TaskStarted, events[2].Type) + require.Equal(t, api.TaskLeaderDead, events[3].Type) + require.Equal(t, api.TaskKilling, events[4].Type) + require.Equal(t, api.TaskTerminated, events[5].Type) + require.Equal(t, api.TaskKilled, events[6].Type) +} From 1ae8261139f2f1127108c0806b8aea3dbfad07cc Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 4 Jan 2019 15:19:57 -0800 Subject: [PATCH 2/2] client: emit Killing/Killed task events We were just emitting Killed/Terminated events before. In v0.8 we emitted Killing/Killed, but lacked Terminated when explicitly stopping a task. This change makes it so Terminated is always included, whether explicitly stopping a task or it exiting on its own. New output: 2019-01-04T14:58:51-08:00 Killed Task successfully killed 2019-01-04T14:58:51-08:00 Terminated Exit Code: 130, Signal: 2 2019-01-04T14:58:51-08:00 Killing Sent interrupt 2019-01-04T14:58:51-08:00 Leader Task Dead Leader Task in Group dead 2019-01-04T14:58:49-08:00 Started Task started by client 2019-01-04T14:58:49-08:00 Task Setup Building Task Directory 2019-01-04T14:58:49-08:00 Received Task received by client Old (v0.8.6) output: 2019-01-04T22:14:54Z Killed Task successfully killed 2019-01-04T22:14:54Z Killing Sent interrupt. Waiting 5s before force killing 2019-01-04T22:14:54Z Leader Task Dead Leader Task in Group dead 2019-01-04T22:14:53Z Started Task started by client 2019-01-04T22:14:53Z Task Setup Building Task Directory 2019-01-04T22:14:53Z Received Task received by client --- client/allocrunner/alloc_runner.go | 20 ++++++++++++++++++-- client/allocrunner/taskrunner/task_runner.go | 1 + 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 73ae31f9c..a599d9218 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -458,7 +458,23 @@ func (ar *allocRunner) handleTaskStateUpdates() { ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) } + // Emit kill event for live runners + for _, tr := range liveRunners { + tr.EmitEvent(killEvent) + } + + // Kill 'em all states = ar.killTasks() + + // Wait for TaskRunners to exit before continuing to + // prevent looping before TaskRunners have transitioned + // to Dead. 
+ for _, tr := range liveRunners { + select { + case <-tr.WaitCh(): + case <-ar.waitCh: + } + } } // Get the client allocation @@ -485,7 +501,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { continue } - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilling)) if err != nil && err != taskrunner.ErrTaskNotRunning { ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) } @@ -505,7 +521,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { wg.Add(1) go func(name string, tr *taskrunner.TaskRunner) { defer wg.Done() - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilling)) if err != nil && err != taskrunner.ErrTaskNotRunning { ar.logger.Warn("error stopping task", "error", err, "task_name", name) } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index e120f0ca5..364594eef 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -546,6 +546,7 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { case structs.TaskKilled: // Never restart an explicitly killed task. Kill method handles // updating the server. + tr.EmitEvent(structs.NewTaskEvent(state)) return false, 0 case structs.TaskNotRestarting, structs.TaskTerminated: tr.logger.Info("not restarting task", "reason", reason)
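
The event listings in the second commit message can be reproduced against a local agent with a small program built on the same Nomad Go API surface the e2e test above already uses (Allocations().Info, TaskStates, and the TaskEvent Type/DisplayMessage fields). This is a minimal sketch for illustration and is not part of either change set; the allocation-ID argument, the program name, and the Unix-nanosecond reading of TaskEvent.Time are assumptions.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/hashicorp/nomad/api"
)

// Dumps the task events for every task in one allocation, newest first,
// in roughly the shape of the "New output" / "Old (v0.8.6) output"
// listings in the commit message above.
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: taskevents-dump <alloc-id>")
		os.Exit(1)
	}

	// Assumes an agent reachable via NOMAD_ADDR or the default local address.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		fmt.Fprintln(os.Stderr, "error creating client:", err)
		os.Exit(1)
	}

	alloc, _, err := client.Allocations().Info(os.Args[1], nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, "error fetching allocation:", err)
		os.Exit(1)
	}

	for name, state := range alloc.TaskStates {
		fmt.Printf("task %q state=%s failed=%t\n", name, state.State, state.Failed)
		// Walk events newest first to match the commit message listings.
		for i := len(state.Events) - 1; i >= 0; i-- {
			e := state.Events[i]
			fmt.Printf("  %s  %-20s %s\n",
				time.Unix(0, e.Time).Format(time.RFC3339), e.Type, e.DisplayMessage)
		}
	}
}

With the second patch applied, a non-leader task that is killed after its leader completes should show the Killing, Terminated, Killed sequence that TestTaskEvents_CompletedLeader asserts; on a v0.8.6 client the Terminated event is absent, as in the old output above.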