diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 58fa0f6a5..2d25dc48b 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -205,6 +205,11 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( PlainText: plainText, } + // if node and server were requested return error + if args.NodeID != "" && args.ServerID != "" { + return nil, CodedError(400, "Cannot target node and server simultaneously") + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) // Make the RPC diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 70c277d1b..60d8c7b44 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -60,27 +60,25 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { } // Targeting a node, forward request to node - if args.NodeID != "" && args.NodeID != "leader" { + if args.NodeID != "" { m.forwardMonitorClient(conn, args, encoder, decoder) // forwarded request has ended, return return } - if args.NodeID == "leader" { - isLeader, remoteServer := m.srv.getLeader() - if !isLeader && remoteServer != nil { - m.forwardMonitorLeader(remoteServer, conn, args, encoder, decoder) - return - } - if !isLeader && remoteServer == nil { - err := fmt.Errorf("no leader") - handleStreamResultError(err, helper.Int64ToPtr(400), encoder) - return - } + currentServer := m.srv.serf.LocalMember().Name + var forwardServer bool + // Targeting a remote server which is not the leader and not this server + if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer { + forwardServer = true } - // targeting a specific server, forward to that server - if args.ServerID != "" { + // Targeting leader and this server is not current leader + if args.ServerID == "leader" && !m.srv.IsLeader() { + forwardServer = true + } + + if forwardServer { m.forwardMonitorServer(conn, args, encoder, decoder) return } @@ -256,62 +254,52 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni return } -func (m *Agent) forwardMonitorLeader(leader *serverParts, conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { - var leaderConn net.Conn - - localConn, err := m.srv.streamingRpc(leader, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - leaderConn = localConn - defer leaderConn.Close() - - // Send the Request - outEncoder := codec.NewEncoder(leaderConn, structs.MsgpackHandle) - if err := outEncoder.Encode(args); err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - structs.Bridge(conn, leaderConn) - return -} - func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + var target *serverParts serverID := args.ServerID + // empty ServerID to prevent forwarding loop args.ServerID = "" - serfMembers := m.srv.Members() - var target *serverParts - for _, mem := range serfMembers { - if mem.Name == serverID { - ok, srv := isNomadServer(mem) - if !ok { - err := fmt.Errorf("unknown nomad server %s", serverID) - handleStreamResultError(err, nil, encoder) - return + if serverID == "leader" { + isLeader, remoteServer := m.srv.getLeader() + if !isLeader && remoteServer != nil { + target = remoteServer + } + if !isLeader && remoteServer == nil { + handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder) + return + } + } else { + // See if the server ID is a known member + serfMembers := m.srv.Members() + for _, mem := range serfMembers { + if mem.Name == serverID { + if ok, srv := isNomadServer(mem); ok { + target = srv + } } - target = srv } } - var serverConn net.Conn - localConn, err := m.srv.streamingRpc(target, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) + // Unable to find a server + if target == nil { + err := fmt.Errorf("unknown nomad server %s", serverID) + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) return } - serverConn = localConn + serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } defer serverConn.Close() // Send the Request outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle) if err := outEncoder.Encode(args); err != nil { - handleStreamResultError(err, nil, encoder) + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) return } diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 1ddbdb628..b80f75c56 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/client/config" @@ -22,7 +23,7 @@ import ( "github.com/ugorji/go/codec" ) -func TestMonitor_Monitor_Remote_Server(t *testing.T) { +func TestMonitor_Monitor_Remote_Client(t *testing.T) { t.Parallel() require := require.New(t) @@ -117,9 +118,8 @@ OUTER: } } -func TestMonitor_MonitorRemoteLeader(t *testing.T) { +func TestMonitor_Monitor_RemoteServer(t *testing.T) { t.Parallel() - require := require.New(t) // start servers s1 := TestServer(t, nil) @@ -132,6 +132,7 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) + // determine leader and nonleader servers := []*Server{s1, s2} var nonLeader *Server var leader *Server @@ -143,78 +144,127 @@ func TestMonitor_MonitorRemoteLeader(t *testing.T) { } } - go func() { - for { - leader.logger.Warn("leader log") - time.Sleep(10 * time.Millisecond) - } - }() - - // No node ID to monitor the remote server - req := cstructs.MonitorRequest{ - LogLevel: "warn", - NodeID: "leader", + cases := []struct { + desc string + serverID string + expectedLog string + logger hclog.InterceptLogger + origin *Server + }{ + { + desc: "remote leader", + serverID: "leader", + expectedLog: "leader log", + logger: leader.logger, + origin: nonLeader, + }, + { + desc: "remote server", + serverID: nonLeader.serf.LocalMember().Name, + expectedLog: "nonleader log", + logger: nonLeader.logger, + origin: leader, + }, + { + desc: "serverID is current leader", + serverID: "leader", + expectedLog: "leader log", + logger: leader.logger, + origin: leader, + }, + { + desc: "serverID is current server", + serverID: nonLeader.serf.LocalMember().Name, + expectedLog: "non leader log", + logger: nonLeader.logger, + origin: nonLeader, + }, } - handler, err := nonLeader.StreamingRpcHandler("Agent.Monitor") - require.Nil(err) + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) - // create pipe - p1, p2 := net.Pipe() - defer p1.Close() - defer p2.Close() - - errCh := make(chan error) - streamMsg := make(chan *cstructs.StreamErrWrapper) - - go handler(p2) - - // Start decoder - go func() { - decoder := codec.NewDecoder(p1, structs.MsgpackHandle) - for { - var msg cstructs.StreamErrWrapper - if err := decoder.Decode(&msg); err != nil { - if err == io.EOF || strings.Contains(err.Error(), "closed") { - return + // send some specific logs + doneCh := make(chan struct{}) + go func() { + for { + select { + case <-doneCh: + return + default: + tc.logger.Warn(tc.expectedLog) + time.Sleep(10 * time.Millisecond) + } } - errCh <- fmt.Errorf("error decoding: %v", err) + }() + + req := cstructs.MonitorRequest{ + LogLevel: "warn", + ServerID: tc.serverID, } - streamMsg <- &msg - } - }() + handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) - // send request - encoder := codec.NewEncoder(p1, structs.MsgpackHandle) - require.Nil(encoder.Encode(req)) + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() - timeout := time.After(2 * time.Second) - expected := "leader log" - received := "" + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) -OUTER: - for { - select { - case <-timeout: - t.Fatal("timeout waiting for logs") - case err := <-errCh: - t.Fatal(err) - case msg := <-streamMsg: - if msg.Error != nil { - t.Fatalf("Got error: %v", msg.Error.Error()) + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(2 * time.Second) + received := "" + + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, tc.expectedLog) { + close(doneCh) + require.Nil(p2.Close()) + break OUTER + } + } } - - var frame sframer.StreamFrame - err := json.Unmarshal(msg.Payload, &frame) - assert.NoError(t, err) - - received += string(frame.Data) - if strings.Contains(received, expected) { - require.Nil(p2.Close()) - break OUTER - } - } + }) } } diff --git a/website/source/api/agent.html.md b/website/source/api/agent.html.md index 7d2446719..42e512567 100644 --- a/website/source/api/agent.html.md +++ b/website/source/api/agent.html.md @@ -518,16 +518,20 @@ The table below shows this endpoint's support for ### Parameters -- `log-level` `(string: "info")` - Specifies a text string containing a log level +- `log_level` `(string: "info")` - Specifies a text string containing a log level to filter on, such as `info`. Possible values include `trace`, `debug`, `info`, `warn`, `error` - `json` `(bool: false)` - Specifies if the log format for streamed logs should be JSON. -- `node-id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text +- `node_id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text string containing a node-id to target for streaming. +- `server_id` `(string: "server1.global")` - Specifies a text + string containing a server name or "leader" to target a specific remote server + or leader for streaming. + - `plain` `(bool: false)` - Specifies if the response should be JSON or plaintext @@ -535,7 +539,10 @@ The table below shows this endpoint's support for ```text $ curl \ - https://localhost:4646/v1/agent/monitor?log-level=debug&node-id=a57b2adb-1a30-2dda-8df0-25abb0881952 + https://localhost:4646/v1/agent/monitor?log_level=debug&server_id=leader + +$ curl \ + https://localhost:4646/v1/agent/monitor?log_level=debug&node_id=a57b2adb-1a30-2dda-8df0-25abb0881952 ``` ### Sample Response diff --git a/website/source/docs/commands/monitor.html.md.erb b/website/source/docs/commands/monitor.html.md.erb index 7f6f0264f..c26e730b2 100644 --- a/website/source/docs/commands/monitor.html.md.erb +++ b/website/source/docs/commands/monitor.html.md.erb @@ -37,6 +37,10 @@ The monitor command also allows you to specify a single client node id to follow - `-node-id`: Specifies the client node-id to stream logs from. If no node-id is given the nomad server from the -address flag will be used. +- `-server-id`: Specifies the nomad server id to stream logs from. Accepts +server names from `nomad server members` and also a special `leader` option +which will target the current leader. + - `-json`: Stream logs in json format ## Examples