Merge pull request #1420 from hashicorp/f-tail-lines

Implement tail -n
This commit is contained in:
Alex Dadgar
2016-07-13 15:33:34 -06:00
committed by GitHub
6 changed files with 369 additions and 60 deletions

View File

@@ -129,7 +129,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) {
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
return nil, nil, err
@@ -162,7 +162,7 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int
// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.Reader, *QueryMeta, error) {
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
return nil, nil, err
@@ -204,7 +204,6 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
// * path: path to file to stream.
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * cancel: A channel which when closed will stop streaming.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
@@ -275,3 +274,88 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, nil, nil
}
// FrameReader is used to convert a stream of frames into a read closer.
type FrameReader struct {
frames <-chan *StreamFrame
cancelCh chan struct{}
closed bool
frame *StreamFrame
frameOffset int
// To handle printing the file events
fileEventOffset int
fileEvent []byte
byteOffset int
}
// NewFrameReader takes a channel of frames and returns a FrameReader which
// implements io.ReadCloser
func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameReader {
return &FrameReader{
frames: frames,
cancelCh: cancelCh,
}
}
// Offset returns the offset into the stream.
func (f *FrameReader) Offset() int {
return f.byteOffset
}
// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
// when there are no more frames.
func (f *FrameReader) Read(p []byte) (n int, err error) {
if f.frame == nil {
frame, ok := <-f.frames
if !ok {
return 0, io.EOF
}
f.frame = frame
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
}
if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {
f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent))
f.fileEventOffset = 0
}
// If there is a file event we inject it into the read stream
if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset {
n = copy(p, f.fileEvent[f.fileEventOffset:])
f.fileEventOffset += n
return n, nil
}
if len(f.fileEvent) == f.fileEventOffset {
f.fileEvent = nil
f.fileEventOffset = 0
}
// Copy the data out of the frame and update our offset
n = copy(p, f.frame.Data[f.frameOffset:])
f.frameOffset += n
// Clear the frame and its offset once we have read everything
if len(f.frame.Data) == f.frameOffset {
f.frame = nil
f.frameOffset = 0
}
return n, nil
}
// Close cancels the stream of frames
func (f *FrameReader) Close() error {
if f.closed {
return nil
}
close(f.cancelCh)
f.closed = true
return nil
}

75
api/fs_test.go Normal file
View File

@@ -0,0 +1,75 @@
package api
import (
"io"
"reflect"
"testing"
)
func TestFS_FrameReader(t *testing.T) {
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
cancelCh := make(chan struct{})
r := NewFrameReader(framesCh, cancelCh)
// Create some frames and send them
f1 := &StreamFrame{
File: "foo",
Offset: 5,
Data: []byte("hello"),
}
f2 := &StreamFrame{
File: "foo",
Offset: 10,
Data: []byte(", wor"),
}
f3 := &StreamFrame{
File: "foo",
Offset: 12,
Data: []byte("ld"),
}
framesCh <- f1
framesCh <- f2
framesCh <- f3
close(framesCh)
expected := []byte("hello, world")
// Read a little
p := make([]byte, 12)
n, err := r.Read(p[:5])
if err != nil {
t.Fatalf("Read failed: %v", err)
}
if off := r.Offset(); off != n {
t.Fatalf("unexpected read bytes: got %v; wanted %v", n, off)
}
off := n
for {
n, err = r.Read(p[off:])
if err != nil {
if err == io.EOF {
break
}
t.Fatalf("Read failed: %v", err)
}
off += n
}
if !reflect.DeepEqual(p, expected) {
t.Fatalf("read %q, wanted %q", string(p), string(expected))
}
if err := r.Close(); err != nil {
t.Fatalf("Close() failed: %v", err)
}
if _, ok := <-cancelCh; ok {
t.Fatalf("Close() didn't close cancel channel")
}
if len(expected) != r.Offset() {
t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected))
}
}

View File

