From 4d1a1ac5bb4341a34a91cd93a9d84bd6bbe02b82 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Wed, 26 Sep 2018 17:33:32 -0700
Subject: [PATCH] tests: test logs endpoint against pending task

The really exciting change, though, is making WaitForRunning return the
allocations it started. This should cut down test boilerplate
significantly.
---
 client/driver/mock_driver.go         |   3 +-
 client/fs_endpoint_test.go           | 338 +++++++++++----------
 command/agent/alloc_endpoint_test.go |   2 +-
 testutil/wait.go                     |  52 ++---
 4 files changed, 170 insertions(+), 225 deletions(-)

diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go
index f17c9d8ab..11b502518 100644
--- a/client/driver/mock_driver.go
+++ b/client/driver/mock_driver.go
@@ -45,7 +45,8 @@ type MockDriverConfig struct {
 	// StartErrRecoverable marks the error returned is recoverable
 	StartErrRecoverable bool `mapstructure:"start_error_recoverable"`
 
-	// StartBlockFor specifies a duration in which to block before returning
+	// StartBlockFor specifies a duration in which to block Start before
+	// returning. Useful for testing the behavior of tasks in pending.
 	StartBlockFor time.Duration `mapstructure:"start_block_for"`
 
 	// KillAfter is the duration after which the mock driver indicates the task
diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go
index 7258b3778..b9e141acb 100644
--- a/client/fs_endpoint_test.go
+++ b/client/fs_endpoint_test.go
@@ -32,7 +32,7 @@ import (
 // tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
 // should destroy the temp dir.
 func tempAllocDir(t testing.TB) *allocdir.AllocDir {
-	dir, err := ioutil.TempDir("", "")
+	dir, err := ioutil.TempDir("", "nomadtest")
 	if err != nil {
 		t.Fatalf("TempDir() failed: %v", err)
 	}
@@ -536,68 +536,21 @@ func TestFS_Stream(t *testing.T) {
 	})
 	defer c.Shutdown()
 
-	// Force an allocation onto the node
 	expected := "Hello from the other side"
-	a := mock.Alloc()
-	a.Job.Type = structs.JobTypeBatch
-	a.NodeID = c.NodeID()
-	a.Job.TaskGroups[0].Count = 1
-	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
-		Name:   "web",
-		Driver: "mock_driver",
-		Config: map[string]interface{}{
-			"run_for":       "2s",
-			"stdout_string": expected,
-		},
-		LogConfig: structs.DefaultLogConfig(),
-		Resources: &structs.Resources{
-			CPU:      500,
-			MemoryMB: 256,
-		},
+	job := mock.BatchJob()
+	job.TaskGroups[0].Count = 1
+	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+		"run_for":       "2s",
+		"stdout_string": expected,
 	}
 
-	// Wait for the client to connect
-	testutil.WaitForResult(func() (bool, error) {
-		node, err := s.State().NodeByID(nil, c.NodeID())
-		if err != nil {
-			return false, err
-		}
-		if node == nil {
-			return false, fmt.Errorf("unknown node")
-		}
-
-		return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
-	}, func(err error) {
-		t.Fatal(err)
-	})
-
-	// Upsert the allocation
-	state := s.State()
-	require.Nil(state.UpsertJob(999, a.Job))
-	require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
-
-	// Wait for the client to run the allocation
-	testutil.WaitForResult(func() (bool, error) {
-		alloc, err := state.AllocByID(nil, a.ID)
-		if err != nil {
-			return false, err
-		}
-		if alloc == nil {
-			return false, fmt.Errorf("unknown alloc")
-		}
-		if alloc.ClientStatus != structs.AllocClientStatusComplete {
-			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
-		}
-
-		return true, nil
-	}, func(err error) {
-		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
-	})
+	// Wait for alloc 
to be running + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] // Make the request req := &cstructs.FsStreamRequest{ - AllocID: a.ID, - Path: "alloc/logs/web.stdout.0", + AllocID: alloc.ID, + Path: "alloc/logs/worker.stdout.0", PlainText: true, QueryOptions: structs.QueryOptions{Region: "global"}, } @@ -693,72 +646,25 @@ func TestFS_Stream_Follow(t *testing.T) { }) defer c.Shutdown() - // Force an allocation onto the node expectedBase := "Hello from the other side" repeat := 10 - a := mock.Alloc() - a.Job.Type = structs.JobTypeBatch - a.NodeID = c.NodeID() - a.Job.TaskGroups[0].Count = 1 - a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ - Name: "web", - Driver: "mock_driver", - Config: map[string]interface{}{ - "run_for": "20s", - "stdout_string": expectedBase, - "stdout_repeat": repeat, - "stdout_repeat_duration": 200 * time.Millisecond, - }, - LogConfig: structs.DefaultLogConfig(), - Resources: &structs.Resources{ - CPU: 500, - MemoryMB: 256, - }, + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "20s", + "stdout_string": expectedBase, + "stdout_repeat": repeat, + "stdout_repeat_duration": 200 * time.Millisecond, } - // Wait for the client to connect - testutil.WaitForResult(func() (bool, error) { - node, err := s.State().NodeByID(nil, c.NodeID()) - if err != nil { - return false, err - } - if node == nil { - return false, fmt.Errorf("unknown node") - } - - return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status") - }, func(err error) { - t.Fatal(err) - }) - - // Upsert the allocation - state := s.State() - require.Nil(state.UpsertJob(999, a.Job)) - require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) - - // Wait for the client to run the allocation - testutil.WaitForResult(func() (bool, error) { - alloc, err := state.AllocByID(nil, a.ID) - if err != nil { - return false, err - } - if alloc == nil { - return false, fmt.Errorf("unknown alloc") - } - if alloc.ClientStatus != structs.AllocClientStatusRunning { - return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) - } - - return true, nil - }, func(err error) { - t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err) - }) + // Wait for alloc to be running + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] // Make the request req := &cstructs.FsStreamRequest{ - AllocID: a.ID, - Path: "alloc/logs/web.stdout.0", + AllocID: alloc.ID, + Path: "alloc/logs/worker.stdout.0", PlainText: true, Follow: true, QueryOptions: structs.QueryOptions{Region: "global"}, @@ -837,69 +743,22 @@ func TestFS_Stream_Limit(t *testing.T) { }) defer c.Shutdown() - // Force an allocation onto the node var limit int64 = 5 full := "Hello from the other side" expected := full[:limit] - a := mock.Alloc() - a.Job.Type = structs.JobTypeBatch - a.NodeID = c.NodeID() - a.Job.TaskGroups[0].Count = 1 - a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ - Name: "web", - Driver: "mock_driver", - Config: map[string]interface{}{ - "run_for": "2s", - "stdout_string": full, - }, - LogConfig: structs.DefaultLogConfig(), - Resources: &structs.Resources{ - CPU: 500, - MemoryMB: 256, - }, + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "2s", + "stdout_string": full, } - // Wait for the client to connect - testutil.WaitForResult(func() (bool, error) { - node, err := s.State().NodeByID(nil, c.NodeID()) - if err != nil { - return false, err - } - if node == nil { - return false, 
fmt.Errorf("unknown node")
-		}
-
-		return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
-	}, func(err error) {
-		t.Fatal(err)
-	})
-
-	// Upsert the allocation
-	state := s.State()
-	require.Nil(state.UpsertJob(999, a.Job))
-	require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
-
-	// Wait for the client to run the allocation
-	testutil.WaitForResult(func() (bool, error) {
-		alloc, err := state.AllocByID(nil, a.ID)
-		if err != nil {
-			return false, err
-		}
-		if alloc == nil {
-			return false, fmt.Errorf("unknown alloc")
-		}
-		if alloc.ClientStatus != structs.AllocClientStatusComplete {
-			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
-		}
-
-		return true, nil
-	}, func(err error) {
-		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
-	})
+	// Wait for alloc to be running
+	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
 
 	// Make the request
 	req := &cstructs.FsStreamRequest{
-		AllocID:      a.ID,
-		Path:         "alloc/logs/web.stdout.0",
+		AllocID:      alloc.ID,
+		Path:         "alloc/logs/worker.stdout.0",
 		PlainText:    true,
 		Limit:        limit,
@@ -1040,6 +899,116 @@ OUTER:
 	}
 }
 
+// TestFS_Logs_TaskPending asserts that trying to stream logs for tasks which
+// have not started returns a 404 error.
+func TestFS_Logs_TaskPending(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	// Start a server and client
+	s := nomad.TestServer(t, nil)
+	defer s.Shutdown()
+	testutil.WaitForLeader(t, s.RPC)
+
+	c := TestClient(t, func(c *config.Config) {
+		c.Servers = []string{s.GetConfig().RPCAddr.String()}
+	})
+	defer c.Shutdown()
+
+	job := mock.BatchJob()
+	job.TaskGroups[0].Count = 1
+	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+		"start_block_for": "4s",
+	}
+
+	// Register job
+	args := &structs.JobRegisterRequest{}
+	args.Job = job
+	args.WriteRequest.Region = "global"
+	var jobResp structs.JobRegisterResponse
+	require.NoError(s.RPC("Job.Register", args, &jobResp))
+
+	// Get the allocation ID
+	var allocID string
+	testutil.WaitForResult(func() (bool, error) {
+		args := structs.AllocListRequest{}
+		args.Region = "global"
+		resp := structs.AllocListResponse{}
+		if err := s.RPC("Alloc.List", &args, &resp); err != nil {
+			return false, err
+		}
+
+		if len(resp.Allocations) != 1 {
+			return false, fmt.Errorf("expected 1 alloc, found %d", len(resp.Allocations))
+		}
+
+		allocID = resp.Allocations[0].ID
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("error getting alloc id: %v", err)
+	})
+
+	// Make the request
+	req := &cstructs.FsLogsRequest{
+		AllocID:      allocID,
+		Task:         job.TaskGroups[0].Tasks[0].Name,
+		LogType:      "stdout",
+		Origin:       "start",
+		PlainText:    true,
+		QueryOptions: structs.QueryOptions{Region: "global"},
+	}
+
+	// Get the handler
+	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
+	require.Nil(err)
+
+	// Create a pipe
+	p1, p2 := net.Pipe()
+	defer p1.Close()
+	defer p2.Close()
+
+	errCh := make(chan error)
+	streamMsg := make(chan *cstructs.StreamErrWrapper)
+
+	// Start the handler
+	go handler(p2)
+
+	// Start the decoder
+	go func() {
+		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
+		for {
+			var msg cstructs.StreamErrWrapper
+			if err := decoder.Decode(&msg); err != nil {
+				if err == io.EOF || strings.Contains(err.Error(), "closed") {
+					return
+				}
+				errCh <- fmt.Errorf("error decoding: %v", err)
+			}
+
+			streamMsg <- &msg
+		}
+	}()
+
+	// Send the request
+	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
+	require.Nil(encoder.Encode(req))
+
+	for {
+		select {
+		case <-time.After(3 * time.Second):
+			t.Fatal("timeout")
+		case err 
:= <-errCh:
+			t.Fatalf("unexpected stream error: %v", err)
+		case msg := <-streamMsg:
+			require.NotNil(msg.Error)
+			require.NotNil(msg.Error.Code)
+			require.EqualValues(404, *msg.Error.Code)
+			require.Contains(msg.Error.Message, "not started")
+			return
+		}
+	}
+}
+
 func TestFS_Logs_ACL(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
@@ -1174,7 +1143,6 @@ func TestFS_Logs(t *testing.T) {
 	})
 	defer c.Shutdown()
 
-	// Force an allocation onto the node
 	expected := "Hello from the other side\n"
 	job := mock.BatchJob()
 	job.TaskGroups[0].Count = 1
@@ -1183,21 +1151,6 @@
 		"stdout_string": expected,
 	}
 
-	// Wait for the client to connect
-	testutil.WaitForResult(func() (bool, error) {
-		node, err := s.State().NodeByID(nil, c.NodeID())
-		if err != nil {
-			return false, err
-		}
-		if node == nil {
-			return false, fmt.Errorf("unknown node")
-		}
-
-		return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
-	}, func(err error) {
-		t.Fatal(err)
-	})
-
 	// Wait for client to be running job
 	testutil.WaitForRunning(t, s.RPC, job)
 
@@ -1291,7 +1244,6 @@ func TestFS_Logs_Follow(t *testing.T) {
 	})
 	defer c.Shutdown()
 
-	// Force an allocation onto the node
 	expectedBase := "Hello from the other side\n"
 	repeat := 10
 
@@ -1305,19 +1257,11 @@
 	}
 
 	// Wait for client to be running job
-	testutil.WaitForRunning(t, s.RPC, job)
-
-	// Get the allocation ID
-	args := structs.AllocListRequest{}
-	args.Region = "global"
-	resp := structs.AllocListResponse{}
-	require.NoError(s.RPC("Alloc.List", &args, &resp))
-	require.Len(resp.Allocations, 1)
-	allocID := resp.Allocations[0].ID
+	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
 
 	// Make the request
 	req := &cstructs.FsLogsRequest{
-		AllocID: allocID,
+		AllocID: alloc.ID,
 		Task:    job.TaskGroups[0].Tasks[0].Name,
 		LogType: "stdout",
 		Origin:  "start",
diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go
index 5d85894c2..d576817f1 100644
--- a/command/agent/alloc_endpoint_test.go
+++ b/command/agent/alloc_endpoint_test.go
@@ -453,7 +453,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
 
 	// Wait for the client to run it
 	testutil.WaitForResult(func() (bool, error) {
-		if _, err := s.client.GetClientAlloc(alloc.ID); err != nil {
+		if _, err := s.client.GetAllocState(alloc.ID); err != nil {
 			return false, err
 		}
 
diff --git a/testutil/wait.go b/testutil/wait.go
index 583f2f1f7..3e0ea166f 100644
--- a/testutil/wait.go
+++ b/testutil/wait.go
@@ -87,42 +87,40 @@ func WaitForLeader(t testing.T, rpc rpcFn) {
 	})
 }
 
-// WaitForRunning runs a job and blocks until it is running.
-func WaitForRunning(t testing.T, rpc rpcFn, job *structs.Job) {
-	registered := false
+// WaitForRunning runs a job and blocks until all of its allocs are running.
+func WaitForRunning(t testing.T, rpc rpcFn, job *structs.Job) []*structs.AllocListStub { WaitForResult(func() (bool, error) { - if !registered { - args := &structs.JobRegisterRequest{} - args.Job = job - args.WriteRequest.Region = "global" - var jobResp structs.JobRegisterResponse - err := rpc("Job.Register", args, &jobResp) - if err != nil { - return false, fmt.Errorf("Job.Register error: %v", err) - } + args := &structs.JobRegisterRequest{} + args.Job = job + args.WriteRequest.Region = "global" + var jobResp structs.JobRegisterResponse + err := rpc("Job.Register", args, &jobResp) + return err == nil, fmt.Errorf("Job.Register error: %v", err) + }, func(err error) { + t.Fatalf("error registering job: %v", err) + }) - // Only register once - registered = true - } + t.Logf("Job %q registered", job.ID) - args := &structs.JobSummaryRequest{} + var resp structs.JobAllocationsResponse + + WaitForResult(func() (bool, error) { + args := &structs.JobSpecificRequest{} args.JobID = job.ID args.QueryOptions.Region = "global" - var resp structs.JobSummaryResponse - err := rpc("Job.Summary", args, &resp) + err := rpc("Job.Allocations", args, &resp) if err != nil { - return false, fmt.Errorf("Job.Summary error: %v", err) + return false, fmt.Errorf("Job.Allocations error: %v", err) } - tgs := len(job.TaskGroups) - summaries := len(resp.JobSummary.Summary) - if tgs != summaries { - return false, fmt.Errorf("task_groups=%d summaries=%d", tgs, summaries) + if len(resp.Allocations) == 0 { + return false, fmt.Errorf("0 allocations") } - for tg, summary := range resp.JobSummary.Summary { - if summary.Running == 0 { - return false, fmt.Errorf("task_group=%s %#v", tg, resp.JobSummary.Summary) + for _, alloc := range resp.Allocations { + if alloc.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("alloc not running: id=%v tg=%v status=%v", + alloc.ID, alloc.TaskGroup, alloc.ClientStatus) } } @@ -130,4 +128,6 @@ func WaitForRunning(t testing.T, rpc rpcFn, job *structs.Job) { }, func(err error) { t.Fatalf("job not running: %v", err) }) + + return resp.Allocations }
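
Usage note: the new return value lets a test go straight from registering a
job to using its allocations, with no follow-up Alloc.List RPC. A minimal
sketch, not part of the patch itself, using only identifiers that appear in
the tests above (server/client setup and error handling elided):

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1

	// One call registers the job, blocks until every alloc reports a
	// "running" client status, and returns the alloc list stubs.
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// The stub's ID can feed FS and logs requests directly.
	req := &cstructs.FsLogsRequest{
		AllocID:      alloc.ID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		LogType:      "stdout",
		Origin:       "start",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}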
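Relatedly, the mock driver's start_block_for option is what pins the task in
pending for TestFS_Logs_TaskPending. A sketch of the pattern (illustrative
only; values taken from the test above):

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		// The mock driver's Start blocks for 4s, longer than the
		// test's 3s timeout, so the task never leaves pending while
		// the logs endpoint is exercised.
		"start_block_for": "4s",
	}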