From a8e1f352407967e2ac113526491df5bcc4e6d43d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 15:57:44 -0700 Subject: [PATCH 01/10] tests: test logs from client<->api package --- api/fs_test.go | 147 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/api/fs_test.go b/api/fs_test.go index bd9219fc2..e1101c92d 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -1,14 +1,161 @@ package api import ( + "bytes" "fmt" "io" "reflect" "strings" "testing" "time" + + units "github.com/docker/go-units" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) +func TestFS_Logs(t *testing.T) { + t.Parallel() + require := require.New(t) + rpcPort := 0 + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + rpcPort = c.Ports.RPC + c.Client = &testutil.ClientConfig{ + Enabled: true, + } + }) + defer s.Stop() + + //TODO There should be a way to connect the client to the servers in + //makeClient above + require.NoError(c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)})) + + index := uint64(0) + testutil.WaitForResult(func() (bool, error) { + nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + index = qm.LastIndex + if len(nodes) != 1 { + return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes)) + } + if nodes[0].Status != "ready" { + return false, fmt.Errorf("node not ready: %s", nodes[0].Status) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + var input strings.Builder + input.Grow(units.MB) + lines := 80 * units.KB + for i := 0; i < lines; i++ { + fmt.Fprintf(&input, "%d\n", i) + } + + job := &Job{ + ID: helper.StringToPtr("TestFS_Logs"), + Region: helper.StringToPtr("global"), + Datacenters: []string{"dc1"}, + Type: helper.StringToPtr("batch"), + TaskGroups: []*TaskGroup{ + { + Name: 
helper.StringToPtr("TestFS_LogsGroup"), + Tasks: []*Task{ + { + Name: "logger", + Driver: "mock_driver", + Config: map[string]interface{}{ + "stdout_string": input.String(), + }, + }, + }, + }, + }, + } + + jobs := c.Jobs() + jobResp, _, err := jobs.Register(job, nil) + require.NoError(err) + + index = jobResp.EvalCreateIndex + evals := c.Evaluations() + testutil.WaitForResult(func() (bool, error) { + evalResp, qm, err := evals.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + if evalResp.BlockedEval != "" { + t.Fatalf("Eval blocked: %s", pretty.Sprint(evalResp)) + } + index = qm.LastIndex + if evalResp.Status != "complete" { + return false, fmt.Errorf("eval status: %v", evalResp.Status) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + allocID := "" + testutil.WaitForResult(func() (bool, error) { + allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + if len(allocs) != 1 { + return false, fmt.Errorf("unexpected number of allocs: %d", len(allocs)) + } + if allocs[0].ClientStatus != "complete" { + return false, fmt.Errorf("alloc not complete: %s", allocs[0].ClientStatus) + } + allocID = allocs[0].ID + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + alloc, _, err := c.Allocations().Info(allocID, nil) + require.NoError(err) + + for i := 0; i < 3; i++ { + stopCh := make(chan struct{}) + defer close(stopCh) + + frames, errors := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil) + + var result bytes.Buffer + READ_FRAMES: + for { + select { + case f := <-frames: + if f == nil { + break READ_FRAMES + } + result.Write(f.Data) + case err := <-errors: + // Don't Fatal here as the other assertions may + // contain helpful information.
+ t.Errorf("Error: %v", err) + } + } + + // Check length + require.Equal(input.Len(), result.Len()) + + // Check complete ordering + for i := 0; i < lines; i++ { + line, err := result.ReadBytes('\n') + require.NoErrorf(err, "unexpected error on line %d: %v", i, err) + require.Equal(fmt.Sprintf("%d\n", i), string(line)) + } + } +} + func TestFS_FrameReader(t *testing.T) { t.Parallel() // Create a channel of the frames and a cancel channel From 41f05dc2c4bfbf3007ead084aaff407932cae950 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:04:06 -0700 Subject: [PATCH 02/10] api: never return EOF from Logs error chan Closing the frames chan is the only race-free way to signal to receivers that all frames have been sent and no errors have occurred. If EOF is sent on error chan receivers may not receive the last frame (or frames since the chan is buffered) before receiving the error. Closing frames is the idiomatic way of signaling there is no more data to be read from a chan. --- api/fs.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/api/fs.go b/api/fs.go index 45d2a676a..107e55303 100644 --- a/api/fs.go +++ b/api/fs.go @@ -254,10 +254,15 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, // * cancel: A channel that when closed, streaming will end. // // The return value is a channel that will emit StreamFrames as they are read. +// The chan will be closed when follow=false and the end of the file is +// reached. +// +// Unexpected (non-EOF) errors will be sent on the error chan. 
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { errCh := make(chan error, 1) + nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) if err != nil { errCh <- err @@ -315,8 +320,11 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str // Decode the next frame var frame StreamFrame if err := dec.Decode(&frame); err != nil { - errCh <- err - close(frames) + if err == io.EOF || err == io.ErrClosedPipe { + close(frames) + } else { + errCh <- err + } return } From a7c71c1cdc875870faee891d63126dcfc2df2d42 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:07:23 -0700 Subject: [PATCH 03/10] client: reset encoders between uses According to go/codec's docs, Reset(...) should be called on Decoders/Encoders before reuse: https://godoc.org/github.com/ugorji/go/codec I could find no evidence that *not* calling Reset() caused bugs, but might as well do what the docs say? 
--- client/fs_endpoint.go | 7 +++++-- command/agent/fs_endpoint.go | 2 ++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index eaff009c7..9349b184b 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -294,6 +294,7 @@ OUTER: streamErr = err break OUTER } + encoder.Reset(conn) case <-ctx.Done(): break OUTER } @@ -405,8 +406,6 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) - var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) // Start streaming go func() { @@ -437,6 +436,8 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { }() var streamErr error + buf := new(bytes.Buffer) + frameCodec := codec.NewEncoder(buf, structs.JsonHandle) OUTER: for { select { @@ -455,6 +456,7 @@ OUTER: streamErr = err break OUTER } + frameCodec.Reset(buf) resp.Payload = buf.Bytes() buf.Reset() @@ -464,6 +466,7 @@ OUTER: streamErr = err break OUTER } + encoder.Reset(conn) } } diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index c19684c89..0f5f29c27 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -361,6 +361,7 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, cancel() return } + encoder.Reset(httpPipe) for { select { @@ -377,6 +378,7 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, cancel() return } + decoder.Reset(httpPipe) if err := res.Error; err != nil { if err.Code != nil { From 564854dd9712da6b08b27c2ca1b3f257347cdd7b Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:10:38 -0700 Subject: [PATCH 04/10] client: give pipe conns meaningful names --- command/agent/fs_endpoint.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 0f5f29c27..91e8afb85 100644 --- a/command/agent/fs_endpoint.go +++ 
b/command/agent/fs_endpoint.go @@ -338,15 +338,16 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, return nil, CodedError(500, handlerErr.Error()) } - p1, p2 := net.Pipe() - decoder := codec.NewDecoder(p1, structs.MsgpackHandle) - encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + // Create a pipe connecting the (possibly remote) handler to the http response + httpPipe, handlerPipe := net.Pipe() + decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) + encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) // Create a goroutine that closes the pipe if the connection closes. ctx, cancel := context.WithCancel(req.Context()) go func() { <-ctx.Done() - p1.Close() + httpPipe.Close() }() // Create an output that gets flushed on every write @@ -396,7 +397,7 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, } }() - handler(p2) + handler(handlerPipe) cancel() codedErr := <-errCh if codedErr != nil && From 63dad441b448bc61665f8378bb678c34a314c32c Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:11:08 -0700 Subject: [PATCH 05/10] client: ensure cancel is always called when func exits --- command/agent/fs_endpoint.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 91e8afb85..105c0481f 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -356,10 +356,11 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, // Create a channel that decodes the results errCh := make(chan HTTPCodedError) go func() { + defer cancel() + // Send the request if err := encoder.Encode(args); err != nil { errCh <- CodedError(500, err.Error()) - cancel() return } encoder.Reset(httpPipe) @@ -368,7 +369,6 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, select { case <-ctx.Done(): errCh <- nil - cancel() return default: } @@ -376,7 +376,6 @@ func (s *HTTPServer) fsStreamImpl(resp 
http.ResponseWriter, var res cstructs.StreamErrWrapper if err := decoder.Decode(&res); err != nil { errCh <- CodedError(500, err.Error()) - cancel() return } decoder.Reset(httpPipe) @@ -384,14 +383,12 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, if err := res.Error; err != nil { if err.Code != nil { errCh <- CodedError(int(*err.Code), err.Error()) - cancel() return } } if _, err := io.Copy(output, bytes.NewBuffer(res.Payload)); err != nil { errCh <- CodedError(500, err.Error()) - cancel() return } } From cafcb89394e177bd5f4ce6ce19f3238990fbf73e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:16:39 -0700 Subject: [PATCH 06/10] client: don't spin on read errors --- client/fs_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 9349b184b..9f8440abd 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -429,8 +429,8 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { select { case errCh <- err: case <-ctx.Done(): - return } + return } } }() From aad596bb0fbbb1c69cccce199a6e9f1f561e114d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:17:11 -0700 Subject: [PATCH 07/10] client: squelch errors on cleanly closed pipes --- client/fs_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 9f8440abd..62423ad52 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -422,7 +422,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { go func() { for { if _, err := conn.Read(nil); err != nil { - if err == io.EOF { + if err == io.EOF || err == io.ErrClosedPipe { cancel() return } From 8776cf8a806457ad5fb3c7c5168d5b31b1264ffc Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:21:20 -0700 Subject: [PATCH 08/10] client: use a bytes.Reader for reading a []byte --- command/agent/fs_endpoint.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 105c0481f..d8b7877b1 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -387,7 +387,7 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, } } - if _, err := io.Copy(output, bytes.NewBuffer(res.Payload)); err != nil { + if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil { errCh <- CodedError(500, err.Error()) return } From 361db269c21b9687d31de769364795a791fab93e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:39:02 -0700 Subject: [PATCH 09/10] framer: fix race and remove unused error var In the old code `sending` in the `send()` method shared the Data slice's underlying backing array with its caller. Clearing StreamFrame.Data didn't break the reference from the sent frame to the StreamFramer's data slice. --- client/fs_endpoint.go | 10 +-- client/lib/streamframer/framer.go | 112 +++++++++++++----------------- 2 files changed, 53 insertions(+), 69 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 62423ad52..904c5cf49 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -423,6 +423,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { for { if _, err := conn.Read(nil); err != nil { if err == io.EOF || err == io.ErrClosedPipe { + // One end of the pipe was explicitly closed, exit cleanly cancel() return } @@ -579,12 +580,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // #3342 select { case <-framer.ExitCh(): - err := parseFramerErr(framer.Err()) - if err == syscall.EPIPE { - // EPIPE just means the connection was closed - return nil - } - return err + return nil default: } @@ -708,7 +704,7 @@ OUTER: lastEvent = truncateEvent continue OUTER case <-framer.ExitCh(): - return parseFramerErr(framer.Err()) + return nil case <-ctx.Done(): return nil case err, ok := <-eofCancelCh: diff --git 
a/client/lib/streamframer/framer.go b/client/lib/streamframer/framer.go index b0caa4a04..5f2eeb626 100644 --- a/client/lib/streamframer/framer.go +++ b/client/lib/streamframer/framer.go @@ -55,6 +55,14 @@ func (s *StreamFrame) IsCleared() bool { } } +func (s *StreamFrame) Copy() *StreamFrame { + n := new(StreamFrame) + *n = *s + n.Data = make([]byte, len(s.Data)) + copy(n.Data, s.Data) + return n +} + // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { out chan<- *StreamFrame @@ -64,21 +72,24 @@ type StreamFramer struct { heartbeat *time.Ticker flusher *time.Ticker - shutdown bool + // shutdown is true when a shutdown is triggered + shutdown bool + + // shutdownCh is closed when a shutdown is triggered shutdownCh chan struct{} - exitCh chan struct{} + + // exitCh is closed when the run() goroutine exits + exitCh chan struct{} // The mutex protects everything below l sync.Mutex // The current working frame - f StreamFrame + f *StreamFrame data *bytes.Buffer - // Captures whether the framer is running and any error that occurred to - // cause it to stop. 
+ // Captures whether the framer is running running bool - err error } // NewStreamFramer creates a new stream framer that will output StreamFrames to @@ -95,6 +106,7 @@ func NewStreamFramer(out chan<- *StreamFrame, frameSize: frameSize, heartbeat: heartbeat, flusher: flusher, + f: new(StreamFrame), data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)), shutdownCh: make(chan struct{}), exitCh: make(chan struct{}), @@ -110,6 +122,7 @@ func (s *StreamFramer) Destroy() { if !wasShutdown { close(s.shutdownCh) + close(s.out) } s.heartbeat.Stop() @@ -121,9 +134,6 @@ func (s *StreamFramer) Destroy() { if running { <-s.exitCh } - if !wasShutdown { - close(s.out) - } } // Run starts a long lived goroutine that handles sending data as well as @@ -144,24 +154,20 @@ func (s *StreamFramer) ExitCh() <-chan struct{} { return s.exitCh } -// Err returns the error that caused the StreamFramer to exit -func (s *StreamFramer) Err() error { - s.l.Lock() - defer s.l.Unlock() - return s.err -} - // run is the internal run method. It exits if Destroy is called or an error // occurs, in which case the exit channel is closed. 
func (s *StreamFramer) run() { - var err error defer func() { s.l.Lock() s.running = false - s.err = err s.l.Unlock() close(s.exitCh) }() + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovered in f", r) + } + }() OUTER: for { @@ -177,40 +183,38 @@ OUTER: } // Read the data for the frame, and send it - s.f.Data = s.readData() - err = s.send(&s.f) - s.f.Clear() + s.send() s.l.Unlock() - if err != nil { - return - } case <-s.heartbeat.C: // Send a heartbeat frame - if err = s.send(HeartbeatStreamFrame); err != nil { - return + select { + case s.out <- HeartbeatStreamFrame: + case <-s.shutdownCh: } } } s.l.Lock() if !s.f.IsCleared() { - s.f.Data = s.readData() - err = s.send(&s.f) - s.f.Clear() + s.send() } s.l.Unlock() } // send takes a StreamFrame, encodes and sends it -func (s *StreamFramer) send(f *StreamFrame) error { - sending := *f - f.Data = nil - +func (s *StreamFramer) send() { + // Ensure s.out has not already been closd by Destroy select { - case s.out <- &sending: - return nil - case <-s.exitCh: - return nil + case <-s.shutdownCh: + return + default: + } + + s.f.Data = s.readData() + select { + case s.out <- s.f.Copy(): + s.f.Clear() + case <-s.shutdownCh: } } @@ -236,31 +240,16 @@ func (s *StreamFramer) readData() []byte { func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error { s.l.Lock() defer s.l.Unlock() - // If we are not running, return the error that caused us to not run or // indicated that it was never started. if !s.running { - if s.err != nil { - return s.err - } - return fmt.Errorf("StreamFramer not running") } // Check if not mergeable if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) { // Flush the old frame - s.f.Data = s.readData() - select { - case <-s.exitCh: - return nil - default: - } - err := s.send(&s.f) - s.f.Clear() - if err != nil { - return err - } + s.send() } // Store the new data as the current frame. 
@@ -285,25 +274,24 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e force = false } - // Create a new frame to send it - s.f.Data = s.readData() + // Ensure s.out has not already been closd by Destroy select { - case <-s.exitCh: + case <-s.shutdownCh: return nil default: } - if err := s.send(&s.f); err != nil { - return err + // Create a new frame to send it + s.f.Data = s.readData() + select { + case s.out <- s.f.Copy(): + case <-s.shutdownCh: + return nil } // Update the offset s.f.Offset += int64(len(s.f.Data)) } - if s.data.Len() == 0 { - s.f.Clear() - } - return nil } From 6858c520b2afed92c5d62d200416337ec2176af4 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 2 May 2018 09:47:08 -0700 Subject: [PATCH 10/10] framer: fix early exit/truncation in framer --- api/fs_test.go | 3 +- client/lib/streamframer/framer.go | 39 +++++++++++++++++--------- client/lib/streamframer/framer_test.go | 5 ++-- command/agent/fs_endpoint.go | 3 +- 4 files changed, 32 insertions(+), 18 deletions(-) diff --git a/api/fs_test.go b/api/fs_test.go index e1101c92d..790d2ca7c 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -145,7 +146,7 @@ func TestFS_Logs(t *testing.T) { } // Check length - require.Equal(input.Len(), result.Len()) + assert.Equal(t, input.Len(), result.Len(), "file size mismatch") // Check complete ordering for i := 0; i < lines; i++ { diff --git a/client/lib/streamframer/framer.go b/client/lib/streamframer/framer.go index 5f2eeb626..812f775c6 100644 --- a/client/lib/streamframer/framer.go +++ b/client/lib/streamframer/framer.go @@ -65,6 +65,8 @@ func (s *StreamFrame) Copy() *StreamFrame { // StreamFramer is used to buffer and send frames as well as heartbeat. 
type StreamFramer struct { + // out is where frames are sent and is closed when no more frames will + // be sent. out chan<- *StreamFrame frameSize int @@ -75,10 +77,12 @@ type StreamFramer struct { // shutdown is true when a shutdown is triggered shutdown bool - // shutdownCh is closed when a shutdown is triggered + // shutdownCh is closed when no more Send()s will be called and run() + // should flush pending frames before closing exitCh shutdownCh chan struct{} - // exitCh is closed when the run() goroutine exits + // exitCh is closed when the run() goroutine exits and no more frames + // will be sent. exitCh chan struct{} // The mutex protects everything below @@ -122,7 +126,6 @@ func (s *StreamFramer) Destroy() { if !wasShutdown { close(s.shutdownCh) - close(s.out) } s.heartbeat.Stop() @@ -134,6 +137,11 @@ func (s *StreamFramer) Destroy() { if running { <-s.exitCh } + + // Close out chan only after exitCh has exited + if !wasShutdown { + close(s.out) + } } // Run starts a long lived goroutine that handles sending data as well as @@ -163,11 +171,6 @@ func (s *StreamFramer) run() { s.l.Unlock() close(s.exitCh) }() - defer func() { - if r := recover(); r != nil { - fmt.Println("Recovered in f", r) - } - }() OUTER: for { @@ -195,8 +198,16 @@ OUTER: } s.l.Lock() + // Send() may have left a partial frame. Send it now. 
if !s.f.IsCleared() { - s.send() + s.f.Data = s.readData() + + // Only send if there's actually data left + if len(s.f.Data) > 0 { + // Cannot select on shutdownCh as it's already closed + // Cannot select on exitCh as it's only closed after this exits + s.out <- s.f.Copy() + } } s.l.Unlock() } @@ -205,7 +216,7 @@ OUTER: func (s *StreamFramer) send() { // Ensure s.out has not already been closd by Destroy select { - case <-s.shutdownCh: + case <-s.exitCh: return default: } @@ -214,7 +225,7 @@ func (s *StreamFramer) send() { select { case s.out <- s.f.Copy(): s.f.Clear() - case <-s.shutdownCh: + case <-s.exitCh: } } @@ -274,9 +285,9 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e force = false } - // Ensure s.out has not already been closd by Destroy + // Ensure s.out has not already been closed by Destroy select { - case <-s.shutdownCh: + case <-s.exitCh: return nil default: } @@ -285,7 +296,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e s.f.Data = s.readData() select { case s.out <- s.f.Copy(): - case <-s.shutdownCh: + case <-s.exitCh: return nil } diff --git a/client/lib/streamframer/framer_test.go b/client/lib/streamframer/framer_test.go index 13be32141..12d5fefaa 100644 --- a/client/lib/streamframer/framer_test.go +++ b/client/lib/streamframer/framer_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" ) // This test checks, that even if the frame size has not been hit, a flush will @@ -132,8 +133,8 @@ func TestStreamFramer_Batch(t *testing.T) { t.Fatalf("exit channel should close") } - if _, ok := <-frames; ok { - t.Fatal("out channel should be closed") + if f, ok := <-frames; ok { + t.Fatalf("out channel should be closed. 
recv: %s", pretty.Sprint(f)) } } diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index d8b7877b1..9da2c4f6c 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -363,7 +363,6 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, errCh <- CodedError(500, err.Error()) return } - encoder.Reset(httpPipe) for { select { @@ -397,6 +396,8 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, handler(handlerPipe) cancel() codedErr := <-errCh + + // Ignore EOF and ErrClosedPipe errors. if codedErr != nil && (codedErr == io.EOF || strings.Contains(codedErr.Error(), "closed") ||