refactor reconciler code and address comments

Mahmood Ali
2019-10-17 08:37:18 -04:00
parent c8ba2d1b86
commit 24f6c2bf07
3 changed files with 206 additions and 122 deletions

View File

@@ -143,10 +143,14 @@ var (
hclspec.NewAttr("period", "string", false),
hclspec.NewLiteral(`"5m"`),
),
"creation_timeout": hclspec.NewDefault(
hclspec.NewAttr("creation_timeout", "string", false),
"creation_grace": hclspec.NewDefault(
hclspec.NewAttr("creation_grace", "string", false),
hclspec.NewLiteral(`"5m"`),
),
"dry_run": hclspec.NewDefault(
hclspec.NewAttr("dry_run", "bool", false),
hclspec.NewLiteral(`false`),
),
})
// configSpec is the hcl specification returned by the ConfigSchema RPC
@@ -510,14 +514,26 @@ type DockerVolumeDriverConfig struct {
Options hclutils.MapStrStr `codec:"options"`
}
+ // ContainerGCConfig controls the behavior of the GC reconciler that detects
+ // dangling nomad containers that aren't tracked due to docker/nomad bugs
type ContainerGCConfig struct {
// Enabled controls whether container reconciler is enabled
Enabled bool `codec:"enabled"`
+ // DryRun indicates that the reconciler should log unexpectedly running
+ // containers, if found, without actually killing them
+ DryRun bool `codec:"dry_run"`
// PeriodStr controls the frequency of scanning containers
PeriodStr string `codec:"period"`
period time.Duration `codec:"-"`
- CreationTimeoutStr string `codec:"creation_timeout"`
- creationTimeout time.Duration `codec:"-"`
+ // CreationGraceStr is the duration allowed for a newly created container
+ // to live without being registered as a running task in nomad.
+ // A container is treated as leaked if it has lived longer than the grace
+ // duration without being registered as a task.
+ CreationGraceStr string `codec:"creation_grace"`
+ CreationGrace time.Duration `codec:"-"`
}
type DriverConfig struct {
@@ -565,6 +581,8 @@ func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
+ const danglingContainersCreationGraceMinimum = 1 * time.Minute
func (d *Driver) SetConfig(c *base.Config) error {
var config DriverConfig
if len(c.PluginConfig) != 0 {
@@ -590,12 +608,15 @@ func (d *Driver) SetConfig(c *base.Config) error {
d.config.GC.DanglingContainers.period = dur
}
- if len(d.config.GC.DanglingContainers.CreationTimeoutStr) > 0 {
-   dur, err := time.ParseDuration(d.config.GC.DanglingContainers.CreationTimeoutStr)
+ if len(d.config.GC.DanglingContainers.CreationGraceStr) > 0 {
+   dur, err := time.ParseDuration(d.config.GC.DanglingContainers.CreationGraceStr)
if err != nil {
-   return fmt.Errorf("failed to parse 'container_delay' duration: %v", err)
+   return fmt.Errorf("failed to parse 'creation_grace' duration: %v", err)
}
-   d.config.GC.DanglingContainers.creationTimeout = dur
+   if dur < danglingContainersCreationGraceMinimum {
+     return fmt.Errorf("creation_grace is less than minimum, %v", danglingContainersCreationGraceMinimum)
+   }
+   d.config.GC.DanglingContainers.CreationGrace = dur
}
if c.AgentConfig != nil {
@@ -615,7 +636,8 @@ func (d *Driver) SetConfig(c *base.Config) error {
d.coordinator = newDockerCoordinator(coordinatorConfig)
- go d.removeDanglingContainersGoroutine()
+ reconciler := newReconciler(d)
+ reconciler.Start()
return nil
}
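For reference, the grace validation added to SetConfig above follows a parse-then-bound pattern for duration options. Below is a minimal, standalone sketch of the same pattern; the parseGrace helper, its constant name, and its error text are illustrative, not driver code.

    package main

    import (
        "fmt"
        "time"
    )

    // mirrors danglingContainersCreationGraceMinimum in the diff above
    const creationGraceMinimum = 1 * time.Minute

    // parseGrace is a hypothetical helper showing the same parse-then-bound
    // validation that SetConfig performs on creation_grace.
    func parseGrace(s string) (time.Duration, error) {
        dur, err := time.ParseDuration(s)
        if err != nil {
            return 0, fmt.Errorf("failed to parse duration: %v", err)
        }
        if dur < creationGraceMinimum {
            return 0, fmt.Errorf("duration %v is below the %v minimum", dur, creationGraceMinimum)
        }
        return dur, nil
    }

    func main() {
        fmt.Println(parseGrace("5m"))  // 5m0s <nil>
        fmt.Println(parseGrace("10s")) // 0s and a below-minimum error
    }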

View File

@@ -4,21 +4,55 @@ import (
"context"
"fmt"
"regexp"
"strings"
"time"
docker "github.com/fsouza/go-dockerclient"
+ hclog "github.com/hashicorp/go-hclog"
)
- func (d *Driver) removeDanglingContainersGoroutine() {
- if !d.config.GC.DanglingContainers.Enabled {
- d.logger.Debug("skipping dangling containers handling; is disabled")
+ // containerReconciler detects and kills unexpectedly running containers.
+ //
+ // Due to Docker's architecture and network-based communication, it is
+ // possible for Docker to start a container successfully while the
+ // creation API call fails with a network error. containerReconciler
+ // scans for these untracked containers and kills them.
+ type containerReconciler struct {
+ ctx context.Context
+ config *ContainerGCConfig
+ client *docker.Client
+ logger hclog.Logger
+ isDriverHealthy func() bool
+ trackedContainers func() map[string]bool
+ isNomadContainer func(c docker.APIContainers) bool
+ }
+ func newReconciler(d *Driver) *containerReconciler {
+ return &containerReconciler{
+ ctx: d.ctx,
+ config: &d.config.GC.DanglingContainers,
+ client: client,
+ logger: d.logger,
+ isDriverHealthy: func() bool { return d.previouslyDetected() && d.fingerprintSuccessful() },
+ trackedContainers: d.trackedContainers,
+ isNomadContainer: isNomadContainer,
+ }
+ }
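A note on the shape of newReconciler: holding the driver hooks as function values decouples the reconciler from *Driver, so unit tests can substitute stubs without a Docker daemon. A hypothetical same-package sketch (all stub values are made up; client is left nil because nothing here touches Docker):

    r := &containerReconciler{
        ctx:    context.Background(),
        config: &ContainerGCConfig{Enabled: true, CreationGrace: time.Minute},
        logger: hclog.NewNullLogger(),

        isDriverHealthy:   func() bool { return true },
        trackedContainers: func() map[string]bool { return map[string]bool{"c1": true} },
        isNomadContainer: func(c docker.APIContainers) bool {
            _, ok := c.Labels["com.hashicorp.nomad.alloc_id"]
            return ok
        },
    }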
+ func (r *containerReconciler) Start() {
+ if !r.config.Enabled {
+ r.logger.Debug("skipping dangling containers handling; is disabled")
return
}
- period := d.config.GC.DanglingContainers.period
+ go r.removeDanglingContainersGoroutine()
+ }
- succeeded := true
+ func (r *containerReconciler) removeDanglingContainersGoroutine() {
+ period := r.config.period
+ lastIterSucceeded := true
// ensure that we wait for at least a period or the creation grace
// for the first container GC iteration
@@ -26,44 +60,55 @@ func (d *Driver) removeDanglingContainersGoroutine() {
// before a driver may kill containers launched by an earlier nomad
// process.
initialDelay := period
- if d.config.GC.DanglingContainers.creationTimeout > initialDelay {
- initialDelay = d.config.GC.DanglingContainers.creationTimeout
+ if r.config.CreationGrace > initialDelay {
+ initialDelay = r.config.CreationGrace
}
timer := time.NewTimer(initialDelay)
for {
select {
case <-timer.C:
- if d.previouslyDetected() && d.fingerprintSuccessful() {
- err := d.removeDanglingContainersIteration()
- if err != nil && succeeded {
- d.logger.Warn("failed to remove dangling containers", "error", err)
+ if r.isDriverHealthy() {
+ err := r.removeDanglingContainersIteration()
+ if err != nil && lastIterSucceeded {
+ r.logger.Warn("failed to remove dangling containers", "error", err)
}
- succeeded = (err == nil)
+ lastIterSucceeded = (err == nil)
}
timer.Reset(period)
- case <-d.ctx.Done():
+ case <-r.ctx.Done():
return
}
}
}
- func (d *Driver) removeDanglingContainersIteration() error {
- tracked := d.trackedContainers()
- untracked, err := d.untrackedContainers(tracked, d.config.GC.DanglingContainers.creationTimeout)
+ func (r *containerReconciler) removeDanglingContainersIteration() error {
+ cutoff := time.Now().Add(-r.config.CreationGrace)
+ tracked := r.trackedContainers()
+ untracked, err := r.untrackedContainers(tracked, cutoff)
if err != nil {
return fmt.Errorf("failed to find untracked containers: %v", err)
}
+ if len(untracked) == 0 {
+ return nil
+ }
+ if r.config.DryRun {
+ r.logger.Info("detected untracked containers", "container_ids", untracked)
+ return nil
+ }
for _, id := range untracked {
d.logger.Info("removing untracked container", "container_id", id)
err := client.RemoveContainer(docker.RemoveContainerOptions{
ID: id,
Force: true,
})
if err != nil {
d.logger.Warn("failed to remove untracked container", "container_id", id, "error", err)
r.logger.Warn("failed to remove untracked container", "container_id", id, "error", err)
} else {
r.logger.Info("removed untracked container", "container_id", id)
}
}
@@ -72,15 +117,17 @@ func (d *Driver) removeDanglingContainersIteration() error {
// untrackedContainers returns the ids of containers that are suspected
// to have been started by Nomad but aren't tracked by this driver
- func (d *Driver) untrackedContainers(tracked map[string]bool, creationTimeout time.Duration) ([]string, error) {
+ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutoffTime time.Time) ([]string, error) {
result := []string{}
- cc, err := client.ListContainers(docker.ListContainersOptions{})
+ cc, err := client.ListContainers(docker.ListContainersOptions{
+ All: false, // only reconcile running containers
+ })
if err != nil {
return nil, fmt.Errorf("failed to list containers: %v", err)
}
- cutoff := time.Now().Add(-creationTimeout).Unix()
+ cutoff := cutoffTime.Unix()
for _, c := range cc {
if tracked[c.ID] {
@@ -91,7 +138,7 @@ func (d *Driver) untrackedContainers(tracked map[string]bool, creationTimeout ti
continue
}
- if !d.isNomadContainer(c) {
+ if !r.isNomadContainer(c) {
continue
}
@@ -101,13 +148,13 @@ func (d *Driver) untrackedContainers(tracked map[string]bool, creationTimeout ti
return result, nil
}
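For intuition on cutoffTime: the caller converts the grace window into an absolute Unix timestamp, and the loop above skips containers created after that instant since they are too young to judge. The c.Created comparison itself sits in unchanged context outside this hunk, so the standalone sketch below mirrors it as an assumption:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        grace := 5 * time.Minute
        cutoff := time.Now().Add(-grace).Unix()

        young := time.Now().Add(-2 * time.Minute).Unix() // inside the grace window
        old := time.Now().Add(-1 * time.Hour).Unix()     // outside the grace window

        fmt.Println(young >= cutoff) // true: too new to judge, skipped
        fmt.Println(old >= cutoff)   // false: candidate for removal
    }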
- func (d *Driver) isNomadContainer(c docker.APIContainers) bool {
+ func isNomadContainer(c docker.APIContainers) bool {
if _, ok := c.Labels["com.hashicorp.nomad.alloc_id"]; ok {
return true
}
// pre-0.10 containers aren't tagged or labeled in any way,
- // so use cheap heauristic based on mount paths
+ // so use cheap heuristic based on mount paths
// before inspecting container details
if !hasMount(c, "/alloc") ||
!hasMount(c, "/local") ||
@@ -116,18 +163,7 @@ func (d *Driver) isNomadContainer(c docker.APIContainers) bool {
return false
}
- // double check before killing process
- ctx, cancel := context.WithTimeout(d.ctx, 20*time.Second)
- defer cancel()
- ci, err := client.InspectContainerWithContext(c.ID, ctx)
- if err != nil {
- return false
- }
- env := ci.Config.Env
- return hasEnvVar(env, "NOMAD_ALLOC_ID") &&
- hasEnvVar(env, "NOMAD_GROUP_NAME")
+ return true
}
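To make the heuristic concrete, a hypothetical same-package sketch with fabricated containers: the alloc_id label short-circuits to true, while a container with neither the label nor the nomad mount paths is rejected (the alloc id value is the one from the env list removed below).

    labeled := docker.APIContainers{
        Labels: map[string]string{"com.hashicorp.nomad.alloc_id": "72bfa388-024e-a903-45b8-2bc28b74ed69"},
    }
    unrelated := docker.APIContainers{Names: []string{"/redis"}}

    fmt.Println(isNomadContainer(labeled))   // true: label check short-circuits
    fmt.Println(isNomadContainer(unrelated)) // false: no label, no /alloc or /local mounts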
func hasMount(c docker.APIContainers, p string) bool {
@@ -152,16 +188,6 @@ func hasNomadName(c docker.APIContainers) bool {
return false
}
- func hasEnvVar(vars []string, key string) bool {
- for _, v := range vars {
- if strings.HasPrefix(v, key+"=") {
- return true
- }
- }
- return false
- }
func (d *Driver) trackedContainers() map[string]bool {
d.tasks.lock.RLock()
defer d.tasks.lock.RUnlock()

View File

@@ -9,85 +9,53 @@ import (
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/uuid"
tu "github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
- var sampleContainerList []docker.APIContainers
- var sampleNomadContainerListItem docker.APIContainers
- var sampleNonNomadContainerListItem docker.APIContainers
- func init() {
+ func fakeContainerList(t *testing.T) (nomadContainer, nonNomadContainer docker.APIContainers) {
path := "./test-resources/docker/reconciler_containers_list.json"
f, err := os.Open(path)
if err != nil {
- return
+ t.Fatalf("failed to open file: %v", err)
}
+ var sampleContainerList []docker.APIContainers
err = json.NewDecoder(f).Decode(&sampleContainerList)
if err != nil {
- return
+ t.Fatalf("failed to decode container list: %v", err)
}
- sampleNomadContainerListItem = sampleContainerList[0]
- sampleNonNomadContainerListItem = sampleContainerList[1]
+ return sampleContainerList[0], sampleContainerList[1]
}
func Test_HasMount(t *testing.T) {
- require.True(t, hasMount(sampleNomadContainerListItem, "/alloc"))
- require.True(t, hasMount(sampleNomadContainerListItem, "/data"))
- require.True(t, hasMount(sampleNomadContainerListItem, "/secrets"))
- require.False(t, hasMount(sampleNomadContainerListItem, "/random"))
+ nomadContainer, nonNomadContainer := fakeContainerList(t)
- require.False(t, hasMount(sampleNonNomadContainerListItem, "/alloc"))
- require.False(t, hasMount(sampleNonNomadContainerListItem, "/data"))
- require.False(t, hasMount(sampleNonNomadContainerListItem, "/secrets"))
- require.False(t, hasMount(sampleNonNomadContainerListItem, "/random"))
+ require.True(t, hasMount(nomadContainer, "/alloc"))
+ require.True(t, hasMount(nomadContainer, "/data"))
+ require.True(t, hasMount(nomadContainer, "/secrets"))
+ require.False(t, hasMount(nomadContainer, "/random"))
+ require.False(t, hasMount(nonNomadContainer, "/alloc"))
+ require.False(t, hasMount(nonNomadContainer, "/data"))
+ require.False(t, hasMount(nonNomadContainer, "/secrets"))
+ require.False(t, hasMount(nonNomadContainer, "/random"))
}
func Test_HasNomadName(t *testing.T) {
- require.True(t, hasNomadName(sampleNomadContainerListItem))
- require.False(t, hasNomadName(sampleNonNomadContainerListItem))
- }
- func TestHasEnv(t *testing.T) {
- envvars := []string{
- "NOMAD_ALLOC_DIR=/alloc",
- "NOMAD_ALLOC_ID=72bfa388-024e-a903-45b8-2bc28b74ed69",
- "NOMAD_ALLOC_INDEX=0",
- "NOMAD_ALLOC_NAME=example.cache[0]",
- "NOMAD_CPU_LIMIT=500",
- "NOMAD_DC=dc1",
- "NOMAD_GROUP_NAME=cache",
- "NOMAD_JOB_NAME=example",
- "NOMAD_MEMORY_LIMIT=256",
- "NOMAD_NAMESPACE=default",
- "NOMAD_REGION=global",
- "NOMAD_SECRETS_DIR=/secrets",
- "NOMAD_TASK_DIR=/local",
- "NOMAD_TASK_NAME=redis",
- "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
- "GOSU_VERSION=1.10",
- "REDIS_VERSION=3.2.12",
- "REDIS_DOWNLOAD_URL=http://download.redis.io/releases/redis-3.2.12.tar.gz",
- "REDIS_DOWNLOAD_SHA=98c4254ae1be4e452aa7884245471501c9aa657993e0318d88f048093e7f88fd",
- }
- require.True(t, hasEnvVar(envvars, "NOMAD_ALLOC_ID"))
- require.True(t, hasEnvVar(envvars, "NOMAD_ALLOC_DIR"))
- require.True(t, hasEnvVar(envvars, "GOSU_VERSION"))
- require.False(t, hasEnvVar(envvars, "NOMAD_ALLOC_"))
- require.False(t, hasEnvVar(envvars, "OTHER_VARIABLE"))
+ nomadContainer, nonNomadContainer := fakeContainerList(t)
+ require.True(t, hasNomadName(nomadContainer))
+ require.False(t, hasNomadName(nonNomadContainer))
}
+ // TestDanglingContainerRemoval asserts containers without corresponding tasks
+ // are removed after the creation grace period.
func TestDanglingContainerRemoval(t *testing.T) {
if !tu.IsCI() {
t.Parallel()
}
testutil.DockerCompatible(t)
+ // start two containers: one tracked nomad container, and one unrelated container
task, cfg, _ := dockerTask(t)
require.NoError(t, task.EncodeConcreteDriverConfig(cfg))
@@ -112,32 +80,42 @@ func TestDanglingContainerRemoval(t *testing.T) {
require.NoError(t, err)
dd := d.Impl().(*Driver)
+ reconciler := newReconciler(dd)
trackedContainers := map[string]bool{handle.containerID: true}
- {
- tf := dd.trackedContainers()
- require.Contains(t, tf, handle.containerID)
- require.NotContains(t, tf, c.ID)
- }
+ tf := reconciler.trackedContainers()
+ require.Contains(t, tf, handle.containerID)
+ require.NotContains(t, tf, c.ID)
- untracked, err := dd.untrackedContainers(trackedContainers, 1*time.Minute)
+ // assert tracked containers should never be untracked
+ untracked, err := reconciler.untrackedContainers(trackedContainers, time.Now())
require.NoError(t, err)
require.NotContains(t, untracked, handle.containerID)
require.NotContains(t, untracked, c.ID)
- untracked, err = dd.untrackedContainers(map[string]bool{}, 0)
+ // assert we recognize nomad containers with an appropriate cutoff
+ untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now())
require.NoError(t, err)
require.Contains(t, untracked, handle.containerID)
require.NotContains(t, untracked, c.ID)
- // Actually try to kill hosts
+ // but ignore containers created before the cutoff
+ untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now().Add(-1*time.Minute))
+ require.NoError(t, err)
+ require.NotContains(t, untracked, handle.containerID)
+ require.NotContains(t, untracked, c.ID)
+ // a full integration test to assert that containers are removed
pristineDriver := dockerDriverHarness(t, nil).Impl().(*Driver)
pristineDriver.config.GC.DanglingContainers = ContainerGCConfig{
- Enabled: true,
- period: 1 * time.Second,
- creationTimeout: 1 * time.Second,
+ Enabled: true,
+ period: 1 * time.Second,
+ CreationGrace: 1 * time.Second,
}
- require.NoError(t, pristineDriver.removeDanglingContainersIteration())
+ nReconciler := newReconciler(pristineDriver)
+ require.NoError(t, nReconciler.removeDanglingContainersIteration())
_, err = client.InspectContainer(c.ID)
require.NoError(t, err)
@@ -146,3 +124,61 @@ func TestDanglingContainerRemoval(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), NoSuchContainerError)
}
+ // TestDanglingContainerRemoval_Stopped asserts that stopped containers without
+ // corresponding tasks are not removed even after the creation grace period.
+ func TestDanglingContainerRemoval_Stopped(t *testing.T) {
+ testutil.DockerCompatible(t)
+ task, cfg, _ := dockerTask(t)
+ task.Resources.NomadResources.Networks = nil
+ require.NoError(t, task.EncodeConcreteDriverConfig(cfg))
+ // Start two containers: one nomad container, and one stopped container
+ // that acts like a nomad one
+ client, d, handle, cleanup := dockerSetup(t, task)
+ defer cleanup()
+ require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))
+ inspected, err := client.InspectContainer(handle.containerID)
+ require.NoError(t, err)
+ stoppedC, err := client.CreateContainer(docker.CreateContainerOptions{
+ Name: "mytest-image-" + uuid.Generate(),
+ Config: inspected.Config,
+ HostConfig: inspected.HostConfig,
+ })
+ require.NoError(t, err)
+ defer client.RemoveContainer(docker.RemoveContainerOptions{
+ ID: stoppedC.ID,
+ Force: true,
+ })
+ err = client.StartContainer(stoppedC.ID, nil)
+ require.NoError(t, err)
+ err = client.StopContainer(stoppedC.ID, 60)
+ require.NoError(t, err)
+ dd := d.Impl().(*Driver)
+ reconciler := newReconciler(dd)
+ trackedContainers := map[string]bool{handle.containerID: true}
+ // assert the nomad container is tracked, and the stopped one is ignored
+ tf := reconciler.trackedContainers()
+ require.Contains(t, tf, handle.containerID)
+ require.NotContains(t, tf, stoppedC.ID)
+ untracked, err := reconciler.untrackedContainers(trackedContainers, time.Now())
+ require.NoError(t, err)
+ require.NotContains(t, untracked, handle.containerID)
+ require.NotContains(t, untracked, stoppedC.ID)
+ // if we start the container again, it'll be marked as untracked
+ require.NoError(t, client.StartContainer(stoppedC.ID, nil))
+ untracked, err = reconciler.untrackedContainers(trackedContainers, time.Now())
+ require.NoError(t, err)
+ require.NotContains(t, untracked, handle.containerID)
+ require.Contains(t, untracked, stoppedC.ID)
+ }