mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
Merge pull request #5499 from hashicorp/r-fifo-plain-file
fifo: Use plain fifo file in Unix
This commit is contained in:
@@ -41,6 +41,7 @@ test_script:
|
||||
- cmd:
|
||||
gotestsum --junitfile results.xml
|
||||
github.com/hashicorp/nomad/drivers/docker
|
||||
github.com/hashicorp/nomad/client/lib/fifo
|
||||
# on_finish:
|
||||
# - ps: |
|
||||
# Push-AppveyorArtifact (Resolve-Path .\results.xml)
|
||||
|
||||
@@ -12,9 +12,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestFIFO tests basic behavior, and that reader closes when writer closes
|
||||
func TestFIFO(t *testing.T) {
|
||||
require := require.New(t)
|
||||
var path string
|
||||
@@ -29,9 +31,11 @@ func TestFIFO(t *testing.T) {
|
||||
path = filepath.Join(dir, "fifo")
|
||||
}
|
||||
|
||||
reader, err := New(path)
|
||||
readerOpenFn, err := CreateAndRead(path)
|
||||
require.NoError(err)
|
||||
|
||||
var reader io.ReadCloser
|
||||
|
||||
toWrite := [][]byte{
|
||||
[]byte("abc\n"),
|
||||
[]byte(""),
|
||||
@@ -45,10 +49,18 @@ func TestFIFO(t *testing.T) {
|
||||
wait.Add(1)
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
io.Copy(&readBuf, reader)
|
||||
|
||||
reader, err = readerOpenFn()
|
||||
assert.NoError(t, err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = io.Copy(&readBuf, reader)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
writer, err := Open(path)
|
||||
writer, err := OpenWriter(path)
|
||||
require.NoError(err)
|
||||
for _, b := range toWrite {
|
||||
n, err := writer.Write(b)
|
||||
@@ -57,9 +69,9 @@ func TestFIFO(t *testing.T) {
|
||||
}
|
||||
require.NoError(writer.Close())
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
require.NoError(reader.Close())
|
||||
|
||||
wait.Wait()
|
||||
require.NoError(reader.Close())
|
||||
|
||||
expected := "abc\ndef\nnomad\n"
|
||||
require.Equal(expected, readBuf.String())
|
||||
@@ -67,6 +79,7 @@ func TestFIFO(t *testing.T) {
|
||||
require.NoError(Remove(path))
|
||||
}
|
||||
|
||||
// TestWriteClose asserts that when writer closes, subsequent Write() fails
|
||||
func TestWriteClose(t *testing.T) {
|
||||
require := require.New(t)
|
||||
var path string
|
||||
@@ -81,18 +94,27 @@ func TestWriteClose(t *testing.T) {
|
||||
path = filepath.Join(dir, "fifo")
|
||||
}
|
||||
|
||||
reader, err := New(path)
|
||||
readerOpenFn, err := CreateAndRead(path)
|
||||
require.NoError(err)
|
||||
var reader io.ReadCloser
|
||||
|
||||
var readBuf bytes.Buffer
|
||||
var wait sync.WaitGroup
|
||||
wait.Add(1)
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
io.Copy(&readBuf, reader)
|
||||
|
||||
reader, err = readerOpenFn()
|
||||
assert.NoError(t, err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = io.Copy(&readBuf, reader)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
writer, err := Open(path)
|
||||
writer, err := OpenWriter(path)
|
||||
require.NoError(err)
|
||||
|
||||
var count int
|
||||
|
||||
@@ -3,23 +3,34 @@
|
||||
package fifo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
cfifo "github.com/containerd/fifo"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// New creates a fifo at the given path and returns an io.ReadWriteCloser for it
|
||||
// The fifo must not already exist
|
||||
func New(path string) (io.ReadWriteCloser, error) {
|
||||
return cfifo.OpenFifo(context.Background(), path, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
|
||||
// CreateAndRead creates a fifo at the given path, and returns an open function for reading.
|
||||
// The fifo must not exist already, or that it's already a fifo file
|
||||
//
|
||||
// It returns a reader open function that may block until a writer opens
|
||||
// so it's advised to run it in a goroutine different from reader goroutine
|
||||
func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
|
||||
// create first
|
||||
if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) {
|
||||
return nil, fmt.Errorf("error creating fifo %v: %v", path, err)
|
||||
}
|
||||
|
||||
openFn := func() (io.ReadCloser, error) {
|
||||
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
|
||||
}
|
||||
|
||||
return openFn, nil
|
||||
}
|
||||
|
||||
// Open opens a fifo that already exists and returns an io.ReadWriteCloser for it
|
||||
func Open(path string) (io.ReadWriteCloser, error) {
|
||||
return cfifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0600)
|
||||
// OpenWriter opens a fifo file for writer, assuming it already exists, returns io.WriteCloser
|
||||
func OpenWriter(path string) (io.WriteCloser, error) {
|
||||
return os.OpenFile(path, unix.O_WRONLY, os.ModeNamedPipe)
|
||||
}
|
||||
|
||||
// Remove a fifo that already exists at a given path
|
||||
@@ -34,3 +45,7 @@ func IsClosedErr(err error) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func mkfifo(path string, mode uint32) (err error) {
|
||||
return unix.Mkfifo(path, mode)
|
||||
}
|
||||
|
||||
@@ -67,9 +67,9 @@ func (f *winFIFO) Close() error {
|
||||
return f.listener.Close()
|
||||
}
|
||||
|
||||
// New creates a fifo at the given path and returns an io.ReadWriteCloser for it. The fifo
|
||||
// must not already exist
|
||||
func New(path string) (io.ReadWriteCloser, error) {
|
||||
// CreateAndRead creates a fifo at the given path and returns an io.ReadCloser open for it.
|
||||
// The fifo must not already exist
|
||||
func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
|
||||
l, err := winio.ListenPipe(path, &winio.PipeConfig{
|
||||
InputBufferSize: PipeBufferSize,
|
||||
OutputBufferSize: PipeBufferSize,
|
||||
@@ -78,13 +78,17 @@ func New(path string) (io.ReadWriteCloser, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &winFIFO{
|
||||
listener: l,
|
||||
}, nil
|
||||
openFn := func() (io.ReadCloser, error) {
|
||||
return &winFIFO{
|
||||
listener: l,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return openFn, nil
|
||||
}
|
||||
|
||||
// OpenWriter opens a fifo that already exists and returns an io.ReadWriteCloser for it
|
||||
func Open(path string) (io.ReadWriteCloser, error) {
|
||||
// OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it
|
||||
func OpenWriter(path string) (io.WriteCloser, error) {
|
||||
return winio.DialPipe(path, nil)
|
||||
}
|
||||
|
||||
|
||||
@@ -177,10 +177,12 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) {
|
||||
// data will be copied from the reader to the rotator.
|
||||
type logRotatorWrapper struct {
|
||||
fifoPath string
|
||||
processOutReader io.ReadCloser
|
||||
rotatorWriter *logging.FileRotator
|
||||
hasFinishedCopied chan struct{}
|
||||
logger hclog.Logger
|
||||
|
||||
processOutReader io.ReadCloser
|
||||
openCompleted chan struct{}
|
||||
}
|
||||
|
||||
// isRunning will return true until the reader is closed
|
||||
@@ -197,29 +199,40 @@ func (l *logRotatorWrapper) isRunning() bool {
|
||||
// processOutWriter to attach to the stdout or stderr of a process.
|
||||
func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) {
|
||||
logger.Info("opening fifo", "path", path)
|
||||
f, err := fifo.New(path)
|
||||
fifoOpenFn, err := fifo.CreateAndRead(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
|
||||
}
|
||||
|
||||
wrap := &logRotatorWrapper{
|
||||
fifoPath: path,
|
||||
processOutReader: f,
|
||||
rotatorWriter: rotator,
|
||||
hasFinishedCopied: make(chan struct{}),
|
||||
openCompleted: make(chan struct{}),
|
||||
logger: logger,
|
||||
}
|
||||
wrap.start()
|
||||
wrap.start(fifoOpenFn)
|
||||
return wrap, nil
|
||||
}
|
||||
|
||||
// start starts a goroutine that copies from the pipe into the rotator. This is
|
||||
// called by the constructor and not the user of the wrapper.
|
||||
func (l *logRotatorWrapper) start() {
|
||||
func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) {
|
||||
go func() {
|
||||
defer close(l.hasFinishedCopied)
|
||||
_, err := io.Copy(l.rotatorWriter, l.processOutReader)
|
||||
|
||||
reader, err := readerOpenFn()
|
||||
if err != nil {
|
||||
close(l.openCompleted)
|
||||
l.logger.Warn("failed to open log fifo", "error", err)
|
||||
return
|
||||
}
|
||||
l.processOutReader = reader
|
||||
close(l.openCompleted)
|
||||
|
||||
_, err = io.Copy(l.rotatorWriter, reader)
|
||||
if err != nil {
|
||||
l.logger.Warn("failed to read from log fifo", "error", err)
|
||||
// Close reader to propagate io error across pipe.
|
||||
// Note that this may block until the process exits on
|
||||
// Windows due to
|
||||
@@ -227,7 +240,7 @@ func (l *logRotatorWrapper) start() {
|
||||
// or similar issues. Since this is already running in
|
||||
// a goroutine its safe to block until the process is
|
||||
// force-killed.
|
||||
l.processOutReader.Close()
|
||||
reader.Close()
|
||||
}
|
||||
}()
|
||||
return
|
||||
@@ -249,9 +262,17 @@ func (l *logRotatorWrapper) Close() {
|
||||
closeDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(closeDone)
|
||||
err := l.processOutReader.Close()
|
||||
if err != nil && !strings.Contains(err.Error(), "file already closed") {
|
||||
l.logger.Warn("error closing read-side of process output pipe", "err", err)
|
||||
|
||||
// we must wait until reader is opened before we can close it, and cannot inteerrupt an in-flight open request
|
||||
// The Close function uses processOutputCloseTolerance to protect against long running open called
|
||||
// and then request will be interrupted and file will be closed on process shutdown
|
||||
<-l.openCompleted
|
||||
|
||||
if l.processOutReader != nil {
|
||||
err := l.processOutReader.Close()
|
||||
if err != nil && !strings.Contains(err.Error(), "file already closed") {
|
||||
l.logger.Warn("error closing read-side of process output pipe", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
@@ -37,7 +37,7 @@ func TestLogmon_Start_rotate(t *testing.T) {
|
||||
lm := NewLogMon(testlog.HCLogger(t))
|
||||
require.NoError(lm.Start(cfg))
|
||||
|
||||
stdout, err := fifo.Open(stdoutFifoPath)
|
||||
stdout, err := fifo.OpenWriter(stdoutFifoPath)
|
||||
require.NoError(err)
|
||||
|
||||
// Write enough bytes such that the log is rotated
|
||||
@@ -92,9 +92,9 @@ func TestLogmon_Start_restart(t *testing.T) {
|
||||
require.True(ok)
|
||||
require.NoError(lm.Start(cfg))
|
||||
|
||||
stdout, err := fifo.Open(stdoutFifoPath)
|
||||
stdout, err := fifo.OpenWriter(stdoutFifoPath)
|
||||
require.NoError(err)
|
||||
stderr, err := fifo.Open(stderrFifoPath)
|
||||
stderr, err := fifo.OpenWriter(stderrFifoPath)
|
||||
require.NoError(err)
|
||||
|
||||
// Write a string and assert it was written to the file
|
||||
@@ -122,9 +122,9 @@ func TestLogmon_Start_restart(t *testing.T) {
|
||||
require.NoError(err)
|
||||
})
|
||||
|
||||
stdout, err = fifo.Open(stdoutFifoPath)
|
||||
stdout, err = fifo.OpenWriter(stdoutFifoPath)
|
||||
require.NoError(err)
|
||||
stderr, err = fifo.Open(stderrFifoPath)
|
||||
stderr, err = fifo.OpenWriter(stderrFifoPath)
|
||||
require.NoError(err)
|
||||
|
||||
_, err = stdout.Write([]byte("te"))
|
||||
@@ -143,9 +143,9 @@ func TestLogmon_Start_restart(t *testing.T) {
|
||||
// Start logmon again and assert that it appended to the file
|
||||
require.NoError(lm.Start(cfg))
|
||||
|
||||
stdout, err = fifo.Open(stdoutFifoPath)
|
||||
stdout, err = fifo.OpenWriter(stdoutFifoPath)
|
||||
require.NoError(err)
|
||||
stderr, err = fifo.Open(stderrFifoPath)
|
||||
stderr, err = fifo.OpenWriter(stderrFifoPath)
|
||||
require.NoError(err)
|
||||
|
||||
_, err = stdout.Write([]byte("st\n"))
|
||||
|
||||
@@ -71,14 +71,14 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
|
||||
}
|
||||
|
||||
if d.stdout == nil {
|
||||
stdout, err := fifo.Open(opts.Stdout)
|
||||
stdout, err := fifo.OpenWriter(opts.Stdout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open fifo for path %s: %v", opts.Stdout, err)
|
||||
}
|
||||
d.stdout = stdout
|
||||
}
|
||||
if d.stderr == nil {
|
||||
stderr, err := fifo.Open(opts.Stderr)
|
||||
stderr, err := fifo.OpenWriter(opts.Stderr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open fifo for path %s: %v", opts.Stdout, err)
|
||||
}
|
||||
|
||||
@@ -125,13 +125,13 @@ func (h *taskHandle) run() {
|
||||
}
|
||||
|
||||
func (h *taskHandle) handleLogging(errCh chan<- error) {
|
||||
stdout, err := fifo.Open(h.taskConfig.StdoutPath)
|
||||
stdout, err := fifo.OpenWriter(h.taskConfig.StdoutPath)
|
||||
if err != nil {
|
||||
h.logger.Error("failed to write to stdout", "error", err)
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
stderr, err := fifo.Open(h.taskConfig.StderrPath)
|
||||
stderr, err := fifo.OpenWriter(h.taskConfig.StderrPath)
|
||||
if err != nil {
|
||||
h.logger.Error("failed to write to stderr", "error", err)
|
||||
errCh <- err
|
||||
|
||||
@@ -148,7 +148,7 @@ func (nopCloser) Close() error { return nil }
|
||||
func (c *ExecCommand) Stdout() (io.WriteCloser, error) {
|
||||
if c.stdout == nil {
|
||||
if c.StdoutPath != "" {
|
||||
f, err := fifo.Open(c.StdoutPath)
|
||||
f, err := fifo.OpenWriter(c.StdoutPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create stdout: %v", err)
|
||||
}
|
||||
@@ -164,7 +164,7 @@ func (c *ExecCommand) Stdout() (io.WriteCloser, error) {
|
||||
func (c *ExecCommand) Stderr() (io.WriteCloser, error) {
|
||||
if c.stderr == nil {
|
||||
if c.StderrPath != "" {
|
||||
f, err := fifo.Open(c.StderrPath)
|
||||
f, err := fifo.OpenWriter(c.StderrPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create stderr: %v", err)
|
||||
}
|
||||
@@ -177,13 +177,11 @@ func (c *ExecCommand) Stderr() (io.WriteCloser, error) {
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Close() {
|
||||
stdout, err := c.Stdout()
|
||||
if err == nil {
|
||||
stdout.Close()
|
||||
if c.stdout != nil {
|
||||
c.stdout.Close()
|
||||
}
|
||||
stderr, err := c.Stderr()
|
||||
if err == nil {
|
||||
stderr.Close()
|
||||
if c.stderr != nil {
|
||||
c.stderr.Close()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ var libcontainerFactory = executorFactory{
|
||||
// chroot. Use testExecutorContext if you don't need a chroot.
|
||||
//
|
||||
// The caller is responsible for calling AllocDir.Destroy() to cleanup.
|
||||
func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocDir) {
|
||||
func testExecutorCommandWithChroot(t *testing.T) *testExecCmd {
|
||||
chrootEnv := map[string]string{
|
||||
"/etc/ld.so.cache": "/etc/ld.so.cache",
|
||||
"/etc/ld.so.conf": "/etc/ld.so.conf",
|
||||
@@ -74,9 +74,13 @@ func testExecutorCommandWithChroot(t *testing.T) (*ExecCommand, *allocdir.AllocD
|
||||
NomadResources: alloc.AllocatedResources.Tasks[task.Name],
|
||||
},
|
||||
}
|
||||
configureTLogging(cmd)
|
||||
|
||||
return cmd, allocDir
|
||||
testCmd := &testExecCmd{
|
||||
command: cmd,
|
||||
allocDir: allocDir,
|
||||
}
|
||||
configureTLogging(t, testCmd)
|
||||
return testCmd
|
||||
}
|
||||
|
||||
func TestExecutor_IsolationAndConstraints(t *testing.T) {
|
||||
@@ -84,7 +88,8 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
|
||||
require := require.New(t)
|
||||
testutil.ExecCompatible(t)
|
||||
|
||||
execCmd, allocDir := testExecutorCommandWithChroot(t)
|
||||
testExecCmd := testExecutorCommandWithChroot(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/ls"
|
||||
execCmd.Args = []string{"-F", "/", "/etc/"}
|
||||
defer allocDir.Destroy()
|
||||
@@ -146,8 +151,7 @@ ld.so.cache
|
||||
ld.so.conf
|
||||
ld.so.conf.d/`
|
||||
tu.WaitForResult(func() (bool, error) {
|
||||
outWriter, _ := execCmd.GetWriters()
|
||||
output := outWriter.(*bufferCloser).String()
|
||||
output := testExecCmd.stdout.String()
|
||||
act := strings.TrimSpace(string(output))
|
||||
if act != expected {
|
||||
return false, fmt.Errorf("Command output incorrectly: want %v; got %v", expected, act)
|
||||
@@ -161,7 +165,8 @@ func TestExecutor_ClientCleanup(t *testing.T) {
|
||||
testutil.ExecCompatible(t)
|
||||
require := require.New(t)
|
||||
|
||||
execCmd, allocDir := testExecutorCommandWithChroot(t)
|
||||
testExecCmd := testExecutorCommandWithChroot(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
defer allocDir.Destroy()
|
||||
|
||||
executor := NewExecutorWithIsolation(testlog.HCLogger(t))
|
||||
@@ -193,11 +198,10 @@ func TestExecutor_ClientCleanup(t *testing.T) {
|
||||
require.Fail("timeout waiting for exec to shutdown")
|
||||
}
|
||||
|
||||
outWriter, _ := execCmd.GetWriters()
|
||||
output := outWriter.(*bufferCloser).String()
|
||||
output := testExecCmd.stdout.String()
|
||||
require.NotZero(len(output))
|
||||
time.Sleep(2 * time.Second)
|
||||
output1 := outWriter.(*bufferCloser).String()
|
||||
output1 := testExecCmd.stdout.String()
|
||||
require.Equal(len(output), len(output1))
|
||||
}
|
||||
|
||||
|
||||
@@ -4,11 +4,13 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -42,10 +44,19 @@ func init() {
|
||||
executorFactories["UniversalExecutor"] = universalFactory
|
||||
}
|
||||
|
||||
type testExecCmd struct {
|
||||
command *ExecCommand
|
||||
allocDir *allocdir.AllocDir
|
||||
|
||||
stdout *bytes.Buffer
|
||||
stderr *bytes.Buffer
|
||||
outputCopyDone *sync.WaitGroup
|
||||
}
|
||||
|
||||
// testExecutorContext returns an ExecutorContext and AllocDir.
|
||||
//
|
||||
// The caller is responsible for calling AllocDir.Destroy() to cleanup.
|
||||
func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) {
|
||||
func testExecutorCommand(t *testing.T) *testExecCmd {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, task, "global").Build()
|
||||
@@ -78,18 +89,43 @@ func testExecutorCommand(t *testing.T) (*ExecCommand, *allocdir.AllocDir) {
|
||||
},
|
||||
}
|
||||
|
||||
configureTLogging(cmd)
|
||||
return cmd, allocDir
|
||||
testCmd := &testExecCmd{
|
||||
command: cmd,
|
||||
allocDir: allocDir,
|
||||
}
|
||||
configureTLogging(t, testCmd)
|
||||
return testCmd
|
||||
}
|
||||
|
||||
type bufferCloser struct {
|
||||
bytes.Buffer
|
||||
}
|
||||
// configureTLogging configures a test command executor with buffer as Std{out|err}
|
||||
// but using os.Pipe so it mimics non-test case where cmd is set with files as Std{out|err}
|
||||
// the buffers can be used to read command output
|
||||
func configureTLogging(t *testing.T, testcmd *testExecCmd) {
|
||||
var stdout, stderr bytes.Buffer
|
||||
var copyDone sync.WaitGroup
|
||||
|
||||
func (_ *bufferCloser) Close() error { return nil }
|
||||
stdoutPr, stdoutPw, err := os.Pipe()
|
||||
require.NoError(t, err)
|
||||
|
||||
func configureTLogging(cmd *ExecCommand) (stdout bufferCloser, stderr bufferCloser) {
|
||||
cmd.SetWriters(&stdout, &stderr)
|
||||
stderrPr, stderrPw, err := os.Pipe()
|
||||
require.NoError(t, err)
|
||||
|
||||
copyDone.Add(2)
|
||||
go func() {
|
||||
defer copyDone.Done()
|
||||
io.Copy(&stdout, stdoutPr)
|
||||
}()
|
||||
go func() {
|
||||
defer copyDone.Done()
|
||||
io.Copy(&stderr, stderrPr)
|
||||
}()
|
||||
|
||||
testcmd.stdout = &stdout
|
||||
testcmd.stderr = &stderr
|
||||
testcmd.outputCopyDone = ©Done
|
||||
|
||||
testcmd.command.stdout = stdoutPw
|
||||
testcmd.command.stderr = stderrPw
|
||||
return
|
||||
}
|
||||
|
||||
@@ -99,7 +135,8 @@ func TestExecutor_Start_Invalid(pt *testing.T) {
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = invalid
|
||||
execCmd.Args = []string{"1"}
|
||||
factory.configureExecCmd(t, execCmd)
|
||||
@@ -118,7 +155,8 @@ func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) {
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/date"
|
||||
execCmd.Args = []string{"fail"}
|
||||
factory.configureExecCmd(t, execCmd)
|
||||
@@ -141,7 +179,8 @@ func TestExecutor_Start_Wait(pt *testing.T) {
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/echo"
|
||||
execCmd.Args = []string{"hello world"}
|
||||
factory.configureExecCmd(t, execCmd)
|
||||
@@ -160,9 +199,7 @@ func TestExecutor_Start_Wait(pt *testing.T) {
|
||||
|
||||
expected := "hello world"
|
||||
tu.WaitForResult(func() (bool, error) {
|
||||
outWriter, _ := execCmd.GetWriters()
|
||||
output := outWriter.(*bufferCloser).String()
|
||||
act := strings.TrimSpace(string(output))
|
||||
act := strings.TrimSpace(string(testExecCmd.stdout.String()))
|
||||
if expected != act {
|
||||
return false, fmt.Errorf("expected: '%s' actual: '%s'", expected, act)
|
||||
}
|
||||
@@ -174,12 +211,52 @@ func TestExecutor_Start_Wait(pt *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecutor_Start_Wait_Children(pt *testing.T) {
|
||||
pt.Parallel()
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/sh"
|
||||
execCmd.Args = []string{"-c", "(sleep 30 > /dev/null & ) ; exec sleep 1"}
|
||||
factory.configureExecCmd(t, execCmd)
|
||||
|
||||
defer allocDir.Destroy()
|
||||
executor := factory.new(testlog.HCLogger(t))
|
||||
defer executor.Shutdown("SIGKILL", 0)
|
||||
|
||||
ps, err := executor.Launch(execCmd)
|
||||
require.NoError(err)
|
||||
require.NotZero(ps.Pid)
|
||||
|
||||
ch := make(chan error)
|
||||
|
||||
go func() {
|
||||
ps, err = executor.Wait(context.Background())
|
||||
t.Logf("Processe completed with %#v error: %#v", ps, err)
|
||||
ch <- err
|
||||
}()
|
||||
|
||||
timeout := 7 * time.Second
|
||||
select {
|
||||
case <-ch:
|
||||
require.NoError(err)
|
||||
//good
|
||||
case <-time.After(timeout):
|
||||
require.Fail(fmt.Sprintf("process is running after timeout: %v", timeout))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecutor_WaitExitSignal(pt *testing.T) {
|
||||
pt.Parallel()
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/sleep"
|
||||
execCmd.Args = []string{"10000"}
|
||||
execCmd.ResourceLimits = true
|
||||
@@ -233,7 +310,8 @@ func TestExecutor_Start_Kill(pt *testing.T) {
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/sleep"
|
||||
execCmd.Args = []string{"10"}
|
||||
factory.configureExecCmd(t, execCmd)
|
||||
@@ -249,8 +327,7 @@ func TestExecutor_Start_Kill(pt *testing.T) {
|
||||
require.NoError(executor.Shutdown("SIGINT", 100*time.Millisecond))
|
||||
|
||||
time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second)
|
||||
outWriter, _ := execCmd.GetWriters()
|
||||
output := outWriter.(*bufferCloser).String()
|
||||
output := testExecCmd.stdout.String()
|
||||
expected := ""
|
||||
act := strings.TrimSpace(string(output))
|
||||
if act != expected {
|
||||
@@ -263,7 +340,8 @@ func TestExecutor_Start_Kill(pt *testing.T) {
|
||||
func TestExecutor_Shutdown_Exit(t *testing.T) {
|
||||
require := require.New(t)
|
||||
t.Parallel()
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/sleep"
|
||||
execCmd.Args = []string{"100"}
|
||||
cfg := &ExecutorConfig{
|
||||
@@ -400,7 +478,8 @@ func TestExecutor_Start_Kill_Immediately_NoGrace(pt *testing.T) {
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/sleep"
|
||||
execCmd.Args = []string{"100"}
|
||||
factory.configureExecCmd(t, execCmd)
|
||||
@@ -435,7 +514,8 @@ func TestExecutor_Start_Kill_Immediately_WithGrace(pt *testing.T) {
|
||||
for name, factory := range executorFactories {
|
||||
pt.Run(name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
execCmd, allocDir := testExecutorCommand(t)
|
||||
testExecCmd := testExecutorCommand(t)
|
||||
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
|
||||
execCmd.Cmd = "/bin/sleep"
|
||||
execCmd.Args = []string{"100"}
|
||||
factory.configureExecCmd(t, execCmd)
|
||||
|
||||
Reference in New Issue
Block a user