mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 09:25:46 +03:00
open fifo on background goroutine
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"io"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
@@ -56,11 +57,12 @@ func NewDockerLogger(logger hclog.Logger) DockerLogger {
|
||||
type dockerLogger struct {
|
||||
logger hclog.Logger
|
||||
|
||||
stdout io.WriteCloser
|
||||
stderr io.WriteCloser
|
||||
cancelCtx context.CancelFunc
|
||||
stdout io.WriteCloser
|
||||
stderr io.WriteCloser
|
||||
stdLock sync.Mutex
|
||||
|
||||
doneCh chan interface{}
|
||||
cancelCtx context.CancelFunc
|
||||
doneCh chan interface{}
|
||||
}
|
||||
|
||||
// Start log monitoring
|
||||
@@ -70,26 +72,18 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
|
||||
return fmt.Errorf("failed to open docker client: %v", err)
|
||||
}
|
||||
|
||||
if d.stdout == nil {
|
||||
stdout, err := fifo.OpenWriter(opts.Stdout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open fifo for path %s: %v", opts.Stdout, err)
|
||||
}
|
||||
d.stdout = stdout
|
||||
}
|
||||
if d.stderr == nil {
|
||||
stderr, err := fifo.OpenWriter(opts.Stderr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open fifo for path %s: %v", opts.Stdout, err)
|
||||
}
|
||||
d.stderr = stderr
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
d.cancelCtx = cancel
|
||||
|
||||
go func() {
|
||||
defer close(d.doneCh)
|
||||
|
||||
stdout, stderr, err := d.openStreams(opts)
|
||||
if err != nil {
|
||||
d.logger.Error("log streaming ended with terminal error", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
sinceTime := time.Unix(opts.StartTime, 0)
|
||||
backoff := 0.0
|
||||
|
||||
@@ -97,8 +91,8 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
|
||||
logOpts := docker.LogsOptions{
|
||||
Context: ctx,
|
||||
Container: opts.ContainerID,
|
||||
OutputStream: d.stdout,
|
||||
ErrorStream: d.stderr,
|
||||
OutputStream: stdout,
|
||||
ErrorStream: stderr,
|
||||
Since: sinceTime.Unix(),
|
||||
Follow: true,
|
||||
Stdout: true,
|
||||
@@ -138,16 +132,51 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
|
||||
|
||||
}
|
||||
|
||||
func (d *dockerLogger) openStreams(opts *StartOpts) (stdout, stderr io.WriteCloser, err error) {
|
||||
d.stdLock.Lock()
|
||||
stdoutF, stderrF := d.stdout, d.stderr
|
||||
d.stdLock.Unlock()
|
||||
|
||||
if stdoutF != nil && stderrF != nil {
|
||||
return stdoutF, stderrF, nil
|
||||
}
|
||||
|
||||
if stdoutF == nil {
|
||||
stdoutF, err = fifo.OpenWriter(opts.Stdout)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if stderrF == nil {
|
||||
stderrF, err = fifo.OpenWriter(opts.Stderr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
d.stdLock.Lock()
|
||||
d.stdout, d.stderr = stdoutF, stderrF
|
||||
d.stdLock.Unlock()
|
||||
|
||||
return stdoutF, stderrF, nil
|
||||
}
|
||||
|
||||
// Stop log monitoring
|
||||
func (d *dockerLogger) Stop() error {
|
||||
if d.cancelCtx != nil {
|
||||
d.cancelCtx()
|
||||
}
|
||||
if d.stdout != nil {
|
||||
d.stdout.Close()
|
||||
|
||||
d.stdLock.Lock()
|
||||
stdout, stderr := d.stdout, d.stderr
|
||||
d.stdLock.Unlock()
|
||||
|
||||
if stdout != nil {
|
||||
stdout.Close()
|
||||
}
|
||||
if d.stderr != nil {
|
||||
d.stderr.Close()
|
||||
if stderr != nil {
|
||||
stderr.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user