From 33c015bcc767d970b5cf1029970313501c01d0b4 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 10 Jan 2017 13:24:45 -0800 Subject: [PATCH 01/17] Add Cleanup method to Driver interface Cleanup can be used for cleaning up resources created by drivers to run a task. Initially the Docker driver is the only user (to remove downloaded images). --- client/driver/docker.go | 118 +++++++++++++++++++-------------- client/driver/docker_test.go | 92 ++++++++++++++++++++----- client/driver/driver.go | 74 ++++++++++++++++++++- client/driver/driver_test.go | 39 +++++++++++ client/driver/exec.go | 6 +- client/driver/exec_test.go | 14 ++-- client/driver/java.go | 6 +- client/driver/java_test.go | 10 +-- client/driver/lxc.go | 6 +- client/driver/lxc_test.go | 4 +- client/driver/mock_driver.go | 6 +- client/driver/qemu.go | 6 +- client/driver/qemu_test.go | 4 +- client/driver/raw_exec.go | 6 +- client/driver/raw_exec_test.go | 12 ++-- client/driver/rkt.go | 6 +- client/driver/rkt_test.go | 14 ++-- client/task_runner.go | 75 ++++++++++++++++----- 18 files changed, 372 insertions(+), 126 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index a740fc51f..b2ebb5397 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -84,15 +84,22 @@ const ( // Docker's privileged mode. dockerPrivilegedConfigOption = "docker.privileged.enabled" + // dockerCleanupImageConfigOption is the key for whether or not to + // cleanup images after the task exits. + dockerCleanupImageConfigOption = "docker.cleanup.image" + dockerCleanupImageConfigDefault = true + // dockerTimeout is the length of time a request can be outstanding before // it is timed out. dockerTimeout = 5 * time.Minute + + // dockerImageResKey is the CreatedResources key for docker images + dockerImageResKey = "image" ) type DockerDriver struct { DriverContext - imageID string driverConfig *DockerDriverConfig } @@ -237,8 +244,6 @@ type DockerHandle struct { client *docker.Client waitClient *docker.Client logger *log.Logger - cleanupImage bool - imageID string containerID string version string clkSpeed float64 @@ -355,35 +360,42 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation { return cstructs.FSIsolationImage } -func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error { +func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) { driverConfig, err := NewDockerDriverConfig(task, d.taskEnv) if err != nil { - return err + return nil, err } + // Set state needed by Start() + d.driverConfig = driverConfig + // Initialize docker API clients client, _, err := d.dockerClients() if err != nil { - return fmt.Errorf("Failed to connect to docker daemon: %s", err) + return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) } - if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil { - return err - } - - image := driverConfig.ImageName - // Now that we have the image we can get the image id - dockerImage, err := client.InspectImage(image) + // Ensure the image is available + pulled, err := d.createImage(driverConfig, client, ctx.TaskDir) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err) - return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) + return nil, err } - d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) + if pulled { + // An image was downloaded, add the ID to created resources + dockerImage, err := 
client.InspectImage(driverConfig.ImageName) + if err == nil { + res := NewCreatedResources() + res.Add(dockerImageResKey, dockerImage.ID) + return res, nil + } - // Set state needed by Start() - d.imageID = dockerImage.ID - d.driverConfig = driverConfig - return nil + // When an error occurs we leak the image, but this should be + // extremely rare. There's no point in returning an error + // because the image won't be redownloaded as it now exists. + d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) + } + + return nil, nil } func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -431,9 +443,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err) + d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %q: %v", d.driverConfig.ImageName, err) pluginClient.Kill() - return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err) + return nil, fmt.Errorf("Failed to create container configuration for image %q: %v", d.driverConfig.ImageName, err) } container, rerr := d.createContainer(config) @@ -446,8 +458,6 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle d.logger.Printf("[INFO] driver.docker: created container %s", container.ID) - cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) - // We don't need to start the container if the container is already running // since we don't create containers which are already present on the host // and are running @@ -471,9 +481,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle waitClient: waitClient, executor: exec, pluginClient: pluginClient, - cleanupImage: cleanupImage, logger: d.logger, - imageID: d.imageID, containerID: container.ID, version: d.config.Version, killTimeout: GetKillTimeout(task.KillTimeout, maxKill), @@ -489,6 +497,25 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return h, nil } +func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) { + if res == nil { + // Nothing to cleanup + return + } + + // If the client should cleanup images and the image was downloaded, remove it + cleanupImage := d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) + if ids := res.Resources[dockerImageResKey]; cleanupImage && len(ids) > 0 { + for _, id := range ids { + if err := client.RemoveImage(id); err != nil { + d.logger.Printf("[WARN] driver.docker: cleanup failed to remove downloaded image %q: %v", id, err) + } else { + d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id) + } + } + } +} + // dockerClients creates two *docker.Client, one for long running operations and // the other for shorter operations. In test / dev mode we can use ENV vars to // connect to the docker daemon. 
In production mode we will read docker.endpoint @@ -884,33 +911,37 @@ func (d *DockerDriver) Periodic() (bool, time.Duration) { // createImage creates a docker image either by pulling it from a registry or by // loading it from the file system -func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error { +// +// Returns true if an image was downloaded and should be cleaned up. +func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (bool, error) { image := driverConfig.ImageName repo, tag := docker.ParseRepositoryTag(image) if tag == "" { tag = "latest" } - var dockerImage *docker.Image - var err error // We're going to check whether the image is already downloaded. If the tag // is "latest", or ForcePull is set, we have to check for a new version every time so we don't // bother to check and cache the id here. We'll download first, then cache. if driverConfig.ForcePull { d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag) } else if tag != "latest" { - dockerImage, err = client.InspectImage(image) + if dockerImage, _ := client.InspectImage(image); dockerImage != nil { + // Image exists, nothing to do + return false, nil + } + } + + // Load the image if specified + if len(driverConfig.LoadImages) > 0 { + return false, d.loadImage(driverConfig, client, taskDir) } // Download the image - if dockerImage == nil { - if len(driverConfig.LoadImages) > 0 { - return d.loadImage(driverConfig, client, taskDir) - } - - return d.pullImage(driverConfig, client, repo, tag) + if err := d.pullImage(driverConfig, client, repo, tag); err != nil { + return false, err } - return err + return true, nil } // pullImage creates an image by pulling it from a docker registry @@ -960,6 +991,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) return d.recoverablePullError(err, driverConfig.ImageName) } + d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag) return nil } @@ -1081,8 +1113,6 @@ START: } func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { - cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) - // Split the handle pidBytes := []byte(strings.TrimPrefix(handleID, "DOCKER:")) pid := &dockerPID{} @@ -1138,9 +1168,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er waitClient: waitClient, executor: exec, pluginClient: pluginClient, - cleanupImage: cleanupImage, logger: d.logger, - imageID: pid.ImageID, containerID: pid.ContainerID, version: pid.Version, killTimeout: pid.KillTimeout, @@ -1161,7 +1189,6 @@ func (h *DockerHandle) ID() string { // Return a handle to the PID pid := dockerPID{ Version: h.version, - ImageID: h.imageID, ContainerID: h.containerID, KillTimeout: h.killTimeout, MaxKillTimeout: h.maxKillTimeout, @@ -1278,13 +1305,6 @@ func (h *DockerHandle) run() { h.logger.Printf("[ERR] driver.docker: error removing container: %v", err) } - // Cleanup the image - if h.cleanupImage { - if err := h.client.RemoveImage(h.imageID); err != nil { - h.logger.Printf("[DEBUG] driver.docker: error removing image: %v", err) - } - } - // Send the results h.waitCh <- dstructs.NewWaitResult(exitCode, 0, werr) close(h.waitCh) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 
9b1e129d5..fdde597db 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -99,21 +99,25 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie driver := NewDockerDriver(tctx.DriverCtx) copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar") - if err := driver.Prestart(tctx.ExecCtx, task); err != nil { + res, err := driver.Prestart(tctx.ExecCtx, task) + if err != nil { tctx.AllocDir.Destroy() t.Fatalf("error in prestart: %v", err) } handle, err := driver.Start(tctx.ExecCtx, task) if err != nil { + driver.Cleanup(tctx.ExecCtx, res) tctx.AllocDir.Destroy() t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack()) } if handle == nil { + driver.Cleanup(tctx.ExecCtx, res) tctx.AllocDir.Destroy() t.Fatalf("handle is nil\nStack\n%s", debug.Stack()) } cleanup := func() { + driver.Cleanup(tctx.ExecCtx, res) handle.Kill() tctx.AllocDir.Destroy() } @@ -182,14 +186,19 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + res, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer d.Cleanup(ctx.ExecCtx, res) + handle, err := d.Start(ctx.ExecCtx, task) if err != nil { + d.Cleanup(ctx.ExecCtx, res) t.Fatalf("err: %v", err) } if handle == nil { + d.Cleanup(ctx.ExecCtx, res) t.Fatalf("missing handle") } defer handle.Kill() @@ -276,9 +285,11 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { // Copy the image into the task's directory copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + res, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer d.Cleanup(ctx.ExecCtx, res) handle, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -340,8 +351,9 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) - err := d.Prestart(ctx.ExecCtx, task) + res, err := d.Prestart(ctx.ExecCtx, task) if err == nil { + d.Cleanup(ctx.ExecCtx, res) t.Fatalf("want error in prestart: %v", err) } @@ -391,9 +403,11 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + res, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer d.Cleanup(ctx.ExecCtx, res) handle, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -477,7 +491,6 @@ func TestDockerDriver_StartN(t *testing.T) { t.Logf("Starting %d tasks", len(taskList)) // Let's spin up a bunch of things - var err error for idx, task := range taskList { ctx := testDriverContexts(t, task) ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} @@ -485,9 +498,11 @@ func TestDockerDriver_StartN(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + res, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } + defer d.Cleanup(ctx.ExecCtx, res) handles[idx], err = d.Start(ctx.ExecCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -535,7 +550,6 @@ func TestDockerDriver_StartNVersions(t *testing.T) { 
t.Logf("Starting %d tasks", len(taskList)) // Let's spin up a bunch of things - var err error for idx, task := range taskList { ctx := testDriverContexts(t, task) ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} @@ -545,9 +559,11 @@ func TestDockerDriver_StartNVersions(t *testing.T) { copyImage(t, ctx.ExecCtx.TaskDir, "busybox_musl.tar") copyImage(t, ctx.ExecCtx.TaskDir, "busybox_glibc.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + res, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } + defer d.Cleanup(ctx.ExecCtx, res) handles[idx], err = d.Start(ctx.ExecCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -708,7 +724,7 @@ func TestDockerDriver_ForcePull_IsInvalidConfig(t *testing.T) { ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} driver := NewDockerDriver(ctx.DriverCtx) - if err := driver.Prestart(ctx.ExecCtx, task); err == nil { + if _, err := driver.Prestart(ctx.ExecCtx, task); err == nil { t.Fatalf("error expected in prestart") } } @@ -916,9 +932,11 @@ func TestDockerDriver_User(t *testing.T) { defer ctx.AllocDir.Destroy() copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := driver.Prestart(ctx.ExecCtx, task); err != nil { + res, err := driver.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer driver.Cleanup(ctx.ExecCtx, res) // It should fail because the user "alice" does not exist on the given // image. @@ -1072,9 +1090,11 @@ done fmt.Errorf("Failed to write data") } - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + res, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer d.Cleanup(ctx.ExecCtx, res) handle, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1194,9 +1214,11 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() - if err := driver.Prestart(execCtx, task); err != nil { + res, err := driver.Prestart(execCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer driver.Cleanup(execCtx, res) if _, err := driver.Start(execCtx, task); err == nil { t.Fatalf("Started driver successfully when volumes should have been disabled.") } @@ -1207,9 +1229,11 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".") defer cleanup() - if err := driver.Prestart(execCtx, task); err != nil { + res, err := driver.Prestart(execCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer driver.Cleanup(execCtx, res) handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1243,9 +1267,11 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() - if err := driver.Prestart(execCtx, task); err != nil { + res, err := driver.Prestart(execCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + defer driver.Cleanup(execCtx, res) handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("Failed to start docker driver: %v", err) @@ -1266,6 +1292,42 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { } } +// TestDockerDriver_Cleanup ensures Cleanup removes only downloaded images. 
+func TestDockerDriver_Cleanup(t *testing.T) { + if !testutil.DockerIsConnected(t) { + t.SkipNow() + } + + imageName := "hello-world:latest" + task := &structs.Task{ + Name: "cleanup_test", + Driver: "docker", + Config: map[string]interface{}{ + "image": imageName, + }, + } + tctx := testDriverContexts(t, task) + defer tctx.AllocDir.Destroy() + + // Run Prestart + driver := NewDockerDriver(tctx.DriverCtx).(*DockerDriver) + res, err := driver.Prestart(tctx.ExecCtx, task) + if err != nil { + t.Fatalf("error in prestart: %v", err) + } + if len(res.Resources) == 0 || len(res.Resources[dockerImageResKey]) == 0 { + t.Fatalf("no created resources: %#v", res) + } + + // Cleanup + driver.Cleanup(tctx.ExecCtx, res) + + // Ensure image was removed + if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil { + t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName) + } +} + func copyImage(t *testing.T, taskDir *allocdir.TaskDir, image string) { dst := filepath.Join(taskDir.LocalDir, image) copyFile(filepath.Join("./test-resources/docker", image), dst, t) diff --git a/client/driver/driver.go b/client/driver/driver.go index 893afa300..970332f60 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -51,6 +51,70 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) { // Factory is used to instantiate a new Driver type Factory func(*DriverContext) Driver +// CreatedResources is a map of resources (e.g. downloaded images) created by a driver +// that must be cleaned up. +type CreatedResources struct { + Resources map[string][]string +} + +func NewCreatedResources() *CreatedResources { + return &CreatedResources{Resources: make(map[string][]string)} +} + +// Add a new resource if it doesn't already exist. +func (r *CreatedResources) Add(k, v string) { + if r.Resources == nil { + r.Resources = map[string][]string{k: []string{v}} + return + } + existing, ok := r.Resources[k] + if !ok { + // Key doesn't exist, create it + r.Resources[k] = []string{v} + return + } + for _, item := range existing { + if item == v { + // resource exists, return + return + } + } + + // Resource type exists but value did not, append it + r.Resources[k] = append(existing, v) + return +} + +// Merge another CreatedResources into this one. If the other CreatedResources +// is nil this method is a noop. +func (r *CreatedResources) Merge(o *CreatedResources) { + if o == nil { + return + } + + for k, v := range o.Resources { + // New key + if len(r.Resources[k]) == 0 { + r.Resources[k] = v + continue + } + + // Existing key + OUTER: + for _, item := range v { + for _, existing := range r.Resources[k] { + if item == existing { + // Found it, move on + continue OUTER + } + } + + // New item, append it + r.Resources[k] = append(r.Resources[k], item) + } + } +} + // Driver is used for execution of tasks. This allows Nomad // to support many pluggable implementations of task drivers. // Examples could include LXC, Docker, Qemu, etc. @@ -60,7 +124,9 @@ type Driver interface { // Prestart prepares the task environment and performs expensive // initialization steps like downloading images. - Prestart(*ExecContext, *structs.Task) error + // + // CreatedResources may be non-nil even when an error occurs. 
+ Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) // Start is used to begin task execution Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) @@ -68,6 +134,12 @@ // Open is used to re-open a handle to a task Open(ctx *ExecContext, handleID string) (DriverHandle, error) + // Cleanup is called to remove resources which were created for a task + // and no longer needed. + // + // Cleanup should log any errors it encounters. + Cleanup(*ExecContext, *CreatedResources) + // Drivers must validate their configuration Validate(map[string]interface{}) error diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 9db1148c8..51e0dbe58 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -256,3 +256,42 @@ func TestMapMergeStrStr(t *testing.T) { t.Errorf("\nExpected\n%+v\nGot\n%+v\n", d, c) } } + +func TestCreatedResources(t *testing.T) { + res1 := NewCreatedResources() + res1.Add("k1", "v1") + res1.Add("k1", "v2") + res1.Add("k1", "v1") + res1.Add("k2", "v1") + + expected := map[string][]string{ + "k1": {"v1", "v2"}, + "k2": {"v1"}, + } + if !reflect.DeepEqual(expected, res1.Resources) { + t.Fatalf("1. %#v != expected %#v", res1.Resources, expected) + } + + // Make sure merging nil works + var res2 *CreatedResources + res1.Merge(res2) + if !reflect.DeepEqual(expected, res1.Resources) { + t.Fatalf("2. %#v != expected %#v", res1.Resources, expected) + } + + // Make sure a normal merge works + res2 = NewCreatedResources() + res2.Add("k1", "v3") + res2.Add("k2", "v1") + res2.Add("k3", "v3") + res1.Merge(res2) + + expected = map[string][]string{ + "k1": {"v1", "v2", "v3"}, + "k2": {"v1"}, + "k3": {"v3"}, + } + if !reflect.DeepEqual(expected, res1.Resources) { + t.Fatalf("3. 
%#v != expected %#v", res1.Resources, expected) + } +} diff --git a/client/driver/exec.go b/client/driver/exec.go index ac96bd260..20dd1b504 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -94,8 +94,8 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } -func (d *ExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -173,6 +173,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } +func (d *ExecDriver) Cleanup(*ExecContext, *CreatedResources) {} + type execId struct { Version string KillTimeout time.Duration diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 7db6696ef..e5d8b24f3 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -67,7 +67,7 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -111,7 +111,7 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -175,7 +175,7 @@ func TestExecDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -229,7 +229,7 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -283,7 +283,7 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -349,7 +349,7 @@ done fmt.Errorf("Failed to write data") } - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -414,7 +414,7 @@ func TestExecDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/java.go b/client/driver/java.go index 6f2af363e..3852b01a7 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -164,8 +164,8 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } -func (d *JavaDriver) Prestart(ctx 
*ExecContext, task *structs.Task) error { - return nil +func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -260,6 +260,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } +func (d *JavaDriver) Cleanup(*ExecContext, *CreatedResources) {} + // cgroupsMounted returns true if the cgroups are mounted on a system otherwise // returns false func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool { diff --git a/client/driver/java_test.go b/client/driver/java_test.go index f0821c710..9e9e082c8 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -94,7 +94,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -149,7 +149,7 @@ func TestJavaDriver_Start_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -215,7 +215,7 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -277,7 +277,7 @@ func TestJavaDriver_Signal(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -336,7 +336,7 @@ func TestJavaDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewJavaDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/lxc.go b/client/driver/lxc.go index 8e9f7aa56..a9cbe8aec 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -171,8 +171,8 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e return true, nil } -func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } // Start starts the LXC Driver @@ -286,6 +286,8 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return &handle, nil } +func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) {} + // Open creates the driver to monitor an existing LXC container func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { pid := &lxcPID{} diff --git a/client/driver/lxc_test.go b/client/driver/lxc_test.go index 5f9b8852a..b6188bc6b 100644 --- 
a/client/driver/lxc_test.go +++ b/client/driver/lxc_test.go @@ -72,7 +72,7 @@ func TestLxcDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewLxcDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -148,7 +148,7 @@ func TestLxcDriver_Open_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewLxcDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index b6d5737b5..0277a3128 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -79,8 +79,8 @@ func (d *MockDriver) FSIsolation() cstructs.FSIsolation { return cstructs.FSIsolationNone } -func (d *MockDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } // Start starts the mock driver @@ -124,6 +124,8 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return &h, nil } +func (m *MockDriver) Cleanup(*ExecContext, *CreatedResources) {} + // Validate validates the mock driver configuration func (m *MockDriver) Validate(map[string]interface{}) error { return nil diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 9ad850c32..0323c791c 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -137,8 +137,8 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } -func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } // Run an existing Qemu image. 
Start() will pull down an existing, valid Qemu @@ -341,6 +341,8 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return h, nil } +func (d *QemuDriver) Cleanup(*ExecContext, *CreatedResources) {} + func (h *qemuHandle) ID() string { id := qemuId{ Version: h.version, diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go index 16ba2ba31..8e7393060 100644 --- a/client/driver/qemu_test.go +++ b/client/driver/qemu_test.go @@ -80,7 +80,7 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/qemu/linux-0.2.img", filepath.Join(dst, "linux-0.2.img"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("Prestart failed: %v", err) } @@ -146,7 +146,7 @@ func TestQemuDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewQemuDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("Prestart failed: %v", err) } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 972afe9e0..db3fd4843 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -108,8 +108,8 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo return false, nil } -func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -182,6 +182,8 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return h, nil } +func (d *RawExecDriver) Cleanup(*ExecContext, *CreatedResources) {} + type rawExecId struct { Version string KillTimeout time.Duration diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index c91d1c52f..c3efcfd71 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -77,7 +77,7 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -126,7 +126,7 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -180,7 +180,7 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -233,7 +233,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -286,7 +286,7 @@ func TestRawExecDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := 
NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -335,7 +335,7 @@ done fmt.Errorf("Failed to write data") } - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/rkt.go b/client/driver/rkt.go index bf4b7d204..5492b8073 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -208,8 +208,8 @@ func (d *RktDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } -func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) { + return nil, nil } // Run an existing Rkt image. @@ -456,6 +456,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return h, nil } +func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) {} + func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { // Parse the handle pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:")) diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index 25dd5c151..72699986c 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -98,7 +98,7 @@ func TestRktDriver_Start_DNS(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -149,7 +149,7 @@ func TestRktDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -210,7 +210,7 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -281,7 +281,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -343,7 +343,7 @@ func TestRktDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -384,7 +384,7 @@ func TestRktTrustPrefix(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -462,7 +462,7 @@ func TestRktDriver_PortsMapping(t *testing.T) { defer ctx.AllocDir.Destroy() d := 
NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/task_runner.go b/client/task_runner.go index f8331f803..a4b70ae8e 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -93,6 +93,12 @@ type TaskRunner struct { // Must acquire persistLock when accessing taskDirBuilt bool + // createdResources are all the resources created by the task driver + // across all attempts to start the task. + // + // Must acquire persistLock when accessing + createdResources *driver.CreatedResources + // payloadRendered tracks whether the payload has been rendered to disk payloadRendered bool @@ -130,7 +136,10 @@ type TaskRunner struct { // waitCh closing marks the run loop as having exited waitCh chan struct{} - // serialize SaveState calls + // persistLock must be acquired when accessing fields stored by + // SaveState. SaveState is called asynchronously to TaskRunner.Run by + // AllocRunner, so all state fields must be synchronized using this + // lock. persistLock sync.Mutex } @@ -141,6 +150,7 @@ type taskRunnerState struct { HandleID string ArtifactDownloaded bool TaskDirBuilt bool + CreatedResources *driver.CreatedResources PayloadRendered bool } @@ -177,22 +187,23 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) tc := &TaskRunner{ - config: config, - updater: updater, - logger: logger, - restartTracker: restartTracker, - alloc: alloc, - task: task, - taskDir: taskDir, - vaultClient: vaultClient, - vaultFuture: NewTokenFuture().Set(""), - updateCh: make(chan *structs.Allocation, 64), - destroyCh: make(chan struct{}), - waitCh: make(chan struct{}), - startCh: make(chan struct{}, 1), - unblockCh: make(chan struct{}), - restartCh: make(chan *structs.TaskEvent), - signalCh: make(chan SignalEvent), + config: config, + updater: updater, + logger: logger, + restartTracker: restartTracker, + alloc: alloc, + task: task, + taskDir: taskDir, + createdResources: driver.NewCreatedResources(), + vaultClient: vaultClient, + vaultFuture: NewTokenFuture().Set(""), + updateCh: make(chan *structs.Allocation, 64), + destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), + startCh: make(chan struct{}, 1), + unblockCh: make(chan struct{}), + restartCh: make(chan *structs.TaskEvent), + signalCh: make(chan SignalEvent), } return tc @@ -237,6 +248,7 @@ func (r *TaskRunner) RestoreState() error { } r.artifactsDownloaded = snap.ArtifactDownloaded r.taskDirBuilt = snap.TaskDirBuilt + r.createdResources = snap.CreatedResources r.payloadRendered = snap.PayloadRendered if err := r.setTaskEnv(); err != nil { @@ -298,6 +310,7 @@ func (r *TaskRunner) SaveState() error { ArtifactDownloaded: r.artifactsDownloaded, TaskDirBuilt: r.taskDirBuilt, PayloadRendered: r.payloadRendered, + CreatedResources: r.createdResources, } r.handleLock.Lock() @@ -870,6 +883,8 @@ func (r *TaskRunner) run() { select { case success := <-prestartResultCh: if !success { + //FIXME is this necessary? 
+ r.cleanup() r.setState(structs.TaskStateDead, nil) return } @@ -958,6 +973,7 @@ func (r *TaskRunner) run() { running := r.running r.runningLock.Unlock() if !running { + r.cleanup() r.setState(structs.TaskStateDead, r.destroyEvent) return } @@ -976,6 +992,11 @@ func (r *TaskRunner) run() { r.killTask(killEvent) close(stopCollection) + + // Wait for handler to exit before calling cleanup + <-handleWaitCh + r.cleanup() + r.setState(structs.TaskStateDead, nil) return } @@ -984,6 +1005,7 @@ func (r *TaskRunner) run() { RESTART: restart := r.shouldRestart() if !restart { + r.cleanup() r.setState(structs.TaskStateDead, nil) return } @@ -997,6 +1019,16 @@ func (r *TaskRunner) run() { } } +// cleanup calls Driver.Cleanup when a task is stopping. Errors are logged. +func (r *TaskRunner) cleanup() { + if drv, err := r.createDriver(); err != nil { + r.logger.Printf("[WARN] client: error creating driver to cleanup resources: %v", err) + } else { + ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) + drv.Cleanup(ctx, r.createdResources) + } +} + // shouldRestart returns if the task should restart. If the return value is // true, the task's restart policy has already been considered and any wait time // between restarts has been applied. @@ -1095,7 +1127,14 @@ func (r *TaskRunner) startTask() error { // Run prestart ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) - if err := drv.Prestart(ctx, r.task); err != nil { + res, err := drv.Prestart(ctx, r.task) + + // Merge newly created resources into previously created resources + r.persistLock.Lock() + r.createdResources.Merge(res) + r.persistLock.Unlock() + + if err != nil { wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) From cace868288ed05562ea9ef65d3807a09968511a2 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 12 Jan 2017 11:17:35 -0800 Subject: [PATCH 02/17] Stop being so confusing --- client/driver/docker.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index b2ebb5397..e5d9a9060 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -383,16 +383,17 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR if pulled { // An image was downloaded, add the ID to created resources dockerImage, err := client.InspectImage(driverConfig.ImageName) - if err == nil { - res := NewCreatedResources() - res.Add(dockerImageResKey, dockerImage.ID) - return res, nil + if err != nil { + // When an error occurs we leak the image, but this should be + // extremely rare. There's no point in returning an error + // because the image won't be redownloaded as it now exists. + d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) + return nil, nil } - // When an error occurs we leak the image, but this should be - // extremely rare. There's no point in returning an error - // because the image won't be redownloaded as it now exists. 
- d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) + + res := NewCreatedResources() + res.Add(dockerImageResKey, dockerImage.ID) + return res, nil } return nil, nil From 1ec5c930a6e7a9182a039f534261b7254f0d070a Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 12 Jan 2017 17:21:54 -0800 Subject: [PATCH 03/17] Return errors from cleanup and let TaskRunner retry --- client/driver/docker.go | 83 +++++++++++++++++++++--------------- client/driver/docker_test.go | 50 +++++++++------------- client/driver/driver.go | 5 ++- client/driver/exec.go | 2 +- client/driver/java.go | 2 +- client/driver/lxc.go | 2 +- client/driver/mock_driver.go | 12 +++++- client/driver/qemu.go | 2 +- client/driver/raw_exec.go | 2 +- client/driver/rkt.go | 2 +- client/task_runner.go | 44 ++++++++++++++++--- client/task_runner_test.go | 50 ++++++++++++++++++++++ 12 files changed, 178 insertions(+), 78 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index e5d9a9060..51d60f7fe 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -376,27 +376,25 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR } // Ensure the image is available - pulled, err := d.createImage(driverConfig, client, ctx.TaskDir) - if err != nil { + if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil { return nil, err } - if pulled { - // An image was downloaded, add the ID to created resources - dockerImage, err := client.InspectImage(driverConfig.ImageName) - if err != nil { - // When an error occurs we leak the image, but this should be - // extremely rare. There's no point in returning an error - // because the image won't be redownloaded as it now exists. - d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) - return nil, nil - } - res := NewCreatedResources() - res.Add(dockerImageResKey, dockerImage.ID) - return res, nil + // Regardless of whether the image was downloaded already or not, store + // it as a created resource. Cleanup will soft fail if the image is + // still in use by another container. + dockerImage, err := client.InspectImage(driverConfig.ImageName) + if err != nil { + // When an error occurs we leak the image, but this should be + // extremely rare. There's no point in returning an error + // because the image won't be redownloaded as it now exists. + d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) + return nil, nil } - return nil, nil + res := NewCreatedResources() + res.Add(dockerImageResKey, dockerImage.ID) + return res, nil } func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -498,23 +496,40 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return h, nil } -func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) { - if res == nil { - // Nothing to cleanup - return +func (d *DockerDriver) Cleanup(_ *ExecContext, key, value string) error { + switch key { + case dockerImageResKey: + return d.cleanupImage(value) + default: + d.logger.Printf("[WARN] driver.docker: unknown resource to cleanup: %q -> %q", key, value) + return nil + } +} + +// cleanupImage removes a Docker image. No error is returned if the image +// doesn't exist or is still in use. Requires the global client to already be +// initialized. 
+func (d *DockerDriver) cleanupImage(id string) error { + if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) { + // Config says not to cleanup + return nil } - // If the client should cleanup images and the image was downloaded, remove it - cleanupImage := d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) - if ids := res.Resources[dockerImageResKey]; cleanupImage && len(ids) > 0 { - for _, id := range ids { - if err := client.RemoveImage(id); err != nil { - d.logger.Printf("[WARN] driver.docker: cleanup failed to remove downloaded image %q: %v", id, err) - } else { - d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id) - } + if err := client.RemoveImage(id); err != nil { + if err == docker.ErrNoSuchImage { + d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id) + return nil } + if derr, ok := err.(*docker.Error); ok && derr.Status == 409 { + d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id) + return nil + } + d.logger.Printf("[WARN] driver.docker: cleanup failed to remove downloaded image %q: %v", id, err) + return err } + + d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id) + return nil } // dockerClients creates two *docker.Client, one for long running operations and @@ -914,7 +929,7 @@ func (d *DockerDriver) Periodic() (bool, time.Duration) { // loading it from the file system // // Returns true if an image was downloaded and should be cleaned up. -func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (bool, error) { +func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error { image := driverConfig.ImageName repo, tag := docker.ParseRepositoryTag(image) if tag == "" { @@ -929,20 +944,20 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc } else if tag != "latest" { if dockerImage, _ := client.InspectImage(image); dockerImage != nil { // Image exists, nothing to do - return false, nil + return nil } } // Load the image if specified if len(driverConfig.LoadImages) > 0 { - return false, d.loadImage(driverConfig, client, taskDir) + return d.loadImage(driverConfig, client, taskDir) } // Download the image if err := d.pullImage(driverConfig, client, repo, tag); err != nil { - return false, err + return err } - return true, nil + return nil } // pullImage creates an image by pulling it from a docker registry diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index fdde597db..0dc79587f 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -99,25 +99,22 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie driver := NewDockerDriver(tctx.DriverCtx) copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar") - res, err := driver.Prestart(tctx.ExecCtx, task) + _, err := driver.Prestart(tctx.ExecCtx, task) if err != nil { tctx.AllocDir.Destroy() t.Fatalf("error in prestart: %v", err) } handle, err := driver.Start(tctx.ExecCtx, task) if err != nil { - driver.Cleanup(tctx.ExecCtx, res) tctx.AllocDir.Destroy() t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack()) } if handle == nil { - driver.Cleanup(tctx.ExecCtx, res) tctx.AllocDir.Destroy() t.Fatalf("handle is nil\nStack\n%s", debug.Stack()) } cleanup := func() { - driver.Cleanup(tctx.ExecCtx, 
res) handle.Kill() tctx.AllocDir.Destroy() } @@ -186,19 +183,16 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - res, err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer d.Cleanup(ctx.ExecCtx, res) handle, err := d.Start(ctx.ExecCtx, task) if err != nil { - d.Cleanup(ctx.ExecCtx, res) t.Fatalf("err: %v", err) } if handle == nil { - d.Cleanup(ctx.ExecCtx, res) t.Fatalf("missing handle") } defer handle.Kill() @@ -285,11 +279,10 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { // Copy the image into the task's directory copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - res, err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer d.Cleanup(ctx.ExecCtx, res) handle, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -351,9 +344,8 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) - res, err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err == nil { - d.Cleanup(ctx.ExecCtx, res) t.Fatalf("want error in prestart: %v", err) } @@ -403,11 +395,10 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - res, err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer d.Cleanup(ctx.ExecCtx, res) handle, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -498,11 +489,10 @@ func TestDockerDriver_StartN(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - res, err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } - defer d.Cleanup(ctx.ExecCtx, res) handles[idx], err = d.Start(ctx.ExecCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -559,11 +549,10 @@ func TestDockerDriver_StartNVersions(t *testing.T) { copyImage(t, ctx.ExecCtx.TaskDir, "busybox_musl.tar") copyImage(t, ctx.ExecCtx.TaskDir, "busybox_glibc.tar") - res, err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } - defer d.Cleanup(ctx.ExecCtx, res) handles[idx], err = d.Start(ctx.ExecCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) @@ -932,11 +921,10 @@ func TestDockerDriver_User(t *testing.T) { defer ctx.AllocDir.Destroy() copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - res, err := driver.Prestart(ctx.ExecCtx, task) + _, err := driver.Prestart(ctx.ExecCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer driver.Cleanup(ctx.ExecCtx, res) // It should fail because the user "alice" does not exist on the given // image. 
@@ -1090,11 +1078,10 @@ done fmt.Errorf("Failed to write data") } - res, err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer d.Cleanup(ctx.ExecCtx, res) handle, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1214,11 +1201,10 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() - res, err := driver.Prestart(execCtx, task) + _, err = driver.Prestart(execCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer driver.Cleanup(execCtx, res) if _, err := driver.Start(execCtx, task); err == nil { t.Fatalf("Started driver successfully when volumes should have been disabled.") } @@ -1229,11 +1215,10 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".") defer cleanup() - res, err := driver.Prestart(execCtx, task) + _, err := driver.Prestart(execCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer driver.Cleanup(execCtx, res) handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -1267,11 +1252,10 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() - res, err := driver.Prestart(execCtx, task) + _, err = driver.Prestart(execCtx, task) if err != nil { t.Fatalf("error in prestart: %v", err) } - defer driver.Cleanup(execCtx, res) handle, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("Failed to start docker driver: %v", err) @@ -1320,12 +1304,20 @@ func TestDockerDriver_Cleanup(t *testing.T) { } // Cleanup - driver.Cleanup(tctx.ExecCtx, res) + if err := driver.Cleanup(tctx.ExecCtx, dockerImageResKey, res.Resources[dockerImageResKey][0]); err != nil { + t.Fatalf("Cleanup failed: %v", err) + } // Ensure image was removed if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil { t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName) } + + // The image doesn't exist which shouldn't be an error when calling + // Cleanup, so call it again to make sure. + if err := driver.Cleanup(tctx.ExecCtx, dockerImageResKey, res.Resources[dockerImageResKey][0]); err != nil { + t.Fatalf("Cleanup failed: %v", err) + } } func copyImage(t *testing.T, taskDir *allocdir.TaskDir, image string) { diff --git a/client/driver/driver.go b/client/driver/driver.go index 970332f60..ffa675a2e 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -137,8 +137,9 @@ type Driver interface { // Cleanup is called to remove resources which were created for a task // and no longer needed. // - // Cleanup should log any errors it encounters. - Cleanup(*ExecContext, *CreatedResources) + // Cleanup is called once for every value for every key in + // CreatedResources. Errors will cause Cleanup to be retried. 
+ Cleanup(ctx *ExecContext, key, value string) error // Drivers must validate their configuration Validate(map[string]interface{}) error diff --git a/client/driver/exec.go b/client/driver/exec.go index 20dd1b504..05bfc7663 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -173,7 +173,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } -func (d *ExecDriver) Cleanup(*ExecContext, *CreatedResources) {} +func (d *ExecDriver) Cleanup(*ExecContext, string, string) error { return nil } type execId struct { Version string diff --git a/client/driver/java.go b/client/driver/java.go index 3852b01a7..8b1bd3ea1 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -260,7 +260,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } -func (d *JavaDriver) Cleanup(*ExecContext, *CreatedResources) {} +func (d *JavaDriver) Cleanup(*ExecContext, string, string) error { return nil } // cgroupsMounted returns true if the cgroups are mounted on a system otherwise // returns false diff --git a/client/driver/lxc.go b/client/driver/lxc.go index a9cbe8aec..b6d2e6ac3 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -286,7 +286,7 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return &handle, nil } -func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) {} +func (d *LxcDriver) Cleanup(*ExecContext, string, string) error { return nil } // Open creates the driver to monitor an existing LXC container func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index 0277a3128..afb8aabcf 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -8,6 +8,7 @@ import ( "fmt" "log" "os" + "strconv" "time" "github.com/mitchellh/mapstructure" @@ -62,6 +63,8 @@ type MockDriverConfig struct { type MockDriver struct { DriverContext fingerprint.StaticFingerprinter + + cleanupFailNum int } // NewMockDriver is a factory method which returns a new Mock Driver @@ -124,7 +127,14 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return &h, nil } -func (m *MockDriver) Cleanup(*ExecContext, *CreatedResources) {} +func (m *MockDriver) Cleanup(ctx *ExecContext, k, v string) error { + n, _ := strconv.Atoi(m.config.Options["cleanup_fail_num"]) + if k == m.config.Options["cleanup_fail_on"] && m.cleanupFailNum < n { + m.cleanupFailNum++ + return fmt.Errorf("mock_driver failure on %q call %d/%d", k, m.cleanupFailNum, n) + } + return nil +} // Validate validates the mock driver configuration func (m *MockDriver) Validate(map[string]interface{}) error { diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 0323c791c..5a558ba04 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -341,7 +341,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return h, nil } -func (d *QemuDriver) Cleanup(*ExecContext, *CreatedResources) {} +func (d *QemuDriver) Cleanup(*ExecContext, string, string) error { return nil } func (h *qemuHandle) ID() string { id := qemuId{ diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index db3fd4843..1ce7cdaa6 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -182,7 +182,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return h, nil } -func (d *RawExecDriver) 
Cleanup(*ExecContext, *CreatedResources) {} +func (d *RawExecDriver) Cleanup(*ExecContext, string, string) error { return nil } type rawExecId struct { Version string diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 5492b8073..116784db7 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -456,7 +456,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return h, nil } -func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) {} +func (d *RktDriver) Cleanup(*ExecContext, string, string) error { return nil } func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { // Parse the handle diff --git a/client/task_runner.go b/client/task_runner.go index a4b70ae8e..82697b253 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -883,7 +883,6 @@ func (r *TaskRunner) run() { select { case success := <-prestartResultCh: if !success { - //FIXME is this necessary? r.cleanup() r.setState(structs.TaskStateDead, nil) return @@ -1021,12 +1020,45 @@ func (r *TaskRunner) run() { // cleanup calls Driver.Cleanup when a task is stopping. Errors are logged. func (r *TaskRunner) cleanup() { - if drv, err := r.createDriver(); err != nil { - r.logger.Printf("[WARN] client: error creating driver to cleanup resources: %v", err) - } else { - ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) - drv.Cleanup(ctx, r.createdResources) + drv, err := r.createDriver() + if err != nil { + r.logger.Printf("[ERR] client: error creating driver to cleanup resources: %v", err) + return } + + ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) + attempts := 1 + for ; len(r.createdResources.Resources) > 0; attempts++ { + for k, items := range r.createdResources.Resources { + var retry []string + for _, v := range items { + if err := drv.Cleanup(ctx, k, v); err != nil { + r.logger.Printf("[WARN] client: error cleaning up resource %s:%s for task %q (attempt %d): %v", k, v, r.task.Name, attempts, err) + retry = append(retry, v) + continue + } + } + + if len(retry) > 0 { + // At least one cleanup failed; keep it alive for retrying + r.createdResources.Resources[k] = retry + } else { + // No failures, remove resource + delete(r.createdResources.Resources, k) + } + } + + // Retry 3 times with sleeps between + if attempts > 3 { + break + } + time.Sleep(time.Duration(attempts) * time.Second) + } + + if len(r.createdResources.Resources) > 0 { + r.logger.Printf("[ERR] client: error cleaning up resources for task %q after %d attempts", r.task.Name, attempts) + } + return } // shouldRestart returns if the task should restart. 
If the return value is diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 5aeae7633..67cc17187 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -1305,3 +1305,53 @@ func TestTaskRunner_SimpleRun_Dispatch(t *testing.T) { t.Fatalf("Bad; got %v; want %v", string(data), string(expected)) } } + +func TestTaskRunner_CleanupOK(t *testing.T) { + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + key := "ERR" + + ctx := testTaskRunnerFromAlloc(t, false, alloc) + ctx.tr.config.Options = map[string]string{ + "cleanup_fail_on": key, + "cleanup_fail_num": "1", + } + ctx.tr.MarkReceived() + + ctx.tr.createdResources.Resources[key] = []string{"x", "y"} + ctx.tr.createdResources.Resources["foo"] = []string{"z"} + + defer ctx.Cleanup() + ctx.tr.Run() + + // Since we only failed once, createdResources should be empty + if len(ctx.tr.createdResources.Resources) > 0 { + t.Fatalf("expected all created resources to be removed: %#v", ctx.tr.createdResources.Resources) + } +} + +func TestTaskRunner_CleanupFail(t *testing.T) { + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + key := "ERR" + ctx := testTaskRunnerFromAlloc(t, false, alloc) + ctx.tr.config.Options = map[string]string{ + "cleanup_fail_on": key, + "cleanup_fail_num": "5", + } + ctx.tr.MarkReceived() + + ctx.tr.createdResources.Resources[key] = []string{"x"} + ctx.tr.createdResources.Resources["foo"] = []string{"y", "z"} + + defer ctx.Cleanup() + ctx.tr.Run() + + // Since we failed > 3 times, the failed key should remain + expected := map[string][]string{key: {"x"}} + if !reflect.DeepEqual(expected, ctx.tr.createdResources.Resources) { + t.Fatalf("expected %#v but found: %#v", expected, ctx.tr.createdResources.Resources) + } +} From b82c69886dc1351994725064ba05da975fb5cc11 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 13 Jan 2017 12:46:55 -0800 Subject: [PATCH 04/17] Add ID to output --- client/driver/docker.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 51d60f7fe..bad4fe05f 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -101,6 +101,7 @@ type DockerDriver struct { DriverContext driverConfig *DockerDriverConfig + imageID string } type DockerDriverAuth struct { @@ -394,6 +395,7 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR res := NewCreatedResources() res.Add(dockerImageResKey, dockerImage.ID) + d.imageID = dockerImage.ID return res, nil } @@ -1042,7 +1044,8 @@ CREATE: return container, nil } - d.logger.Printf("[DEBUG] driver.docker: failed to create container %q (attempt %d): %v", config.Name, attempted+1, createErr) + d.logger.Printf("[DEBUG] driver.docker: failed to create container %q from image %q (ID: %q) (attempt %d): %v", + config.Name, d.driverConfig.ImageName, d.imageID, attempted+1, createErr) if strings.Contains(strings.ToLower(createErr.Error()), "container already exists") { containers, err := client.ListContainers(docker.ListContainersOptions{ All: true, From db096b23b5511527ebb45a9cb8eb7c78eac0ec42 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 13 Jan 2017 16:46:08 -0800 Subject: [PATCH 05/17] Switch to use recoverable errors from Cleanup TaskRunner handles retrying but Cleanup handles all of CreatedResources. 
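Reviewer note: the contract this patch settles on is that Cleanup receives the whole *CreatedResources, drops every entry it successfully destroys, and wraps any failures in structs.NewRecoverableError so the task runner can decide whether to call it again with whatever is left. A minimal sketch of a conforming driver, written as if it lived in client/driver (so ExecContext and CreatedResources are in scope) and using the github.com/hashicorp/go-multierror and nomad/structs packages this patch already imports; FooDriver, fooResKey, and destroyResource are hypothetical stand-ins, not code in this tree:

func (d *FooDriver) Cleanup(_ *ExecContext, res *CreatedResources) error {
	retry := false
	var merr multierror.Error

	// Drain the key up front; only failed values are put back, so on
	// the next attempt the task runner hands us exactly the leftovers.
	vals := res.Resources[fooResKey]
	delete(res.Resources, fooResKey)

	for _, v := range vals {
		if err := d.destroyResource(v); err != nil {
			if structs.IsRecoverable(err) {
				res.Add(fooResKey, v)
				retry = true
			}
			merr.Errors = append(merr.Errors, err)
		}
	}

	// NewRecoverableError returns nil when the wrapped error is nil,
	// so a fully successful pass reports success.
	return structs.NewRecoverableError(merr.ErrorOrNil(), retry)
}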
--- client/driver/docker.go | 50 +++++++++++++++++++++++------------ client/driver/docker_test.go | 4 +-- client/driver/driver.go | 20 +++++++++++--- client/driver/exec.go | 2 +- client/driver/java.go | 2 +- client/driver/lxc.go | 2 +- client/driver/mock_driver.go | 21 ++++++++++----- client/driver/qemu.go | 2 +- client/driver/raw_exec.go | 2 +- client/driver/rkt.go | 2 +- client/task_runner.go | 48 ++++++++++++++++----------------- nomad/node_endpoint.go | 4 +-- nomad/structs/structs.go | 11 +++++++- nomad/structs/structs_test.go | 19 +++++++++++++ 14 files changed, 128 insertions(+), 61 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index bad4fe05f..67a22c220 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -52,7 +52,7 @@ var ( // recoverableErrTimeouts returns a recoverable error if the error was due // to timeouts - recoverableErrTimeouts = func(err error) *structs.RecoverableError { + recoverableErrTimeouts = func(err error) error { r := false if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") || strings.Contains(err.Error(), "EOF") { @@ -449,12 +449,15 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return nil, fmt.Errorf("Failed to create container configuration for image %q: %v", d.driverConfig.ImageName, err) } - container, rerr := d.createContainer(config) - if rerr != nil { - d.logger.Printf("[ERR] driver.docker: failed to create container: %s", rerr) + container, err := d.createContainer(config) + if err != nil { + d.logger.Printf("[ERR] driver.docker: failed to create container: %s", err) pluginClient.Kill() - rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err) - return nil, rerr + if rerr, ok := err.(*structs.RecoverableError); ok { + rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err) + return nil, rerr + } + return nil, err } d.logger.Printf("[INFO] driver.docker: created container %s", container.ID) @@ -498,14 +501,27 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return h, nil } -func (d *DockerDriver) Cleanup(_ *ExecContext, key, value string) error { - switch key { - case dockerImageResKey: - return d.cleanupImage(value) - default: - d.logger.Printf("[WARN] driver.docker: unknown resource to cleanup: %q -> %q", key, value) - return nil +func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { + retry := false + var merr multierror.Error + for key, resources := range res.Resources { + switch key { + case dockerImageResKey: + for _, value := range resources { + if err := d.cleanupImage(value); err != nil { + if structs.IsRecoverable(err) { + // This will be retried so put it back into the map + res.Add(key, value) + retry = true + } + merr.Errors = append(merr.Errors, err) + } + } + default: + d.logger.Printf("[WARN] driver.docker: unknown resource to cleanup: %q", key) + } } + return structs.NewRecoverableError(merr.ErrorOrNil(), retry) } // cleanupImage removes a Docker image. 
No error is returned if the image @@ -526,8 +542,8 @@ func (d *DockerDriver) cleanupImage(id string) error { d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id) return nil } - d.logger.Printf("[WARN] driver.docker: cleanup failed to remove downloaded image %q: %v", id, err) - return err + // Retry on unknown errors + return structs.NewRecoverableError(err, true) } d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id) @@ -1035,7 +1051,7 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke // createContainer creates the container given the passed configuration. It // attempts to handle any transient Docker errors. -func (d *DockerDriver) createContainer(config docker.CreateContainerOptions) (*docker.Container, *structs.RecoverableError) { +func (d *DockerDriver) createContainer(config docker.CreateContainerOptions) (*docker.Container, error) { // Create a container attempted := 0 CREATE: @@ -1108,7 +1124,7 @@ CREATE: // startContainer starts the passed container. It attempts to handle any // transient Docker errors. -func (d *DockerDriver) startContainer(c *docker.Container) *structs.RecoverableError { +func (d *DockerDriver) startContainer(c *docker.Container) error { // Start a container attempted := 0 START: diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 0dc79587f..875273f8d 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -1304,7 +1304,7 @@ func TestDockerDriver_Cleanup(t *testing.T) { } // Cleanup - if err := driver.Cleanup(tctx.ExecCtx, dockerImageResKey, res.Resources[dockerImageResKey][0]); err != nil { + if err := driver.Cleanup(tctx.ExecCtx, res.Copy()); err != nil { t.Fatalf("Cleanup failed: %v", err) } @@ -1315,7 +1315,7 @@ func TestDockerDriver_Cleanup(t *testing.T) { // The image doesn't exist which shouldn't be an error when calling // Cleanup, so call it again to make sure. - if err := driver.Cleanup(tctx.ExecCtx, dockerImageResKey, res.Resources[dockerImageResKey][0]); err != nil { + if err := driver.Cleanup(tctx.ExecCtx, res.Copy()); err != nil { t.Fatalf("Cleanup failed: %v", err) } } diff --git a/client/driver/driver.go b/client/driver/driver.go index ffa675a2e..3c08455e3 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -85,6 +85,19 @@ func (r *CreatedResources) Add(k, v string) { return } +// Copy returns a new deep copy of CreatedResrouces. +func (r *CreatedResources) Copy() *CreatedResources { + newr := CreatedResources{ + Resources: make(map[string][]string, len(r.Resources)), + } + for k, v := range r.Resources { + newv := make([]string, len(v)) + copy(newv, v) + newr.Resources[k] = newv + } + return &newr +} + // Merge another CreatedResources into this one. If the other CreatedResources // is nil this method is a noop. func (r *CreatedResources) Merge(o *CreatedResources) { @@ -137,9 +150,10 @@ type Driver interface { // Cleanup is called to remove resources which were created for a task // and no longer needed. // - // Cleanup is called once for every value for every key in - // CreatedResources. Errors will cause Cleanup to be retried. - Cleanup(ctx *ExecContext, key, value string) error + // If Cleanup returns a recoverable error it may be retried. On retry + // it will be passed the same CreatedResources, so all successfully + // cleaned up resources should be removed. 
+ Cleanup(*ExecContext, *CreatedResources) error // Drivers must validate their configuration Validate(map[string]interface{}) error diff --git a/client/driver/exec.go b/client/driver/exec.go index 05bfc7663..1fcd9f5f5 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -173,7 +173,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } -func (d *ExecDriver) Cleanup(*ExecContext, string, string) error { return nil } +func (d *ExecDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } type execId struct { Version string diff --git a/client/driver/java.go b/client/driver/java.go index 8b1bd3ea1..1d0566921 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -260,7 +260,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } -func (d *JavaDriver) Cleanup(*ExecContext, string, string) error { return nil } +func (d *JavaDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } // cgroupsMounted returns true if the cgroups are mounted on a system otherwise // returns false diff --git a/client/driver/lxc.go b/client/driver/lxc.go index b6d2e6ac3..5b541a3f8 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -286,7 +286,7 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return &handle, nil } -func (d *LxcDriver) Cleanup(*ExecContext, string, string) error { return nil } +func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } // Open creates the driver to monitor an existing LXC container func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index afb8aabcf..b81eca64f 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -127,13 +127,22 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return &h, nil } -func (m *MockDriver) Cleanup(ctx *ExecContext, k, v string) error { - n, _ := strconv.Atoi(m.config.Options["cleanup_fail_num"]) - if k == m.config.Options["cleanup_fail_on"] && m.cleanupFailNum < n { - m.cleanupFailNum++ - return fmt.Errorf("mock_driver failure on %q call %d/%d", k, m.cleanupFailNum, n) +// Cleanup deletes all keys except for Config.Options["cleanup_fail_on"] for +// Config.Options["cleanup_fail_num"] times. For failures it will return a +// recoverable error. 
+func (m *MockDriver) Cleanup(ctx *ExecContext, res *CreatedResources) error { + var err error + failn, _ := strconv.Atoi(m.config.Options["cleanup_fail_num"]) + failk := m.config.Options["cleanup_fail_on"] + for k := range res.Resources { + if k == failk && m.cleanupFailNum < failn { + m.cleanupFailNum++ + err = structs.NewRecoverableError(fmt.Errorf("mock_driver failure on %q call %d/%d", k, m.cleanupFailNum, failn), true) + } else { + delete(res.Resources, k) + } } - return nil + return err } // Validate validates the mock driver configuration diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 5a558ba04..075e8d5f0 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -341,7 +341,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return h, nil } -func (d *QemuDriver) Cleanup(*ExecContext, string, string) error { return nil } +func (d *QemuDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } func (h *qemuHandle) ID() string { id := qemuId{ diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 1ce7cdaa6..5e084e41f 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -182,7 +182,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return h, nil } -func (d *RawExecDriver) Cleanup(*ExecContext, string, string) error { return nil } +func (d *RawExecDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } type rawExecId struct { Version string diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 116784db7..f95f32525 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -456,7 +456,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return h, nil } -func (d *RktDriver) Cleanup(*ExecContext, string, string) error { return nil } +func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { // Parse the handle diff --git a/client/task_runner.go b/client/task_runner.go index 82697b253..3d3f95ede 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -95,9 +95,8 @@ type TaskRunner struct { // createdResources are all the resources created by the task driver // across all attempts to start the task. 
- // - // Must acquire persistLock when accessing - createdResources *driver.CreatedResources + createdResources *driver.CreatedResources + createdResourcesLock sync.Mutex // payloadRendered tracks whether the payload has been rendered to disk payloadRendered bool @@ -304,13 +303,17 @@ func (r *TaskRunner) SaveState() error { r.persistLock.Lock() defer r.persistLock.Unlock() + r.createdResourcesLock.Lock() + res := r.createdResources.Copy() + r.createdResourcesLock.Unlock() + snap := taskRunnerState{ Task: r.task, Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, TaskDirBuilt: r.taskDirBuilt, PayloadRendered: r.payloadRendered, - CreatedResources: r.createdResources, + CreatedResources: res, } r.handleLock.Lock() @@ -1026,28 +1029,25 @@ func (r *TaskRunner) cleanup() { return } + r.createdResourcesLock.Lock() + res := r.createdResources.Copy() + r.createdResourcesLock.Unlock() + ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) attempts := 1 - for ; len(r.createdResources.Resources) > 0; attempts++ { - for k, items := range r.createdResources.Resources { - var retry []string - for _, v := range items { - if err := drv.Cleanup(ctx, k, v); err != nil { - r.logger.Printf("[WARN] client: error cleaning up resource %s:%s for task %q (attempt %d): %v", k, v, r.task.Name, attempts, err) - retry = append(retry, v) - continue - } - } - - if len(retry) > 0 { - // At least one cleanup failed; keep it alive for retrying - r.createdResources.Resources[k] = retry - } else { - // No failures, remove resource - delete(r.createdResources.Resources, k) - } + var cleanupErr error + for retry := true; retry; attempts++ { + retry = false + if cleanupErr = drv.Cleanup(ctx, res); cleanupErr != nil { + retry = structs.IsRecoverable(cleanupErr) } + // Copy current createdResources state in case SaveState is + // called between retries + r.createdResourcesLock.Lock() + r.createdResources = res.Copy() + r.createdResourcesLock.Unlock() + // Retry 3 times with sleeps between if attempts > 3 { break @@ -1055,8 +1055,8 @@ func (r *TaskRunner) cleanup() { time.Sleep(time.Duration(attempts) * time.Second) } - if len(r.createdResources.Resources) > 0 { - r.logger.Printf("[ERR] client: error cleaning up resources for task %q after %d attempts", r.task.Name, attempts) + if cleanupErr != nil { + r.logger.Printf("[ERR] client: error cleaning up resources for task %q after %d attempts: %v", r.task.Name, attempts, cleanupErr) } return } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 5328d9671..d2787535a 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -944,7 +944,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, // setErr is a helper for setting the recoverable error on the reply and // logging it setErr := func(e error, recoverable bool) { - reply.Error = structs.NewRecoverableError(e, recoverable) + reply.Error = structs.NewRecoverableError(e, recoverable).(*structs.RecoverableError) n.srv.logger.Printf("[ERR] nomad.client: DeriveVaultToken failed (recoverable %v): %v", recoverable, e) } @@ -1134,7 +1134,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, if rerr, ok := createErr.(*structs.RecoverableError); ok { reply.Error = rerr } else { - reply.Error = structs.NewRecoverableError(createErr, false) + reply.Error = structs.NewRecoverableError(createErr, false).(*structs.RecoverableError) } return nil diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 32eb3f85d..2be53e606 100644 --- 
a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4049,7 +4049,7 @@ type RecoverableError struct { // NewRecoverableError is used to wrap an error and mark it as recoverable or // not. -func NewRecoverableError(e error, recoverable bool) *RecoverableError { +func NewRecoverableError(e error, recoverable bool) error { if e == nil { return nil } @@ -4063,3 +4063,12 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError { func (r *RecoverableError) Error() string { return r.Err } + +// IsRecoverable returns true if error is a RecoverableError with +// Recoverable=true. Otherwise false is returned. +func IsRecoverable(e error) bool { + if re, ok := e.(*RecoverableError); ok { + return re.Recoverable + } + return false +} diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 4ba18b65c..8dbf5376f 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1,6 +1,7 @@ package structs import ( + "fmt" "reflect" "strings" "testing" @@ -1505,3 +1506,21 @@ func TestDispatchInputConfig_Validate(t *testing.T) { t.Fatalf("bad: %v", err) } } + +func TestIsRecoverable(t *testing.T) { + if IsRecoverable(nil) { + t.Errorf("nil should not be recoverable") + } + if IsRecoverable(NewRecoverableError(nil, true)) { + t.Errorf("NewRecoverableError(nil, true) should not be recoverable") + } + if IsRecoverable(fmt.Errorf("i promise im recoverable")) { + t.Errorf("Custom errors should not be recoverable") + } + if IsRecoverable(NewRecoverableError(fmt.Errorf(""), false)) { + t.Errorf("Explicitly unrecoverable errors should not be recoverable") + } + if !IsRecoverable(NewRecoverableError(fmt.Errorf(""), true)) { + t.Errorf("Explicitly recoverable errors *should* be recoverable") + } +} From 1f35e975103ea09e873e7d2d1ee817f8b5888e35 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 13 Jan 2017 16:53:58 -0800 Subject: [PATCH 06/17] Use Image ID instead of Image Name --- client/driver/docker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 67a22c220..0cf5fb026 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -444,9 +444,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %q: %v", d.driverConfig.ImageName, err) + d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %q (%q): %v", d.driverConfig.ImageName, d.imageID, err) pluginClient.Kill() - return nil, fmt.Errorf("Failed to create container configuration for image %q: %v", d.driverConfig.ImageName, err) + return nil, fmt.Errorf("Failed to create container configuration for image %q (%q): %v", d.driverConfig.ImageName, d.imageID, err) } container, err := d.createContainer(config) @@ -723,7 +723,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas } config := &docker.Config{ - Image: driverConfig.ImageName, + Image: d.imageID, Hostname: driverConfig.Hostname, User: task.User, Tty: driverConfig.TTY, From b100293a06a94b53684643954a96f239800a60e7 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Jan 2017 12:51:19 -0800 Subject: [PATCH 07/17] Try to get test passing in Travis --- client/driver/exec_test.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git 
a/client/driver/exec_test.go b/client/driver/exec_test.go index e5d8b24f3..279480435 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -145,12 +145,20 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { handle2.Kill() t.Fatalf("expected handle2 to be nil") } + // Test if the userpid is still present - userProc, err := os.FindProcess(id.UserPid) + userProc, _ := os.FindProcess(id.UserPid) - err = userProc.Signal(syscall.Signal(0)) + for retry := 3; retry > 0; retry-- { + if err = userProc.Signal(syscall.Signal(0)); err != nil { + // Process is gone as expected; exit + return + } - if err == nil { + // Killing processes is async; wait and check again + time.Sleep(time.Second) + } + if err = userProc.Signal(syscall.Signal(0)); err == nil { t.Fatalf("expected user process to die") } } From 8e82326c2f761f8a0a6f086e572f05a56359f7d2 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Jan 2017 13:10:20 -0800 Subject: [PATCH 08/17] Prevent race between alloc runners Block ar1's periodic syncing which could recreate the state file ar2 was destroying. --- client/alloc_runner_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 390117926..649dbdebc 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -405,6 +405,10 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { t.Fatalf("err: %v", err) } + // Ensure ar1 doesn't recreate the state file + ar.persistLock.Lock() + defer ar.persistLock.Unlock() + // Ensure both alloc runners don't destroy ar.destroy = true From 4dcba012f400ee7833fcbbb8e667ef7a06999eed Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Jan 2017 16:04:09 -0800 Subject: [PATCH 09/17] Return error from Prestart --- client/driver/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 0cf5fb026..dbe9dfc44 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -390,7 +390,7 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR // extremely rare. There's no point in returning an error // because the image won't be redownloaded as it now exists. d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) - return nil, nil + return nil, err } res := NewCreatedResources() From 17d9e8e189eaf0a66176ec9834d08fe247d05234 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Jan 2017 16:05:21 -0800 Subject: [PATCH 10/17] Remove outdated comment --- client/driver/docker.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index dbe9dfc44..2539e1512 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -386,9 +386,6 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR // still in use by another contianer. dockerImage, err := client.InspectImage(driverConfig.ImageName) if err != nil { - // When an error occurs we leak the image, but this should be - // extremely rare. There's no point in returning an error - // because the image won't be redownloaded as it now exists. 
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) return nil, err } From a17c3a9ded5ae88e758f46d2c37b2d27c77ee5f0 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Jan 2017 16:13:40 -0800 Subject: [PATCH 11/17] Updated CreatedResources as images are cleaned --- client/driver/docker.go | 3 +++ client/driver/docker_test.go | 8 +++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 2539e1512..5d98ca963 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -504,6 +504,9 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { for key, resources := range res.Resources { switch key { case dockerImageResKey: + // Remove and only add back images that failed to be + // removed in a retryable way + delete(res.Resources, key) for _, value := range resources { if err := d.cleanupImage(value); err != nil { if structs.IsRecoverable(err) { diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 875273f8d..ee38b629a 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -1304,10 +1304,16 @@ func TestDockerDriver_Cleanup(t *testing.T) { } // Cleanup - if err := driver.Cleanup(tctx.ExecCtx, res.Copy()); err != nil { + rescopy := res.Copy() + if err := driver.Cleanup(tctx.ExecCtx, rescopy); err != nil { t.Fatalf("Cleanup failed: %v", err) } + // Make sure rescopy is updated + if len(rescopy.Resources) > 0 { + t.Errorf("Cleanup should have cleared resource map: %#v", rescopy.Resources) + } + // Ensure image was removed if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil { t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName) From 9c012a71e01ae03142f8e818e70fa24b61e4b5be Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Jan 2017 16:23:29 -0800 Subject: [PATCH 12/17] Remove outdated comment --- client/driver/docker.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 5d98ca963..2932e2647 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -510,11 +510,12 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { for _, value := range resources { if err := d.cleanupImage(value); err != nil { if structs.IsRecoverable(err) { - // This will be retried so put it back into the map - res.Add(key, value) retry = true } merr.Errors = append(merr.Errors, err) + + // Re-add all failures to the map + res.Add(key, value) } } default: @@ -945,8 +946,6 @@ func (d *DockerDriver) Periodic() (bool, time.Duration) { // createImage creates a docker image either by pulling it from a registry or by // loading it from the file system -// -// Returns true if an image was downloaded and should be cleaned up. 
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error { image := driverConfig.ImageName repo, tag := docker.ParseRepositoryTag(image) From cf06204c823eacd34b00bced36359e1765d2c439 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Jan 2017 16:41:59 -0800 Subject: [PATCH 13/17] Add CreatedResources.Remove and use it --- client/driver/docker.go | 13 ++++++------ client/driver/driver.go | 17 +++++++++++++++ client/driver/driver_test.go | 40 +++++++++++++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 2932e2647..08217d81a 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -504,19 +504,18 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { for key, resources := range res.Resources { switch key { case dockerImageResKey: - // Remove and only add back images that failed to be - // removed in a retryable way - delete(res.Resources, key) for _, value := range resources { - if err := d.cleanupImage(value); err != nil { + err := d.cleanupImage(value) + if err != nil { if structs.IsRecoverable(err) { retry = true } merr.Errors = append(merr.Errors, err) - - // Re-add all failures to the map - res.Add(key, value) + continue } + + // Remove cleaned image from resources + res.Remove(dockerImageResKey, value) } default: d.logger.Printf("[WARN] driver.docker: unknown resource to cleanup: %q", key) diff --git a/client/driver/driver.go b/client/driver/driver.go index 3c08455e3..52db1f485 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -85,6 +85,23 @@ func (r *CreatedResources) Add(k, v string) { return } +// Remove a resource. Return true if removed, otherwise false. +// +// Removes the entire key if the needle is the last value in the list. +func (r *CreatedResources) Remove(k, needle string) bool { + haystack := r.Resources[k] + for i, item := range haystack { + if item == needle { + r.Resources[k] = append(haystack[:i], haystack[i+1:]...) + if len(r.Resources[k]) == 0 { + delete(r.Resources, k) + } + return true + } + } + return false +} + // Copy returns a new deep copy of CreatedResrouces. func (r *CreatedResources) Copy() *CreatedResources { newr := CreatedResources{ diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 51e0dbe58..6cca2b4c7 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -257,7 +257,7 @@ func TestMapMergeStrStr(t *testing.T) { } } -func TestCreatedResources(t *testing.T) { +func TestCreatedResources_AddMerge(t *testing.T) { res1 := NewCreatedResources() res1.Add("k1", "v1") res1.Add("k1", "v2") @@ -295,3 +295,41 @@ func TestCreatedResources(t *testing.T) { t.Fatalf("3. 
%#v != expected %#v", res1.Resources, expected) } } + +func TestCreatedResources_CopyRemove(t *testing.T) { + res1 := NewCreatedResources() + res1.Add("k1", "v1") + res1.Add("k1", "v2") + res1.Add("k1", "v3") + res1.Add("k2", "v1") + + // Assert Copy creates a deep copy + res2 := res1.Copy() + + if !reflect.DeepEqual(res1, res2) { + t.Fatalf("%#v != %#v", res1, res2) + } + + // Assert removing v1 from k1 returns true and updates Resources slice + if removed := res2.Remove("k1", "v1"); !removed { + t.Fatalf("expected v1 to be removed: %#v", res2) + } + + if expected := []string{"v2", "v3"}; !reflect.DeepEqual(expected, res2.Resources["k1"]) { + t.Fatalf("unpexpected list for k1: %#v", res2.Resources["k1"]) + } + + // Assert removing the only value from a key removes the key + if removed := res2.Remove("k2", "v1"); !removed { + t.Fatalf("expected v1 to be removed from k2: %#v", res2.Resources) + } + + if _, found := res2.Resources["k2"]; found { + t.Fatalf("k2 should have been removed from Resources: %#v", res2.Resources) + } + + // Make sure res1 wasn't updated + if reflect.DeepEqual(res1, res2) { + t.Fatalf("res1 should not equal res2: #%v", res1) + } +} From c2ec30c47b285e9a30c667bacadf9632cff68826 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 19 Jan 2017 09:48:07 -0800 Subject: [PATCH 14/17] Bump unknown resource to ERR --- client/driver/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 08217d81a..d907945d2 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -518,7 +518,7 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { res.Remove(dockerImageResKey, value) } default: - d.logger.Printf("[WARN] driver.docker: unknown resource to cleanup: %q", key) + d.logger.Printf("[ERR] driver.docker: unknown resource to cleanup: %q", key) } } return structs.NewRecoverableError(merr.ErrorOrNil(), retry) From cf0157af89f57e9a75af664efdcc2e929ade0932 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 19 Jan 2017 11:39:18 -0800 Subject: [PATCH 15/17] Fix incorrect lock usage --- client/task_runner.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index 3d3f95ede..31076ccb1 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1162,9 +1162,9 @@ func (r *TaskRunner) startTask() error { res, err := drv.Prestart(ctx, r.task) // Merge newly created resources into previously created resources - r.persistLock.Lock() + r.createdResourcesLock.Lock() r.createdResources.Merge(res) - r.persistLock.Unlock() + r.createdResourcesLock.Unlock() if err != nil { wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v", From 828151ea38f5d4d0d58de0cdaed3cac616d52bd8 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 19 Jan 2017 15:07:01 -0800 Subject: [PATCH 16/17] Exit early when cleanup succeeds --- client/task_runner.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index 31076ccb1..399c0de24 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1038,9 +1038,11 @@ func (r *TaskRunner) cleanup() { var cleanupErr error for retry := true; retry; attempts++ { retry = false - if cleanupErr = drv.Cleanup(ctx, res); cleanupErr != nil { - retry = structs.IsRecoverable(cleanupErr) + cleanupErr = drv.Cleanup(ctx, res) + if cleanupErr == nil { + return } + retry = structs.IsRecoverable(cleanupErr) // 
Copy current createdResources state in case SaveState is // called between retries From 783118b34d02813c779612845dbf3a59b8295e6b Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 19 Jan 2017 16:48:23 -0800 Subject: [PATCH 17/17] Update created resources before exiting cleanup --- client/task_runner.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index 399c0de24..443846e58 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1037,11 +1037,7 @@ func (r *TaskRunner) cleanup() { attempts := 1 var cleanupErr error for retry := true; retry; attempts++ { - retry = false cleanupErr = drv.Cleanup(ctx, res) - if cleanupErr == nil { - return - } retry = structs.IsRecoverable(cleanupErr) // Copy current createdResources state in case SaveState is @@ -1051,7 +1047,7 @@ func (r *TaskRunner) cleanup() { r.createdResourcesLock.Unlock() // Retry 3 times with sleeps between - if attempts > 3 { + if !retry || attempts > 3 { break } time.Sleep(time.Duration(attempts) * time.Second)
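Closing note on where the series lands: after patches 15-17, TaskRunner.cleanup copies createdResources under createdResourcesLock, hands the copy to drv.Cleanup in a loop, persists the shrinking map between attempts, and stops as soon as the error is nil or non-recoverable, or after three retries. A condensed, runnable model of just that control flow, with cleanupFn standing in for drv.Cleanup and recoverable for structs.IsRecoverable (the state-persistence step between attempts is elided):

package main

import (
	"fmt"
	"time"
)

// retryCleanup mirrors the final shape of TaskRunner.cleanup: the loop
// is driven purely by whether the last error was recoverable, attempts
// are capped at the initial try plus three retries, and the sleep grows
// with each attempt (1s, 2s, 3s).
func retryCleanup(cleanupFn func() error, recoverable func(error) bool) error {
	attempts := 1
	var cleanupErr error
	for retry := true; retry; attempts++ {
		cleanupErr = cleanupFn()
		retry = recoverable(cleanupErr)

		if !retry || attempts > 3 {
			break
		}
		time.Sleep(time.Duration(attempts) * time.Second)
	}
	return cleanupErr
}

func main() {
	// Fail once with a "recoverable" error, then succeed: two calls
	// total, roughly what TestTaskRunner_CleanupOK drives through the
	// mock driver's cleanup_fail_num option.
	calls := 0
	err := retryCleanup(
		func() error {
			calls++
			if calls < 2 {
				return fmt.Errorf("transient failure %d", calls)
			}
			return nil
		},
		func(err error) bool { return err != nil },
	)
	fmt.Println(calls, err) // 2 <nil>
}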