From 33c015bcc767d970b5cf1029970313501c01d0b4 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Tue, 10 Jan 2017 13:24:45 -0800
Subject: [PATCH] Add Cleanup method to Driver interface

Cleanup can be used for cleaning up resources created by drivers to run
a task. Initially the Docker driver is the only user (to remove
downloaded images).
---
 client/driver/docker.go        | 118 +++++++++++++++++++--------------
 client/driver/docker_test.go   |  92 ++++++++++++++++++++-----
 client/driver/driver.go        |  74 ++++++++++++++++++++-
 client/driver/driver_test.go   |  39 +++++++++++
 client/driver/exec.go          |   6 +-
 client/driver/exec_test.go     |  14 ++--
 client/driver/java.go          |   6 +-
 client/driver/java_test.go     |  10 +--
 client/driver/lxc.go           |   6 +-
 client/driver/lxc_test.go      |   4 +-
 client/driver/mock_driver.go   |   6 +-
 client/driver/qemu.go          |   6 +-
 client/driver/qemu_test.go     |   4 +-
 client/driver/raw_exec.go      |   6 +-
 client/driver/raw_exec_test.go |  12 ++--
 client/driver/rkt.go           |   6 +-
 client/driver/rkt_test.go      |  14 ++--
 client/task_runner.go          |  75 ++++++++++++++++-----
 18 files changed, 372 insertions(+), 126 deletions(-)

diff --git a/client/driver/docker.go b/client/driver/docker.go
index a740fc51f..b2ebb5397 100644
--- a/client/driver/docker.go
+++ b/client/driver/docker.go
@@ -84,15 +84,22 @@ const (
 	// Docker's privileged mode.
 	dockerPrivilegedConfigOption = "docker.privileged.enabled"
 
+	// dockerCleanupImageConfigOption is the key for whether or not to
+	// clean up images after the task exits.
+	dockerCleanupImageConfigOption  = "docker.cleanup.image"
+	dockerCleanupImageConfigDefault = true
+
 	// dockerTimeout is the length of time a request can be outstanding before
 	// it is timed out.
 	dockerTimeout = 5 * time.Minute
+
+	// dockerImageResKey is the CreatedResources key for docker images
+	dockerImageResKey = "image"
 )
 
 type DockerDriver struct {
 	DriverContext
-	imageID      string
 	driverConfig *DockerDriverConfig
 }
@@ -237,8 +244,6 @@ type DockerHandle struct {
 	client           *docker.Client
 	waitClient       *docker.Client
 	logger           *log.Logger
-	cleanupImage     bool
-	imageID          string
 	containerID      string
 	version          string
 	clkSpeed         float64
@@ -355,35 +360,42 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
 	return cstructs.FSIsolationImage
 }
 
-func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
+func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
 	driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	// Set state needed by Start()
+	d.driverConfig = driverConfig
+
 	// Initialize docker API clients
 	client, _, err := d.dockerClients()
 	if err != nil {
-		return fmt.Errorf("Failed to connect to docker daemon: %s", err)
+		return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
 	}
 
-	if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil {
-		return err
-	}
-
-	image := driverConfig.ImageName
-	// Now that we have the image we can get the image id
-	dockerImage, err := client.InspectImage(image)
+	// Ensure the image is available
+	pulled, err := d.createImage(driverConfig, client, ctx.TaskDir)
 	if err != nil {
-		d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err)
-		return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
+		return nil, err
 	}
-	d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
 
+	if pulled {
+		// An image was downloaded, add the ID to created resources
+		dockerImage, err := client.InspectImage(driverConfig.ImageName)
+		if err == nil {
+			res := NewCreatedResources()
+			res.Add(dockerImageResKey, dockerImage.ID)
+			return res, nil
+		}
 
-	// Set state needed by Start()
-	d.imageID = dockerImage.ID
-	d.driverConfig = driverConfig
-	return nil
+		// When an error occurs we leak the image, but this should be
+		// extremely rare. There's no point in returning an error
+		// because the image won't be redownloaded as it now exists.
+		d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err)
+	}
+
+	return nil, nil
 }
 
 func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -431,9 +443,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
 	config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr)
 	if err != nil {
-		d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err)
+		d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %q: %v", d.driverConfig.ImageName, err)
 		pluginClient.Kill()
-		return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err)
+		return nil, fmt.Errorf("Failed to create container configuration for image %q: %v", d.driverConfig.ImageName, err)
 	}
 
 	container, rerr := d.createContainer(config)
@@ -446,8 +458,6 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
 
 	d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)
 
