client monitor endpoint tests

This commit is contained in:
Drew Bailey
2019-10-25 10:32:20 -04:00
parent 3c0082f132
commit 735530ca4f
3 changed files with 240 additions and 20 deletions

View File

@@ -1,14 +1,16 @@
package api
import (
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/assert"
)
@@ -262,17 +264,47 @@ func TestAgent_Health(t *testing.T) {
assert.True(health.Server.Ok)
}
func TestAgent_Monitor(t *testing.T) {
func TestAgent_MonitorWithNode(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
rpcPort := 0
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
rpcPort = c.Ports.RPC
c.Client = &testutil.ClientConfig{
Enabled: true,
}
})
defer s.Stop()
require.NoError(t, c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))
agent := c.Agent()
index := uint64(0)
var node *NodeListStub
// grab a node
testutil.WaitForResult(func() (bool, error) {
nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
index = qm.LastIndex
if len(nodes) != 1 {
return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
}
if nodes[0].Status != "ready" {
return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
}
node = nodes[0]
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
doneCh := make(chan struct{})
q := &QueryOptions{
Params: map[string]string{
"log-level": "debug",
"node-id": node.ID,
},
}
@@ -283,34 +315,32 @@ func TestAgent_Monitor(t *testing.T) {
}
// make a request to generate some logs
_, err = agent.Region()
_, err = agent.NodeName()
require.NoError(t, err)
// Wait for the first log message and validate it
// Wait for a log message
OUTER:
for {
select {
case log := <-logCh:
if log == " " {
return
if strings.Contains(log, "[DEBUG]") {
break OUTER
}
require.Contains(t, log, "[DEBUG]")
case <-time.After(10 * time.Second):
require.Fail(t, "failed to get a log message")
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}
func TestAgent_MonitorWithNode(t *testing.T) {
func TestAgent_Monitor(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
agent := c.Agent()
id, _ := uuid.GenerateUUID()
q := &QueryOptions{
Params: map[string]string{
"log-level": "debug",
"node-id": id,
},
}
@@ -326,16 +356,16 @@ func TestAgent_MonitorWithNode(t *testing.T) {
_, err = agent.Region()
require.NoError(t, err)
// Wait for the first log message and validate it
// Wait for a log message
OUTER:
for {
select {
case log := <-logCh:
if log == " " {
return
if strings.Contains(log, "[DEBUG]") {
break OUTER
}
require.Contains(t, log, "[DEBUG]")
case <-time.After(10 * time.Second):
require.Fail(t, "failed to get a log message")
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}

View File

@@ -133,6 +133,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {
// NodeID was empty, so monitor this current server
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer close(stopCh)
defer cancel()
monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
@@ -142,7 +143,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {
go func() {
if _, err := conn.Read(nil); err != nil {
close(stopCh)
// One end of the pipe closed, exit
cancel()
return
}

View File

@@ -0,0 +1,189 @@
package nomad
import (
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
func TestMonitor_Monitor_Remote_Server(t *testing.T) {
t.Parallel()
require := require.New(t)
// start server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.GetConfig().RPCAddr.String()}
})
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// No node ID to monitor the remote server
req := cstructs.MonitorRequest{
LogLevel: "debug",
NodeID: c.NodeID(),
}
handler, err := s1.StreamingRpcHandler("Agent.Monitor")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(1 * time.Second)
expected := "[DEBUG]"
received := ""
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
received += string(msg.Payload)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
}
}
}
}
func TestMonitor_MonitorServer(t *testing.T) {
t.Parallel()
require := require.New(t)
// start server and client
s := TestServer(t, nil)
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
// No node ID to monitor the remote server
req := cstructs.MonitorRequest{
LogLevel: "debug",
}
handler, err := s.StreamingRpcHandler("Agent.Monitor")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(1 * time.Second)
expected := "[DEBUG]"
received := ""
// send logs
go func() {
for {
s.logger.Debug("test log")
time.Sleep(100 * time.Millisecond)
}
}()
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
received += string(msg.Payload)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
}
}
}
}