handle errors when streaming logs

This commit is contained in:
Alex Dadgar
2017-09-19 07:59:05 -05:00
parent 2e29851b82
commit 46ab2d69b1
4 changed files with 72 additions and 19 deletions

View File

@@ -157,11 +157,13 @@ func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadC
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, error) {
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, err
errCh <- err
return nil, errCh
}
if q == nil {
@@ -177,7 +179,8 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID), q)
if err != nil {
return nil, err
errCh <- err
return nil, errCh
}
// Create the output channel
@@ -201,6 +204,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
errCh <- err
close(frames)
return
}
@@ -214,7 +218,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
}
}()
return frames, nil
return frames, errCh
}
// Logs streams the content of a tasks logs blocking on EOF.
@@ -229,11 +233,13 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, error) {
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, err
errCh <- err
return nil, errCh
}
if q == nil {
@@ -251,7 +257,8 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), q)
if err != nil {
return nil, err
errCh <- err
return nil, errCh
}
// Create the output channel
@@ -275,6 +282,7 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
errCh <- err
close(frames)
return
}
@@ -288,12 +296,13 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
}
}()
return frames, nil
return frames, errCh
}
// FrameReader is used to convert a stream of frames into a read closer.
type FrameReader struct {
frames <-chan *StreamFrame
errCh <-chan error
cancelCh chan struct{}
closedLock sync.Mutex
@@ -309,9 +318,10 @@ type FrameReader struct {
// NewFrameReader takes a channel of frames and returns a FrameReader which
// implements io.ReadCloser
func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameReader {
func NewFrameReader(frames <-chan *StreamFrame, errCh <-chan error, cancelCh chan struct{}) *FrameReader {
return &FrameReader{
frames: frames,
errCh: errCh,
cancelCh: cancelCh,
}
}
@@ -354,6 +364,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {
f.byteOffset = int(f.frame.Offset)
case <-unblock:
return 0, nil
case err := <-f.errCh:
return 0, err
case <-f.cancelCh:
return 0, io.EOF
}

View File

@@ -1,8 +1,10 @@
package api
import (
"fmt"
"io"
"reflect"
"strings"
"testing"
"time"
)
@@ -11,9 +13,10 @@ func TestFS_FrameReader(t *testing.T) {
t.Parallel()
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
errCh := make(chan error)
cancelCh := make(chan struct{})
r := NewFrameReader(framesCh, cancelCh)
r := NewFrameReader(framesCh, errCh, cancelCh)
// Create some frames and send them
f1 := &StreamFrame{
@@ -80,9 +83,10 @@ func TestFS_FrameReader_Unblock(t *testing.T) {
t.Parallel()
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
errCh := make(chan error)
cancelCh := make(chan struct{})
r := NewFrameReader(framesCh, cancelCh)
r := NewFrameReader(framesCh, errCh, cancelCh)
r.SetUnblockTime(10 * time.Millisecond)
// Read a little
@@ -112,3 +116,26 @@ func TestFS_FrameReader_Unblock(t *testing.T) {
case <-time.After(300 * time.Millisecond):
}
}
func TestFS_FrameReader_Error(t *testing.T) {
t.Parallel()
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
errCh := make(chan error, 1)
cancelCh := make(chan struct{})
r := NewFrameReader(framesCh, errCh, cancelCh)
r.SetUnblockTime(10 * time.Millisecond)
// Send an error
expected := fmt.Errorf("test error")
errCh <- expected
// Read a little
p := make([]byte, 12)
_, err := r.Read(p)
if err == nil || !strings.Contains(err.Error(), expected.Error()) {
t.Fatalf("bad error: %v", err)
}
}

View File

@@ -322,7 +322,12 @@ func (f *FSCommand) Run(args []string) int {
return 1
}
io.Copy(os.Stdout, r)
_, err = io.Copy(os.Stdout, r)
if err != nil {
f.Ui.Error(fmt.Sprintf("error tailing file: %s", err))
return 1
}
return 0
}
@@ -332,16 +337,18 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
path, origin string, offset, numLines int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
if err != nil {
frames, errCh := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
select {
case err := <-errCh:
return nil, err
default:
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, cancel)
frameReader := api.NewFrameReader(frames, errCh, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader

View File

@@ -251,7 +251,12 @@ func (l *LogsCommand) Run(args []string) int {
}
defer r.Close()
io.Copy(os.Stdout, r)
_, err = io.Copy(os.Stdout, r)
if err != nil {
l.Ui.Error(fmt.Sprintf("error following logs: %s", err))
return 1
}
return 0
}
@@ -261,16 +266,18 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
if err != nil {
frames, errCh := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
select {
case err := <-errCh:
return nil, err
default:
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, cancel)
frameReader := api.NewFrameReader(frames, errCh, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader