From fb49f3c35bec799d899dd4546d0e6451f8650dff Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Tue, 12 Nov 2019 16:23:39 -0500 Subject: [PATCH] add server-id to monitor specific server --- client/structs/structs.go | 3 ++ command/agent/agent_endpoint.go | 5 ++-- command/agent_monitor.go | 3 ++ nomad/client_agent_endpoint.go | 52 +++++++++++++++++++++++++++++++-- 4 files changed, 57 insertions(+), 6 deletions(-) diff --git a/client/structs/structs.go b/client/structs/structs.go index 6a84e2d63..f2c54bdeb 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -44,6 +44,9 @@ type MonitorRequest struct { // NodeID is the node we want to track the logs of NodeID string + // ServerID is the server we want to track the logs of + ServerID string + // PlainText disables base64 encoding. PlainText bool diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 56c3631b7..58fa0f6a5 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -175,9 +175,6 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel)) } - // Determine if we are targeting a server or client - nodeID := req.URL.Query().Get("node_id") - logJSON := false logJSONStr := req.URL.Query().Get("log_json") if logJSONStr != "" { @@ -198,9 +195,11 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( plainText = parsed } + nodeID := req.URL.Query().Get("node_id") // Build the request and parse the ACL token args := cstructs.MonitorRequest{ NodeID: nodeID, + ServerID: req.URL.Query().Get("server_id"), LogLevel: logLevel, LogJSON: logJSON, PlainText: plainText, diff --git a/command/agent_monitor.go b/command/agent_monitor.go index 4927c5784..4d50faf6a 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -61,12 +61,14 @@ func (c *MonitorCommand) Run(args []string) int { var logLevel string var nodeID string + var serverID string var logJSON bool flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.StringVar(&logLevel, "log-level", "", "") flags.StringVar(&nodeID, "node-id", "", "") + flags.StringVar(&serverID, "server-id", "", "") flags.BoolVar(&logJSON, "json", false, "") if err := flags.Parse(args); err != nil { @@ -90,6 +92,7 @@ func (c *MonitorCommand) Run(args []string) int { params := map[string]string{ "log_level": logLevel, "node_id": nodeID, + "server_id": serverID, "log_json": strconv.FormatBool(logJSON), } diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 1b664730f..70c277d1b 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -61,7 +61,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { // Targeting a node, forward request to node if args.NodeID != "" && args.NodeID != "leader" { - m.forwardMonitor(conn, args, encoder, decoder) + m.forwardMonitorClient(conn, args, encoder, decoder) // forwarded request has ended, return return } @@ -73,12 +73,18 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { return } if !isLeader && remoteServer == nil { - err := fmt.Errorf("No leader") + err := fmt.Errorf("no leader") handleStreamResultError(err, helper.Int64ToPtr(400), encoder) return } } + // targeting a specific server, forward to that server + if args.ServerID != "" { + m.forwardMonitorServer(conn, args, encoder, decoder) + return + } + // NodeID was empty, so monitor this current server ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -181,7 +187,7 @@ OUTER: } } -func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { +func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { nodeID := args.NodeID snap, err := m.srv.State().Snapshot() @@ -272,3 +278,43 @@ func (m *Agent) forwardMonitorLeader(leader *serverParts, conn io.ReadWriteClose structs.Bridge(conn, leaderConn) return } + +func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + serverID := args.ServerID + // empty ServerID to prevent forwarding loop + args.ServerID = "" + + serfMembers := m.srv.Members() + var target *serverParts + for _, mem := range serfMembers { + if mem.Name == serverID { + ok, srv := isNomadServer(mem) + if !ok { + err := fmt.Errorf("unknown nomad server %s", serverID) + handleStreamResultError(err, nil, encoder) + return + } + target = srv + } + } + + var serverConn net.Conn + localConn, err := m.srv.streamingRpc(target, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + serverConn = localConn + defer serverConn.Close() + + // Send the Request + outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + structs.Bridge(conn, serverConn) + return +}