From 735530ca4f578ab05325c8f0da91a93f150d83ad Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Fri, 25 Oct 2019 10:32:20 -0400 Subject: [PATCH] client monitor endpoint tests --- api/agent_test.go | 68 ++++++--- nomad/client_monitor_endpoint.go | 3 +- nomad/client_monitor_endpoint_test.go | 189 ++++++++++++++++++++++++++ 3 files changed, 240 insertions(+), 20 deletions(-) create mode 100644 nomad/client_monitor_endpoint_test.go diff --git a/api/agent_test.go b/api/agent_test.go index bc22d374c..9d731c399 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1,14 +1,16 @@ package api import ( + "fmt" "reflect" "sort" + "strings" "testing" "time" + "github.com/kr/pretty" "github.com/stretchr/testify/require" - "github.com/hashicorp/go-uuid" "github.com/hashicorp/nomad/api/internal/testutil" "github.com/stretchr/testify/assert" ) @@ -262,17 +264,47 @@ func TestAgent_Health(t *testing.T) { assert.True(health.Server.Ok) } -func TestAgent_Monitor(t *testing.T) { +func TestAgent_MonitorWithNode(t *testing.T) { t.Parallel() - c, s := makeClient(t, nil, nil) + rpcPort := 0 + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + rpcPort = c.Ports.RPC + c.Client = &testutil.ClientConfig{ + Enabled: true, + } + }) defer s.Stop() + require.NoError(t, c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)})) + agent := c.Agent() + index := uint64(0) + var node *NodeListStub + // grab a node + testutil.WaitForResult(func() (bool, error) { + nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + index = qm.LastIndex + if len(nodes) != 1 { + return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes)) + } + if nodes[0].Status != "ready" { + return false, fmt.Errorf("node not ready: %s", nodes[0].Status) + } + node = nodes[0] + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + doneCh := make(chan struct{}) q := &QueryOptions{ Params: map[string]string{ "log-level": "debug", + "node-id": node.ID, }, } @@ -283,34 +315,32 @@ func TestAgent_Monitor(t *testing.T) { } // make a request to generate some logs - _, err = agent.Region() + _, err = agent.NodeName() require.NoError(t, err) - // Wait for the first log message and validate it + // Wait for a log message +OUTER: for { select { case log := <-logCh: - if log == " " { - return + if strings.Contains(log, "[DEBUG]") { + break OUTER } - require.Contains(t, log, "[DEBUG]") - case <-time.After(10 * time.Second): - require.Fail(t, "failed to get a log message") + case <-time.After(2 * time.Second): + require.Fail(t, "failed to get a DEBUG log message") } } } -func TestAgent_MonitorWithNode(t *testing.T) { +func TestAgent_Monitor(t *testing.T) { t.Parallel() c, s := makeClient(t, nil, nil) defer s.Stop() agent := c.Agent() - id, _ := uuid.GenerateUUID() q := &QueryOptions{ Params: map[string]string{ "log-level": "debug", - "node-id": id, }, } @@ -326,16 +356,16 @@ func TestAgent_MonitorWithNode(t *testing.T) { _, err = agent.Region() require.NoError(t, err) - // Wait for the first log message and validate it + // Wait for a log message +OUTER: for { select { case log := <-logCh: - if log == " " { - return + if strings.Contains(log, "[DEBUG]") { + break OUTER } - require.Contains(t, log, "[DEBUG]") - case <-time.After(10 * time.Second): - require.Fail(t, "failed to get a log message") + case <-time.After(2 * time.Second): + require.Fail(t, "failed to get a DEBUG log message") } } } diff --git a/nomad/client_monitor_endpoint.go b/nomad/client_monitor_endpoint.go index 6eedd75d1..dada37069 100644 --- a/nomad/client_monitor_endpoint.go +++ b/nomad/client_monitor_endpoint.go @@ -133,6 +133,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) { // NodeID was empty, so monitor this current server stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) + defer close(stopCh) defer cancel() monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{ @@ -142,7 +143,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) { go func() { if _, err := conn.Read(nil); err != nil { - close(stopCh) + // One end of the pipe closed, exit cancel() return } diff --git a/nomad/client_monitor_endpoint_test.go b/nomad/client_monitor_endpoint_test.go new file mode 100644 index 000000000..6045c9caa --- /dev/null +++ b/nomad/client_monitor_endpoint_test.go @@ -0,0 +1,189 @@ +package nomad + +import ( + "fmt" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/hashicorp/nomad/client" + "github.com/hashicorp/nomad/client/config" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" + "github.com/ugorji/go/codec" +) + +func TestMonitor_Monitor_Remote_Server(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server and client + s1 := TestServer(t, nil) + defer s1.Shutdown() + s2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + c, cleanup := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s2.GetConfig().RPCAddr.String()} + }) + defer cleanup() + + testutil.WaitForResult(func() (bool, error) { + nodes := s2.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + t.Fatalf("should have a clients") + }) + + // No node ID to monitor the remote server + req := cstructs.MonitorRequest{ + LogLevel: "debug", + NodeID: c.NodeID(), + } + + handler, err := s1.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(1 * time.Second) + expected := "[DEBUG]" + received := "" + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + received += string(msg.Payload) + if strings.Contains(received, expected) { + require.Nil(p2.Close()) + break OUTER + } + } + } +} + +func TestMonitor_MonitorServer(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server and client + s := TestServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + // No node ID to monitor the remote server + req := cstructs.MonitorRequest{ + LogLevel: "debug", + } + + handler, err := s.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(1 * time.Second) + expected := "[DEBUG]" + received := "" + + // send logs + go func() { + for { + s.logger.Debug("test log") + time.Sleep(100 * time.Millisecond) + } + }() + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + received += string(msg.Payload) + if strings.Contains(received, expected) { + require.Nil(p2.Close()) + break OUTER + } + } + } +}