diff --git a/api/fs.go b/api/fs.go
index eee0ba362..36458c694 100644
--- a/api/fs.go
+++ b/api/fs.go
@@ -129,7 +129,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
 
 // ReadAt is used to read bytes at a given offset until limit at the given path
 // in an allocation directory. If limit is <= 0, there is no limit.
-func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) {
+func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
 	node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
 	if err != nil {
 		return nil, nil, err
@@ -162,7 +162,7 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int
 
 // Cat is used to read contents of a file at the given path in an allocation
 // directory
-func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.Reader, *QueryMeta, error) {
+func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
 	node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
 	if err != nil {
 		return nil, nil, err
@@ -204,7 +204,6 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
 // * path: path to file to stream.
 // * offset: The offset to start streaming data at.
 // * origin: Either "start" or "end" and defines from where the offset is applied.
-// * cancel: A channel which when closed will stop streaming.
 //
 // The return value is a channel that will emit StreamFrames as they are read.
 func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
@@ -275,3 +274,88 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
 
 	return frames, nil, nil
 }
+
+// FrameReader is used to convert a stream of frames into a read closer.
+type FrameReader struct {
+	frames   <-chan *StreamFrame
+	cancelCh chan struct{}
+	closed   bool
+
+	frame       *StreamFrame
+	frameOffset int
+
+	// To handle printing the file events
+	fileEventOffset int
+	fileEvent       []byte
+
+	byteOffset int
+}
+
+// NewFrameReader takes a channel of frames and returns a FrameReader which
+// implements io.ReadCloser
+func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameReader {
+	return &FrameReader{
+		frames:   frames,
+		cancelCh: cancelCh,
+	}
+}
+
+// Offset returns the offset into the stream.
+func (f *FrameReader) Offset() int {
+	return f.byteOffset
+}
+
+// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
+// when there are no more frames.
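+// If a frame carries a FileEvent, a short "nomad: ..." notice is injected into
+// the stream ahead of that frame's data, and a large frame may be returned
+// across several calls.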
+func (f *FrameReader) Read(p []byte) (n int, err error) {
+	if f.frame == nil {
+		frame, ok := <-f.frames
+		if !ok {
+			return 0, io.EOF
+		}
+		f.frame = frame
+
+		// Store the total offset into the file
+		f.byteOffset = int(f.frame.Offset)
+	}
+
+	if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {
+		f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent))
+		f.fileEventOffset = 0
+	}
+
+	// If there is a file event we inject it into the read stream
+	if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset {
+		n = copy(p, f.fileEvent[f.fileEventOffset:])
+		f.fileEventOffset += n
+		return n, nil
+	}
+
+	if len(f.fileEvent) == f.fileEventOffset {
+		f.fileEvent = nil
+		f.fileEventOffset = 0
+	}
+
+	// Copy the data out of the frame and update our offset
+	n = copy(p, f.frame.Data[f.frameOffset:])
+	f.frameOffset += n
+
+	// Clear the frame and its offset once we have read everything
+	if len(f.frame.Data) == f.frameOffset {
+		f.frame = nil
+		f.frameOffset = 0
+	}
+
+	return n, nil
+}
+
+// Close cancels the stream of frames
+func (f *FrameReader) Close() error {
+	if f.closed {
+		return nil
+	}
+
+	close(f.cancelCh)
+	f.closed = true
+	return nil
+}
diff --git a/api/fs_test.go b/api/fs_test.go
new file mode 100644
index 000000000..25107d9c8
--- /dev/null
+++ b/api/fs_test.go
@@ -0,0 +1,75 @@
+package api
+
+import (
+	"io"
+	"reflect"
+	"testing"
+)
+
+func TestFS_FrameReader(t *testing.T) {
+	// Create a channel of the frames and a cancel channel
+	framesCh := make(chan *StreamFrame, 3)
+	cancelCh := make(chan struct{})
+
+	r := NewFrameReader(framesCh, cancelCh)
+
+	// Create some frames and send them
+	f1 := &StreamFrame{
+		File:   "foo",
+		Offset: 5,
+		Data:   []byte("hello"),
+	}
+	f2 := &StreamFrame{
+		File:   "foo",
+		Offset: 10,
+		Data:   []byte(", wor"),
+	}
+	f3 := &StreamFrame{
+		File:   "foo",
+		Offset: 12,
+		Data:   []byte("ld"),
+	}
+	framesCh <- f1
+	framesCh <- f2
+	framesCh <- f3
+	close(framesCh)
+
+	expected := []byte("hello, world")
+
+	// Read a little
+	p := make([]byte, 12)
+
+	n, err := r.Read(p[:5])
+	if err != nil {
+		t.Fatalf("Read failed: %v", err)
+	}
+	if off := r.Offset(); off != n {
+		t.Fatalf("unexpected read bytes: got %v; wanted %v", n, off)
+	}
+
+	off := n
+	for {
+		n, err = r.Read(p[off:])
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			t.Fatalf("Read failed: %v", err)
+		}
+		off += n
+	}
+
+	if !reflect.DeepEqual(p, expected) {
+		t.Fatalf("read %q, wanted %q", string(p), string(expected))
+	}
+
+	if err := r.Close(); err != nil {
+		t.Fatalf("Close() failed: %v", err)
+	}
+	if _, ok := <-cancelCh; ok {
+		t.Fatalf("Close() didn't close cancel channel")
+	}
+	if len(expected) != r.Offset() {
+		t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected))
+	}
+}
diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go
index ce578aa08..d7a330ca6 100644
--- a/command/agent/fs_endpoint.go
+++ b/command/agent/fs_endpoint.go
@@ -270,13 +270,12 @@ func (s *StreamFramer) Destroy() {
 // heartbeating
 func (s *StreamFramer) Run() {
 	s.l.Lock()
+	defer s.l.Unlock()
 	if s.running {
 		return
 	}
 
 	s.running = true
-	s.l.Unlock()
-
 	go s.run()
 }
 
diff --git a/command/fs.go b/command/fs.go
index 35a7ea58d..154f34e2f 100644
--- a/command/fs.go
+++ b/command/fs.go
@@ -250,13 +250,18 @@ nomad alloc-status %s`, allocID, allocID)
 	}
 
 	// We have a file, output it.
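+	// The reader and any read error are captured here so the tail and
+	// non-tail branches below can share a single output and error path.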
+	var r io.ReadCloser
+	var readErr error
 	if !tail {
-		r, _, err := client.AllocFS().Cat(alloc, path, nil)
-		if err != nil {
-			f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
-			return 1
+		if follow {
+			r, readErr = f.followFile(client, alloc, path, api.OriginStart, 0, -1)
+		} else {
+			r, _, readErr = client.AllocFS().Cat(alloc, path, nil)
+		}
+
+		if readErr != nil {
+			readErr = fmt.Errorf("Error reading file: %v", readErr)
 		}
-		io.Copy(os.Stdout, r)
 	} else {
 		// Parse the offset
 		var offset int64 = defaultTailLines * bytesToLines
@@ -268,88 +273,77 @@ nomad alloc-status %s`, allocID, allocID)
 			offset = numLines * bytesToLines
 		} else if nBytes {
 			offset = numBytes
+		} else {
+			numLines = defaultTailLines
 		}
 
 		if offset > file.Size {
 			offset = file.Size
 		}
 
-		var err error
 		if follow {
-			err = f.followFile(client, alloc, path, offset)
+			r, readErr = f.followFile(client, alloc, path, api.OriginEnd, offset, numLines)
 		} else {
 			// This offset needs to be relative from the front versus the follow
 			// is relative to the end
 			offset = file.Size - offset
-			r, _, err := client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
-			if err != nil {
-				f.Ui.Error(fmt.Sprintf("Error reading file: %s", err))
-				return 1
+			r, _, readErr = client.AllocFS().ReadAt(alloc, path, offset, -1, nil)
+
+			// If numLines is set, wrap the reader
+			if numLines != -1 {
+				r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
 			}
-			io.Copy(os.Stdout, r)
 		}
 
-		if err != nil {
-			f.Ui.Error(fmt.Sprintf("Error tailing file: %v", err))
-			return 1
+		if readErr != nil {
+			readErr = fmt.Errorf("Error tailing file: %v", readErr)
 		}
 	}
 
+	if readErr != nil {
+		f.Ui.Error(readErr.Error())
+		return 1
+	}
+
+	defer r.Close()
+	io.Copy(os.Stdout, r)
 	return 0
 }
 
 // followFile outputs the contents of the file to stdout relative to the end of
-// the file. If numLines and numBytes are both less than zero, the default
-// output is defaulted to 10 lines.
+// the file. If numLines does not equal -1, then tail -n behavior is used.
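+// The returned ReadCloser must be closed by the caller to stop the stream.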
 func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
-	path string, offset int64) error {
+	path, origin string, offset, numLines int64) (io.ReadCloser, error) {
 
 	cancel := make(chan struct{})
-	frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil)
+	frames, _, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	signalCh := make(chan os.Signal, 1)
 	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
 
-	var frame *api.StreamFrame
-	var ok bool
-	for {
-		select {
-		case <-signalCh:
-			// End the streaming
-			close(cancel)
+	// Create a reader
+	var r io.ReadCloser
+	frameReader := api.NewFrameReader(frames, cancel)
+	r = frameReader
 
-			// Output the last offset
-			if frame != nil && frame.Offset > 0 {
-				f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frame.Offset))
-			}
-
-			return nil
-		case frame, ok = <-frames:
-			if !ok {
-				// Connection has been killed
-				return nil
-			}
-
-			if frame == nil {
-				panic("received nil frame; please report as a bug")
-			}
-
-			if frame.IsHeartbeat() {
-				continue
-			}
-
-			// Print the file event
-			if frame.FileEvent != "" {
-				f.Ui.Output(fmt.Sprintf("nomad: FileEvent %q", frame.FileEvent))
-			}
-
-			fmt.Print(string(frame.Data))
-		}
+	// If numLines is set, wrap the reader
+	if numLines != -1 {
+		r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
 	}
 
-	return nil
+	go func() {
+		<-signalCh
+
+		// End the streaming
+		r.Close()
+
+		// Output the last offset
+		f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset()))
+	}()
+
+	return r, nil
 }
 
 // Get Random Allocation ID from a known jobID. Prefer to use a running allocation,
diff --git a/command/helpers.go b/command/helpers.go
index 29183b4cb..36f3455cc 100644
--- a/command/helpers.go
+++ b/command/helpers.go
@@ -1,7 +1,9 @@
 package command
 
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"strconv"
 	"time"
 
@@ -93,3 +95,77 @@ func evalFailureStatus(eval *api.Evaluation) (string, bool) {
 
 	return text, hasFailures
 }
+
+// LineLimitReader wraps another reader and provides `tail -n` like behavior.
+// LineLimitReader buffers up to the searchLimit and returns `-n` number of
+// lines. After those lines have been returned, LineLimitReader streams the
+// underlying ReadCloser
+type LineLimitReader struct {
+	io.ReadCloser
+	lines       int
+	searchLimit int
+
+	buffer     *bytes.Buffer
+	bufFiled   bool
+	foundLines bool
+}
+
+// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find
+// searching backwards in the first searchLimit bytes.
+func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader {
+	return &LineLimitReader{
+		ReadCloser:  r,
+		searchLimit: searchLimit,
+		lines:       lines,
+		buffer:      bytes.NewBuffer(make([]byte, 0, searchLimit)),
+	}
+}
+
+func (l *LineLimitReader) Read(p []byte) (n int, err error) {
+	// Fill up the buffer so we can find the correct number of lines.
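+	// At most searchLimit bytes are buffered; once the trailing lines have
+	// been located and returned, reads stream directly from the wrapped reader.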
+	if !l.bufFiled {
+		_, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit)))
+		if err != nil {
+			return 0, err
+		}
+
+		l.bufFiled = true
+	}
+
+	if l.bufFiled && l.buffer.Len() != 0 {
+		b := l.buffer.Bytes()
+
+		// Find the lines
+		if !l.foundLines {
+			found := 0
+			i := len(b) - 1
+			sep := byte('\n')
+			lastIndex := len(b) - 1
+			for ; found < l.lines && i >= 0; i-- {
+				if b[i] == sep {
+					lastIndex = i
+
+					// Skip the first one
+					if i != len(b)-1 {
+						found++
+					}
+				}
+			}
+
+			// We found them all
+			if found == l.lines {
+				// Clear the buffer until the last index
+				l.buffer.Next(lastIndex + 1)
+			}
+
+			l.foundLines = true
+		}
+
+		// Read from the buffer
+		n := copy(p, l.buffer.Next(len(p)))
+		return n, nil
+	}
+
+	// Just stream from the underlying reader now
+	return l.ReadCloser.Read(p)
+}
diff --git a/command/helpers_test.go b/command/helpers_test.go
index 276719c33..c2dd64e82 100644
--- a/command/helpers_test.go
+++ b/command/helpers_test.go
@@ -1,6 +1,8 @@
 package command
 
 import (
+	"io/ioutil"
+	"strings"
 	"testing"
 
 	"github.com/mitchellh/cli"
@@ -45,3 +47,82 @@ func TestHelpers_NodeID(t *testing.T) {
 		t.Fatalf("getLocalNodeID() should fail")
 	}
 }
+
+func TestHelpers_LineLimitReader(t *testing.T) {
+	helloString := `hello
+world
+this
+is
+a
+test`
+
+	noLines := "jskdfhjasdhfjkajkldsfdlsjkahfkjdsafa"
+
+	cases := []struct {
+		Input       string
+		Output      string
+		Lines       int
+		SearchLimit int
+	}{
+		{
+			Input:       helloString,
+			Output:      helloString,
+			Lines:       6,
+			SearchLimit: 1000,
+		},
+		{
+			Input: helloString,
+			Output: `world
+this
+is
+a
+test`,
+			Lines:       5,
+			SearchLimit: 1000,
+		},
+		{
+			Input:       helloString,
+			Output:      `test`,
+			Lines:       1,
+			SearchLimit: 1000,
+		},
+		{
+			Input:       helloString,
+			Output:      "",
+			Lines:       0,
+			SearchLimit: 1000,
+		},
+		{
+			Input:       helloString,
+			Output:      helloString,
+			Lines:       6,
+			SearchLimit: 1, // Exceed the limit
+		},
+		{
+			Input:       noLines,
+			Output:      noLines,
+			Lines:       10,
+			SearchLimit: 1000,
+		},
+		{
+			Input:       noLines,
+			Output:      noLines,
+			Lines:       10,
+			SearchLimit: 2,
+		},
+	}
+
+	for i, c := range cases {
+		in := ioutil.NopCloser(strings.NewReader(c.Input))
+		limit := NewLineLimitReader(in, c.Lines, c.SearchLimit)
+		outBytes, err := ioutil.ReadAll(limit)
+		if err != nil {
+			t.Fatalf("case %d failed: %v", i, err)
+		}
+
+		out := string(outBytes)
+		if out != c.Output {
+			t.Fatalf("case %d: got %q; want %q", i, out, c.Output)
+		}
+	}
+}
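
Reviewer note, not part of the diff: a minimal sketch of how the new pieces fit
together. It assumes the snippet lives in the command package next to the code
above (so NewLineLimitReader and bytesToLines are in scope), that "io", "os" and
the api package are imported, and that the client, allocation, path and line
count come from elsewhere; the helper name tailFollow is illustrative only.

// tailFollow is a hypothetical example wiring the streaming API, FrameReader
// and LineLimitReader together: follow a file from near its end and print
// roughly the last ten lines first.
func tailFollow(client *api.Client, alloc *api.Allocation, path string) error {
	numLines := int64(10)
	offset := numLines * bytesToLines

	// Start streaming frames offset bytes before the end of the file.
	cancel := make(chan struct{})
	frames, _, err := client.AllocFS().Stream(alloc, path, api.OriginEnd, offset, cancel, nil)
	if err != nil {
		return err
	}

	// Convert the frame channel into an io.ReadCloser; closing it closes the
	// cancel channel, which stops the stream.
	var r io.ReadCloser = api.NewFrameReader(frames, cancel)

	// Trim the initial burst down to roughly the last numLines lines.
	r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
	defer r.Close()

	_, err = io.Copy(os.Stdout, r)
	return err
}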