diff --git a/drivers/docker/docklog/docker_logger.go b/drivers/docker/docklog/docker_logger.go
index f6cdd03be..3d1e83dfd 100644
--- a/drivers/docker/docklog/docker_logger.go
+++ b/drivers/docker/docklog/docker_logger.go
@@ -3,6 +3,8 @@ package docklog
 import (
 	"fmt"
 	"io"
+	"math/rand"
+	"strings"
 	"time"
 
 	docker "github.com/fsouza/go-dockerclient"
@@ -44,7 +46,10 @@ type StartOpts struct {
 
 // NewDockerLogger returns an implementation of the DockerLogger interface
 func NewDockerLogger(logger hclog.Logger) DockerLogger {
-	return &dockerLogger{logger: logger}
+	return &dockerLogger{
+		logger: logger,
+		doneCh: make(chan interface{}),
+	}
 }
 
 // dockerLogger implements the DockerLogger interface
@@ -54,6 +59,9 @@ type dockerLogger struct {
 	stdout    io.WriteCloser
 	stderr    io.WriteCloser
 	cancelCtx context.CancelFunc
+
+	// doneCh is closed when the log-streaming goroutine started by Start exits.
+	doneCh chan interface{}
 }
 
 // Start log monitoring
@@ -81,7 +89,10 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
 	d.cancelCtx = cancel
 
 	go func() {
+		defer close(d.doneCh)
+
 		sinceTime := time.Unix(opts.StartTime, 0)
+		backoff := 0.0
 
 		for {
 			logOpts := docker.LogsOptions{
@@ -99,8 +110,21 @@ func (d *dockerLogger) Start(opts *StartOpts) error {
 			if ctx.Err() != nil {
 				// If context is terminated then we can safely break the loop
 				return
+			} else if err == nil {
+				backoff = 0.0
+			} else if isLoggingTerminalError(err) {
+				d.logger.Error("log streaming ended with terminal error", "error", err)
+				return
 			} else if err != nil {
-				d.logger.Error("Log streaming ended with error", "error", err)
+				backoff = nextBackoff(backoff)
+				d.logger.Error("log streaming ended with error", "error", err, "retry_in", backoff)
+
+				// Wait out the backoff, but return at once if the context is cancelled.
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(time.Duration(backoff * float64(time.Second))):
+				}
 			}
 
 			sinceTime = time.Now()
@@ -168,3 +192,48 @@ func (d *dockerLogger) getDockerClient(opts *StartOpts) (*docker.Client, error)
 
 	return newClient, merr.ErrorOrNil()
 }
+
+// isLoggingTerminalError reports whether err means that retrying log
+// streaming is pointless: a *docker.Error with status 501, or any error whose
+// message says the configured logging driver does not support reading.
+func isLoggingTerminalError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	if apiErr, ok := err.(*docker.Error); ok {
+		switch apiErr.Status {
+		case 501:
+			return true
+		}
+	}
+
+	terminals := []string{
+		"configured logging driver does not support reading",
+	}
+
+	for _, c := range terminals {
+		if strings.Contains(err.Error(), c) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// nextBackoff returns the next backoff period in seconds given current backoff.
+// The period grows by a random factor in [1.15, 2.3) and is clamped to [0.5, 120].
+func nextBackoff(backoff float64) float64 {
+	if backoff < 0.5 {
+		backoff = 0.5
+	}
+
+	backoff = backoff * 1.15 * (1.0 + rand.Float64())
+	if backoff > 120 {
+		backoff = 120
+	} else if backoff < 0.5 {
+		backoff = 0.5
+	}
+
+	return backoff
+}
diff --git a/drivers/docker/docklog/docker_logger_test.go b/drivers/docker/docklog/docker_logger_test.go
index 22c3dea8c..0321e443e 100644
--- a/drivers/docker/docklog/docker_logger_test.go
+++ b/drivers/docker/docklog/docker_logger_test.go
@@ -2,9 +2,11 @@ package docklog
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"runtime"
 	"testing"
+	"time"
 
 	docker "github.com/fsouza/go-dockerclient"
 	ctu "github.com/hashicorp/nomad/client/testutil"
@@ -24,7 +26,7 @@ func testContainerDetails() (image string, imageName string, imageTag string) {
 	return "busybox:1", "busybox", "1"
 }
 
-func TestDockerLogger(t *testing.T) {
+func TestDockerLogger_Success(t *testing.T) {
 	ctu.DockerCompatible(t)
 
 	t.Parallel()
@@ -43,9 +45,7 @@ func TestDockerLogger(t *testing.T) {
 			Repository: containerImageName,
 			Tag:        containerImageTag,
 		}, docker.AuthConfiguration{})
-		if err != nil {
-			t.Fatalf("failed to pull image: %v", err)
-		}
+		require.NoError(err, "failed to pull image")
 	}
 
 	containerConf := docker.CreateContainerOptions{
@@ -111,7 +111,7 @@ func echoToContainer(t *testing.T, client *docker.Client, id string, line string
 	op := docker.CreateExecOptions{
 		Container: id,
 		Cmd: []string{
-			"ash", "-c",
+			"/bin/sh", "-c",
 			fmt.Sprintf("echo %s >>~/docklog", line),
 		},
 	}
@@ -121,6 +121,91 @@ func echoToContainer(t *testing.T, client *docker.Client, id string, line string
 	require.NoError(t, client.StartExec(exec.ID, docker.StartExecOptions{Detach: true}))
 }
 
+// TestDockerLogger_LoggingNotSupported starts a container that uses the gelf
+// log driver and checks that the logger goroutine terminates (doneCh closes).
+func TestDockerLogger_LoggingNotSupported(t *testing.T) {
+	ctu.DockerCompatible(t)
+
+	t.Parallel()
+	require := require.New(t)
+
+	containerImage, containerImageName, containerImageTag := testContainerDetails()
+
+	client, err := docker.NewClientFromEnv()
+	if err != nil {
+		t.Skip("docker unavailable:", err)
+	}
+
+	if img, err := client.InspectImage(containerImage); err != nil || img == nil {
+		t.Log("image not found locally, downloading...")
+		err = client.PullImage(docker.PullImageOptions{
+			Repository: containerImageName,
+			Tag:        containerImageTag,
+		}, docker.AuthConfiguration{})
+		require.NoError(err, "failed to pull image")
+	}
+
+	containerConf := docker.CreateContainerOptions{
+		Config: &docker.Config{
+			Cmd: []string{
+				"sh", "-c", "touch ~/docklog; tail -f ~/docklog",
+			},
+			Image: containerImage,
+		},
+		HostConfig: &docker.HostConfig{
+			LogConfig: docker.LogConfig{
+				Type: "gelf",
+				Config: map[string]string{
+					"gelf-address":    "udp://localhost:12201",
+					"mode":            "non-blocking",
+					"max-buffer-size": "4m",
+				},
+			},
+		},
+		Context: context.Background(),
+	}
+
+	container, err := client.CreateContainer(containerConf)
+	require.NoError(err)
+
+	defer client.RemoveContainer(docker.RemoveContainerOptions{
+		ID:    container.ID,
+		Force: true,
+	})
+
+	err = client.StartContainer(container.ID, nil)
+	require.NoError(err)
+
+	testutil.WaitForResult(func() (bool, error) {
+		container, err = client.InspectContainer(container.ID)
+		if err != nil {
+			return false, err
+		}
+		if !container.State.Running {
+			return false, fmt.Errorf("container not running")
+		}
+		return true, nil
+	}, func(err error) {
+		require.NoError(err)
+	})
+
+	stdout := &noopCloser{bytes.NewBuffer(nil)}
+	stderr := &noopCloser{bytes.NewBuffer(nil)}
+
+	dl := NewDockerLogger(testlog.HCLogger(t)).(*dockerLogger)
+	dl.stdout = stdout
+	dl.stderr = stderr
+	require.NoError(dl.Start(&StartOpts{
+		ContainerID: container.ID,
+	}))
+
+	select {
+	case <-dl.doneCh:
+	case <-time.After(10 * time.Second):
+		require.Fail("timed out while waiting for docker_logging to terminate")
+	}
+}
+
 type noopCloser struct {
 	*bytes.Buffer
 }
@@ -128,3 +213,57 @@ type noopCloser struct {
 func (*noopCloser) Close() error {
 	return nil
 }
+
+// TestNextBackoff checks that nextBackoff stays within the expected bounds.
+func TestNextBackoff(t *testing.T) {
+	cases := []struct {
+		currentBackoff float64
+		min            float64
+		max            float64
+	}{
+		{0.0, 0.5, 1.15},
+		{5.0, 5.0, 16},
+		{120, 120, 120},
+	}
+
+	for _, c := range cases {
+		t.Run(fmt.Sprintf("case %v", c.currentBackoff), func(t *testing.T) {
+			next := nextBackoff(c.currentBackoff)
+			t.Logf("computed backoff(%v) = %v", c.currentBackoff, next)
+
+			require.True(t, next >= c.min, "next backoff is smaller than expected")
+			require.True(t, next <= c.max, "next backoff is larger than expected")
+		})
+	}
+}
+
+// TestIsLoggingTerminalError checks which errors are classified as terminal.
+func TestIsLoggingTerminalError(t *testing.T) {
+	terminalErrs := []error{
+		errors.New("docker returned: configured logging driver does not support reading"),
+		&docker.Error{
+			Status:  501,
+			Message: "configured logging driver does not support reading",
+		},
+		&docker.Error{
+			Status:  501,
+			Message: "not implemented",
+		},
+	}
+
+	for _, err := range terminalErrs {
+		require.Truef(t, isLoggingTerminalError(err), "error should be terminal: %v", err)
+	}
+
+	nonTerminalErrs := []error{
+		errors.New("not expected"),
+		&docker.Error{
+			Status:  503,
+			Message: "Service Unavailable",
+		},
+	}
+
+	for _, err := range nonTerminalErrs {
+		require.Falsef(t, isLoggingTerminalError(err), "error should not be terminal: %v", err)
+	}
+}