diff --git a/api/agent.go b/api/agent.go index b7e5e5a10..d3622acbc 100644 --- a/api/agent.go +++ b/api/agent.go @@ -238,9 +238,13 @@ func (a *Agent) Health() (*AgentHealthResponse, error) { return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err) } +type MonitorFrame struct { + Data []byte `json:",omitempty"` +} + // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop log streaming -func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { +func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *MonitorFrame, error) { r, err := a.client.newRequest("GET", "/v1/agent/monitor") if err != nil { return nil, err @@ -252,10 +256,10 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, e return nil, err } - logCh := make(chan string, 64) + frames := make(chan *MonitorFrame, 10) go func() { defer resp.Body.Close() - defer close(logCh) + defer close(frames) scanner := bufio.NewScanner(resp.Body) LOOP: @@ -267,10 +271,12 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, e } if scanner.Scan() { - if text := scanner.Text(); text != "" { - logCh <- text + var frame MonitorFrame + if bytes := scanner.Bytes(); len(bytes) > 0 { + frame.Data = bytes + frames <- &frame } else { - logCh <- " " + frames <- &frame } } else { break LOOP @@ -278,7 +284,7 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, e } }() - return logCh, nil + return frames, nil } // joinResponse is used to decode the response we get while diff --git a/api/agent_test.go b/api/agent_test.go index 82a8e93f8..4a0304f09 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -324,7 +324,7 @@ OUTER: for { select { case log := <-logCh: - if strings.Contains(log, "[DEBUG]") { + if strings.Contains(string(log.Data), "[DEBUG]") { break OUTER } case <-time.After(2 * time.Second): @@ -363,7 +363,7 @@ OUTER: for { select { case log := <-logCh: - if strings.Contains(log, "[DEBUG]") { + if strings.Contains(string(log.Data), "[DEBUG]") { break OUTER } case <-time.After(2 * time.Second): diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index 3605bb992..471a2c451 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -105,9 +105,6 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { // Write attempts to send latest log to logCh // it drops the log if channel is unavailable to receive func (d *Monitor) Write(p []byte) (n int, err error) { - d.Lock() - defer d.Unlock() - bytes := make([]byte, len(p)) copy(bytes, p) diff --git a/command/agent_monitor.go b/command/agent_monitor.go index c09c200da..a369fbb48 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -89,7 +89,7 @@ func (c *MonitorCommand) Run(args []string) int { Params: params, } eventDoneCh := make(chan struct{}) - logCh, err := client.Agent().Monitor(eventDoneCh, query) + frames, err := client.Agent().Monitor(eventDoneCh, query) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err)) c.Ui.Error(commandErrorText(c)) @@ -101,11 +101,11 @@ func (c *MonitorCommand) Run(args []string) int { OUTER: for { select { - case log, ok := <-logCh: + case frame, ok := <-frames: if !ok { break OUTER } - c.Ui.Output(log) + c.Ui.Output(string(frame.Data)) } }