From a1b46b3a6dfeb5ecd9ba8e7be65808b9cf225e2b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Jul 2016 15:33:17 -0600 Subject: [PATCH] Allow following of files when cating and fix offsets --- api/fs.go | 14 ++++++++++---- api/fs_test.go | 6 +++--- command/fs.go | 48 +++++++++++++++++++++++++----------------------- 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/api/fs.go b/api/fs.go index d11e39d69..36458c694 100644 --- a/api/fs.go +++ b/api/fs.go @@ -162,7 +162,7 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int // Cat is used to read contents of a file at the given path in an allocation // directory -func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.Reader, *QueryMeta, error) { +func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{}) if err != nil { return nil, nil, err @@ -279,6 +279,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, type FrameReader struct { frames <-chan *StreamFrame cancelCh chan struct{} + closed bool frame *StreamFrame frameOffset int @@ -313,6 +314,9 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { return 0, io.EOF } f.frame = frame + + // Store the total offset into the file + f.byteOffset = int(f.frame.Offset) } if f.frame.FileEvent != "" && len(f.fileEvent) == 0 { @@ -336,9 +340,6 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { n = copy(p, f.frame.Data[f.frameOffset:]) f.frameOffset += n - // Store the total offset into the file - f.byteOffset = int(f.frame.Offset) + f.frameOffset - // Clear the frame and its offset once we have read everything if len(f.frame.Data) == f.frameOffset { f.frame = nil @@ -350,6 +351,11 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { // Close cancels the stream of frames func (f *FrameReader) Close() error { + if f.closed { + return nil + } + close(f.cancelCh) + f.closed = true return nil } diff --git a/api/fs_test.go b/api/fs_test.go index a70951656..25107d9c8 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -16,17 +16,17 @@ func TestFS_FrameReader(t *testing.T) { // Create some frames and send them f1 := &StreamFrame{ File: "foo", - Offset: 0, + Offset: 5, Data: []byte("hello"), } f2 := &StreamFrame{ File: "foo", - Offset: 5, + Offset: 10, Data: []byte(", wor"), } f3 := &StreamFrame{ File: "foo", - Offset: 10, + Offset: 12, Data: []byte("ld"), } framesCh <- f1 diff --git a/command/fs.go b/command/fs.go index e39ba6bb3..154f34e2f 100644 --- a/command/fs.go +++ b/command/fs.go @@ -250,13 +250,18 @@ nomad alloc-status %s`, allocID, allocID) } // We have a file, output it. + var r io.ReadCloser + var readErr error if !tail { - r, _, err := client.AllocFS().Cat(alloc, path, nil) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error reading file: %s", err)) - return 1 + if follow { + r, readErr = f.followFile(client, alloc, path, api.OriginStart, 0, -1) + } else { + r, _, readErr = client.AllocFS().Cat(alloc, path, nil) + } + + if readErr != nil { + readErr = fmt.Errorf("Error reading file: %v", readErr) } - io.Copy(os.Stdout, r) } else { // Parse the offset var offset int64 = defaultTailLines * bytesToLines @@ -276,46 +281,44 @@ nomad alloc-status %s`, allocID, allocID) offset = file.Size } - var err error if follow { - err = f.followFile(client, alloc, path, offset, numLines) + r, readErr = f.followFile(client, alloc, path, api.OriginEnd, offset, numLines) } else { // This offset needs to be relative from the front versus the follow // is relative to the end offset = file.Size - offset - r, _, err := client.AllocFS().ReadAt(alloc, path, offset, -1, nil) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error reading file: %s", err)) - return 1 - } + r, _, readErr = client.AllocFS().ReadAt(alloc, path, offset, -1, nil) // If numLines is set, wrap the reader if numLines != -1 { r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) } - - io.Copy(os.Stdout, r) - r.Close() } - if err != nil { - f.Ui.Error(fmt.Sprintf("Error tailing file: %v", err)) - return 1 + if readErr != nil { + readErr = fmt.Errorf("Error tailing file: %v", readErr) } } + defer r.Close() + if readErr != nil { + f.Ui.Error(readErr.Error()) + return 1 + } + + io.Copy(os.Stdout, r) return 0 } // followFile outputs the contents of the file to stdout relative to the end of // the file. If numLines does not equal -1, then tail -n behavior is used. func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, - path string, offset, numLines int64) error { + path, origin string, offset, numLines int64) (io.ReadCloser, error) { cancel := make(chan struct{}) - frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil) + frames, _, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil) if err != nil { - return err + return nil, err } signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) @@ -340,8 +343,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) }() - io.Copy(os.Stdout, r) - return nil + return r, nil } // Get Random Allocation ID from a known jobID. Prefer to use a running allocation,