From 6837a7370859e647b579d3a89098f28e607d9ba2 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 28 Jan 2019 11:35:21 -0500 Subject: [PATCH] add circbufwriter package --- command/executor_plugin.go | 78 +------------------------ lib/circbufwriter/writer.go | 112 ++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 76 deletions(-) create mode 100644 lib/circbufwriter/writer.go diff --git a/command/executor_plugin.go b/command/executor_plugin.go index 56263dc84..2ba041dff 100644 --- a/command/executor_plugin.go +++ b/command/executor_plugin.go @@ -5,15 +5,13 @@ import ( "io" "os" "strings" - "sync" - "time" - "github.com/armon/circbuf" hclog "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/drivers/shared/executor" + "github.com/hashicorp/nomad/lib/circbufwriter" "github.com/hashicorp/nomad/plugins/base" ) @@ -60,7 +58,7 @@ func (e *ExecutorPluginCommand) Run(args []string) int { // If the client detatches from go-plugin it will block on logging to stderr. // This buffered writer will never block on write, and instead buffer the // writes to a ring buffer. - bufferedStderrW := newCircbufWriter(os.Stderr) + bufferedStderrW := circbufwriter.New(os.Stderr, circleBufferSize) // Tee the logs to stderr and the file so that they are streamed to the // client @@ -84,75 +82,3 @@ func (e *ExecutorPluginCommand) Run(args []string) int { }) return 0 } - -type circbufWriter struct { - buf *circbuf.Buffer - err error - bufLock sync.Mutex - wr io.Writer - flushCh chan struct{} -} - -func newCircbufWriter(w io.Writer) *circbufWriter { - buf, _ := circbuf.NewBuffer(circleBufferSize) - c := &circbufWriter{ - buf: buf, - wr: w, - flushCh: make(chan struct{}), - } - go c.flushLoop() - return c -} - -func (c *circbufWriter) Write(p []byte) (nn int, err error) { - if c.err != nil { - return nn, c.err - } - c.bufLock.Lock() - nn, err = c.buf.Write(p) - c.bufLock.Unlock() - - select { - case c.flushCh <- struct{}{}: - default: - } - return nn, err -} - -func (c *circbufWriter) Close() error { - var err error - if wc, ok := c.wr.(io.WriteCloser); ok { - err = wc.Close() - } - - close(c.flushCh) - return err -} - -func (c *circbufWriter) flushLoop() { - timer := time.NewTimer(time.Millisecond * 100) - for { - select { - case _, ok := <-c.flushCh: - if !ok { - return - } - c.err = c.flush() - case <-timer.C: - c.err = c.flush() - } - } -} - -func (c *circbufWriter) flush() error { - c.bufLock.Lock() - b := c.buf.Bytes() - c.buf.Reset() - c.bufLock.Unlock() - - var err error - if len(b) > 0 { - _, err = c.wr.Write(b) - } - return err -} diff --git a/lib/circbufwriter/writer.go b/lib/circbufwriter/writer.go new file mode 100644 index 000000000..8588dfec7 --- /dev/null +++ b/lib/circbufwriter/writer.go @@ -0,0 +1,112 @@ +package circbufwriter + +import ( + "io" + "sync" + "time" + + "github.com/armon/circbuf" +) + +type circbufWriter struct { + // circle buffer for data to write + buf *circbuf.Buffer + + // error to return from the writer + err error + + // bufLock syncronizes access to err and buf + bufLock sync.Mutex + + // wrapped writer + wr io.Writer + + // signals to flush the buffer + flushCh chan struct{} +} + +// New created a circle buffered writer that wraps the given writer. The +// bufferSize is the amount of data that will be stored in memory before +// overwriting. +func New(w io.Writer, bufferSize int64) io.WriteCloser { + buf, _ := circbuf.NewBuffer(bufferSize) + c := &circbufWriter{ + buf: buf, + wr: w, + flushCh: make(chan struct{}, 1), + } + go c.flushLoop() + return c +} + +// Write will write the data to the buffer and attempt to flush the buffer to +// the wrapped writer. If the wrapped writer blocks on write, subsequent write +// will be written to the circle buffer. +func (c *circbufWriter) Write(p []byte) (nn int, err error) { + // If the last write returned an error, return it here + c.bufLock.Lock() + if c.err != nil { + return nn, c.err + } + + // Write to the buffer + nn, err = c.buf.Write(p) + c.bufLock.Unlock() + + // Signal to flush the buffer + select { + case c.flushCh <- struct{}{}: + default: + // flush is blocked + } + return nn, err +} + +func (c *circbufWriter) Close() error { + // Guard against double closing channel + select { + case <-c.flushCh: + default: + close(c.flushCh) + } + + // if the last write errored, it will return here + c.bufLock.Lock() + defer c.bufLock.Unlock() + return c.err +} + +func (c *circbufWriter) flushLoop() { + // Check buffer every 100ms in case a flush from Write was missed + ticker := time.NewTicker(time.Millisecond * 100) + for { + var err error + select { + case _, ok := <-c.flushCh: + if !ok { + // Close called, exiting loop + return + } + err = c.flush() + case <-ticker.C: + err = c.flush() + } + + c.bufLock.Lock() + c.err = err + c.bufLock.Unlock() + } +} + +func (c *circbufWriter) flush() error { + c.bufLock.Lock() + b := c.buf.Bytes() + c.buf.Reset() + c.bufLock.Unlock() + + var err error + if len(b) > 0 { + _, err = c.wr.Write(b) + } + return err +}