mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
add circbufwriter package
This commit is contained in:
@@ -5,15 +5,13 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/circbuf"
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
plugin "github.com/hashicorp/go-plugin"
|
||||
|
||||
"github.com/hashicorp/nomad/drivers/shared/executor"
|
||||
"github.com/hashicorp/nomad/lib/circbufwriter"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
)
|
||||
|
||||
@@ -60,7 +58,7 @@ func (e *ExecutorPluginCommand) Run(args []string) int {
|
||||
// If the client detatches from go-plugin it will block on logging to stderr.
|
||||
// This buffered writer will never block on write, and instead buffer the
|
||||
// writes to a ring buffer.
|
||||
bufferedStderrW := newCircbufWriter(os.Stderr)
|
||||
bufferedStderrW := circbufwriter.New(os.Stderr, circleBufferSize)
|
||||
|
||||
// Tee the logs to stderr and the file so that they are streamed to the
|
||||
// client
|
||||
@@ -84,75 +82,3 @@ func (e *ExecutorPluginCommand) Run(args []string) int {
|
||||
})
|
||||
return 0
|
||||
}
|
||||
|
||||
type circbufWriter struct {
|
||||
buf *circbuf.Buffer
|
||||
err error
|
||||
bufLock sync.Mutex
|
||||
wr io.Writer
|
||||
flushCh chan struct{}
|
||||
}
|
||||
|
||||
func newCircbufWriter(w io.Writer) *circbufWriter {
|
||||
buf, _ := circbuf.NewBuffer(circleBufferSize)
|
||||
c := &circbufWriter{
|
||||
buf: buf,
|
||||
wr: w,
|
||||
flushCh: make(chan struct{}),
|
||||
}
|
||||
go c.flushLoop()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *circbufWriter) Write(p []byte) (nn int, err error) {
|
||||
if c.err != nil {
|
||||
return nn, c.err
|
||||
}
|
||||
c.bufLock.Lock()
|
||||
nn, err = c.buf.Write(p)
|
||||
c.bufLock.Unlock()
|
||||
|
||||
select {
|
||||
case c.flushCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return nn, err
|
||||
}
|
||||
|
||||
func (c *circbufWriter) Close() error {
|
||||
var err error
|
||||
if wc, ok := c.wr.(io.WriteCloser); ok {
|
||||
err = wc.Close()
|
||||
}
|
||||
|
||||
close(c.flushCh)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *circbufWriter) flushLoop() {
|
||||
timer := time.NewTimer(time.Millisecond * 100)
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-c.flushCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
c.err = c.flush()
|
||||
case <-timer.C:
|
||||
c.err = c.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *circbufWriter) flush() error {
|
||||
c.bufLock.Lock()
|
||||
b := c.buf.Bytes()
|
||||
c.buf.Reset()
|
||||
c.bufLock.Unlock()
|
||||
|
||||
var err error
|
||||
if len(b) > 0 {
|
||||
_, err = c.wr.Write(b)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user