diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go
index 390117926..649dbdebc 100644
--- a/client/alloc_runner_test.go
+++ b/client/alloc_runner_test.go
@@ -405,6 +405,10 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
+	// Ensure ar1 doesn't recreate the state file
+	ar.persistLock.Lock()
+	defer ar.persistLock.Unlock()
+
 	// Ensure both alloc runners don't destroy
 	ar.destroy = true
 
diff --git a/client/driver/docker.go b/client/driver/docker.go
index a58884812..7db042072 100644
--- a/client/driver/docker.go
+++ b/client/driver/docker.go
@@ -50,7 +50,7 @@ var (
 
 	// recoverableErrTimeouts returns a recoverable error if the error was due
 	// to timeouts
-	recoverableErrTimeouts = func(err error) *structs.RecoverableError {
+	recoverableErrTimeouts = func(err error) error {
 		r := false
 		if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") ||
 			strings.Contains(err.Error(), "EOF") {
@@ -82,16 +82,24 @@ const (
 	// Docker's privileged mode.
 	dockerPrivilegedConfigOption = "docker.privileged.enabled"
 
+	// dockerCleanupImageConfigOption is the key for whether or not to
+	// cleanup images after the task exits.
+	dockerCleanupImageConfigOption  = "docker.cleanup.image"
+	dockerCleanupImageConfigDefault = true
+
 	// dockerTimeout is the length of time a request can be outstanding before
 	// it is timed out.
 	dockerTimeout = 5 * time.Minute
+
+	// dockerImageResKey is the CreatedResources key for docker images
+	dockerImageResKey = "image"
 )
 
 type DockerDriver struct {
 	DriverContext
 
-	imageID      string
 	driverConfig *DockerDriverConfig
+	imageID      string
 }
 
 type DockerDriverAuth struct {
@@ -235,8 +243,6 @@ type DockerHandle struct {
 	client            *docker.Client
 	waitClient        *docker.Client
 	logger            *log.Logger
-	cleanupImage      bool
-	imageID           string
 	containerID       string
 	version           string
 	clkSpeed          float64
@@ -353,35 +359,39 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
 	return cstructs.FSIsolationImage
 }
 
-func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
+func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
 	driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	// Set state needed by Start()
+	d.driverConfig = driverConfig
+
 	// Initialize docker API clients
 	client, _, err := d.dockerClients()
 	if err != nil {
-		return fmt.Errorf("Failed to connect to docker daemon: %s", err)
+		return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
 	}
 
+	// Ensure the image is available
 	if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil {
-		return err
+		return nil, err
 	}
 
-	image := driverConfig.ImageName
-	// Now that we have the image we can get the image id
-	dockerImage, err := client.InspectImage(image)
+	// Regardless of whether the image was downloaded already or not, store
+	// it as a created resource. Cleanup will soft fail if the image is
+	// still in use by another container.
+ dockerImage, err := client.InspectImage(driverConfig.ImageName) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err) - return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) + d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) + return nil, err } - d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) - // Set state needed by Start() + res := NewCreatedResources() + res.Add(dockerImageResKey, dockerImage.ID) d.imageID = dockerImage.ID - d.driverConfig = driverConfig - return nil + return res, nil } func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -426,23 +436,24 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err) + d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %q (%q): %v", d.driverConfig.ImageName, d.imageID, err) pluginClient.Kill() - return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err) + return nil, fmt.Errorf("Failed to create container configuration for image %q (%q): %v", d.driverConfig.ImageName, d.imageID, err) } - container, rerr := d.createContainer(config) - if rerr != nil { - d.logger.Printf("[ERR] driver.docker: failed to create container: %s", rerr) + container, err := d.createContainer(config) + if err != nil { + d.logger.Printf("[ERR] driver.docker: failed to create container: %s", err) pluginClient.Kill() - rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err) - return nil, rerr + if rerr, ok := err.(*structs.RecoverableError); ok { + rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err) + return nil, rerr + } + return nil, err } d.logger.Printf("[INFO] driver.docker: created container %s", container.ID) - cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) - // We don't need to start the container if the container is already running // since we don't create containers which are already present on the host // and are running @@ -466,9 +477,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle waitClient: waitClient, executor: exec, pluginClient: pluginClient, - cleanupImage: cleanupImage, logger: d.logger, - imageID: d.imageID, containerID: container.ID, version: d.config.Version, killTimeout: GetKillTimeout(task.KillTimeout, maxKill), @@ -484,6 +493,58 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return h, nil } +func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { + retry := false + var merr multierror.Error + for key, resources := range res.Resources { + switch key { + case dockerImageResKey: + for _, value := range resources { + err := d.cleanupImage(value) + if err != nil { + if structs.IsRecoverable(err) { + retry = true + } + merr.Errors = append(merr.Errors, err) + continue + } + + // Remove cleaned image from resources + res.Remove(dockerImageResKey, value) + } + default: + d.logger.Printf("[ERR] driver.docker: unknown resource to cleanup: %q", key) + } + } + return structs.NewRecoverableError(merr.ErrorOrNil(), retry) +} + +// cleanupImage removes a Docker image. 
No error is returned if the image +// doesn't exist or is still in use. Requires the global client to already be +// initialized. +func (d *DockerDriver) cleanupImage(id string) error { + if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) { + // Config says not to cleanup + return nil + } + + if err := client.RemoveImage(id); err != nil { + if err == docker.ErrNoSuchImage { + d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id) + return nil + } + if derr, ok := err.(*docker.Error); ok && derr.Status == 409 { + d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id) + return nil + } + // Retry on unknown errors + return structs.NewRecoverableError(err, true) + } + + d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id) + return nil +} + // dockerClients creates two *docker.Client, one for long running operations and // the other for shorter operations. In test / dev mode we can use ENV vars to // connect to the docker daemon. In production mode we will read docker.endpoint @@ -657,7 +718,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas } config := &docker.Config{ - Image: driverConfig.ImageName, + Image: d.imageID, Hostname: driverConfig.Hostname, User: task.User, Tty: driverConfig.TTY, @@ -886,26 +947,28 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc tag = "latest" } - var dockerImage *docker.Image - var err error // We're going to check whether the image is already downloaded. If the tag // is "latest", or ForcePull is set, we have to check for a new version every time so we don't // bother to check and cache the id here. We'll download first, then cache. if driverConfig.ForcePull { d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag) } else if tag != "latest" { - dockerImage, err = client.InspectImage(image) + if dockerImage, _ := client.InspectImage(image); dockerImage != nil { + // Image exists, nothing to do + return nil + } + } + + // Load the image if specified + if len(driverConfig.LoadImages) > 0 { + return d.loadImage(driverConfig, client, taskDir) } // Download the image - if dockerImage == nil { - if len(driverConfig.LoadImages) > 0 { - return d.loadImage(driverConfig, client, taskDir) - } - - return d.pullImage(driverConfig, client, repo, tag) + if err := d.pullImage(driverConfig, client, repo, tag); err != nil { + return err } - return err + return nil } // pullImage creates an image by pulling it from a docker registry @@ -955,6 +1018,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) return d.recoverablePullError(err, driverConfig.ImageName) } + d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag) return nil } @@ -980,7 +1044,7 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke // createContainer creates the container given the passed configuration. It // attempts to handle any transient Docker errors. 
-func (d *DockerDriver) createContainer(config docker.CreateContainerOptions) (*docker.Container, *structs.RecoverableError) { +func (d *DockerDriver) createContainer(config docker.CreateContainerOptions) (*docker.Container, error) { // Create a container attempted := 0 CREATE: @@ -989,7 +1053,8 @@ CREATE: return container, nil } - d.logger.Printf("[DEBUG] driver.docker: failed to create container %q (attempt %d): %v", config.Name, attempted+1, createErr) + d.logger.Printf("[DEBUG] driver.docker: failed to create container %q from image %q (ID: %q) (attempt %d): %v", + config.Name, d.driverConfig.ImageName, d.imageID, attempted+1, createErr) if strings.Contains(strings.ToLower(createErr.Error()), "container already exists") { containers, err := client.ListContainers(docker.ListContainersOptions{ All: true, @@ -1052,7 +1117,7 @@ CREATE: // startContainer starts the passed container. It attempts to handle any // transient Docker errors. -func (d *DockerDriver) startContainer(c *docker.Container) *structs.RecoverableError { +func (d *DockerDriver) startContainer(c *docker.Container) error { // Start a container attempted := 0 START: @@ -1076,8 +1141,6 @@ START: } func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { - cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true) - // Split the handle pidBytes := []byte(strings.TrimPrefix(handleID, "DOCKER:")) pid := &dockerPID{} @@ -1133,9 +1196,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er waitClient: waitClient, executor: exec, pluginClient: pluginClient, - cleanupImage: cleanupImage, logger: d.logger, - imageID: pid.ImageID, containerID: pid.ContainerID, version: pid.Version, killTimeout: pid.KillTimeout, @@ -1156,7 +1217,6 @@ func (h *DockerHandle) ID() string { // Return a handle to the PID pid := dockerPID{ Version: h.version, - ImageID: h.imageID, ContainerID: h.containerID, KillTimeout: h.killTimeout, MaxKillTimeout: h.maxKillTimeout, @@ -1273,13 +1333,6 @@ func (h *DockerHandle) run() { h.logger.Printf("[ERR] driver.docker: error removing container: %v", err) } - // Cleanup the image - if h.cleanupImage { - if err := h.client.RemoveImage(h.imageID); err != nil { - h.logger.Printf("[DEBUG] driver.docker: error removing image: %v", err) - } - } - // Send the results h.waitCh <- dstructs.NewWaitResult(exitCode, 0, werr) close(h.waitCh) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 9b1e129d5..ee38b629a 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -99,7 +99,8 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie driver := NewDockerDriver(tctx.DriverCtx) copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar") - if err := driver.Prestart(tctx.ExecCtx, task); err != nil { + _, err := driver.Prestart(tctx.ExecCtx, task) + if err != nil { tctx.AllocDir.Destroy() t.Fatalf("error in prestart: %v", err) } @@ -182,9 +183,11 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + _, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } + handle, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -276,7 +279,8 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { // Copy the image into the task's directory copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := 
d.Prestart(ctx.ExecCtx, task); err != nil { + _, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -340,7 +344,7 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) - err := d.Prestart(ctx.ExecCtx, task) + _, err := d.Prestart(ctx.ExecCtx, task) if err == nil { t.Fatalf("want error in prestart: %v", err) } @@ -391,7 +395,8 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + _, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -477,7 +482,6 @@ func TestDockerDriver_StartN(t *testing.T) { t.Logf("Starting %d tasks", len(taskList)) // Let's spin up a bunch of things - var err error for idx, task := range taskList { ctx := testDriverContexts(t, task) ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} @@ -485,7 +489,8 @@ func TestDockerDriver_StartN(t *testing.T) { d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + _, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } handles[idx], err = d.Start(ctx.ExecCtx, task) @@ -535,7 +540,6 @@ func TestDockerDriver_StartNVersions(t *testing.T) { t.Logf("Starting %d tasks", len(taskList)) // Let's spin up a bunch of things - var err error for idx, task := range taskList { ctx := testDriverContexts(t, task) ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} @@ -545,7 +549,8 @@ func TestDockerDriver_StartNVersions(t *testing.T) { copyImage(t, ctx.ExecCtx.TaskDir, "busybox_musl.tar") copyImage(t, ctx.ExecCtx.TaskDir, "busybox_glibc.tar") - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + _, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } handles[idx], err = d.Start(ctx.ExecCtx, task) @@ -708,7 +713,7 @@ func TestDockerDriver_ForcePull_IsInvalidConfig(t *testing.T) { ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} driver := NewDockerDriver(ctx.DriverCtx) - if err := driver.Prestart(ctx.ExecCtx, task); err == nil { + if _, err := driver.Prestart(ctx.ExecCtx, task); err == nil { t.Fatalf("error expected in prestart") } } @@ -916,7 +921,8 @@ func TestDockerDriver_User(t *testing.T) { defer ctx.AllocDir.Destroy() copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") - if err := driver.Prestart(ctx.ExecCtx, task); err != nil { + _, err := driver.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } @@ -1072,7 +1078,8 @@ done fmt.Errorf("Failed to write data") } - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + _, err := d.Prestart(ctx.ExecCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -1194,7 +1201,8 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() - if err := driver.Prestart(execCtx, task); err != nil { + _, err = driver.Prestart(execCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } if _, err := driver.Start(execCtx, task); err == nil { 
@@ -1207,7 +1215,8 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".") defer cleanup() - if err := driver.Prestart(execCtx, task); err != nil { + _, err := driver.Prestart(execCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := driver.Start(execCtx, task) @@ -1243,7 +1252,8 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol) defer cleanup() - if err := driver.Prestart(execCtx, task); err != nil { + _, err = driver.Prestart(execCtx, task) + if err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := driver.Start(execCtx, task) @@ -1266,6 +1276,56 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { } } +// TestDockerDriver_Cleanup ensures Cleanup removes only downloaded images. +func TestDockerDriver_Cleanup(t *testing.T) { + if !testutil.DockerIsConnected(t) { + t.SkipNow() + } + + imageName := "hello-world:latest" + task := &structs.Task{ + Name: "cleanup_test", + Driver: "docker", + Config: map[string]interface{}{ + "image": imageName, + }, + } + tctx := testDriverContexts(t, task) + defer tctx.AllocDir.Destroy() + + // Run Prestart + driver := NewDockerDriver(tctx.DriverCtx).(*DockerDriver) + res, err := driver.Prestart(tctx.ExecCtx, task) + if err != nil { + t.Fatalf("error in prestart: %v", err) + } + if len(res.Resources) == 0 || len(res.Resources[dockerImageResKey]) == 0 { + t.Fatalf("no created resources: %#v", res) + } + + // Cleanup + rescopy := res.Copy() + if err := driver.Cleanup(tctx.ExecCtx, rescopy); err != nil { + t.Fatalf("Cleanup failed: %v", err) + } + + // Make sure rescopy is updated + if len(rescopy.Resources) > 0 { + t.Errorf("Cleanup should have cleared resource map: %#v", rescopy.Resources) + } + + // Ensure image was removed + if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil { + t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName) + } + + // The image doesn't exist which shouldn't be an error when calling + // Cleanup, so call it again to make sure. + if err := driver.Cleanup(tctx.ExecCtx, res.Copy()); err != nil { + t.Fatalf("Cleanup failed: %v", err) + } +} + func copyImage(t *testing.T, taskDir *allocdir.TaskDir, image string) { dst := filepath.Join(taskDir.LocalDir, image) copyFile(filepath.Join("./test-resources/docker", image), dst, t) diff --git a/client/driver/driver.go b/client/driver/driver.go index c4560e978..c91d41e5d 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -51,6 +51,100 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) { // Factory is used to instantiate a new Driver type Factory func(*DriverContext) Driver +// CreatedResources is a map of resources (eg downloaded images) created by a driver +// that must be cleaned up. +type CreatedResources struct { + Resources map[string][]string +} + +func NewCreatedResources() *CreatedResources { + return &CreatedResources{Resources: make(map[string][]string)} +} + +// Add a new resource if it doesn't already exist. 
+func (r *CreatedResources) Add(k, v string) {
+	if r.Resources == nil {
+		r.Resources = map[string][]string{k: []string{v}}
+		return
+	}
+	existing, ok := r.Resources[k]
+	if !ok {
+		// Key doesn't exist, create it
+		r.Resources[k] = []string{v}
+		return
+	}
+	for _, item := range existing {
+		if item == v {
+			// resource exists, return
+			return
+		}
+	}
+
+	// Resource type exists but value did not, append it
+	r.Resources[k] = append(existing, v)
+	return
+}
+
+// Remove a resource. Return true if removed, otherwise false.
+//
+// Removes the entire key if the needle is the last value in the list.
+func (r *CreatedResources) Remove(k, needle string) bool {
+	haystack := r.Resources[k]
+	for i, item := range haystack {
+		if item == needle {
+			r.Resources[k] = append(haystack[:i], haystack[i+1:]...)
+			if len(r.Resources[k]) == 0 {
+				delete(r.Resources, k)
+			}
+			return true
+		}
+	}
+	return false
+}
+
+// Copy returns a new deep copy of CreatedResources.
+func (r *CreatedResources) Copy() *CreatedResources {
+	newr := CreatedResources{
+		Resources: make(map[string][]string, len(r.Resources)),
+	}
+	for k, v := range r.Resources {
+		newv := make([]string, len(v))
+		copy(newv, v)
+		newr.Resources[k] = newv
+	}
+	return &newr
+}
+
+// Merge another CreatedResources into this one. If the other CreatedResources
+// is nil this method is a noop.
+func (r *CreatedResources) Merge(o *CreatedResources) {
+	if o == nil {
+		return
+	}
+
+	for k, v := range o.Resources {
+		// New key
+		if len(r.Resources[k]) == 0 {
+			r.Resources[k] = v
+			continue
+		}
+
+		// Existing key
+	OUTER:
+		for _, item := range v {
+			for _, existing := range r.Resources[k] {
+				if item == existing {
+					// Found it, move on
+					continue OUTER
+				}
+			}
+
+			// New item, append it
+			r.Resources[k] = append(r.Resources[k], item)
+		}
+	}
+}
+
 // Driver is used for execution of tasks. This allows Nomad
 // to support many pluggable implementations of task drivers.
 // Examples could include LXC, Docker, Qemu, etc.
@@ -60,7 +154,9 @@ type Driver interface {
 
 	// Prestart prepares the task environment and performs expensive
 	// initialization steps like downloading images.
-	Prestart(*ExecContext, *structs.Task) error
+	//
+	// CreatedResources may be non-nil even when an error occurs.
+	Prestart(*ExecContext, *structs.Task) (*CreatedResources, error)
 
 	// Start is used to begin task execution
 	Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
@@ -68,6 +164,14 @@
 	// Open is used to re-open a handle to a task
 	Open(ctx *ExecContext, handleID string) (DriverHandle, error)
 
+	// Cleanup is called to remove resources which were created for a task
+	// and no longer needed.
+	//
+	// If Cleanup returns a recoverable error it may be retried. On retry
+	// it will be passed the same CreatedResources, so all successfully
+	// cleaned up resources should be removed.
+ Cleanup(*ExecContext, *CreatedResources) error + // Drivers must validate their configuration Validate(map[string]interface{}) error diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 9db1148c8..6cca2b4c7 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -256,3 +256,80 @@ func TestMapMergeStrStr(t *testing.T) { t.Errorf("\nExpected\n%+v\nGot\n%+v\n", d, c) } } + +func TestCreatedResources_AddMerge(t *testing.T) { + res1 := NewCreatedResources() + res1.Add("k1", "v1") + res1.Add("k1", "v2") + res1.Add("k1", "v1") + res1.Add("k2", "v1") + + expected := map[string][]string{ + "k1": {"v1", "v2"}, + "k2": {"v1"}, + } + if !reflect.DeepEqual(expected, res1.Resources) { + t.Fatalf("1. %#v != expected %#v", res1.Resources, expected) + } + + // Make sure merging nil works + var res2 *CreatedResources + res1.Merge(res2) + if !reflect.DeepEqual(expected, res1.Resources) { + t.Fatalf("2. %#v != expected %#v", res1.Resources, expected) + } + + // Make sure a normal merge works + res2 = NewCreatedResources() + res2.Add("k1", "v3") + res2.Add("k2", "v1") + res2.Add("k3", "v3") + res1.Merge(res2) + + expected = map[string][]string{ + "k1": {"v1", "v2", "v3"}, + "k2": {"v1"}, + "k3": {"v3"}, + } + if !reflect.DeepEqual(expected, res1.Resources) { + t.Fatalf("3. %#v != expected %#v", res1.Resources, expected) + } +} + +func TestCreatedResources_CopyRemove(t *testing.T) { + res1 := NewCreatedResources() + res1.Add("k1", "v1") + res1.Add("k1", "v2") + res1.Add("k1", "v3") + res1.Add("k2", "v1") + + // Assert Copy creates a deep copy + res2 := res1.Copy() + + if !reflect.DeepEqual(res1, res2) { + t.Fatalf("%#v != %#v", res1, res2) + } + + // Assert removing v1 from k1 returns true and updates Resources slice + if removed := res2.Remove("k1", "v1"); !removed { + t.Fatalf("expected v1 to be removed: %#v", res2) + } + + if expected := []string{"v2", "v3"}; !reflect.DeepEqual(expected, res2.Resources["k1"]) { + t.Fatalf("unpexpected list for k1: %#v", res2.Resources["k1"]) + } + + // Assert removing the only value from a key removes the key + if removed := res2.Remove("k2", "v1"); !removed { + t.Fatalf("expected v1 to be removed from k2: %#v", res2.Resources) + } + + if _, found := res2.Resources["k2"]; found { + t.Fatalf("k2 should have been removed from Resources: %#v", res2.Resources) + } + + // Make sure res1 wasn't updated + if reflect.DeepEqual(res1, res2) { + t.Fatalf("res1 should not equal res2: #%v", res1) + } +} diff --git a/client/driver/exec.go b/client/driver/exec.go index 3d1930e16..1a1b7457f 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -92,8 +92,8 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } -func (d *ExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -167,6 +167,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } +func (d *ExecDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } + type execId struct { Version string KillTimeout time.Duration diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 7db6696ef..279480435 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -67,7 +67,7 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { defer 
ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -111,7 +111,7 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -145,12 +145,20 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { handle2.Kill() t.Fatalf("expected handle2 to be nil") } + // Test if the userpid is still present - userProc, err := os.FindProcess(id.UserPid) + userProc, _ := os.FindProcess(id.UserPid) - err = userProc.Signal(syscall.Signal(0)) + for retry := 3; retry > 0; retry-- { + if err = userProc.Signal(syscall.Signal(0)); err != nil { + // Process is gone as expected; exit + return + } - if err == nil { + // Killing processes is async; wait and check again + time.Sleep(time.Second) + } + if err = userProc.Signal(syscall.Signal(0)); err == nil { t.Fatalf("expected user process to die") } } @@ -175,7 +183,7 @@ func TestExecDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -229,7 +237,7 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -283,7 +291,7 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -349,7 +357,7 @@ done fmt.Errorf("Failed to write data") } - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -414,7 +422,7 @@ func TestExecDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/java.go b/client/driver/java.go index 366b891e8..0a5240f81 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -167,8 +167,8 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } -func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDriverConfig, error) { @@ -293,6 +293,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return h, nil } +func (d *JavaDriver) 
Cleanup(*ExecContext, *CreatedResources) error { return nil } + // cgroupsMounted returns true if the cgroups are mounted on a system otherwise // returns false func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool { diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 8fa5ec26b..53285d83b 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -94,7 +94,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -149,7 +149,7 @@ func TestJavaDriver_Start_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -215,7 +215,7 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -277,7 +277,7 @@ func TestJavaDriver_Signal(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -336,7 +336,7 @@ func TestJavaDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewJavaDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/lxc.go b/client/driver/lxc.go index 8e9f7aa56..5b541a3f8 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -171,8 +171,8 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e return true, nil } -func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } // Start starts the LXC Driver @@ -286,6 +286,8 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return &handle, nil } +func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } + // Open creates the driver to monitor an existing LXC container func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { pid := &lxcPID{} diff --git a/client/driver/lxc_test.go b/client/driver/lxc_test.go index 5f9b8852a..b6188bc6b 100644 --- a/client/driver/lxc_test.go +++ b/client/driver/lxc_test.go @@ -72,7 +72,7 @@ func TestLxcDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewLxcDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := 
d.Start(ctx.ExecCtx, task) @@ -148,7 +148,7 @@ func TestLxcDriver_Open_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewLxcDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index b6d5737b5..b81eca64f 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -8,6 +8,7 @@ import ( "fmt" "log" "os" + "strconv" "time" "github.com/mitchellh/mapstructure" @@ -62,6 +63,8 @@ type MockDriverConfig struct { type MockDriver struct { DriverContext fingerprint.StaticFingerprinter + + cleanupFailNum int } // NewMockDriver is a factory method which returns a new Mock Driver @@ -79,8 +82,8 @@ func (d *MockDriver) FSIsolation() cstructs.FSIsolation { return cstructs.FSIsolationNone } -func (d *MockDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } // Start starts the mock driver @@ -124,6 +127,24 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return &h, nil } +// Cleanup deletes all keys except for Config.Options["cleanup_fail_on"] for +// Config.Options["cleanup_fail_num"] times. For failures it will return a +// recoverable error. +func (m *MockDriver) Cleanup(ctx *ExecContext, res *CreatedResources) error { + var err error + failn, _ := strconv.Atoi(m.config.Options["cleanup_fail_num"]) + failk := m.config.Options["cleanup_fail_on"] + for k := range res.Resources { + if k == failk && m.cleanupFailNum < failn { + m.cleanupFailNum++ + err = structs.NewRecoverableError(fmt.Errorf("mock_driver failure on %q call %d/%d", k, m.cleanupFailNum, failn), true) + } else { + delete(res.Resources, k) + } + } + return err +} + // Validate validates the mock driver configuration func (m *MockDriver) Validate(map[string]interface{}) error { return nil diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 98575c5e6..746740829 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -136,8 +136,8 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, return true, nil } -func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } // Run an existing Qemu image. 
Start() will pull down an existing, valid Qemu @@ -336,6 +336,8 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return h, nil } +func (d *QemuDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } + func (h *qemuHandle) ID() string { id := qemuId{ Version: h.version, diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go index 16ba2ba31..8e7393060 100644 --- a/client/driver/qemu_test.go +++ b/client/driver/qemu_test.go @@ -80,7 +80,7 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) { dst := ctx.ExecCtx.TaskDir.Dir copyFile("./test-resources/qemu/linux-0.2.img", filepath.Join(dst, "linux-0.2.img"), t) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("Prestart faild: %v", err) } @@ -146,7 +146,7 @@ func TestQemuDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewQemuDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("Prestart faild: %v", err) } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index eb4e1b790..e7dbe2bb2 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -106,8 +106,8 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo return false, nil } -func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) { + return nil, nil } func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { @@ -177,6 +177,8 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return h, nil } +func (d *RawExecDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } + type rawExecId struct { Version string KillTimeout time.Duration diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index c91d1c52f..c3efcfd71 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -77,7 +77,7 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -126,7 +126,7 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -180,7 +180,7 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -233,7 +233,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -286,7 +286,7 @@ func TestRawExecDriverUser(t *testing.T) { defer 
ctx.AllocDir.Destroy() d := NewRawExecDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -335,7 +335,7 @@ done fmt.Errorf("Failed to write data") } - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("prestart err: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/driver/rkt.go b/client/driver/rkt.go index d99621368..116659729 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -207,8 +207,8 @@ func (d *RktDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } -func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error { - return nil +func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) { + return nil, nil } // Run an existing Rkt image. @@ -451,6 +451,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return h, nil } +func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } + func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { // Parse the handle pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:")) diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index 25dd5c151..72699986c 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -98,7 +98,7 @@ func TestRktDriver_Start_DNS(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -149,7 +149,7 @@ func TestRktDriver_Start_Wait(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -210,7 +210,7 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -281,7 +281,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -343,7 +343,7 @@ func TestRktDriverUser(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -384,7 +384,7 @@ func TestRktTrustPrefix(t *testing.T) { defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) @@ -462,7 +462,7 @@ func TestRktDriver_PortsMapping(t *testing.T) { 
defer ctx.AllocDir.Destroy() d := NewRktDriver(ctx.DriverCtx) - if err := d.Prestart(ctx.ExecCtx, task); err != nil { + if _, err := d.Prestart(ctx.ExecCtx, task); err != nil { t.Fatalf("error in prestart: %v", err) } handle, err := d.Start(ctx.ExecCtx, task) diff --git a/client/task_runner.go b/client/task_runner.go index f8331f803..443846e58 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -93,6 +93,11 @@ type TaskRunner struct { // Must acquire persistLock when accessing taskDirBuilt bool + // createdResources are all the resources created by the task driver + // across all attempts to start the task. + createdResources *driver.CreatedResources + createdResourcesLock sync.Mutex + // payloadRendered tracks whether the payload has been rendered to disk payloadRendered bool @@ -130,7 +135,10 @@ type TaskRunner struct { // waitCh closing marks the run loop as having exited waitCh chan struct{} - // serialize SaveState calls + // persistLock must be acquired when accessing fields stored by + // SaveState. SaveState is called asynchronously to TaskRunner.Run by + // AllocRunner, so all state fields must be synchronized using this + // lock. persistLock sync.Mutex } @@ -141,6 +149,7 @@ type taskRunnerState struct { HandleID string ArtifactDownloaded bool TaskDirBuilt bool + CreatedResources *driver.CreatedResources PayloadRendered bool } @@ -177,22 +186,23 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) tc := &TaskRunner{ - config: config, - updater: updater, - logger: logger, - restartTracker: restartTracker, - alloc: alloc, - task: task, - taskDir: taskDir, - vaultClient: vaultClient, - vaultFuture: NewTokenFuture().Set(""), - updateCh: make(chan *structs.Allocation, 64), - destroyCh: make(chan struct{}), - waitCh: make(chan struct{}), - startCh: make(chan struct{}, 1), - unblockCh: make(chan struct{}), - restartCh: make(chan *structs.TaskEvent), - signalCh: make(chan SignalEvent), + config: config, + updater: updater, + logger: logger, + restartTracker: restartTracker, + alloc: alloc, + task: task, + taskDir: taskDir, + createdResources: driver.NewCreatedResources(), + vaultClient: vaultClient, + vaultFuture: NewTokenFuture().Set(""), + updateCh: make(chan *structs.Allocation, 64), + destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), + startCh: make(chan struct{}, 1), + unblockCh: make(chan struct{}), + restartCh: make(chan *structs.TaskEvent), + signalCh: make(chan SignalEvent), } return tc @@ -237,6 +247,7 @@ func (r *TaskRunner) RestoreState() error { } r.artifactsDownloaded = snap.ArtifactDownloaded r.taskDirBuilt = snap.TaskDirBuilt + r.createdResources = snap.CreatedResources r.payloadRendered = snap.PayloadRendered if err := r.setTaskEnv(); err != nil { @@ -292,12 +303,17 @@ func (r *TaskRunner) SaveState() error { r.persistLock.Lock() defer r.persistLock.Unlock() + r.createdResourcesLock.Lock() + res := r.createdResources.Copy() + r.createdResourcesLock.Unlock() + snap := taskRunnerState{ Task: r.task, Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, TaskDirBuilt: r.taskDirBuilt, PayloadRendered: r.payloadRendered, + CreatedResources: res, } r.handleLock.Lock() @@ -870,6 +886,7 @@ func (r *TaskRunner) run() { select { case success := <-prestartResultCh: if !success { + r.cleanup() r.setState(structs.TaskStateDead, nil) return } @@ -958,6 +975,7 @@ func (r *TaskRunner) run() { running := r.running r.runningLock.Unlock() if !running { + r.cleanup() 
r.setState(structs.TaskStateDead, r.destroyEvent) return } @@ -976,6 +994,11 @@ func (r *TaskRunner) run() { r.killTask(killEvent) close(stopCollection) + + // Wait for handler to exit before calling cleanup + <-handleWaitCh + r.cleanup() + r.setState(structs.TaskStateDead, nil) return } @@ -984,6 +1007,7 @@ func (r *TaskRunner) run() { RESTART: restart := r.shouldRestart() if !restart { + r.cleanup() r.setState(structs.TaskStateDead, nil) return } @@ -997,6 +1021,44 @@ func (r *TaskRunner) run() { } } +// cleanup calls Driver.Cleanup when a task is stopping. Errors are logged. +func (r *TaskRunner) cleanup() { + drv, err := r.createDriver() + if err != nil { + r.logger.Printf("[ERR] client: error creating driver to cleanup resources: %v", err) + return + } + + r.createdResourcesLock.Lock() + res := r.createdResources.Copy() + r.createdResourcesLock.Unlock() + + ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) + attempts := 1 + var cleanupErr error + for retry := true; retry; attempts++ { + cleanupErr = drv.Cleanup(ctx, res) + retry = structs.IsRecoverable(cleanupErr) + + // Copy current createdResources state in case SaveState is + // called between retries + r.createdResourcesLock.Lock() + r.createdResources = res.Copy() + r.createdResourcesLock.Unlock() + + // Retry 3 times with sleeps between + if !retry || attempts > 3 { + break + } + time.Sleep(time.Duration(attempts) * time.Second) + } + + if cleanupErr != nil { + r.logger.Printf("[ERR] client: error cleaning up resources for task %q after %d attempts: %v", r.task.Name, attempts, cleanupErr) + } + return +} + // shouldRestart returns if the task should restart. If the return value is // true, the task's restart policy has already been considered and any wait time // between restarts has been applied. 
@@ -1095,7 +1157,14 @@ func (r *TaskRunner) startTask() error { // Run prestart ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) - if err := drv.Prestart(ctx, r.task); err != nil { + res, err := drv.Prestart(ctx, r.task) + + // Merge newly created resources into previously created resources + r.createdResourcesLock.Lock() + r.createdResources.Merge(res) + r.createdResourcesLock.Unlock() + + if err != nil { wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) diff --git a/client/task_runner_test.go b/client/task_runner_test.go index e76426294..7a5b9cb8c 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -1305,3 +1305,53 @@ func TestTaskRunner_SimpleRun_Dispatch(t *testing.T) { t.Fatalf("Bad; got %v; want %v", string(data), string(expected)) } } + +func TestTaskRunner_CleanupOK(t *testing.T) { + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + key := "ERR" + + ctx := testTaskRunnerFromAlloc(t, false, alloc) + ctx.tr.config.Options = map[string]string{ + "cleanup_fail_on": key, + "cleanup_fail_num": "1", + } + ctx.tr.MarkReceived() + + ctx.tr.createdResources.Resources[key] = []string{"x", "y"} + ctx.tr.createdResources.Resources["foo"] = []string{"z"} + + defer ctx.Cleanup() + ctx.tr.Run() + + // Since we only failed once, createdResources should be empty + if len(ctx.tr.createdResources.Resources) > 0 { + t.Fatalf("expected all created resources to be removed: %#v", ctx.tr.createdResources.Resources) + } +} + +func TestTaskRunner_CleanupFail(t *testing.T) { + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + key := "ERR" + ctx := testTaskRunnerFromAlloc(t, false, alloc) + ctx.tr.config.Options = map[string]string{ + "cleanup_fail_on": key, + "cleanup_fail_num": "5", + } + ctx.tr.MarkReceived() + + ctx.tr.createdResources.Resources[key] = []string{"x"} + ctx.tr.createdResources.Resources["foo"] = []string{"y", "z"} + + defer ctx.Cleanup() + ctx.tr.Run() + + // Since we failed > 3 times, the failed key should remain + expected := map[string][]string{key: {"x"}} + if !reflect.DeepEqual(expected, ctx.tr.createdResources.Resources) { + t.Fatalf("expected %#v but found: %#v", expected, ctx.tr.createdResources.Resources) + } +} diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 5328d9671..d2787535a 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -944,7 +944,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, // setErr is a helper for setting the recoverable error on the reply and // logging it setErr := func(e error, recoverable bool) { - reply.Error = structs.NewRecoverableError(e, recoverable) + reply.Error = structs.NewRecoverableError(e, recoverable).(*structs.RecoverableError) n.srv.logger.Printf("[ERR] nomad.client: DeriveVaultToken failed (recoverable %v): %v", recoverable, e) } @@ -1134,7 +1134,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, if rerr, ok := createErr.(*structs.RecoverableError); ok { reply.Error = rerr } else { - reply.Error = structs.NewRecoverableError(createErr, false) + reply.Error = structs.NewRecoverableError(createErr, false).(*structs.RecoverableError) } return nil diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fa63def23..ceb7df38a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4054,7 +4054,7 @@ type RecoverableError struct { // NewRecoverableError is used to 
wrap an error and mark it as recoverable or // not. -func NewRecoverableError(e error, recoverable bool) *RecoverableError { +func NewRecoverableError(e error, recoverable bool) error { if e == nil { return nil } @@ -4068,3 +4068,12 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError { func (r *RecoverableError) Error() string { return r.Err } + +// IsRecoverable returns true if error is a RecoverableError with +// Recoverable=true. Otherwise false is returned. +func IsRecoverable(e error) bool { + if re, ok := e.(*RecoverableError); ok { + return re.Recoverable + } + return false +} diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index b0ddb7aaf..72c097224 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1,6 +1,7 @@ package structs import ( + "fmt" "reflect" "strings" "testing" @@ -1539,3 +1540,21 @@ func TestDispatchInputConfig_Validate(t *testing.T) { t.Fatalf("bad: %v", err) } } + +func TestIsRecoverable(t *testing.T) { + if IsRecoverable(nil) { + t.Errorf("nil should not be recoverable") + } + if IsRecoverable(NewRecoverableError(nil, true)) { + t.Errorf("NewRecoverableError(nil, true) should not be recoverable") + } + if IsRecoverable(fmt.Errorf("i promise im recoverable")) { + t.Errorf("Custom errors should not be recoverable") + } + if IsRecoverable(NewRecoverableError(fmt.Errorf(""), false)) { + t.Errorf("Explicitly unrecoverable errors should not be recoverable") + } + if !IsRecoverable(NewRecoverableError(fmt.Errorf(""), true)) { + t.Errorf("Explicitly recoverable errors *should* be recoverable") + } +}
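
Illustrative usage sketch (not part of the patch): the snippet below only exercises the CreatedResources helpers added to client/driver/driver.go above (NewCreatedResources, Add, Merge, Copy, Remove) to show how Prestart-side bookkeeping and Cleanup-side removal are intended to compose. It assumes it is compiled inside the client/driver package; the "image" key mirrors dockerImageResKey, and the IDs are made up for illustration.

package driver

import "fmt"

// createdResourcesSketch walks through the intended lifecycle of the
// CreatedResources helpers introduced in this diff.
func createdResourcesSketch() {
	// Prestart records everything it creates, e.g. a downloaded image.
	res := NewCreatedResources()
	res.Add("image", "sha256:aaaa")
	res.Add("image", "sha256:aaaa") // duplicate values are ignored

	// A retried Prestart returns a fresh set; TaskRunner merges it into
	// what it already tracked, deduplicating along the way.
	retry := NewCreatedResources()
	retry.Add("image", "sha256:bbbb")
	res.Merge(retry)

	// Cleanup works on a copy so a recoverable failure can be retried
	// later with only the leftovers; each successfully removed resource
	// is dropped from the set.
	pending := res.Copy()
	for _, id := range append([]string(nil), pending.Resources["image"]...) {
		// ... actually delete the resource here, then:
		pending.Remove("image", id)
	}

	fmt.Println(len(pending.Resources)) // prints 0 once everything is cleaned up
}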