mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
client/ar: thread through cpuset manager
This commit is contained in:
@@ -7,6 +7,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/lib/cgutil"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
@@ -152,6 +154,9 @@ type allocRunner struct {
|
||||
// runner to manage their mounting
|
||||
csiManager csimanager.Manager
|
||||
|
||||
// cpusetManager is responsible for configuring task cgroups if supported by the platform
|
||||
cpusetManager cgutil.CpusetManager
|
||||
|
||||
// devicemanager is used to mount devices as well as lookup device
|
||||
// statistics
|
||||
devicemanager devicemanager.Manager
|
||||
@@ -208,6 +213,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
|
||||
prevAllocMigrator: config.PrevAllocMigrator,
|
||||
dynamicRegistry: config.DynamicRegistry,
|
||||
csiManager: config.CSIManager,
|
||||
cpusetManager: config.CpusetManager,
|
||||
devicemanager: config.DeviceManager,
|
||||
driverManager: config.DriverManager,
|
||||
serversContactedCh: config.ServersContactedCh,
|
||||
@@ -242,24 +248,25 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
|
||||
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
|
||||
for _, task := range tasks {
|
||||
trConfig := &taskrunner.Config{
|
||||
Alloc: ar.alloc,
|
||||
ClientConfig: ar.clientConfig,
|
||||
Task: task,
|
||||
TaskDir: ar.allocDir.NewTaskDir(task.Name),
|
||||
Logger: ar.logger,
|
||||
StateDB: ar.stateDB,
|
||||
StateUpdater: ar,
|
||||
DynamicRegistry: ar.dynamicRegistry,
|
||||
Consul: ar.consulClient,
|
||||
ConsulProxies: ar.consulProxiesClient,
|
||||
ConsulSI: ar.sidsClient,
|
||||
Vault: ar.vaultClient,
|
||||
DeviceStatsReporter: ar.deviceStatsReporter,
|
||||
CSIManager: ar.csiManager,
|
||||
DeviceManager: ar.devicemanager,
|
||||
DriverManager: ar.driverManager,
|
||||
ServersContactedCh: ar.serversContactedCh,
|
||||
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
|
||||
Alloc: ar.alloc,
|
||||
ClientConfig: ar.clientConfig,
|
||||
Task: task,
|
||||
TaskDir: ar.allocDir.NewTaskDir(task.Name),
|
||||
Logger: ar.logger,
|
||||
StateDB: ar.stateDB,
|
||||
StateUpdater: ar,
|
||||
DynamicRegistry: ar.dynamicRegistry,
|
||||
Consul: ar.consulClient,
|
||||
ConsulProxies: ar.consulProxiesClient,
|
||||
ConsulSI: ar.sidsClient,
|
||||
Vault: ar.vaultClient,
|
||||
DeviceStatsReporter: ar.deviceStatsReporter,
|
||||
CSIManager: ar.csiManager,
|
||||
CpusetCgroupPathGetter: ar.cpusetManager.CgroupPathFor(ar.id, task.Name),
|
||||
DeviceManager: ar.devicemanager,
|
||||
DriverManager: ar.driverManager,
|
||||
ServersContactedCh: ar.serversContactedCh,
|
||||
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
|
||||
}
|
||||
|
||||
// Create, but do not Run, the task runner
|
||||
|
||||
@@ -138,6 +138,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
|
||||
alloc := ar.Alloc()
|
||||
ar.runnerHooks = []interfaces.RunnerHook{
|
||||
newAllocDirHook(hookLogger, ar.allocDir),
|
||||
newCgroupHook(ar.Alloc(), ar.cpusetManager),
|
||||
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
|
||||
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
|
||||
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
|
||||
|
||||
32
client/allocrunner/cgroup_hook.go
Normal file
32
client/allocrunner/cgroup_hook.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package allocrunner
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/client/lib/cgutil"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
func newCgroupHook(alloc *structs.Allocation, man cgutil.CpusetManager) *cgroupHook {
|
||||
return &cgroupHook{
|
||||
alloc: alloc,
|
||||
cpusetManager: man,
|
||||
}
|
||||
}
|
||||
|
||||
type cgroupHook struct {
|
||||
alloc *structs.Allocation
|
||||
cpusetManager cgutil.CpusetManager
|
||||
}
|
||||
|
||||
func (c *cgroupHook) Name() string {
|
||||
return "cgroup"
|
||||
}
|
||||
|
||||
func (c *cgroupHook) Prerun() error {
|
||||
c.cpusetManager.AddAlloc(c.alloc)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cgroupHook) Postrun() error {
|
||||
c.cpusetManager.RemoveAlloc(c.alloc.ID)
|
||||
return nil
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/dynamicplugins"
|
||||
"github.com/hashicorp/nomad/client/interfaces"
|
||||
"github.com/hashicorp/nomad/client/lib/cgutil"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||
cstate "github.com/hashicorp/nomad/client/state"
|
||||
@@ -69,6 +70,9 @@ type Config struct {
|
||||
// DriverManager handles dispensing of driver plugins
|
||||
DriverManager drivermanager.Manager
|
||||
|
||||
// CpusetManager configures the cpuset cgroup if supported by the platform
|
||||
CpusetManager cgutil.CpusetManager
|
||||
|
||||
// ServersContactedCh is closed when the first GetClientAllocs call to
|
||||
// servers succeeds and allocs are synced.
|
||||
ServersContactedCh chan struct{}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/lib/cgutil"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
@@ -201,6 +203,10 @@ type TaskRunner struct {
|
||||
// statistics
|
||||
devicemanager devicemanager.Manager
|
||||
|
||||
// cpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform
|
||||
cpusetCgroupPathGetter cgutil.CgroupPathGetter
|
||||
|
||||
CpusetCgroupPathGetter cgutil.CgroupPathGetter
|
||||
// driverManager is used to dispense driver plugins and register event
|
||||
// handlers
|
||||
driverManager drivermanager.Manager
|
||||
@@ -265,6 +271,9 @@ type Config struct {
|
||||
// CSIManager is used to manage the mounting of CSI volumes into tasks
|
||||
CSIManager csimanager.Manager
|
||||
|
||||
// CpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform
|
||||
CpusetCgroupPathGetter cgutil.CgroupPathGetter
|
||||
|
||||
// DeviceManager is used to mount devices as well as lookup device
|
||||
// statistics
|
||||
DeviceManager devicemanager.Manager
|
||||
@@ -303,36 +312,37 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
||||
}
|
||||
|
||||
tr := &TaskRunner{
|
||||
alloc: config.Alloc,
|
||||
allocID: config.Alloc.ID,
|
||||
clientConfig: config.ClientConfig,
|
||||
task: config.Task,
|
||||
taskDir: config.TaskDir,
|
||||
taskName: config.Task.Name,
|
||||
taskLeader: config.Task.Leader,
|
||||
envBuilder: envBuilder,
|
||||
dynamicRegistry: config.DynamicRegistry,
|
||||
consulServiceClient: config.Consul,
|
||||
consulProxiesClient: config.ConsulProxies,
|
||||
siClient: config.ConsulSI,
|
||||
vaultClient: config.Vault,
|
||||
state: tstate,
|
||||
localState: state.NewLocalState(),
|
||||
stateDB: config.StateDB,
|
||||
stateUpdater: config.StateUpdater,
|
||||
deviceStatsReporter: config.DeviceStatsReporter,
|
||||
killCtx: killCtx,
|
||||
killCtxCancel: killCancel,
|
||||
shutdownCtx: trCtx,
|
||||
shutdownCtxCancel: trCancel,
|
||||
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
|
||||
waitCh: make(chan struct{}),
|
||||
csiManager: config.CSIManager,
|
||||
devicemanager: config.DeviceManager,
|
||||
driverManager: config.DriverManager,
|
||||
maxEvents: defaultMaxEvents,
|
||||
serversContactedCh: config.ServersContactedCh,
|
||||
startConditionMetCtx: config.StartConditionMetCtx,
|
||||
alloc: config.Alloc,
|
||||
allocID: config.Alloc.ID,
|
||||
clientConfig: config.ClientConfig,
|
||||
task: config.Task,
|
||||
taskDir: config.TaskDir,
|
||||
taskName: config.Task.Name,
|
||||
taskLeader: config.Task.Leader,
|
||||
envBuilder: envBuilder,
|
||||
dynamicRegistry: config.DynamicRegistry,
|
||||
consulServiceClient: config.Consul,
|
||||
consulProxiesClient: config.ConsulProxies,
|
||||
siClient: config.ConsulSI,
|
||||
vaultClient: config.Vault,
|
||||
state: tstate,
|
||||
localState: state.NewLocalState(),
|
||||
stateDB: config.StateDB,
|
||||
stateUpdater: config.StateUpdater,
|
||||
deviceStatsReporter: config.DeviceStatsReporter,
|
||||
killCtx: killCtx,
|
||||
killCtxCancel: killCancel,
|
||||
shutdownCtx: trCtx,
|
||||
shutdownCtxCancel: trCancel,
|
||||
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
|
||||
waitCh: make(chan struct{}),
|
||||
csiManager: config.CSIManager,
|
||||
cpusetCgroupPathGetter: config.CpusetCgroupPathGetter,
|
||||
devicemanager: config.DeviceManager,
|
||||
driverManager: config.DriverManager,
|
||||
maxEvents: defaultMaxEvents,
|
||||
serversContactedCh: config.ServersContactedCh,
|
||||
startConditionMetCtx: config.StartConditionMetCtx,
|
||||
}
|
||||
|
||||
// Create the logger based on the allocation ID
|
||||
@@ -741,6 +751,13 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
|
||||
func (tr *TaskRunner) runDriver() error {
|
||||
|
||||
taskConfig := tr.buildTaskConfig()
|
||||
if tr.cpusetCgroupPathGetter != nil {
|
||||
cpusetCgroupPath, err := tr.cpusetCgroupPathGetter(tr.killCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
taskConfig.Resources.LinuxResources.CpusetCgroupPath = cpusetCgroupPath
|
||||
}
|
||||
|
||||
// Build hcl context variables
|
||||
vars, errs, err := tr.envBuilder.Build().AllValues()
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"net/rpc"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -298,6 +299,9 @@ type Client struct {
|
||||
// with a nomad client. Currently only used for CSI.
|
||||
dynamicRegistry dynamicplugins.Registry
|
||||
|
||||
// cpusetManager configures cpusets on supported platforms
|
||||
cpusetManager cgutil.CpusetManager
|
||||
|
||||
// EnterpriseClient is used to set and check enterprise features for clients
|
||||
EnterpriseClient *EnterpriseClient
|
||||
}
|
||||
@@ -357,6 +361,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
|
||||
invalidAllocs: make(map[string]struct{}),
|
||||
serversContactedCh: make(chan struct{}),
|
||||
serversContactedOnce: sync.Once{},
|
||||
cpusetManager: cgutil.NewCpusetManager(cfg.CgroupParent, logger.Named("cpuset_manager")),
|
||||
EnterpriseClient: newEnterpriseClient(logger),
|
||||
}
|
||||
|
||||
@@ -635,12 +640,14 @@ func (c *Client) init() error {
|
||||
c.logger.Info("using alloc directory", "alloc_dir", c.config.AllocDir)
|
||||
|
||||
// Ensure cgroups are created on linux platform
|
||||
if err := cgutil.InitCpusetParent(c.config.CgroupParent); err != nil {
|
||||
// If the client cannot initialize the cgroup then reserved cores will
|
||||
// not be reported and the cpuset manager will be disabled. This is
|
||||
// common when running in dev mode under a non-root user for example.
|
||||
c.logger.Warn("could not initialize cpuset cgroup subsystem, cpuset management disabled", "error", err)
|
||||
c.config.DisableCgroupManagement = true
|
||||
if runtime.GOOS == "linux" {
|
||||
err := c.cpusetManager.Init()
|
||||
if err != nil {
|
||||
// if the client cannot initialize the cgroup then reserved cores will not be reported and the cpuset manager
|
||||
// will be disabled. this is common when running in dev mode under a non-root user for example
|
||||
c.logger.Warn("could not initialize cpuset cgroup subsystem, cpuset management disabled", "error", err)
|
||||
c.config.DisableCgroupManagement = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1125,6 +1132,7 @@ func (c *Client) restoreState() error {
|
||||
PrevAllocMigrator: prevAllocMigrator,
|
||||
DynamicRegistry: c.dynamicRegistry,
|
||||
CSIManager: c.csimanager,
|
||||
CpusetManager: c.cpusetManager,
|
||||
DeviceManager: c.devicemanager,
|
||||
DriverManager: c.drivermanager,
|
||||
ServersContactedCh: c.serversContactedCh,
|
||||
@@ -2419,6 +2427,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
||||
PrevAllocMigrator: prevAllocMigrator,
|
||||
DynamicRegistry: c.dynamicRegistry,
|
||||
CSIManager: c.csimanager,
|
||||
CpusetManager: c.cpusetManager,
|
||||
DeviceManager: c.devicemanager,
|
||||
DriverManager: c.drivermanager,
|
||||
RPCClient: c,
|
||||
|
||||
@@ -74,7 +74,9 @@ func (f *CPUFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintR
|
||||
if cores, err := f.deriveReservableCores(req); err != nil {
|
||||
f.logger.Warn("failed to detect set of reservable cores", "error", err)
|
||||
} else {
|
||||
reservableCores = cpuset.New(cores...).Difference(cpuset.New(req.Node.ReservedResources.Cpu.ReservedCpuCores...)).ToSlice()
|
||||
if req.Node.ReservedResources != nil {
|
||||
reservableCores = cpuset.New(cores...).Difference(cpuset.New(req.Node.ReservedResources.Cpu.ReservedCpuCores...)).ToSlice()
|
||||
}
|
||||
f.logger.Debug("detected reservable cores", "cpuset", reservableCores)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,10 @@
|
||||
|
||||
package cgutil
|
||||
|
||||
const (
|
||||
DefaultCgroupParent = ""
|
||||
)
|
||||
|
||||
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
|
||||
// system. Here it is a no-op implemtation
|
||||
func FindCgroupMountpointDir() (string, error) {
|
||||
|
||||
@@ -139,6 +139,9 @@ func (c *cpusetManager) Init() error {
|
||||
}
|
||||
|
||||
parentCpus, parentMems, err := getCpusetSubsystemSettings(cgroupParentPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to detect parent cpuset settings: %v", err)
|
||||
}
|
||||
c.parentCpuset, err = cpuset.Parse(parentCpus)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse parent cpuset.cpus setting: %v", err)
|
||||
@@ -150,7 +153,7 @@ func (c *cpusetManager) Init() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := fscommon.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", string(parentMems)); err != nil {
|
||||
if err := fscommon.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", parentMems); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !os.IsExist(err) {
|
||||
@@ -274,11 +277,20 @@ func (_ *cpusetManager) setCgroupCpusetCPUs(path, cpus string) error {
|
||||
}
|
||||
|
||||
func (c *cpusetManager) signalReconcile() {
|
||||
c.signalCh <- struct{}{}
|
||||
select {
|
||||
case c.signalCh <- struct{}{}:
|
||||
case <-c.doneCh:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cpusetManager) getCpuset(group string) (cpuset.CPUSet, error) {
|
||||
man := cgroupFs.NewManager(&configs.Cgroup{Path: filepath.Join(c.cgroupParent, group)}, map[string]string{"cpuset": filepath.Join(c.cgroupParentPath, group)}, false)
|
||||
man := cgroupFs.NewManager(
|
||||
&configs.Cgroup{
|
||||
Path: filepath.Join(c.cgroupParent, group),
|
||||
},
|
||||
map[string]string{"cpuset": filepath.Join(c.cgroupParentPath, group)},
|
||||
false,
|
||||
)
|
||||
stats, err := man.GetStats()
|
||||
if err != nil {
|
||||
return cpuset.CPUSet{}, err
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/client/lib/cgutil"
|
||||
)
|
||||
|
||||
// RequireRoot skips tests unless running on a Unix as root.
|
||||
@@ -50,7 +50,7 @@ func QemuCompatible(t *testing.T) {
|
||||
}
|
||||
|
||||
func CgroupCompatible(t *testing.T) {
|
||||
mount, err := fingerprint.FindCgroupMountpointDir()
|
||||
mount, err := cgutil.FindCgroupMountpointDir()
|
||||
if err != nil || mount == "" {
|
||||
t.Skipf("Failed to find cgroup mount: %v %v", mount, err)
|
||||
}
|
||||
|
||||
@@ -371,6 +371,8 @@ type LinuxResources struct {
|
||||
CpusetCPUs string
|
||||
CpusetMems string
|
||||
|
||||
CpusetCgroupPath string
|
||||
|
||||
// PrecentTicks is used to calculate the CPUQuota, currently the docker
|
||||
// driver exposes cpu period and quota through the driver configuration
|
||||
// and thus the calculation for CPUQuota cannot be done on the client.
|
||||
|
||||
Reference in New Issue
Block a user