From c28e5ad036167653d8863a62bec5c081b96cbfec Mon Sep 17 00:00:00 2001
From: Drew Bailey <2614075+drewbailey@users.noreply.github.com>
Date: Tue, 10 Dec 2019 15:18:57 -0500
Subject: [PATCH] warn when enable_debug is on when registering handlers

m -> a receiver name

return coded errors, fix query
---
 client/agent_endpoint.go             | 23 ++++++---
 client/structs/structs.go            | 11 ++--
 command/agent/agent_endpoint.go      | 18 +++++--
 command/agent/agent_endpoint_test.go |  6 ++-
 command/agent/http.go                |  1 +
 nomad/client_agent_endpoint.go       | 75 ++++++++++++++++------------
 nomad/client_agent_endpoint_test.go  | 24 +++++----
 7 files changed, 98 insertions(+), 60 deletions(-)

diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go
index 6744b8a9a..76fde73c4 100644
--- a/client/agent_endpoint.go
+++ b/client/agent_endpoint.go
@@ -24,12 +24,19 @@ type Agent struct {
 }
 
 func NewAgentEndpoint(c *Client) *Agent {
-	m := &Agent{c: c}
-	m.c.streamingRpcs.Register("Agent.Monitor", m.monitor)
-	return m
+	a := &Agent{c: c}
+	a.c.streamingRpcs.Register("Agent.Monitor", a.monitor)
+	return a
 }
 
-func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
+func (a *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
+	// Check ACL for agent write
+	if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
+		return structs.NewErrRPCCoded(500, err.Error())
+	} else if aclObj != nil && !aclObj.AllowAgentWrite() {
+		return structs.NewErrRPCCoded(403, structs.ErrPermissionDenied.Error())
+	}
+
 	var resp []byte
 	var err error
 
@@ -55,12 +62,12 @@ func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentP
 
 	// Copy profile response to reply
 	reply.Payload = resp
-	reply.AgentID = m.c.NodeID()
+	reply.AgentID = a.c.NodeID()
 
 	return nil
 }
 
-func (m *Agent) monitor(conn io.ReadWriteCloser) {
+func (a *Agent) monitor(conn io.ReadWriteCloser) {
 	defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
 	defer conn.Close()
 
@@ -75,7 +82,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
 	}
 
 	// Check acl
-	if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil {
+	if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
 		handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
 		return
 	} else if aclObj != nil && !aclObj.AllowAgentRead() {
@@ -96,7 +103,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
+	monitor := monitor.New(512, a.c.logger, &log.LoggerOptions{
 		JSONFormat: args.LogJSON,
 		Level:      logLevel,
 	})
diff --git a/client/structs/structs.go b/client/structs/structs.go
index 5429aaf38..10c9ad239 100644
--- a/client/structs/structs.go
+++ b/client/structs/structs.go
@@ -35,6 +35,7 @@ type ClientStatsResponse struct {
 	structs.QueryMeta
 }
 
+// MonitorRequest is used to request and stream logs from a client node.
 type MonitorRequest struct {
 	// LogLevel is the log level filter we want to stream logs on
 	LogLevel string
@@ -54,11 +55,14 @@ type MonitorRequest struct {
 	structs.QueryOptions
 }
 
+// AgentPprofRequest is used to request a pprof report for a given node.
 type AgentPprofRequest struct {
-	// Profile specifies the profile to use
+	// ReqType specifies the profile to use
 	ReqType profile.ReqType
 
+	// Profile specifies the runtime/pprof profile to lookup and generate.
 	Profile string
 
+	// Seconds is the number of seconds to capture a profile
 	Seconds int
 
@@ -74,13 +78,12 @@ type AgentPprofRequest struct {
 	structs.QueryOptions
 }
 
+// AgentPprofResponse is used to return a generated pprof profile
 type AgentPprofResponse struct {
-	// Error stores any error that may have occurred.
-	Error *RpcError
-
 	// ID of the agent that fulfilled the request
 	AgentID string
 
+	// Payload is the generated pprof profile
 	Payload []byte
 }
 
diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go
index 7020795c9..7eddfb3f5 100644
--- a/command/agent/agent_endpoint.go
+++ b/command/agent/agent_endpoint.go
@@ -348,7 +348,10 @@ func (s *HTTPServer) AgentPprofRequest(resp http.ResponseWriter, req *http.Reque
 		return s.agentPprof(profile.TraceReq, resp, req)
 	default:
 		// Add profile to request
-		req.URL.Query().Add("profile", path)
+		values := req.URL.Query()
+		values.Add("profile", path)
+		req.URL.RawQuery = values.Encode()
+
 		// generic pprof profile request
 		return s.agentPprof(profile.LookupReq, resp, req)
 	}
@@ -424,10 +427,19 @@ func (s *HTTPServer) agentPprof(reqType profile.ReqType, resp http.ResponseWrite
 	}
 
 	if rpcErr != nil {
-		// Return RPCCodedErr
-		return nil, rpcErr
+		code, msg, ok := structs.CodeFromRPCCodedErr(rpcErr)
+		if !ok {
+			return nil, CodedError(500, rpcErr.Error())
+		}
+		// Return CodedError
+		return nil, CodedError(code, msg)
 	}
 
+	// Pprof cmdline is not a typical pprof
+	// so just return string instead of bytes
+	if args.ReqType == profile.CmdReq {
+		return string(reply.Payload), nil
+	}
 	return reply.Payload, nil
 }
 
diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go
index 024f77a75..a317228a7 100644
--- a/command/agent/agent_endpoint_test.go
+++ b/command/agent/agent_endpoint_test.go
@@ -549,12 +549,14 @@ func TestAgent_PprofRequest(t *testing.T) {
 
 			if tc.expectedErr != "" {
 				require.Error(t, err)
+
+				httpErr, ok := err.(HTTPCodedError)
+				require.True(t, ok)
+				require.Equal(t, httpErr.Code(), tc.expectedStatus)
 			} else {
 				require.NoError(t, err)
 				require.NotNil(t, resp)
 			}
-
-			require.Equal(t, tc.expectedStatus, respW.Code)
 		})
 	})
 }
diff --git a/command/agent/http.go b/command/agent/http.go
index 53ab014c5..9710e1e47 100644
--- a/command/agent/http.go
+++ b/command/agent/http.go
@@ -219,6 +219,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
 	s.mux.Handle("/", handleRootFallthrough())
 
 	if enableDebug {
+		s.logger.Warn("enable_debug is set to true. This is insecure and should not be enabled in production")
 		s.mux.HandleFunc("/debug/pprof/", pprof.Index)
 		s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
 		s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go
index 604eacc5b..ba52afb1a 100644
--- a/nomad/client_agent_endpoint.go
+++ b/nomad/client_agent_endpoint.go
@@ -24,17 +24,17 @@ type Agent struct {
 	srv *Server
 }
 
-func (m *Agent) register() {
-	m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor)
+func (a *Agent) register() {
+	a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor)
 }
 
-func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
+func (a *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
 	// Targeting a node, forward request to node
 	if args.NodeID != "" {
-		return m.forwardProfileClient(args, reply)
+		return a.forwardProfileClient(args, reply)
 	}
 
-	currentServer := m.srv.serf.LocalMember().Name
+	currentServer := a.srv.serf.LocalMember().Name
 	var forwardServer bool
 	// Targeting a remote server which is not the leader and not this server
 	if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
@@ -42,20 +42,29 @@ func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentP
 	}
 
 	// Targeting leader and this server is not current leader
-	if args.ServerID == "leader" && !m.srv.IsLeader() {
+	if args.ServerID == "leader" && !a.srv.IsLeader() {
 		forwardServer = true
 	}
 
+	// Forward request to a remote server
 	if forwardServer {
 		// forward the request
-		return m.forwardProfileServer(args, reply)
+		return a.forwardProfileServer(args, reply)
 	}
 
+	// Check ACL for agent write
+	if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
+		return structs.NewErrRPCCoded(500, err.Error())
+	} else if aclObj != nil && !aclObj.AllowAgentWrite() {
+		return structs.NewErrRPCCoded(403, structs.ErrPermissionDenied.Error())
+	}
+
+	// Process the request on this server
 	var resp []byte
 	var err error
 
 	// Mark which server fulfilled the request
-	reply.AgentID = m.srv.serf.LocalMember().Name
+	reply.AgentID = a.srv.serf.LocalMember().Name
 
 	// Determine which profile to run
 	// and generate profile. Blocks for args.Seconds
@@ -85,7 +94,7 @@ func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentP
 	return nil
 }
 
-func (m *Agent) monitor(conn io.ReadWriteCloser) {
+func (a *Agent) monitor(conn io.ReadWriteCloser) {
 	defer conn.Close()
 
 	// Decode args
@@ -99,7 +108,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
 	}
 
 	// Check agent read permissions
-	if aclObj, err := m.srv.ResolveToken(args.AuthToken); err != nil {
+	if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
 		handleStreamResultError(err, nil, encoder)
 		return
 	} else if aclObj != nil && !aclObj.AllowAgentRead() {
@@ -119,12 +128,12 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
 
 	// Targeting a node, forward request to node
 	if args.NodeID != "" {
-		m.forwardMonitorClient(conn, args, encoder, decoder)
+		a.forwardMonitorClient(conn, args, encoder, decoder)
 		// forwarded request has ended, return
 		return
 	}
 
-	currentServer := m.srv.serf.LocalMember().Name
+	currentServer := a.srv.serf.LocalMember().Name
 	var forwardServer bool
 	// Targeting a remote server which is not the leader and not this server
 	if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
@@ -132,12 +141,12 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
 	}
 
 	// Targeting leader and this server is not current leader
-	if args.ServerID == "leader" && !m.srv.IsLeader() {
+	if args.ServerID == "leader" && !a.srv.IsLeader() {
 		forwardServer = true
 	}
 
 	if forwardServer {
-		m.forwardMonitorServer(conn, args, encoder, decoder)
+		a.forwardMonitorServer(conn, args, encoder, decoder)
 		return
 	}
 
@@ -145,7 +154,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
+	monitor := monitor.New(512, a.srv.logger, &log.LoggerOptions{
 		Level:      logLevel,
 		JSONFormat: args.LogJSON,
 	})
@@ -243,10 +252,10 @@ OUTER:
 	}
 }
 
-func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
+func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
 	nodeID := args.NodeID
 
-	snap, err := m.srv.State().Snapshot()
+	snap, err := a.srv.State().Snapshot()
 	if err != nil {
 		handleStreamResultError(err, nil, encoder)
 		return
@@ -272,10 +281,10 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
 	// Get the Connection to the client either by fowarding to another server
 	// or creating direct stream
 	var clientConn net.Conn
-	state, ok := m.srv.getNodeConn(nodeID)
+	state, ok := a.srv.getNodeConn(nodeID)
 	if !ok {
 		// Determine the server that has a connection to the node
-		srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
+		srv, err := a.srv.serverWithNodeConn(nodeID, a.srv.Region())
 		if err != nil {
 			var code *int64
 			if structs.IsErrNoNodeConn(err) {
@@ -284,7 +293,7 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
 			handleStreamResultError(err, code, encoder)
 			return
 		}
-		conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
+		conn, err := a.srv.streamingRpc(srv, "Agent.Monitor")
 		if err != nil {
 			handleStreamResultError(err, nil, encoder)
 			return
@@ -312,7 +321,7 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
 	return
 }
 
-func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
+func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
 	var target *serverParts
 	serverID := args.ServerID
 
@@ -320,7 +329,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
 	args.ServerID = ""
 
 	if serverID == "leader" {
-		isLeader, remoteServer := m.srv.getLeader()
+		isLeader, remoteServer := a.srv.getLeader()
 		if !isLeader && remoteServer != nil {
 			target = remoteServer
 		}
@@ -330,7 +339,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
 		}
 	} else {
 		// See if the server ID is a known member
-		serfMembers := m.srv.Members()
+		serfMembers := a.srv.Members()
 		for _, mem := range serfMembers {
 			if mem.Name == serverID {
 				if ok, srv := isNomadServer(mem); ok {
@@ -347,7 +356,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
 		return
 	}
 
-	serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor")
+	serverConn, err := a.srv.streamingRpc(target, "Agent.Monitor")
 	if err != nil {
 		handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
 		return
@@ -365,7 +374,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
 	return
 }
 
-func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
+func (a *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
 	var target *serverParts
 	serverID := args.ServerID
 
@@ -373,7 +382,7 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
 	args.ServerID = ""
 
 	if serverID == "leader" {
-		isLeader, remoteServer := m.srv.getLeader()
+		isLeader, remoteServer := a.srv.getLeader()
 		if !isLeader && remoteServer != nil {
 			target = remoteServer
 		}
@@ -382,7 +391,7 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
 		}
 	} else {
 		// See if the server ID is a known member
-		serfMembers := m.srv.Members()
+		serfMembers := a.srv.Members()
 		for _, mem := range serfMembers {
 			if mem.Name == serverID {
 				if ok, srv := isNomadServer(mem); ok {
@@ -399,7 +408,7 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
 	}
 
 	// Forward the request
-	rpcErr := m.srv.forwardServer(target, "Agent.Profile", args, reply)
+	rpcErr := a.srv.forwardServer(target, "Agent.Profile", args, reply)
 	if rpcErr != nil {
 		return structs.NewErrRPCCoded(500, rpcErr.Error())
 	}
@@ -407,10 +416,10 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
 	return nil
 }
 
-func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
+func (a *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
 	nodeID := args.NodeID
 
-	snap, err := m.srv.State().Snapshot()
+	snap, err := a.srv.State().Snapshot()
 	if err != nil {
 		return structs.NewErrRPCCoded(500, err.Error())
 	}
@@ -431,10 +440,10 @@ func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cs
 
 	// Get the Connection to the client either by fowarding to another server
 	// or creating direct stream
-	state, ok := m.srv.getNodeConn(nodeID)
+	state, ok := a.srv.getNodeConn(nodeID)
 	if !ok {
 		// Determine the server that has a connection to the node
-		srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
+		srv, err := a.srv.serverWithNodeConn(nodeID, a.srv.Region())
 		if err != nil {
 			code := 500
 			if structs.IsErrNoNodeConn(err) {
@@ -443,7 +452,7 @@ func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cs
 			return structs.NewErrRPCCoded(code, err.Error())
 		}
 
-		rpcErr := m.srv.forwardServer(srv, "Agent.Profile", args, reply)
+		rpcErr := a.srv.forwardServer(srv, "Agent.Profile", args, reply)
 		if rpcErr != nil {
 			return structs.NewErrRPCCoded(500, err.Error())
 		}
diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go
index ef93da44a..f5a5ca41b 100644
--- a/nomad/client_agent_endpoint_test.go
+++ b/nomad/client_agent_endpoint_test.go
@@ -464,20 +464,22 @@ func TestAgentProfile_RemoteClient(t *testing.T) {
 	require := require.New(t)
 
 	// start server and client
-	s1 := TestServer(t, nil)
-	defer s1.Shutdown()
-	s2 := TestServer(t, func(c *Config) {
+	s1, cleanup := TestServer(t, nil)
+	defer cleanup()
+
+	s2, cleanup := TestServer(t, func(c *Config) {
 		c.DevDisableBootstrap = true
 	})
-	defer s2.Shutdown()
+	defer cleanup()
+
 	TestJoin(t, s1, s2)
 	testutil.WaitForLeader(t, s1.RPC)
 	testutil.WaitForLeader(t, s2.RPC)
 
-	c, cleanup := client.TestClient(t, func(c *config.Config) {
+	c, cleanupC := client.TestClient(t, func(c *config.Config) {
 		c.Servers = []string{s2.GetConfig().RPCAddr.String()}
 	})
-	defer cleanup()
+	defer cleanupC()
 
 	testutil.WaitForResult(func() (bool, error) {
 		nodes := s2.connectedNodes()
@@ -504,12 +506,14 @@ func TestAgentProfile_Server(t *testing.T) {
 	t.Parallel()
 
 	// start servers
-	s1 := TestServer(t, nil)
-	defer s1.Shutdown()
-	s2 := TestServer(t, func(c *Config) {
+	s1, cleanup := TestServer(t, nil)
+	defer cleanup()
+
+	s2, cleanup := TestServer(t, func(c *Config) {
 		c.DevDisableBootstrap = true
 	})
-	defer s2.Shutdown()
+	defer cleanup()
+
 	TestJoin(t, s1, s2)
 	testutil.WaitForLeader(t, s1.RPC)
 	testutil.WaitForLeader(t, s2.RPC)
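
Note on the "fix query" change in command/agent/agent_endpoint.go above: req.URL.Query() returns a decoded copy of the query string, so a value added to that copy is dropped unless it is encoded back into URL.RawQuery, which is what the new values.Encode() assignment does. Below is a minimal, self-contained Go sketch of that pattern; the URL and the "seconds" parameter are illustrative only and are not taken from Nomad's actual route table.

package main

import (
	"fmt"
	"net/http"
)

// addQueryParam mirrors the query fix in the AgentPprofRequest handler above:
// URL.Query() parses RawQuery into a fresh url.Values map, so additions made
// to that copy take effect only if they are encoded back into URL.RawQuery.
func addQueryParam(req *http.Request, key, value string) {
	values := req.URL.Query()          // decoded copy of the query string
	values.Add(key, value)             // mutate the copy
	req.URL.RawQuery = values.Encode() // write the copy back onto the request
}

func main() {
	// Illustrative URL only; not necessarily the agent's real pprof route.
	req, _ := http.NewRequest("GET", "http://127.0.0.1:4646/v1/agent/pprof/heap", nil)

	// Buggy form: the added parameter is silently lost.
	req.URL.Query().Add("seconds", "2")
	fmt.Println(req.URL.RawQuery) // ""

	// Fixed form, as in the patch: the parameter persists on the request.
	addQueryParam(req, "seconds", "2")
	fmt.Println(req.URL.RawQuery) // "seconds=2"
}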