Proper reference counting through task restarts
This PR fixes an issue in which the reference count on a Docker image would become inflated through task restarts.
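The core idea is to track image references as a set of caller IDs instead of an integer counter, so a task that restarts and re-registers interest in an image does not add a second reference. The snippet below is a minimal standalone sketch of that idea, not the driver's actual code; the identifiers and values are illustrative. The caller ID is stable across restarts because it is derived from the allocation ID and task name, so re-adding it to the set is a no-op.

package main

import "fmt"

func main() {
	// Illustrative only: track image references as a set of caller IDs.
	// A caller ID built from the allocation ID and task name is stable
	// across task restarts, so re-adding it does not inflate the count.
	references := make(map[string]struct{})
	callerID := fmt.Sprintf("%s-%s", "alloc-1234", "web") // hypothetical values

	references[callerID] = struct{}{} // initial start
	references[callerID] = struct{}{} // restart: same key, still one reference
	fmt.Println(len(references))      // prints 1

	delete(references, callerID) // task is gone for good
	fmt.Println(len(references)) // prints 0; the image is now eligible for cleanup
}
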
@@ -857,7 +857,7 @@ func (c *Client) setupDrivers() error {
 	var avail []string
 	var skipped []string
-	driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil)
+	driverCtx := driver.NewDriverContext("", "", c.config, c.config.Node, c.logger, nil, nil)
 	for name := range driver.BuiltinDrivers {
 		// Skip fingerprinting drivers that are not in the whitelist if it is
 		// enabled.
@@ -419,8 +419,9 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
 	return cstructs.FSIsolationImage
 }

-// getDockerCoordinator returns the docker coordinator
-func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator {
+// getDockerCoordinator returns the docker coordinator and the caller ID to use when
+// interacting with the coordinator
+func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoordinator, string) {
 	config := &dockerCoordinatorConfig{
 		client: client,
 		cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault),
@@ -428,7 +429,7 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordi
 		removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault),
 	}

-	return GetDockerCoordinator(config)
+	return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName)
 }

 func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
@@ -474,7 +475,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
 		TaskEnv: d.taskEnv,
 		Task: task,
 		Driver: "docker",
-		AllocID: ctx.AllocID,
+		AllocID: d.DriverContext.allocID,
 		LogDir: ctx.TaskDir.LogDir,
 		TaskDir: ctx.TaskDir.Dir,
 		PortLowerBound: d.config.ClientMinPort,
@@ -588,14 +589,14 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error {
 // cleanupImage removes a Docker image. No error is returned if the image
 // doesn't exist or is still in use. Requires the global client to already be
 // initialized.
-func (d *DockerDriver) cleanupImage(id string) error {
+func (d *DockerDriver) cleanupImage(imageID string) error {
 	if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) {
 		// Config says not to cleanup
 		return nil
 	}

-	coordinator := d.getDockerCoordinator(client)
-	coordinator.RemoveImage(id)
+	coordinator, callerID := d.getDockerCoordinator(client)
+	coordinator.RemoveImage(imageID, callerID)

 	return nil
 }
@@ -914,7 +915,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas

 	config.Env = d.taskEnv.EnvList()

-	containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID)
+	containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID)
 	d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)

 	var networkingConfig *docker.NetworkingConfig
@@ -952,7 +953,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
 		tag = "latest"
 	}

-	coordinator := d.getDockerCoordinator(client)
+	coordinator, callerID := d.getDockerCoordinator(client)

 	// We're going to check whether the image is already downloaded. If the tag
 	// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
@@ -962,7 +963,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
 	} else if tag != "latest" {
 		if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
 			// Image exists so just increment its reference count
-			coordinator.IncrementImageReference(dockerImage.ID, image)
+			coordinator.IncrementImageReference(dockerImage.ID, image, callerID)
 			return dockerImage.ID, nil
 		}
 	}
@@ -1001,8 +1002,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
 	}

 	d.emitEvent("Downloading image %s:%s", repo, tag)
-	coordinator := d.getDockerCoordinator(client)
-	return coordinator.PullImage(driverConfig.ImageName, authOptions)
+	coordinator, callerID := d.getDockerCoordinator(client)
+	return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID)
 }

 // loadImage creates an image by loading it from the file system
@@ -1027,8 +1028,8 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke
 		return "", recoverableErrTimeouts(err)
 	}

-	coordinator := d.getDockerCoordinator(client)
-	coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName)
+	coordinator, callerID := d.getDockerCoordinator(client)
+	coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName, callerID)
 	return dockerImage.ID, nil
 }

@@ -1186,8 +1187,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er

 	// Increment the reference count since we successfully attached to this
 	// container
-	coordinator := d.getDockerCoordinator(client)
-	coordinator.IncrementImageReference(pid.ImageID, pid.Image)
+	coordinator, callerID := d.getDockerCoordinator(client)
+	coordinator.IncrementImageReference(pid.ImageID, pid.Image, callerID)

 	// Return a driver handle
 	h := &DockerHandle{
@@ -99,7 +99,7 @@ type dockerCoordinator struct {
 	pullFutures map[string]*pullFuture

 	// imageRefCount is the reference count of image IDs
-	imageRefCount map[string]int
+	imageRefCount map[string]map[string]struct{}

 	// deleteFuture is indexed by image ID and has a cancable delete future
 	deleteFuture map[string]context.CancelFunc
@@ -114,7 +114,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
 	return &dockerCoordinator{
 		dockerCoordinatorConfig: config,
 		pullFutures: make(map[string]*pullFuture),
-		imageRefCount: make(map[string]int),
+		imageRefCount: make(map[string]map[string]struct{}),
 		deleteFuture: make(map[string]context.CancelFunc),
 	}
 }
@@ -130,7 +130,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {

 // PullImage is used to pull an image. It returns the pulled imaged ID or an
 // error that occured during the pull
-func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration) (imageID string, err error) {
+func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) {
 	// Get the future
 	d.imageLock.Lock()
 	future, ok := d.pullFutures[image]
@@ -157,7 +157,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf

 	// If we are cleaning up, we increment the reference count on the image
 	if err == nil && d.cleanup {
-		d.incrementImageReferenceImpl(id, image)
+		d.incrementImageReferenceImpl(id, image, callerID)
 	}

 	return id, err
@@ -202,29 +202,39 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
 }

 // IncrementImageReference is used to increment an image reference count
-func (d *dockerCoordinator) IncrementImageReference(id, image string) {
+func (d *dockerCoordinator) IncrementImageReference(imageID, imageName, callerID string) {
 	d.imageLock.Lock()
 	defer d.imageLock.Unlock()
-	d.incrementImageReferenceImpl(id, image)
+	if d.cleanup {
+		d.incrementImageReferenceImpl(imageID, imageName, callerID)
+	}
 }

 // incrementImageReferenceImpl assumes the lock is held
-func (d *dockerCoordinator) incrementImageReferenceImpl(id, image string) {
+func (d *dockerCoordinator) incrementImageReferenceImpl(imageID, imageName, callerID string) {
 	// Cancel any pending delete
-	if cancel, ok := d.deleteFuture[id]; ok {
-		d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", image)
+	if cancel, ok := d.deleteFuture[imageID]; ok {
+		d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", imageName)
 		cancel()
-		delete(d.deleteFuture, id)
+		delete(d.deleteFuture, imageID)
 	}

 	// Increment the reference
-	d.imageRefCount[id] += 1
-	d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id])
+	references, ok := d.imageRefCount[imageID]
+	if !ok {
+		references = make(map[string]struct{})
+		d.imageRefCount[imageID] = references
+	}
+
+	if _, ok := references[callerID]; !ok {
+		references[callerID] = struct{}{}
+		d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", imageName, imageID, len(references))
+	}
 }

 // RemoveImage removes the given image. If there are any errors removing the
 // image, the remove is retried internally.
-func (d *dockerCoordinator) RemoveImage(id string) {
+func (d *dockerCoordinator) RemoveImage(imageID, callerID string) {
 	d.imageLock.Lock()
 	defer d.imageLock.Unlock()

@@ -232,36 +242,36 @@ func (d *dockerCoordinator) RemoveImage(id string) {
 		return
 	}

-	references, ok := d.imageRefCount[id]
+	references, ok := d.imageRefCount[imageID]
 	if !ok {
-		d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", id)
+		d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", imageID)
 		return
 	}

 	// Decrement the reference count
-	references--
-	d.imageRefCount[id] = references
-	d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", id, references)
+	delete(references, callerID)
+	count := len(references)
+	d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", imageID, count)

 	// Nothing to do
-	if references != 0 {
+	if count != 0 {
 		return
 	}

 	// This should never be the case but we safefty guard so we don't leak a
 	// cancel.
-	if cancel, ok := d.deleteFuture[id]; ok {
-		d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", id)
+	if cancel, ok := d.deleteFuture[imageID]; ok {
+		d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", imageID)
 		cancel()
 	}

 	// Setup a future to delete the image
 	ctx, cancel := context.WithCancel(context.Background())
-	d.deleteFuture[id] = cancel
-	go d.removeImageImpl(id, ctx)
+	d.deleteFuture[imageID] = cancel
+	go d.removeImageImpl(imageID, ctx)

 	// Delete the key from the reference count
-	delete(d.imageRefCount, id)
+	delete(d.imageRefCount, imageID)
 }

 // removeImageImpl is used to remove an image. It wil wait the specified remove
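
Taken together, the coordinator hunks above replace the integer counter with a per-image set of caller IDs: incrementing adds the caller to the set, removing drops it, and deletion is only scheduled once the set is empty. The following is a rough standalone sketch of that behavior under the assumption that callers are identified by a stable string; it is a simplified stand-in, not the coordinator itself, which also handles pull futures, locking, and delayed deletes.

package main

import "fmt"

// refCoordinator is a simplified, hypothetical stand-in for the coordinator:
// it only models the per-caller reference sets, not pulls or delayed deletes.
type refCoordinator struct {
	imageRefCount map[string]map[string]struct{}
}

func newRefCoordinator() *refCoordinator {
	return &refCoordinator{imageRefCount: make(map[string]map[string]struct{})}
}

// increment records that callerID is using imageID; repeated calls from the
// same caller are no-ops, which is what keeps restarts from inflating counts.
func (c *refCoordinator) increment(imageID, callerID string) {
	refs, ok := c.imageRefCount[imageID]
	if !ok {
		refs = make(map[string]struct{})
		c.imageRefCount[imageID] = refs
	}
	refs[callerID] = struct{}{}
}

// remove drops callerID's reference and reports whether the image has no
// remaining references and could therefore be scheduled for deletion.
func (c *refCoordinator) remove(imageID, callerID string) bool {
	refs, ok := c.imageRefCount[imageID]
	if !ok {
		return false
	}
	delete(refs, callerID)
	if len(refs) == 0 {
		delete(c.imageRefCount, imageID)
		return true
	}
	return false
}

func main() {
	c := newRefCoordinator()
	c.increment("sha256:abc", "alloc-1-web") // task starts
	c.increment("sha256:abc", "alloc-1-web") // task restarts: still one reference
	c.increment("sha256:abc", "alloc-2-web") // another task uses the same image
	fmt.Println(len(c.imageRefCount["sha256:abc"])) // 2
	fmt.Println(c.remove("sha256:abc", "alloc-1-web")) // false, still referenced
	fmt.Println(c.remove("sha256:abc", "alloc-2-web")) // true, eligible for delete
}
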
@@ -63,7 +63,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
 	id := ""
 	for i := 0; i < 10; i++ {
 		go func() {
-			id, _ = coordinator.PullImage(image, nil)
+			id, _ = coordinator.PullImage(image, nil, structs.GenerateUUID())
 		}()
 	}

@@ -74,8 +74,8 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
 	}

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 10 {
-		return false, fmt.Errorf("Got reference count %d; want %d", r, 10)
+	if references := coordinator.imageRefCount[id]; len(references) != 10 {
+		return false, fmt.Errorf("Got reference count %d; want %d", len(references), 10)
 	}

 	// Ensure there is no pull future
@@ -107,33 +107,35 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
 	coordinator := NewDockerCoordinator(config)

 	id := ""
+	callerIDs := make([]string, 10, 10)
 	for i := 0; i < 10; i++ {
-		id, _ = coordinator.PullImage(image, nil)
+		callerIDs[i] = structs.GenerateUUID()
+		id, _ = coordinator.PullImage(image, nil, callerIDs[i])
 	}

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 10 {
-		t.Fatalf("Got reference count %d; want %d", r, 10)
+	if references := coordinator.imageRefCount[id]; len(references) != 10 {
+		t.Fatalf("Got reference count %d; want %d", len(references), 10)
 	}

 	// Remove some
 	for i := 0; i < 8; i++ {
-		coordinator.RemoveImage(id)
+		coordinator.RemoveImage(id, callerIDs[i])
 	}

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 2 {
-		t.Fatalf("Got reference count %d; want %d", r, 2)
+	if references := coordinator.imageRefCount[id]; len(references) != 2 {
+		t.Fatalf("Got reference count %d; want %d", len(references), 2)
 	}

 	// Remove all
-	for i := 0; i < 2; i++ {
-		coordinator.RemoveImage(id)
+	for i := 8; i < 10; i++ {
+		coordinator.RemoveImage(id, callerIDs[i])
 	}

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 0 {
-		t.Fatalf("Got reference count %d; want %d", r, 0)
+	if references := coordinator.imageRefCount[id]; len(references) != 0 {
+		t.Fatalf("Got reference count %d; want %d", len(references), 0)
 	}

 	// Check that only one delete happened
@@ -165,29 +167,30 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {

 	// Create a coordinator
 	coordinator := NewDockerCoordinator(config)
+	callerID := structs.GenerateUUID()

 	// Pull image
-	id, _ := coordinator.PullImage(image, nil)
+	id, _ := coordinator.PullImage(image, nil, callerID)

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 1 {
-		t.Fatalf("Got reference count %d; want %d", r, 10)
+	if references := coordinator.imageRefCount[id]; len(references) != 1 {
+		t.Fatalf("Got reference count %d; want %d", len(references), 1)
 	}

 	// Remove image
-	coordinator.RemoveImage(id)
+	coordinator.RemoveImage(id, callerID)

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 0 {
-		t.Fatalf("Got reference count %d; want %d", r, 0)
+	if references := coordinator.imageRefCount[id]; len(references) != 0 {
+		t.Fatalf("Got reference count %d; want %d", len(references), 0)
 	}

 	// Pull image again within delay
-	id, _ = coordinator.PullImage(image, nil)
+	id, _ = coordinator.PullImage(image, nil, callerID)

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 1 {
-		t.Fatalf("Got reference count %d; want %d", r, 0)
+	if references := coordinator.imageRefCount[id]; len(references) != 1 {
+		t.Fatalf("Got reference count %d; want %d", len(references), 1)
 	}

 	// Check that only no delete happened
@@ -211,17 +214,18 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {

 	// Create a coordinator
 	coordinator := NewDockerCoordinator(config)
+	callerID := structs.GenerateUUID()

 	// Pull image
-	id, _ := coordinator.PullImage(image, nil)
+	id, _ := coordinator.PullImage(image, nil, callerID)

 	// Check the reference count
-	if r := coordinator.imageRefCount[id]; r != 0 {
-		t.Fatalf("Got reference count %d; want %d", r, 10)
+	if references := coordinator.imageRefCount[id]; len(references) != 0 {
+		t.Fatalf("Got reference count %d; want %d", len(references), 0)
 	}

 	// Remove image
-	coordinator.RemoveImage(id)
+	coordinator.RemoveImage(id, callerID)

 	// Check that only no delete happened
 	if removes := mock.removed[id]; removes != 0 {
@@ -1177,7 +1177,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
 	}

 	alloc := mock.Alloc()
-	execCtx := NewExecContext(taskDir, alloc.ID)
+	execCtx := NewExecContext(taskDir)
 	cleanup := func() {
 		allocDir.Destroy()
 		if filepath.IsAbs(hostpath) {
@@ -1195,7 +1195,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
 	emitter := func(m string, args ...interface{}) {
 		logger.Printf("[EVENT] "+m, args...)
 	}
-	driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter)
+	driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, testLogger(), taskEnv, emitter)
 	driver := NewDockerDriver(driverCtx)
 	copyImage(t, taskDir, "busybox.tar")
@@ -201,6 +201,7 @@ type LogEventFn func(message string, args ...interface{})
 // each time we do it. Used in conjection with Factory, above.
 type DriverContext struct {
 	taskName string
+	allocID string
 	config *config.Config
 	logger *log.Logger
 	node *structs.Node
@@ -219,10 +220,11 @@ func NewEmptyDriverContext() *DriverContext {
 // This enables other packages to create DriverContexts but keeps the fields
 // private to the driver. If we want to change this later we can gorename all of
 // the fields in DriverContext.
-func NewDriverContext(taskName string, config *config.Config, node *structs.Node,
+func NewDriverContext(taskName, allocID string, config *config.Config, node *structs.Node,
 	logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext {
 	return &DriverContext{
 		taskName: taskName,
+		allocID: allocID,
 		config: config,
 		node: node,
 		logger: logger,
@@ -258,16 +260,12 @@ type DriverHandle interface {
 type ExecContext struct {
 	// TaskDir contains information about the task directory structure.
 	TaskDir *allocdir.TaskDir
-
-	// Alloc ID
-	AllocID string
 }

 // NewExecContext is used to create a new execution context
-func NewExecContext(td *allocdir.TaskDir, allocID string) *ExecContext {
+func NewExecContext(td *allocdir.TaskDir) *ExecContext {
 	return &ExecContext{
 		TaskDir: td,
-		AllocID: allocID,
 	}
 }
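
The hunks above move the alloc ID out of ExecContext and into DriverContext, so the ID is fixed once when the driver is constructed rather than threaded through every call that takes an ExecContext. A minimal sketch of that pattern follows; the types and names are illustrative analogues, not Nomad's own.

package main

import "fmt"

// driverContext is a hypothetical analogue of DriverContext: per-task identity
// is captured once, at construction time.
type driverContext struct {
	taskName string
	allocID  string
}

// execContext is a hypothetical analogue of the slimmed-down ExecContext: it
// now only carries per-invocation data (here just a directory path).
type execContext struct {
	taskDir string
}

func newDriverContext(taskName, allocID string) *driverContext {
	return &driverContext{taskName: taskName, allocID: allocID}
}

func (d *driverContext) containerName() string {
	// Same shape as the diff: identity comes from the driver context,
	// not from the ExecContext passed to Start/Open/Cleanup.
	return fmt.Sprintf("%s-%s", d.taskName, d.allocID)
}

func main() {
	d := newDriverContext("web", "alloc-1234")
	_ = &execContext{taskDir: "/tmp/alloc-1234/web"} // illustrative path
	fmt.Println(d.containerName())                   // web-alloc-1234
}
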
@@ -110,7 +110,7 @@ func testDriverContexts(t *testing.T, task *structs.Task) *testContext {
 		return nil
 	}

-	execCtx := NewExecContext(td, alloc.ID)
+	execCtx := NewExecContext(td)

 	taskEnv, err := GetTaskEnv(td, cfg.Node, task, alloc, cfg, "")
 	if err != nil {
@@ -123,7 +123,7 @@ func testDriverContexts(t *testing.T, task *structs.Task) *testContext {
 	emitter := func(m string, args ...interface{}) {
 		logger.Printf("[EVENT] "+m, args...)
 	}
-	driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, logger, taskEnv, emitter)
+	driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, logger, taskEnv, emitter)

 	return &testContext{allocDir, driverCtx, execCtx}
 }
@@ -124,7 +124,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 	executorCtx := &executor.ExecutorContext{
 		TaskEnv: d.taskEnv,
 		Driver: "exec",
-		AllocID: ctx.AllocID,
+		AllocID: d.DriverContext.allocID,
 		LogDir: ctx.TaskDir.LogDir,
 		TaskDir: ctx.TaskDir.Dir,
 		Task: task,
@@ -248,7 +248,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 	executorCtx := &executor.ExecutorContext{
 		TaskEnv: d.taskEnv,
 		Driver: "java",
-		AllocID: ctx.AllocID,
+		AllocID: d.DriverContext.allocID,
 		Task: task,
 		TaskDir: ctx.TaskDir.Dir,
 		LogDir: ctx.TaskDir.LogDir,
@@ -186,7 +186,7 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
 		lxcPath = path
 	}

-	containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID)
+	containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID)
 	c, err := lxc.NewContainer(containerName, lxcPath)
 	if err != nil {
 		return nil, fmt.Errorf("unable to initialize container: %v", err)
@@ -102,7 +102,7 @@ func TestLxcDriver_Start_Wait(t *testing.T) {
 	})

 	// Look for mounted directories in their proper location
-	containerName := fmt.Sprintf("%s-%s", task.Name, ctx.ExecCtx.AllocID)
+	containerName := fmt.Sprintf("%s-%s", task.Name, ctx.DriverCtx.allocID)
 	for _, mnt := range []string{"alloc", "local", "secrets"} {
 		fullpath := filepath.Join(lxcHandle.lxcPath, containerName, "rootfs", mnt)
 		stat, err := os.Stat(fullpath)
@@ -238,7 +238,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
 	executorCtx := &executor.ExecutorContext{
 		TaskEnv: d.taskEnv,
 		Driver: "qemu",
-		AllocID: ctx.AllocID,
+		AllocID: d.DriverContext.allocID,
 		Task: task,
 		TaskDir: ctx.TaskDir.Dir,
 		LogDir: ctx.TaskDir.LogDir,
@@ -129,7 +129,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
 	executorCtx := &executor.ExecutorContext{
 		TaskEnv: d.taskEnv,
 		Driver: "raw_exec",
-		AllocID: ctx.AllocID,
+		AllocID: d.DriverContext.allocID,
 		Task: task,
 		TaskDir: ctx.TaskDir.Dir,
 		LogDir: ctx.TaskDir.LogDir,
@@ -257,17 +257,17 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
 	sanitizedName := strings.Replace(task.Name, "_", "-", -1)

 	// Mount /alloc
-	allocVolName := fmt.Sprintf("%s-%s-alloc", ctx.AllocID, sanitizedName)
+	allocVolName := fmt.Sprintf("%s-%s-alloc", d.DriverContext.allocID, sanitizedName)
 	cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, ctx.TaskDir.SharedAllocDir))
 	cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, allocdir.SharedAllocContainerPath))

 	// Mount /local
-	localVolName := fmt.Sprintf("%s-%s-local", ctx.AllocID, sanitizedName)
+	localVolName := fmt.Sprintf("%s-%s-local", d.DriverContext.allocID, sanitizedName)
 	cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, ctx.TaskDir.LocalDir))
 	cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, allocdir.TaskLocalContainerPath))

 	// Mount /secrets
-	secretsVolName := fmt.Sprintf("%s-%s-secrets", ctx.AllocID, sanitizedName)
+	secretsVolName := fmt.Sprintf("%s-%s-secrets", d.DriverContext.allocID, sanitizedName)
 	cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, ctx.TaskDir.SecretsDir))
 	cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, allocdir.TaskSecretsContainerPath))
@@ -281,7 +281,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
 		if len(parts) != 2 {
 			return nil, fmt.Errorf("invalid rkt volume: %q", rawvol)
 		}
-		volName := fmt.Sprintf("%s-%s-%d", ctx.AllocID, sanitizedName, i)
+		volName := fmt.Sprintf("%s-%s-%d", d.DriverContext.allocID, sanitizedName, i)
 		cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", volName, parts[0]))
 		cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1]))
 	}
@@ -413,7 +413,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
 	executorCtx := &executor.ExecutorContext{
 		TaskEnv: d.taskEnv,
 		Driver: "rkt",
-		AllocID: ctx.AllocID,
+		AllocID: d.DriverContext.allocID,
 		Task: task,
 		TaskDir: ctx.TaskDir.Dir,
 		LogDir: ctx.TaskDir.LogDir,
@@ -280,7 +280,7 @@ func (r *TaskRunner) RestoreState() error {
 			return err
 		}

-		ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
+		ctx := driver.NewExecContext(r.taskDir)
 		handle, err := d.Open(ctx, snap.HandleID)

 		// In the case it fails, we relaunch the task in the Run() method.
@@ -378,7 +378,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
 		r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
 	}

-	driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env, eventEmitter)
+	driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, env, eventEmitter)
 	driver, err := driver.NewDriver(r.task.Driver, driverCtx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
@@ -1061,7 +1061,7 @@ func (r *TaskRunner) cleanup() {

 	res := r.getCreatedResources()

-	ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
+	ctx := driver.NewExecContext(r.taskDir)
 	attempts := 1
 	var cleanupErr error
 	for retry := true; retry; attempts++ {
@@ -1182,7 +1182,7 @@ func (r *TaskRunner) startTask() error {
 	}

 	// Run prestart
-	ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
+	ctx := driver.NewExecContext(r.taskDir)
 	res, err := drv.Prestart(ctx, r.task)

 	// Merge newly created resources into previously created resources