From 5ca9b6eb372e0ff20428b0595376f7ea04a3b227 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 31 Mar 2019 12:50:06 -0400 Subject: [PATCH] fifo: Use plain fifo file in Unix This PR switches to using plain fifo files instead of golang structs managed by containerd/fifo library. The library main benefit is management of opening fifo files. In Linux, a reader `open()` request would block until a writer opens the file (and vice-versa). The library uses goroutines so that it's the first IO operation that blocks. This benefit isn't really useful for us: Given that logmon simply streams output in a separate process, blocking of opening or first read is effectively the same. The library additionally makes further complications for managing state and tracking read/write permission that seems overhead for our use, compared to using a file directly. Looking here, I made the following incidental changes: * document that we do handle if fifo files are already created, as we rely on that behavior for logmon restarts * use type system to lock read vs write: currently, fifo library returns `io.ReadWriteCloser` even if fifo is opened for writing only! --- client/lib/fifo/fifo_test.go | 25 ++++++++++++++++++----- client/lib/fifo/fifo_unix.go | 35 +++++++++++++++++++++++---------- client/lib/fifo/fifo_windows.go | 20 +++++++++++-------- client/logmon/logmon.go | 26 +++++++++++++++++------- 4 files changed, 76 insertions(+), 30 deletions(-) diff --git a/client/lib/fifo/fifo_test.go b/client/lib/fifo/fifo_test.go index f1814f1ba..ab7c1531e 100644 --- a/client/lib/fifo/fifo_test.go +++ b/client/lib/fifo/fifo_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" ) +// TestFIFO tests basic behavior, and that reader closes when writer closes func TestFIFO(t *testing.T) { require := require.New(t) var path string @@ -29,9 +30,11 @@ func TestFIFO(t *testing.T) { path = filepath.Join(dir, "fifo") } - reader, err := New(path) + readerOpenFn, err := New(path) require.NoError(err) + var reader io.ReadCloser + toWrite := [][]byte{ []byte("abc\n"), []byte(""), @@ -45,7 +48,12 @@ func TestFIFO(t *testing.T) { wait.Add(1) go func() { defer wait.Done() - io.Copy(&readBuf, reader) + + reader, err = readerOpenFn() + require.NoError(err) + + _, err = io.Copy(&readBuf, reader) + require.NoError(err) }() writer, err := Open(path) @@ -57,9 +65,9 @@ func TestFIFO(t *testing.T) { } require.NoError(writer.Close()) time.Sleep(500 * time.Millisecond) - require.NoError(reader.Close()) wait.Wait() + require.NoError(reader.Close()) expected := "abc\ndef\nnomad\n" require.Equal(expected, readBuf.String()) @@ -67,6 +75,7 @@ func TestFIFO(t *testing.T) { require.NoError(Remove(path)) } +// TestWriteClose asserts that when writer closes, subsequent Write() fails func TestWriteClose(t *testing.T) { require := require.New(t) var path string @@ -81,15 +90,21 @@ func TestWriteClose(t *testing.T) { path = filepath.Join(dir, "fifo") } - reader, err := New(path) + readerOpenFn, err := New(path) require.NoError(err) + var reader io.ReadCloser var readBuf bytes.Buffer var wait sync.WaitGroup wait.Add(1) go func() { defer wait.Done() - io.Copy(&readBuf, reader) + + reader, err = readerOpenFn() + require.NoError(err) + + _, err = io.Copy(&readBuf, reader) + require.NoError(err) }() writer, err := Open(path) diff --git a/client/lib/fifo/fifo_unix.go b/client/lib/fifo/fifo_unix.go index bc98a4618..47188a9bd 100644 --- a/client/lib/fifo/fifo_unix.go +++ b/client/lib/fifo/fifo_unix.go @@ -3,23 +3,34 @@ package fifo import ( - "context" + "fmt" "io" "os" - "syscall" - cfifo "github.com/containerd/fifo" + "golang.org/x/sys/unix" ) -// New creates a fifo at the given path and returns an io.ReadWriteCloser for it -// The fifo must not already exist -func New(path string) (io.ReadWriteCloser, error) { - return cfifo.OpenFifo(context.Background(), path, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600) +// New creates a fifo at the given path, and returns an open function for reading. +// The fifo must not exist already, or that it's already a fifo file +// +// It returns a reader open function that may block until a writer opens +// so it's advised to run it in a goroutine different from reader goroutine +func New(path string) (func() (io.ReadCloser, error), error) { + // create first + if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) { + return nil, fmt.Errorf("error creating fifo %v: %v", path, err) + } + + openFn := func() (io.ReadCloser, error) { + return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe) + } + + return openFn, nil } -// Open opens a fifo that already exists and returns an io.ReadWriteCloser for it -func Open(path string) (io.ReadWriteCloser, error) { - return cfifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0600) +// Open opens a fifo file for reading, assuming it already exists, returns io.WriteCloser +func Open(path string) (io.WriteCloser, error) { + return os.OpenFile(path, unix.O_WRONLY, os.ModeNamedPipe) } // Remove a fifo that already exists at a given path @@ -34,3 +45,7 @@ func IsClosedErr(err error) bool { } return false } + +func mkfifo(path string, mode uint32) (err error) { + return unix.Mkfifo(path, mode) +} diff --git a/client/lib/fifo/fifo_windows.go b/client/lib/fifo/fifo_windows.go index adae1c7e1..b717dba4b 100644 --- a/client/lib/fifo/fifo_windows.go +++ b/client/lib/fifo/fifo_windows.go @@ -67,9 +67,9 @@ func (f *winFIFO) Close() error { return f.listener.Close() } -// New creates a fifo at the given path and returns an io.ReadWriteCloser for it. The fifo -// must not already exist -func New(path string) (io.ReadWriteCloser, error) { +// New creates a fifo at the given path and returns an io.ReadCloser open for it. +// The fifo must not already exist +func New(path string) (func() (io.ReadCloser, error), error) { l, err := winio.ListenPipe(path, &winio.PipeConfig{ InputBufferSize: PipeBufferSize, OutputBufferSize: PipeBufferSize, @@ -78,13 +78,17 @@ func New(path string) (io.ReadWriteCloser, error) { return nil, err } - return &winFIFO{ - listener: l, - }, nil + openFn := func() (io.ReadCloser, error) { + return &winFIFO{ + listener: l, + }, nil + } + + return openFn, nil } -// OpenWriter opens a fifo that already exists and returns an io.ReadWriteCloser for it -func Open(path string) (io.ReadWriteCloser, error) { +// OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it +func Open(path string) (io.WriteCloser, error) { return winio.DialPipe(path, nil) } diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index 5592187d1..c5763509e 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -177,10 +177,12 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) { // data will be copied from the reader to the rotator. type logRotatorWrapper struct { fifoPath string - processOutReader io.ReadCloser rotatorWriter *logging.FileRotator hasFinishedCopied chan struct{} logger hclog.Logger + + processOutReader io.ReadCloser + opened chan struct{} } // isRunning will return true until the reader is closed @@ -197,29 +199,38 @@ func (l *logRotatorWrapper) isRunning() bool { // processOutWriter to attach to the stdout or stderr of a process. func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) { logger.Info("opening fifo", "path", path) - f, err := fifo.New(path) + fifoOpenFn, err := fifo.New(path) if err != nil { return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err) } wrap := &logRotatorWrapper{ fifoPath: path, - processOutReader: f, rotatorWriter: rotator, hasFinishedCopied: make(chan struct{}), + opened: make(chan struct{}), logger: logger, } - wrap.start() + wrap.start(fifoOpenFn) return wrap, nil } // start starts a goroutine that copies from the pipe into the rotator. This is // called by the constructor and not the user of the wrapper. -func (l *logRotatorWrapper) start() { +func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) { go func() { defer close(l.hasFinishedCopied) - _, err := io.Copy(l.rotatorWriter, l.processOutReader) + + reader, err := readerOpenFn() if err != nil { + return + } + l.processOutReader = reader + close(l.opened) + + _, err = io.Copy(l.rotatorWriter, reader) + if err != nil { + l.logger.Error("copying got an error", "error", err) // Close reader to propagate io error across pipe. // Note that this may block until the process exits on // Windows due to @@ -227,7 +238,7 @@ func (l *logRotatorWrapper) start() { // or similar issues. Since this is already running in // a goroutine its safe to block until the process is // force-killed. - l.processOutReader.Close() + reader.Close() } }() return @@ -249,6 +260,7 @@ func (l *logRotatorWrapper) Close() { closeDone := make(chan struct{}) go func() { defer close(closeDone) + <-l.opened err := l.processOutReader.Close() if err != nil && !strings.Contains(err.Error(), "file already closed") { l.logger.Warn("error closing read-side of process output pipe", "err", err)