mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
Updating kill timeout adheres to operator specified maximum
This commit is contained in:
@@ -83,11 +83,12 @@ func (c *DockerDriverConfig) Validate() error {
|
||||
}
|
||||
|
||||
type dockerPID struct {
|
||||
Version string
|
||||
ImageID string
|
||||
ContainerID string
|
||||
KillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
Version string
|
||||
ImageID string
|
||||
ContainerID string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
}
|
||||
|
||||
type DockerHandle struct {
|
||||
@@ -101,6 +102,7 @@ type DockerHandle struct {
|
||||
containerID string
|
||||
version string
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
@@ -600,6 +602,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
||||
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &DockerHandle{
|
||||
client: client,
|
||||
logCollector: logCollector,
|
||||
@@ -610,7 +613,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
||||
imageID: dockerImage.ID,
|
||||
containerID: container.ID,
|
||||
version: d.config.Version,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
@@ -679,6 +683,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
|
||||
containerID: pid.ContainerID,
|
||||
version: pid.Version,
|
||||
killTimeout: pid.KillTimeout,
|
||||
maxKillTimeout: pid.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
@@ -689,11 +694,12 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
|
||||
func (h *DockerHandle) ID() string {
|
||||
// Return a handle to the PID
|
||||
pid := dockerPID{
|
||||
Version: h.version,
|
||||
ImageID: h.imageID,
|
||||
ContainerID: h.containerID,
|
||||
KillTimeout: h.killTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
Version: h.version,
|
||||
ImageID: h.imageID,
|
||||
ContainerID: h.containerID,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
}
|
||||
data, err := json.Marshal(pid)
|
||||
if err != nil {
|
||||
@@ -712,7 +718,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
|
||||
|
||||
func (h *DockerHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
|
||||
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"log"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
@@ -84,24 +83,6 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
|
||||
}
|
||||
}
|
||||
|
||||
// KillTimeout returns the timeout that should be used for the task between
|
||||
// signaling and killing the task.
|
||||
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
|
||||
max := d.config.MaxKillTimeout.Nanoseconds()
|
||||
desired := task.KillTimeout.Nanoseconds()
|
||||
|
||||
// Make the minimum time between signal and kill, 1 second.
|
||||
if desired == 0 {
|
||||
desired = (1 * time.Second).Nanoseconds()
|
||||
}
|
||||
|
||||
if desired < max {
|
||||
return time.Duration(desired)
|
||||
}
|
||||
|
||||
return d.config.MaxKillTimeout
|
||||
}
|
||||
|
||||
// DriverHandle is an opaque handle into a driver used for task
|
||||
// manipulation
|
||||
type DriverHandle interface {
|
||||
|
||||
@@ -42,6 +42,7 @@ type execHandle struct {
|
||||
userPid int
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
@@ -134,13 +135,15 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &execHandle{
|
||||
pluginClient: pluginClient,
|
||||
userPid: ps.Pid,
|
||||
executor: exec,
|
||||
allocDir: ctx.AllocDir,
|
||||
isolationConfig: ps.IsolationConfig,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
logger: d.logger,
|
||||
version: d.config.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
@@ -153,6 +156,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
type execId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
UserPid int
|
||||
TaskDir string
|
||||
AllocDir *allocdir.AllocDir
|
||||
@@ -198,6 +202,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
||||
logger: d.logger,
|
||||
version: id.Version,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
@@ -209,6 +214,7 @@ func (h *execHandle) ID() string {
|
||||
id := execId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
@@ -228,7 +234,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
|
||||
|
||||
func (h *execHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
||||
@@ -47,13 +47,14 @@ type javaHandle struct {
|
||||
executor executor.Executor
|
||||
isolationConfig *cstructs.IsolationConfig
|
||||
|
||||
taskDir string
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
version string
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
taskDir string
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
version string
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// NewJavaDriver is used to create a new exec driver
|
||||
@@ -182,6 +183,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &javaHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
@@ -189,7 +191,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
isolationConfig: ps.IsolationConfig,
|
||||
taskDir: taskDir,
|
||||
allocDir: ctx.AllocDir,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
@@ -210,6 +213,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {
|
||||
type javaId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
IsolationConfig *cstructs.IsolationConfig
|
||||
TaskDir string
|
||||
@@ -257,6 +261,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
||||
logger: d.logger,
|
||||
version: id.Version,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
@@ -269,6 +274,7 @@ func (h *javaHandle) ID() string {
|
||||
id := javaId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
TaskDir: h.taskDir,
|
||||
@@ -289,7 +295,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
|
||||
|
||||
func (h *javaHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
||||
@@ -44,15 +44,16 @@ type QemuDriverConfig struct {
|
||||
|
||||
// qemuHandle is returned from Start/Open as a handle to the PID
|
||||
type qemuHandle struct {
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
logger *log.Logger
|
||||
version string
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
logger *log.Logger
|
||||
version string
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// NewQemuDriver is used to create a new exec driver
|
||||
@@ -219,16 +220,18 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
d.logger.Printf("[INFO] Started new QemuVM: %s", vmID)
|
||||
|
||||
// Create and Return Handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &qemuHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
||||
go h.run()
|
||||
@@ -236,11 +239,12 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
}
|
||||
|
||||
type qemuId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
}
|
||||
|
||||
func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
|
||||
@@ -264,15 +268,16 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
||||
|
||||
// Return a driver handle
|
||||
h := &qemuHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
allocDir: id.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
allocDir: id.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
@@ -280,11 +285,12 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
||||
|
||||
func (h *qemuHandle) ID() string {
|
||||
id := qemuId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(id)
|
||||
@@ -300,7 +306,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {
|
||||
|
||||
func (h *qemuHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
||||
@@ -35,15 +35,16 @@ type RawExecDriver struct {
|
||||
|
||||
// rawExecHandle is returned from Start/Open as a handle to the PID
|
||||
type rawExecHandle struct {
|
||||
version string
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
killTimeout time.Duration
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
version string
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// NewRawExecDriver is used to create a new raw exec driver
|
||||
@@ -125,27 +126,30 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
|
||||
d.logger.Printf("[DEBUG] driver.raw_exec: started process with pid: %v", ps.Pid)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &rawExecHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
allocDir: ctx.AllocDir,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
allocDir: ctx.AllocDir,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
}
|
||||
|
||||
type rawExecId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
}
|
||||
|
||||
func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
|
||||
@@ -168,15 +172,16 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
|
||||
|
||||
// Return a driver handle
|
||||
h := &rawExecHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
allocDir: id.AllocDir,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
allocDir: id.AllocDir,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
@@ -184,11 +189,12 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
|
||||
|
||||
func (h *rawExecHandle) ID() string {
|
||||
id := rawExecId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(id)
|
||||
@@ -204,7 +210,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult {
|
||||
|
||||
func (h *rawExecHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
||||
@@ -55,23 +55,25 @@ type RktDriverConfig struct {
|
||||
|
||||
// rktHandle is returned from Start/Open as a handle to the PID
|
||||
type rktHandle struct {
|
||||
pluginClient *plugin.Client
|
||||
executorPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
killTimeout time.Duration
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
pluginClient *plugin.Client
|
||||
executorPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// rktPID is a struct to map the pid running the process to the vm image on
|
||||
// disk
|
||||
type rktPID struct {
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
ExecutorPid int
|
||||
KillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
ExecutorPid int
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
}
|
||||
|
||||
// NewRktDriver is used to create a new exec driver
|
||||
@@ -227,15 +229,17 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
||||
}
|
||||
|
||||
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmdArgs)
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &rktHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
executorPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
executorPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
@@ -244,18 +248,18 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
||||
func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
|
||||
// Parse the handle
|
||||
pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:"))
|
||||
qpid := &rktPID{}
|
||||
if err := json.Unmarshal(pidBytes, qpid); err != nil {
|
||||
id := &rktPID{}
|
||||
if err := json.Unmarshal(pidBytes, id); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse Rkt handle '%s': %v", handleID, err)
|
||||
}
|
||||
|
||||
pluginConfig := &plugin.ClientConfig{
|
||||
Reattach: qpid.PluginConfig.PluginConfig(),
|
||||
Reattach: id.PluginConfig.PluginConfig(),
|
||||
}
|
||||
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
|
||||
if err != nil {
|
||||
d.logger.Println("[ERROR] driver.rkt: error connecting to plugin so destroying plugin pid and user pid")
|
||||
if e := destroyPlugin(qpid.PluginConfig.Pid, qpid.ExecutorPid); e != nil {
|
||||
if e := destroyPlugin(id.PluginConfig.Pid, id.ExecutorPid); e != nil {
|
||||
d.logger.Printf("[ERROR] driver.rkt: error destroying plugin and executor pid: %v", e)
|
||||
}
|
||||
return nil, fmt.Errorf("error connecting to plugin: %v", err)
|
||||
@@ -263,14 +267,15 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
|
||||
|
||||
// Return a driver handle
|
||||
h := &rktHandle{
|
||||
pluginClient: pluginClient,
|
||||
executorPid: qpid.ExecutorPid,
|
||||
allocDir: qpid.AllocDir,
|
||||
executor: executor,
|
||||
logger: d.logger,
|
||||
killTimeout: qpid.KillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executorPid: id.ExecutorPid,
|
||||
allocDir: id.AllocDir,
|
||||
executor: executor,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
||||
go h.run()
|
||||
@@ -280,10 +285,11 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
|
||||
func (h *rktHandle) ID() string {
|
||||
// Return a handle to the PID
|
||||
pid := &rktPID{
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
KillTimeout: h.killTimeout,
|
||||
ExecutorPid: h.executorPid,
|
||||
AllocDir: h.allocDir,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
ExecutorPid: h.executorPid,
|
||||
AllocDir: h.allocDir,
|
||||
}
|
||||
data, err := json.Marshal(pid)
|
||||
if err != nil {
|
||||
@@ -298,7 +304,7 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult {
|
||||
|
||||
func (h *rktHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
@@ -109,3 +110,26 @@ func validateCommand(command, argField string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetKillTimeout returns the kill timeout to use given the tasks desired kill
|
||||
// timeout and the operator configured max kill timeout.
|
||||
func GetKillTimeout(desired, max time.Duration) time.Duration {
|
||||
maxNanos := max.Nanoseconds()
|
||||
desiredNanos := desired.Nanoseconds()
|
||||
|
||||
// Make the minimum time between signal and kill, 1 second.
|
||||
if desiredNanos <= 0 {
|
||||
desiredNanos = (1 * time.Second).Nanoseconds()
|
||||
}
|
||||
|
||||
// Protect against max not being set properly.
|
||||
if maxNanos <= 0 {
|
||||
maxNanos = (10 * time.Second).Nanoseconds()
|
||||
}
|
||||
|
||||
if desiredNanos < maxNanos {
|
||||
return time.Duration(desiredNanos)
|
||||
}
|
||||
|
||||
return max
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user