mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
Interpolate everything that is a string
This commit is contained in:
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/hashicorp/go-plugin"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver/env"
|
||||
"github.com/hashicorp/nomad/client/driver/executor"
|
||||
dstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
@@ -138,20 +139,70 @@ func (c *DockerDriverConfig) Validate() error {
|
||||
|
||||
// NewDockerDriverConfig returns a docker driver config by parsing the HCL
|
||||
// config
|
||||
func NewDockerDriverConfig(task *structs.Task) (*DockerDriverConfig, error) {
|
||||
var driverConfig DockerDriverConfig
|
||||
driverConfig.SSL = true
|
||||
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
|
||||
func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*DockerDriverConfig, error) {
|
||||
var dconf DockerDriverConfig
|
||||
|
||||
// Default to SSL
|
||||
dconf.SSL = true
|
||||
|
||||
if err := mapstructure.WeakDecode(task.Config, &dconf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.Contains(driverConfig.ImageName, "https://") {
|
||||
driverConfig.ImageName = strings.Replace(driverConfig.ImageName, "https://", "", 1)
|
||||
}
|
||||
|
||||
if err := driverConfig.Validate(); err != nil {
|
||||
// Interpolate everthing that is a string
|
||||
dconf.ImageName = env.ReplaceEnv(dconf.ImageName)
|
||||
dconf.Command = env.ReplaceEnv(dconf.Command)
|
||||
dconf.IpcMode = env.ReplaceEnv(dconf.IpcMode)
|
||||
dconf.NetworkMode = env.ReplaceEnv(dconf.NetworkMode)
|
||||
dconf.PidMode = env.ReplaceEnv(dconf.PidMode)
|
||||
dconf.UTSMode = env.ReplaceEnv(dconf.UTSMode)
|
||||
dconf.Hostname = env.ReplaceEnv(dconf.Hostname)
|
||||
dconf.WorkDir = env.ReplaceEnv(dconf.WorkDir)
|
||||
dconf.Volumes = env.ParseAndReplace(dconf.Volumes)
|
||||
dconf.DNSServers = env.ParseAndReplace(dconf.DNSServers)
|
||||
dconf.DNSSearchDomains = env.ParseAndReplace(dconf.DNSSearchDomains)
|
||||
dconf.LoadImages = env.ParseAndReplace(dconf.LoadImages)
|
||||
|
||||
for _, m := range dconf.LabelsRaw {
|
||||
for k, v := range m {
|
||||
delete(m, k)
|
||||
m[env.ReplaceEnv(k)] = env.ReplaceEnv(v)
|
||||
}
|
||||
}
|
||||
|
||||
for _, a := range dconf.Auth {
|
||||
a.Username = env.ReplaceEnv(a.Username)
|
||||
a.Password = env.ReplaceEnv(a.Password)
|
||||
a.Email = env.ReplaceEnv(a.Email)
|
||||
a.ServerAddress = env.ReplaceEnv(a.ServerAddress)
|
||||
}
|
||||
|
||||
for _, l := range dconf.Logging {
|
||||
l.Type = env.ReplaceEnv(l.Type)
|
||||
for _, c := range l.ConfigRaw {
|
||||
for k, v := range c {
|
||||
delete(c, k)
|
||||
c[env.ReplaceEnv(k)] = env.ReplaceEnv(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range dconf.PortMapRaw {
|
||||
for k, v := range m {
|
||||
delete(m, k)
|
||||
m[env.ReplaceEnv(k)] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Remove any http
|
||||
if strings.Contains(dconf.ImageName, "https://") {
|
||||
dconf.ImageName = strings.Replace(dconf.ImageName, "https://", "", 1)
|
||||
}
|
||||
|
||||
if err := dconf.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &driverConfig, nil
|
||||
return &dconf, nil
|
||||
}
|
||||
|
||||
type dockerPID struct {
|
||||
@@ -277,6 +328,134 @@ func (d *DockerDriver) Abilities() DriverAbilities {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
|
||||
// Set environment variables.
|
||||
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath).
|
||||
SetTaskLocalDir(allocdir.TaskLocalContainerPath).SetSecretsDir(allocdir.TaskSecretsContainerPath).Build()
|
||||
|
||||
d.logger.Printf("%#v", d.taskEnv.EnvMap())
|
||||
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
|
||||
|
||||
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
||||
}
|
||||
|
||||
// Initialize docker API clients
|
||||
client, waitClient, err := d.dockerClients()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
|
||||
}
|
||||
|
||||
if err := d.createImage(driverConfig, client, taskDir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
image := driverConfig.ImageName
|
||||
// Now that we have the image we can get the image id
|
||||
dockerImage, err := client.InspectImage(image)
|
||||
if err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err)
|
||||
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
|
||||
}
|
||||
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
|
||||
|
||||
bin, err := discover.NomadExecutable()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
|
||||
}
|
||||
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
|
||||
pluginConfig := &plugin.ClientConfig{
|
||||
Cmd: exec.Command(bin, "executor", pluginLogFile),
|
||||
}
|
||||
|
||||
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
Task: task,
|
||||
Driver: "docker",
|
||||
AllocDir: ctx.AllocDir,
|
||||
AllocID: ctx.AllocID,
|
||||
PortLowerBound: d.config.ClientMinPort,
|
||||
PortUpperBound: d.config.ClientMaxPort,
|
||||
}
|
||||
if err := exec.SetContext(executorCtx); err != nil {
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("failed to set executor context: %v", err)
|
||||
}
|
||||
|
||||
// Only launch syslog server if we're going to use it!
|
||||
syslogAddr := ""
|
||||
if runtime.GOOS == "darwin" && len(driverConfig.Logging) == 0 {
|
||||
d.logger.Printf("[DEBUG] driver.docker: disabling syslog driver as Docker for Mac workaround")
|
||||
} else if len(driverConfig.Logging) == 0 || driverConfig.Logging[0].Type == "syslog" {
|
||||
ss, err := exec.LaunchSyslogServer()
|
||||
if err != nil {
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
|
||||
}
|
||||
syslogAddr = ss.Addr
|
||||
}
|
||||
|
||||
config, err := d.createContainerConfig(ctx, task, driverConfig, syslogAddr)
|
||||
if err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err)
|
||||
}
|
||||
|
||||
container, rerr := d.createContainer(config)
|
||||
if rerr != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed to create container: %s", rerr)
|
||||
pluginClient.Kill()
|
||||
rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err)
|
||||
return nil, rerr
|
||||
}
|
||||
|
||||
d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)
|
||||
|
||||
// Start the container
|
||||
err = client.StartContainer(container.ID, container.HostConfig)
|
||||
if err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err)
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err)
|
||||
}
|
||||
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &DockerHandle{
|
||||
client: client,
|
||||
waitClient: waitClient,
|
||||
executor: exec,
|
||||
pluginClient: pluginClient,
|
||||
cleanupImage: cleanupImage,
|
||||
logger: d.logger,
|
||||
imageID: dockerImage.ID,
|
||||
containerID: container.ID,
|
||||
version: d.config.Version,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
doneCh: make(chan bool),
|
||||
waitCh: make(chan *dstructs.WaitResult, 1),
|
||||
}
|
||||
if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.collectStats()
|
||||
go h.run()
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// dockerClients creates two *docker.Client, one for long running operations and
|
||||
// the other for shorter operations. In test / dev mode we can use ENV vars to
|
||||
// connect to the docker daemon. In production mode we will read docker.endpoint
|
||||
@@ -410,9 +589,7 @@ func (d *DockerDriver) containerBinds(driverConfig *DockerDriverConfig, alloc *a
|
||||
|
||||
volumesEnabled := d.config.ReadBoolDefault(dockerVolumesConfigOption, dockerVolumesConfigDefault)
|
||||
|
||||
// Expand environment variables in volume paths
|
||||
expandedVols := d.taskEnv.ParseAndReplace(driverConfig.Volumes)
|
||||
for _, userbind := range expandedVols {
|
||||
for _, userbind := range driverConfig.Volumes {
|
||||
parts := strings.Split(userbind, ":")
|
||||
if len(parts) < 2 {
|
||||
return nil, fmt.Errorf("invalid docker volume: %q", userbind)
|
||||
@@ -463,11 +640,6 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
|
||||
return c, err
|
||||
}
|
||||
|
||||
// Set environment variables.
|
||||
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath)
|
||||
d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
|
||||
d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath)
|
||||
|
||||
config := &docker.Config{
|
||||
Image: driverConfig.ImageName,
|
||||
Hostname: driverConfig.Hostname,
|
||||
@@ -770,129 +942,6 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke
|
||||
return errors.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
|
||||
driverConfig, err := NewDockerDriverConfig(task)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
|
||||
|
||||
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
||||
}
|
||||
|
||||
// Initialize docker API clients
|
||||
client, waitClient, err := d.dockerClients()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
|
||||
}
|
||||
|
||||
if err := d.createImage(driverConfig, client, taskDir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
image := driverConfig.ImageName
|
||||
// Now that we have the image we can get the image id
|
||||
dockerImage, err := client.InspectImage(image)
|
||||
if err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err)
|
||||
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
|
||||
}
|
||||
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
|
||||
|
||||
bin, err := discover.NomadExecutable()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
|
||||
}
|
||||
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
|
||||
pluginConfig := &plugin.ClientConfig{
|
||||
Cmd: exec.Command(bin, "executor", pluginLogFile),
|
||||
}
|
||||
|
||||
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
Task: task,
|
||||
Driver: "docker",
|
||||
AllocDir: ctx.AllocDir,
|
||||
AllocID: ctx.AllocID,
|
||||
PortLowerBound: d.config.ClientMinPort,
|
||||
PortUpperBound: d.config.ClientMaxPort,
|
||||
}
|
||||
if err := exec.SetContext(executorCtx); err != nil {
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("failed to set executor context: %v", err)
|
||||
}
|
||||
|
||||
// Only launch syslog server if we're going to use it!
|
||||
syslogAddr := ""
|
||||
if runtime.GOOS == "darwin" && len(driverConfig.Logging) == 0 {
|
||||
d.logger.Printf("[DEBUG] driver.docker: disabling syslog driver as Docker for Mac workaround")
|
||||
} else if len(driverConfig.Logging) == 0 || driverConfig.Logging[0].Type == "syslog" {
|
||||
ss, err := exec.LaunchSyslogServer()
|
||||
if err != nil {
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
|
||||
}
|
||||
syslogAddr = ss.Addr
|
||||
}
|
||||
|
||||
config, err := d.createContainerConfig(ctx, task, driverConfig, syslogAddr)
|
||||
if err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err)
|
||||
}
|
||||
|
||||
container, rerr := d.createContainer(config)
|
||||
if rerr != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed to create container: %s", rerr)
|
||||
pluginClient.Kill()
|
||||
rerr.Err = fmt.Sprintf("Failed to create container: %s", rerr.Err)
|
||||
return nil, rerr
|
||||
}
|
||||
|
||||
d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)
|
||||
|
||||
// Start the container
|
||||
err = client.StartContainer(container.ID, container.HostConfig)
|
||||
if err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err)
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err)
|
||||
}
|
||||
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &DockerHandle{
|
||||
client: client,
|
||||
waitClient: waitClient,
|
||||
executor: exec,
|
||||
pluginClient: pluginClient,
|
||||
cleanupImage: cleanupImage,
|
||||
logger: d.logger,
|
||||
imageID: dockerImage.ID,
|
||||
containerID: container.ID,
|
||||
version: d.config.Version,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
doneCh: make(chan bool),
|
||||
waitCh: make(chan *dstructs.WaitResult, 1),
|
||||
}
|
||||
if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.collectStats()
|
||||
go h.run()
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// createContainer creates the container given the passed configuration. It
|
||||
// attempts to handle any transient Docker errors.
|
||||
func (d *DockerDriver) createContainer(config docker.CreateContainerOptions) (*docker.Container, *structs.RecoverableError) {
|
||||
|
||||
Reference in New Issue
Block a user