[gh-6980] Client: clean up old allocs before running new ones using the exec task driver. (#20500)

Whenever the "exec" task driver is used, Nomad runs a plugin (the executor) that in turn runs the task in a container under the hood. If for any reason the executor is killed, the task is reparented to the init process and won't be stopped by Nomad when the job is updated or stopped.

This commit introduces two mechanisms to avoid this behaviour:

* Adds signal catching and handling to the executor, so that when the executor receives a SIGTERM, the signal is also passed on to the task (a minimal sketch of the pattern follows below).
* Adds a pre-start cleanup of the processes in the container's cgroup, ensuring only the ones the executor runs are present at any given time.
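
The catch-and-forward pattern looks roughly like this as a standalone program; `shutdownTask` and `forwardToTask` are hypothetical stand-ins for the executor's shutdown and signal-delivery paths, not its real API:

```go
package main

import (
	"os"
	"os/signal"
	"syscall"
)

// shutdownTask and forwardToTask are hypothetical placeholders for the
// executor's graceful-stop and signal-relay logic.
func shutdownTask()             {}
func forwardToTask(s os.Signal) {}

func main() {
	// Buffered so signals delivered before the loop starts are not dropped.
	sigCh := make(chan os.Signal, 4)
	signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	defer signal.Stop(sigCh)

	for sig := range sigCh {
		if sig == syscall.SIGTERM || sig == syscall.SIGINT {
			shutdownTask() // stop the task instead of orphaning it
			return
		}
		forwardToTask(sig) // relay everything else to the task
	}
}
```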
Juana De La Cuesta
2024-05-14 09:51:27 +02:00
committed by GitHub
parent 5b328d9adc
commit 169818b1bd
4 changed files with 182 additions and 1 deletion


@@ -12,6 +12,7 @@ import (
"io"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"strings"
@@ -78,9 +79,31 @@ type LibcontainerExecutor struct {
	userProc       *libcontainer.Process
	userProcExited chan interface{}
	exitState      *ProcessState
	sigChan        chan os.Signal
}

func (l *LibcontainerExecutor) catchSignals() {
	l.logger.Trace("waiting for signals")
	defer signal.Stop(l.sigChan)
	defer close(l.sigChan)

	signal.Notify(l.sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV)
	for {
		sig := <-l.sigChan

		// On SIGTERM or SIGINT, shut the task down rather than orphaning it.
		if sig == syscall.SIGTERM || sig == syscall.SIGINT {
			l.Shutdown("SIGINT", 0)
			break
		}

		// Forward any other caught signal to the task's container.
		if l.container != nil {
			l.container.Signal(sig, false)
		}
	}
}

func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Executor {
	// Buffered so signals queued before catchSignals starts are not lost.
	sigch := make(chan os.Signal, 4)
	le := &LibcontainerExecutor{
		id:     strings.ReplaceAll(uuid.Generate(), "-", "_"),
		logger: logger.Named("isolated_executor"),
@@ -88,7 +111,11 @@ func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Exe
		totalCpuStats:  cpustats.New(compute),
		userCpuStats:   cpustats.New(compute),
		systemCpuStats: cpustats.New(compute),
		sigChan:        sigch,
	}

	go le.catchSignals()

	le.processStats = procstats.New(compute, le)
	return le
}
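
For context on the `cleanOldProcessesInCGroup` function introduced in the next hunk, here is a rough standalone sketch of the same sweep, assuming a cgroup v2 layout; the path in `main` is made up, and the commit itself uses libcontainer's `cgroups.GetAllPids`, which also walks child cgroups:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
)

// sweepCgroup sends SIGKILL to every PID listed in <root>/<relative>/cgroup.procs.
func sweepCgroup(root, relative string) error {
	data, err := os.ReadFile(filepath.Join(root, relative, "cgroup.procs"))
	if err != nil {
		return err
	}
	for _, field := range strings.Fields(string(data)) {
		pid, err := strconv.Atoi(field)
		if err != nil || pid == 1 { // never signal init
			continue
		}
		if err := syscall.Kill(pid, syscall.SIGKILL); err != nil {
			fmt.Fprintf(os.Stderr, "kill %d: %v\n", pid, err)
		}
	}
	return nil
}

func main() {
	// Hypothetical path; Nomad derives it from the container's cgroup config.
	if err := sweepCgroup("/sys/fs/cgroup", "nomad.slice/example.scope"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```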
@@ -97,6 +124,34 @@ func (l *LibcontainerExecutor) ListProcesses() *set.Set[int] {
	return procstats.List(l.command)
}

// cleanOldProcessesInCGroup kills processes that might have ended up orphaned
// when the executor was unexpectedly killed and Nomad couldn't reconnect to them.
func (l *LibcontainerExecutor) cleanOldProcessesInCGroup(nomadRelativePath string) {
	l.logger.Debug("looking for old processes", "path", nomadRelativePath)

	root := cgroupslib.GetDefaultRoot()
	orphansPIDs, err := cgroups.GetAllPids(filepath.Join(root, nomadRelativePath))
	if err != nil {
		l.logger.Error("unable to get orphaned task PIDs", "error", err)
		return
	}

	for _, pid := range orphansPIDs {
		// Avoid bringing down the whole node by mistake: a very unlikely
		// case, but it's better to be sure.
		if pid == 1 {
			continue
		}

		l.logger.Info("killing orphaned process", "pid", pid)
		err := syscall.Kill(pid, syscall.SIGKILL)
		if err != nil {
			l.logger.Error("unable to send signal to process", "pid", pid, "error", err)
		}
	}
}

// Launch creates a new container in libcontainer and starts a new process with it
func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
	l.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " "))
@@ -127,6 +182,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err)
}
l.cleanOldProcessesInCGroup(containerCfg.Cgroups.Path)
container, err := factory.Create(l.id, containerCfg)
if err != nil {
return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err)
@@ -166,6 +222,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
if command.User != "" {
process.User = command.User
}
l.userProc = process
l.totalCpuStats = cpustats.New(l.compute)
@@ -187,7 +244,6 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
	// start a goroutine to wait on the process to complete, so Wait calls can
	// be multiplexed
	l.userProcExited = make(chan interface{})
	go l.wait()

	return &ProcessState{
@@ -779,6 +835,7 @@ func (l *LibcontainerExecutor) configureCG2(cfg *runc.Config, command *ExecComma
func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*runc.Config, error) {
	cfg := &runc.Config{
		ParentDeathSignal: 9, // SIGKILL: make sure the task dies if the executor does
		Cgroups: &runc.Cgroup{
			Resources: &runc.Resources{
				MemorySwappiness: nil,
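
The final hunk sets `ParentDeathSignal: 9` (SIGKILL) on the libcontainer config. At the kernel level this is the `PR_SET_PDEATHSIG` prctl; a minimal sketch of the same idea with the standard library (Linux-only, and note the signal fires when the parent *thread* exits, not only the whole process):

```go
package main

import (
	"os/exec"
	"syscall"
)

func main() {
	cmd := exec.Command("sleep", "300")
	// Ask the kernel to deliver SIGKILL to the child if the parent dies,
	// the same mechanism the commit enables via ParentDeathSignal: 9.
	cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL}
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	_ = cmd.Wait()
}
```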