diff --git a/api/fs.go b/api/fs.go index 45840fb0e..c412db541 100644 --- a/api/fs.go +++ b/api/fs.go @@ -157,11 +157,13 @@ func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadC // // The return value is a channel that will emit StreamFrames as they are read. func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, - cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, error) { + cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { + errCh := make(chan error, 1) nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q) if err != nil { - return nil, err + errCh <- err + return nil, errCh } if q == nil { @@ -177,7 +179,8 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID), q) if err != nil { - return nil, err + errCh <- err + return nil, errCh } // Create the output channel @@ -201,6 +204,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, // Decode the next frame var frame StreamFrame if err := dec.Decode(&frame); err != nil { + errCh <- err close(frames) return } @@ -214,7 +218,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, } }() - return frames, nil + return frames, errCh } // Logs streams the content of a tasks logs blocking on EOF. @@ -229,11 +233,13 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, // // The return value is a channel that will emit StreamFrames as they are read. func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, - offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, error) { + offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { + errCh := make(chan error, 1) nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q) if err != nil { - return nil, err + errCh <- err + return nil, errCh } if q == nil { @@ -251,7 +257,8 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), q) if err != nil { - return nil, err + errCh <- err + return nil, errCh } // Create the output channel @@ -275,6 +282,7 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str // Decode the next frame var frame StreamFrame if err := dec.Decode(&frame); err != nil { + errCh <- err close(frames) return } @@ -288,12 +296,13 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str } }() - return frames, nil + return frames, errCh } // FrameReader is used to convert a stream of frames into a read closer. type FrameReader struct { frames <-chan *StreamFrame + errCh <-chan error cancelCh chan struct{} closedLock sync.Mutex @@ -309,9 +318,10 @@ type FrameReader struct { // NewFrameReader takes a channel of frames and returns a FrameReader which // implements io.ReadCloser -func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameReader { +func NewFrameReader(frames <-chan *StreamFrame, errCh <-chan error, cancelCh chan struct{}) *FrameReader { return &FrameReader{ frames: frames, + errCh: errCh, cancelCh: cancelCh, } } @@ -354,6 +364,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { f.byteOffset = int(f.frame.Offset) case <-unblock: return 0, nil + case err := <-f.errCh: + return 0, err case <-f.cancelCh: return 0, io.EOF } diff --git a/api/fs_test.go b/api/fs_test.go index 09f5f9073..bd9219fc2 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -1,8 +1,10 @@ package api import ( + "fmt" "io" "reflect" + "strings" "testing" "time" ) @@ -11,9 +13,10 @@ func TestFS_FrameReader(t *testing.T) { t.Parallel() // Create a channel of the frames and a cancel channel framesCh := make(chan *StreamFrame, 3) + errCh := make(chan error) cancelCh := make(chan struct{}) - r := NewFrameReader(framesCh, cancelCh) + r := NewFrameReader(framesCh, errCh, cancelCh) // Create some frames and send them f1 := &StreamFrame{ @@ -80,9 +83,10 @@ func TestFS_FrameReader_Unblock(t *testing.T) { t.Parallel() // Create a channel of the frames and a cancel channel framesCh := make(chan *StreamFrame, 3) + errCh := make(chan error) cancelCh := make(chan struct{}) - r := NewFrameReader(framesCh, cancelCh) + r := NewFrameReader(framesCh, errCh, cancelCh) r.SetUnblockTime(10 * time.Millisecond) // Read a little @@ -112,3 +116,26 @@ func TestFS_FrameReader_Unblock(t *testing.T) { case <-time.After(300 * time.Millisecond): } } + +func TestFS_FrameReader_Error(t *testing.T) { + t.Parallel() + // Create a channel of the frames and a cancel channel + framesCh := make(chan *StreamFrame, 3) + errCh := make(chan error, 1) + cancelCh := make(chan struct{}) + + r := NewFrameReader(framesCh, errCh, cancelCh) + r.SetUnblockTime(10 * time.Millisecond) + + // Send an error + expected := fmt.Errorf("test error") + errCh <- expected + + // Read a little + p := make([]byte, 12) + + _, err := r.Read(p) + if err == nil || !strings.Contains(err.Error(), expected.Error()) { + t.Fatalf("bad error: %v", err) + } +} diff --git a/command/fs.go b/command/fs.go index 65651b702..97b18a0b4 100644 --- a/command/fs.go +++ b/command/fs.go @@ -322,7 +322,12 @@ func (f *FSCommand) Run(args []string) int { return 1 } - io.Copy(os.Stdout, r) + _, err = io.Copy(os.Stdout, r) + if err != nil { + f.Ui.Error(fmt.Sprintf("error tailing file: %s", err)) + return 1 + } + return 0 } @@ -332,16 +337,18 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, path, origin string, offset, numLines int64) (io.ReadCloser, error) { cancel := make(chan struct{}) - frames, err := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil) - if err != nil { + frames, errCh := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil) + select { + case err := <-errCh: return nil, err + default: } signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) // Create a reader var r io.ReadCloser - frameReader := api.NewFrameReader(frames, cancel) + frameReader := api.NewFrameReader(frames, errCh, cancel) frameReader.SetUnblockTime(500 * time.Millisecond) r = frameReader diff --git a/command/logs.go b/command/logs.go index 84e6e29a5..68e2b3f57 100644 --- a/command/logs.go +++ b/command/logs.go @@ -251,7 +251,12 @@ func (l *LogsCommand) Run(args []string) int { } defer r.Close() - io.Copy(os.Stdout, r) + _, err = io.Copy(os.Stdout, r) + if err != nil { + l.Ui.Error(fmt.Sprintf("error following logs: %s", err)) + return 1 + } + return 0 } @@ -261,16 +266,18 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) { cancel := make(chan struct{}) - frames, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil) - if err != nil { + frames, errCh := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil) + select { + case err := <-errCh: return nil, err + default: } signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) // Create a reader var r io.ReadCloser - frameReader := api.NewFrameReader(frames, cancel) + frameReader := api.NewFrameReader(frames, errCh, cancel) frameReader.SetUnblockTime(500 * time.Millisecond) r = frameReader