diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index e9782b8f2..70e825605 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/hashicorp/nomad/client/lib/cgutil" + log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" @@ -152,6 +154,9 @@ type allocRunner struct { // runner to manage their mounting csiManager csimanager.Manager + // cpusetManager is responsible for configuring task cgroups if supported by the platform + cpusetManager cgutil.CpusetManager + // devicemanager is used to mount devices as well as lookup device // statistics devicemanager devicemanager.Manager @@ -208,6 +213,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { prevAllocMigrator: config.PrevAllocMigrator, dynamicRegistry: config.DynamicRegistry, csiManager: config.CSIManager, + cpusetManager: config.CpusetManager, devicemanager: config.DeviceManager, driverManager: config.DriverManager, serversContactedCh: config.ServersContactedCh, @@ -242,24 +248,25 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { for _, task := range tasks { trConfig := &taskrunner.Config{ - Alloc: ar.alloc, - ClientConfig: ar.clientConfig, - Task: task, - TaskDir: ar.allocDir.NewTaskDir(task.Name), - Logger: ar.logger, - StateDB: ar.stateDB, - StateUpdater: ar, - DynamicRegistry: ar.dynamicRegistry, - Consul: ar.consulClient, - ConsulProxies: ar.consulProxiesClient, - ConsulSI: ar.sidsClient, - Vault: ar.vaultClient, - DeviceStatsReporter: ar.deviceStatsReporter, - CSIManager: ar.csiManager, - DeviceManager: ar.devicemanager, - DriverManager: ar.driverManager, - ServersContactedCh: ar.serversContactedCh, - StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), + Alloc: ar.alloc, + ClientConfig: ar.clientConfig, + Task: task, + TaskDir: ar.allocDir.NewTaskDir(task.Name), + Logger: ar.logger, + StateDB: ar.stateDB, + StateUpdater: ar, + DynamicRegistry: ar.dynamicRegistry, + Consul: ar.consulClient, + ConsulProxies: ar.consulProxiesClient, + ConsulSI: ar.sidsClient, + Vault: ar.vaultClient, + DeviceStatsReporter: ar.deviceStatsReporter, + CSIManager: ar.csiManager, + CpusetCgroupPathGetter: ar.cpusetManager.CgroupPathFor(ar.id, task.Name), + DeviceManager: ar.devicemanager, + DriverManager: ar.driverManager, + ServersContactedCh: ar.serversContactedCh, + StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), } // Create, but do not Run, the task runner diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 4a1b0571e..07a184ffe 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -138,6 +138,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { alloc := ar.Alloc() ar.runnerHooks = []interfaces.RunnerHook{ newAllocDirHook(hookLogger, ar.allocDir), + newCgroupHook(ar.Alloc(), ar.cpusetManager), newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher), newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir), newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient), diff --git a/client/allocrunner/cgroup_hook.go b/client/allocrunner/cgroup_hook.go new file mode 100644 index 000000000..f73710472 --- /dev/null +++ b/client/allocrunner/cgroup_hook.go @@ -0,0 +1,32 @@ +package allocrunner + +import ( + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/nomad/structs" +) + +func newCgroupHook(alloc *structs.Allocation, man cgutil.CpusetManager) *cgroupHook { + return &cgroupHook{ + alloc: alloc, + cpusetManager: man, + } +} + +type cgroupHook struct { + alloc *structs.Allocation + cpusetManager cgutil.CpusetManager +} + +func (c *cgroupHook) Name() string { + return "cgroup" +} + +func (c *cgroupHook) Prerun() error { + c.cpusetManager.AddAlloc(c.alloc) + return nil +} + +func (c *cgroupHook) Postrun() error { + c.cpusetManager.RemoveAlloc(c.alloc.ID) + return nil +} diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go index fba790b5e..343d4eec5 100644 --- a/client/allocrunner/config.go +++ b/client/allocrunner/config.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" cstate "github.com/hashicorp/nomad/client/state" @@ -69,6 +70,9 @@ type Config struct { // DriverManager handles dispensing of driver plugins DriverManager drivermanager.Manager + // CpusetManager configures the cpuset cgroup if supported by the platform + CpusetManager cgutil.CpusetManager + // ServersContactedCh is closed when the first GetClientAllocs call to // servers succeeds and allocs are synced. ServersContactedCh chan struct{} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index a377ae2df..23b33cc34 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/hashicorp/nomad/client/lib/cgutil" + metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" @@ -201,6 +203,10 @@ type TaskRunner struct { // statistics devicemanager devicemanager.Manager + // cpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform + cpusetCgroupPathGetter cgutil.CgroupPathGetter + + CpusetCgroupPathGetter cgutil.CgroupPathGetter // driverManager is used to dispense driver plugins and register event // handlers driverManager drivermanager.Manager @@ -265,6 +271,9 @@ type Config struct { // CSIManager is used to manage the mounting of CSI volumes into tasks CSIManager csimanager.Manager + // CpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform + CpusetCgroupPathGetter cgutil.CgroupPathGetter + // DeviceManager is used to mount devices as well as lookup device // statistics DeviceManager devicemanager.Manager @@ -303,36 +312,37 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { } tr := &TaskRunner{ - alloc: config.Alloc, - allocID: config.Alloc.ID, - clientConfig: config.ClientConfig, - task: config.Task, - taskDir: config.TaskDir, - taskName: config.Task.Name, - taskLeader: config.Task.Leader, - envBuilder: envBuilder, - dynamicRegistry: config.DynamicRegistry, - consulServiceClient: config.Consul, - consulProxiesClient: config.ConsulProxies, - siClient: config.ConsulSI, - vaultClient: config.Vault, - state: tstate, - localState: state.NewLocalState(), - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - deviceStatsReporter: config.DeviceStatsReporter, - killCtx: killCtx, - killCtxCancel: killCancel, - shutdownCtx: trCtx, - shutdownCtxCancel: trCancel, - triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), - waitCh: make(chan struct{}), - csiManager: config.CSIManager, - devicemanager: config.DeviceManager, - driverManager: config.DriverManager, - maxEvents: defaultMaxEvents, - serversContactedCh: config.ServersContactedCh, - startConditionMetCtx: config.StartConditionMetCtx, + alloc: config.Alloc, + allocID: config.Alloc.ID, + clientConfig: config.ClientConfig, + task: config.Task, + taskDir: config.TaskDir, + taskName: config.Task.Name, + taskLeader: config.Task.Leader, + envBuilder: envBuilder, + dynamicRegistry: config.DynamicRegistry, + consulServiceClient: config.Consul, + consulProxiesClient: config.ConsulProxies, + siClient: config.ConsulSI, + vaultClient: config.Vault, + state: tstate, + localState: state.NewLocalState(), + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + deviceStatsReporter: config.DeviceStatsReporter, + killCtx: killCtx, + killCtxCancel: killCancel, + shutdownCtx: trCtx, + shutdownCtxCancel: trCancel, + triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + waitCh: make(chan struct{}), + csiManager: config.CSIManager, + cpusetCgroupPathGetter: config.CpusetCgroupPathGetter, + devicemanager: config.DeviceManager, + driverManager: config.DriverManager, + maxEvents: defaultMaxEvents, + serversContactedCh: config.ServersContactedCh, + startConditionMetCtx: config.StartConditionMetCtx, } // Create the logger based on the allocation ID @@ -741,6 +751,13 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { func (tr *TaskRunner) runDriver() error { taskConfig := tr.buildTaskConfig() + if tr.cpusetCgroupPathGetter != nil { + cpusetCgroupPath, err := tr.cpusetCgroupPathGetter(tr.killCtx) + if err != nil { + return err + } + taskConfig.Resources.LinuxResources.CpusetCgroupPath = cpusetCgroupPath + } // Build hcl context variables vars, errs, err := tr.envBuilder.Build().AllValues() diff --git a/client/client.go b/client/client.go index c68b58b3f..16ac649b2 100644 --- a/client/client.go +++ b/client/client.go @@ -7,6 +7,7 @@ import ( "net/rpc" "os" "path/filepath" + "runtime" "sort" "strconv" "strings" @@ -298,6 +299,9 @@ type Client struct { // with a nomad client. Currently only used for CSI. dynamicRegistry dynamicplugins.Registry + // cpusetManager configures cpusets on supported platforms + cpusetManager cgutil.CpusetManager + // EnterpriseClient is used to set and check enterprise features for clients EnterpriseClient *EnterpriseClient } @@ -357,6 +361,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie invalidAllocs: make(map[string]struct{}), serversContactedCh: make(chan struct{}), serversContactedOnce: sync.Once{}, + cpusetManager: cgutil.NewCpusetManager(cfg.CgroupParent, logger.Named("cpuset_manager")), EnterpriseClient: newEnterpriseClient(logger), } @@ -635,12 +640,14 @@ func (c *Client) init() error { c.logger.Info("using alloc directory", "alloc_dir", c.config.AllocDir) // Ensure cgroups are created on linux platform - if err := cgutil.InitCpusetParent(c.config.CgroupParent); err != nil { - // If the client cannot initialize the cgroup then reserved cores will - // not be reported and the cpuset manager will be disabled. This is - // common when running in dev mode under a non-root user for example. - c.logger.Warn("could not initialize cpuset cgroup subsystem, cpuset management disabled", "error", err) - c.config.DisableCgroupManagement = true + if runtime.GOOS == "linux" { + err := c.cpusetManager.Init() + if err != nil { + // if the client cannot initialize the cgroup then reserved cores will not be reported and the cpuset manager + // will be disabled. this is common when running in dev mode under a non-root user for example + c.logger.Warn("could not initialize cpuset cgroup subsystem, cpuset management disabled", "error", err) + c.config.DisableCgroupManagement = true + } } return nil } @@ -1125,6 +1132,7 @@ func (c *Client) restoreState() error { PrevAllocMigrator: prevAllocMigrator, DynamicRegistry: c.dynamicRegistry, CSIManager: c.csimanager, + CpusetManager: c.cpusetManager, DeviceManager: c.devicemanager, DriverManager: c.drivermanager, ServersContactedCh: c.serversContactedCh, @@ -2419,6 +2427,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error PrevAllocMigrator: prevAllocMigrator, DynamicRegistry: c.dynamicRegistry, CSIManager: c.csimanager, + CpusetManager: c.cpusetManager, DeviceManager: c.devicemanager, DriverManager: c.drivermanager, RPCClient: c, diff --git a/client/fingerprint/cpu.go b/client/fingerprint/cpu.go index 043800d78..e0fc8b2c5 100644 --- a/client/fingerprint/cpu.go +++ b/client/fingerprint/cpu.go @@ -74,7 +74,9 @@ func (f *CPUFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintR if cores, err := f.deriveReservableCores(req); err != nil { f.logger.Warn("failed to detect set of reservable cores", "error", err) } else { - reservableCores = cpuset.New(cores...).Difference(cpuset.New(req.Node.ReservedResources.Cpu.ReservedCpuCores...)).ToSlice() + if req.Node.ReservedResources != nil { + reservableCores = cpuset.New(cores...).Difference(cpuset.New(req.Node.ReservedResources.Cpu.ReservedCpuCores...)).ToSlice() + } f.logger.Debug("detected reservable cores", "cpuset", reservableCores) } } diff --git a/client/lib/cgutil/cgutil_default.go b/client/lib/cgutil/cgutil_default.go index 322a970b3..e9bc0f351 100644 --- a/client/lib/cgutil/cgutil_default.go +++ b/client/lib/cgutil/cgutil_default.go @@ -2,6 +2,10 @@ package cgutil +const ( + DefaultCgroupParent = "" +) + // FindCgroupMountpointDir is used to find the cgroup mount point on a Linux // system. Here it is a no-op implemtation func FindCgroupMountpointDir() (string, error) { diff --git a/client/lib/cgutil/cpuset_manager_linux.go b/client/lib/cgutil/cpuset_manager_linux.go index 185aa05aa..d46826251 100644 --- a/client/lib/cgutil/cpuset_manager_linux.go +++ b/client/lib/cgutil/cpuset_manager_linux.go @@ -139,6 +139,9 @@ func (c *cpusetManager) Init() error { } parentCpus, parentMems, err := getCpusetSubsystemSettings(cgroupParentPath) + if err != nil { + return fmt.Errorf("failed to detect parent cpuset settings: %v", err) + } c.parentCpuset, err = cpuset.Parse(parentCpus) if err != nil { return fmt.Errorf("failed to parse parent cpuset.cpus setting: %v", err) @@ -150,7 +153,7 @@ func (c *cpusetManager) Init() error { if err != nil { return err } - if err := fscommon.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", string(parentMems)); err != nil { + if err := fscommon.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", parentMems); err != nil { return err } } else if !os.IsExist(err) { @@ -274,11 +277,20 @@ func (_ *cpusetManager) setCgroupCpusetCPUs(path, cpus string) error { } func (c *cpusetManager) signalReconcile() { - c.signalCh <- struct{}{} + select { + case c.signalCh <- struct{}{}: + case <-c.doneCh: + } } func (c *cpusetManager) getCpuset(group string) (cpuset.CPUSet, error) { - man := cgroupFs.NewManager(&configs.Cgroup{Path: filepath.Join(c.cgroupParent, group)}, map[string]string{"cpuset": filepath.Join(c.cgroupParentPath, group)}, false) + man := cgroupFs.NewManager( + &configs.Cgroup{ + Path: filepath.Join(c.cgroupParent, group), + }, + map[string]string{"cpuset": filepath.Join(c.cgroupParentPath, group)}, + false, + ) stats, err := man.GetStats() if err != nil { return cpuset.CPUSet{}, err diff --git a/client/testutil/driver_compatible.go b/client/testutil/driver_compatible.go index 7829b987d..24e1abea6 100644 --- a/client/testutil/driver_compatible.go +++ b/client/testutil/driver_compatible.go @@ -6,7 +6,7 @@ import ( "syscall" "testing" - "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/client/lib/cgutil" ) // RequireRoot skips tests unless running on a Unix as root. @@ -50,7 +50,7 @@ func QemuCompatible(t *testing.T) { } func CgroupCompatible(t *testing.T) { - mount, err := fingerprint.FindCgroupMountpointDir() + mount, err := cgutil.FindCgroupMountpointDir() if err != nil || mount == "" { t.Skipf("Failed to find cgroup mount: %v %v", mount, err) } diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index 21a8e5650..9f44f20a3 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -371,6 +371,8 @@ type LinuxResources struct { CpusetCPUs string CpusetMems string + CpusetCgroupPath string + // PrecentTicks is used to calculate the CPUQuota, currently the docker // driver exposes cpu period and quota through the driver configuration // and thus the calculation for CPUQuota cannot be done on the client.