From 25dd45c763f8885e33c0129f062fb60b85837bd5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 12 Jul 2016 17:29:18 -0600 Subject: [PATCH 1/4] frames to reader --- api/fs.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++- command/fs.go | 43 +++++++-------------------- 2 files changed, 89 insertions(+), 34 deletions(-) diff --git a/api/fs.go b/api/fs.go index eee0ba362..12756eb96 100644 --- a/api/fs.go +++ b/api/fs.go @@ -204,7 +204,6 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error { // * path: path to file to stream. // * offset: The offset to start streaming data at. // * origin: Either "start" or "end" and defines from where the offset is applied. -// * cancel: A channel which when closed will stop streaming. // // The return value is a channel that will emit StreamFrames as they are read. func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, @@ -275,3 +274,82 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, return frames, nil, nil } + +// FrameReader is used to convert a stream of frames into a read closer. +type FrameReader struct { + frames <-chan *StreamFrame + cancelCh chan struct{} + + frame *StreamFrame + frameOffset int + + // To handle printing the file events + fileEventOffset int + fileEvent []byte + + byteOffset int +} + +// NewFrameReader takes a channel of frames and returns a FrameReader which +// implements io.ReadCloser +func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameReader { + return &FrameReader{ + frames: frames, + cancelCh: cancelCh, + } +} + +// Offset returns the offset into the stream. +func (f *FrameReader) Offset() int { + return f.byteOffset +} + +// Read reads the data of the incoming frames into the bytes buffer. Returns EOF +// when there are no more frames. +func (f *FrameReader) Read(p []byte) (n int, err error) { + if f.frame == nil { + frame, ok := <-f.frames + if !ok { + return 0, io.EOF + } + f.frame = frame + } + + if f.frame.FileEvent != "" && len(f.fileEvent) == 0 { + f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent)) + f.fileEventOffset = 0 + } + + // If there is a file event we inject it into the read stream + if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset { + n = copy(p, f.fileEvent[f.fileEventOffset:]) + f.fileEventOffset += n + return n, nil + } + + if len(f.fileEvent) == f.fileEventOffset { + f.fileEvent = nil + f.fileEventOffset = 0 + } + + // Copy the data out of the frame and update our offset + n = copy(p, f.frame.Data[f.frameOffset:]) + f.frameOffset += n + + // Store the total offset into the file + f.byteOffset = int(f.frame.Offset) + f.frameOffset + + // Clear the frame and its offset once we have read everything + if len(f.frame.Data) == f.frameOffset { + f.frame = nil + f.frameOffset = 0 + } + + return n, nil +} + +// Close cancels the stream of frames +func (f *FrameReader) Close() error { + close(f.cancelCh) + return nil +} diff --git a/command/fs.go b/command/fs.go index 35a7ea58d..9dae2dc37 100644 --- a/command/fs.go +++ b/command/fs.go @@ -312,43 +312,20 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - var frame *api.StreamFrame - var ok bool - for { - select { - case <-signalCh: - // End the streaming - close(cancel) + // Create a reader + r := api.NewFrameReader(frames, cancel) - // Output the last offset - if frame != nil && frame.Offset > 0 { - f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frame.Offset)) - } + go func() { + <-signalCh - return nil - case frame, ok = <-frames: - if !ok { - // Connection has been killed - return nil - } + // End the streaming + r.Close() - if frame == nil { - panic("received nil frame; please report as a bug") - } - - if frame.IsHeartbeat() { - continue - } - - // Print the file event - if frame.FileEvent != "" { - f.Ui.Output(fmt.Sprintf("nomad: FileEvent %q", frame.FileEvent)) - } - - fmt.Print(string(frame.Data)) - } - } + // Output the last offset + f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", r.Offset())) + }() + io.Copy(os.Stdout, r) return nil } From 991220b5c79d1fd384f2a5848770b5795b3cfd91 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Jul 2016 13:23:33 -0600 Subject: [PATCH 2/4] implement -n for tail --- api/fs.go | 2 +- command/agent/fs_endpoint.go | 3 +- command/fs.go | 27 +++++++++--- command/helpers.go | 76 +++++++++++++++++++++++++++++++++ command/helpers_test.go | 81 ++++++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+), 9 deletions(-) diff --git a/api/fs.go b/api/fs.go index 12756eb96..d11e39d69 100644 --- a/api/fs.go +++ b/api/fs.go @@ -129,7 +129,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF // ReadAt is used to read bytes at a given offset until limit at the given path // in an allocation directory. If limit is <= 0, there is no limit. -func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) { +func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{}) if err != nil { return nil, nil, err diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index ce578aa08..d7a330ca6 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -270,13 +270,12 @@ func (s *StreamFramer) Destroy() { // heartbeating func (s *StreamFramer) Run() { s.l.Lock() + defer s.l.Unlock() if s.running { return } s.running = true - s.l.Unlock() - go s.run() } diff --git a/command/fs.go b/command/fs.go index 9dae2dc37..e39ba6bb3 100644 --- a/command/fs.go +++ b/command/fs.go @@ -268,6 +268,8 @@ nomad alloc-status %s`, allocID, allocID) offset = numLines * bytesToLines } else if nBytes { offset = numBytes + } else { + numLines = defaultTailLines } if offset > file.Size { @@ -276,7 +278,7 @@ nomad alloc-status %s`, allocID, allocID) var err error if follow { - err = f.followFile(client, alloc, path, offset) + err = f.followFile(client, alloc, path, offset, numLines) } else { // This offset needs to be relative from the front versus the follow // is relative to the end @@ -286,7 +288,14 @@ nomad alloc-status %s`, allocID, allocID) f.Ui.Error(fmt.Sprintf("Error reading file: %s", err)) return 1 } + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + } + io.Copy(os.Stdout, r) + r.Close() } if err != nil { @@ -299,10 +308,9 @@ nomad alloc-status %s`, allocID, allocID) } // followFile outputs the contents of the file to stdout relative to the end of -// the file. If numLines and numBytes are both less than zero, the default -// output is defaulted to 10 lines. +// the file. If numLines does not equal -1, then tail -n behavior is used. func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, - path string, offset int64) error { + path string, offset, numLines int64) error { cancel := make(chan struct{}) frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil) @@ -313,7 +321,14 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) // Create a reader - r := api.NewFrameReader(frames, cancel) + var r io.ReadCloser + frameReader := api.NewFrameReader(frames, cancel) + r = frameReader + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + } go func() { <-signalCh @@ -322,7 +337,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, r.Close() // Output the last offset - f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", r.Offset())) + f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) }() io.Copy(os.Stdout, r) diff --git a/command/helpers.go b/command/helpers.go index 29183b4cb..36f3455cc 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -1,7 +1,9 @@ package command import ( + "bytes" "fmt" + "io" "strconv" "time" @@ -93,3 +95,77 @@ func evalFailureStatus(eval *api.Evaluation) (string, bool) { return text, hasFailures } + +// LineLimitReader wraps another reader and provides `tail -n` like behavior. +// LineLimitReader buffers up to the searchLimit and returns `-n` number of +// lines. After those lines have been returned, LineLimitReader streams the +// underlying ReadCloser +type LineLimitReader struct { + io.ReadCloser + lines int + searchLimit int + + buffer *bytes.Buffer + bufFiled bool + foundLines bool +} + +// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find +// searching backwards in the first searchLimit bytes. +func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader { + return &LineLimitReader{ + ReadCloser: r, + searchLimit: searchLimit, + lines: lines, + buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)), + } +} + +func (l *LineLimitReader) Read(p []byte) (n int, err error) { + // Fill up the buffer so we can find the correct number of lines. + if !l.bufFiled { + _, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit))) + if err != nil { + return 0, err + } + + l.bufFiled = true + } + + if l.bufFiled && l.buffer.Len() != 0 { + b := l.buffer.Bytes() + + // Find the lines + if !l.foundLines { + found := 0 + i := len(b) - 1 + sep := byte('\n') + lastIndex := len(b) - 1 + for ; found < l.lines && i >= 0; i-- { + if b[i] == sep { + lastIndex = i + + // Skip the first one + if i != len(b)-1 { + found++ + } + } + } + + // We found them all + if found == l.lines { + // Clear the buffer until the last index + l.buffer.Next(lastIndex + 1) + } + + l.foundLines = true + } + + // Read from the buffer + n := copy(p, l.buffer.Next(len(p))) + return n, nil + } + + // Just stream from the underlying reader now + return l.ReadCloser.Read(p) +} diff --git a/command/helpers_test.go b/command/helpers_test.go index 276719c33..c2dd64e82 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -1,6 +1,8 @@ package command import ( + "io/ioutil" + "strings" "testing" "github.com/mitchellh/cli" @@ -45,3 +47,82 @@ func TestHelpers_NodeID(t *testing.T) { t.Fatalf("getLocalNodeID() should fail") } } + +func TestHelpers_LineLimitReader(t *testing.T) { + helloString := `hello +world +this +is +a +test` + + noLines := "jskdfhjasdhfjkajkldsfdlsjkahfkjdsafa" + + cases := []struct { + Input string + Output string + Lines int + SearchLimit int + }{ + { + Input: helloString, + Output: helloString, + Lines: 6, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: `world +this +is +a +test`, + Lines: 5, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: `test`, + Lines: 1, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: "", + Lines: 0, + SearchLimit: 1000, + }, + { + Input: helloString, + Output: helloString, + Lines: 6, + SearchLimit: 1, // Exceed the limit + }, + { + Input: noLines, + Output: noLines, + Lines: 10, + SearchLimit: 1000, + }, + { + Input: noLines, + Output: noLines, + Lines: 10, + SearchLimit: 2, + }, + } + + for i, c := range cases { + in := ioutil.NopCloser(strings.NewReader(c.Input)) + limit := NewLineLimitReader(in, c.Lines, c.SearchLimit) + outBytes, err := ioutil.ReadAll(limit) + if err != nil { + t.Fatalf("case %d failed: %v", i, err) + } + + out := string(outBytes) + if out != c.Output { + t.Fatalf("case %d: got %q; want %q", i, out, c.Output) + } + } +} From 398f8886624e603a8dc67d1994c00285c0c7218f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Jul 2016 14:37:03 -0600 Subject: [PATCH 3/4] Add FrameReader test --- api/fs_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 api/fs_test.go diff --git a/api/fs_test.go b/api/fs_test.go new file mode 100644 index 000000000..a70951656 --- /dev/null +++ b/api/fs_test.go @@ -0,0 +1,75 @@ +package api + +import ( + "io" + "reflect" + "testing" +) + +func TestFS_FrameReader(t *testing.T) { + // Create a channel of the frames and a cancel channel + framesCh := make(chan *StreamFrame, 3) + cancelCh := make(chan struct{}) + + r := NewFrameReader(framesCh, cancelCh) + + // Create some frames and send them + f1 := &StreamFrame{ + File: "foo", + Offset: 0, + Data: []byte("hello"), + } + f2 := &StreamFrame{ + File: "foo", + Offset: 5, + Data: []byte(", wor"), + } + f3 := &StreamFrame{ + File: "foo", + Offset: 10, + Data: []byte("ld"), + } + framesCh <- f1 + framesCh <- f2 + framesCh <- f3 + close(framesCh) + + expected := []byte("hello, world") + + // Read a little + p := make([]byte, 12) + + n, err := r.Read(p[:5]) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + if off := r.Offset(); off != n { + t.Fatalf("unexpected read bytes: got %v; wanted %v", n, off) + } + + off := n + for { + n, err = r.Read(p[off:]) + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("Read failed: %v", err) + } + off += n + } + + if !reflect.DeepEqual(p, expected) { + t.Fatalf("read %q, wanted %q", string(p), string(expected)) + } + + if err := r.Close(); err != nil { + t.Fatalf("Close() failed: %v", err) + } + if _, ok := <-cancelCh; ok { + t.Fatalf("Close() didn't close cancel channel") + } + if len(expected) != r.Offset() { + t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected)) + } +} From a1b46b3a6dfeb5ecd9ba8e7be65808b9cf225e2b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Jul 2016 15:33:17 -0600 Subject: [PATCH 4/4] Allow following of files when cating and fix offsets --- api/fs.go | 14 ++++++++++---- api/fs_test.go | 6 +++--- command/fs.go | 48 +++++++++++++++++++++++++----------------------- 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/api/fs.go b/api/fs.go index d11e39d69..36458c694 100644 --- a/api/fs.go +++ b/api/fs.go @@ -162,7 +162,7 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int // Cat is used to read contents of a file at the given path in an allocation // directory -func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.Reader, *QueryMeta, error) { +func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{}) if err != nil { return nil, nil, err @@ -279,6 +279,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, type FrameReader struct { frames <-chan *StreamFrame cancelCh chan struct{} + closed bool frame *StreamFrame frameOffset int @@ -313,6 +314,9 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { return 0, io.EOF } f.frame = frame + + // Store the total offset into the file + f.byteOffset = int(f.frame.Offset) } if f.frame.FileEvent != "" && len(f.fileEvent) == 0 { @@ -336,9 +340,6 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { n = copy(p, f.frame.Data[f.frameOffset:]) f.frameOffset += n - // Store the total offset into the file - f.byteOffset = int(f.frame.Offset) + f.frameOffset - // Clear the frame and its offset once we have read everything if len(f.frame.Data) == f.frameOffset { f.frame = nil @@ -350,6 +351,11 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { // Close cancels the stream of frames func (f *FrameReader) Close() error { + if f.closed { + return nil + } + close(f.cancelCh) + f.closed = true return nil } diff --git a/api/fs_test.go b/api/fs_test.go index a70951656..25107d9c8 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -16,17 +16,17 @@ func TestFS_FrameReader(t *testing.T) { // Create some frames and send them f1 := &StreamFrame{ File: "foo", - Offset: 0, + Offset: 5, Data: []byte("hello"), } f2 := &StreamFrame{ File: "foo", - Offset: 5, + Offset: 10, Data: []byte(", wor"), } f3 := &StreamFrame{ File: "foo", - Offset: 10, + Offset: 12, Data: []byte("ld"), } framesCh <- f1 diff --git a/command/fs.go b/command/fs.go index e39ba6bb3..154f34e2f 100644 --- a/command/fs.go +++ b/command/fs.go @@ -250,13 +250,18 @@ nomad alloc-status %s`, allocID, allocID) } // We have a file, output it. + var r io.ReadCloser + var readErr error if !tail { - r, _, err := client.AllocFS().Cat(alloc, path, nil) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error reading file: %s", err)) - return 1 + if follow { + r, readErr = f.followFile(client, alloc, path, api.OriginStart, 0, -1) + } else { + r, _, readErr = client.AllocFS().Cat(alloc, path, nil) + } + + if readErr != nil { + readErr = fmt.Errorf("Error reading file: %v", readErr) } - io.Copy(os.Stdout, r) } else { // Parse the offset var offset int64 = defaultTailLines * bytesToLines @@ -276,46 +281,44 @@ nomad alloc-status %s`, allocID, allocID) offset = file.Size } - var err error if follow { - err = f.followFile(client, alloc, path, offset, numLines) + r, readErr = f.followFile(client, alloc, path, api.OriginEnd, offset, numLines) } else { // This offset needs to be relative from the front versus the follow // is relative to the end offset = file.Size - offset - r, _, err := client.AllocFS().ReadAt(alloc, path, offset, -1, nil) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error reading file: %s", err)) - return 1 - } + r, _, readErr = client.AllocFS().ReadAt(alloc, path, offset, -1, nil) // If numLines is set, wrap the reader if numLines != -1 { r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) } - - io.Copy(os.Stdout, r) - r.Close() } - if err != nil { - f.Ui.Error(fmt.Sprintf("Error tailing file: %v", err)) - return 1 + if readErr != nil { + readErr = fmt.Errorf("Error tailing file: %v", readErr) } } + defer r.Close() + if readErr != nil { + f.Ui.Error(readErr.Error()) + return 1 + } + + io.Copy(os.Stdout, r) return 0 } // followFile outputs the contents of the file to stdout relative to the end of // the file. If numLines does not equal -1, then tail -n behavior is used. func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, - path string, offset, numLines int64) error { + path, origin string, offset, numLines int64) (io.ReadCloser, error) { cancel := make(chan struct{}) - frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil) + frames, _, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil) if err != nil { - return err + return nil, err } signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) @@ -340,8 +343,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) }() - io.Copy(os.Stdout, r) - return nil + return r, nil } // Get Random Allocation ID from a known jobID. Prefer to use a running allocation,