From 65a64fb1cba40f2e6a8121e31f5f85fac28e7102 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 10 Jul 2016 13:55:52 -0400 Subject: [PATCH] StreamFramer tests --- command/agent/fs_endpoint.go | 27 ++-- command/agent/fs_endpoint_test.go | 214 +++++++++++++++++++++++++++++- vendor/gopkg.in/tomb.v1/LICENSE | 29 ++++ vendor/gopkg.in/tomb.v1/README.md | 4 + vendor/gopkg.in/tomb.v1/tomb.go | 176 ++++++++++++++++++++++++ 5 files changed, 437 insertions(+), 13 deletions(-) create mode 100644 vendor/gopkg.in/tomb.v1/LICENSE create mode 100644 vendor/gopkg.in/tomb.v1/README.md create mode 100644 vendor/gopkg.in/tomb.v1/tomb.go diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 87a1698c4..18febc0aa 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -27,8 +27,8 @@ var ( ) const ( - // frameSize is the maximum number of bytes to send in a single frame - frameSize = 64 * 1024 + // streamFrameSize is the maximum number of bytes to send in a single frame + streamFrameSize = 64 * 1024 // streamHeartbeatRate is the rate at which a heartbeat will occur to detect // a closed connection without sending any additional data @@ -190,10 +190,16 @@ type StreamFrame struct { FileEvent string } +// IsHeartbeat returns if the frame is a heartbeat frame +func (s *StreamFrame) IsHeartbeat() bool { + return s.Offset == 0 && s.Data == "" && s.File == "" && s.FileEvent == "" +} + // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { out io.WriteCloser enc *codec.Encoder + frameSize int heartbeat *time.Ticker flusher *time.Ticker shutdown chan struct{} @@ -216,17 +222,18 @@ type StreamFramer struct { // NewStreamFramer creates a new stream framer that will output StreamFrames to // the passed output. -func NewStreamFramer(out io.WriteCloser) *StreamFramer { +func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { // Create a JSON encoder enc := codec.NewEncoder(out, jsonHandle) // Create the heartbeat and flush ticker - heartbeat := time.NewTicker(streamHeartbeatRate) - flusher := time.NewTicker(streamBatchWindow) + heartbeat := time.NewTicker(heartbeatRate) + flusher := time.NewTicker(batchWindow) return &StreamFramer{ out: out, enc: enc, + frameSize: frameSize, heartbeat: heartbeat, flusher: flusher, outbound: make(chan *StreamFrame), @@ -328,8 +335,8 @@ func (s *StreamFramer) run() { func (s *StreamFramer) readData() string { // Compute the amount to read from the buffer size := s.data.Len() - if size > frameSize { - size = frameSize + if size > s.frameSize { + size = s.frameSize } return base64.StdEncoding.EncodeToString(s.data.Next(size)) } @@ -375,7 +382,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e s.data.Write(data) // Flush till we are under the max frame size - for s.data.Len() >= frameSize { + for s.data.Len() >= s.frameSize { // Create a new frame to send it s.outbound <- &StreamFrame{ Offset: s.f.Offset, @@ -471,7 +478,7 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o }() // Create the framer - framer := NewStreamFramer(output) + framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -483,7 +490,7 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o var changes *watch.FileChanges // Start streaming the data - data := make([]byte, frameSize) + data := make([]byte, streamFrameSize) OUTER: for { // Read up to the max frame size diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 9958ba6c8..5513371ad 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -1,9 +1,14 @@ package agent import ( + "encoding/base64" + "io" "net/http" "net/http/httptest" "testing" + "time" + + "github.com/ugorji/go/codec" ) func TestAllocDirFS_List_MissingParams(t *testing.T) { @@ -85,7 +90,210 @@ func TestAllocDirFS_ReadAt_MissingParams(t *testing.T) { }) } -func TestHTTP_FsStream_EOF_Modify(t *testing.T) { - httpTest(t, nil, func(s *TestServer) { - }) +type WriteCloseChecker struct { + io.WriteCloser + Closed bool +} + +func (w *WriteCloseChecker) Close() error { + w.Closed = true + return w.WriteCloser.Close() +} + +// This test checks, that even if the frame size has not been hit, a flush will +// periodically occur. +func TestStreamFramer_Flush(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond + sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf.Run() + + // Create a decoder + dec := codec.NewDecoder(r, jsonHandle) + + f := "foo" + fe := "bar" + d := []byte{0xa} + expected := base64.StdEncoding.EncodeToString(d) + o := int64(10) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode") + } + + if frame.IsHeartbeat() { + continue + } + + if frame.Data == expected && frame.Offset == o && frame.File == f && frame.FileEvent == fe { + resultCh <- struct{}{} + return + } + } + }() + + // Write only 1 byte so we do not hit the frame size + if err := sf.Send(f, fe, d, o); err != nil { + t.Fatalf("Send() failed %v", err) + } + + select { + case <-resultCh: + case <-time.After(2 * bWindow): + t.Fatalf("failed to flush") + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + select { + case <-sf.ExitCh(): + case <-time.After(2 * hRate): + t.Fatalf("exit channel should close") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + +// This test checks that frames will be batched till the frame size is hit (in +// the case that is before the flush). +func TestStreamFramer_Batch(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + // Ensure the batch window doesn't get hit + hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond + sf := NewStreamFramer(wrappedW, hRate, bWindow, 3) + sf.Run() + + // Create a decoder + dec := codec.NewDecoder(r, jsonHandle) + + f := "foo" + fe := "bar" + d := []byte{0xa, 0xb, 0xc} + expected := base64.StdEncoding.EncodeToString(d) + o := int64(10) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode") + } + + if frame.IsHeartbeat() { + continue + } + + if frame.Data == expected && frame.Offset == o && frame.File == f && frame.FileEvent == fe { + resultCh <- struct{}{} + return + } + } + }() + + // Write only 1 byte so we do not hit the frame size + if err := sf.Send(f, fe, d[:1], o); err != nil { + t.Fatalf("Send() failed %v", err) + } + + // Ensure we didn't get any data + select { + case <-resultCh: + t.Fatalf("Got data before frame size reached") + case <-time.After(bWindow / 2): + } + + // Write the rest so we hit the frame size + if err := sf.Send(f, fe, d[1:], o); err != nil { + t.Fatalf("Send() failed %v", err) + } + + // Ensure we get data + select { + case <-resultCh: + case <-time.After(2 * bWindow): + t.Fatalf("Did not receive data after batch size reached") + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + select { + case <-sf.ExitCh(): + case <-time.After(2 * hRate): + t.Fatalf("exit channel should close") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + +func TestStreamFramer_Heartbeat(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond + sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf.Run() + + // Create a decoder + dec := codec.NewDecoder(r, jsonHandle) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode") + } + + if frame.IsHeartbeat() { + resultCh <- struct{}{} + return + } + } + }() + + select { + case <-resultCh: + case <-time.After(2 * hRate): + t.Fatalf("failed to heartbeat") + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + select { + case <-sf.ExitCh(): + case <-time.After(2 * hRate): + t.Fatalf("exit channel should close") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } } diff --git a/vendor/gopkg.in/tomb.v1/LICENSE b/vendor/gopkg.in/tomb.v1/LICENSE new file mode 100644 index 000000000..a4249bb31 --- /dev/null +++ b/vendor/gopkg.in/tomb.v1/LICENSE @@ -0,0 +1,29 @@ +tomb - support for clean goroutine termination in Go. + +Copyright (c) 2010-2011 - Gustavo Niemeyer + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/gopkg.in/tomb.v1/README.md b/vendor/gopkg.in/tomb.v1/README.md new file mode 100644 index 000000000..3ae8788e8 --- /dev/null +++ b/vendor/gopkg.in/tomb.v1/README.md @@ -0,0 +1,4 @@ +Installation and usage +---------------------- + +See [gopkg.in/tomb.v1](https://gopkg.in/tomb.v1) for documentation and usage details. diff --git a/vendor/gopkg.in/tomb.v1/tomb.go b/vendor/gopkg.in/tomb.v1/tomb.go new file mode 100644 index 000000000..9aec56d82 --- /dev/null +++ b/vendor/gopkg.in/tomb.v1/tomb.go @@ -0,0 +1,176 @@ +// Copyright (c) 2011 - Gustavo Niemeyer +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// The tomb package offers a conventional API for clean goroutine termination. +// +// A Tomb tracks the lifecycle of a goroutine as alive, dying or dead, +// and the reason for its death. +// +// The zero value of a Tomb assumes that a goroutine is about to be +// created or already alive. Once Kill or Killf is called with an +// argument that informs the reason for death, the goroutine is in +// a dying state and is expected to terminate soon. Right before the +// goroutine function or method returns, Done must be called to inform +// that the goroutine is indeed dead and about to stop running. +// +// A Tomb exposes Dying and Dead channels. These channels are closed +// when the Tomb state changes in the respective way. They enable +// explicit blocking until the state changes, and also to selectively +// unblock select statements accordingly. +// +// When the tomb state changes to dying and there's still logic going +// on within the goroutine, nested functions and methods may choose to +// return ErrDying as their error value, as this error won't alter the +// tomb state if provided to the Kill method. This is a convenient way to +// follow standard Go practices in the context of a dying tomb. +// +// For background and a detailed example, see the following blog post: +// +// http://blog.labix.org/2011/10/09/death-of-goroutines-under-control +// +// For a more complex code snippet demonstrating the use of multiple +// goroutines with a single Tomb, see: +// +// http://play.golang.org/p/Xh7qWsDPZP +// +package tomb + +import ( + "errors" + "fmt" + "sync" +) + +// A Tomb tracks the lifecycle of a goroutine as alive, dying or dead, +// and the reason for its death. +// +// See the package documentation for details. +type Tomb struct { + m sync.Mutex + dying chan struct{} + dead chan struct{} + reason error +} + +var ( + ErrStillAlive = errors.New("tomb: still alive") + ErrDying = errors.New("tomb: dying") +) + +func (t *Tomb) init() { + t.m.Lock() + if t.dead == nil { + t.dead = make(chan struct{}) + t.dying = make(chan struct{}) + t.reason = ErrStillAlive + } + t.m.Unlock() +} + +// Dead returns the channel that can be used to wait +// until t.Done has been called. +func (t *Tomb) Dead() <-chan struct{} { + t.init() + return t.dead +} + +// Dying returns the channel that can be used to wait +// until t.Kill or t.Done has been called. +func (t *Tomb) Dying() <-chan struct{} { + t.init() + return t.dying +} + +// Wait blocks until the goroutine is in a dead state and returns the +// reason for its death. +func (t *Tomb) Wait() error { + t.init() + <-t.dead + t.m.Lock() + reason := t.reason + t.m.Unlock() + return reason +} + +// Done flags the goroutine as dead, and should be called a single time +// right before the goroutine function or method returns. +// If the goroutine was not already in a dying state before Done is +// called, it will be flagged as dying and dead at once with no +// error. +func (t *Tomb) Done() { + t.Kill(nil) + close(t.dead) +} + +// Kill flags the goroutine as dying for the given reason. +// Kill may be called multiple times, but only the first +// non-nil error is recorded as the reason for termination. +// +// If reason is ErrDying, the previous reason isn't replaced +// even if it is nil. It's a runtime error to call Kill with +// ErrDying if t is not in a dying state. +func (t *Tomb) Kill(reason error) { + t.init() + t.m.Lock() + defer t.m.Unlock() + if reason == ErrDying { + if t.reason == ErrStillAlive { + panic("tomb: Kill with ErrDying while still alive") + } + return + } + if t.reason == nil || t.reason == ErrStillAlive { + t.reason = reason + } + // If the receive on t.dying succeeds, then + // it can only be because we have already closed it. + // If it blocks, then we know that it needs to be closed. + select { + case <-t.dying: + default: + close(t.dying) + } +} + +// Killf works like Kill, but builds the reason providing the received +// arguments to fmt.Errorf. The generated error is also returned. +func (t *Tomb) Killf(f string, a ...interface{}) error { + err := fmt.Errorf(f, a...) + t.Kill(err) + return err +} + +// Err returns the reason for the goroutine death provided via Kill +// or Killf, or ErrStillAlive when the goroutine is still alive. +func (t *Tomb) Err() (reason error) { + t.init() + t.m.Lock() + reason = t.reason + t.m.Unlock() + return +}