diff --git a/.changelog/20500.txt b/.changelog/20500.txt new file mode 100644 index 000000000..a272b0508 --- /dev/null +++ b/.changelog/20500.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: terminate old exec task processes before starting new ones, to avoid accidentally leaving running processes in case of an error +``` diff --git a/client/lib/cgroupslib/editor.go b/client/lib/cgroupslib/editor.go index e3f3db231..96ef95c4c 100644 --- a/client/lib/cgroupslib/editor.go +++ b/client/lib/cgroupslib/editor.go @@ -21,6 +21,10 @@ const ( root = "/sys/fs/cgroup" ) +func GetDefaultRoot() string { + return root +} + // OpenPath creates a handle for modifying cgroup interface files under // the given directory. // diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index ff5d03cfb..ceb80f2c4 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -12,6 +12,7 @@ import ( "io" "os" "os/exec" + "os/signal" "path" "path/filepath" "strings" @@ -78,9 +79,31 @@ type LibcontainerExecutor struct { userProc *libcontainer.Process userProcExited chan interface{} exitState *ProcessState + sigChan chan os.Signal +} + +func (l *LibcontainerExecutor) catchSignals() { + l.logger.Trace("waiting for signals") + defer signal.Stop(l.sigChan) + defer close(l.sigChan) + + signal.Notify(l.sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV) + for { + signal := <-l.sigChan + if signal == syscall.SIGTERM || signal == syscall.SIGINT { + l.Shutdown("SIGINT", 0) + break + } + + if l.container != nil { + l.container.Signal(signal, false) + } + } } func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Executor { + sigch := make(chan os.Signal, 4) + le := &LibcontainerExecutor{ id: strings.ReplaceAll(uuid.Generate(), "-", "_"), logger: logger.Named("isolated_executor"), @@ -88,7 +111,11 @@ func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Exe totalCpuStats: cpustats.New(compute), userCpuStats: cpustats.New(compute), systemCpuStats: cpustats.New(compute), + sigChan: sigch, } + + go le.catchSignals() + le.processStats = procstats.New(compute, le) return le } @@ -97,6 +124,34 @@ func (l *LibcontainerExecutor) ListProcesses() *set.Set[int] { return procstats.List(l.command) } +// cleanOldProcessesInCGroup kills processes that might ended up orphans when the +// executor was unexpectedly killed and nomad can't reconnect to them. +func (l *LibcontainerExecutor) cleanOldProcessesInCGroup(nomadRelativePath string) { + l.logger.Debug("looking for old processes", "path", nomadRelativePath) + + root := cgroupslib.GetDefaultRoot() + orphansPIDs, err := cgroups.GetAllPids(filepath.Join(root, nomadRelativePath)) + if err != nil { + l.logger.Error("unable to get orphaned task PIDs", "error", err) + return + } + + for _, pid := range orphansPIDs { + l.logger.Info("killing orphaned process", "pid", pid) + + // Avoid bringing down the whole node by mistake, very unlikely case, + // but it's better to be sure. + if pid == 1 { + continue + } + + err := syscall.Kill(pid, syscall.SIGKILL) + if err != nil { + l.logger.Error("unable to send signal to process", "pid", pid, "error", err) + } + } +} + // Launch creates a new container in libcontainer and starts a new process with it func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) { l.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " ")) @@ -127,6 +182,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err) } + l.cleanOldProcessesInCGroup(containerCfg.Cgroups.Path) container, err := factory.Create(l.id, containerCfg) if err != nil { return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err) @@ -166,6 +222,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro if command.User != "" { process.User = command.User } + l.userProc = process l.totalCpuStats = cpustats.New(l.compute) @@ -187,7 +244,6 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro // start a goroutine to wait on the process to complete, so Wait calls can // be multiplexed l.userProcExited = make(chan interface{}) - go l.wait() return &ProcessState{ @@ -779,6 +835,7 @@ func (l *LibcontainerExecutor) configureCG2(cfg *runc.Config, command *ExecComma func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*runc.Config, error) { cfg := &runc.Config{ + ParentDeathSignal: 9, Cgroups: &runc.Cgroup{ Resources: &runc.Resources{ MemorySwappiness: nil, diff --git a/drivers/shared/executor/executor_linux_test.go b/drivers/shared/executor/executor_linux_test.go index 31a4d7f85..d72a55e9f 100644 --- a/drivers/shared/executor/executor_linux_test.go +++ b/drivers/shared/executor/executor_linux_test.go @@ -7,10 +7,12 @@ import ( "context" "fmt" "os" + "os/exec" "path/filepath" "regexp" "strconv" "strings" + "syscall" "testing" "time" @@ -27,6 +29,7 @@ import ( tu "github.com/hashicorp/nomad/testutil" lconfigs "github.com/opencontainers/runc/libcontainer/configs" "github.com/opencontainers/runc/libcontainer/devices" + "github.com/opencontainers/runtime-spec/specs-go" "github.com/shoenig/test" "github.com/shoenig/test/must" "github.com/stretchr/testify/require" @@ -856,3 +859,117 @@ func TestExecCommand_getCgroupOr_v1_relative(t *testing.T) { result2 := ec.getCgroupOr("cpuset", "/sys/fs/cgroup/cpuset/nomad/abc123") must.Eq(t, result2, "/sys/fs/cgroup/cpuset/custom/path") } + +func createCGroup(fullpath string) (cgroupslib.Interface, error) { + if err := os.MkdirAll(fullpath, 0755); err != nil { + return nil, err + } + + return cgroupslib.OpenPath(fullpath), nil +} + +func TestExecutor_CleanOldProcessesInCGroup(t *testing.T) { + ci.Parallel(t) + + testutil.ExecCompatible(t) + testutil.CgroupsCompatible(t) + + testExecCmd := testExecutorCommandWithChroot(t) + + allocDir := testExecCmd.allocDir + defer allocDir.Destroy() + + fullCGroupPath := testExecCmd.command.Resources.LinuxResources.CpusetCgroupPath + + execCmd := testExecCmd.command + execCmd.Cmd = "/bin/sleep" + execCmd.Args = []string{"1"} + execCmd.ResourceLimits = true + execCmd.ModePID = "private" + execCmd.ModeIPC = "private" + + // Create the CGroup the executor's command will run in and populate it with one process + cgInterface, err := createCGroup(fullCGroupPath) + must.NoError(t, err) + + cmd := exec.Command("/bin/sleep", "3000") + err = cmd.Start() + must.NoError(t, err) + + go func() { + err := cmd.Wait() + //This process will be killed by the executor as a prerequisite to run + // the executors command. + must.Error(t, err) + }() + + pid := cmd.Process.Pid + must.Positive(t, pid) + + err = cgInterface.Write("cgroup.procs", strconv.Itoa(pid)) + must.NoError(t, err) + + pids, err := cgInterface.PIDs() + must.NoError(t, err) + must.One(t, pids.Size()) + + // Run the executor normally and make sure the process that was originally running + // as part of the CGroup was killed, and only the executor's process is running. + execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute) + executor := execInterface.(*LibcontainerExecutor) + defer executor.Shutdown("SIGKILL", 0) + + ps, err := executor.Launch(execCmd) + must.NoError(t, err) + must.Positive(t, ps.Pid) + + pids, err = cgInterface.PIDs() + must.NoError(t, err) + must.One(t, pids.Size()) + must.True(t, pids.Contains(ps.Pid)) + must.False(t, pids.Contains(pid)) + + estate, err := executor.Wait(context.Background()) + must.NoError(t, err) + must.Zero(t, estate.ExitCode) + + must.NoError(t, executor.Shutdown("", 0)) + executor.Wait(context.Background()) +} + +func TestExecutor_SignalCatching(t *testing.T) { + ci.Parallel(t) + + testutil.ExecCompatible(t) + testutil.CgroupsCompatible(t) + + testExecCmd := testExecutorCommandWithChroot(t) + + allocDir := testExecCmd.allocDir + defer allocDir.Destroy() + + execCmd := testExecCmd.command + execCmd.Cmd = "/bin/sleep" + execCmd.Args = []string{"100"} + execCmd.ResourceLimits = true + execCmd.ModePID = "private" + execCmd.ModeIPC = "private" + + execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute) + + ps, err := execInterface.Launch(execCmd) + must.NoError(t, err) + must.Positive(t, ps.Pid) + + executor := execInterface.(*LibcontainerExecutor) + status, err := executor.container.OCIState() + must.NoError(t, err) + must.Eq(t, specs.StateRunning, status.Status) + + executor.sigChan <- syscall.SIGTERM + time.Sleep(1 * time.Second) + + status, err = executor.container.OCIState() + must.NoError(t, err) + must.Eq(t, specs.StateStopped, status.Status) +}