diff --git a/api/fs.go b/api/fs.go index d5c51787b..dc0342340 100644 --- a/api/fs.go +++ b/api/fs.go @@ -37,7 +37,7 @@ type StreamFrame struct { } func (s *StreamFrame) IsHeartbeat() bool { - return s.Data == "" && s.FileEvent == "" + return s.Data == "" && s.FileEvent == "" && s.File == "" && s.Offset == 0 } // AllocFS is used to introspect an allocation directory on a Nomad client @@ -127,7 +127,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF } // ReadAt is used to read bytes at a given offset until limit at the given path -// in an allocation directory +// in an allocation directory. If limit is <= 0, there is no limit. func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{}) if err != nil { diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 6cc686eca..87a1698c4 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -1,12 +1,14 @@ package agent import ( + "bytes" "encoding/base64" "fmt" "io" "net/http" "strconv" "strings" + "sync" "time" "gopkg.in/tomb.v1" @@ -32,6 +34,10 @@ const ( // a closed connection without sending any additional data streamHeartbeatRate = 10 * time.Second + // streamBatchWindow is the window in which file content is batched before + // being flushed if the frame size has not been hit. + streamBatchWindow = 200 * time.Millisecond + deleteEvent = "file deleted" truncateEvent = "file truncated" ) @@ -184,6 +190,208 @@ type StreamFrame struct { FileEvent string } +// StreamFramer is used to buffer and send frames as well as heartbeat. +type StreamFramer struct { + out io.WriteCloser + enc *codec.Encoder + heartbeat *time.Ticker + flusher *time.Ticker + shutdown chan struct{} + exitCh chan struct{} + + outbound chan *StreamFrame + + // The mutex protects everything below + l sync.Mutex + + // The current working frame + f *StreamFrame + data *bytes.Buffer + + // Captures whether the framer is running and any error that occured to + // cause it to stop. + running bool + err error +} + +// NewStreamFramer creates a new stream framer that will output StreamFrames to +// the passed output. +func NewStreamFramer(out io.WriteCloser) *StreamFramer { + // Create a JSON encoder + enc := codec.NewEncoder(out, jsonHandle) + + // Create the heartbeat and flush ticker + heartbeat := time.NewTicker(streamHeartbeatRate) + flusher := time.NewTicker(streamBatchWindow) + + return &StreamFramer{ + out: out, + enc: enc, + heartbeat: heartbeat, + flusher: flusher, + outbound: make(chan *StreamFrame), + data: bytes.NewBuffer(make([]byte, 2*frameSize)), + shutdown: make(chan struct{}), + exitCh: make(chan struct{}), + } +} + +// Destroy is used to cleanup the StreamFramer and flush any pending frames +func (s *StreamFramer) Destroy() { + s.l.Lock() + defer s.l.Unlock() + + // Flush any existing frames + if s.f != nil { + s.f.Data = s.readData() + s.enc.Encode(s.f) + } + + s.f = nil + s.running = false + close(s.shutdown) + s.out.Close() + s.heartbeat.Stop() + s.flusher.Stop() +} + +// Run starts a long lived goroutine that handles sending data as well as +// heartbeating +func (s *StreamFramer) Run() { + s.l.Lock() + s.running = true + s.l.Unlock() + + go s.run() +} + +// ExitCh returns a channel that will be closed when the run loop terminates. +func (s *StreamFramer) ExitCh() <-chan struct{} { + return s.exitCh +} + +// run is the internal run method. It exits if Destroy is called or an error +// occurs, in which case the exit channel is closed. +func (s *StreamFramer) run() { + // Store any error and mark it as not running + var err error + defer func() { + s.l.Lock() + s.err = err + close(s.exitCh) + s.l.Unlock() + }() + + // Start a heartbeat/flusher go-routine. This is done seprately to avoid blocking + // the outbound channel. + go func() { + for { + select { + case <-s.shutdown: + return + case <-s.flusher.C: + // Skip if there is nothing to flush + s.l.Lock() + if s.f == nil { + s.l.Unlock() + continue + } + + // Read the data for the frame, and send it + s.f.Data = s.readData() + s.outbound <- s.f + s.f = nil + + s.l.Unlock() + case <-s.heartbeat.C: + // Send a heartbeat frame + s.outbound <- &StreamFrame{} + } + } + }() + + for { + select { + case <-s.shutdown: + return + case o := <-s.outbound: + // Send the frame and then clear the current working frame + if err = s.enc.Encode(o); err != nil { + return + } + } + } +} + +// readData reads the buffered data and returns a base64 encoded version of it. +// Must be called with the lock held. +func (s *StreamFramer) readData() string { + // Compute the amount to read from the buffer + size := s.data.Len() + if size > frameSize { + size = frameSize + } + return base64.StdEncoding.EncodeToString(s.data.Next(size)) +} + +// Send creates and sends a StreamFrame based on the passed parameters. An error +// is returned if the run routine hasn't run or encountered an error. Send is +// asyncronous and does not block for the data to be transferred. +func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error { + s.l.Lock() + defer s.l.Unlock() + + // If we are not running, return the error that caused us to not run or + // indicated that it was never started. + if !s.running { + if s.err != nil { + return s.err + } + return fmt.Errorf("StreamFramer not running") + } + + // Check if not mergeable + if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) { + // Flush the old frame + s.outbound <- &StreamFrame{ + Offset: s.f.Offset, + File: s.f.File, + FileEvent: s.f.FileEvent, + Data: s.readData(), + } + s.f = nil + } + + // Store the new data as the current frame. + if s.f == nil { + s.f = &StreamFrame{ + Offset: offset, + File: file, + FileEvent: fileEvent, + } + } + + // Write the data to the buffer + s.data.Write(data) + + // Flush till we are under the max frame size + for s.data.Len() >= frameSize { + // Create a new frame to send it + s.outbound <- &StreamFrame{ + Offset: s.f.Offset, + File: s.f.File, + FileEvent: s.f.FileEvent, + Data: s.readData(), + } + } + + if s.data.Len() == 0 { + s.f = nil + } + + return nil +} + // Stream streams the content of a file blocking on EOF. // The parameters are: // * path: path to file to stream. @@ -248,9 +456,6 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf } func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error { - // Create a JSON encoder - enc := codec.NewEncoder(output, jsonHandle) - // Get the reader f, err := fs.ReadAt(path, offset) if err != nil { @@ -260,11 +465,15 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o // Create a tomb to cancel watch events t := tomb.Tomb{} - defer t.Done() + defer func() { + t.Kill(nil) + t.Done() + }() - // Create the heartbeat timer - ticker := time.NewTimer(streamHeartbeatRate) - defer ticker.Stop() + // Create the framer + framer := NewStreamFramer(output) + framer.Run() + defer framer.Destroy() // Create a variable to allow setting the last event var lastEvent string @@ -274,46 +483,38 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o var changes *watch.FileChanges // Start streaming the data + data := make([]byte, frameSize) OUTER: for { - // Create a frame - frame := StreamFrame{ - Offset: offset, - File: path, - } - data := make([]byte, frameSize) - - if lastEvent != "" { - frame.FileEvent = lastEvent - lastEvent = "" - } - // Read up to the max frame size - n, err := f.Read(data) + n, readErr := f.Read(data) // Update the offset offset += int64(n) - // Convert the data to Base64 - frame.Data = base64.StdEncoding.EncodeToString(data[:n]) - // Return non-EOF errors - if err != nil && err != io.EOF { - return err + if readErr != nil && readErr != io.EOF { + return readErr } // Send the frame - if err := enc.Encode(&frame); err != nil { - return err + if n != 0 { + if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { + return err + } + } + + // Clear the last event + if lastEvent != "" { + lastEvent = "" } // Just keep reading - if err == nil { + if readErr == nil { continue } - // If EOF is hit, wait for a change to the file but periodically - // heartbeat to ensure the socket is not closed + // If EOF is hit, wait for a change to the file if changes == nil { changes, err = fs.ChangeEvents(path, offset, &t) if err != nil { @@ -321,27 +522,12 @@ OUTER: } } - // Reset the heartbeat timer as we just started waiting - ticker.Reset(streamHeartbeatRate) - for { select { case <-changes.Modified: continue OUTER case <-changes.Deleted: - // Send a heartbeat frame with the delete - hFrame := StreamFrame{ - Offset: offset, - File: path, - FileEvent: deleteEvent, - } - - if err := enc.Encode(&hFrame); err != nil { - // The defer on the tomb will stop the watch - return err - } - - return nil + return framer.Send(path, deleteEvent, nil, offset) case <-changes.Truncated: // Close the current reader if err := f.Close(); err != nil { @@ -360,21 +546,8 @@ OUTER: // Store the last event lastEvent = truncateEvent continue OUTER - case <-t.Dying(): + case <-framer.ExitCh(): return nil - case <-ticker.C: - // Send a heartbeat frame - hFrame := StreamFrame{ - Offset: offset, - File: path, - } - - if err := enc.Encode(&hFrame); err != nil { - // The defer on the tomb will stop the watch - return err - } - - ticker.Reset(streamHeartbeatRate) } } } diff --git a/command/fs.go b/command/fs.go index 9e8567828..75ad0d20b 100644 --- a/command/fs.go +++ b/command/fs.go @@ -317,7 +317,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // Output the last offset if frame != nil && frame.Offset > 0 { - f.Ui.Output(fmt.Sprintf("Last outputted offset (bytes): %d", frame.Offset)) + f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frame.Offset)) } return nil