numa: fix scheduler panic due to topology serialization bug (#23284)

The NUMA topology struct field `NodeIDs` is an `idset.Set`, which has no public
members. As a result, the field is never serialized via msgpack and never
persisted in state. When `numa.affinity = "prefer"`, the scheduler dereferences
this nil field and panics the scheduler worker.
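
To make the failure mode concrete, here is a standalone sketch using `encoding/json` as an analogous reflection-based codec (the `Set` and `Topology` types below are simplified stand-ins for `idset.Set` and `numalib.Topology`, not the real implementations):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Set mimics idset.Set: all of its state is unexported, so any
// reflection-based encoder serializes it as an empty object.
type Set struct {
	items map[uint8]struct{}
}

func (s *Set) Size() int { return len(s.items) }

// Topology mimics numalib.Topology before this fix.
type Topology struct {
	NodeIDs *Set
}

func main() {
	top := &Topology{NodeIDs: &Set{items: map[uint8]struct{}{0: {}, 1: {}}}}

	buf, _ := json.Marshal(top)
	fmt.Println(string(buf)) // {"NodeIDs":{}} -- both node IDs silently dropped

	var decoded Topology
	_ = json.Unmarshal(buf, &decoded)
	fmt.Println(decoded.NodeIDs.Size()) // 0 -- the set never survives a round-trip

	// In Nomad's msgpack/state path the restored field is nil rather than
	// merely empty, so the scheduler's dereference of it panics.
}
```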

Ideally we would fix this by adding a msgpack serialization extension, but
because the field already exists (and is simply always empty), changing its
encoding would break RPC wire compatibility across upgrades. Instead, create a
new field that is populated at the same time as the more useful `idset.Set`,
and repopulate the set on demand.
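
The shape of that fix, reduced to a sketch (names mirror the diff below; `Set`, `Slice`, and `FromSlice` stand in for the real `idset` API):

```go
// Topology keeps the convenient set form private and mirrors it into an
// exported slice that reflection-based encoders can actually serialize.
type Topology struct {
	nodeIDs *Set    // in-memory form; never hits the wire
	Nodes   []uint8 // wire representation; survives msgpack round-trips
}

// SetNodes keeps both representations in sync on every write.
func (t *Topology) SetNodes(nodes *Set) {
	t.nodeIDs = nodes
	t.Nodes = nodes.Slice()
}

// GetNodes lazily rebuilds the set after a decode, when only the
// exported slice came back populated.
func (t *Topology) GetNodes() *Set {
	if t.nodeIDs == nil || t.nodeIDs.Size() == 0 {
		t.nodeIDs = FromSlice(t.Nodes)
	}
	return t.nodeIDs
}
```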

Fixes: https://hashicorp.atlassian.net/browse/NET-9924
Tim Gross, 2024-06-11 08:55:00 -04:00 (committed by GitHub)
parent 288a048a2e
commit 7d73065066
19 changed files with 134 additions and 61 deletions

View File

@@ -173,7 +173,7 @@ func (f *CPUFingerprint) setNUMA(response *FingerprintResponse) {
return
}
- nodes := f.top.Nodes()
+ nodes := f.top.GetNodes()
response.AddAttribute("numa.node.count", f.nodes(nodes.Size()))
nodes.ForEach(func(id hw.NodeID) error {

View File

@@ -192,7 +192,7 @@ func (s *Set[T]) Size() int {
// Empty returns whether the set is empty.
func (s *Set[T]) Empty() bool {
- if s == nil {
+ if s == nil || s.items == nil {
return true
}
return s.items.Empty()

View File

@@ -30,8 +30,8 @@ type MacOS struct{}
func (m *MacOS) ScanSystem(top *Topology) {
// all apple hardware is non-numa; just assume as much
- top.NodeIDs = idset.Empty[hw.NodeID]()
- top.NodeIDs.Insert(nodeID)
+ top.nodeIDs = idset.Empty[hw.NodeID]()
+ top.nodeIDs.Insert(nodeID)
// arch specific detection
switch m1cpu.IsAppleSilicon() {

View File

@@ -21,8 +21,9 @@ const (
func scanGeneric(top *Topology) {
// hardware may or may not be NUMA, but for now we only
// detect such topology on linux systems
- top.NodeIDs = idset.Empty[hw.NodeID]()
- top.NodeIDs.Insert(genericNodeID)
+ top.nodeIDs = idset.Empty[hw.NodeID]()
+ top.nodeIDs.Insert(genericNodeID)
+ top.Nodes = top.nodeIDs.Slice()
// cores
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

View File

@@ -67,22 +67,23 @@ func (*Sysfs) available() bool {
func (*Sysfs) discoverOnline(st *Topology, readerFunc pathReaderFn) {
ids, err := getIDSet[hw.NodeID](nodeOnline, readerFunc)
if err == nil {
- st.NodeIDs = ids
+ st.nodeIDs = ids
+ st.Nodes = st.nodeIDs.Slice()
}
}
func (*Sysfs) discoverCosts(st *Topology, readerFunc pathReaderFn) {
- if st.NodeIDs.Empty() {
+ if st.nodeIDs.Empty() {
return
}
- dimension := st.NodeIDs.Size()
- st.Distances = make(SLIT, st.NodeIDs.Size())
+ dimension := st.nodeIDs.Size()
+ st.Distances = make(SLIT, st.nodeIDs.Size())
for i := 0; i < dimension; i++ {
st.Distances[i] = make([]Cost, dimension)
}
- _ = st.NodeIDs.ForEach(func(id hw.NodeID) error {
+ _ = st.nodeIDs.ForEach(func(id hw.NodeID) error {
s, err := getString(distanceFile, readerFunc, id)
if err != nil {
return err
@@ -104,20 +105,21 @@ func (*Sysfs) discoverCores(st *Topology, readerFunc pathReaderFn) {
st.Cores = make([]Core, onlineCores.Size())
switch {
- case st.NodeIDs == nil:
+ case st.nodeIDs == nil:
// We did not find node data, no node to associate with
_ = onlineCores.ForEach(func(core hw.CoreID) error {
- st.NodeIDs = idset.From[hw.NodeID]([]hw.NodeID{0})
+ st.nodeIDs = idset.From[hw.NodeID]([]hw.NodeID{0})
const node = 0
const socket = 0
cpuMax, _ := getNumeric[hw.KHz](cpuMaxFile, 64, readerFunc, core)
base, _ := getNumeric[hw.KHz](cpuBaseFile, 64, readerFunc, core)
st.insert(node, socket, core, Performance, cpuMax, base)
+ st.Nodes = st.nodeIDs.Slice()
return nil
})
default:
// We found node data, associate cores to nodes
- _ = st.NodeIDs.ForEach(func(node hw.NodeID) error {
+ _ = st.nodeIDs.ForEach(func(node hw.NodeID) error {
s, err := readerFunc(fmt.Sprintf(cpulistFile, node))
if err != nil {
return err
@@ -231,7 +233,7 @@ func (s *Fallback) ScanSystem(top *Topology) {
broken := false
switch {
- case top.NodeIDs.Empty():
+ case top.nodeIDs.Empty():
broken = true
case len(top.Distances) == 0:
broken = true
@@ -251,7 +253,8 @@ func (s *Fallback) ScanSystem(top *Topology) {
// we have a broken topology; reset it and fallback to the generic scanner
// basically treating this client like a windows / unsupported OS
- top.NodeIDs = nil
+ top.nodeIDs = nil
+ top.Nodes = nil
top.Distances = nil
top.Cores = nil

View File

@@ -85,7 +85,7 @@ func TestSysfs_discoverOnline(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
sy.discoverOnline(st, tt.readerFunc)
- must.Eq(t, tt.expectedIDSet, st.NodeIDs)
+ must.Eq(t, tt.expectedIDSet, st.GetNodes())
})
}
}
@@ -113,7 +113,7 @@ func TestSysfs_discoverCosts(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
- st.NodeIDs = tt.nodeIDs
+ st.SetNodes(tt.nodeIDs)
sy.discoverCosts(st, tt.readerFunc)
must.Eq(t, tt.expectedDistances, st.Distances)
})
@@ -136,7 +136,8 @@ func TestSysfs_discoverCores(t *testing.T) {
// issue#19372
{"one node and bad sys data", oneNode, badSysData, &Topology{
- NodeIDs: oneNode,
+ nodeIDs: oneNode,
+ Nodes: oneNode.Slice(),
Cores: []Core{
{
SocketID: 0,
@@ -157,7 +158,8 @@ func TestSysfs_discoverCores(t *testing.T) {
},
}},
{"two nodes and good sys data", twoNodes, goodSysData, &Topology{
- NodeIDs: twoNodes,
+ nodeIDs: twoNodes,
+ Nodes: twoNodes.Slice(),
Cores: []Core{
{
SocketID: 1,
@@ -197,7 +199,7 @@ func TestSysfs_discoverCores(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sy := &Sysfs{}
- st.NodeIDs = tt.nodeIDs
+ st.SetNodes(tt.nodeIDs)
sy.discoverCores(st, tt.readerFunc)
must.Eq(t, tt.expectedTopology, st)
})

View File

@@ -54,7 +54,12 @@ type (
// The JSON encoding is not used yet but may be part of the gRPC plumbing
// in the future.
type Topology struct {
- NodeIDs *idset.Set[hw.NodeID]
+ // COMPAT: idset.Set wasn't being serialized correctly but we can't change
+ // the encoding of a field once it's shipped. Nodes is the wire
+ // representation.
+ nodeIDs *idset.Set[hw.NodeID]
+ Nodes []uint8
Distances SLIT
Cores []Core
@@ -66,7 +71,25 @@ type Topology struct {
// NewTopology is a constructor for the Topology object, only used in tests for
// mocking.
func NewTopology(nodeIDs *idset.Set[hw.NodeID], distances SLIT, cores []Core) *Topology {
- return &Topology{NodeIDs: nodeIDs, Distances: distances, Cores: cores}
+ t := &Topology{
+ nodeIDs: nodeIDs,
+ Distances: distances,
+ Cores: cores,
+ }
+ t.SetNodes(nodeIDs)
+ return t
}
+ func (t *Topology) SetNodes(nodes *idset.Set[hw.NodeID]) {
+ t.nodeIDs = nodes
+ if !nodes.Empty() {
+ t.Nodes = nodes.Slice()
+ } else {
+ t.Nodes = []uint8{}
+ }
+ }
+ func (t *Topology) SetNodesFrom(nodes []uint8) {
+ t.nodeIDs = idset.From[hw.NodeID](nodes)
+ t.Nodes = nodes
+ }
// A Core represents one logical (vCPU) core on a processor. Basically the slice
@@ -139,12 +162,15 @@ func (st *Topology) SupportsNUMA() bool {
}
}
- // Nodes returns the set of NUMA Node IDs.
- func (st *Topology) Nodes() *idset.Set[hw.NodeID] {
+ // GetNodes returns the set of NUMA Node IDs.
+ func (st *Topology) GetNodes() *idset.Set[hw.NodeID] {
if !st.SupportsNUMA() {
return nil
}
- return st.NodeIDs
+ if st.nodeIDs.Empty() {
+ st.nodeIDs = idset.From[hw.NodeID](st.Nodes)
+ }
+ return st.nodeIDs
}
// NodeCores returns the set of Core IDs for the given NUMA Node ID.

View File

@@ -133,7 +133,6 @@ func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
node := mock.Node()
node.NodeResources.Processors = structs.NodeProcessorResources{
Topology: &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: []numalib.Core{{
ID: 0,
@@ -142,6 +141,7 @@ func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
}},
},
}
+ node.NodeResources.Processors.Topology.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
node.NodeResources.Compatibility()
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

View File

@@ -31,10 +31,7 @@ func topologyFromLegacyGeneric(old LegacyNodeCpuResources) *numalib.Topology {
withheld := (frequency * hw.MHz(coreCount)) - hw.MHz(old.CpuShares)
- return &numalib.Topology{
- // legacy: assume one node with id 0
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
+ t := &numalib.Topology{
// legacy: with one node the distance matrix is 1-D
Distances: numalib.SLIT{{10}},
@@ -47,4 +44,8 @@ func topologyFromLegacyGeneric(old LegacyNodeCpuResources) *numalib.Topology {
// legacy: set since we can compute the value
OverrideWitholdCompute: withheld,
}
+ // legacy: assume one node with id 0
+ t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
+ return t
}

View File

@@ -71,10 +71,7 @@ func topologyFromLegacyLinux(old LegacyNodeCpuResources) *numalib.Topology {
withheld := (frequency * hw.MHz(old.TotalCpuCores)) - hw.MHz(old.CpuShares)
- return &numalib.Topology{
- // legacy: assume one node with id 0
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
+ t := &numalib.Topology{
// legacy: with one node the distance matrix is 1-D
Distances: numalib.SLIT{{10}},
@@ -87,4 +84,8 @@ func topologyFromLegacyLinux(old LegacyNodeCpuResources) *numalib.Topology {
// legacy: set since we can compute the value
OverrideWitholdCompute: withheld,
}
+ // legacy: assume one node with id 0
+ t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
+ return t
}

View File

@@ -29,7 +29,6 @@ func TestNUMA_topologyFromLegacy_plain(t *testing.T) {
result := topologyFromLegacy(old)
exp := &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{{10}},
Cores: []numalib.Core{
makeLegacyCore(0),
@@ -40,12 +39,13 @@ func TestNUMA_topologyFromLegacy_plain(t *testing.T) {
OverrideTotalCompute: 12800,
OverrideWitholdCompute: 0,
}
+ exp.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
// only compares total compute
must.Equal(t, exp, result)
// check underlying fields
- must.Eq(t, exp.NodeIDs, result.NodeIDs)
+ must.Eq(t, exp.GetNodes(), result.GetNodes())
must.Eq(t, exp.Distances, result.Distances)
must.Eq(t, exp.Cores, result.Cores)
must.Eq(t, exp.OverrideTotalCompute, result.OverrideTotalCompute)
@@ -66,7 +66,6 @@ func TestNUMA_topologyFromLegacy_reservations(t *testing.T) {
result := topologyFromLegacy(old)
exp := &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{{10}},
Cores: []numalib.Core{
makeLegacyCore(1),
@@ -76,12 +75,13 @@ func TestNUMA_topologyFromLegacy_reservations(t *testing.T) {
OverrideTotalCompute: 9600,
OverrideWitholdCompute: 3200, // core 0 excluded
}
+ exp.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
// only compares total compute
must.Equal(t, exp, result)
// check underlying fields
- must.Eq(t, exp.NodeIDs, result.NodeIDs)
+ must.Eq(t, exp.GetNodes(), result.GetNodes())
must.Eq(t, exp.Distances, result.Distances)
must.Eq(t, exp.Cores, result.Cores)
must.Eq(t, exp.OverrideTotalCompute, result.OverrideTotalCompute)

View File

@@ -5,19 +5,52 @@ package structs
import (
"reflect"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/helper"
)
var (
// extendedTypes is a mapping of extended types to their extension function
// TODO: the duplicates could be simplified by looking up the base type in the case of a pointer type in ConvertExt
extendedTypes = map[reflect.Type]extendFunc{
- reflect.TypeOf(Node{}): nodeExt,
- reflect.TypeOf(&Node{}): nodeExt,
- reflect.TypeOf(CSIVolume{}): csiVolumeExt,
- reflect.TypeOf(&CSIVolume{}): csiVolumeExt,
+ reflect.TypeOf(Node{}): nodeExt,
+ reflect.TypeOf(&Node{}): nodeExt,
+ reflect.TypeOf(CSIVolume{}): csiVolumeExt,
+ reflect.TypeOf(&CSIVolume{}): csiVolumeExt,
+ reflect.TypeOf(&numalib.Topology{}): numaTopoExt,
}
)
+ // numaTopoExt is used to JSON encode topology to correctly handle the private
+ // idset.Set fields and so that NUMA NodeIDs are encoded as []int because
+ // go-msgpack will further JSON encode []uint8 into a base64-encoded bytestring,
+ // rather than an array
+ func numaTopoExt(v interface{}) interface{} {
+ topo := v.(*numalib.Topology)
+ var nodes []int
+ if topo.GetNodes() != nil {
+ nodes = helper.ConvertSlice(
+ topo.GetNodes().Slice(), func(n uint8) int { return int(n) })
+ }
+ return &struct {
+ Nodes []int
+ Distances numalib.SLIT
+ Cores []numalib.Core
+ OverrideTotalCompute hw.MHz
+ OverrideWitholdCompute hw.MHz
+ }{
+ Nodes: nodes,
+ Distances: topo.Distances,
+ Cores: topo.Cores,
+ OverrideTotalCompute: topo.OverrideTotalCompute,
+ OverrideWitholdCompute: topo.OverrideWitholdCompute,
+ }
+ }
// nodeExt ensures the node is sanitized and adds the legacy field .Drain back to encoded Node objects
func nodeExt(v interface{}) interface{} {
node := v.(*Node).Sanitize()
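
The base64 quirk that `numaTopoExt` works around is easy to see in isolation: `[]uint8` is an alias for `[]byte`, so JSON encoders (including go-msgpack's JSON path, per the comment above) emit it as a base64 string rather than a numeric array. A self-contained illustration with `encoding/json`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	nodes := []uint8{0, 1, 2, 3}

	// []uint8 is []byte, so it is encoded as a base64 string.
	asBytes, _ := json.Marshal(nodes)
	fmt.Println(string(asBytes)) // "AAECAw=="

	// Widening to []int first yields the array form the API wants,
	// which is what numaTopoExt does via helper.ConvertSlice.
	ints := make([]int, len(nodes))
	for i, n := range nodes {
		ints[i] = int(n)
	}
	asInts, _ := json.Marshal(ints)
	fmt.Println(string(asInts)) // [0,1,2,3]
}
```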

View File

@@ -95,7 +95,6 @@ func node2k() *Node {
NodeResources: &NodeResources{
Processors: NodeProcessorResources{
Topology: &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: []numalib.Core{{
ID: 0,
@@ -148,6 +147,7 @@ func node2k() *Node {
},
},
}
+ n.NodeResources.Processors.Topology.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
n.NodeResources.Compatibility()
return n
}
@@ -521,7 +521,6 @@ func TestScoreFitBinPack(t *testing.T) {
node.NodeResources = &NodeResources{
Processors: NodeProcessorResources{
Topology: &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: []numalib.Core{{
ID: 0,
@@ -534,6 +533,7 @@ func TestScoreFitBinPack(t *testing.T) {
MemoryMB: 8192,
},
}
+ node.NodeResources.Processors.Topology.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
node.NodeResources.Compatibility()
node.ReservedResources = &NodeReservedResources{
Cpu: NodeReservedCpuResources{

View File

@@ -7235,7 +7235,7 @@ func TestNodeResources_Merge(t *testing.T) {
}
topo2 := MockBasicTopology()
- topo2.NodeIDs = idset.From[hw.NodeID]([]hw.NodeID{0, 1, 2})
+ topo2.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0, 1, 2}))
res.Merge(&NodeResources{
Processors: NodeProcessorResources{topo2},

View File

@@ -55,13 +55,14 @@ func MockBasicTopology() *numalib.Topology {
BaseSpeed: 3500,
}
}
- return &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
+ t := &numalib.Topology{
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: cores,
OverrideTotalCompute: 0,
OverrideWitholdCompute: 0,
}
+ t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
+ return t
}
// MockWorkstationTopology returns a numalib.Topology that looks like a typical
@@ -82,11 +83,12 @@ func MockWorkstationTopology() *numalib.Topology {
BaseSpeed: 3_000,
}
}
- return &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0, 1}),
+ t := &numalib.Topology{
Distances: numalib.SLIT{[]numalib.Cost{10, 20}, {20, 10}},
Cores: cores,
}
+ t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0, 1}))
+ return t
}
// MockR6aTopology returns a numalib.Topology that looks like an EC2 r6a.metal
@@ -133,11 +135,12 @@ func MockR6aTopology() *numalib.Topology {
[]numalib.Cost{32, 32, 12, 10},
}
- return &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0, 1, 2, 3}),
+ t := &numalib.Topology{
Distances: distances,
Cores: cores,
}
+ t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0, 1, 2, 3}))
+ return t
}
func MockNode() *Node {

View File

@@ -118,13 +118,15 @@ func nomadTopologyFromProto(pb *proto.ClientTopology) *numalib.Topology {
if pb == nil {
return nil
}
- return &numalib.Topology{
- NodeIDs: idset.FromFunc(pb.NodeIds, func(i uint32) hw.NodeID { return hw.NodeID(i) }),
+ t := &numalib.Topology{
Distances: nomadTopologyDistancesFromProto(pb.Distances),
Cores: nomadTopologyCoresFromProto(pb.Cores),
OverrideTotalCompute: hw.MHz(pb.OverrideTotalCompute),
OverrideWitholdCompute: hw.MHz(pb.OverrideWitholdCompute),
}
+ t.SetNodes(idset.FromFunc(pb.NodeIds, func(i uint32) hw.NodeID { return hw.NodeID(i) }))
+ return t
}
func nomadTopologyDistancesFromProto(pb *proto.ClientTopologySLIT) numalib.SLIT {
@@ -166,7 +168,7 @@ func nomadTopologyToProto(top *numalib.Topology) *proto.ClientTopology {
return nil
}
return &proto.ClientTopology{
- NodeIds: helper.ConvertSlice(top.NodeIDs.Slice(), func(id hw.NodeID) uint32 { return uint32(id) }),
+ NodeIds: helper.ConvertSlice(top.GetNodes().Slice(), func(id hw.NodeID) uint32 { return uint32(id) }),
Distances: nomadTopologyDistancesToProto(top.Distances),
Cores: nomadTopologyCoresToProto(top.Cores),
OverrideTotalCompute: uint64(top.OverrideTotalCompute),

View File

@@ -15,7 +15,6 @@ import (
func Test_nomadTopologyToProto(t *testing.T) {
top := &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0, 1}),
Distances: numalib.SLIT{{10, 20}, {20, 10}},
Cores: []numalib.Core{
{
@@ -32,6 +31,7 @@ func Test_nomadTopologyToProto(t *testing.T) {
OverrideTotalCompute: 90_000,
OverrideWitholdCompute: 2000,
}
+ top.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0, 1}))
pb := nomadTopologyToProto(top)
must.Eq(t, &proto.ClientTopology{
@@ -80,8 +80,7 @@ func Test_nomadTopologyFromProto(t *testing.T) {
OverrideWitholdCompute: 2000,
}
top := nomadTopologyFromProto(pb)
- must.Eq(t, &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0, 1}),
+ expect := &numalib.Topology{
Distances: numalib.SLIT{{10, 20}, {20, 10}},
Cores: []numalib.Core{
{
@@ -97,7 +96,9 @@ func Test_nomadTopologyFromProto(t *testing.T) {
},
OverrideTotalCompute: 90_000,
OverrideWitholdCompute: 2000,
- }, top)
+ }
+ expect.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0, 1}))
+ must.Eq(t, expect, top)
}
func Test_nomadTopologyDistancesToProto(t *testing.T) {

View File

@@ -43,7 +43,6 @@ func cpuResources(shares int) (structs.LegacyNodeCpuResources, structs.NodeProce
n := &structs.NodeResources{
Processors: structs.NodeProcessorResources{
Topology: &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: []numalib.Core{{
SocketID: 0,
@@ -56,6 +55,7 @@ func cpuResources(shares int) (structs.LegacyNodeCpuResources, structs.NodeProce
},
},
}
+ n.Processors.Topology.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
// polyfill the legacy struct
n.Compatibility()

View File

@@ -1279,7 +1279,6 @@ func TestBinPackIterator_ReservedCores(t *testing.T) {
state, ctx := testContext(t)
topology := &numalib.Topology{
- NodeIDs: idset.From[hw.NodeID]([]hw.NodeID{0}),
Distances: numalib.SLIT{[]numalib.Cost{10}},
Cores: []numalib.Core{{
ID: 0,
@@ -1291,6 +1290,7 @@ func TestBinPackIterator_ReservedCores(t *testing.T) {
BaseSpeed: 1024,
}},
}
+ topology.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0}))
legacyCpuResources, processorResources := cpuResourcesFrom(topology)
nodes := []*RankedNode{