From 5a31bd62ca64510e0714efa58c260dd7cfcf5800 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Tue, 12 Nov 2019 14:22:51 -0500 Subject: [PATCH 1/3] Allows monitor to target leader server Allows user to pass in node-id=leader to forward monitor request to remote a remote leader. --- nomad/client_agent_endpoint.go | 38 ++++++++++- nomad/client_agent_endpoint_test.go | 101 ++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 1 deletion(-) diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 8101b2b57..1b664730f 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -60,12 +60,25 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { } // Targeting a node, forward request to node - if args.NodeID != "" { + if args.NodeID != "" && args.NodeID != "leader" { m.forwardMonitor(conn, args, encoder, decoder) // forwarded request has ended, return return } + if args.NodeID == "leader" { + isLeader, remoteServer := m.srv.getLeader() + if !isLeader && remoteServer != nil { + m.forwardMonitorLeader(remoteServer, conn, args, encoder, decoder) + return + } + if !isLeader && remoteServer == nil { + err := fmt.Errorf("No leader") + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + } + // NodeID was empty, so monitor this current server ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -236,3 +249,26 @@ func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorReq structs.Bridge(conn, clientConn) return } + +func (m *Agent) forwardMonitorLeader(leader *serverParts, conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + var leaderConn net.Conn + + localConn, err := m.srv.streamingRpc(leader, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + leaderConn = localConn + defer leaderConn.Close() + + // Send the Request + outEncoder := codec.NewEncoder(leaderConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + structs.Bridge(conn, leaderConn) + return +} diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 201e21fb9..1ddbdb628 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -117,6 +117,107 @@ OUTER: } } +func TestMonitor_MonitorRemoteLeader(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start servers + s1 := TestServer(t, nil) + defer s1.Shutdown() + s2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + servers := []*Server{s1, s2} + var nonLeader *Server + var leader *Server + for _, s := range servers { + if !s.IsLeader() { + nonLeader = s + } else { + leader = s + } + } + + go func() { + for { + leader.logger.Warn("leader log") + time.Sleep(10 * time.Millisecond) + } + }() + + // No node ID to monitor the remote server + req := cstructs.MonitorRequest{ + LogLevel: "warn", + NodeID: "leader", + } + + handler, err := nonLeader.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(2 * time.Second) + expected := "leader log" + received := "" + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, expected) { + require.Nil(p2.Close()) + break OUTER + } + } + } +} + func TestMonitor_MonitorServer(t *testing.T) { t.Parallel() require := require.New(t) From fb49f3c35bec799d899dd4546d0e6451f8650dff Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Tue, 12 Nov 2019 16:23:39 -0500 Subject: [PATCH 2/3] add server-id to monitor specific server --- client/structs/structs.go | 3 ++ command/agent/agent_endpoint.go | 5 ++-- command/agent_monitor.go | 3 ++ nomad/client_agent_endpoint.go | 52 +++++++++++++++++++++++++++++++-- 4 files changed, 57 insertions(+), 6 deletions(-) diff --git a/client/structs/structs.go b/client/structs/structs.go index 6a84e2d63..f2c54bdeb 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -44,6 +44,9 @@ type MonitorRequest struct { // NodeID is the node we want to track the logs of NodeID string + // ServerID is the server we want to track the logs of + ServerID string + // PlainText disables base64 encoding. PlainText bool diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 56c3631b7..58fa0f6a5 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -175,9 +175,6 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel)) } - // Determine if we are targeting a server or client - nodeID := req.URL.Query().Get("node_id") - logJSON := false logJSONStr := req.URL.Query().Get("log_json") if logJSONStr != "" { @@ -198,9 +195,11 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( plainText = parsed } + nodeID := req.URL.Query().Get("node_id") // Build the request and parse the ACL token args := cstructs.MonitorRequest{ NodeID: nodeID, + ServerID: req.URL.Query().Get("server_id"), LogLevel: logLevel, LogJSON: logJSON, PlainText: plainText, diff --git a/command/agent_monitor.go b/command/agent_monitor.go index 4927c5784..4d50faf6a 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -61,12 +61,14 @@ func (c *MonitorCommand) Run(args []string) int { var logLevel string var nodeID string + var serverID string var logJSON bool flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.StringVar(&logLevel, "log-level", "", "") flags.StringVar(&nodeID, "node-id", "", "") + flags.StringVar(&serverID, "server-id", "", "") flags.BoolVar(&logJSON, "json", false, "") if err := flags.Parse(args); err != nil { @@ -90,6 +92,7 @@ func (c *MonitorCommand) Run(args []string) int { params := map[string]string{ "log_level": logLevel, "node_id": nodeID, + "server_id": serverID, "log_json": strconv.FormatBool(logJSON), } diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 1b664730f..70c277d1b 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -61,7 +61,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { // Targeting a node, forward request to node if args.NodeID != "" && args.NodeID != "leader" { - m.forwardMonitor(conn, args, encoder, decoder) + m.forwardMonitorClient(conn, args, encoder, decoder) // forwarded request has ended, return return } @@ -73,12 +73,18 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { return } if !isLeader && remoteServer == nil { - err := fmt.Errorf("No leader") + err := fmt.Errorf("no leader") handleStreamResultError(err, helper.Int64ToPtr(400), encoder) return } } + // targeting a specific server, forward to that server + if args.ServerID != "" { + m.forwardMonitorServer(conn, args, encoder, decoder) + return + } + // NodeID was empty, so monitor this current server ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -181,7 +187,7 @@ OUTER: } } -func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { +func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { nodeID := args.NodeID snap, err := m.srv.State().Snapshot() @@ -272,3 +278,43 @@ func (m *Agent) forwardMonitorLeader(leader *serverParts, conn io.ReadWriteClose structs.Bridge(conn, leaderConn) return } + +func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + serverID := args.ServerID + // empty ServerID to prevent forwarding loop + args.ServerID = "" + + serfMembers := m.srv.Members() + var target *serverParts + for _, mem := range serfMembers { + if mem.Name == serverID { + ok, srv := isNomadServer(mem) + if !ok { + err := fmt.Errorf("unknown nomad server %s", serverID) + handleStreamResultError(err, nil, encoder) + return + } + target = srv + } + } + + var serverConn net.Conn + localConn, err := m.srv.streamingRpc(target, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + serverConn = localConn + defer serverConn.Close() + + // Send the Request + outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + structs.Bridge(conn, serverConn) + return +} From e46c41553d2eec347aa3c9209bb0ca20757db909 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Wed, 13 Nov 2019 09:32:18 -0500 Subject: [PATCH 3/3] serverID to target remote leader or server handle the case where we request a server-id which is this current server update docs, error on node and server id params more accurate names for tests use shared no leader err, formatting rm bad comment remove redundant variable --- command/agent/agent_endpoint.go | 5 + nomad/client_agent_endpoint.go | 94 ++++----- nomad/client_agent_endpoint_test.go | 180 +++++++++++------- website/source/api/agent.html.md | 13 +- .../source/docs/commands/monitor.html.md.erb | 4 + 5 files changed, 175 insertions(+), 121 deletions(-) diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 58fa0f6a5..2d25dc48b 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -205,6 +205,11 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( PlainText: plainText, } + // if node and server were requested return error + if args.NodeID != "" && args.ServerID != "" { + return nil, CodedError(400, "Cannot target node and server simultaneously") + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) // Make the RPC diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 70c277d1b..60d8c7b44 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -60,27 +60,25 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { } // Targeting a node, forward request to node - if args.NodeID != "" && args.NodeID != "leader" { + if args.NodeID != "" { m.forwardMonitorClient(conn, args, encoder, decoder) // forwarded request has ended, return return } - if args.NodeID == "leader" { - isLeader, remoteServer := m.srv.getLeader() - if !isLeader && remoteServer != nil { - m.forwardMonitorLeader(remoteServer, conn, args, encoder, decoder) - return - } - if !isLeader && remoteServer == nil { - err := fmt.Errorf("no leader") - handleStreamResultError(err, helper.Int64ToPtr(400), encoder) - return - } + currentServer := m.srv.serf.LocalMember().Name + var forwardServer bool + // Targeting a remote server which is not the leader and not this server + if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer { + forwardServer = true } - // targeting a specific server, forward to that server - if args.ServerID != "" { + // Targeting leader and this server is not current leader + if args.ServerID == "leader" && !m.srv.IsLeader() { + forwardServer = true + } + + if forwardServer { m.forwardMonitorServer(conn, args, encoder, decoder) return } @@ -256,62 +254,52 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni return } -func (m *Agent) forwardMonitorLeader(leader *serverParts, conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { - var leaderConn net.Conn - - localConn, err := m.srv.streamingRpc(leader, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - leaderConn = localConn - defer leaderConn.Close() - - // Send the Request - outEncoder := codec.NewEncoder(leaderConn, structs.MsgpackHandle) - if err := outEncoder.Encode(args); err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - structs.Bridge(conn, leaderConn) - return -} - func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + var target *serverParts serverID := args.ServerID + // empty ServerID to prevent forwarding loop args.ServerID = "" - serfMembers := m.srv.Members() - var target *serverParts - for _, mem := range serfMembers { - if mem.Name == serverID { - ok, srv := isNomadServer(mem) - if !ok { - err := fmt.Errorf("unknown nomad server %s", serverID) - handleStreamResultError(err, nil, encoder) - return + if serverID == "leader" { + isLeader, remoteServer := m.srv.getLeader() + if !isLeader && remoteServer != nil { + target = remoteServer + } + if !isLeader && remoteServer == nil { + handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder) + return + } + } else { + // See if the server ID is a known member + serfMembers := m.srv.Members() + for _, mem := range serfMembers { + if mem.Name == serverID { + if ok, srv := isNomadServer(mem); ok { + target = srv + } } - target = srv } } - var serverConn net.Conn - localConn, err := m.srv.streamingRpc(target, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) + // Unable to find a server + if target == nil { + err := fmt.Errorf("unknown nomad server %s", serverID) + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) return } - serverConn = localConn + serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } defer serverConn.Close() // Send the Request outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle) if err := outEncoder.Encode(args); err != nil { - handleStreamResultError(err, nil, encoder) + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) return } diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 1ddbdb628..b80f75c56 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/client/config" @@ -22,7 +23,7 @@ import ( "github.com/ugorji/go/codec" ) -func TestMonitor_Monitor_Remote_Server(t *testing.T) { +func TestMonitor_Monitor_Remote_Client(t *testing.T) { t.Parallel() require := require.New(t) @@ -117,9 +118,8 @@ OUTER: } } -func TestMonitor_MonitorRemoteLeader(t *testing.T) { +func TestMonitor_Monitor_RemoteServer(t *testing.T) { t.Parallel() - require := require.New(t) // start servers s1 := TestServer(t, nil) @@ -132,6 +132,7 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) + // determine leader and nonleader servers := []*Server{s1, s2} var nonLeader *Server var leader *Server @@ -143,78 +144,127 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) { } } - go func() { - for { - leader.logger.Warn("leader log") - time.Sleep(10 * time.Millisecond) - } - }() - - // No node ID to monitor the remote server - req := cstructs.MonitorRequest{ - LogLevel: "warn", - NodeID: "leader", + cases := []struct { + desc string + serverID string + expectedLog string + logger hclog.InterceptLogger + origin *Server + }{ + { + desc: "remote leader", + serverID: "leader", + expectedLog: "leader log", + logger: leader.logger, + origin: nonLeader, + }, + { + desc: "remote server", + serverID: nonLeader.serf.LocalMember().Name, + expectedLog: "nonleader log", + logger: nonLeader.logger, + origin: leader, + }, + { + desc: "serverID is current leader", + serverID: "leader", + expectedLog: "leader log", + logger: leader.logger, + origin: leader, + }, + { + desc: "serverID is current server", + serverID: nonLeader.serf.LocalMember().Name, + expectedLog: "non leader log", + logger: nonLeader.logger, + origin: nonLeader, + }, } - handler, err := nonLeader.StreamingRpcHandler("Agent.Monitor") - require.Nil(err) + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) - // create pipe - p1, p2 := net.Pipe() - defer p1.Close() - defer p2.Close() - - errCh := make(chan error) - streamMsg := make(chan *cstructs.StreamErrWrapper) - - go handler(p2) - - // Start decoder - go func() { - decoder := codec.NewDecoder(p1, structs.MsgpackHandle) - for { - var msg cstructs.StreamErrWrapper - if err := decoder.Decode(&msg); err != nil { - if err == io.EOF || strings.Contains(err.Error(), "closed") { - return + // send some specific logs + doneCh := make(chan struct{}) + go func() { + for { + select { + case <-doneCh: + return + default: + tc.logger.Warn(tc.expectedLog) + time.Sleep(10 * time.Millisecond) + } } - errCh <- fmt.Errorf("error decoding: %v", err) + }() + + req := cstructs.MonitorRequest{ + LogLevel: "warn", + ServerID: tc.serverID, } - streamMsg <- &msg - } - }() + handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) - // send request - encoder := codec.NewEncoder(p1, structs.MsgpackHandle) - require.Nil(encoder.Encode(req)) + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() - timeout := time.After(2 * time.Second) - expected := "leader log" - received := "" + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) -OUTER: - for { - select { - case <-timeout: - t.Fatal("timeout waiting for logs") - case err := <-errCh: - t.Fatal(err) - case msg := <-streamMsg: - if msg.Error != nil { - t.Fatalf("Got error: %v", msg.Error.Error()) + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(2 * time.Second) + received := "" + + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, tc.expectedLog) { + close(doneCh) + require.Nil(p2.Close()) + break OUTER + } + } } - - var frame sframer.StreamFrame - err := json.Unmarshal(msg.Payload, &frame) - assert.NoError(t, err) - - received += string(frame.Data) - if strings.Contains(received, expected) { - require.Nil(p2.Close()) - break OUTER - } - } + }) } } diff --git a/website/source/api/agent.html.md b/website/source/api/agent.html.md index 7d2446719..42e512567 100644 --- a/website/source/api/agent.html.md +++ b/website/source/api/agent.html.md @@ -518,16 +518,20 @@ The table below shows this endpoint's support for ### Parameters -- `log-level` `(string: "info")` - Specifies a text string containing a log level +- `log_level` `(string: "info")` - Specifies a text string containing a log level to filter on, such as `info`. Possible values include `trace`, `debug`, `info`, `warn`, `error` - `json` `(bool: false)` - Specifies if the log format for streamed logs should be JSON. -- `node-id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text +- `node_id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text string containing a node-id to target for streaming. +- `server_id` `(string: "server1.global")` - Specifies a text + string containing a server name or "leader" to target a specific remote server + or leader for streaming. + - `plain` `(bool: false)` - Specifies if the response should be JSON or plaintext @@ -535,7 +539,10 @@ The table below shows this endpoint's support for ```text $ curl \ - https://localhost:4646/v1/agent/monitor?log-level=debug&node-id=a57b2adb-1a30-2dda-8df0-25abb0881952 + https://localhost:4646/v1/agent/monitor?log_level=debug&server_id=leader + +$ curl \ + https://localhost:4646/v1/agent/monitor?log_level=debug&node_id=a57b2adb-1a30-2dda-8df0-25abb0881952 ``` ### Sample Response diff --git a/website/source/docs/commands/monitor.html.md.erb b/website/source/docs/commands/monitor.html.md.erb index 7f6f0264f..c26e730b2 100644 --- a/website/source/docs/commands/monitor.html.md.erb +++ b/website/source/docs/commands/monitor.html.md.erb @@ -37,6 +37,10 @@ The monitor command also allows you to specify a single client node id to follow - `-node-id`: Specifies the client node-id to stream logs from. If no node-id is given the nomad server from the -address flag will be used. +- `-server-id`: Specifies the nomad server id to stream logs from. Accepts +server names from `nomad server members` and also a special `leader` option +which will target the current leader. + - `-json`: Stream logs in json format ## Examples