-	cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
-
 	// We don't need to start the container if the container is already running
 	// since we don't create containers which are already present on the host
 	// and are running
@@ -471,9 +481,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
 		waitClient:     waitClient,
 		executor:       exec,
 		pluginClient:   pluginClient,
-		cleanupImage:   cleanupImage,
 		logger:         d.logger,
-		imageID:        d.imageID,
 		containerID:    container.ID,
 		version:        d.config.Version,
 		killTimeout:    GetKillTimeout(task.KillTimeout, maxKill),
@@ -489,6 +497,25 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
 	return h, nil
 }
 
+func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) {
+	if res == nil {
+		// Nothing to clean up
+		return
+	}
+
+	// If the client should clean up images and the image was downloaded, remove it
+	cleanupImage := d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault)
+	if ids := res.Resources[dockerImageResKey]; cleanupImage && len(ids) > 0 {
+		for _, id := range ids {
+			if err := client.RemoveImage(id); err != nil {
+				d.logger.Printf("[WARN] driver.docker: cleanup failed to remove downloaded image %q: %v", id, err)
+			} else {
+				d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
+			}
+		}
+	}
+}
+
 // dockerClients creates two *docker.Client, one for long running operations and
 // the other for shorter operations. In test / dev mode we can use ENV vars to
 // connect to the docker daemon. In production mode we will read docker.endpoint
@@ -884,33 +911,37 @@ func (d *DockerDriver) Periodic() (bool, time.Duration) {
 // createImage creates a docker image either by pulling it from a registry or by
 // loading it from the file system
-func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
+//
+// Returns true if an image was downloaded and should be cleaned up.
+func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (bool, error) {
 	image := driverConfig.ImageName
 	repo, tag := docker.ParseRepositoryTag(image)
 	if tag == "" {
 		tag = "latest"
 	}
 
-	var dockerImage *docker.Image
-	var err error
 	// We're going to check whether the image is already downloaded. If the tag
 	// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
 	// bother to check and cache the id here. We'll download first, then cache.
 	if driverConfig.ForcePull {
 		d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag)
 	} else if tag != "latest" {
-		dockerImage, err = client.InspectImage(image)
+		if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
+			// Image exists, nothing to do
+			return false, nil
+		}
+	}
+
+	// Load the image if specified
+	if len(driverConfig.LoadImages) > 0 {
+		return false, d.loadImage(driverConfig, client, taskDir)
 	}
 
 	// Download the image
-	if dockerImage == nil {
-		if len(driverConfig.LoadImages) > 0 {
-			return d.loadImage(driverConfig, client, taskDir)
-		}
-
-		return d.pullImage(driverConfig, client, repo, tag)
+	if err := d.pullImage(driverConfig, client, repo, tag); err != nil {
+		return false, err
 	}
 
-	return err
+	return true, nil
 }
 
 // pullImage creates an image by pulling it from a docker registry
@@ -960,6 +991,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
 		d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
 		return d.recoverablePullError(err, driverConfig.ImageName)
 	}
+	d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)
 	return nil
 }
@@ -1081,8 +1113,6 @@ START:
 }
 
 func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
-	cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
-
 	// Split the handle
 	pidBytes := []byte(strings.TrimPrefix(handleID, "DOCKER:"))
 	pid := &dockerPID{}
@@ -1138,9 +1168,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
 		waitClient:     waitClient,
 		executor:       exec,
 		pluginClient:   pluginClient,
-		cleanupImage:   cleanupImage,
 		logger:         d.logger,
-		imageID:        pid.ImageID,
 		containerID:    pid.ContainerID,
 		version:        pid.Version,
 		killTimeout:    pid.KillTimeout,
