tests: test logs from client<->api package

Michael Schurter
2018-04-30 15:57:44 -07:00
parent 72017cd060
commit a8e1f35240


@@ -1,14 +1,161 @@
package api

import (
	"bytes"
	"fmt"
	"io"
	"reflect"
	"strings"
	"testing"
	"time"

	units "github.com/docker/go-units"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/testutil"
	"github.com/kr/pretty"
	"github.com/stretchr/testify/require"
)

func TestFS_Logs(t *testing.T) {
	t.Parallel()
	require := require.New(t)
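	// Start a test agent with the client enabled and capture its RPC port so
	// the agent's client can be pointed back at its own server below.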
	rpcPort := 0
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		rpcPort = c.Ports.RPC
		c.Client = &testutil.ClientConfig{
			Enabled: true,
		}
	})
	defer s.Stop()

	//TODO There should be a way to connect the client to the servers in
	//makeClient above
	require.NoError(c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))

	index := uint64(0)
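	// Block until the client node has registered and reports ready.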
	testutil.WaitForResult(func() (bool, error) {
		nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
		if err != nil {
			return false, err
		}
		index = qm.LastIndex
		if len(nodes) != 1 {
			return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
		}
		if nodes[0].Status != "ready" {
			return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
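	// Build the expected output: 80k numbered lines for the task to write to stdout.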
	var input strings.Builder
	input.Grow(units.MB)
	lines := 80 * units.KB
	for i := 0; i < lines; i++ {
		fmt.Fprintf(&input, "%d\n", i)
	}
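	// Define a batch job whose mock_driver task emits the generated input on stdout.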
	job := &Job{
		ID:          helper.StringToPtr("TestFS_Logs"),
		Region:      helper.StringToPtr("global"),
		Datacenters: []string{"dc1"},
		Type:        helper.StringToPtr("batch"),
		TaskGroups: []*TaskGroup{
			{
				Name: helper.StringToPtr("TestFS_LogsGroup"),
				Tasks: []*Task{
					{
						Name:   "logger",
						Driver: "mock_driver",
						Config: map[string]interface{}{
							"stdout_string": input.String(),
						},
					},
				},
			},
		},
	}
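	// Register the job and remember the eval create index for blocking queries.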
	jobs := c.Jobs()
	jobResp, _, err := jobs.Register(job, nil)
	require.NoError(err)

	index = jobResp.EvalCreateIndex
	evals := c.Evaluations()
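	// Wait for the evaluation to finish scheduling; a blocked eval means the
	// allocation could not be placed, so fail immediately.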
	testutil.WaitForResult(func() (bool, error) {
		evalResp, qm, err := evals.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index})
		if err != nil {
			return false, err
		}
		if evalResp.BlockedEval != "" {
			t.Fatalf("Eval blocked: %s", pretty.Sprint(evalResp))
		}
		index = qm.LastIndex
		if evalResp.Status != "complete" {
			return false, fmt.Errorf("eval status: %v", evalResp.Status)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
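	// Wait for the single allocation to run to completion so its full log output exists.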
allocID := ""
testutil.WaitForResult(func() (bool, error) {
allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
if len(allocs) != 1 {
return false, fmt.Errorf("unexpected number of allocs: %d", len(allocs))
}
if allocs[0].ClientStatus != "complete" {
return false, fmt.Errorf("alloc not complete: %s", allocs[0].ClientStatus)
}
allocID = allocs[0].ID
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
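	// Fetch the completed allocation so its logs can be streamed.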
	alloc, _, err := c.Allocations().Info(allocID, nil)
	require.NoError(err)
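	// Stream stdout from the beginning several times and verify each pass
	// returns the complete, in-order output.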
	for i := 0; i < 3; i++ {
		stopCh := make(chan struct{})
		defer close(stopCh)

		frames, errors := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil)
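		// Drain every frame into result; a nil frame signals the end of the stream.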
		var result bytes.Buffer
	READ_FRAMES:
		for {
			select {
			case f := <-frames:
				if f == nil {
					break READ_FRAMES
				}
				result.Write(f.Data)
			case err := <-errors:
				// Don't Fatal here as the other assertions may
				// contain helpful information.
				t.Errorf("Error: %v", err)
			}
		}
		// Check length
		require.Equal(input.Len(), result.Len())

		// Check complete ordering
		for i := 0; i < lines; i++ {
			line, err := result.ReadBytes('\n')
			require.NoErrorf(err, "unexpected error on line %d: %v", i, err)
			require.Equal(fmt.Sprintf("%d\n", i), string(line))
		}
	}
}

func TestFS_FrameReader(t *testing.T) {
	t.Parallel()

	// Create a channel of the frames and a cancel channel