@@ -270,13 +270,12 @@ func (s *StreamFramer) Destroy() {
// heartbeating
func (s *StreamFramer) Run() {
s.l.Lock()
defer s.l.Unlock()
if s.running {
return
}
s.running = true
s.l.Unlock()
go s.run()
}

View File

@@ -250,13 +250,18 @@ nomad alloc-status %s`, allocID, allocID)
}
// We have a file, output it.
var r io.ReadCloser
var readErr error
if !tail {
r, _, err := client.AllocFS().Cat(alloc, path, nil)
if err != nil {
f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
return 1
if follow {
r, readErr = f.followFile(client, alloc, path, api.OriginStart, 0, -1)
} else {
r, _, readErr = client.AllocFS().Cat(alloc, path, nil)
}
if readErr != nil {
readErr = fmt.Errorf("Error reading file: %v", readErr)
}
io.Copy(os.Stdout, r)
} else {
// Parse the offset
var offset int64 = defaultTailLines * bytesToLines
@@ -268,88 +273,77 @@ nomad alloc-status %s`, allocID, allocID)
offset = numLines * bytesToLines
} else if nBytes {
offset = numBytes
} else {
numLines = defaultTailLines
}
if offset > file.Size {
offset = file.Size
}
var err error
if follow {
err = f.followFile(client, alloc, path, offset)
r, readErr = f.followFile(client, alloc, path, api.OriginEnd, offset, numLines)
} else {
// This offset needs to be relative from the front versus the follow
// is relative to the end
offset = file.Size - offset
r, _, err := client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
if err != nil {
f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
return 1
r, _, readErr = client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
}
io.Copy(os.Stdout, r)
}
if err != nil {
f.Ui.Error(fmt.Sprintf("Error tailing file: %v", err))
return 1
if readErr != nil {
readErr = fmt.Errorf("Error tailing file: %v", readErr)
}
}
defer r.Close()
if readErr != nil {
f.Ui.Error(readErr.Error())
return 1
}
io.Copy(os.Stdout, r)
return 0
}
// followFile outputs the contents of the file to stdout relative to the end of
// the file. If numLines and numBytes are both less than zero, the default
// output is defaulted to 10 lines.
// the file. If numLines does not equal -1, then tail -n behavior is used.
func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
path string, offset int64) error {
path, origin string, offset, numLines int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil)
frames, _, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
if err != nil {
return err
return nil, err
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
var frame *api.StreamFrame
var ok bool
for {
select {
case <-signalCh:
// End the streaming
close(cancel)
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, cancel)
r = frameReader
// Output the last offset
if frame != nil && frame.Offset > 0 {
f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frame.Offset))
}
return nil
case frame, ok = <-frames:
if !ok {
// Connection has been killed
return nil
}
if frame == nil {
panic("received nil frame; please report as a bug")
}
if frame.IsHeartbeat() {
continue
}
// Print the file event
if frame.FileEvent != "" {
f.Ui.Output(fmt.Sprintf("nomad: FileEvent %q", frame.FileEvent))
}
fmt.Print(string(frame.Data))
}
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
}
return nil
go func() {
<-signalCh
// End the streaming
r.Close()
// Output the last offset
f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset()))
}()
return r, nil
}
// Get Random Allocation ID from a known jobID. Prefer to use a running allocation,

View File

@@ -1,7 +1,9 @@
package command
import (
"bytes"
"fmt"
"io"
"strconv"
"time"
@@ -93,3 +95,77 @@ func evalFailureStatus(eval *api.Evaluation) (string, bool) {
return text, hasFailures
}
// LineLimitReader wraps another reader and provides `tail -n` like behavior.
// LineLimitReader buffers up to the searchLimit and returns `-n` number of
// lines. After those lines have been returned, LineLimitReader streams the
// underlying ReadCloser
type LineLimitReader struct {
io.ReadCloser
lines int
searchLimit int
buffer *bytes.Buffer
bufFiled bool
foundLines bool
}
// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find
// searching backwards in the first searchLimit bytes.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader {
return &LineLimitReader{
ReadCloser: r,
searchLimit: searchLimit,
lines: lines,
buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)),
}
}
func (l *LineLimitReader) Read(p []byte) (n int, err error) {
// Fill up the buffer so we can find the correct number of lines.
if !l.bufFiled {
_, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit)))
if err != nil {
return 0, err
}
l.bufFiled = true
}
if l.bufFiled && l.buffer.Len() != 0 {
b := l.buffer.Bytes()
// Find the lines
if !l.foundLines {
found := 0
i := len(b) - 1
sep := byte('\n')
lastIndex := len(b) - 1
for ; found < l.lines && i >= 0; i-- {
if b[i] == sep {
lastIndex = i
// Skip the first one
if i != len(b)-1 {
found++
}
}
}
// We found them all
if found == l.lines {
// Clear the buffer until the last index
l.buffer.Next(lastIndex + 1)
}
l.foundLines = true
}
// Read from the buffer
n := copy(p, l.buffer.Next(len(p)))
return n, nil
}
// Just stream from the underlying reader now
return l.ReadCloser.Read(p)
}

View File

@@ -1,6 +1,8 @@
package command
import (
"io/ioutil"
"strings"
"testing"
"github.com/mitchellh/cli"
@@ -45,3 +47,82 @@ func TestHelpers_NodeID(t *testing.T) {
t.Fatalf("getLocalNodeID() should fail")
}
}
func TestHelpers_LineLimitReader(t *testing.T) {
helloString := `hello
world
this
is
a
test`
noLines := "jskdfhjasdhfjkajkldsfdlsjkahfkjdsafa"
cases := []struct {
Input string
Output string
Lines int
SearchLimit int
}{
{
Input: helloString,
Output: helloString,
Lines: 6,
SearchLimit: 1000,
},
{
Input: helloString,
Output: `world
this
is
a
test`,
Lines: 5,
SearchLimit: 1000,
},
{
Input: helloString,
Output: `test`,
Lines: 1,
SearchLimit: 1000,
},
{
Input: helloString,
Output: "",
Lines: 0,
SearchLimit: 1000,
},
{
Input: helloString,
Output: helloString,
Lines: 6,
SearchLimit: 1, // Exceed the limit
},
{
Input: noLines,
Output: noLines,
Lines: 10,
SearchLimit: 1000,
},
{
Input: noLines,
Output: noLines,
Lines: 10,
SearchLimit: 2,
},
}
for i, c := range cases {
in := ioutil.NopCloser(strings.NewReader(c.Input))
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit)
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("case %d failed: %v", i, err)
}
out := string(outBytes)
if out != c.Output {
t.Fatalf("case %d: got %q; want %q", i, out, c.Output)
}
}
}