From e46c41553d2eec347aa3c9209bb0ca20757db909 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Wed, 13 Nov 2019 09:32:18 -0500 Subject: [PATCH] serverID to target remote leader or server handle the case where we request a server-id which is this current server update docs, error on node and server id params more accurate names for tests use shared no leader err, formatting rm bad comment remove redundant variable --- command/agent/agent_endpoint.go | 5 + nomad/client_agent_endpoint.go | 94 ++++----- nomad/client_agent_endpoint_test.go | 180 +++++++++++------- website/source/api/agent.html.md | 13 +- .../source/docs/commands/monitor.html.md.erb | 4 + 5 files changed, 175 insertions(+), 121 deletions(-) diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 58fa0f6a5..2d25dc48b 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -205,6 +205,11 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( PlainText: plainText, } + // if node and server were requested return error + if args.NodeID != "" && args.ServerID != "" { + return nil, CodedError(400, "Cannot target node and server simultaneously") + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) // Make the RPC diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 70c277d1b..60d8c7b44 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -60,27 +60,25 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { } // Targeting a node, forward request to node - if args.NodeID != "" && args.NodeID != "leader" { + if args.NodeID != "" { m.forwardMonitorClient(conn, args, encoder, decoder) // forwarded request has ended, return return } - if args.NodeID == "leader" { - isLeader, remoteServer := m.srv.getLeader() - if !isLeader && remoteServer != nil { - m.forwardMonitorLeader(remoteServer, conn, args, encoder, decoder) - return - } - if !isLeader && remoteServer == nil { - err := fmt.Errorf("no leader") - handleStreamResultError(err, helper.Int64ToPtr(400), encoder) - return - } + currentServer := m.srv.serf.LocalMember().Name + var forwardServer bool + // Targeting a remote server which is not the leader and not this server + if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer { + forwardServer = true } - // targeting a specific server, forward to that server - if args.ServerID != "" { + // Targeting leader and this server is not current leader + if args.ServerID == "leader" && !m.srv.IsLeader() { + forwardServer = true + } + + if forwardServer { m.forwardMonitorServer(conn, args, encoder, decoder) return } @@ -256,62 +254,52 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni return } -func (m *Agent) forwardMonitorLeader(leader *serverParts, conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { - var leaderConn net.Conn - - localConn, err := m.srv.streamingRpc(leader, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - leaderConn = localConn - defer leaderConn.Close() - - // Send the Request - outEncoder := codec.NewEncoder(leaderConn, structs.MsgpackHandle) - if err := outEncoder.Encode(args); err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - structs.Bridge(conn, leaderConn) - return -} - func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + var target *serverParts serverID := args.ServerID + // empty ServerID to prevent forwarding loop args.ServerID = "" - serfMembers := m.srv.Members() - var target *serverParts - for _, mem := range serfMembers { - if mem.Name == serverID { - ok, srv := isNomadServer(mem) - if !ok { - err := fmt.Errorf("unknown nomad server %s", serverID) - handleStreamResultError(err, nil, encoder) - return + if serverID == "leader" { + isLeader, remoteServer := m.srv.getLeader() + if !isLeader && remoteServer != nil { + target = remoteServer + } + if !isLeader && remoteServer == nil { + handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder) + return + } + } else { + // See if the server ID is a known member + serfMembers := m.srv.Members() + for _, mem := range serfMembers { + if mem.Name == serverID { + if ok, srv := isNomadServer(mem); ok { + target = srv + } } - target = srv } } - var serverConn net.Conn - localConn, err := m.srv.streamingRpc(target, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) + // Unable to find a server + if target == nil { + err := fmt.Errorf("unknown nomad server %s", serverID) + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) return } - serverConn = localConn + serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } defer serverConn.Close() // Send the Request outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle) if err := outEncoder.Encode(args); err != nil { - handleStreamResultError(err, nil, encoder) + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) return } diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 1ddbdb628..b80f75c56 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/client/config" @@ -22,7 +23,7 @@ import ( "github.com/ugorji/go/codec" ) -func TestMonitor_Monitor_Remote_Server(t *testing.T) { +func TestMonitor_Monitor_Remote_Client(t *testing.T) { t.Parallel() require := require.New(t) @@ -117,9 +118,8 @@ OUTER: } } -func TestMonitor_MonitorRemoteLeader(t *testing.T) { +func TestMonitor_Monitor_RemoteServer(t *testing.T) { t.Parallel() - require := require.New(t) // start servers s1 := TestServer(t, nil) @@ -132,6 +132,7 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) + // determine leader and nonleader servers := []*Server{s1, s2} var nonLeader *Server var leader *Server @@ -143,78 +144,127 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) { } } - go func() { - for { - leader.logger.Warn("leader log") - time.Sleep(10 * time.Millisecond) - } - }() - - // No node ID to monitor the remote server - req := cstructs.MonitorRequest{ - LogLevel: "warn", - NodeID: "leader", + cases := []struct { + desc string + serverID string + expectedLog string + logger hclog.InterceptLogger + origin *Server + }{ + { + desc: "remote leader", + serverID: "leader", + expectedLog: "leader log", + logger: leader.logger, + origin: nonLeader, + }, + { + desc: "remote server", + serverID: nonLeader.serf.LocalMember().Name, + expectedLog: "nonleader log", + logger: nonLeader.logger, + origin: leader, + }, + { + desc: "serverID is current leader", + serverID: "leader", + expectedLog: "leader log", + logger: leader.logger, + origin: leader, + }, + { + desc: "serverID is current server", + serverID: nonLeader.serf.LocalMember().Name, + expectedLog: "non leader log", + logger: nonLeader.logger, + origin: nonLeader, + }, } - handler, err := nonLeader.StreamingRpcHandler("Agent.Monitor") - require.Nil(err) + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) - // create pipe - p1, p2 := net.Pipe() - defer p1.Close() - defer p2.Close() - - errCh := make(chan error) - streamMsg := make(chan *cstructs.StreamErrWrapper) - - go handler(p2) - - // Start decoder - go func() { - decoder := codec.NewDecoder(p1, structs.MsgpackHandle) - for { - var msg cstructs.StreamErrWrapper - if err := decoder.Decode(&msg); err != nil { - if err == io.EOF || strings.Contains(err.Error(), "closed") { - return + // send some specific logs + doneCh := make(chan struct{}) + go func() { + for { + select { + case <-doneCh: + return + default: + tc.logger.Warn(tc.expectedLog) + time.Sleep(10 * time.Millisecond) + } } - errCh <- fmt.Errorf("error decoding: %v", err) + }() + + req := cstructs.MonitorRequest{ + LogLevel: "warn", + ServerID: tc.serverID, } - streamMsg <- &msg - } - }() + handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) - // send request - encoder := codec.NewEncoder(p1, structs.MsgpackHandle) - require.Nil(encoder.Encode(req)) + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() - timeout := time.After(2 * time.Second) - expected := "leader log" - received := "" + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) -OUTER: - for { - select { - case <-timeout: - t.Fatal("timeout waiting for logs") - case err := <-errCh: - t.Fatal(err) - case msg := <-streamMsg: - if msg.Error != nil { - t.Fatalf("Got error: %v", msg.Error.Error()) + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(2 * time.Second) + received := "" + + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, tc.expectedLog) { + close(doneCh) + require.Nil(p2.Close()) + break OUTER + } + } } - - var frame sframer.StreamFrame - err := json.Unmarshal(msg.Payload, &frame) - assert.NoError(t, err) - - received += string(frame.Data) - if strings.Contains(received, expected) { - require.Nil(p2.Close()) - break OUTER - } - } + }) } } diff --git a/website/source/api/agent.html.md b/website/source/api/agent.html.md index 7d2446719..42e512567 100644 --- a/website/source/api/agent.html.md +++ b/website/source/api/agent.html.md @@ -518,16 +518,20 @@ The table below shows this endpoint's support for ### Parameters -- `log-level` `(string: "info")` - Specifies a text string containing a log level +- `log_level` `(string: "info")` - Specifies a text string containing a log level to filter on, such as `info`. Possible values include `trace`, `debug`, `info`, `warn`, `error` - `json` `(bool: false)` - Specifies if the log format for streamed logs should be JSON. -- `node-id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text +- `node_id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text string containing a node-id to target for streaming. +- `server_id` `(string: "server1.global")` - Specifies a text + string containing a server name or "leader" to target a specific remote server + or leader for streaming. + - `plain` `(bool: false)` - Specifies if the response should be JSON or plaintext @@ -535,7 +539,10 @@ The table below shows this endpoint's support for ```text $ curl \ - https://localhost:4646/v1/agent/monitor?log-level=debug&node-id=a57b2adb-1a30-2dda-8df0-25abb0881952 + https://localhost:4646/v1/agent/monitor?log_level=debug&server_id=leader + +$ curl \ + https://localhost:4646/v1/agent/monitor?log_level=debug&node_id=a57b2adb-1a30-2dda-8df0-25abb0881952 ``` ### Sample Response diff --git a/website/source/docs/commands/monitor.html.md.erb b/website/source/docs/commands/monitor.html.md.erb index 7f6f0264f..c26e730b2 100644 --- a/website/source/docs/commands/monitor.html.md.erb +++ b/website/source/docs/commands/monitor.html.md.erb @@ -37,6 +37,10 @@ The monitor command also allows you to specify a single client node id to follow - `-node-id`: Specifies the client node-id to stream logs from. If no node-id is given the nomad server from the -address flag will be used. +- `-server-id`: Specifies the nomad server id to stream logs from. Accepts +server names from `nomad server members` and also a special `leader` option +which will target the current leader. + - `-json`: Stream logs in json format ## Examples