mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
* drivers/docker: refactor use of clients in docker driver This PR refactors how we manage the two underlying clients used by the docker driver for communicating with the docker daemon. We keep two clients - one with a hard-coded timeout that applies to all operations no matter what, intended for use with short lived / async calls to docker. The other has no timeout and is the responsibility of the caller to set a context that will ensure the call eventually terminates. The use of these two clients has been confusing and mistakes were made in a number of places where calls were making use of the wrong client. This PR makes it so that a user must explicitly call a function to get the client that makes sense for that use case. Fixes #17023 * cr: followup items
194 lines
5.0 KiB
Go
194 lines
5.0 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package docker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
docker "github.com/fsouza/go-dockerclient"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/drivers/docker/util"
|
|
"github.com/hashicorp/nomad/helper"
|
|
nstructs "github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
const (
	// statsCollectorBackoffBaseline is the starting delay of the exponential
	// backoff applied between failed calls to the docker stats api.
	statsCollectorBackoffBaseline time.Duration = 5 * time.Second

	// statsCollectorBackoffLimit is the upper bound on the delay of the
	// exponential backoff applied between failed calls to the docker stats
	// api.
	statsCollectorBackoffLimit time.Duration = 2 * time.Minute
)
|
|
|
|
// usageSender wraps a TaskResourceUsage chan such that it supports concurrent
|
|
// sending and closing, and backpressures by dropping events if necessary.
|
|
type usageSender struct {
|
|
closed bool
|
|
destCh chan<- *cstructs.TaskResourceUsage
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// newStatsChanPipe returns a chan wrapped in a struct that supports concurrent
|
|
// sending and closing, and the receiver end of the chan.
|
|
func newStatsChanPipe() (*usageSender, <-chan *cstructs.TaskResourceUsage) {
|
|
destCh := make(chan *cstructs.TaskResourceUsage, 1)
|
|
return &usageSender{
|
|
destCh: destCh,
|
|
}, destCh
|
|
|
|
}
|
|
|
|
// send resource usage to the receiver unless the chan is already full or
|
|
// closed.
|
|
func (u *usageSender) send(tru *cstructs.TaskResourceUsage) {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
|
|
if u.closed {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case u.destCh <- tru:
|
|
default:
|
|
// Backpressure caused missed interval
|
|
}
|
|
}
|
|
|
|
// close resource usage. Any further sends will be dropped.
|
|
func (u *usageSender) close() {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
|
|
if u.closed {
|
|
// already closed
|
|
return
|
|
}
|
|
|
|
u.closed = true
|
|
close(u.destCh)
|
|
}
|
|
|
|
// Stats starts collecting stats from the docker daemon and sends them on the
|
|
// returned channel.
|
|
func (h *taskHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
|
|
select {
|
|
case <-h.doneCh:
|
|
return nil, nstructs.NewRecoverableError(fmt.Errorf("container stopped"), false)
|
|
default:
|
|
}
|
|
|
|
destCh, recvCh := newStatsChanPipe()
|
|
go h.collectStats(ctx, destCh, interval)
|
|
return recvCh, nil
|
|
}
|
|
|
|
// collectStats starts collecting resource usage stats of a docker container
|
|
func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration) {
|
|
defer destCh.close()
|
|
|
|
// backoff and retry used if the docker stats API returns an error
|
|
var backoff time.Duration = 0
|
|
var retry int
|
|
|
|
// create an interval timer
|
|
timer, stop := helper.NewSafeTimer(backoff)
|
|
defer stop()
|
|
|
|
// loops until doneCh is closed
|
|
for {
|
|
timer.Reset(backoff)
|
|
|
|
if backoff > 0 {
|
|
select {
|
|
case <-timer.C:
|
|
case <-ctx.Done():
|
|
return
|
|
case <-h.doneCh:
|
|
return
|
|
}
|
|
}
|
|
|
|
// make a channel for docker stats structs and start a collector to
|
|
// receive stats from docker and emit nomad stats
|
|
// statsCh will always be closed by docker client.
|
|
statsCh := make(chan *docker.Stats)
|
|
go dockerStatsCollector(destCh, statsCh, interval)
|
|
|
|
statsOpts := docker.StatsOptions{
|
|
ID: h.containerID,
|
|
Context: ctx,
|
|
Done: h.doneCh,
|
|
Stats: statsCh,
|
|
Stream: true,
|
|
}
|
|
|
|
// Stats blocks until an error has occurred, or doneCh has been closed
|
|
if err := h.dockerClient.Stats(statsOpts); err != nil && err != io.ErrClosedPipe {
|
|
// An error occurred during stats collection, retry with backoff
|
|
h.logger.Debug("error collecting stats from container", "error", err)
|
|
|
|
// Calculate the new backoff
|
|
backoff = (1 << (2 * uint64(retry))) * statsCollectorBackoffBaseline
|
|
if backoff > statsCollectorBackoffLimit {
|
|
backoff = statsCollectorBackoffLimit
|
|
}
|
|
// Increment retry counter
|
|
retry++
|
|
continue
|
|
}
|
|
// Stats finished either because context was canceled, doneCh was closed
|
|
// or the container stopped. Stop stats collections.
|
|
return
|
|
}
|
|
}
|
|
|
|
func dockerStatsCollector(destCh *usageSender, statsCh <-chan *docker.Stats, interval time.Duration) {
|
|
var resourceUsage *cstructs.TaskResourceUsage
|
|
|
|
// hasSentInitialStats is used so as to emit the first stats received from
|
|
// the docker daemon
|
|
var hasSentInitialStats bool
|
|
|
|
// timer is used to send nomad status at the specified interval
|
|
timer := time.NewTimer(interval)
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
// it is possible for the timer to go off before the first stats
|
|
// has been emitted from docker
|
|
if resourceUsage == nil {
|
|
continue
|
|
}
|
|
|
|
// sending to destCh could block, drop this interval if it does
|
|
destCh.send(resourceUsage)
|
|
|
|
timer.Reset(interval)
|
|
|
|
case s, ok := <-statsCh:
|
|
// if statsCh is closed stop collection
|
|
if !ok {
|
|
return
|
|
}
|
|
// s should always be set, but check and skip just in case
|
|
if s != nil {
|
|
resourceUsage = util.DockerStatsToTaskResourceUsage(s)
|
|
// send stats next interation if this is the first time received
|
|
// from docker
|
|
if !hasSentInitialStats {
|
|
timer.Reset(0)
|
|
hasSentInitialStats = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|