warn when enabled debug is on when registering

m -> a receiver name

return codederrors, fix query
This commit is contained in:
Drew Bailey
2019-12-10 15:18:57 -05:00
parent d077cfe763
commit c28e5ad036
7 changed files with 98 additions and 60 deletions

View File

@@ -24,12 +24,19 @@ type Agent struct {
}
func NewAgentEndpoint(c *Client) *Agent {
m := &Agent{c: c}
m.c.streamingRpcs.Register("Agent.Monitor", m.monitor)
return m
a := &Agent{c: c}
a.c.streamingRpcs.Register("Agent.Monitor", a.monitor)
return a
}
func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
func (a *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
// Check ACL for agent write
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
return structs.NewErrRPCCoded(500, err.Error())
} else if aclObj != nil && !aclObj.AllowAgentWrite() {
return structs.NewErrRPCCoded(403, structs.ErrPermissionDenied.Error())
}
var resp []byte
var err error
@@ -55,12 +62,12 @@ func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentP
// Copy profile response to reply
reply.Payload = resp
reply.AgentID = m.c.NodeID()
reply.AgentID = a.c.NodeID()
return nil
}
func (m *Agent) monitor(conn io.ReadWriteCloser) {
func (a *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()
@@ -75,7 +82,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}
// Check acl
if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil {
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
return
} else if aclObj != nil && !aclObj.AllowAgentRead() {
@@ -96,7 +103,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
monitor := monitor.New(512, a.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
})

View File

@@ -35,6 +35,7 @@ type ClientStatsResponse struct {
structs.QueryMeta
}
// MonitorRequest is used to request and stream logs from a client node.
type MonitorRequest struct {
// LogLevel is the log level filter we want to stream logs on
LogLevel string
@@ -54,11 +55,14 @@ type MonitorRequest struct {
structs.QueryOptions
}
// AgentPprofRequest is used to request a pprof report for a given node.
type AgentPprofRequest struct {
// Profile specifies the profile to use
// ReqType specifies the profile to use
ReqType profile.ReqType
// Profile specifies the runtime/pprof profile to lookup and generate.
Profile string
// Seconds is the number of seconds to capture a profile
Seconds int
@@ -74,13 +78,12 @@ type AgentPprofRequest struct {
structs.QueryOptions
}
// AgentPprofResponse is used to return a generated pprof profile
type AgentPprofResponse struct {
// Error stores any error that may have occurred.
Error *RpcError
// ID of the agent that fulfilled the request
AgentID string
// Payload is the generated pprof profile
Payload []byte
}

View File

@@ -348,7 +348,10 @@ func (s *HTTPServer) AgentPprofRequest(resp http.ResponseWriter, req *http.Reque
return s.agentPprof(profile.TraceReq, resp, req)
default:
// Add profile to request
req.URL.Query().Add("profile", path)
values := req.URL.Query()
values.Add("profile", path)
req.URL.RawQuery = values.Encode()
// generic pprof profile request
return s.agentPprof(profile.LookupReq, resp, req)
}
@@ -424,10 +427,19 @@ func (s *HTTPServer) agentPprof(reqType profile.ReqType, resp http.ResponseWrite
}
if rpcErr != nil {
// Return RPCCodedErr
return nil, rpcErr
code, msg, ok := structs.CodeFromRPCCodedErr(rpcErr)
if !ok {
return nil, CodedError(500, rpcErr.Error())
}
// Return CodedError
return nil, CodedError(code, msg)
}
// Pprof cmdline is not a typical pprof
// so just return string instead of bytes
if args.ReqType == profile.CmdReq {
return string(reply.Payload), nil
}
return reply.Payload, nil
}

View File

@@ -549,12 +549,14 @@ func TestAgent_PprofRequest(t *testing.T) {
if tc.expectedErr != "" {
require.Error(t, err)
httpErr, ok := err.(HTTPCodedError)
require.True(t, ok)
require.Equal(t, httpErr.Code(), tc.expectedStatus)
} else {
require.NoError(t, err)
require.NotNil(t, resp)
}
require.Equal(t, tc.expectedStatus, respW.Code)
})
})
}

