From ac1cfd182153296c75df1e1bcc52d2802d33d402 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 19 Jul 2016 19:48:16 -0700 Subject: [PATCH] unblock the readers to add liveness when using -n --- api/fs.go | 29 +++++++++++++++---- api/fs_test.go | 37 ++++++++++++++++++++++++ command/fs.go | 4 +-- command/helpers.go | 59 ++++++++++++++++++++++++++++++++++---- command/helpers_test.go | 63 +++++++++++++++++++++++++++++++++++++++-- command/logs.go | 6 ++-- 6 files changed, 180 insertions(+), 18 deletions(-) diff --git a/api/fs.go b/api/fs.go index 4528b36b0..5be452c2b 100644 --- a/api/fs.go +++ b/api/fs.go @@ -362,6 +362,8 @@ type FrameReader struct { cancelCh chan struct{} closed bool + unblockTime time.Duration + frame *StreamFrame frameOffset int @@ -381,6 +383,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe } } +// SetUnblockTime sets the time to unblock and return zero bytes read. If the +// duration is unset or is zero or less, the read will block til data is read. +func (f *FrameReader) SetUnblockTime(d time.Duration) { + f.unblockTime = d +} + // Offset returns the offset into the stream. func (f *FrameReader) Offset() int { return f.byteOffset @@ -390,14 +398,23 @@ func (f *FrameReader) Offset() int { // when there are no more frames. func (f *FrameReader) Read(p []byte) (n int, err error) { if f.frame == nil { - frame, ok := <-f.frames - if !ok { - return 0, io.EOF + var unblock <-chan time.Time + if f.unblockTime.Nanoseconds() > 0 { + unblock = time.After(f.unblockTime) } - f.frame = frame - // Store the total offset into the file - f.byteOffset = int(f.frame.Offset) + select { + case frame, ok := <-f.frames: + if !ok { + return 0, io.EOF + } + f.frame = frame + + // Store the total offset into the file + f.byteOffset = int(f.frame.Offset) + case <-unblock: + return 0, nil + } } if f.frame.FileEvent != "" && len(f.fileEvent) == 0 { diff --git a/api/fs_test.go b/api/fs_test.go index 25107d9c8..bea8d5984 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -4,6 +4,7 @@ import ( "io" "reflect" "testing" + "time" ) func TestFS_FrameReader(t *testing.T) { @@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) { t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected)) } } + +func TestFS_FrameReader_Unblock(t *testing.T) { + // Create a channel of the frames and a cancel channel + framesCh := make(chan *StreamFrame, 3) + cancelCh := make(chan struct{}) + + r := NewFrameReader(framesCh, cancelCh) + r.SetUnblockTime(10 * time.Millisecond) + + // Read a little + p := make([]byte, 12) + + n, err := r.Read(p) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + + if n != 0 { + t.Fatalf("should have unblocked") + } + + // Unset the unblock + r.SetUnblockTime(0) + + resultCh := make(chan struct{}) + go func() { + r.Read(p) + close(resultCh) + }() + + select { + case <-resultCh: + t.Fatalf("shouldn't have unblocked") + case <-time.After(300 * time.Millisecond): + } +} diff --git a/command/fs.go b/command/fs.go index 2cf1b45c9..a9d67492a 100644 --- a/command/fs.go +++ b/command/fs.go @@ -282,7 +282,7 @@ func (f *FSCommand) Run(args []string) int { // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) } } @@ -321,7 +321,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) } go func() { diff --git a/command/helpers.go b/command/helpers.go index 36f3455cc..b9c4a6be7 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -105,17 +105,26 @@ type LineLimitReader struct { lines int searchLimit int + timeLimit time.Duration + lastRead time.Time + buffer *bytes.Buffer bufFiled bool foundLines bool } // NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find -// searching backwards in the first searchLimit bytes. -func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader { +// searching backwards in the first searchLimit bytes. timeLimit can optionally +// be specified by passing a non-zero duration. When set, the search for the +// last n lines is aborted if no data has been read in the duration. This +// can be used to flush what is had if no extra data is being received. When +// used, the underlying reader must not block forever and must periodically +// unblock even when no data has been read. +func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int, timeLimit time.Duration) *LineLimitReader { return &LineLimitReader{ ReadCloser: r, searchLimit: searchLimit, + timeLimit: timeLimit, lines: lines, buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)), } @@ -124,14 +133,52 @@ func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReade func (l *LineLimitReader) Read(p []byte) (n int, err error) { // Fill up the buffer so we can find the correct number of lines. if !l.bufFiled { - _, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit))) - if err != nil { - return 0, err + b := make([]byte, len(p)) + n, err := l.ReadCloser.Read(b) + if n > 0 { + if _, err := l.buffer.Write(b[:n]); err != nil { + return 0, err + } } - l.bufFiled = true + if err != nil { + if err != io.EOF { + return 0, err + } + + l.bufFiled = true + goto READ + } + + if l.buffer.Len() >= l.searchLimit { + l.bufFiled = true + goto READ + } + + if l.timeLimit.Nanoseconds() > 0 { + if l.lastRead.IsZero() { + l.lastRead = time.Now() + return 0, nil + } + + now := time.Now() + if n == 0 { + // We hit the limit + if l.lastRead.Add(l.timeLimit).Before(now) { + l.bufFiled = true + goto READ + } else { + return 0, nil + } + } else { + l.lastRead = now + } + } + + return 0, nil } +READ: if l.bufFiled && l.buffer.Len() != 0 { b := l.buffer.Bytes() diff --git a/command/helpers_test.go b/command/helpers_test.go index c2dd64e82..f43e701e7 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -1,9 +1,12 @@ package command import ( + "io" "io/ioutil" + "reflect" "strings" "testing" + "time" "github.com/mitchellh/cli" ) @@ -48,7 +51,7 @@ func TestHelpers_NodeID(t *testing.T) { } } -func TestHelpers_LineLimitReader(t *testing.T) { +func TestHelpers_LineLimitReader_NoTimeLimit(t *testing.T) { helloString := `hello world this @@ -114,7 +117,7 @@ test`, for i, c := range cases { in := ioutil.NopCloser(strings.NewReader(c.Input)) - limit := NewLineLimitReader(in, c.Lines, c.SearchLimit) + limit := NewLineLimitReader(in, c.Lines, c.SearchLimit, 0) outBytes, err := ioutil.ReadAll(limit) if err != nil { t.Fatalf("case %d failed: %v", i, err) @@ -126,3 +129,59 @@ test`, } } } + +type testReadCloser struct { + data chan []byte +} + +func (t *testReadCloser) Read(p []byte) (n int, err error) { + select { + case b, ok := <-t.data: + if !ok { + return 0, io.EOF + } + + return copy(p, b), nil + case <-time.After(10 * time.Millisecond): + return 0, nil + } +} + +func (t *testReadCloser) Close() error { + close(t.data) + return nil +} + +func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) { + // Create the test reader + in := &testReadCloser{data: make(chan []byte)} + + // Set up the reader such that it won't hit the line/buffer limit and could + // only terminate if it hits the time limit + limit := NewLineLimitReader(in, 1000, 1000, 100*time.Millisecond) + + expected := []byte("hello world") + + resultCh := make(chan struct{}) + go func() { + outBytes, err := ioutil.ReadAll(limit) + if err != nil { + t.Fatalf("ReadAll failed: %v", err) + } + + if reflect.DeepEqual(outBytes, expected) { + close(resultCh) + return + } + }() + + // Send the data + in.data <- expected + in.Close() + + select { + case <-resultCh: + case <-time.After(1 * time.Second): + t.Fatalf("did not exit by time limit") + } +} diff --git a/command/logs.go b/command/logs.go index 282783d8d..d7073f43d 100644 --- a/command/logs.go +++ b/command/logs.go @@ -7,6 +7,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/hashicorp/nomad/api" ) @@ -33,7 +34,7 @@ Logs Specific Options: -job Use a random allocation from a specified job-id. - -tail + -tail Show the files contents with offsets relative to the end of the file. If no offset is given, -n is defaulted to 10. @@ -182,7 +183,7 @@ func (l *LogsCommand) Run(args []string) int { // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) } if readErr != nil { @@ -216,6 +217,7 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, // Create a reader var r io.ReadCloser frameReader := api.NewFrameReader(frames, cancel) + frameReader.SetUnblockTime(500 * time.Millisecond) r = frameReader go func() {