Use rkt prepare + run-prepared instead of run.

The rkt driver currently executes run and asks that the pod UUID is
written to a file that is then polled for changes for up to five
seconds. Many container fetches will take longer than this, so this
method will often not be able to track the pod UUID reliably.

To avoid this problem, rkt allows pods to be first prepared, which will
return their UUID, and then run as a second invocation.

Here we convert the rkt driver's Start method to use this method
instead. This way, the UUID will always be tracked correctly.
This commit is contained in:
Lasse Dalegaard
2017-09-20 15:02:42 +02:00
parent a1e08240a1
commit 3dafacdd72

View File

@@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"os"
@@ -54,9 +53,6 @@ const (
// rktCmd is the command rkt is installed as.
rktCmd = "rkt"
// rktUuidDeadline is how long to wait for the uuid file to be written
rktUuidDeadline = 5 * time.Second
)
// RktDriver is a driver for running images via Rkt
@@ -247,8 +243,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
// ACI image
img := driverConfig.ImageName
// Build the command.
cmdArgs := make([]string, 0, 50)
// Global arguments given to both prepare and run-prepared
globalArgs := make([]string, 0, 50)
// Add debug option to rkt command.
debug := driverConfig.Debug
@@ -274,43 +270,44 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
// if we have a selective insecure_options, prefer them
// insecure options are rkt's global argument, so we do this before the actual "run"
if len(driverConfig.InsecureOptions) > 0 {
cmdArgs = append(cmdArgs, fmt.Sprintf("--insecure-options=%s", strings.Join(driverConfig.InsecureOptions, ",")))
globalArgs = append(globalArgs, fmt.Sprintf("--insecure-options=%s", strings.Join(driverConfig.InsecureOptions, ",")))
} else if insecure {
cmdArgs = append(cmdArgs, "--insecure-options=all")
globalArgs = append(globalArgs, "--insecure-options=all")
}
// debug is rkt's global argument, so add it before the actual "run"
cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug))
globalArgs = append(globalArgs, fmt.Sprintf("--debug=%t", debug))
cmdArgs = append(cmdArgs, "run")
prepareArgs := make([]string, 0, 50)
runArgs := make([]string, 0, 50)
prepareArgs = append(prepareArgs, globalArgs...)
prepareArgs = append(prepareArgs, "prepare")
runArgs = append(runArgs, globalArgs...)
runArgs = append(runArgs, "run-prepared")
// disable overlayfs
if driverConfig.NoOverlay {
cmdArgs = append(cmdArgs, "--no-overlay=true")
prepareArgs = append(prepareArgs, "--no-overlay=true")
}
// Write the UUID out to a file in the state dir so we can read it back
// in and access the pod by UUID from other commands
uuidPath := filepath.Join(ctx.TaskDir.Dir, "rkt.uuid")
cmdArgs = append(cmdArgs, fmt.Sprintf("--uuid-file-save=%s", uuidPath))
// Convert underscores to dashes in task names for use in volume names #2358
sanitizedName := strings.Replace(task.Name, "_", "-", -1)
// Mount /alloc
allocVolName := fmt.Sprintf("%s-%s-alloc", d.DriverContext.allocID, sanitizedName)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, ctx.TaskDir.SharedAllocDir))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, allocdir.SharedAllocContainerPath))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, ctx.TaskDir.SharedAllocDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, allocdir.SharedAllocContainerPath))
// Mount /local
localVolName := fmt.Sprintf("%s-%s-local", d.DriverContext.allocID, sanitizedName)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, ctx.TaskDir.LocalDir))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, allocdir.TaskLocalContainerPath))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, ctx.TaskDir.LocalDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, allocdir.TaskLocalContainerPath))
// Mount /secrets
secretsVolName := fmt.Sprintf("%s-%s-secrets", d.DriverContext.allocID, sanitizedName)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, ctx.TaskDir.SecretsDir))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, allocdir.TaskSecretsContainerPath))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, ctx.TaskDir.SecretsDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, allocdir.TaskSecretsContainerPath))
// Mount arbitrary volumes if enabled
if len(driverConfig.Volumes) > 0 {
@@ -334,33 +331,34 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, fmt.Errorf("invalid rkt volume: %q", rawvol)
}
volName := fmt.Sprintf("%s-%s-%d", d.DriverContext.allocID, sanitizedName, i)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%s", volName, parts[0], readOnly))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1]))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%s", volName, parts[0], readOnly))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1]))
}
}
cmdArgs = append(cmdArgs, img)
// Inject environment variables
for k, v := range ctx.TaskEnv.Map() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%s=%s", k, v))
prepareArgs = append(prepareArgs, fmt.Sprintf("--set-env=%s=%s", k, v))
}
// Image is set here, because the commands that follow apply to it
prepareArgs = append(prepareArgs, img)
// Check if the user has overridden the exec command.
if driverConfig.Command != "" {
cmdArgs = append(cmdArgs, fmt.Sprintf("--exec=%v", driverConfig.Command))
prepareArgs = append(prepareArgs, fmt.Sprintf("--exec=%v", driverConfig.Command))
}
// Add memory isolator
cmdArgs = append(cmdArgs, fmt.Sprintf("--memory=%vM", int64(task.Resources.MemoryMB)))
prepareArgs = append(prepareArgs, fmt.Sprintf("--memory=%vM", int64(task.Resources.MemoryMB)))
// Add CPU isolator
cmdArgs = append(cmdArgs, fmt.Sprintf("--cpu=%vm", int64(task.Resources.CPU)))
prepareArgs = append(prepareArgs, fmt.Sprintf("--cpu=%vm", int64(task.Resources.CPU)))
// Add DNS servers
if len(driverConfig.DNSServers) == 1 && (driverConfig.DNSServers[0] == "host" || driverConfig.DNSServers[0] == "none") {
// Special case single item lists with the special values "host" or "none"
cmdArgs = append(cmdArgs, fmt.Sprintf("--dns=%s", driverConfig.DNSServers[0]))
runArgs = append(runArgs, fmt.Sprintf("--dns=%s", driverConfig.DNSServers[0]))
} else {
for _, ip := range driverConfig.DNSServers {
if err := net.ParseIP(ip); err == nil {
@@ -368,20 +366,20 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
d.logger.Printf("[DEBUG] driver.rkt: %v", msg)
return nil, msg
} else {
cmdArgs = append(cmdArgs, fmt.Sprintf("--dns=%s", ip))
runArgs = append(runArgs, fmt.Sprintf("--dns=%s", ip))
}
}
}
// set DNS search domains
for _, domain := range driverConfig.DNSSearchDomains {
cmdArgs = append(cmdArgs, fmt.Sprintf("--dns-search=%s", domain))
runArgs = append(runArgs, fmt.Sprintf("--dns-search=%s", domain))
}
// set network
network := strings.Join(driverConfig.Net, ",")
if network != "" {
cmdArgs = append(cmdArgs, fmt.Sprintf("--net=%s", network))
runArgs = append(runArgs, fmt.Sprintf("--net=%s", network))
}
// Setup port mapping and exposed ports
@@ -407,7 +405,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
d.logger.Printf("[DEBUG] driver.rkt: exposed port %s", containerPort)
// Add port option to rkt run arguments. rkt allows multiple port args
cmdArgs = append(cmdArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
}
for _, port := range network.DynamicPorts {
@@ -425,7 +423,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
d.logger.Printf("[DEBUG] driver.rkt: exposed port %s", containerPort)
// Add port option to rkt run arguments. rkt allows multiple port args
cmdArgs = append(cmdArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
}
}
@@ -436,11 +434,11 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
// Need to start arguments with "--"
if len(parsed) > 0 {
cmdArgs = append(cmdArgs, "--")
prepareArgs = append(prepareArgs, "--")
}
for _, arg := range parsed {
cmdArgs = append(cmdArgs, fmt.Sprintf("%v", arg))
prepareArgs = append(prepareArgs, fmt.Sprintf("%v", arg))
}
}
@@ -455,6 +453,24 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, err
}
absPath, err := GetAbsolutePath(rktCmd)
if err != nil {
return nil, err
}
var outBuf, errBuf bytes.Buffer
cmd := exec.Command(rktCmd, prepareArgs...)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
d.logger.Printf("[DEBUG] driver.rkt: preparing pod %q for task %q with: %v", img, d.taskName, prepareArgs)
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("Error preparing rkt pod: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String())
}
uuid := strings.TrimSpace(outBuf.String())
d.logger.Printf("[DEBUG] driver.rkt: pod %q for task %q prepared, UUID is: %s", img, d.taskName, uuid)
runArgs = append(runArgs, uuid)
// The task's environment is set via --set-env flags above, but the rkt
// command itself needs an evironment with PATH set to find iptables.
eb := env.NewEmptyBuilder()
@@ -473,14 +489,9 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, fmt.Errorf("failed to set executor context: %v", err)
}
absPath, err := GetAbsolutePath(rktCmd)
if err != nil {
return nil, err
}
execCmd := &executor.ExecCommand{
Cmd: absPath,
Args: cmdArgs,
Args: runArgs,
User: task.User,
}
ps, err := execIntf.LaunchCmd(execCmd)
@@ -489,25 +500,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, err
}
// Wait for UUID file to get written
uuid := ""
deadline := time.Now().Add(rktUuidDeadline)
var lastErr error
for time.Now().Before(deadline) {
if uuidBytes, err := ioutil.ReadFile(uuidPath); err != nil {
lastErr = err
} else {
uuid = string(uuidBytes)
break
}
time.Sleep(400 * time.Millisecond)
}
if uuid == "" {
d.logger.Printf("[WARN] driver.rkt: reading uuid from %q failed; unable to run script checks for task %q. Last error: %v",
uuidPath, d.taskName, lastErr)
}
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q (UUID: %s) for task %q with: %v", img, uuid, d.taskName, cmdArgs)
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q (UUID: %s) for task %q with: %v", img, uuid, d.taskName, runArgs)
maxKill := d.DriverContext.config.MaxKillTimeout
h := &rktHandle{
uuid: uuid,