From a8e1f352407967e2ac113526491df5bcc4e6d43d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 15:57:44 -0700 Subject: [PATCH] tests: test logs from client<->api package --- api/fs_test.go | 147 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/api/fs_test.go b/api/fs_test.go index bd9219fc2..e1101c92d 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -1,14 +1,161 @@ package api import ( + "bytes" "fmt" "io" "reflect" "strings" "testing" "time" + + units "github.com/docker/go-units" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) +func TestFS_Logs(t *testing.T) { + t.Parallel() + require := require.New(t) + rpcPort := 0 + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + rpcPort = c.Ports.RPC + c.Client = &testutil.ClientConfig{ + Enabled: true, + } + }) + defer s.Stop() + + //TODO There should be a way to connect the client to the servers in + //makeClient above + require.NoError(c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)})) + + index := uint64(0) + testutil.WaitForResult(func() (bool, error) { + nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + index = qm.LastIndex + if len(nodes) != 1 { + return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes)) + } + if nodes[0].Status != "ready" { + return false, fmt.Errorf("node not ready: %s", nodes[0].Status) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + var input strings.Builder + input.Grow(units.MB) + lines := 80 * units.KB + for i := 0; i < lines; i++ { + fmt.Fprintf(&input, "%d\n", i) + } + + job := &Job{ + ID: helper.StringToPtr("TestFS_Logs"), + Region: helper.StringToPtr("global"), + Datacenters: []string{"dc1"}, + Type: helper.StringToPtr("batch"), + TaskGroups: []*TaskGroup{ + { + Name: helper.StringToPtr("TestFS_LogsGroup"), + Tasks: []*Task{ + { + Name: "logger", + Driver: "mock_driver", + Config: map[string]interface{}{ + "stdout_string": input.String(), + }, + }, + }, + }, + }, + } + + jobs := c.Jobs() + jobResp, _, err := jobs.Register(job, nil) + require.NoError(err) + + index = jobResp.EvalCreateIndex + evals := c.Evaluations() + testutil.WaitForResult(func() (bool, error) { + evalResp, qm, err := evals.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + if evalResp.BlockedEval != "" { + t.Fatalf("Eval blocked: %s", pretty.Sprint(evalResp)) + } + index = qm.LastIndex + if evalResp.Status != "complete" { + return false, fmt.Errorf("eval status: %v", evalResp.Status) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + allocID := "" + testutil.WaitForResult(func() (bool, error) { + allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + if len(allocs) != 1 { + return false, fmt.Errorf("unexpected number of allocs: %d", len(allocs)) + } + if allocs[0].ClientStatus != "complete" { + return false, fmt.Errorf("alloc not complete: %s", allocs[0].ClientStatus) + } + allocID = allocs[0].ID + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + alloc, _, err := c.Allocations().Info(allocID, nil) + require.NoError(err) + + for i := 0; i < 3; i++ { + stopCh := make(chan struct{}) + defer close(stopCh) + + frames, errors := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil) + + var result bytes.Buffer + READ_FRAMES: + for { + select { + case f := <-frames: + if f == nil { + break READ_FRAMES + } + result.Write(f.Data) + case err := <-errors: + // Don't Fatal here as the other assertions may + // contain helpeful information. + t.Errorf("Error: %v", err) + } + } + + // Check length + require.Equal(input.Len(), result.Len()) + + // Check complete ordering + for i := 0; i < lines; i++ { + line, err := result.ReadBytes('\n') + require.NoErrorf(err, "unexpected error on line %d: %v", i, err) + require.Equal(fmt.Sprintf("%d\n", i), string(line)) + } + } +} + func TestFS_FrameReader(t *testing.T) { t.Parallel() // Create a channel of the frames and a cancel channel