unblock the readers to add liveness when using -n

This commit is contained in:
Alex Dadgar
2016-07-19 19:48:16 -07:00
parent 4cf65d7944
commit ac1cfd1821
6 changed files with 180 additions and 18 deletions

View File

@@ -362,6 +362,8 @@ type FrameReader struct {
cancelCh chan struct{}
closed bool
unblockTime time.Duration
frame *StreamFrame
frameOffset int
@@ -381,6 +383,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe
}
}
// SetUnblockTime sets the time to unblock and return zero bytes read. If the
// duration is unset or is zero or less, the read will block til data is read.
func (f *FrameReader) SetUnblockTime(d time.Duration) {
f.unblockTime = d
}
// Offset returns the offset into the stream.
func (f *FrameReader) Offset() int {
return f.byteOffset
@@ -390,14 +398,23 @@ func (f *FrameReader) Offset() int {
// when there are no more frames.
func (f *FrameReader) Read(p []byte) (n int, err error) {
if f.frame == nil {
frame, ok := <-f.frames
if !ok {
return 0, io.EOF
var unblock <-chan time.Time
if f.unblockTime.Nanoseconds() > 0 {
unblock = time.After(f.unblockTime)
}
f.frame = frame
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
select {
case frame, ok := <-f.frames:
if !ok {
return 0, io.EOF
}
f.frame = frame
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
case <-unblock:
return 0, nil
}
}
if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {

View File

@@ -4,6 +4,7 @@ import (
"io"
"reflect"
"testing"
"time"
)
func TestFS_FrameReader(t *testing.T) {
@@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) {
t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected))
}
}
func TestFS_FrameReader_Unblock(t *testing.T) {
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
cancelCh := make(chan struct{})
r := NewFrameReader(framesCh, cancelCh)
r.SetUnblockTime(10 * time.Millisecond)
// Read a little
p := make([]byte, 12)
n, err := r.Read(p)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
if n != 0 {
t.Fatalf("should have unblocked")
}
// Unset the unblock
r.SetUnblockTime(0)
resultCh := make(chan struct{})
go func() {
r.Read(p)
close(resultCh)
}()
select {
case <-resultCh:
t.Fatalf("shouldn't have unblocked")
case <-time.After(300 * time.Millisecond):
}
}

View File

@@ -282,7 +282,7 @@ func (f *FSCommand) Run(args []string) int {
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0)
}
}
@@ -321,7 +321,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0)
}
go func() {

View File

@@ -105,17 +105,26 @@ type LineLimitReader struct {
lines int
searchLimit int
timeLimit time.Duration
lastRead time.Time
buffer *bytes.Buffer
bufFiled bool
foundLines bool
}
// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find
// searching backwards in the first searchLimit bytes.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader {
// searching backwards in the first searchLimit bytes. timeLimit can optionally
// be specified by passing a non-zero duration. When set, the search for the
// last n lines is aborted if no data has been read in the duration. This
// can be used to flush what is had if no extra data is being received. When
// used, the underlying reader must not block forever and must periodically
// unblock even when no data has been read.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int, timeLimit time.Duration) *LineLimitReader {
return &LineLimitReader{
ReadCloser: r,
searchLimit: searchLimit,
timeLimit: timeLimit,
lines: lines,
buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)),
}
@@ -124,14 +133,52 @@ func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReade
func (l *LineLimitReader) Read(p []byte) (n int, err error) {
// Fill up the buffer so we can find the correct number of lines.
if !l.bufFiled {
_, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit)))
if err != nil {
return 0, err
b := make([]byte, len(p))
n, err := l.ReadCloser.Read(b)
if n > 0 {
if _, err := l.buffer.Write(b[:n]); err != nil {
return 0, err
}
}
l.bufFiled = true
if err != nil {
if err != io.EOF {
return 0, err
}
l.bufFiled = true
goto READ
}
if l.buffer.Len() >= l.searchLimit {
l.bufFiled = true
goto READ
}
if l.timeLimit.Nanoseconds() > 0 {
if l.lastRead.IsZero() {
l.lastRead = time.Now()
return 0, nil
}
now := time.Now()
if n == 0 {
// We hit the limit
if l.lastRead.Add(l.timeLimit).Before(now) {
l.bufFiled = true
goto READ
} else {
return 0, nil
}
} else {
l.lastRead = now
}
}
return 0, nil
}
READ:
if l.bufFiled && l.buffer.Len() != 0 {
b := l.buffer.Bytes()

View File

@@ -1,9 +1,12 @@
package command
import (
"io"
"io/ioutil"
"reflect"
"strings"
"testing"
"time"
"github.com/mitchellh/cli"
)
@@ -48,7 +51,7 @@ func TestHelpers_NodeID(t *testing.T) {
}
}
func TestHelpers_LineLimitReader(t *testing.T) {
func TestHelpers_LineLimitReader_NoTimeLimit(t *testing.T) {
helloString := `hello
world
this
@@ -114,7 +117,7 @@ test`,
for i, c := range cases {
in := ioutil.NopCloser(strings.NewReader(c.Input))
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit)
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit, 0)
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("case %d failed: %v", i, err)
@@ -126,3 +129,59 @@ test`,
}
}
}
type testReadCloser struct {
data chan []byte
}
func (t *testReadCloser) Read(p []byte) (n int, err error) {
select {
case b, ok := <-t.data:
if !ok {
return 0, io.EOF
}
return copy(p, b), nil
case <-time.After(10 * time.Millisecond):
return 0, nil
}
}
func (t *testReadCloser) Close() error {
close(t.data)
return nil
}
func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) {
// Create the test reader
in := &testReadCloser{data: make(chan []byte)}
// Set up the reader such that it won't hit the line/buffer limit and could
// only terminate if it hits the time limit
limit := NewLineLimitReader(in, 1000, 1000, 100*time.Millisecond)
expected := []byte("hello world")
resultCh := make(chan struct{})
go func() {
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("ReadAll failed: %v", err)
}
if reflect.DeepEqual(outBytes, expected) {
close(resultCh)
return
}
}()
// Send the data
in.data <- expected
in.Close()
select {
case <-resultCh:
case <-time.After(1 * time.Second):
t.Fatalf("did not exit by time limit")
}
}

View File

@@ -7,6 +7,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"
"github.com/hashicorp/nomad/api"
)
@@ -33,7 +34,7 @@ Logs Specific Options:
-job <job-id>
Use a random allocation from a specified job-id.
-tail
-tail
Show the files contents with offsets relative to the end of the file. If no
offset is given, -n is defaulted to 10.
@@ -182,7 +183,7 @@ func (l *LogsCommand) Run(args []string) int {
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second)
}
if readErr != nil {
@@ -216,6 +217,7 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader
go func() {