mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
fix deadlock issue, switch to frames envelope
This commit is contained in:
20
api/agent.go
20
api/agent.go
@@ -238,9 +238,13 @@ func (a *Agent) Health() (*AgentHealthResponse, error) {
|
||||
return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
|
||||
}
|
||||
|
||||
type MonitorFrame struct {
|
||||
Data []byte `json:",omitempty"`
|
||||
}
|
||||
|
||||
// Monitor returns a channel which will receive streaming logs from the agent
|
||||
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
|
||||
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, error) {
|
||||
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *MonitorFrame, error) {
|
||||
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -252,10 +256,10 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logCh := make(chan string, 64)
|
||||
frames := make(chan *MonitorFrame, 10)
|
||||
go func() {
|
||||
defer resp.Body.Close()
|
||||
defer close(logCh)
|
||||
defer close(frames)
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
|
||||
LOOP:
|
||||
@@ -267,10 +271,12 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, e
|
||||
}
|
||||
|
||||
if scanner.Scan() {
|
||||
if text := scanner.Text(); text != "" {
|
||||
logCh <- text
|
||||
var frame MonitorFrame
|
||||
if bytes := scanner.Bytes(); len(bytes) > 0 {
|
||||
frame.Data = bytes
|
||||
frames <- &frame
|
||||
} else {
|
||||
logCh <- " "
|
||||
frames <- &frame
|
||||
}
|
||||
} else {
|
||||
break LOOP
|
||||
@@ -278,7 +284,7 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, e
|
||||
}
|
||||
}()
|
||||
|
||||
return logCh, nil
|
||||
return frames, nil
|
||||
}
|
||||
|
||||
// joinResponse is used to decode the response we get while
|
||||
|
||||
@@ -324,7 +324,7 @@ OUTER:
|
||||
for {
|
||||
select {
|
||||
case log := <-logCh:
|
||||
if strings.Contains(log, "[DEBUG]") {
|
||||
if strings.Contains(string(log.Data), "[DEBUG]") {
|
||||
break OUTER
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
@@ -363,7 +363,7 @@ OUTER:
|
||||
for {
|
||||
select {
|
||||
case log := <-logCh:
|
||||
if strings.Contains(log, "[DEBUG]") {
|
||||
if strings.Contains(string(log.Data), "[DEBUG]") {
|
||||
break OUTER
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
|
||||
@@ -105,9 +105,6 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
|
||||
// Write attempts to send latest log to logCh
|
||||
// it drops the log if channel is unavailable to receive
|
||||
func (d *Monitor) Write(p []byte) (n int, err error) {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
bytes := make([]byte, len(p))
|
||||
copy(bytes, p)
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ func (c *MonitorCommand) Run(args []string) int {
|
||||
Params: params,
|
||||
}
|
||||
eventDoneCh := make(chan struct{})
|
||||
logCh, err := client.Agent().Monitor(eventDoneCh, query)
|
||||
frames, err := client.Agent().Monitor(eventDoneCh, query)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err))
|
||||
c.Ui.Error(commandErrorText(c))
|
||||
@@ -101,11 +101,11 @@ func (c *MonitorCommand) Run(args []string) int {
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case log, ok := <-logCh:
|
||||
case frame, ok := <-frames:
|
||||
if !ok {
|
||||
break OUTER
|
||||
}
|
||||
c.Ui.Output(log)
|
||||
c.Ui.Output(string(frame.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user