mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 19:35:41 +03:00
StreamFramer encapsulates the sending, heartbeating and batching behavior of frames
This commit is contained in:
@@ -37,7 +37,7 @@ type StreamFrame struct {
|
||||
}
|
||||
|
||||
func (s *StreamFrame) IsHeartbeat() bool {
|
||||
return s.Data == "" && s.FileEvent == ""
|
||||
return s.Data == "" && s.FileEvent == "" && s.File == "" && s.Offset == 0
|
||||
}
|
||||
|
||||
// AllocFS is used to introspect an allocation directory on a Nomad client
|
||||
@@ -127,7 +127,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
|
||||
}
|
||||
|
||||
// ReadAt is used to read bytes at a given offset until limit at the given path
|
||||
// in an allocation directory
|
||||
// in an allocation directory. If limit is <= 0, there is no limit.
|
||||
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) {
|
||||
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
|
||||
if err != nil {
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gopkg.in/tomb.v1"
|
||||
@@ -32,6 +34,10 @@ const (
|
||||
// a closed connection without sending any additional data
|
||||
streamHeartbeatRate = 10 * time.Second
|
||||
|
||||
// streamBatchWindow is the window in which file content is batched before
|
||||
// being flushed if the frame size has not been hit.
|
||||
streamBatchWindow = 200 * time.Millisecond
|
||||
|
||||
deleteEvent = "file deleted"
|
||||
truncateEvent = "file truncated"
|
||||
)
|
||||
@@ -184,6 +190,208 @@ type StreamFrame struct {
|
||||
FileEvent string
|
||||
}
|
||||
|
||||
// StreamFramer is used to buffer and send frames as well as heartbeat.
|
||||
type StreamFramer struct {
|
||||
out io.WriteCloser
|
||||
enc *codec.Encoder
|
||||
heartbeat *time.Ticker
|
||||
flusher *time.Ticker
|
||||
shutdown chan struct{}
|
||||
exitCh chan struct{}
|
||||
|
||||
outbound chan *StreamFrame
|
||||
|
||||
// The mutex protects everything below
|
||||
l sync.Mutex
|
||||
|
||||
// The current working frame
|
||||
f *StreamFrame
|
||||
data *bytes.Buffer
|
||||
|
||||
// Captures whether the framer is running and any error that occured to
|
||||
// cause it to stop.
|
||||
running bool
|
||||
err error
|
||||
}
|
||||
|
||||
// NewStreamFramer creates a new stream framer that will output StreamFrames to
|
||||
// the passed output.
|
||||
func NewStreamFramer(out io.WriteCloser) *StreamFramer {
|
||||
// Create a JSON encoder
|
||||
enc := codec.NewEncoder(out, jsonHandle)
|
||||
|
||||
// Create the heartbeat and flush ticker
|
||||
heartbeat := time.NewTicker(streamHeartbeatRate)
|
||||
flusher := time.NewTicker(streamBatchWindow)
|
||||
|
||||
return &StreamFramer{
|
||||
out: out,
|
||||
enc: enc,
|
||||
heartbeat: heartbeat,
|
||||
flusher: flusher,
|
||||
outbound: make(chan *StreamFrame),
|
||||
data: bytes.NewBuffer(make([]byte, 2*frameSize)),
|
||||
shutdown: make(chan struct{}),
|
||||
exitCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Destroy is used to cleanup the StreamFramer and flush any pending frames
|
||||
func (s *StreamFramer) Destroy() {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// Flush any existing frames
|
||||
if s.f != nil {
|
||||
s.f.Data = s.readData()
|
||||
s.enc.Encode(s.f)
|
||||
}
|
||||
|
||||
s.f = nil
|
||||
s.running = false
|
||||
close(s.shutdown)
|
||||
s.out.Close()
|
||||
s.heartbeat.Stop()
|
||||
s.flusher.Stop()
|
||||
}
|
||||
|
||||
// Run starts a long lived goroutine that handles sending data as well as
|
||||
// heartbeating
|
||||
func (s *StreamFramer) Run() {
|
||||
s.l.Lock()
|
||||
s.running = true
|
||||
s.l.Unlock()
|
||||
|
||||
go s.run()
|
||||
}
|
||||
|
||||
// ExitCh returns a channel that will be closed when the run loop terminates.
|
||||
func (s *StreamFramer) ExitCh() <-chan struct{} {
|
||||
return s.exitCh
|
||||
}
|
||||
|
||||
// run is the internal run method. It exits if Destroy is called or an error
|
||||
// occurs, in which case the exit channel is closed.
|
||||
func (s *StreamFramer) run() {
|
||||
// Store any error and mark it as not running
|
||||
var err error
|
||||
defer func() {
|
||||
s.l.Lock()
|
||||
s.err = err
|
||||
close(s.exitCh)
|
||||
s.l.Unlock()
|
||||
}()
|
||||
|
||||
// Start a heartbeat/flusher go-routine. This is done seprately to avoid blocking
|
||||
// the outbound channel.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
return
|
||||
case <-s.flusher.C:
|
||||
// Skip if there is nothing to flush
|
||||
s.l.Lock()
|
||||
if s.f == nil {
|
||||
s.l.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// Read the data for the frame, and send it
|
||||
s.f.Data = s.readData()
|
||||
s.outbound <- s.f
|
||||
s.f = nil
|
||||
|
||||
s.l.Unlock()
|
||||
case <-s.heartbeat.C:
|
||||
// Send a heartbeat frame
|
||||
s.outbound <- &StreamFrame{}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
return
|
||||
case o := <-s.outbound:
|
||||
// Send the frame and then clear the current working frame
|
||||
if err = s.enc.Encode(o); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readData reads the buffered data and returns a base64 encoded version of it.
|
||||
// Must be called with the lock held.
|
||||
func (s *StreamFramer) readData() string {
|
||||
// Compute the amount to read from the buffer
|
||||
size := s.data.Len()
|
||||
if size > frameSize {
|
||||
size = frameSize
|
||||
}
|
||||
return base64.StdEncoding.EncodeToString(s.data.Next(size))
|
||||
}
|
||||
|
||||
// Send creates and sends a StreamFrame based on the passed parameters. An error
|
||||
// is returned if the run routine hasn't run or encountered an error. Send is
|
||||
// asyncronous and does not block for the data to be transferred.
|
||||
func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// If we are not running, return the error that caused us to not run or
|
||||
// indicated that it was never started.
|
||||
if !s.running {
|
||||
if s.err != nil {
|
||||
return s.err
|
||||
}
|
||||
return fmt.Errorf("StreamFramer not running")
|
||||
}
|
||||
|
||||
// Check if not mergeable
|
||||
if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) {
|
||||
// Flush the old frame
|
||||
s.outbound <- &StreamFrame{
|
||||
Offset: s.f.Offset,
|
||||
File: s.f.File,
|
||||
FileEvent: s.f.FileEvent,
|
||||
Data: s.readData(),
|
||||
}
|
||||
s.f = nil
|
||||
}
|
||||
|
||||
// Store the new data as the current frame.
|
||||
if s.f == nil {
|
||||
s.f = &StreamFrame{
|
||||
Offset: offset,
|
||||
File: file,
|
||||
FileEvent: fileEvent,
|
||||
}
|
||||
}
|
||||
|
||||
// Write the data to the buffer
|
||||
s.data.Write(data)
|
||||
|
||||
// Flush till we are under the max frame size
|
||||
for s.data.Len() >= frameSize {
|
||||
// Create a new frame to send it
|
||||
s.outbound <- &StreamFrame{
|
||||
Offset: s.f.Offset,
|
||||
File: s.f.File,
|
||||
FileEvent: s.f.FileEvent,
|
||||
Data: s.readData(),
|
||||
}
|
||||
}
|
||||
|
||||
if s.data.Len() == 0 {
|
||||
s.f = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream streams the content of a file blocking on EOF.
|
||||
// The parameters are:
|
||||
// * path: path to file to stream.
|
||||
@@ -248,9 +456,6 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
|
||||
}
|
||||
|
||||
func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
|
||||
// Create a JSON encoder
|
||||
enc := codec.NewEncoder(output, jsonHandle)
|
||||
|
||||
// Get the reader
|
||||
f, err := fs.ReadAt(path, offset)
|
||||
if err != nil {
|
||||
@@ -260,11 +465,15 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o
|
||||
|
||||
// Create a tomb to cancel watch events
|
||||
t := tomb.Tomb{}
|
||||
defer t.Done()
|
||||
defer func() {
|
||||
t.Kill(nil)
|
||||
t.Done()
|
||||
}()
|
||||
|
||||
// Create the heartbeat timer
|
||||
ticker := time.NewTimer(streamHeartbeatRate)
|
||||
defer ticker.Stop()
|
||||
// Create the framer
|
||||
framer := NewStreamFramer(output)
|
||||
framer.Run()
|
||||
defer framer.Destroy()
|
||||
|
||||
// Create a variable to allow setting the last event
|
||||
var lastEvent string
|
||||
@@ -274,46 +483,38 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o
|
||||
var changes *watch.FileChanges
|
||||
|
||||
// Start streaming the data
|
||||
data := make([]byte, frameSize)
|
||||
OUTER:
|
||||
for {
|
||||
// Create a frame
|
||||
frame := StreamFrame{
|
||||
Offset: offset,
|
||||
File: path,
|
||||
}
|
||||
data := make([]byte, frameSize)
|
||||
|
||||
if lastEvent != "" {
|
||||
frame.FileEvent = lastEvent
|
||||
lastEvent = ""
|
||||
}
|
||||
|
||||
// Read up to the max frame size
|
||||
n, err := f.Read(data)
|
||||
n, readErr := f.Read(data)
|
||||
|
||||
// Update the offset
|
||||
offset += int64(n)
|
||||
|
||||
// Convert the data to Base64
|
||||
frame.Data = base64.StdEncoding.EncodeToString(data[:n])
|
||||
|
||||
// Return non-EOF errors
|
||||
if err != nil && err != io.EOF {
|
||||
return err
|
||||
if readErr != nil && readErr != io.EOF {
|
||||
return readErr
|
||||
}
|
||||
|
||||
// Send the frame
|
||||
if err := enc.Encode(&frame); err != nil {
|
||||
return err
|
||||
if n != 0 {
|
||||
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the last event
|
||||
if lastEvent != "" {
|
||||
lastEvent = ""
|
||||
}
|
||||
|
||||
// Just keep reading
|
||||
if err == nil {
|
||||
if readErr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// If EOF is hit, wait for a change to the file but periodically
|
||||
// heartbeat to ensure the socket is not closed
|
||||
// If EOF is hit, wait for a change to the file
|
||||
if changes == nil {
|
||||
changes, err = fs.ChangeEvents(path, offset, &t)
|
||||
if err != nil {
|
||||
@@ -321,27 +522,12 @@ OUTER:
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the heartbeat timer as we just started waiting
|
||||
ticker.Reset(streamHeartbeatRate)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-changes.Modified:
|
||||
continue OUTER
|
||||
case <-changes.Deleted:
|
||||
// Send a heartbeat frame with the delete
|
||||
hFrame := StreamFrame{
|
||||
Offset: offset,
|
||||
File: path,
|
||||
FileEvent: deleteEvent,
|
||||
}
|
||||
|
||||
if err := enc.Encode(&hFrame); err != nil {
|
||||
// The defer on the tomb will stop the watch
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return framer.Send(path, deleteEvent, nil, offset)
|
||||
case <-changes.Truncated:
|
||||
// Close the current reader
|
||||
if err := f.Close(); err != nil {
|
||||
@@ -360,21 +546,8 @@ OUTER:
|
||||
// Store the last event
|
||||
lastEvent = truncateEvent
|
||||
continue OUTER
|
||||
case <-t.Dying():
|
||||
case <-framer.ExitCh():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
// Send a heartbeat frame
|
||||
hFrame := StreamFrame{
|
||||
Offset: offset,
|
||||
File: path,
|
||||
}
|
||||
|
||||
if err := enc.Encode(&hFrame); err != nil {
|
||||
// The defer on the tomb will stop the watch
|
||||
return err
|
||||
}
|
||||
|
||||
ticker.Reset(streamHeartbeatRate)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,7 +317,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
|
||||
|
||||
// Output the last offset
|
||||
if frame != nil && frame.Offset > 0 {
|
||||
f.Ui.Output(fmt.Sprintf("Last outputted offset (bytes): %d", frame.Offset))
|
||||
f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frame.Offset))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user