Merge pull request #4234 from hashicorp/b-4159

Fix race in StreamFramer and truncation in api/AllocFS.Logs
This commit is contained in:
Michael Schurter
2018-05-04 14:24:07 -07:00
committed by GitHub
6 changed files with 238 additions and 82 deletions

View File

@@ -254,10 +254,15 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
// The chan will be closed when follow=false and the end of the file is
// reached.
//
// Unexpected (non-EOF) errors will be sent on the error chan.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
@@ -315,8 +320,11 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
errCh <- err
close(frames)
if err == io.EOF || err == io.ErrClosedPipe {
close(frames)
} else {
errCh <- err
}
return
}

View File

@@ -1,14 +1,162 @@
package api
import (
"bytes"
"fmt"
"io"
"reflect"
"strings"
"testing"
"time"
units "github.com/docker/go-units"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFS_Logs(t *testing.T) {
t.Parallel()
require := require.New(t)
rpcPort := 0
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
rpcPort = c.Ports.RPC
c.Client = &testutil.ClientConfig{
Enabled: true,
}
})
defer s.Stop()
//TODO There should be a way to connect the client to the servers in
//makeClient above
require.NoError(c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))
index := uint64(0)
testutil.WaitForResult(func() (bool, error) {
nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
index = qm.LastIndex
if len(nodes) != 1 {
return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
}
if nodes[0].Status != "ready" {
return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
var input strings.Builder
input.Grow(units.MB)
lines := 80 * units.KB
for i := 0; i < lines; i++ {
fmt.Fprintf(&input, "%d\n", i)
}
job := &Job{
ID: helper.StringToPtr("TestFS_Logs"),
Region: helper.StringToPtr("global"),
Datacenters: []string{"dc1"},
Type: helper.StringToPtr("batch"),
TaskGroups: []*TaskGroup{
{
Name: helper.StringToPtr("TestFS_LogsGroup"),
Tasks: []*Task{
{
Name: "logger",
Driver: "mock_driver",
Config: map[string]interface{}{
"stdout_string": input.String(),
},
},
},
},
},
}
jobs := c.Jobs()
jobResp, _, err := jobs.Register(job, nil)
require.NoError(err)
index = jobResp.EvalCreateIndex
evals := c.Evaluations()
testutil.WaitForResult(func() (bool, error) {
evalResp, qm, err := evals.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
if evalResp.BlockedEval != "" {
t.Fatalf("Eval blocked: %s", pretty.Sprint(evalResp))
}
index = qm.LastIndex
if evalResp.Status != "complete" {
return false, fmt.Errorf("eval status: %v", evalResp.Status)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
allocID := ""
testutil.WaitForResult(func() (bool, error) {
allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
if len(allocs) != 1 {
return false, fmt.Errorf("unexpected number of allocs: %d", len(allocs))
}
if allocs[0].ClientStatus != "complete" {
return false, fmt.Errorf("alloc not complete: %s", allocs[0].ClientStatus)
}
allocID = allocs[0].ID
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
alloc, _, err := c.Allocations().Info(allocID, nil)
require.NoError(err)
for i := 0; i < 3; i++ {
stopCh := make(chan struct{})
defer close(stopCh)
frames, errors := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil)
var result bytes.Buffer
READ_FRAMES:
for {
select {
case f := <-frames:
if f == nil {
break READ_FRAMES
}
result.Write(f.Data)
case err := <-errors:
// Don't Fatal here as the other assertions may
// contain helpeful information.
t.Errorf("Error: %v", err)
}
}
// Check length
assert.Equal(t, input.Len(), result.Len(), "file size mismatch")
// Check complete ordering
for i := 0; i < lines; i++ {
line, err := result.ReadBytes('\n')
require.NoErrorf(err, "unexpected error on line %d: %v", i, err)
require.Equal(fmt.Sprintf("%d\n", i), string(line))
}
}
}
func TestFS_FrameReader(t *testing.T) {
t.Parallel()
// Create a channel of the frames and a cancel channel

View File

@@ -294,6 +294,7 @@ OUTER:
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
@@ -405,8 +406,6 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
// Start streaming
go func() {
@@ -423,20 +422,23 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
go func() {
for {
if _, err := conn.Read(nil); err != nil {
if err == io.EOF {
if err == io.EOF || err == io.ErrClosedPipe {
// One end of the pipe was explicitly closed, exit cleanly
cancel()
return
}
select {
case errCh <- err:
case <-ctx.Done():
return
}
return
}
}
}()
var streamErr error
buf := new(bytes.Buffer)
frameCodec := codec.NewEncoder(buf, structs.JsonHandle)
OUTER:
for {
select {
@@ -455,6 +457,7 @@ OUTER:
streamErr = err
break OUTER
}
frameCodec.Reset(buf)
resp.Payload = buf.Bytes()
buf.Reset()
@@ -464,6 +467,7 @@ OUTER:
streamErr = err
break OUTER
}
encoder.Reset(conn)
}
}
@@ -576,12 +580,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
// #3342
select {
case <-framer.ExitCh():
err := parseFramerErr(framer.Err())
if err == syscall.EPIPE {
// EPIPE just means the connection was closed
return nil
}
return err
return nil
default:
}
@@ -705,7 +704,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return parseFramerErr(framer.Err())
return nil
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:

View File

@@ -55,8 +55,18 @@ func (s *StreamFrame) IsCleared() bool {
}
}
func (s *StreamFrame) Copy() *StreamFrame {
n := new(StreamFrame)
*n = *s
n.Data = make([]byte, len(s.Data))
copy(n.Data, s.Data)
return n
}
// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
// out is where frames are sent and is closed when no more frames will
// be sent.
out chan<- *StreamFrame
frameSize int
@@ -64,21 +74,26 @@ type StreamFramer struct {
heartbeat *time.Ticker
flusher *time.Ticker
shutdown bool
// shutdown is true when a shutdown is triggered
shutdown bool
// shutdownCh is closed when no more Send()s will be called and run()
// should flush pending frames before closing exitCh
shutdownCh chan struct{}
exitCh chan struct{}
// exitCh is closed when the run() goroutine exits and no more frames
// will be sent.
exitCh chan struct{}
// The mutex protects everything below
l sync.Mutex
// The current working frame
f StreamFrame
f *StreamFrame
data *bytes.Buffer
// Captures whether the framer is running and any error that occurred to
// cause it to stop.
// Captures whether the framer is running
running bool
err error
}
// NewStreamFramer creates a new stream framer that will output StreamFrames to
@@ -95,6 +110,7 @@ func NewStreamFramer(out chan<- *StreamFrame,
frameSize: frameSize,
heartbeat: heartbeat,
flusher: flusher,
f: new(StreamFrame),
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
shutdownCh: make(chan struct{}),
exitCh: make(chan struct{}),
@@ -121,6 +137,8 @@ func (s *StreamFramer) Destroy() {
if running {
<-s.exitCh
}
// Close out chan only after exitCh has exited
if !wasShutdown {
close(s.out)
}
@@ -144,21 +162,12 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
return s.exitCh
}
// Err returns the error that caused the StreamFramer to exit
func (s *StreamFramer) Err() error {
s.l.Lock()
defer s.l.Unlock()
return s.err
}
// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
var err error
defer func() {
s.l.Lock()
s.running = false
s.err = err
s.l.Unlock()
close(s.exitCh)
}()
@@ -177,40 +186,46 @@ OUTER:
}
// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
s.send()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(HeartbeatStreamFrame); err != nil {
return
select {
case s.out <- HeartbeatStreamFrame:
case <-s.shutdownCh:
}
}
}
s.l.Lock()
// Send() may have left a partial frame. Send it now.
if !s.f.IsCleared() {
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
// Only send if there's actually data left
if len(s.f.Data) > 0 {
// Cannot select on shutdownCh as it's already closed
// Cannot select on exitCh as it's only closed after this exits
s.out <- s.f.Copy()
}
}
s.l.Unlock()
}
// send takes a StreamFrame, encodes and sends it
func (s *StreamFramer) send(f *StreamFrame) error {
sending := *f
f.Data = nil
func (s *StreamFramer) send() {
// Ensure s.out has not already been closd by Destroy
select {
case s.out <- &sending:
return nil
case <-s.exitCh:
return nil
return
default:
}
s.f.Data = s.readData()
select {
case s.out <- s.f.Copy():
s.f.Clear()
case <-s.exitCh:
}
}
@@ -236,31 +251,16 @@ func (s *StreamFramer) readData() []byte {
func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error {
s.l.Lock()
defer s.l.Unlock()
// If we are not running, return the error that caused us to not run or
// indicated that it was never started.
if !s.running {
if s.err != nil {
return s.err
}
return fmt.Errorf("StreamFramer not running")
}
// Check if not mergeable
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
err := s.send(&s.f)
s.f.Clear()
if err != nil {
return err
}
s.send()
}
// Store the new data as the current frame.
@@ -285,25 +285,24 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
force = false
}
// Create a new frame to send it
s.f.Data = s.readData()
// Ensure s.out has not already been closed by Destroy
select {
case <-s.exitCh:
return nil
default:
}
if err := s.send(&s.f); err != nil {
return err
// Create a new frame to send it
s.f.Data = s.readData()
select {
case s.out <- s.f.Copy():
case <-s.exitCh:
return nil
}
// Update the offset
s.f.Offset += int64(len(s.f.Data))
}
if s.data.Len() == 0 {
s.f.Clear()
}
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
)
// This test checks, that even if the frame size has not been hit, a flush will
@@ -132,8 +133,8 @@ func TestStreamFramer_Batch(t *testing.T) {
t.Fatalf("exit channel should close")
}
if _, ok := <-frames; ok {
t.Fatal("out channel should be closed")
if f, ok := <-frames; ok {
t.Fatalf("out channel should be closed. recv: %s", pretty.Sprint(f))
}
}

View File

@@ -338,15 +338,16 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
return nil, CodedError(500, handlerErr.Error())
}
p1, p2 := net.Pipe()
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
// Create a pipe connecting the (possibly remote) handler to the http response
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
// Create a goroutine that closes the pipe if the connection closes.
ctx, cancel := context.WithCancel(req.Context())
go func() {
<-ctx.Done()
p1.Close()
httpPipe.Close()
}()
// Create an output that gets flushed on every write
@@ -355,10 +356,11 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
// Create a channel that decodes the results
errCh := make(chan HTTPCodedError)
go func() {
defer cancel()
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
cancel()
return
}
@@ -366,7 +368,6 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
select {
case <-ctx.Done():
errCh <- nil
cancel()
return
default:
}
@@ -374,29 +375,29 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
var res cstructs.StreamErrWrapper
if err := decoder.Decode(&res); err != nil {
errCh <- CodedError(500, err.Error())
cancel()
return
}
decoder.Reset(httpPipe)
if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
cancel()
return
}
}
if _, err := io.Copy(output, bytes.NewBuffer(res.Payload)); err != nil {
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
errCh <- CodedError(500, err.Error())
cancel()
return
}
}
}()
handler(p2)
handler(handlerPipe)
cancel()
codedErr := <-errCh
// Ignore EOF and ErrClosedPipe errors.
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), "closed") ||