moving endpoints over to frames

This commit is contained in:
Drew Bailey
2019-11-01 10:33:28 -04:00
parent f8eaf1f5af
commit 9a96c10d4c
6 changed files with 97 additions and 27 deletions

View File

@@ -289,7 +289,7 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Stream
}
}()
return frames, nil
return frames, errCh
}
// joinResponse is used to decode the response we get while

View File

@@ -1,6 +1,7 @@
package api
import (
"bytes"
"fmt"
"reflect"
"sort"
@@ -311,22 +312,24 @@ func TestAgent_MonitorWithNode(t *testing.T) {
},
}
logCh, err := agent.Monitor(doneCh, q)
require.NoError(t, err)
frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)
// make a request to generate some logs
_, err = agent.NodeName()
_, err := agent.NodeName()
require.NoError(t, err)
// Wait for a log message
var result bytes.Buffer
OUTER:
for {
select {
case log := <-logCh:
if strings.Contains(string(log.Data), "[DEBUG]") {
case f := <-frames:
if strings.Contains(string(f.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Errorf("Error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
@@ -350,22 +353,26 @@ func TestAgent_Monitor(t *testing.T) {
}
doneCh := make(chan struct{})
logCh, err := agent.Monitor(doneCh, q)
require.NoError(t, err)
frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)
// make a request to generate some logs
_, err = agent.Region()
_, err := agent.Region()
require.NoError(t, err)
// Wait for a log message
OUTER:
for {
select {
case log := <-logCh:
case log := <-frames:
if log == nil {
continue
}
if strings.Contains(string(log.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Fatalf("error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}

View File

@@ -298,15 +298,13 @@ func TestHTTP_AgentMonitor(t *testing.T) {
tried := 0
testutil.WaitForResult(func() (bool, error) {
if tried < maxLogAttempts {
s.Server.logger.Debug("log that should not be sent")
s.Server.logger.Warn("log that should be sent")
tried++
}
got := resp.Body.String()
want := "[WARN] http: log that should be sent"
want := `{"Data":"`
if strings.Contains(got, want) {
require.NotContains(t, resp.Body.String(), "[DEBUG]")
return true, nil
}
@@ -344,9 +342,8 @@ func TestHTTP_AgentMonitor(t *testing.T) {
}
out += string(output)
want := "[WARN] http: log that should be sent"
want := `{"Data":"`
if strings.Contains(out, want) {
require.NotContains(t, resp.Body.String(), "[DEBUG]")
return true, nil
}

View File

@@ -14,6 +14,8 @@ func TestMonitorCommand_Implements(t *testing.T) {
func TestMonitorCommand_Fails(t *testing.T) {
t.Parallel()
srv, _, _ := testServer(t, false, nil)
defer srv.Shutdown()
ui := new(cli.MockUi)
cmd := &MonitorCommand{Meta: Meta{Ui: ui}}

View File

@@ -1,14 +1,17 @@
package nomad
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"time"
log "github.com/hashicorp/go-hclog"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
@@ -138,9 +141,20 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
JSONFormat: args.LogJSON,
})
frames := make(chan *sframer.StreamFrame, 32)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
// framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe closed, exit
// One end of the pipe explicitly closed, exit
cancel()
return
}
@@ -151,14 +165,59 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}()
logCh := monitor.Start(stopCh)
initialOffset := int64(0)
// receive logs and build frames
go func() {
defer framer.Destroy()
LOOP:
for {
select {
case log := <-logCh:
if err := framer.Send("", "log", log, initialOffset); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
}()
var streamErr error
OUTER:
for {
select {
case log := <-logCh:
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}
break OUTER
}
var resp cstructs.StreamErrWrapper
resp.Payload = log
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
resp.Payload = buf.Bytes()
buf.Reset()
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
@@ -174,11 +233,5 @@ OUTER:
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}
// Attempt to send the error
encoder.Encode(&cstructs.StreamErrWrapper{
Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)),
})
return
}
}

View File

@@ -1,6 +1,7 @@
package nomad
import (
"encoding/json"
"fmt"
"io"
"net"
@@ -11,10 +12,12 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
@@ -85,7 +88,7 @@ func TestMonitor_Monitor_Remote_Server(t *testing.T) {
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(1 * time.Second)
timeout := time.After(3 * time.Second)
expected := "[DEBUG]"
received := ""
@@ -101,7 +104,11 @@ OUTER:
t.Fatalf("Got error: %v", msg.Error.Error())
}
received += string(msg.Payload)
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)
received += string(frame.Data)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
@@ -181,7 +188,11 @@ OUTER:
t.Fatalf("Got error: %v", msg.Error.Error())
}
received += string(msg.Payload)
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)
received += string(frame.Data)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER