From e7589301ea4f7d23aec1d35c36daefca2f36a02c Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 24 Oct 2019 12:47:46 -0400 Subject: [PATCH] enable json formatting, use queryoptions --- api/agent.go | 10 +--------- api/agent_test.go | 16 ++++++++++++++-- client/monitor_endpoint.go | 16 ++++++++-------- command/agent/agent_endpoint.go | 15 +++++++++++---- command/agent/agent_endpoint_test.go | 6 +++--- command/agent/monitor/monitor.go | 3 +++ command/agent_monitor.go | 16 +++++++++++++++- nomad/client_monitor_endpoint.go | 2 +- 8 files changed, 56 insertions(+), 28 deletions(-) diff --git a/api/agent.go b/api/agent.go index 1fa20a00e..6e381e4ba 100644 --- a/api/agent.go +++ b/api/agent.go @@ -240,20 +240,13 @@ func (a *Agent) Health() (*AgentHealthResponse, error) { // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop log streaming -func (a *Agent) Monitor(loglevel string, nodeID string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { +func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { r, err := a.client.newRequest("GET", "/v1/agent/monitor") if err != nil { return nil, err } r.setQueryOptions(q) - if loglevel != "" { - r.params.Add("loglevel", loglevel) - } - if nodeID != "" { - r.params.Add("nodeID", nodeID) - } - _, resp, err := requireOK(a.client.doRequest(r)) if err != nil { return nil, err @@ -267,7 +260,6 @@ func (a *Agent) Monitor(loglevel string, nodeID string, stopCh <-chan struct{}, for { select { case <-stopCh: - close(logCh) return default: } diff --git a/api/agent_test.go b/api/agent_test.go index a8ec1eb14..8c18cc2dc 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -270,7 +270,12 @@ func TestAgent_MonitorServer(t *testing.T) { agent := c.Agent() doneCh := make(chan struct{}) - logCh, err := agent.Monitor("debug", "", doneCh, nil) + q := &QueryOptions{ + Params: map[string]string{ + "log-level": "debug", + }, + } + logCh, err := agent.Monitor(doneCh, q) defer close(doneCh) if err != nil { t.Fatalf("err: %v", err) @@ -301,9 +306,16 @@ func TestAgent_MonitorWithNode(t *testing.T) { agent := c.Agent() id, _ := uuid.GenerateUUID() + q := &QueryOptions{ + Params: map[string]string{ + "log-level": "debug", + "node-id": id, + }, + } + doneCh := make(chan struct{}) // todo need to create or stub a nodeid? - logCh, err := agent.Monitor("debug", id, doneCh, nil) + logCh, err := agent.Monitor(doneCh, q) defer close(doneCh) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/monitor_endpoint.go b/client/monitor_endpoint.go index 6d4aa4183..181bfad84 100644 --- a/client/monitor_endpoint.go +++ b/client/monitor_endpoint.go @@ -33,29 +33,29 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) { defer conn.Close() // Decode arguments - var req cstructs.MonitorRequest + var args cstructs.MonitorRequest decoder := codec.NewDecoder(conn, structs.MsgpackHandle) encoder := codec.NewEncoder(conn, structs.MsgpackHandle) - if err := decoder.Decode(&req); err != nil { + if err := decoder.Decode(&args); err != nil { handleStreamResultError(err, helper.Int64ToPtr(500), encoder) return } // Check acl - if aclObj, err := m.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { + if aclObj, err := m.c.ResolveToken(args.QueryOptions.AuthToken); err != nil { handleStreamResultError(err, helper.Int64ToPtr(403), encoder) return - } else if aclObj != nil && !aclObj.AllowNsOp(req.Namespace, acl.NamespaceCapabilityReadFS) { + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) { handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder) return } var logLevel log.Level - if req.LogLevel == "" { + if args.LogLevel == "" { logLevel = log.LevelFromString("INFO") } else { - logLevel = log.LevelFromString(req.LogLevel) + logLevel = log.LevelFromString(args.LogLevel) } if logLevel == log.NoLevel { @@ -69,13 +69,13 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) { defer cancel() monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{ + JSONFormat: args.LogJSON, Level: logLevel, - JSONFormat: false, }) go func() { if _, err := conn.Read(nil); err != nil { - close(stopCh) + // One end of the pipe explicitly closed, exit cancel() return } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index b99b62f60..5b92fe07d 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "sort" + "strconv" "strings" "github.com/docker/docker/pkg/ioutils" @@ -165,7 +166,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( } // Get the provided loglevel. - logLevel := req.URL.Query().Get("loglevel") + logLevel := req.URL.Query().Get("log-level") if logLevel == "" { logLevel = "INFO" } @@ -175,13 +176,19 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( } // Determine if we are targeting a server or client - nodeID := req.URL.Query().Get("nodeID") + nodeID := req.URL.Query().Get("node-id") + + logJSONStr := req.URL.Query().Get("log-json") + logJSON, err := strconv.ParseBool(logJSONStr) + if err != nil { + logJSON = false + } // Build the request and parse the ACL token args := cstructs.MonitorRequest{ NodeID: nodeID, LogLevel: logLevel, - LogJSON: false, + LogJSON: logJSON, } s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) @@ -208,7 +215,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(req.Context()) go func() { <-ctx.Done() httpPipe.Close() diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 72be97b7e..9c4cadb5e 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -256,7 +256,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { { - req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=unknown", nil) + req, err := http.NewRequest("GET", "/v1/agent/monitor?log-level=unknown", nil) require.Nil(t, err) resp := newClosableRecorder() @@ -269,7 +269,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { // check for a specific log { - req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=warn", nil) + req, err := http.NewRequest("GET", "/v1/agent/monitor?log-level=warn", nil) require.Nil(t, err) resp := newClosableRecorder() defer resp.Close() @@ -305,7 +305,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { // stream logs for a given node { - req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=warn&nodeID="+s.client.NodeID(), nil) + req, err := http.NewRequest("GET", "/v1/agent/monitor?log-level=warn&node-id="+s.client.NodeID(), nil) require.Nil(t, err) resp := newClosableRecorder() defer resp.Close() diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index 7c56270b0..db220a661 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -41,6 +41,9 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { case log := <-d.logCh: logCh <- log case <-stopCh: + d.Lock() + defer d.Unlock() + d.logger.DeregisterSink(d.sink) close(d.logCh) return diff --git a/command/agent_monitor.go b/command/agent_monitor.go index 1d0341c1e..daeb35040 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -4,9 +4,11 @@ import ( "fmt" "os" "os/signal" + "strconv" "strings" "syscall" + "github.com/hashicorp/nomad/api" "github.com/mitchellh/cli" ) @@ -46,10 +48,13 @@ func (c *MonitorCommand) Run(args []string) int { var logLevel string var nodeID string + var logJSON bool + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.StringVar(&logLevel, "log-level", "", "") flags.StringVar(&nodeID, "node-id", "", "") + flags.BoolVar(&logJSON, "log-json", false, "") if err := flags.Parse(args); err != nil { return 1 @@ -62,8 +67,17 @@ func (c *MonitorCommand) Run(args []string) int { return 1 } + params := map[string]string{ + "log-level": logLevel, + "node-id": nodeID, + "log-json": strconv.FormatBool(logJSON), + } + + query := &api.QueryOptions{ + Params: params, + } eventDoneCh := make(chan struct{}) - logCh, err := client.Agent().Monitor(logLevel, nodeID, eventDoneCh, nil) + logCh, err := client.Agent().Monitor(eventDoneCh, query) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err)) c.Ui.Error(commandErrorText(c)) diff --git a/nomad/client_monitor_endpoint.go b/nomad/client_monitor_endpoint.go index bbe93ae16..6eedd75d1 100644 --- a/nomad/client_monitor_endpoint.go +++ b/nomad/client_monitor_endpoint.go @@ -137,7 +137,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) { monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{ Level: logLevel, - JSONFormat: false, + JSONFormat: args.LogJSON, }) go func() {