diff --git a/client/structs/structs.go b/client/structs/structs.go index 6a84e2d63..f2c54bdeb 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -44,6 +44,9 @@ type MonitorRequest struct { // NodeID is the node we want to track the logs of NodeID string + // ServerID is the server we want to track the logs of + ServerID string + // PlainText disables base64 encoding. PlainText bool diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 56c3631b7..2d25dc48b 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -175,9 +175,6 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel)) } - // Determine if we are targeting a server or client - nodeID := req.URL.Query().Get("node_id") - logJSON := false logJSONStr := req.URL.Query().Get("log_json") if logJSONStr != "" { @@ -198,14 +195,21 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( plainText = parsed } + nodeID := req.URL.Query().Get("node_id") // Build the request and parse the ACL token args := cstructs.MonitorRequest{ NodeID: nodeID, + ServerID: req.URL.Query().Get("server_id"), LogLevel: logLevel, LogJSON: logJSON, PlainText: plainText, } + // if node and server were requested return error + if args.NodeID != "" && args.ServerID != "" { + return nil, CodedError(400, "Cannot target node and server simultaneously") + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) // Make the RPC diff --git a/command/agent_monitor.go b/command/agent_monitor.go index 4927c5784..4d50faf6a 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -61,12 +61,14 @@ func (c *MonitorCommand) Run(args []string) int { var logLevel string var nodeID string + var serverID string var logJSON bool flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } 
flags.StringVar(&logLevel, "log-level", "", "") flags.StringVar(&nodeID, "node-id", "", "") + flags.StringVar(&serverID, "server-id", "", "") flags.BoolVar(&logJSON, "json", false, "") if err := flags.Parse(args); err != nil { @@ -90,6 +92,7 @@ func (c *MonitorCommand) Run(args []string) int { params := map[string]string{ "log_level": logLevel, "node_id": nodeID, + "server_id": serverID, "log_json": strconv.FormatBool(logJSON), } diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 8101b2b57..60d8c7b44 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -61,11 +61,28 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { // Targeting a node, forward request to node if args.NodeID != "" { - m.forwardMonitor(conn, args, encoder, decoder) + m.forwardMonitorClient(conn, args, encoder, decoder) // forwarded request has ended, return return } + currentServer := m.srv.serf.LocalMember().Name + var forwardServer bool + // Targeting a remote server which is not the leader and not this server + if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer { + forwardServer = true + } + + // Targeting leader and this server is not current leader + if args.ServerID == "leader" && !m.srv.IsLeader() { + forwardServer = true + } + + if forwardServer { + m.forwardMonitorServer(conn, args, encoder, decoder) + return + } + // NodeID was empty, so monitor this current server ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -168,7 +185,7 @@ OUTER: } } -func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { +func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { nodeID := args.NodeID snap, err := m.srv.State().Snapshot() @@ -236,3 +253,56 @@ func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args 
cstructs.MonitorReq structs.Bridge(conn, clientConn) return } + +func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + var target *serverParts + serverID := args.ServerID + + // empty ServerID to prevent forwarding loop + args.ServerID = "" + + if serverID == "leader" { + isLeader, remoteServer := m.srv.getLeader() + if !isLeader && remoteServer != nil { + target = remoteServer + } + if !isLeader && remoteServer == nil { + handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder) + return + } + } else { + // See if the server ID is a known member + serfMembers := m.srv.Members() + for _, mem := range serfMembers { + if mem.Name == serverID { + if ok, srv := isNomadServer(mem); ok { + target = srv + } + } + } + } + + // Unable to find a server + if target == nil { + err := fmt.Errorf("unknown nomad server %s", serverID) + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + defer serverConn.Close() + + // Send the Request + outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + structs.Bridge(conn, serverConn) + return +} diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 201e21fb9..b80f75c56 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/client/config" @@ -22,7 +23,7 @@ import ( "github.com/ugorji/go/codec" ) -func TestMonitor_Monitor_Remote_Server(t *testing.T) { +func 
TestMonitor_Monitor_Remote_Client(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
 
@@ -117,6 +118,175 @@ OUTER:
 	}
 }
 
+// TestMonitor_Monitor_RemoteServer verifies that an Agent.Monitor request
+// carrying a ServerID streams the targeted server's logs, both when the
+// request must be forwarded and when the origin server is itself the target.
+func TestMonitor_Monitor_RemoteServer(t *testing.T) {
+	t.Parallel()
+
+	// start servers
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	s2 := TestServer(t, func(c *Config) {
+		c.DevDisableBootstrap = true
+	})
+	defer s2.Shutdown()
+	TestJoin(t, s1, s2)
+	testutil.WaitForLeader(t, s1.RPC)
+	testutil.WaitForLeader(t, s2.RPC)
+
+	// determine leader and nonleader
+	servers := []*Server{s1, s2}
+	var nonLeader *Server
+	var leader *Server
+	for _, s := range servers {
+		if !s.IsLeader() {
+			nonLeader = s
+		} else {
+			leader = s
+		}
+	}
+	// the case table below dereferences both servers, so fail with a readable
+	// message instead of a nil pointer panic
+	require.NotNil(t, leader, "expected one server to be the leader")
+	require.NotNil(t, nonLeader, "expected one server to be a non-leader")
+
+	cases := []struct {
+		desc        string
+		serverID    string
+		expectedLog string
+		logger      hclog.InterceptLogger
+		origin      *Server
+	}{
+		{
+			desc:        "remote leader",
+			serverID:    "leader",
+			expectedLog: "leader log",
+			logger:      leader.logger,
+			origin:      nonLeader,
+		},
+		{
+			desc:        "remote server",
+			serverID:    nonLeader.serf.LocalMember().Name,
+			expectedLog: "nonleader log",
+			logger:      nonLeader.logger,
+			origin:      leader,
+		},
+		{
+			desc:        "serverID is current leader",
+			serverID:    "leader",
+			expectedLog: "leader log",
+			logger:      leader.logger,
+			origin:      leader,
+		},
+		{
+			desc:        "serverID is current server",
+			serverID:    nonLeader.serf.LocalMember().Name,
+			expectedLog: "non leader log",
+			logger:      nonLeader.logger,
+			origin:      nonLeader,
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc // goroutines below capture tc; keep a per-iteration copy
+		t.Run(tc.desc, func(t *testing.T) {
+			require := require.New(t)
+
+			// doneCh is closed however the subtest exits (including t.Fatal)
+			// so the helper goroutines never outlive it
+			doneCh := make(chan struct{})
+			defer close(doneCh)
+
+			// send some specific logs
+			go func() {
+				for {
+					select {
+					case <-doneCh:
+						return
+					default:
+						tc.logger.Warn(tc.expectedLog)
+						time.Sleep(10 * time.Millisecond)
+					}
+				}
+			}()
+
+			req := cstructs.MonitorRequest{
+				LogLevel: "warn",
+				ServerID: tc.serverID,
+			}
+
+			handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
+			require.Nil(err)
+
+			// create pipe
+			p1, p2 := net.Pipe()
+			defer p1.Close()
+			defer p2.Close()
+
+			errCh := make(chan error)
+			streamMsg := make(chan *cstructs.StreamErrWrapper)
+
+			go handler(p2)
+
+			// Start decoder
+			go func() {
+				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
+				for {
+					var msg cstructs.StreamErrWrapper
+					if err := decoder.Decode(&msg); err != nil {
+						if err == io.EOF || strings.Contains(err.Error(), "closed") {
+							return
+						}
+						// report the failure once and stop; msg is not usable here
+						select {
+						case errCh <- fmt.Errorf("error decoding: %v", err):
+						case <-doneCh:
+						}
+						return
+					}
+
+					select {
+					case streamMsg <- &msg:
+					case <-doneCh:
+						return
+					}
+				}
+			}()
+
+			// send request
+			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
+			require.Nil(encoder.Encode(req))
+
+			timeout := time.After(2 * time.Second)
+			received := ""
+
+		OUTER:
+			for {
+				select {
+				case <-timeout:
+					t.Fatal("timeout waiting for logs")
+				case err := <-errCh:
+					t.Fatal(err)
+				case msg := <-streamMsg:
+					if msg.Error != nil {
+						t.Fatalf("Got error: %v", msg.Error.Error())
+					}
+
+					var frame sframer.StreamFrame
+					require.NoError(json.Unmarshal(msg.Payload, &frame))
+
+					received += string(frame.Data)
+					if strings.Contains(received, tc.expectedLog) {
+						require.Nil(p2.Close())
+						break OUTER
+					}
+				}
+			}
+		})
+	}
+}
+
 func TestMonitor_MonitorServer(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
diff --git a/website/source/api/agent.html.md b/website/source/api/agent.html.md
index 7d2446719..42e512567 100644
--- a/website/source/api/agent.html.md
+++ b/website/source/api/agent.html.md
@@ -518,16 +518,20 @@ The table below shows this endpoint's support for
 
 ### Parameters
 
-- `log-level` `(string: "info")` - Specifies a text string containing a log level
+- `log_level` `(string: "info")` - Specifies a text string containing a log level
   to filter on, such as `info`. Possible values include `trace`, `debug`,
   `info`, `warn`, `error`
 
 - `json` `(bool: false)` - Specifies if the log format for streamed logs
   should be JSON.
 
-- `node-id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text +- `node_id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text string containing a node-id to target for streaming. +- `server_id` `(string: "server1.global")` - Specifies a text + string containing a server name, or "leader" for the current leader, to + target for streaming. + - `plain` `(bool: false)` - Specifies if the response should be JSON or plaintext @@ -535,7 +539,10 @@ The table below shows this endpoint's support for ```text $ curl \ - https://localhost:4646/v1/agent/monitor?log-level=debug&node-id=a57b2adb-1a30-2dda-8df0-25abb0881952 + "https://localhost:4646/v1/agent/monitor?log_level=debug&server_id=leader" + +$ curl \ + "https://localhost:4646/v1/agent/monitor?log_level=debug&node_id=a57b2adb-1a30-2dda-8df0-25abb0881952" ``` ### Sample Response diff --git a/website/source/docs/commands/monitor.html.md.erb b/website/source/docs/commands/monitor.html.md.erb index 7f6f0264f..c26e730b2 100644 --- a/website/source/docs/commands/monitor.html.md.erb +++ b/website/source/docs/commands/monitor.html.md.erb @@ -37,6 +37,10 @@ The monitor command also allows you to specify a single client node id to follow - `-node-id`: Specifies the client node-id to stream logs from. If no node-id is given the nomad server from the -address flag will be used. +- `-server-id`: Specifies the Nomad server to stream logs from. Accepts a +server name as listed by `nomad server members`, or the special value `leader`, +which targets the current leader. + - `-json`: Stream logs in json format ## Examples