diff --git a/client/client.go b/client/client.go index 8590735bb..8d3a08053 100644 --- a/client/client.go +++ b/client/client.go @@ -474,7 +474,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie }) // Create the cpu core partition manager - c.partitions = cgroupslib.GetPartition( + c.partitions = cgroupslib.GetPartition(c.logger.Named("partitions"), c.topology.UsableCores(), ) @@ -1276,7 +1276,6 @@ func (c *Client) restoreState() error { // wait for servers to be contacted before proceeding with the // restoration process. arConf.ServersContactedCh = c.serversContactedCh - ar, err := c.allocrunnerFactory(arConf) if err != nil { c.logger.Error("error running alloc", "error", err, "alloc_id", alloc.ID) diff --git a/client/lib/cgroupslib/partition_default.go b/client/lib/cgroupslib/partition_default.go index f685c671d..bb2565d9f 100644 --- a/client/lib/cgroupslib/partition_default.go +++ b/client/lib/cgroupslib/partition_default.go @@ -6,11 +6,12 @@ package cgroupslib import ( + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // GetPartition creates a no-op Partition that does not do anything. -func GetPartition(*idset.Set[hw.CoreID]) Partition { +func GetPartition(log hclog.Logger, cores *idset.Set[hw.CoreID]) Partition { return NoopPartition() } diff --git a/client/lib/cgroupslib/partition_linux.go b/client/lib/cgroupslib/partition_linux.go index ee0891fa1..165454dd0 100644 --- a/client/lib/cgroupslib/partition_linux.go +++ b/client/lib/cgroupslib/partition_linux.go @@ -6,24 +6,26 @@ package cgroupslib import ( + "fmt" "os" "path/filepath" "sync" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // GetPartition creates a Partition suitable for managing cores on this // Linux system. 
-func GetPartition(cores *idset.Set[hw.CoreID]) Partition { - return NewPartition(cores) +func GetPartition(log hclog.Logger, cores *idset.Set[hw.CoreID]) Partition { + return NewPartition(log, cores) } // NewPartition creates a cpuset partition manager for managing the books // when allocations are created and destroyed. The initial set of cores is // the usable set of cores by Nomad. -func NewPartition(cores *idset.Set[hw.CoreID]) Partition { +func NewPartition(log hclog.Logger, cores *idset.Set[hw.CoreID]) Partition { var ( sharePath string reservePath string @@ -41,6 +43,8 @@ func NewPartition(cores *idset.Set[hw.CoreID]) Partition { } return &partition{ + usableCores: cores.Copy(), + log: log, sharePath: sharePath, reservePath: reservePath, share: cores.Copy(), @@ -49,8 +53,10 @@ func NewPartition(cores *idset.Set[hw.CoreID]) Partition { } type partition struct { + log hclog.Logger sharePath string reservePath string + usableCores *idset.Set[hw.CoreID] lock sync.Mutex share *idset.Set[hw.CoreID] @@ -58,41 +64,49 @@ type partition struct { } func (p *partition) Restore(cores *idset.Set[hw.CoreID]) { + p.lock.Lock() defer p.lock.Unlock() p.share.RemoveSet(cores) - p.reserve.InsertSet(cores) + // Use the intersection with the usable cores to avoid adding more cores than available. + p.reserve.InsertSet(p.usableCores.Intersect(cores)) + } func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error { + p.lock.Lock() defer p.lock.Unlock() p.share.RemoveSet(cores) - p.reserve.InsertSet(cores) + // Use the intersection with the usable cores to avoid adding more cores than available. + p.reserve.InsertSet(p.usableCores.Intersect(cores)) return p.write() } func (p *partition) Release(cores *idset.Set[hw.CoreID]) error { + p.lock.Lock() defer p.lock.Unlock() p.reserve.RemoveSet(cores) - p.share.InsertSet(cores) + // Use the intersection with the usable cores to avoid adding more cores than available. 
+ p.share.InsertSet(p.usableCores.Intersect(cores)) return p.write() } func (p *partition) write() error { shareStr := p.share.String() if err := os.WriteFile(p.sharePath, []byte(shareStr), 0644); err != nil { - return err + return fmt.Errorf("cgroupslib: unable to update share cpuset with %q: %w", shareStr, err) } + reserveStr := p.reserve.String() if err := os.WriteFile(p.reservePath, []byte(reserveStr), 0644); err != nil { - return err + return fmt.Errorf("cgroupslib: unable to update reserve cpuset with %q: %w", reserveStr, err) } return nil } diff --git a/client/lib/cgroupslib/partition_test.go b/client/lib/cgroupslib/partition_test.go index 601ff6a21..8cb3b1163 100644 --- a/client/lib/cgroupslib/partition_test.go +++ b/client/lib/cgroupslib/partition_test.go @@ -20,6 +20,7 @@ func testPartition(t *testing.T) *partition { shareFile := filepath.Join(dir, "share.cpus") reserveFile := filepath.Join(dir, "reserve.cpus") return &partition{ + usableCores: idset.From[hw.CoreID]([]hw.CoreID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), sharePath: shareFile, reservePath: reserveFile, share: idset.From[hw.CoreID]([]hw.CoreID{10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), @@ -93,4 +94,10 @@ func TestPartition_Release(t *testing.T) { p.Release(coreset(11, 18)) must.FileContains(t, p.sharePath, "10-19") must.FileContains(t, p.reservePath, "") + + // release more cores than the usable ones + // the test partition's usable cores are 1-19, so cores 20 and 21 must be ignored. + p.Release(coreset(11, 18, 19, 20, 21)) + must.FileContains(t, p.sharePath, "10-19") + must.FileContains(t, p.reservePath, "") }