mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Merge pull request #23922 from hashicorp/b-NET-10880
[NET-10880] Keep a register of the usable cores to avoid using more than that
This commit is contained in:
@@ -474,7 +474,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
|
||||
})
|
||||
|
||||
// Create the cpu core partition manager
|
||||
c.partitions = cgroupslib.GetPartition(
|
||||
c.partitions = cgroupslib.GetPartition(c.logger.Named("partitions"),
|
||||
c.topology.UsableCores(),
|
||||
)
|
||||
|
||||
@@ -1276,7 +1276,6 @@ func (c *Client) restoreState() error {
|
||||
// wait for servers to be contacted before proceeding with the
|
||||
// restoration process.
|
||||
arConf.ServersContactedCh = c.serversContactedCh
|
||||
|
||||
ar, err := c.allocrunnerFactory(arConf)
|
||||
if err != nil {
|
||||
c.logger.Error("error running alloc", "error", err, "alloc_id", alloc.ID)
|
||||
|
||||
@@ -6,11 +6,12 @@
|
||||
package cgroupslib
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/lib/idset"
|
||||
"github.com/hashicorp/nomad/client/lib/numalib/hw"
|
||||
)
|
||||
|
||||
// GetPartition creates a no-op Partition that does not do anything.
|
||||
func GetPartition(*idset.Set[hw.CoreID]) Partition {
|
||||
func GetPartition(log hclog.Logger, cores *idset.Set[hw.CoreID]) Partition {
|
||||
return NoopPartition()
|
||||
}
|
||||
|
||||
@@ -6,24 +6,26 @@
|
||||
package cgroupslib
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/lib/idset"
|
||||
"github.com/hashicorp/nomad/client/lib/numalib/hw"
|
||||
)
|
||||
|
||||
// GetPartition creates a Partition suitable for managing cores on this
|
||||
// Linux system.
|
||||
func GetPartition(cores *idset.Set[hw.CoreID]) Partition {
|
||||
return NewPartition(cores)
|
||||
func GetPartition(log hclog.Logger, cores *idset.Set[hw.CoreID]) Partition {
|
||||
return NewPartition(log, cores)
|
||||
}
|
||||
|
||||
// NewPartition creates a cpuset partition manager for managing the books
|
||||
// when allocations are created and destroyed. The initial set of cores is
|
||||
// the usable set of cores by Nomad.
|
||||
func NewPartition(cores *idset.Set[hw.CoreID]) Partition {
|
||||
func NewPartition(log hclog.Logger, cores *idset.Set[hw.CoreID]) Partition {
|
||||
var (
|
||||
sharePath string
|
||||
reservePath string
|
||||
@@ -41,6 +43,8 @@ func NewPartition(cores *idset.Set[hw.CoreID]) Partition {
|
||||
}
|
||||
|
||||
return &partition{
|
||||
usableCores: cores.Copy(),
|
||||
log: log,
|
||||
sharePath: sharePath,
|
||||
reservePath: reservePath,
|
||||
share: cores.Copy(),
|
||||
@@ -49,8 +53,10 @@ func NewPartition(cores *idset.Set[hw.CoreID]) Partition {
|
||||
}
|
||||
|
||||
type partition struct {
|
||||
log hclog.Logger
|
||||
sharePath string
|
||||
reservePath string
|
||||
usableCores *idset.Set[hw.CoreID]
|
||||
|
||||
lock sync.Mutex
|
||||
share *idset.Set[hw.CoreID]
|
||||
@@ -58,41 +64,49 @@ type partition struct {
|
||||
}
|
||||
|
||||
func (p *partition) Restore(cores *idset.Set[hw.CoreID]) {
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
p.share.RemoveSet(cores)
|
||||
p.reserve.InsertSet(cores)
|
||||
// Use the intersection with the usable cores to avoid adding more cores than available.
|
||||
p.reserve.InsertSet(p.usableCores.Intersect(cores))
|
||||
|
||||
}
|
||||
|
||||
func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error {
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
p.share.RemoveSet(cores)
|
||||
p.reserve.InsertSet(cores)
|
||||
|
||||
// Use the intersection with the usable cores to avoid adding more cores than available.
|
||||
p.reserve.InsertSet(p.usableCores.Intersect(cores))
|
||||
return p.write()
|
||||
}
|
||||
|
||||
func (p *partition) Release(cores *idset.Set[hw.CoreID]) error {
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
p.reserve.RemoveSet(cores)
|
||||
p.share.InsertSet(cores)
|
||||
|
||||
// Use the intersection with the usable cores to avoid removing more cores than available.
|
||||
p.share.InsertSet(p.usableCores.Intersect(cores))
|
||||
return p.write()
|
||||
}
|
||||
|
||||
func (p *partition) write() error {
|
||||
shareStr := p.share.String()
|
||||
if err := os.WriteFile(p.sharePath, []byte(shareStr), 0644); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("cgroupslib: unable to update share cpuset with %q: %w", shareStr, err)
|
||||
}
|
||||
|
||||
reserveStr := p.reserve.String()
|
||||
if err := os.WriteFile(p.reservePath, []byte(reserveStr), 0644); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("cgroupslib: unable to update reserve cpuset with %q: %w", reserveStr, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ func testPartition(t *testing.T) *partition {
|
||||
shareFile := filepath.Join(dir, "share.cpus")
|
||||
reserveFile := filepath.Join(dir, "reserve.cpus")
|
||||
return &partition{
|
||||
usableCores: idset.From[hw.CoreID]([]hw.CoreID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}),
|
||||
sharePath: shareFile,
|
||||
reservePath: reserveFile,
|
||||
share: idset.From[hw.CoreID]([]hw.CoreID{10, 11, 12, 13, 14, 15, 16, 17, 18, 19}),
|
||||
@@ -93,4 +94,10 @@ func TestPartition_Release(t *testing.T) {
|
||||
p.Release(coreset(11, 18))
|
||||
must.FileContains(t, p.sharePath, "10-19")
|
||||
must.FileContains(t, p.reservePath, "")
|
||||
|
||||
// release more cores than the usable ones
|
||||
// test partition only has 20 usable cores.
|
||||
p.Release(coreset(11, 18, 19, 20, 21))
|
||||
must.FileContains(t, p.sharePath, "10-19")
|
||||
must.FileContains(t, p.reservePath, "")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user