From 5ca9b6eb372e0ff20428b0595376f7ea04a3b227 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 31 Mar 2019 12:50:06 -0400 Subject: [PATCH 1/9] fifo: Use plain fifo file in Unix This PR switches to using plain fifo files instead of golang structs managed by containerd/fifo library. The library main benefit is management of opening fifo files. In Linux, a reader `open()` request would block until a writer opens the file (and vice-versa). The library uses goroutines so that it's the first IO operation that blocks. This benefit isn't really useful for us: Given that logmon simply streams output in a separate process, blocking of opening or first read is effectively the same. The library additionally makes further complications for managing state and tracking read/write permission that seems overhead for our use, compared to using a file directly. Looking here, I made the following incidental changes: * document that we do handle if fifo files are already created, as we rely on that behavior for logmon restarts * use type system to lock read vs write: currently, fifo library returns `io.ReadWriteCloser` even if fifo is opened for writing only! --- client/lib/fifo/fifo_test.go | 25 ++++++++++++++++++----- client/lib/fifo/fifo_unix.go | 35 +++++++++++++++++++++++---------- client/lib/fifo/fifo_windows.go | 20 +++++++++++-------- client/logmon/logmon.go | 26 +++++++++++++++++------- 4 files changed, 76 insertions(+), 30 deletions(-) diff --git a/client/lib/fifo/fifo_test.go b/client/lib/fifo/fifo_test.go index f1814f1ba..ab7c1531e 100644 --- a/client/lib/fifo/fifo_test.go +++ b/client/lib/fifo/fifo_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" ) +// TestFIFO tests basic behavior, and that reader closes when writer closes func TestFIFO(t *testing.T) { require := require.New(t) var path string @@ -29,9 +30,11 @@ func TestFIFO(t *testing.T) { path = filepath.Join(dir, "fifo") } - reader, err := New(path) + readerOpenFn, err := New(path) require.NoError(err) + var reader io.ReadCloser + toWrite := [][]byte{ []byte("abc\n"), []byte(""), @@ -45,7 +48,12 @@ func TestFIFO(t *testing.T) { wait.Add(1) go func() { defer wait.Done() - io.Copy(&readBuf, reader) + + reader, err = readerOpenFn() + require.NoError(err) + + _, err = io.Copy(&readBuf, reader) + require.NoError(err) }() writer, err := Open(path) @@ -57,9 +65,9 @@ func TestFIFO(t *testing.T) { } require.NoError(writer.Close()) time.Sleep(500 * time.Millisecond) - require.NoError(reader.Close()) wait.Wait() + require.NoError(reader.Close()) expected := "abc\ndef\nnomad\n" require.Equal(expected, readBuf.String()) @@ -67,6 +75,7 @@ func TestFIFO(t *testing.T) { require.NoError(Remove(path)) } +// TestWriteClose asserts that when writer closes, subsequent Write() fails func TestWriteClose(t *testing.T) { require := require.New(t) var path string @@ -81,15 +90,21 @@ func TestWriteClose(t *testing.T) { path = filepath.Join(dir, "fifo") } - reader, err := New(path) + readerOpenFn, err := New(path) require.NoError(err) + var reader io.ReadCloser var readBuf bytes.Buffer var wait sync.WaitGroup wait.Add(1) go func() { defer wait.Done() - io.Copy(&readBuf, reader) + + reader, err = readerOpenFn() + require.NoError(err) + + _, err = io.Copy(&readBuf, reader) + require.NoError(err) }() writer, err := Open(path) diff --git a/client/lib/fifo/fifo_unix.go b/client/lib/fifo/fifo_unix.go index bc98a4618..47188a9bd 100644 --- a/client/lib/fifo/fifo_unix.go +++ b/client/lib/fifo/fifo_unix.go @@ -3,23 +3,34 @@ package fifo import ( - "context" + "fmt" "io" "os" - "syscall" - cfifo "github.com/containerd/fifo" + "golang.org/x/sys/unix" ) -// New creates a fifo at the given path and returns an io.ReadWriteCloser for it -// The fifo must not already exist -func New(path string) (io.ReadWriteCloser, error) { - return cfifo.OpenFifo(context.Background(), path, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600) +// New creates a fifo at the given path, and returns an open function for reading. +// The fifo must not exist already, or that it's already a fifo file +// +// It returns a reader open function that may block until a writer opens +// so it's advised to run it in a goroutine different from reader goroutine +func New(path string) (func() (io.ReadCloser, error), error) { + // create first + if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) { + return nil, fmt.Errorf("error creating fifo %v: %v", path, err) + } + + openFn := func() (io.ReadCloser, error) { + return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe) + } + + return openFn, nil } -// Open opens a fifo that already exists and returns an io.ReadWriteCloser for it -func Open(path string) (io.ReadWriteCloser, error) { - return cfifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0600) +// Open opens a fifo file for reading, assuming it already exists, returns io.WriteCloser +func Open(path string) (io.WriteCloser, error) { + return os.OpenFile(path, unix.O_WRONLY, os.ModeNamedPipe) } // Remove a fifo that already exists at a given path @@ -34,3 +45,7 @@ func IsClosedErr(err error) bool { } return false } + +func mkfifo(path string, mode uint32) (err error) { + return unix.Mkfifo(path, mode) +} diff --git a/client/lib/fifo/fifo_windows.go b/client/lib/fifo/fifo_windows.go index adae1c7e1..b717dba4b 100644 --- a/client/lib/fifo/fifo_windows.go +++ b/client/lib/fifo/fifo_windows.go @@ -67,9 +67,9 @@ func (f *winFIFO) Close() error { return f.listener.Close() } -// New creates a fifo at the given path and returns an io.ReadWriteCloser for it. The fifo -// must not already exist -func New(path string) (io.ReadWriteCloser, error) { +// New creates a fifo at the given path and returns an io.ReadCloser open for it. +// The fifo must not already exist +func New(path string) (func() (io.ReadCloser, error), error) { l, err := winio.ListenPipe(path, &winio.PipeConfig{ InputBufferSize: PipeBufferSize, OutputBufferSize: PipeBufferSize, @@ -78,13 +78,17 @@ func New(path string) (io.ReadWriteCloser, error) { return nil, err } - return &winFIFO{ - listener: l, - }, nil + openFn := func() (io.ReadCloser, error) { + return &winFIFO{ + listener: l, + }, nil + } + + return openFn, nil } -// OpenWriter opens a fifo that already exists and returns an io.ReadWriteCloser for it -func Open(path string) (io.ReadWriteCloser, error) { +// OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it +func Open(path string) (io.WriteCloser, error) { return winio.DialPipe(path, nil) } diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index 5592187d1..c5763509e 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -177,10 +177,12 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) { // data will be copied from the reader to the rotator. type logRotatorWrapper struct { fifoPath string - processOutReader io.ReadCloser rotatorWriter *logging.FileRotator hasFinishedCopied chan struct{} logger hclog.Logger + + processOutReader io.ReadCloser + opened chan struct{} } // isRunning will return true until the reader is closed @@ -197,29 +199,38 @@ func (l *logRotatorWrapper) isRunning() bool { // processOutWriter to attach to the stdout or stderr of a process. func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) { logger.Info("opening fifo", "path", path) - f, err := fifo.New(path) + fifoOpenFn, err := fifo.New(path) if err != nil { return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err) } wrap := &logRotatorWrapper{ fifoPath: path, - processOutReader: f, rotatorWriter: rotator, hasFinishedCopied: make(chan struct{}), + opened: make(chan struct{}), logger: logger, } - wrap.start() + wrap.start(fifoOpenFn) return wrap, nil } // start starts a goroutine that copies from the pipe into the rotator. This is // called by the constructor and not the user of the wrapper. -func (l *logRotatorWrapper) start() { +func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) { go func() { defer close(l.hasFinishedCopied) - _, err := io.Copy(l.rotatorWriter, l.processOutReader) + + reader, err := readerOpenFn() if err != nil { + return + } + l.processOutReader = reader + close(l.opened) + + _, err = io.Copy(l.rotatorWriter, reader) + if err != nil { + l.logger.Error("copying got an error", "error", err) // Close reader to propagate io error across pipe. // Note that this may block until the process exits on // Windows due to @@ -227,7 +238,7 @@ func (l *logRotatorWrapper) start() { // or similar issues. Since this is already running in // a goroutine its safe to block until the process is // force-killed. - l.processOutReader.Close() + reader.Close() } }() return @@ -249,6 +260,7 @@ func (l *logRotatorWrapper) Close() { closeDone := make(chan struct{}) go func() { defer close(closeDone) + <-l.opened err := l.processOutReader.Close() if err != nil && !strings.Contains(err.Error(), "file already closed") { l.logger.Warn("error closing read-side of process output pipe", "err", err) From 0517bcb4030b246c97346d5bee1c3c1e9a5596e8 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 11:27:00 -0400 Subject: [PATCH 2/9] run fifo tests on Windows --- appveyor.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/appveyor.yml b/appveyor.yml index 3f50466d9..387d6d501 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -41,6 +41,7 @@ test_script: - cmd: gotestsum --junitfile results.xml github.com/hashicorp/nomad/drivers/docker + github.com/hashicorp/nomad/client/lib/fifo # on_finish: # - ps: | # Push-AppveyorArtifact (Resolve-Path .\results.xml) From 9f1ee37687215b10b17e3dc289c482a9c67ba409 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 11:29:02 -0400 Subject: [PATCH 3/9] log when fifo fails to open --- client/logmon/logmon.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index c5763509e..e22686422 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -223,6 +223,7 @@ func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) { reader, err := readerOpenFn() if err != nil { + l.logger.Warn("failed to open log fifo", "error", err) return } l.processOutReader = reader @@ -230,7 +231,7 @@ func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) { _, err = io.Copy(l.rotatorWriter, reader) if err != nil { - l.logger.Error("copying got an error", "error", err) + l.logger.Warn("failed to read from log fifo", "error", err) // Close reader to propagate io error across pipe. // Note that this may block until the process exits on // Windows due to From 3fb377ae6e9dd60f277a75b6fc043aef137ebde3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 11:59:56 -0400 Subject: [PATCH 4/9] Add test cases for waiting on children Also, make the test use files just like in the non-test case. --- .../shared/executor/executor_linux_test.go | 24 ++-- drivers/shared/executor/executor_test.go | 121 ++++++++++++++---- 2 files changed, 113 insertions(+), 32 deletions(-) diff --git a/drivers/shared/executor/executor_linux_test.go b/drivers/shared/executor/executor_linux_test.go index b8db9f71a..727d2ea60 100644 --- a/drivers/shared/executor/executor_linux_test.go +++ b/drivers/shared/executor/executor_linux_test.go @@ -39,7 +39,7 @@ var libcontainerFactory = executorFactory{ // chroot. Use testExecutorContext if you don't need a chroot. // // The caller is responsible for calling AllocDir.Destroy() to cleanup. -func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { +func testExecutorCommandWithChroot(t *testing.T) *testExecCmd { chrootEnv := map[string]string{ "/etc/ld.so.cache": "/etc/ld.so.cache", "/etc/ld.so.conf": "/etc/ld.so.conf", @@ -74,9 +74,13 @@ func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocD NomadResources: alloc.AllocatedResources.Tasks[task.Name], }, } - configureTLogging(cmd) - return cmd, allocDir + testCmd := &testExecCmd{ + command: cmd, + allocDir: allocDir, + } + configureTLogging(t, testCmd) + return testCmd } func TestExecutor_IsolationAndConstraints(t *testing.T) { @@ -84,7 +88,8 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) { require := require.New(t) testutil.ExecCompatible(t) - execCmd, allocDir := testExecutorCommandWithChroot(t) + testExecCmd := testExecutorCommandWithChroot(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/ls" execCmd.Args = []string{"-F", "/", "/etc/"} defer allocDir.Destroy() @@ -146,8 +151,7 @@ ld.so.cache ld.so.conf ld.so.conf.d/` tu.WaitForResult(func() (bool, error) { - outWriter, _ := execCmd.GetWriters() - output := outWriter.(*bufferCloser).String() + output := testExecCmd.stdout.String() act := strings.TrimSpace(string(output)) if act != expected { return false, fmt.Errorf("Command output incorrectly: want %v; got %v", expected, act) @@ -161,7 +165,8 @@ func TestExecutor_ClientCleanup(t *testing.T) { testutil.ExecCompatible(t) require := require.New(t) - execCmd, allocDir := testExecutorCommandWithChroot(t) + testExecCmd := testExecutorCommandWithChroot(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir defer allocDir.Destroy() executor := NewExecutorWithIsolation(testlog.HCLogger(t)) @@ -193,11 +198,10 @@ func TestExecutor_ClientCleanup(t *testing.T) { require.Fail("timeout waiting for exec to shutdown") } - outWriter, _ := execCmd.GetWriters() - output := outWriter.(*bufferCloser).String() + output := testExecCmd.stdout.String() require.NotZero(len(output)) time.Sleep(2 * time.Second) - output1 := outWriter.(*bufferCloser).String() + output1 := testExecCmd.stdout.String() require.Equal(len(output), len(output1)) } diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index 735650555..92e00c523 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -4,11 +4,13 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "os" "path/filepath" "runtime" "strings" + "sync" "syscall" "testing" "time" @@ -42,10 +44,19 @@ func init() { executorFactories["UniversalExecutor"] = universalFactory } +type testExecCmd struct { + command *ExecCommand + allocDir *allocdir.AllocDir + + stdout *bytes.Buffer + stderr *bytes.Buffer + outputCopyDone *sync.WaitGroup +} + // testExecutorContext returns an ExecutorContext and AllocDir. // // The caller is responsible for calling AllocDir.Destroy() to cleanup. -func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { +func testExecutorCommand(t *testing.T) *testExecCmd { alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] taskEnv := taskenv.NewBuilder(mock.Node(), alloc, task, "global").Build() @@ -78,18 +89,40 @@ func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) { }, } - configureTLogging(cmd) - return cmd, allocDir + testCmd := &testExecCmd{ + command: cmd, + allocDir: allocDir, + } + configureTLogging(t, testCmd) + return testCmd } -type bufferCloser struct { - bytes.Buffer -} +func configureTLogging(t *testing.T, testcmd *testExecCmd) { + var stdout, stderr bytes.Buffer + var copyDone sync.WaitGroup -func (_ *bufferCloser) Close() error { return nil } + stdoutPr, stdoutPw, err := os.Pipe() + require.NoError(t, err) -func configureTLogging(cmd *ExecCommand) (stdout bufferCloser, stderr bufferCloser) { - cmd.SetWriters(&stdout, &stderr) + stderrPr, stderrPw, err := os.Pipe() + require.NoError(t, err) + + copyDone.Add(2) + go func() { + defer copyDone.Done() + io.Copy(&stdout, stdoutPr) + }() + go func() { + defer copyDone.Done() + io.Copy(&stderr, stderrPr) + }() + + testcmd.stdout = &stdout + testcmd.stderr = &stderr + testcmd.outputCopyDone = ©Done + + testcmd.command.stdout = stdoutPw + testcmd.command.stderr = stderrPw return } @@ -99,7 +132,8 @@ func TestExecutor_Start_Invalid(pt *testing.T) { for name, factory := range executorFactories { pt.Run(name, func(t *testing.T) { require := require.New(t) - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = invalid execCmd.Args = []string{"1"} factory.configureExecCmd(t, execCmd) @@ -118,7 +152,8 @@ func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) { for name, factory := range executorFactories { pt.Run(name, func(t *testing.T) { require := require.New(t) - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/date" execCmd.Args = []string{"fail"} factory.configureExecCmd(t, execCmd) @@ -141,7 +176,8 @@ func TestExecutor_Start_Wait(pt *testing.T) { for name, factory := range executorFactories { pt.Run(name, func(t *testing.T) { require := require.New(t) - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/echo" execCmd.Args = []string{"hello world"} factory.configureExecCmd(t, execCmd) @@ -160,9 +196,7 @@ func TestExecutor_Start_Wait(pt *testing.T) { expected := "hello world" tu.WaitForResult(func() (bool, error) { - outWriter, _ := execCmd.GetWriters() - output := outWriter.(*bufferCloser).String() - act := strings.TrimSpace(string(output)) + act := strings.TrimSpace(string(testExecCmd.stdout.String())) if expected != act { return false, fmt.Errorf("expected: '%s' actual: '%s'", expected, act) } @@ -174,12 +208,52 @@ func TestExecutor_Start_Wait(pt *testing.T) { } } +func TestExecutor_Start_Wait_Children(pt *testing.T) { + pt.Parallel() + for name, factory := range executorFactories { + pt.Run(name, func(t *testing.T) { + require := require.New(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir + execCmd.Cmd = "/bin/sh" + execCmd.Args = []string{"-c", "(sleep 30 > /dev/null & ) ; exec sleep 1"} + factory.configureExecCmd(t, execCmd) + + defer allocDir.Destroy() + executor := factory.new(testlog.HCLogger(t)) + defer executor.Shutdown("SIGKILL", 0) + + ps, err := executor.Launch(execCmd) + require.NoError(err) + require.NotZero(ps.Pid) + + ch := make(chan error) + + go func() { + ps, err = executor.Wait(context.Background()) + t.Logf("Processe completed with %#v error: %#v", ps, err) + ch <- err + }() + + timeout := 7 * time.Second + select { + case <-ch: + require.NoError(err) + //good + case <-time.After(timeout): + require.Fail(fmt.Sprintf("process is running after timeout: %v", timeout)) + } + }) + } +} + func TestExecutor_WaitExitSignal(pt *testing.T) { pt.Parallel() for name, factory := range executorFactories { pt.Run(name, func(t *testing.T) { require := require.New(t) - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/sleep" execCmd.Args = []string{"10000"} execCmd.ResourceLimits = true @@ -233,7 +307,8 @@ func TestExecutor_Start_Kill(pt *testing.T) { for name, factory := range executorFactories { pt.Run(name, func(t *testing.T) { require := require.New(t) - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/sleep" execCmd.Args = []string{"10"} factory.configureExecCmd(t, execCmd) @@ -249,8 +324,7 @@ func TestExecutor_Start_Kill(pt *testing.T) { require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond)) time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second) - outWriter, _ := execCmd.GetWriters() - output := outWriter.(*bufferCloser).String() + output := testExecCmd.stdout.String() expected := "" act := strings.TrimSpace(string(output)) if act != expected { @@ -263,7 +337,8 @@ func TestExecutor_Start_Kill(pt *testing.T) { func TestExecutor_Shutdown_Exit(t *testing.T) { require := require.New(t) t.Parallel() - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/sleep" execCmd.Args = []string{"100"} cfg := &ExecutorConfig{ @@ -400,7 +475,8 @@ func TestExecutor_Start_Kill_Immediately_NoGrace(pt *testing.T) { for name, factory := range executorFactories { pt.Run(name, func(t *testing.T) { require := require.New(t) - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/sleep" execCmd.Args = []string{"100"} factory.configureExecCmd(t, execCmd) @@ -435,7 +511,8 @@ func TestExecutor_Start_Kill_Immediately_WithGrace(pt *testing.T) { for name, factory := range executorFactories { pt.Run(name, func(t *testing.T) { require := require.New(t) - execCmd, allocDir := testExecutorCommand(t) + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd.Cmd = "/bin/sleep" execCmd.Args = []string{"100"} factory.configureExecCmd(t, execCmd) From 48259078df97f8efe7d1d0da464d6dbe5479336d Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 12:53:37 -0400 Subject: [PATCH 5/9] avoid opening files just to close them --- drivers/shared/executor/executor.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index 070e101d5..2bd32dc48 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -177,13 +177,11 @@ func (c *ExecCommand) Stderr() (io.WriteCloser, error) { } func (c *ExecCommand) Close() { - stdout, err := c.Stdout() - if err == nil { - stdout.Close() + if c.stdout != nil { + c.stdout.Close() } - stderr, err := c.Stderr() - if err == nil { - stderr.Close() + if c.stderr != nil { + c.stderr.Close() } } From 3c68c946c4e4975f5321a899e2b7e5d266986bc7 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 15:38:39 -0400 Subject: [PATCH 6/9] no requires in a test goroutine --- client/lib/fifo/fifo_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/client/lib/fifo/fifo_test.go b/client/lib/fifo/fifo_test.go index ab7c1531e..05d678563 100644 --- a/client/lib/fifo/fifo_test.go +++ b/client/lib/fifo/fifo_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/hashicorp/nomad/helper/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -50,10 +51,13 @@ func TestFIFO(t *testing.T) { defer wait.Done() reader, err = readerOpenFn() - require.NoError(err) + assert.NoError(t, err) + if err != nil { + return + } _, err = io.Copy(&readBuf, reader) - require.NoError(err) + assert.NoError(t, err) }() writer, err := Open(path) @@ -101,10 +105,13 @@ func TestWriteClose(t *testing.T) { defer wait.Done() reader, err = readerOpenFn() - require.NoError(err) + assert.NoError(t, err) + if err != nil { + return + } _, err = io.Copy(&readBuf, reader) - require.NoError(err) + assert.NoError(t, err) }() writer, err := Open(path) From 7661857c6e3ce762d99c1d81db16bbfc0a0c3395 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 15:52:02 -0400 Subject: [PATCH 7/9] clarify closeDone blocking and field name --- client/logmon/logmon.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index e22686422..a248dcf27 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -182,7 +182,7 @@ type logRotatorWrapper struct { logger hclog.Logger processOutReader io.ReadCloser - opened chan struct{} + openCompleted chan struct{} } // isRunning will return true until the reader is closed @@ -208,7 +208,7 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.Fil fifoPath: path, rotatorWriter: rotator, hasFinishedCopied: make(chan struct{}), - opened: make(chan struct{}), + openCompleted: make(chan struct{}), logger: logger, } wrap.start(fifoOpenFn) @@ -223,11 +223,12 @@ func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) { reader, err := readerOpenFn() if err != nil { + close(l.openCompleted) l.logger.Warn("failed to open log fifo", "error", err) return } l.processOutReader = reader - close(l.opened) + close(l.openCompleted) _, err = io.Copy(l.rotatorWriter, reader) if err != nil { @@ -261,10 +262,17 @@ func (l *logRotatorWrapper) Close() { closeDone := make(chan struct{}) go func() { defer close(closeDone) - <-l.opened - err := l.processOutReader.Close() - if err != nil && !strings.Contains(err.Error(), "file already closed") { - l.logger.Warn("error closing read-side of process output pipe", "err", err) + + // we must wait until reader is opened before we can close it, and cannot inteerrupt an in-flight open request + // The Close function uses processOutputCloseTolerance to protect against long running open called + // and then request will be interrupted and file will be closed on process shutdown + <-l.openCompleted + + if l.processOutReader != nil { + err := l.processOutReader.Close() + if err != nil && !strings.Contains(err.Error(), "file already closed") { + l.logger.Warn("error closing read-side of process output pipe", "err", err) + } } }() From 714c41185c83cd6727b816ad6bcbf0e5d9897242 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 15:56:43 -0400 Subject: [PATCH 8/9] rename fifo methods for clarity --- client/lib/fifo/fifo_test.go | 8 ++++---- client/lib/fifo/fifo_unix.go | 8 ++++---- client/lib/fifo/fifo_windows.go | 6 +++--- client/logmon/logmon.go | 2 +- client/logmon/logmon_test.go | 14 +++++++------- drivers/docker/docklog/docker_logger.go | 4 ++-- drivers/mock/handle.go | 4 ++-- drivers/shared/executor/executor.go | 4 ++-- 8 files changed, 25 insertions(+), 25 deletions(-) diff --git a/client/lib/fifo/fifo_test.go b/client/lib/fifo/fifo_test.go index 05d678563..632fbbffc 100644 --- a/client/lib/fifo/fifo_test.go +++ b/client/lib/fifo/fifo_test.go @@ -31,7 +31,7 @@ func TestFIFO(t *testing.T) { path = filepath.Join(dir, "fifo") } - readerOpenFn, err := New(path) + readerOpenFn, err := CreateAndRead(path) require.NoError(err) var reader io.ReadCloser @@ -60,7 +60,7 @@ func TestFIFO(t *testing.T) { assert.NoError(t, err) }() - writer, err := Open(path) + writer, err := OpenWriter(path) require.NoError(err) for _, b := range toWrite { n, err := writer.Write(b) @@ -94,7 +94,7 @@ func TestWriteClose(t *testing.T) { path = filepath.Join(dir, "fifo") } - readerOpenFn, err := New(path) + readerOpenFn, err := CreateAndRead(path) require.NoError(err) var reader io.ReadCloser @@ -114,7 +114,7 @@ func TestWriteClose(t *testing.T) { assert.NoError(t, err) }() - writer, err := Open(path) + writer, err := OpenWriter(path) require.NoError(err) var count int diff --git a/client/lib/fifo/fifo_unix.go b/client/lib/fifo/fifo_unix.go index 47188a9bd..4d902fb03 100644 --- a/client/lib/fifo/fifo_unix.go +++ b/client/lib/fifo/fifo_unix.go @@ -10,12 +10,12 @@ import ( "golang.org/x/sys/unix" ) -// New creates a fifo at the given path, and returns an open function for reading. +// CreateAndRead creates a fifo at the given path, and returns an open function for reading. // The fifo must not exist already, or that it's already a fifo file // // It returns a reader open function that may block until a writer opens // so it's advised to run it in a goroutine different from reader goroutine -func New(path string) (func() (io.ReadCloser, error), error) { +func CreateAndRead(path string) (func() (io.ReadCloser, error), error) { // create first if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) { return nil, fmt.Errorf("error creating fifo %v: %v", path, err) @@ -28,8 +28,8 @@ func New(path string) (func() (io.ReadCloser, error), error) { return openFn, nil } -// Open opens a fifo file for reading, assuming it already exists, returns io.WriteCloser -func Open(path string) (io.WriteCloser, error) { +// OpenWriter opens a fifo file for writer, assuming it already exists, returns io.WriteCloser +func OpenWriter(path string) (io.WriteCloser, error) { return os.OpenFile(path, unix.O_WRONLY, os.ModeNamedPipe) } diff --git a/client/lib/fifo/fifo_windows.go b/client/lib/fifo/fifo_windows.go index b717dba4b..24d9c1e42 100644 --- a/client/lib/fifo/fifo_windows.go +++ b/client/lib/fifo/fifo_windows.go @@ -67,9 +67,9 @@ func (f *winFIFO) Close() error { return f.listener.Close() } -// New creates a fifo at the given path and returns an io.ReadCloser open for it. +// CreateAndRead creates a fifo at the given path and returns an io.ReadCloser open for it. // The fifo must not already exist -func New(path string) (func() (io.ReadCloser, error), error) { +func CreateAndRead(path string) (func() (io.ReadCloser, error), error) { l, err := winio.ListenPipe(path, &winio.PipeConfig{ InputBufferSize: PipeBufferSize, OutputBufferSize: PipeBufferSize, @@ -88,7 +88,7 @@ func New(path string) (func() (io.ReadCloser, error), error) { } // OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it -func Open(path string) (io.WriteCloser, error) { +func OpenWriter(path string) (io.WriteCloser, error) { return winio.DialPipe(path, nil) } diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index a248dcf27..74a13631d 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -199,7 +199,7 @@ func (l *logRotatorWrapper) isRunning() bool { // processOutWriter to attach to the stdout or stderr of a process. func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) { logger.Info("opening fifo", "path", path) - fifoOpenFn, err := fifo.New(path) + fifoOpenFn, err := fifo.CreateAndRead(path) if err != nil { return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err) } diff --git a/client/logmon/logmon_test.go b/client/logmon/logmon_test.go index 1692e7a08..609b3db6e 100644 --- a/client/logmon/logmon_test.go +++ b/client/logmon/logmon_test.go @@ -37,7 +37,7 @@ func TestLogmon_Start_rotate(t *testing.T) { lm := NewLogMon(testlog.HCLogger(t)) require.NoError(lm.Start(cfg)) - stdout, err := fifo.Open(stdoutFifoPath) + stdout, err := fifo.OpenWriter(stdoutFifoPath) require.NoError(err) // Write enough bytes such that the log is rotated @@ -92,9 +92,9 @@ func TestLogmon_Start_restart(t *testing.T) { require.True(ok) require.NoError(lm.Start(cfg)) - stdout, err := fifo.Open(stdoutFifoPath) + stdout, err := fifo.OpenWriter(stdoutFifoPath) require.NoError(err) - stderr, err := fifo.Open(stderrFifoPath) + stderr, err := fifo.OpenWriter(stderrFifoPath) require.NoError(err) // Write a string and assert it was written to the file @@ -122,9 +122,9 @@ func TestLogmon_Start_restart(t *testing.T) { require.NoError(err) }) - stdout, err = fifo.Open(stdoutFifoPath) + stdout, err = fifo.OpenWriter(stdoutFifoPath) require.NoError(err) - stderr, err = fifo.Open(stderrFifoPath) + stderr, err = fifo.OpenWriter(stderrFifoPath) require.NoError(err) _, err = stdout.Write([]byte("te")) @@ -143,9 +143,9 @@ func TestLogmon_Start_restart(t *testing.T) { // Start logmon again and assert that it appended to the file require.NoError(lm.Start(cfg)) - stdout, err = fifo.Open(stdoutFifoPath) + stdout, err = fifo.OpenWriter(stdoutFifoPath) require.NoError(err) - stderr, err = fifo.Open(stderrFifoPath) + stderr, err = fifo.OpenWriter(stderrFifoPath) require.NoError(err) _, err = stdout.Write([]byte("st\n")) diff --git a/drivers/docker/docklog/docker_logger.go b/drivers/docker/docklog/docker_logger.go index 3d1e83dfd..0567a5e5f 100644 --- a/drivers/docker/docklog/docker_logger.go +++ b/drivers/docker/docklog/docker_logger.go @@ -71,14 +71,14 @@ func (d *dockerLogger) Start(opts *StartOpts) error { } if d.stdout == nil { - stdout, err := fifo.Open(opts.Stdout) + stdout, err := fifo.OpenWriter(opts.Stdout) if err != nil { return fmt.Errorf("failed to open fifo for path %s: %v", opts.Stdout, err) } d.stdout = stdout } if d.stderr == nil { - stderr, err := fifo.Open(opts.Stderr) + stderr, err := fifo.OpenWriter(opts.Stderr) if err != nil { return fmt.Errorf("failed to open fifo for path %s: %v", opts.Stdout, err) } diff --git a/drivers/mock/handle.go b/drivers/mock/handle.go index 81d204d9d..7940a6283 100644 --- a/drivers/mock/handle.go +++ b/drivers/mock/handle.go @@ -125,13 +125,13 @@ func (h *taskHandle) run() { } func (h *taskHandle) handleLogging(errCh chan<- error) { - stdout, err := fifo.Open(h.taskConfig.StdoutPath) + stdout, err := fifo.OpenWriter(h.taskConfig.StdoutPath) if err != nil { h.logger.Error("failed to write to stdout", "error", err) errCh <- err return } - stderr, err := fifo.Open(h.taskConfig.StderrPath) + stderr, err := fifo.OpenWriter(h.taskConfig.StderrPath) if err != nil { h.logger.Error("failed to write to stderr", "error", err) errCh <- err diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index 2bd32dc48..ced883f58 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -148,7 +148,7 @@ func (nopCloser) Close() error { return nil } func (c *ExecCommand) Stdout() (io.WriteCloser, error) { if c.stdout == nil { if c.StdoutPath != "" { - f, err := fifo.Open(c.StdoutPath) + f, err := fifo.OpenWriter(c.StdoutPath) if err != nil { return nil, fmt.Errorf("failed to create stdout: %v", err) } @@ -164,7 +164,7 @@ func (c *ExecCommand) Stdout() (io.WriteCloser, error) { func (c *ExecCommand) Stderr() (io.WriteCloser, error) { if c.stderr == nil { if c.StderrPath != "" { - f, err := fifo.Open(c.StderrPath) + f, err := fifo.OpenWriter(c.StderrPath) if err != nil { return nil, fmt.Errorf("failed to create stderr: %v", err) } From 1450197936ab6e8fe23ca1112c74393329bcabf3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 1 Apr 2019 16:02:00 -0400 Subject: [PATCH 9/9] comment configureTLogging --- drivers/shared/executor/executor_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index 92e00c523..9d3b4c24b 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -97,6 +97,9 @@ func testExecutorCommand(t *testing.T) *testExecCmd { return testCmd } +// configureTLogging configures a test command executor with buffer as Std{out|err} +// but using os.Pipe so it mimics non-test case where cmd is set with files as Std{out|err} +// the buffers can be used to read command output func configureTLogging(t *testing.T, testcmd *testExecCmd) { var stdout, stderr bytes.Buffer var copyDone sync.WaitGroup