mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
numa: enable numa topology detection (#18146)
* client: refactor cgroups management in client * client: fingerprint numa topology * client: plumb numa and cgroups changes to drivers * client: cleanup task resource accounting * client: numa client and config plumbing * lib: add a stack implementation * tools: remove ec2info tool * plugins: fixup testing for cgroups / numa changes * build: update makefile and package tests and cl
This commit is contained in:
@@ -21,10 +21,11 @@ import (
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/lib/cpustats"
|
||||
"github.com/hashicorp/nomad/client/lib/fifo"
|
||||
"github.com/hashicorp/nomad/client/lib/resources"
|
||||
"github.com/hashicorp/nomad/client/lib/numalib"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/helper/stats"
|
||||
"github.com/hashicorp/nomad/drivers/shared/executor/procstats"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/syndtr/gocapability/capability"
|
||||
)
|
||||
@@ -157,6 +158,21 @@ type ExecCommand struct {
|
||||
Capabilities []string
|
||||
}
|
||||
|
||||
// Cgroup returns the path to the cgroup the Nomad client is managing for the
|
||||
// task that is about to be run.
|
||||
//
|
||||
// On cgroups v1 systems this returns the path to the cpuset cgroup specifically.
|
||||
//
|
||||
// On cgroups v2 systems this returns the patah to the task's scope.
|
||||
//
|
||||
// On non-Linux systems this returns the empty string and has no meaning.
|
||||
func (c *ExecCommand) Cgroup() string {
|
||||
if c == nil || c.Resources == nil || c.Resources.LinuxResources == nil {
|
||||
return ""
|
||||
}
|
||||
return c.Resources.LinuxResources.CpusetCgroupPath
|
||||
}
|
||||
|
||||
// SetWriters sets the writer for the process stdout and stderr. This should
|
||||
// not be used if writing to a file path such as a fifo file. SetStdoutWriter
|
||||
// is mainly used for unit testing purposes.
|
||||
@@ -239,37 +255,34 @@ func (v *ExecutorVersion) GoString() string {
|
||||
// supervises processes. In addition to process supervision it provides resource
|
||||
// and file system isolation
|
||||
type UniversalExecutor struct {
|
||||
childCmd exec.Cmd
|
||||
commandCfg *ExecCommand
|
||||
childCmd exec.Cmd
|
||||
command *ExecCommand
|
||||
|
||||
exitState *ProcessState
|
||||
processExited chan interface{}
|
||||
|
||||
// containment is used to cleanup resources created by the executor
|
||||
// currently only used for killing pids via freezer cgroup on linux
|
||||
containment resources.Containment
|
||||
|
||||
totalCpuStats *stats.CpuStats
|
||||
userCpuStats *stats.CpuStats
|
||||
systemCpuStats *stats.CpuStats
|
||||
pidCollector *pidCollector
|
||||
top cpustats.Topology
|
||||
totalCpuStats *cpustats.Tracker
|
||||
userCpuStats *cpustats.Tracker
|
||||
systemCpuStats *cpustats.Tracker
|
||||
processStats procstats.ProcessStats
|
||||
|
||||
logger hclog.Logger
|
||||
}
|
||||
|
||||
// NewExecutor returns an Executor
|
||||
func NewExecutor(logger hclog.Logger, cpuTotalTicks uint64) Executor {
|
||||
logger = logger.Named("executor")
|
||||
stats.SetCpuTotalTicks(cpuTotalTicks)
|
||||
|
||||
return &UniversalExecutor{
|
||||
logger: logger,
|
||||
func NewExecutor(logger hclog.Logger) Executor {
|
||||
top := numalib.Scan(numalib.PlatformScanners()) // TODO(shoenig) grpc plumbing
|
||||
ue := &UniversalExecutor{
|
||||
logger: logger.Named("executor"),
|
||||
top: top,
|
||||
processExited: make(chan interface{}),
|
||||
totalCpuStats: stats.NewCpuStats(),
|
||||
userCpuStats: stats.NewCpuStats(),
|
||||
systemCpuStats: stats.NewCpuStats(),
|
||||
pidCollector: newPidCollector(logger),
|
||||
totalCpuStats: cpustats.New(top),
|
||||
userCpuStats: cpustats.New(top),
|
||||
systemCpuStats: cpustats.New(top),
|
||||
}
|
||||
ue.processStats = procstats.New(top, ue)
|
||||
return ue
|
||||
}
|
||||
|
||||
// Version returns the api version of the executor
|
||||
@@ -282,7 +295,7 @@ func (e *UniversalExecutor) Version() (*ExecutorVersion, error) {
|
||||
func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
|
||||
e.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " "))
|
||||
|
||||
e.commandCfg = command
|
||||
e.command = command
|
||||
|
||||
// setting the user of the process
|
||||
if command.User != "" {
|
||||
@@ -293,27 +306,26 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
|
||||
}
|
||||
|
||||
// set the task dir as the working directory for the command
|
||||
e.childCmd.Dir = e.commandCfg.TaskDir
|
||||
e.childCmd.Dir = e.command.TaskDir
|
||||
|
||||
// start command in separate process group
|
||||
if err := e.setNewProcessGroup(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Maybe setup containment (for now, cgroups only only on linux)
|
||||
if e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup {
|
||||
pid := os.Getpid()
|
||||
if err := e.configureResourceContainer(pid); err != nil {
|
||||
e.logger.Error("failed to configure resource container", "pid", pid, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
// setup containment (i.e. cgroups on linux)
|
||||
if cleanup, err := e.configureResourceContainer(command, os.Getpid()); err != nil {
|
||||
e.logger.Error("failed to configure resource container", "error", err)
|
||||
return nil, err
|
||||
} else {
|
||||
defer cleanup()
|
||||
}
|
||||
|
||||
stdout, err := e.commandCfg.Stdout()
|
||||
stdout, err := e.command.Stdout()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stderr, err := e.commandCfg.Stderr()
|
||||
stderr, err := e.command.Stderr()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -336,14 +348,13 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
|
||||
// Set the commands arguments
|
||||
e.childCmd.Path = path
|
||||
e.childCmd.Args = append([]string{e.childCmd.Path}, command.Args...)
|
||||
e.childCmd.Env = e.commandCfg.Env
|
||||
e.childCmd.Env = e.command.Env
|
||||
|
||||
// Start the process
|
||||
if err = withNetworkIsolation(e.childCmd.Start, command.NetworkIsolation); err != nil {
|
||||
return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.childCmd.Args, err)
|
||||
}
|
||||
|
||||
go e.pidCollector.collectPids(e.processExited, e.getAllPids)
|
||||
go e.wait()
|
||||
return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil
|
||||
}
|
||||
@@ -352,7 +363,14 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
|
||||
func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
|
||||
ctx, cancel := context.WithDeadline(context.Background(), deadline)
|
||||
defer cancel()
|
||||
return ExecScript(ctx, e.childCmd.Dir, e.commandCfg.Env, e.childCmd.SysProcAttr, e.commandCfg.NetworkIsolation, name, args)
|
||||
|
||||
if cleanup, err := e.setSubCmdCgroup(&e.childCmd, e.command.Cgroup()); err != nil {
|
||||
return nil, 0, err
|
||||
} else {
|
||||
defer cleanup()
|
||||
}
|
||||
|
||||
return ExecScript(ctx, e.childCmd.Dir, e.command.Env, e.childCmd.SysProcAttr, e.command.NetworkIsolation, name, args)
|
||||
}
|
||||
|
||||
// ExecScript executes cmd with args and returns the output, exit code, and
|
||||
@@ -364,6 +382,7 @@ func ExecScript(ctx context.Context, dir string, env []string, attrs *syscall.Sy
|
||||
|
||||
// Copy runtime environment from the main command
|
||||
cmd.SysProcAttr = attrs
|
||||
|
||||
cmd.Dir = dir
|
||||
cmd.Env = env
|
||||
|
||||
@@ -431,13 +450,18 @@ func (e *UniversalExecutor) ExecStreaming(ctx context.Context, command []string,
|
||||
return nil
|
||||
},
|
||||
processStart: func() error {
|
||||
if u := e.commandCfg.User; u != "" {
|
||||
if u := e.command.User; u != "" {
|
||||
if err := setCmdUser(cmd, u); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return withNetworkIsolation(cmd.Start, e.commandCfg.NetworkIsolation)
|
||||
cgroup := e.command.Cgroup()
|
||||
if cleanup, err := e.setSubCmdCgroup(cmd, cgroup); err != nil {
|
||||
return err
|
||||
} else {
|
||||
defer cleanup()
|
||||
}
|
||||
return withNetworkIsolation(cmd.Start, e.command.NetworkIsolation)
|
||||
},
|
||||
processWait: func() (*os.ProcessState, error) {
|
||||
err := cmd.Wait()
|
||||
@@ -464,7 +488,7 @@ func (e *UniversalExecutor) UpdateResources(resources *drivers.Resources) error
|
||||
|
||||
func (e *UniversalExecutor) wait() {
|
||||
defer close(e.processExited)
|
||||
defer e.commandCfg.Close()
|
||||
defer e.command.Close()
|
||||
pid := e.childCmd.Process.Pid
|
||||
err := e.childCmd.Wait()
|
||||
if err == nil {
|
||||
@@ -514,7 +538,7 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
|
||||
var merr multierror.Error
|
||||
|
||||
// If the executor did not launch a process, return.
|
||||
if e.commandCfg == nil {
|
||||
if e.command == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -559,6 +583,11 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
|
||||
proc.Kill()
|
||||
}
|
||||
|
||||
// Issue sigkill to the process group (if possible)
|
||||
if err = e.killProcessTree(proc); err != nil {
|
||||
e.logger.Warn("failed to shutdown process group", "pid", proc.Pid, "error", err)
|
||||
}
|
||||
|
||||
// Wait for process to exit
|
||||
select {
|
||||
case <-e.processExited:
|
||||
@@ -567,26 +596,10 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
|
||||
merr.Errors = append(merr.Errors, fmt.Errorf("process did not exit after 15 seconds"))
|
||||
}
|
||||
|
||||
// prefer killing the process via platform-dependent resource containment
|
||||
killByContainment := e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup
|
||||
|
||||
if !killByContainment {
|
||||
// there is no containment, so kill the group the old fashioned way by sending
|
||||
// SIGKILL to the negative pid
|
||||
if cleanupChildrenErr := e.killProcessTree(proc); cleanupChildrenErr != nil && cleanupChildrenErr.Error() != finishedErr {
|
||||
merr.Errors = append(merr.Errors,
|
||||
fmt.Errorf("can't kill process with pid %d: %v", e.childCmd.Process.Pid, cleanupChildrenErr))
|
||||
}
|
||||
} else {
|
||||
// there is containment available (e.g. cgroups) so defer to that implementation
|
||||
// for killing the processes
|
||||
if cleanupErr := e.containment.Cleanup(); cleanupErr != nil {
|
||||
e.logger.Warn("containment cleanup failed", "error", cleanupErr)
|
||||
merr.Errors = append(merr.Errors, cleanupErr)
|
||||
}
|
||||
}
|
||||
|
||||
if err = merr.ErrorOrNil(); err != nil {
|
||||
// Note that proclib in the TR shutdown may also dispatch a final platform
|
||||
// cleanup technique (e.g. cgroup kill), but if we get to the point where
|
||||
// that matters the Task was doing something naughty.
|
||||
e.logger.Warn("failed to shutdown due to some error", "error", err.Error())
|
||||
return err
|
||||
}
|
||||
@@ -628,16 +641,12 @@ func (e *UniversalExecutor) handleStats(ch chan *cstructs.TaskResourceUsage, ctx
|
||||
timer.Reset(interval)
|
||||
}
|
||||
|
||||
pidStats, err := e.pidCollector.pidStats()
|
||||
if err != nil {
|
||||
e.logger.Warn("error collecting stats", "error", err)
|
||||
return
|
||||
}
|
||||
stats := e.processStats.StatProcesses()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case ch <- aggregatedResourceUsage(e.systemCpuStats, pidStats):
|
||||
case ch <- procstats.Aggregate(e.systemCpuStats, stats):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user