@@ -1161,7 +1189,6 @@ func (h *DockerHandle) ID() string {
 	// Return a handle to the PID
 	pid := dockerPID{
 		Version:        h.version,
-		ImageID:        h.imageID,
 		ContainerID:    h.containerID,
 		KillTimeout:    h.killTimeout,
 		MaxKillTimeout: h.maxKillTimeout,
@@ -1278,13 +1305,6 @@ func (h *DockerHandle) run() {
 		h.logger.Printf("[ERR] driver.docker: error removing container: %v", err)
 	}
 
-	// Cleanup the image
-	if h.cleanupImage {
-		if err := h.client.RemoveImage(h.imageID); err != nil {
-			h.logger.Printf("[DEBUG] driver.docker: error removing image: %v", err)
-		}
-	}
-
 	// Send the results
 	h.waitCh <- dstructs.NewWaitResult(exitCode, 0, werr)
 	close(h.waitCh)
diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go
index 9b1e129d5..fdde597db 100644
--- a/client/driver/docker_test.go
+++ b/client/driver/docker_test.go
@@ -99,21 +99,25 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie
 	driver := NewDockerDriver(tctx.DriverCtx)
 	copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar")
 
-	if err := driver.Prestart(tctx.ExecCtx, task); err != nil {
+	res, err := driver.Prestart(tctx.ExecCtx, task)
+	if err != nil {
 		tctx.AllocDir.Destroy()
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := driver.Start(tctx.ExecCtx, task)
 	if err != nil {
+		driver.Cleanup(tctx.ExecCtx, res)
 		tctx.AllocDir.Destroy()
 		t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack())
 	}
 	if handle == nil {
+		driver.Cleanup(tctx.ExecCtx, res)
 		tctx.AllocDir.Destroy()
 		t.Fatalf("handle is nil\nStack\n%s", debug.Stack())
 	}
 
 	cleanup := func() {
+		driver.Cleanup(tctx.ExecCtx, res)
 		handle.Kill()
 		tctx.AllocDir.Destroy()
 	}
@@ -182,14 +186,19 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
 	d := NewDockerDriver(ctx.DriverCtx)
 	copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	res, err := d.Prestart(ctx.ExecCtx, task)
+	if err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
+	defer d.Cleanup(ctx.ExecCtx, res)
+
 	handle, err := d.Start(ctx.ExecCtx, task)
 	if err != nil {
+		d.Cleanup(ctx.ExecCtx, res)
 		t.Fatalf("err: %v", err)
 	}
 	if handle == nil {
+		d.Cleanup(ctx.ExecCtx, res)
 		t.Fatalf("missing handle")
 	}
 	defer handle.Kill()
@@ -276,9 +285,11 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) {
 	// Copy the image into the task's directory
 	copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	res, err := d.Prestart(ctx.ExecCtx, task)
+	if err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
+	defer d.Cleanup(ctx.ExecCtx, res)
 	handle, err := d.Start(ctx.ExecCtx, task)
 	if err != nil {
 		t.Fatalf("err: %v", err)
@@ -340,8 +351,9 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewDockerDriver(ctx.DriverCtx)
 
-	err := d.Prestart(ctx.ExecCtx, task)
+	res, err := d.Prestart(ctx.ExecCtx, task)
 	if err == nil {
+		d.Cleanup(ctx.ExecCtx, res)
 		t.Fatalf("want error in prestart: %v", err)
 	}
@@ -391,9 +403,11 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
 	d := NewDockerDriver(ctx.DriverCtx)
 	copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	res, err := d.Prestart(ctx.ExecCtx, task)
+	if err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
+	defer d.Cleanup(ctx.ExecCtx, res)
 	handle, err := d.Start(ctx.ExecCtx, task)
 	if err != nil {
 		t.Fatalf("err: %v", err)
@@ -477,7 +491,6 @@ func TestDockerDriver_StartN(t *testing.T) {
 	t.Logf("Starting %d tasks", len(taskList))
 
 	// Let's spin up a bunch of things
-	var err error
 	for idx, task := range taskList {
 		ctx := testDriverContexts(t, task)
 		ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
@@ -485,9 +498,11 @@ func TestDockerDriver_StartN(t *testing.T) {
 		d := NewDockerDriver(ctx.DriverCtx)
 		copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
 
-		if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+		res, err := d.Prestart(ctx.ExecCtx, task)
+		if err != nil {
 			t.Fatalf("error in prestart #%d: %v", idx+1, err)
 		}
+		defer d.Cleanup(ctx.ExecCtx, res)
 		handles[idx], err = d.Start(ctx.ExecCtx, task)
 		if err != nil {
 			t.Errorf("Failed starting task #%d: %s", idx+1, err)
@@ -535,7 +550,6 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
 	t.Logf("Starting %d tasks", len(taskList))
 
 	// Let's spin up a bunch of things
-	var err error
 	for idx, task := range taskList {
 		ctx := testDriverContexts(t, task)
 		ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
@@ -545,9 +559,11 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
 		copyImage(t, ctx.ExecCtx.TaskDir, "busybox_musl.tar")
 		copyImage(t, ctx.ExecCtx.TaskDir, "busybox_glibc.tar")
 
-		if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+		res, err := d.Prestart(ctx.ExecCtx, task)
+		if err != nil {
 			t.Fatalf("error in prestart #%d: %v", idx+1, err)
 		}
+		defer d.Cleanup(ctx.ExecCtx, res)
 		handles[idx], err = d.Start(ctx.ExecCtx, task)
 		if err != nil {
 			t.Errorf("Failed starting task #%d: %s", idx+1, err)
@@ -708,7 +724,7 @@ func TestDockerDriver_ForcePull_IsInvalidConfig(t *testing.T) {
 	ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
 	driver := NewDockerDriver(ctx.DriverCtx)
 
-	if err := driver.Prestart(ctx.ExecCtx, task); err == nil {
+	if _, err := driver.Prestart(ctx.ExecCtx, task); err == nil {
 		t.Fatalf("error expected in prestart")
 	}
 }
@@ -916,9 +932,11 @@ func TestDockerDriver_User(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
 
-	if err := driver.Prestart(ctx.ExecCtx, task); err != nil {
+	res, err := driver.Prestart(ctx.ExecCtx, task)
+	if err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
+	defer driver.Cleanup(ctx.ExecCtx, res)
 
 	// It should fail because the user "alice" does not exist on the given
 	// image.
@@ -1072,9 +1090,11 @@ done
 		fmt.Errorf("Failed to write data")
 	}
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	res, err := d.Prestart(ctx.ExecCtx, task)
+	if err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
+	defer d.Cleanup(ctx.ExecCtx, res)
 	handle, err := d.Start(ctx.ExecCtx, task)
 	if err != nil {
 		t.Fatalf("err: %v", err)
@@ -1194,9 +1214,11 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
 		task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol)
 		defer cleanup()
 
-		if err := driver.Prestart(execCtx, task); err != nil {
+		res, err := driver.Prestart(execCtx, task)
+		if err != nil {
 			t.Fatalf("error in prestart: %v", err)
 		}
+		defer driver.Cleanup(execCtx, res)
 		if _, err := driver.Start(execCtx, task); err == nil {
 			t.Fatalf("Started driver successfully when volumes should have been disabled.")
 		}
@@ -1207,9 +1229,11 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
 		task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".")
 		defer cleanup()
 
-		if err := driver.Prestart(execCtx, task); err != nil {
+		res, err := driver.Prestart(execCtx, task)
+		if err != nil {
 			t.Fatalf("error in prestart: %v", err)
 		}
+		defer driver.Cleanup(execCtx, res)
 		handle, err := driver.Start(execCtx, task)
 		if err != nil {
 			t.Fatalf("err: %v", err)
@@ -1243,9 +1267,11 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) {
 	task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol)
 	defer cleanup()
 
-	if err := driver.Prestart(execCtx, task); err != nil {
+	res, err := driver.Prestart(execCtx, task)
+	if err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
+	defer driver.Cleanup(execCtx, res)
 	handle, err := driver.Start(execCtx, task)
 	if err != nil {
 		t.Fatalf("Failed to start docker driver: %v", err)
@@ -1266,6 +1292,42 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) {
 	}
 }
 
+// TestDockerDriver_Cleanup ensures Cleanup removes only downloaded images.
+func TestDockerDriver_Cleanup(t *testing.T) {
+	if !testutil.DockerIsConnected(t) {
+		t.SkipNow()
+	}
+
+	imageName := "hello-world:latest"
+	task := &structs.Task{
+		Name:   "cleanup_test",
+		Driver: "docker",
+		Config: map[string]interface{}{
+			"image": imageName,
+		},
+	}
+	tctx := testDriverContexts(t, task)
+	defer tctx.AllocDir.Destroy()
+
+	// Run Prestart
+	driver := NewDockerDriver(tctx.DriverCtx).(*DockerDriver)
+	res, err := driver.Prestart(tctx.ExecCtx, task)
+	if err != nil {
+		t.Fatalf("error in prestart: %v", err)
+	}
+	if len(res.Resources) == 0 || len(res.Resources[dockerImageResKey]) == 0 {
+		t.Fatalf("no created resources: %#v", res)
+	}
+
+	// Cleanup
+	driver.Cleanup(tctx.ExecCtx, res)
+
+	// Ensure image was removed
+	if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil {
+		t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName)
+	}
+}
+
 func copyImage(t *testing.T, taskDir *allocdir.TaskDir, image string) {
 	dst := filepath.Join(taskDir.LocalDir, image)
 	copyFile(filepath.Join("./test-resources/docker", image), dst, t)
diff --git a/client/driver/driver.go b/client/driver/driver.go
index 893afa300..970332f60 100644
--- a/client/driver/driver.go
+++ b/client/driver/driver.go
@@ -51,6 +51,70 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) {
 // Factory is used to instantiate a new Driver
 type Factory func(*DriverContext) Driver
 
+// CreatedResources is a map of resources (e.g. downloaded images) created by a driver
+// that must be cleaned up.
+type CreatedResources struct {
+	Resources map[string][]string
+}
+
+func NewCreatedResources() *CreatedResources {
+	return &CreatedResources{Resources: make(map[string][]string)}
+}
+
+// Add a new resource if it doesn't already exist.
+func (r *CreatedResources) Add(k, v string) {
+	if r.Resources == nil {
+		r.Resources = map[string][]string{k: []string{v}}
+		return
+	}
+	existing, ok := r.Resources[k]
+	if !ok {
+		// Key doesn't exist, create it
+		r.Resources[k] = []string{v}
+		return
+	}
+	for _, item := range existing {
+		if item == v {
+			// resource exists, return
+			return
+		}
+	}
+
+	// Resource type exists but value did not, append it
+	r.Resources[k] = append(existing, v)
+	return
+}
+
+// Merge another CreatedResources into this one. If the other CreatedResources
+// is nil this method is a noop.
+func (r *CreatedResources) Merge(o *CreatedResources) {
+	if o == nil {
+		return
+	}
+
+	for k, v := range o.Resources {
+		// New key
+		if len(r.Resources[k]) == 0 {
+			r.Resources[k] = v
+			continue
+		}
+
+		// Existing key
+	OUTER:
+		for _, item := range v {
+			for _, existing := range r.Resources[k] {
+				if item == existing {
+					// Found it, move on
+					continue OUTER
+				}
+			}
+
+			// New item, append it
+			r.Resources[k] = append(r.Resources[k], item)
+		}
+	}
+}
+
 // Driver is used for execution of tasks. This allows Nomad
 // to support many pluggable implementations of task drivers.
 // Examples could include LXC, Docker, Qemu, etc.
@@ -60,7 +124,9 @@ type Driver interface {
 
 	// Prestart prepares the task environment and performs expensive
 	// initialization steps like downloading images.
-	Prestart(*ExecContext, *structs.Task) error
+	//
+	// CreatedResources may be non-nil even when an error occurs.
+	Prestart(*ExecContext, *structs.Task) (*CreatedResources, error)
 
 	// Start is used to begin task execution
 	Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
@@ -68,6 +134,12 @@ type Driver interface {
 	// Open is used to re-open a handle to a task
 	Open(ctx *ExecContext, handleID string) (DriverHandle, error)
 
+	// Cleanup is called to remove resources which were created for a task
+	// and are no longer needed.
+	//
+	// Cleanup should log any errors it encounters.
+	Cleanup(*ExecContext, *CreatedResources)
+
 	// Drivers must validate their configuration
 	Validate(map[string]interface{}) error
diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go
index 9db1148c8..51e0dbe58 100644
--- a/client/driver/driver_test.go
+++ b/client/driver/driver_test.go
@@ -256,3 +256,42 @@ func TestMapMergeStrStr(t *testing.T) {
 		t.Errorf("\nExpected\n%+v\nGot\n%+v\n", d, c)
 	}
 }
+
+func TestCreatedResources(t *testing.T) {
+	res1 := NewCreatedResources()
+	res1.Add("k1", "v1")
+	res1.Add("k1", "v2")
+	res1.Add("k1", "v1")
+	res1.Add("k2", "v1")
+
+	expected := map[string][]string{
+		"k1": {"v1", "v2"},
+		"k2": {"v1"},
+	}
+	if !reflect.DeepEqual(expected, res1.Resources) {
+		t.Fatalf("1. %#v != expected %#v", res1.Resources, expected)
+	}
+
+	// Make sure merging nil works
+	var res2 *CreatedResources
+	res1.Merge(res2)
+	if !reflect.DeepEqual(expected, res1.Resources) {
+		t.Fatalf("2. %#v != expected %#v", res1.Resources, expected)
+	}
+
+	// Make sure a normal merge works
+	res2 = NewCreatedResources()
+	res2.Add("k1", "v3")
+	res2.Add("k2", "v1")
+	res2.Add("k3", "v3")
+	res1.Merge(res2)
+
+	expected = map[string][]string{
+		"k1": {"v1", "v2", "v3"},
+		"k2": {"v1"},
+		"k3": {"v3"},
+	}
+	if !reflect.DeepEqual(expected, res1.Resources) {
+		t.Fatalf("3. %#v != expected %#v", res1.Resources, expected)
+	}
+}
diff --git a/client/driver/exec.go b/client/driver/exec.go
index ac96bd260..20dd1b504 100644
--- a/client/driver/exec.go
+++ b/client/driver/exec.go
@@ -94,8 +94,8 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) {
 	return true, 15 * time.Second
 }
 
-func (d *ExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
-	return nil
+func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
+	return nil, nil
 }
 
 func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -173,6 +173,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 	return h, nil
 }
 
+func (d *ExecDriver) Cleanup(*ExecContext, *CreatedResources) {}
+
 type execId struct {
 	Version     string
 	KillTimeout time.Duration
diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go
index 7db6696ef..e5d8b24f3 100644
--- a/client/driver/exec_test.go
+++ b/client/driver/exec_test.go
@@ -67,7 +67,7 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -111,7 +111,7 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -175,7 +175,7 @@ func TestExecDriver_Start_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -229,7 +229,7 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -283,7 +283,7 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -349,7 +349,7 @@ done
 		fmt.Errorf("Failed to write data")
 	}
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -414,7 +414,7 @@ func TestExecDriverUser(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
diff --git a/client/driver/java.go b/client/driver/java.go
index 6f2af363e..3852b01a7 100644
--- a/client/driver/java.go
+++ b/client/driver/java.go
@@ -164,8 +164,8 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
 	return true, nil
 }
 
-func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
-	return nil
+func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
+	return nil, nil
 }
 
 func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -260,6 +260,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 	return h, nil
 }
 
+func (d *JavaDriver) Cleanup(*ExecContext, *CreatedResources) {}
+
 // cgroupsMounted returns true if the cgroups are mounted on a system otherwise
 // returns false
 func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {
diff --git a/client/driver/java_test.go b/client/driver/java_test.go
index f0821c710..9e9e082c8 100644
--- a/client/driver/java_test.go
+++ b/client/driver/java_test.go
@@ -94,7 +94,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
 	dst := ctx.ExecCtx.TaskDir.Dir
 	copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -149,7 +149,7 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
 	dst := ctx.ExecCtx.TaskDir.Dir
 	copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -215,7 +215,7 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
 	dst := ctx.ExecCtx.TaskDir.Dir
 	copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -277,7 +277,7 @@ func TestJavaDriver_Signal(t *testing.T) {
 	dst := ctx.ExecCtx.TaskDir.Dir
 	copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -336,7 +336,7 @@ func TestJavaDriverUser(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewJavaDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
diff --git a/client/driver/lxc.go b/client/driver/lxc.go
index 8e9f7aa56..a9cbe8aec 100644
--- a/client/driver/lxc.go
+++ b/client/driver/lxc.go
@@ -171,8 +171,8 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
 	return true, nil
 }
 
-func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
-	return nil
+func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
+	return nil, nil
 }
 
 // Start starts the LXC Driver
@@ -286,6 +286,8 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
 	return &handle, nil
 }
 
+func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) {}
+
 // Open creates the driver to monitor an existing LXC container
 func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
 	pid := &lxcPID{}
diff --git a/client/driver/lxc_test.go b/client/driver/lxc_test.go
index 5f9b8852a..b6188bc6b 100644
--- a/client/driver/lxc_test.go
+++ b/client/driver/lxc_test.go
@@ -72,7 +72,7 @@ func TestLxcDriver_Start_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewLxcDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -148,7 +148,7 @@ func TestLxcDriver_Open_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewLxcDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go
index b6d5737b5..0277a3128 100644
--- a/client/driver/mock_driver.go
+++ b/client/driver/mock_driver.go
@@ -79,8 +79,8 @@ func (d *MockDriver) FSIsolation() cstructs.FSIsolation {
 	return cstructs.FSIsolationNone
 }
 
-func (d *MockDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
-	return nil
+func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
+	return nil, nil
 }
 
 // Start starts the mock driver
@@ -124,6 +124,8 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 	return &h, nil
 }
 
+func (m *MockDriver) Cleanup(*ExecContext, *CreatedResources) {}
+
 // Validate validates the mock driver configuration
 func (m *MockDriver) Validate(map[string]interface{}) error {
 	return nil
diff --git a/client/driver/qemu.go b/client/driver/qemu.go
index 9ad850c32..0323c791c 100644
--- a/client/driver/qemu.go
+++ b/client/driver/qemu.go
@@ -137,8 +137,8 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
 	return true, nil
 }
 
-func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
-	return nil
+func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
+	return nil, nil
 }
 
 // Run an existing Qemu image. Start() will pull down an existing, valid Qemu
@@ -341,6 +341,8 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
 	return h, nil
 }
 
+func (d *QemuDriver) Cleanup(*ExecContext, *CreatedResources) {}
+
 func (h *qemuHandle) ID() string {
 	id := qemuId{
 		Version: h.version,
diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go
index 16ba2ba31..8e7393060 100644
--- a/client/driver/qemu_test.go
+++ b/client/driver/qemu_test.go
@@ -80,7 +80,7 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) {
 	dst := ctx.ExecCtx.TaskDir.Dir
 	copyFile("./test-resources/qemu/linux-0.2.img", filepath.Join(dst, "linux-0.2.img"), t)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("Prestart failed: %v", err)
 	}
 
@@ -146,7 +146,7 @@ func TestQemuDriverUser(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewQemuDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("Prestart failed: %v", err)
 	}
diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go
index 972afe9e0..db3fd4843 100644
--- a/client/driver/raw_exec.go
+++ b/client/driver/raw_exec.go
@@ -108,8 +108,8 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
 	return false, nil
 }
 
-func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
-	return nil
+func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
+	return nil, nil
 }
 
 func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -182,6 +182,8 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
 	return h, nil
 }
 
+func (d *RawExecDriver) Cleanup(*ExecContext, *CreatedResources) {}
+
 type rawExecId struct {
 	Version     string
 	KillTimeout time.Duration
diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go
index c91d1c52f..c3efcfd71 100644
--- a/client/driver/raw_exec_test.go
+++ b/client/driver/raw_exec_test.go
@@ -77,7 +77,7 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRawExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -126,7 +126,7 @@ func TestRawExecDriver_Start_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRawExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -180,7 +180,7 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRawExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -233,7 +233,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRawExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -286,7 +286,7 @@ func TestRawExecDriverUser(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRawExecDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -335,7 +335,7 @@ done
 		fmt.Errorf("Failed to write data")
 	}
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("prestart err: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
diff --git a/client/driver/rkt.go b/client/driver/rkt.go
index bf4b7d204..5492b8073 100644
--- a/client/driver/rkt.go
+++ b/client/driver/rkt.go
@@ -208,8 +208,8 @@ func (d *RktDriver) Periodic() (bool, time.Duration) {
 	return true, 15 * time.Second
 }
 
-func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
-	return nil
+func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
+	return nil, nil
 }
 
 // Run an existing Rkt image.
@@ -456,6 +456,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
 	return h, nil
 }
 
+func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) {}
+
 func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
 	// Parse the handle
 	pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:"))
diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go
index 25dd5c151..72699986c 100644
--- a/client/driver/rkt_test.go
+++ b/client/driver/rkt_test.go
@@ -98,7 +98,7 @@ func TestRktDriver_Start_DNS(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRktDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -149,7 +149,7 @@ func TestRktDriver_Start_Wait(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRktDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -210,7 +210,7 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRktDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -281,7 +281,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRktDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -343,7 +343,7 @@ func TestRktDriverUser(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRktDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -384,7 +384,7 @@ func TestRktTrustPrefix(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRktDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
@@ -462,7 +462,7 @@ func TestRktDriver_PortsMapping(t *testing.T) {
 	defer ctx.AllocDir.Destroy()
 	d := NewRktDriver(ctx.DriverCtx)
 
-	if err := d.Prestart(ctx.ExecCtx, task); err != nil {
+	if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
 		t.Fatalf("error in prestart: %v", err)
 	}
 	handle, err := d.Start(ctx.ExecCtx, task)
diff --git a/client/task_runner.go b/client/task_runner.go
index f8331f803..a4b70ae8e 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -93,6 +93,12 @@ type TaskRunner struct {
 	// Must acquire persistLock when accessing
 	taskDirBuilt bool
 
+	// createdResources are all the resources created by the task driver
+	// across all attempts to start the task.
+	//
+	// Must acquire persistLock when accessing
+	createdResources *driver.CreatedResources
+
 	// payloadRendered tracks whether the payload has been rendered to disk
 	payloadRendered bool
@@ -130,7 +136,10 @@ type TaskRunner struct {
 	// waitCh closing marks the run loop as having exited
 	waitCh chan struct{}
 
-	// serialize SaveState calls
+	// persistLock must be acquired when accessing fields stored by
+	// SaveState. SaveState is called asynchronously to TaskRunner.Run by
+	// AllocRunner, so all state fields must be synchronized using this
+	// lock.
 	persistLock sync.Mutex
 }
@@ -141,6 +150,7 @@ type taskRunnerState struct {
 	HandleID           string
 	ArtifactDownloaded bool
 	TaskDirBuilt       bool
+	CreatedResources   *driver.CreatedResources
 	PayloadRendered    bool
 }
@@ -177,22 +187,23 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 	restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
 
 	tc := &TaskRunner{
-		config:         config,
-		updater:        updater,
-		logger:         logger,
-		restartTracker: restartTracker,
-		alloc:          alloc,
-		task:           task,
-		taskDir:        taskDir,
-		vaultClient:    vaultClient,
-		vaultFuture:    NewTokenFuture().Set(""),
-		updateCh:       make(chan *structs.Allocation, 64),
-		destroyCh:      make(chan struct{}),
-		waitCh:         make(chan struct{}),
-		startCh:        make(chan struct{}, 1),
-		unblockCh:      make(chan struct{}),
-		restartCh:      make(chan *structs.TaskEvent),
-		signalCh:       make(chan SignalEvent),
+		config:           config,
+		updater:          updater,
+		logger:           logger,
+		restartTracker:   restartTracker,
+		alloc:            alloc,
+		task:             task,
+		taskDir:          taskDir,
+		createdResources: driver.NewCreatedResources(),
+		vaultClient:      vaultClient,
+		vaultFuture:      NewTokenFuture().Set(""),
+		updateCh:         make(chan *structs.Allocation, 64),
+		destroyCh:        make(chan struct{}),
+		waitCh:           make(chan struct{}),
+		startCh:          make(chan struct{}, 1),
+		unblockCh:        make(chan struct{}),
+		restartCh:        make(chan *structs.TaskEvent),
+		signalCh:         make(chan SignalEvent),
 	}
 
 	return tc
@@ -237,6 +248,7 @@ func (r *TaskRunner) RestoreState() error {
 	}
 	r.artifactsDownloaded = snap.ArtifactDownloaded
 	r.taskDirBuilt = snap.TaskDirBuilt
+	r.createdResources = snap.CreatedResources
 	r.payloadRendered = snap.PayloadRendered
 
 	if err := r.setTaskEnv(); err != nil {
@@ -298,6 +310,7 @@ func (r *TaskRunner) SaveState() error {
 		ArtifactDownloaded: r.artifactsDownloaded,
 		TaskDirBuilt:       r.taskDirBuilt,
 		PayloadRendered:    r.payloadRendered,
+		CreatedResources:   r.createdResources,
 	}
 
 	r.handleLock.Lock()
@@ -870,6 +883,8 @@ func (r *TaskRunner) run() {
 			select {
 			case success := <-prestartResultCh:
 				if !success {
+					//FIXME is this necessary?
+					r.cleanup()
 					r.setState(structs.TaskStateDead, nil)
 					return
 				}
@@ -958,6 +973,7 @@ func (r *TaskRunner) run() {
 				running := r.running
 				r.runningLock.Unlock()
 				if !running {
+					r.cleanup()
 					r.setState(structs.TaskStateDead, r.destroyEvent)
 					return
 				}
@@ -976,6 +992,11 @@ func (r *TaskRunner) run() {
 
 				r.killTask(killEvent)
 				close(stopCollection)
+
+				// Wait for handler to exit before calling cleanup
+				<-handleWaitCh
+				r.cleanup()
+
 				r.setState(structs.TaskStateDead, nil)
 				return
 			}
@@ -984,6 +1005,7 @@ func (r *TaskRunner) run() {
 	RESTART:
 		restart := r.shouldRestart()
 		if !restart {
+			r.cleanup()
 			r.setState(structs.TaskStateDead, nil)
 			return
 		}
@@ -997,6 +1019,16 @@ func (r *TaskRunner) run() {
 	}
 }
 
+// cleanup calls Driver.Cleanup when a task is stopping. Errors are logged.
+func (r *TaskRunner) cleanup() {
+	if drv, err := r.createDriver(); err != nil {
+		r.logger.Printf("[WARN] client: error creating driver to cleanup resources: %v", err)
+	} else {
+		ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
+		drv.Cleanup(ctx, r.createdResources)
+	}
+}
+
 // shouldRestart returns if the task should restart. If the return value is
 // true, the task's restart policy has already been considered and any wait time
 // between restarts has been applied.
@@ -1095,7 +1127,14 @@ func (r *TaskRunner) startTask() error {
 
 	// Run prestart
 	ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
-	if err := drv.Prestart(ctx, r.task); err != nil {
+	res, err := drv.Prestart(ctx, r.task)
+
+	// Merge newly created resources into previously created resources
+	r.persistLock.Lock()
+	r.createdResources.Merge(res)
+	r.persistLock.Unlock()
+
+	if err != nil {
		wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v",
			r.task.Name, r.alloc.ID, err)