Merge pull request #2186 from hashicorp/f-driver-cleanup

Add Cleanup method to Driver interface
Michael Schurter
2017-01-20 13:02:14 -08:00
committed by GitHub
23 changed files with 623 additions and 137 deletions

View File

@@ -405,6 +405,10 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Ensure ar1 doesn't recreate the state file
ar.persistLock.Lock()
defer ar.persistLock.Unlock()
// Ensure both alloc runners don't destroy
ar.destroy = true

View File

@@ -50,7 +50,7 @@ var (
// recoverableErrTimeouts returns a recoverable error if the error was due
// to timeouts
recoverableErrTimeouts = func(err error) *structs.RecoverableError {
recoverableErrTimeouts = func(err error) error {
r := false
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") ||
strings.Contains(err.Error(), "EOF") {
@@ -82,16 +82,24 @@ const (
// Docker's privileged mode.
dockerPrivilegedConfigOption = "docker.privileged.enabled"
// dockerCleanupImageConfigOption is the key for whether or not to
// cleanup images after the task exits.
dockerCleanupImageConfigOption = "docker.cleanup.image"
dockerCleanupImageConfigDefault = true
// dockerTimeout is the length of time a request can be outstanding before
// it is timed out.
dockerTimeout = 5 * time.Minute
// dockerImageResKey is the CreatedResources key for docker images
dockerImageResKey = "image"
)
type DockerDriver struct {
DriverContext
imageID string
driverConfig *DockerDriverConfig
imageID string
}
type DockerDriverAuth struct {
@@ -235,8 +243,6 @@ type DockerHandle struct {
client *docker.Client
waitClient *docker.Client
logger *log.Logger
cleanupImage bool
imageID string
containerID string
version string
clkSpeed float64
@@ -353,35 +359,39 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationImage
}
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
if err != nil {
return err
return nil, err
}
// Set state needed by Start()
d.driverConfig = driverConfig
// Initialize docker API clients
client, _, err := d.dockerClients()
if err != nil {
return fmt.Errorf("Failed to connect to docker daemon: %s", err)
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
}
// Ensure the image is available
if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil {
return err
return nil, err
}
image := driverConfig.ImageName
// Now that we have the image we can get the image id
dockerImage, err := client.InspectImage(image)
// Regardless of whether the image was downloaded already or not, store
// it as a created resource. Cleanup will soft fail if the image is
// still in use by another container.
dockerImage, err := client.InspectImage(driverConfig.ImageName)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err)
return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err)
return nil, err
}
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
// Set state needed by Start()
res := NewCreatedResources()
res.Add(dockerImageResKey, dockerImage.ID)
d.imageID = dockerImage.ID
d.driverConfig = driverConfig
return nil
return res, nil
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -426,23 +436,24 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err)
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %q (%q): %v", d.driverConfig.ImageName, d.imageID, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err)
return nil, fmt.Errorf("Failed to create container configuration for image %q (%q): %v", d.driverConfig.ImageName, d.imageID, err)
}
container, rerr := d.createContainer(config)
if rerr != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container: %s", rerr)
container, err := d.createContainer(config)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container: %s", err)
pluginClient.Kill()
rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err)
return nil, rerr
if rerr, ok := err.(*structs.RecoverableError); ok {
rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err)
return nil, rerr
}
return nil, err
}
d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
// We don't need to start the container if the container is already running
// since we don't create containers which are already present on the host
// and are running
@@ -466,9 +477,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
waitClient: waitClient,
executor: exec,
pluginClient: pluginClient,
cleanupImage: cleanupImage,
logger: d.logger,
imageID: d.imageID,
containerID: container.ID,
version: d.config.Version,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
@@ -484,6 +493,58 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return h, nil
}
func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error {
retry := false
var merr multierror.Error
for key, resources := range res.Resources {
switch key {
case dockerImageResKey:
for _, value := range resources {
err := d.cleanupImage(value)
if err != nil {
if structs.IsRecoverable(err) {
retry = true
}
merr.Errors = append(merr.Errors, err)
continue
}
// Remove cleaned image from resources
res.Remove(dockerImageResKey, value)
}
default:
d.logger.Printf("[ERR] driver.docker: unknown resource to cleanup: %q", key)
}
}
return structs.NewRecoverableError(merr.ErrorOrNil(), retry)
}
// cleanupImage removes a Docker image. No error is returned if the image
// doesn't exist or is still in use. Requires the global client to already be
// initialized.
func (d *DockerDriver) cleanupImage(id string) error {
if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) {
// Config says not to cleanup
return nil
}
if err := client.RemoveImage(id); err != nil {
if err == docker.ErrNoSuchImage {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id)
return nil
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id)
return nil
}
// Retry on unknown errors
return structs.NewRecoverableError(err, true)
}
d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
return nil
}
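
To make the new Prestart/Cleanup handshake above concrete, here is a minimal caller-side sketch. It is illustrative only and not part of this diff: the helper name runDockerTask and the elided Start/Wait step are assumptions, and it presumes the client/driver package context so Driver, ExecContext, CreatedResources, and NewCreatedResources are in scope.

// runDockerTask is an illustrative sketch, not part of this change. After
// Prestart, the Docker driver records the pulled image under the "image"
// key; Cleanup later walks that map, removes the image, and prunes the
// entries it cleaned successfully.
func runDockerTask(d Driver, ctx *ExecContext, task *structs.Task) error {
	res, err := d.Prestart(ctx, task) // for docker: res.Resources["image"] = []string{imageID}
	if res == nil {
		res = NewCreatedResources()
	}
	if err != nil {
		// CreatedResources may be non-nil even when Prestart errors, so
		// clean up whatever was already created before returning.
		d.Cleanup(ctx, res)
		return err
	}
	// ... Start the task and wait for it to exit (elided) ...
	// cleanupImage soft-fails (returns nil) if the image is gone or still in
	// use; any other failure comes back wrapped as recoverable so the caller
	// can retry with the same CreatedResources.
	return d.Cleanup(ctx, res)
}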
// dockerClients creates two *docker.Client, one for long running operations and
// the other for shorter operations. In test / dev mode we can use ENV vars to
// connect to the docker daemon. In production mode we will read docker.endpoint
@@ -657,7 +718,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
}
config := &docker.Config{
Image: driverConfig.ImageName,
Image: d.imageID,
Hostname: driverConfig.Hostname,
User: task.User,
Tty: driverConfig.TTY,
@@ -886,26 +947,28 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
tag = "latest"
}
var dockerImage *docker.Image
var err error
// We're going to check whether the image is already downloaded. If the tag
// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
// bother to check and cache the id here. We'll download first, then cache.
if driverConfig.ForcePull {
d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag)
} else if tag != "latest" {
dockerImage, err = client.InspectImage(image)
if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
// Image exists, nothing to do
return nil
}
}
// Load the image if specified
if len(driverConfig.LoadImages) > 0 {
return d.loadImage(driverConfig, client, taskDir)
}
// Download the image
if dockerImage == nil {
if len(driverConfig.LoadImages) > 0 {
return d.loadImage(driverConfig, client, taskDir)
}
return d.pullImage(driverConfig, client, repo, tag)
if err := d.pullImage(driverConfig, client, repo, tag); err != nil {
return err
}
return err
return nil
}
// pullImage creates an image by pulling it from a docker registry
@@ -955,6 +1018,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
return d.recoverablePullError(err, driverConfig.ImageName)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)
return nil
}
@@ -980,7 +1044,7 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke
// createContainer creates the container given the passed configuration. It
// attempts to handle any transient Docker errors.
func (d *DockerDriver) createContainer(config docker.CreateContainerOptions) (*docker.Container, *structs.RecoverableError) {
func (d *DockerDriver) createContainer(config docker.CreateContainerOptions) (*docker.Container, error) {
// Create a container
attempted := 0
CREATE:
@@ -989,7 +1053,8 @@ CREATE:
return container, nil
}
d.logger.Printf("[DEBUG] driver.docker: failed to create container %q (attempt %d): %v", config.Name, attempted+1, createErr)
d.logger.Printf("[DEBUG] driver.docker: failed to create container %q from image %q (ID: %q) (attempt %d): %v",
config.Name, d.driverConfig.ImageName, d.imageID, attempted+1, createErr)
if strings.Contains(strings.ToLower(createErr.Error()), "container already exists") {
containers, err := client.ListContainers(docker.ListContainersOptions{
All: true,
@@ -1052,7 +1117,7 @@ CREATE:
// startContainer starts the passed container. It attempts to handle any
// transient Docker errors.
func (d *DockerDriver) startContainer(c *docker.Container) *structs.RecoverableError {
func (d *DockerDriver) startContainer(c *docker.Container) error {
// Start a container
attempted := 0
START:
@@ -1076,8 +1141,6 @@ START:
}
func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
// Split the handle
pidBytes := []byte(strings.TrimPrefix(handleID, "DOCKER:"))
pid := &dockerPID{}
@@ -1133,9 +1196,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
waitClient: waitClient,
executor: exec,
pluginClient: pluginClient,
cleanupImage: cleanupImage,
logger: d.logger,
imageID: pid.ImageID,
containerID: pid.ContainerID,
version: pid.Version,
killTimeout: pid.KillTimeout,
@@ -1156,7 +1217,6 @@ func (h *DockerHandle) ID() string {
// Return a handle to the PID
pid := dockerPID{
Version: h.version,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
@@ -1273,13 +1333,6 @@ func (h *DockerHandle) run() {
h.logger.Printf("[ERR] driver.docker: error removing container: %v", err)
}
// Cleanup the image
if h.cleanupImage {
if err := h.client.RemoveImage(h.imageID); err != nil {
h.logger.Printf("[DEBUG] driver.docker: error removing image: %v", err)
}
}
// Send the results
h.waitCh <- dstructs.NewWaitResult(exitCode, 0, werr)
close(h.waitCh)

View File

@@ -99,7 +99,8 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie
driver := NewDockerDriver(tctx.DriverCtx)
copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar")
if err := driver.Prestart(tctx.ExecCtx, task); err != nil {
_, err := driver.Prestart(tctx.ExecCtx, task)
if err != nil {
tctx.AllocDir.Destroy()
t.Fatalf("error in prestart: %v", err)
}
@@ -182,9 +183,11 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@@ -276,7 +279,8 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) {
// Copy the image into the task's directory
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -340,7 +344,7 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err == nil {
t.Fatalf("want error in prestart: %v", err)
}
@@ -391,7 +395,8 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -477,7 +482,6 @@ func TestDockerDriver_StartN(t *testing.T) {
t.Logf("Starting %d tasks", len(taskList))
// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
@@ -485,7 +489,8 @@ func TestDockerDriver_StartN(t *testing.T) {
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
handles[idx], err = d.Start(ctx.ExecCtx, task)
@@ -535,7 +540,6 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
t.Logf("Starting %d tasks", len(taskList))
// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
@@ -545,7 +549,8 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
copyImage(t, ctx.ExecCtx.TaskDir, "busybox_musl.tar")
copyImage(t, ctx.ExecCtx.TaskDir, "busybox_glibc.tar")
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
handles[idx], err = d.Start(ctx.ExecCtx, task)
@@ -708,7 +713,7 @@ func TestDockerDriver_ForcePull_IsInvalidConfig(t *testing.T) {
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
driver := NewDockerDriver(ctx.DriverCtx)
if err := driver.Prestart(ctx.ExecCtx, task); err == nil {
if _, err := driver.Prestart(ctx.ExecCtx, task); err == nil {
t.Fatalf("error expected in prestart")
}
}
@@ -916,7 +921,8 @@ func TestDockerDriver_User(t *testing.T) {
defer ctx.AllocDir.Destroy()
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
if err := driver.Prestart(ctx.ExecCtx, task); err != nil {
_, err := driver.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
@@ -1072,7 +1078,8 @@ done
t.Fatalf("Failed to write data")
}
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -1194,7 +1201,8 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
_, err = driver.Prestart(execCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
if _, err := driver.Start(execCtx, task); err == nil {
@@ -1207,7 +1215,8 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".")
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
_, err := driver.Prestart(execCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
@@ -1243,7 +1252,8 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) {
task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
_, err = driver.Prestart(execCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
@@ -1266,6 +1276,56 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) {
}
}
// TestDockerDriver_Cleanup ensures Cleanup removes only downloaded images.
func TestDockerDriver_Cleanup(t *testing.T) {
if !testutil.DockerIsConnected(t) {
t.SkipNow()
}
imageName := "hello-world:latest"
task := &structs.Task{
Name: "cleanup_test",
Driver: "docker",
Config: map[string]interface{}{
"image": imageName,
},
}
tctx := testDriverContexts(t, task)
defer tctx.AllocDir.Destroy()
// Run Prestart
driver := NewDockerDriver(tctx.DriverCtx).(*DockerDriver)
res, err := driver.Prestart(tctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
if len(res.Resources) == 0 || len(res.Resources[dockerImageResKey]) == 0 {
t.Fatalf("no created resources: %#v", res)
}
// Cleanup
rescopy := res.Copy()
if err := driver.Cleanup(tctx.ExecCtx, rescopy); err != nil {
t.Fatalf("Cleanup failed: %v", err)
}
// Make sure rescopy is updated
if len(rescopy.Resources) > 0 {
t.Errorf("Cleanup should have cleared resource map: %#v", rescopy.Resources)
}
// Ensure image was removed
if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil {
t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName)
}
// The image doesn't exist which shouldn't be an error when calling
// Cleanup, so call it again to make sure.
if err := driver.Cleanup(tctx.ExecCtx, res.Copy()); err != nil {
t.Fatalf("Cleanup failed: %v", err)
}
}
func copyImage(t *testing.T, taskDir *allocdir.TaskDir, image string) {
dst := filepath.Join(taskDir.LocalDir, image)
copyFile(filepath.Join("./test-resources/docker", image), dst, t)

View File

@@ -51,6 +51,100 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) {
// Factory is used to instantiate a new Driver
type Factory func(*DriverContext) Driver
// CreatedResources is a map of resources (eg downloaded images) created by a driver
// that must be cleaned up.
type CreatedResources struct {
Resources map[string][]string
}
func NewCreatedResources() *CreatedResources {
return &CreatedResources{Resources: make(map[string][]string)}
}
// Add a new resource if it doesn't already exist.
func (r *CreatedResources) Add(k, v string) {
if r.Resources == nil {
r.Resources = map[string][]string{k: []string{v}}
return
}
existing, ok := r.Resources[k]
if !ok {
// Key doesn't exist, create it
r.Resources[k] = []string{v}
return
}
for _, item := range existing {
if item == v {
// resource exists, return
return
}
}
// Resource type exists but value did not, append it
r.Resources[k] = append(existing, v)
return
}
// Remove a resource. Return true if removed, otherwise false.
//
// Removes the entire key if the needle is the last value in the list.
func (r *CreatedResources) Remove(k, needle string) bool {
haystack := r.Resources[k]
for i, item := range haystack {
if item == needle {
r.Resources[k] = append(haystack[:i], haystack[i+1:]...)
if len(r.Resources[k]) == 0 {
delete(r.Resources, k)
}
return true
}
}
return false
}
// Copy returns a new deep copy of CreatedResources.
func (r *CreatedResources) Copy() *CreatedResources {
newr := CreatedResources{
Resources: make(map[string][]string, len(r.Resources)),
}
for k, v := range r.Resources {
newv := make([]string, len(v))
copy(newv, v)
newr.Resources[k] = newv
}
return &newr
}
// Merge another CreatedResources into this one. If the other CreatedResources
// is nil this method is a noop.
func (r *CreatedResources) Merge(o *CreatedResources) {
if o == nil {
return
}
for k, v := range o.Resources {
// New key
if len(r.Resources[k]) == 0 {
r.Resources[k] = v
continue
}
// Existing key
OUTER:
for _, item := range v {
for _, existing := range r.Resources[k] {
if item == existing {
// Found it, move on
continue OUTER
}
}
// New item, append it
r.Resources[k] = append(r.Resources[k], item)
}
}
}
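
A short usage sketch of the helpers above may help; the keys and values below are made up for illustration and are not part of this diff.

// createdResourcesSketch is illustrative only.
func createdResourcesSketch() {
	res := NewCreatedResources()
	res.Add("image", "sha256:aaa")
	res.Add("image", "sha256:aaa") // duplicate values are ignored
	res.Add("volume", "/var/lib/foo")

	other := NewCreatedResources()
	other.Add("image", "sha256:bbb")
	res.Merge(other) // res.Resources["image"] is now []string{"sha256:aaa", "sha256:bbb"}

	snapshot := res.Copy() // deep copy: later mutations of res do not affect it
	_ = snapshot

	res.Remove("volume", "/var/lib/foo") // last value under "volume", so the key is deleted too
}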
// Driver is used for execution of tasks. This allows Nomad
// to support many pluggable implementations of task drivers.
// Examples could include LXC, Docker, Qemu, etc.
@@ -60,7 +154,9 @@ type Driver interface {
// Prestart prepares the task environment and performs expensive
// initialization steps like downloading images.
Prestart(*ExecContext, *structs.Task) error
//
// CreatedResources may be non-nil even when an error occurs.
Prestart(*ExecContext, *structs.Task) (*CreatedResources, error)
// Start is used to begin task execution
Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
@@ -68,6 +164,14 @@ type Driver interface {
// Open is used to re-open a handle to a task
Open(ctx *ExecContext, handleID string) (DriverHandle, error)
// Cleanup is called to remove resources which were created for a task
// and no longer needed.
//
// If Cleanup returns a recoverable error it may be retried. On retry
// it will be passed the same CreatedResources, so all successfully
// cleaned up resources should be removed.
Cleanup(*ExecContext, *CreatedResources) error
// Drivers must validate their configuration
Validate(map[string]interface{}) error
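
The retry contract documented on Cleanup is easiest to see from the caller's side. The helper below is a hypothetical sketch, not part of this change (the real caller is TaskRunner.cleanup later in this diff); it assumes the client/driver package context where structs and time are already imported.

// cleanupWithRetry retries recoverable Cleanup errors a few times, always
// passing the same CreatedResources so entries the driver already cleaned
// (and Removed) are not attempted again.
func cleanupWithRetry(drv Driver, ctx *ExecContext, res *CreatedResources) error {
	var err error
	for attempt := 1; attempt <= 3; attempt++ {
		if err = drv.Cleanup(ctx, res); err == nil || !structs.IsRecoverable(err) {
			return err
		}
		// Back off briefly before trying again.
		time.Sleep(time.Duration(attempt) * time.Second)
	}
	return err
}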

View File

@@ -256,3 +256,80 @@ func TestMapMergeStrStr(t *testing.T) {
t.Errorf("\nExpected\n%+v\nGot\n%+v\n", d, c)
}
}
func TestCreatedResources_AddMerge(t *testing.T) {
res1 := NewCreatedResources()
res1.Add("k1", "v1")
res1.Add("k1", "v2")
res1.Add("k1", "v1")
res1.Add("k2", "v1")
expected := map[string][]string{
"k1": {"v1", "v2"},
"k2": {"v1"},
}
if !reflect.DeepEqual(expected, res1.Resources) {
t.Fatalf("1. %#v != expected %#v", res1.Resources, expected)
}
// Make sure merging nil works
var res2 *CreatedResources
res1.Merge(res2)
if !reflect.DeepEqual(expected, res1.Resources) {
t.Fatalf("2. %#v != expected %#v", res1.Resources, expected)
}
// Make sure a normal merge works
res2 = NewCreatedResources()
res2.Add("k1", "v3")
res2.Add("k2", "v1")
res2.Add("k3", "v3")
res1.Merge(res2)
expected = map[string][]string{
"k1": {"v1", "v2", "v3"},
"k2": {"v1"},
"k3": {"v3"},
}
if !reflect.DeepEqual(expected, res1.Resources) {
t.Fatalf("3. %#v != expected %#v", res1.Resources, expected)
}
}
func TestCreatedResources_CopyRemove(t *testing.T) {
res1 := NewCreatedResources()
res1.Add("k1", "v1")
res1.Add("k1", "v2")
res1.Add("k1", "v3")
res1.Add("k2", "v1")
// Assert Copy creates a deep copy
res2 := res1.Copy()
if !reflect.DeepEqual(res1, res2) {
t.Fatalf("%#v != %#v", res1, res2)
}
// Assert removing v1 from k1 returns true and updates Resources slice
if removed := res2.Remove("k1", "v1"); !removed {
t.Fatalf("expected v1 to be removed: %#v", res2)
}
if expected := []string{"v2", "v3"}; !reflect.DeepEqual(expected, res2.Resources["k1"]) {
t.Fatalf("unpexpected list for k1: %#v", res2.Resources["k1"])
}
// Assert removing the only value from a key removes the key
if removed := res2.Remove("k2", "v1"); !removed {
t.Fatalf("expected v1 to be removed from k2: %#v", res2.Resources)
}
if _, found := res2.Resources["k2"]; found {
t.Fatalf("k2 should have been removed from Resources: %#v", res2.Resources)
}
// Make sure res1 wasn't updated
if reflect.DeepEqual(res1, res2) {
t.Fatalf("res1 should not equal res2: #%v", res1)
}
}

View File

@@ -92,8 +92,8 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *ExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
return nil, nil
}
func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -167,6 +167,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return h, nil
}
func (d *ExecDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
type execId struct {
Version string
KillTimeout time.Duration

View File

@@ -67,7 +67,7 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -111,7 +111,7 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -145,12 +145,20 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
handle2.Kill()
t.Fatalf("expected handle2 to be nil")
}
// Test if the userpid is still present
userProc, err := os.FindProcess(id.UserPid)
userProc, _ := os.FindProcess(id.UserPid)
err = userProc.Signal(syscall.Signal(0))
for retry := 3; retry > 0; retry-- {
if err = userProc.Signal(syscall.Signal(0)); err != nil {
// Process is gone as expected; exit
return
}
if err == nil {
// Killing processes is async; wait and check again
time.Sleep(time.Second)
}
if err = userProc.Signal(syscall.Signal(0)); err == nil {
t.Fatalf("expected user process to die")
}
}
@@ -175,7 +183,7 @@ func TestExecDriver_Start_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -229,7 +237,7 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -283,7 +291,7 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -349,7 +357,7 @@ done
t.Fatalf("Failed to write data")
}
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -414,7 +422,7 @@ func TestExecDriverUser(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)

View File

@@ -167,8 +167,8 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
return nil, nil
}
func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDriverConfig, error) {
@@ -293,6 +293,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return h, nil
}
func (d *JavaDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
// cgroupsMounted returns true if the cgroups are mounted on a system otherwise
// returns false
func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {

View File

@@ -94,7 +94,7 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
dst := ctx.ExecCtx.TaskDir.Dir
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -149,7 +149,7 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
dst := ctx.ExecCtx.TaskDir.Dir
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -215,7 +215,7 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
dst := ctx.ExecCtx.TaskDir.Dir
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -277,7 +277,7 @@ func TestJavaDriver_Signal(t *testing.T) {
dst := ctx.ExecCtx.TaskDir.Dir
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -336,7 +336,7 @@ func TestJavaDriverUser(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewJavaDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)

View File

@@ -171,8 +171,8 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
return true, nil
}
func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
return nil, nil
}
// Start starts the LXC Driver
@@ -286,6 +286,8 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return &handle, nil
}
func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
// Open creates the driver to monitor an existing LXC container
func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
pid := &lxcPID{}

View File

@@ -72,7 +72,7 @@ func TestLxcDriver_Start_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewLxcDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -148,7 +148,7 @@ func TestLxcDriver_Open_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewLxcDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/mitchellh/mapstructure"
@@ -62,6 +63,8 @@ type MockDriverConfig struct {
type MockDriver struct {
DriverContext
fingerprint.StaticFingerprinter
cleanupFailNum int
}
// NewMockDriver is a factory method which returns a new Mock Driver
@@ -79,8 +82,8 @@ func (d *MockDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationNone
}
func (d *MockDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
return nil, nil
}
// Start starts the mock driver
@@ -124,6 +127,24 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return &h, nil
}
// Cleanup deletes all keys from CreatedResources, except it refuses to delete
// the Config.Options["cleanup_fail_on"] key for the first
// Config.Options["cleanup_fail_num"] calls, returning a recoverable error for
// each failure.
func (m *MockDriver) Cleanup(ctx *ExecContext, res *CreatedResources) error {
var err error
failn, _ := strconv.Atoi(m.config.Options["cleanup_fail_num"])
failk := m.config.Options["cleanup_fail_on"]
for k := range res.Resources {
if k == failk && m.cleanupFailNum < failn {
m.cleanupFailNum++
err = structs.NewRecoverableError(fmt.Errorf("mock_driver failure on %q call %d/%d", k, m.cleanupFailNum, failn), true)
} else {
delete(res.Resources, k)
}
}
return err
}
// Validate validates the mock driver configuration
func (m *MockDriver) Validate(map[string]interface{}) error {
return nil

View File

@@ -136,8 +136,8 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
return nil, nil
}
// Run an existing Qemu image. Start() will pull down an existing, valid Qemu
@@ -336,6 +336,8 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
return h, nil
}
func (d *QemuDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
func (h *qemuHandle) ID() string {
id := qemuId{
Version: h.version,

View File

@@ -80,7 +80,7 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) {
dst := ctx.ExecCtx.TaskDir.Dir
copyFile("./test-resources/qemu/linux-0.2.img", filepath.Join(dst, "linux-0.2.img"), t)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("Prestart faild: %v", err)
}
@@ -146,7 +146,7 @@ func TestQemuDriverUser(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewQemuDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("Prestart faild: %v", err)
}

View File

@@ -106,8 +106,8 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
return false, nil
}
func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
return nil, nil
}
func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@@ -177,6 +177,8 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return h, nil
}
func (d *RawExecDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
type rawExecId struct {
Version string
KillTimeout time.Duration

View File

@@ -77,7 +77,7 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -126,7 +126,7 @@ func TestRawExecDriver_Start_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -180,7 +180,7 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -233,7 +233,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -286,7 +286,7 @@ func TestRawExecDriverUser(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -335,7 +335,7 @@ done
t.Fatalf("Failed to write data")
}
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)

View File

@@ -207,8 +207,8 @@ func (d *RktDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
return nil, nil
}
// Run an existing Rkt image.
@@ -451,6 +451,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return h, nil
}
func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
// Parse the handle
pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:"))

View File

@@ -98,7 +98,7 @@ func TestRktDriver_Start_DNS(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -149,7 +149,7 @@ func TestRktDriver_Start_Wait(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -210,7 +210,7 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -281,7 +281,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -343,7 +343,7 @@ func TestRktDriverUser(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -384,7 +384,7 @@ func TestRktTrustPrefix(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)
@@ -462,7 +462,7 @@ func TestRktDriver_PortsMapping(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewRktDriver(ctx.DriverCtx)
if err := d.Prestart(ctx.ExecCtx, task); err != nil {
if _, err := d.Prestart(ctx.ExecCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(ctx.ExecCtx, task)

View File

@@ -93,6 +93,11 @@ type TaskRunner struct {
// Must acquire persistLock when accessing
taskDirBuilt bool
// createdResources are all the resources created by the task driver
// across all attempts to start the task.
createdResources *driver.CreatedResources
createdResourcesLock sync.Mutex
// payloadRendered tracks whether the payload has been rendered to disk
payloadRendered bool
@@ -130,7 +135,10 @@ type TaskRunner struct {
// waitCh closing marks the run loop as having exited
waitCh chan struct{}
// serialize SaveState calls
// persistLock must be acquired when accessing fields stored by
// SaveState. SaveState is called asynchronously to TaskRunner.Run by
// AllocRunner, so all state fields must be synchronized using this
// lock.
persistLock sync.Mutex
}
@@ -141,6 +149,7 @@ type taskRunnerState struct {
HandleID string
ArtifactDownloaded bool
TaskDirBuilt bool
CreatedResources *driver.CreatedResources
PayloadRendered bool
}
@@ -177,22 +186,23 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
tc := &TaskRunner{
config: config,
updater: updater,
logger: logger,
restartTracker: restartTracker,
alloc: alloc,
task: task,
taskDir: taskDir,
vaultClient: vaultClient,
vaultFuture: NewTokenFuture().Set(""),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
unblockCh: make(chan struct{}),
restartCh: make(chan *structs.TaskEvent),
signalCh: make(chan SignalEvent),
config: config,
updater: updater,
logger: logger,
restartTracker: restartTracker,
alloc: alloc,
task: task,
taskDir: taskDir,
createdResources: driver.NewCreatedResources(),
vaultClient: vaultClient,
vaultFuture: NewTokenFuture().Set(""),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
unblockCh: make(chan struct{}),
restartCh: make(chan *structs.TaskEvent),
signalCh: make(chan SignalEvent),
}
return tc
@@ -237,6 +247,7 @@ func (r *TaskRunner) RestoreState() error {
}
r.artifactsDownloaded = snap.ArtifactDownloaded
r.taskDirBuilt = snap.TaskDirBuilt
r.createdResources = snap.CreatedResources
r.payloadRendered = snap.PayloadRendered
if err := r.setTaskEnv(); err != nil {
@@ -292,12 +303,17 @@ func (r *TaskRunner) SaveState() error {
r.persistLock.Lock()
defer r.persistLock.Unlock()
r.createdResourcesLock.Lock()
res := r.createdResources.Copy()
r.createdResourcesLock.Unlock()
snap := taskRunnerState{
Task: r.task,
Version: r.config.Version,
ArtifactDownloaded: r.artifactsDownloaded,
TaskDirBuilt: r.taskDirBuilt,
PayloadRendered: r.payloadRendered,
CreatedResources: res,
}
r.handleLock.Lock()
@@ -870,6 +886,7 @@ func (r *TaskRunner) run() {
select {
case success := <-prestartResultCh:
if !success {
r.cleanup()
r.setState(structs.TaskStateDead, nil)
return
}
@@ -958,6 +975,7 @@ func (r *TaskRunner) run() {
running := r.running
r.runningLock.Unlock()
if !running {
r.cleanup()
r.setState(structs.TaskStateDead, r.destroyEvent)
return
}
@@ -976,6 +994,11 @@ func (r *TaskRunner) run() {
r.killTask(killEvent)
close(stopCollection)
// Wait for handler to exit before calling cleanup
<-handleWaitCh
r.cleanup()
r.setState(structs.TaskStateDead, nil)
return
}
@@ -984,6 +1007,7 @@ func (r *TaskRunner) run() {
RESTART:
restart := r.shouldRestart()
if !restart {
r.cleanup()
r.setState(structs.TaskStateDead, nil)
return
}
@@ -997,6 +1021,44 @@ func (r *TaskRunner) run() {
}
}
// cleanup calls Driver.Cleanup when a task is stopping. Errors are logged.
func (r *TaskRunner) cleanup() {
drv, err := r.createDriver()
if err != nil {
r.logger.Printf("[ERR] client: error creating driver to cleanup resources: %v", err)
return
}
r.createdResourcesLock.Lock()
res := r.createdResources.Copy()
r.createdResourcesLock.Unlock()
ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
attempts := 1
var cleanupErr error
for retry := true; retry; attempts++ {
cleanupErr = drv.Cleanup(ctx, res)
retry = structs.IsRecoverable(cleanupErr)
// Copy current createdResources state in case SaveState is
// called between retries
r.createdResourcesLock.Lock()
r.createdResources = res.Copy()
r.createdResourcesLock.Unlock()
// Retry 3 times with sleeps between
if !retry || attempts > 3 {
break
}
time.Sleep(time.Duration(attempts) * time.Second)
}
if cleanupErr != nil {
r.logger.Printf("[ERR] client: error cleaning up resources for task %q after %d attempts: %v", r.task.Name, attempts, cleanupErr)
}
return
}
// shouldRestart returns if the task should restart. If the return value is
// true, the task's restart policy has already been considered and any wait time
// between restarts has been applied.
@@ -1095,7 +1157,14 @@ func (r *TaskRunner) startTask() error {
// Run prestart
ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
if err := drv.Prestart(ctx, r.task); err != nil {
res, err := drv.Prestart(ctx, r.task)
// Merge newly created resources into previously created resources
r.createdResourcesLock.Lock()
r.createdResources.Merge(res)
r.createdResourcesLock.Unlock()
if err != nil {
wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)

View File

@@ -1305,3 +1305,53 @@ func TestTaskRunner_SimpleRun_Dispatch(t *testing.T) {
t.Fatalf("Bad; got %v; want %v", string(data), string(expected))
}
}
func TestTaskRunner_CleanupOK(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
key := "ERR"
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.config.Options = map[string]string{
"cleanup_fail_on": key,
"cleanup_fail_num": "1",
}
ctx.tr.MarkReceived()
ctx.tr.createdResources.Resources[key] = []string{"x", "y"}
ctx.tr.createdResources.Resources["foo"] = []string{"z"}
defer ctx.Cleanup()
ctx.tr.Run()
// Since we only failed once, createdResources should be empty
if len(ctx.tr.createdResources.Resources) > 0 {
t.Fatalf("expected all created resources to be removed: %#v", ctx.tr.createdResources.Resources)
}
}
func TestTaskRunner_CleanupFail(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
key := "ERR"
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.config.Options = map[string]string{
"cleanup_fail_on": key,
"cleanup_fail_num": "5",
}
ctx.tr.MarkReceived()
ctx.tr.createdResources.Resources[key] = []string{"x"}
ctx.tr.createdResources.Resources["foo"] = []string{"y", "z"}
defer ctx.Cleanup()
ctx.tr.Run()
// Since we failed > 3 times, the failed key should remain
expected := map[string][]string{key: {"x"}}
if !reflect.DeepEqual(expected, ctx.tr.createdResources.Resources) {
t.Fatalf("expected %#v but found: %#v", expected, ctx.tr.createdResources.Resources)
}
}

View File

@@ -944,7 +944,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
// setErr is a helper for setting the recoverable error on the reply and
// logging it
setErr := func(e error, recoverable bool) {
reply.Error = structs.NewRecoverableError(e, recoverable)
reply.Error = structs.NewRecoverableError(e, recoverable).(*structs.RecoverableError)
n.srv.logger.Printf("[ERR] nomad.client: DeriveVaultToken failed (recoverable %v): %v", recoverable, e)
}
@@ -1134,7 +1134,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
if rerr, ok := createErr.(*structs.RecoverableError); ok {
reply.Error = rerr
} else {
reply.Error = structs.NewRecoverableError(createErr, false)
reply.Error = structs.NewRecoverableError(createErr, false).(*structs.RecoverableError)
}
return nil

View File

@@ -4054,7 +4054,7 @@ type RecoverableError struct {
// NewRecoverableError is used to wrap an error and mark it as recoverable or
// not.
func NewRecoverableError(e error, recoverable bool) *RecoverableError {
func NewRecoverableError(e error, recoverable bool) error {
if e == nil {
return nil
}
@@ -4068,3 +4068,12 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError {
func (r *RecoverableError) Error() string {
return r.Err
}
// IsRecoverable returns true if error is a RecoverableError with
// Recoverable=true. Otherwise false is returned.
func IsRecoverable(e error) bool {
if re, ok := e.(*RecoverableError); ok {
return re.Recoverable
}
return false
}
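
One detail worth spelling out (a general Go point, not something stated in this diff): returning the error interface plus the nil short-circuit lets callers such as DockerDriver.Cleanup wrap merr.ErrorOrNil() directly, because a nil input yields a plain nil error rather than a non-nil interface holding a nil *RecoverableError. A minimal sketch, assuming fmt is imported in this package:

// recoverableSketch is illustrative only.
func recoverableSketch() {
	err := NewRecoverableError(nil, true)
	fmt.Println(err == nil)                                                   // true: nil in, nil out
	fmt.Println(IsRecoverable(NewRecoverableError(fmt.Errorf("boom"), true))) // true: explicitly recoverable
	fmt.Println(IsRecoverable(fmt.Errorf("boom")))                            // false: plain errors are never recoverable
}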

View File

@@ -1,6 +1,7 @@
package structs
import (
"fmt"
"reflect"
"strings"
"testing"
@@ -1539,3 +1540,21 @@ func TestDispatchInputConfig_Validate(t *testing.T) {
t.Fatalf("bad: %v", err)
}
}
func TestIsRecoverable(t *testing.T) {
if IsRecoverable(nil) {
t.Errorf("nil should not be recoverable")
}
if IsRecoverable(NewRecoverableError(nil, true)) {
t.Errorf("NewRecoverableError(nil, true) should not be recoverable")
}
if IsRecoverable(fmt.Errorf("i promise im recoverable")) {
t.Errorf("Custom errors should not be recoverable")
}
if IsRecoverable(NewRecoverableError(fmt.Errorf(""), false)) {
t.Errorf("Explicitly unrecoverable errors should not be recoverable")
}
if !IsRecoverable(NewRecoverableError(fmt.Errorf(""), true)) {
t.Errorf("Explicitly recoverable errors *should* be recoverable")
}
}