View File

@@ -219,6 +219,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.Handle("/", handleRootFallthrough())
if enableDebug {
s.logger.Warn("enable_debug is set to true. This is insecure and should not be enabled in production")
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)

View File

@@ -24,17 +24,17 @@ type Agent struct {
srv *Server
}
func (m *Agent) register() {
m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor)
func (a *Agent) register() {
a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor)
}
func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
func (a *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
// Targeting a node, forward request to node
if args.NodeID != "" {
return m.forwardProfileClient(args, reply)
return a.forwardProfileClient(args, reply)
}
currentServer := m.srv.serf.LocalMember().Name
currentServer := a.srv.serf.LocalMember().Name
var forwardServer bool
// Targeting a remote server which is not the leader and not this server
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
@@ -42,20 +42,29 @@ func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentP
}
// Targeting leader and this server is not current leader
if args.ServerID == "leader" && !m.srv.IsLeader() {
if args.ServerID == "leader" && !a.srv.IsLeader() {
forwardServer = true
}
// Forward request to a remote server
if forwardServer {
// forward the request
return m.forwardProfileServer(args, reply)
return a.forwardProfileServer(args, reply)
}
// Check ACL for agent write
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
return structs.NewErrRPCCoded(500, err.Error())
} else if aclObj != nil && !aclObj.AllowAgentWrite() {
return structs.NewErrRPCCoded(403, structs.ErrPermissionDenied.Error())
}
// Process the request on this server
var resp []byte
var err error
// Mark which server fulfilled the request
reply.AgentID = m.srv.serf.LocalMember().Name
reply.AgentID = a.srv.serf.LocalMember().Name
// Determine which profile to run
// and generate profile. Blocks for args.Seconds
@@ -85,7 +94,7 @@ func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentP
return nil
}
func (m *Agent) monitor(conn io.ReadWriteCloser) {
func (a *Agent) monitor(conn io.ReadWriteCloser) {
defer conn.Close()
// Decode args
@@ -99,7 +108,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}
// Check agent read permissions
if aclObj, err := m.srv.ResolveToken(args.AuthToken); err != nil {
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, nil, encoder)
return
} else if aclObj != nil && !aclObj.AllowAgentRead() {
@@ -119,12 +128,12 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
// Targeting a node, forward request to node
if args.NodeID != "" {
m.forwardMonitorClient(conn, args, encoder, decoder)
a.forwardMonitorClient(conn, args, encoder, decoder)
// forwarded request has ended, return
return
}
currentServer := m.srv.serf.LocalMember().Name
currentServer := a.srv.serf.LocalMember().Name
var forwardServer bool
// Targeting a remote server which is not the leader and not this server
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
@@ -132,12 +141,12 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}
// Targeting leader and this server is not current leader
if args.ServerID == "leader" && !m.srv.IsLeader() {
if args.ServerID == "leader" && !a.srv.IsLeader() {
forwardServer = true
}
if forwardServer {
m.forwardMonitorServer(conn, args, encoder, decoder)
a.forwardMonitorServer(conn, args, encoder, decoder)
return
}
@@ -145,7 +154,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
monitor := monitor.New(512, a.srv.logger, &log.LoggerOptions{
Level: logLevel,
JSONFormat: args.LogJSON,
})
@@ -243,10 +252,10 @@ OUTER:
}
}
func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
nodeID := args.NodeID
snap, err := m.srv.State().Snapshot()
snap, err := a.srv.State().Snapshot()
if err != nil {
handleStreamResultError(err, nil, encoder)
return
@@ -272,10 +281,10 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
// Get the Connection to the client either by fowarding to another server
// or creating direct stream
var clientConn net.Conn
state, ok := m.srv.getNodeConn(nodeID)
state, ok := a.srv.getNodeConn(nodeID)
if !ok {
// Determine the server that has a connection to the node
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
srv, err := a.srv.serverWithNodeConn(nodeID, a.srv.Region())
if err != nil {
var code *int64
if structs.IsErrNoNodeConn(err) {
@@ -284,7 +293,7 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
handleStreamResultError(err, code, encoder)
return
}
conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
conn, err := a.srv.streamingRpc(srv, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
@@ -312,7 +321,7 @@ func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
return
}
func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
var target *serverParts
serverID := args.ServerID
@@ -320,7 +329,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
args.ServerID = ""
if serverID == "leader" {
isLeader, remoteServer := m.srv.getLeader()
isLeader, remoteServer := a.srv.getLeader()
if !isLeader && remoteServer != nil {
target = remoteServer
}
@@ -330,7 +339,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
}
} else {
// See if the server ID is a known member
serfMembers := m.srv.Members()
serfMembers := a.srv.Members()
for _, mem := range serfMembers {
if mem.Name == serverID {
if ok, srv := isNomadServer(mem); ok {
@@ -347,7 +356,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
return
}
serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor")
serverConn, err := a.srv.streamingRpc(target, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
@@ -365,7 +374,7 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
return
}
func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
func (a *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
var target *serverParts
serverID := args.ServerID
@@ -373,7 +382,7 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
args.ServerID = ""
if serverID == "leader" {
isLeader, remoteServer := m.srv.getLeader()
isLeader, remoteServer := a.srv.getLeader()
if !isLeader && remoteServer != nil {
target = remoteServer
}
@@ -382,7 +391,7 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
}
} else {
// See if the server ID is a known member
serfMembers := m.srv.Members()
serfMembers := a.srv.Members()
for _, mem := range serfMembers {
if mem.Name == serverID {
if ok, srv := isNomadServer(mem); ok {
@@ -399,7 +408,7 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
}
// Forward the request
rpcErr := m.srv.forwardServer(target, "Agent.Profile", args, reply)
rpcErr := a.srv.forwardServer(target, "Agent.Profile", args, reply)
if rpcErr != nil {
return structs.NewErrRPCCoded(500, rpcErr.Error())
}
@@ -407,10 +416,10 @@ func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cs
return nil
}
func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
func (a *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
nodeID := args.NodeID
snap, err := m.srv.State().Snapshot()
snap, err := a.srv.State().Snapshot()
if err != nil {
return structs.NewErrRPCCoded(500, err.Error())
}
@@ -431,10 +440,10 @@ func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cs
// Get the Connection to the client either by fowarding to another server
// or creating direct stream
state, ok := m.srv.getNodeConn(nodeID)
state, ok := a.srv.getNodeConn(nodeID)
if !ok {
// Determine the server that has a connection to the node
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
srv, err := a.srv.serverWithNodeConn(nodeID, a.srv.Region())
if err != nil {
code := 500
if structs.IsErrNoNodeConn(err) {
@@ -443,7 +452,7 @@ func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cs
return structs.NewErrRPCCoded(code, err.Error())
}
rpcErr := m.srv.forwardServer(srv, "Agent.Profile", args, reply)
rpcErr := a.srv.forwardServer(srv, "Agent.Profile", args, reply)
if rpcErr != nil {
return structs.NewErrRPCCoded(500, err.Error())
}

View File

@@ -464,20 +464,22 @@ func TestAgentProfile_RemoteClient(t *testing.T) {
require := require.New(t)
// start server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
s1, cleanup := TestServer(t, nil)
defer cleanup()
s2, cleanup := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
defer cleanup()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c, cleanupC := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.GetConfig().RPCAddr.String()}
})
defer cleanup()
defer cleanupC()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
@@ -504,12 +506,14 @@ func TestAgentProfile_Server(t *testing.T) {
t.Parallel()
// start servers
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
s1, cleanup := TestServer(t, nil)
defer cleanup()
s2, cleanup := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
defer cleanup()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)