mirror of
https://github.com/kemko/nomad.git
synced 2026-01-02 00:15:43 +03:00
Track top k nodes by norm score rather than top k nodes per scorer
This commit is contained in:
@@ -112,14 +112,15 @@ type AllocationMetric struct {
|
||||
Scores map[string]float64
|
||||
AllocationTime time.Duration
|
||||
CoalescedFailures int
|
||||
ScoreMetaData map[string][]*NodeScoreMeta
|
||||
ScoreMetaData []*NodeScoreMeta
|
||||
}
|
||||
|
||||
// NodeScoreMeta is used to serialize node scoring metadata
|
||||
// displayed in the CLI during verbose mode
|
||||
type NodeScoreMeta struct {
|
||||
NodeID string
|
||||
Score float64
|
||||
NodeID string
|
||||
Scores map[string]float64
|
||||
NormScore float64
|
||||
}
|
||||
|
||||
// AllocationListStub is used to return a subset of an allocation
|
||||
|
||||
@@ -374,11 +374,24 @@ func formatAllocMetrics(metrics *api.AllocationMetric, scores bool, prefix strin
|
||||
// Print scores
|
||||
if scores {
|
||||
if len(metrics.ScoreMetaData) > 0 {
|
||||
for scorer, scoreMeta := range metrics.ScoreMetaData {
|
||||
for _, nodeScore := range scoreMeta {
|
||||
out += fmt.Sprintf("%s* Scorer %q, Node %q = %f\n", prefix, scorer, nodeScore.NodeID, nodeScore.Score)
|
||||
scoreOutput := make([]string, len(metrics.ScoreMetaData)+1)
|
||||
|
||||
for i, scoreMeta := range metrics.ScoreMetaData {
|
||||
// Add header as first row
|
||||
if i == 0 {
|
||||
scoreOutput[0] = "Node|"
|
||||
for scorerName, _ := range scoreMeta.Scores {
|
||||
scoreOutput[0] += fmt.Sprintf("%v|", scorerName)
|
||||
}
|
||||
scoreOutput[0] += "Final Score"
|
||||
}
|
||||
scoreOutput[i+1] = fmt.Sprintf("%v|", scoreMeta.NodeID)
|
||||
for _, scoreVal := range scoreMeta.Scores {
|
||||
scoreOutput[i+1] += fmt.Sprintf("%v|", scoreVal)
|
||||
}
|
||||
scoreOutput[i+1] += fmt.Sprintf("%v", scoreMeta.NormScore)
|
||||
}
|
||||
out += formatList(scoreOutput)
|
||||
} else {
|
||||
// Backwards compatibility for old allocs
|
||||
for name, score := range metrics.Scores {
|
||||
|
||||
@@ -4,17 +4,17 @@ import (
|
||||
"container/heap"
|
||||
)
|
||||
|
||||
// An HeapItem represents elements being managed in the Score heap
|
||||
type HeapItem struct {
|
||||
Value string // The Value of the item; arbitrary.
|
||||
Score float64 // The Score of the item in the heap
|
||||
// HeapItem is an interface type implemented by objects stored in the ScoreHeap
|
||||
type HeapItem interface {
|
||||
Data() interface{} // The data object
|
||||
Score() float64 // Score to use as the sort criteria
|
||||
}
|
||||
|
||||
// A ScoreHeap implements heap.Interface and is a min heap
|
||||
// that keeps the top K elements by Score. Push can be called
|
||||
// with an arbitrary number of values but only the top K are stored
|
||||
type ScoreHeap struct {
|
||||
items []*HeapItem
|
||||
items []HeapItem
|
||||
capacity int
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ func NewScoreHeap(capacity uint32) *ScoreHeap {
|
||||
func (pq ScoreHeap) Len() int { return len(pq.items) }
|
||||
|
||||
func (pq ScoreHeap) Less(i, j int) bool {
|
||||
return pq.items[i].Score < pq.items[j].Score
|
||||
return pq.items[i].Score() < pq.items[j].Score()
|
||||
}
|
||||
|
||||
func (pq ScoreHeap) Swap(i, j int) {
|
||||
@@ -35,7 +35,7 @@ func (pq ScoreHeap) Swap(i, j int) {
|
||||
// Push implements heap.Interface and only stores
|
||||
// the top K elements by Score
|
||||
func (pq *ScoreHeap) Push(x interface{}) {
|
||||
item := x.(*HeapItem)
|
||||
item := x.(HeapItem)
|
||||
if len(pq.items) < pq.capacity {
|
||||
pq.items = append(pq.items, item)
|
||||
} else {
|
||||
@@ -43,7 +43,7 @@ func (pq *ScoreHeap) Push(x interface{}) {
|
||||
// greater than the min Score so far
|
||||
minIndex := 0
|
||||
min := pq.items[minIndex]
|
||||
if item.Score > min.Score {
|
||||
if item.Score() > min.Score() {
|
||||
// Replace min and heapify
|
||||
pq.items[minIndex] = item
|
||||
heap.Fix(pq, minIndex)
|
||||
|
||||
@@ -7,11 +7,24 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type heapItem struct {
|
||||
Value string
|
||||
ScoreVal float64
|
||||
}
|
||||
|
||||
func (h *heapItem) Data() interface{} {
|
||||
return h.Value
|
||||
}
|
||||
|
||||
func (h *heapItem) Score() float64 {
|
||||
return h.ScoreVal
|
||||
}
|
||||
|
||||
func TestScoreHeap(t *testing.T) {
|
||||
type testCase struct {
|
||||
desc string
|
||||
items map[string]float64
|
||||
expected []*HeapItem
|
||||
expected []*heapItem
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
@@ -28,12 +41,12 @@ func TestScoreHeap(t *testing.T) {
|
||||
"lemon": 3.9,
|
||||
"cherry": 0.03,
|
||||
},
|
||||
expected: []*HeapItem{
|
||||
{Value: "pear", Score: 2.32},
|
||||
{Value: "banana", Score: 3.0},
|
||||
{Value: "lemon", Score: 3.9},
|
||||
{Value: "watermelon", Score: 5.45},
|
||||
{Value: "strawberry", Score: 9.03},
|
||||
expected: []*heapItem{
|
||||
{Value: "pear", ScoreVal: 2.32},
|
||||
{Value: "banana", ScoreVal: 3.0},
|
||||
{Value: "lemon", ScoreVal: 3.9},
|
||||
{Value: "watermelon", ScoreVal: 5.45},
|
||||
{Value: "strawberry", ScoreVal: 9.03},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -43,10 +56,10 @@ func TestScoreHeap(t *testing.T) {
|
||||
"okra": -1.0,
|
||||
"corn": 0.25,
|
||||
},
|
||||
expected: []*HeapItem{
|
||||
{Value: "okra", Score: -1.0},
|
||||
{Value: "corn", Score: 0.25},
|
||||
{Value: "eggplant", Score: 9.0},
|
||||
expected: []*heapItem{
|
||||
{Value: "okra", ScoreVal: -1.0},
|
||||
{Value: "corn", ScoreVal: 0.25},
|
||||
{Value: "eggplant", ScoreVal: 9.0},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -56,9 +69,9 @@ func TestScoreHeap(t *testing.T) {
|
||||
// Create Score heap, push elements into it
|
||||
pq := NewScoreHeap(5)
|
||||
for value, score := range tc.items {
|
||||
heapItem := &HeapItem{
|
||||
Value: value,
|
||||
Score: score,
|
||||
heapItem := &heapItem{
|
||||
Value: value,
|
||||
ScoreVal: score,
|
||||
}
|
||||
heap.Push(pq, heapItem)
|
||||
}
|
||||
@@ -69,7 +82,7 @@ func TestScoreHeap(t *testing.T) {
|
||||
|
||||
i := 0
|
||||
for pq.Len() > 0 {
|
||||
item := heap.Pop(pq).(*HeapItem)
|
||||
item := heap.Pop(pq).(*heapItem)
|
||||
require.Equal(tc.expected[i], item)
|
||||
i++
|
||||
}
|
||||
|
||||
@@ -247,19 +247,6 @@ func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget {
|
||||
return c
|
||||
}
|
||||
|
||||
func CopyNodeScoreMetaMap(m map[string][]*NodeScoreMeta) map[string][]*NodeScoreMeta {
|
||||
l := len(m)
|
||||
if l == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
cm := make(map[string][]*NodeScoreMeta, l)
|
||||
for k, v := range m {
|
||||
cm[k] = CopySliceNodeScoreMeta(v)
|
||||
}
|
||||
return cm
|
||||
}
|
||||
|
||||
func CopySliceNodeScoreMeta(s []*NodeScoreMeta) []*NodeScoreMeta {
|
||||
l := len(s)
|
||||
if l == 0 {
|
||||
|
||||
@@ -139,6 +139,9 @@ const (
|
||||
// MaxRetainedNodeScores is the number of top scoring nodes for which we
|
||||
// retain scoring metadata
|
||||
MaxRetainedNodeScores = 5
|
||||
|
||||
// Normalized scorer name
|
||||
NormScorerName = "normalized-score"
|
||||
)
|
||||
|
||||
// Context defines the scope in which a search for Nomad object operates, and
|
||||
@@ -6498,12 +6501,16 @@ type AllocMetric struct {
|
||||
// Deprecated: Replaced by ScoreMetaData in Nomad 0.9
|
||||
Scores map[string]float64
|
||||
|
||||
// ScoreMetaData is a slice of top scoring nodes per scorer
|
||||
ScoreMetaData map[string][]*NodeScoreMeta
|
||||
// ScoreMetaData is a slice of top scoring nodes displayed in the CLI
|
||||
ScoreMetaData []*NodeScoreMeta
|
||||
|
||||
// topScores is used to maintain a heap of the top K scoring nodes per
|
||||
// scorer type
|
||||
topScores map[string]*lib.ScoreHeap
|
||||
// nodeScoreMeta is used to keep scores for a single node id. It is cleared out after
|
||||
// we receive normalized score during the last step of the scoring stack.
|
||||
nodeScoreMeta *NodeScoreMeta
|
||||
|
||||
// topScores is used to maintain a heap of the top K nodes with
|
||||
// the highest normalized score
|
||||
topScores *lib.ScoreHeap
|
||||
|
||||
// AllocationTime is a measure of how long the allocation
|
||||
// attempt took. This can affect performance and SLAs.
|
||||
@@ -6529,7 +6536,7 @@ func (a *AllocMetric) Copy() *AllocMetric {
|
||||
na.DimensionExhausted = helper.CopyMapStringInt(na.DimensionExhausted)
|
||||
na.QuotaExhausted = helper.CopySliceString(na.QuotaExhausted)
|
||||
na.Scores = helper.CopyMapStringFloat64(na.Scores)
|
||||
na.ScoreMetaData = CopyNodeScoreMetaMap(na.ScoreMetaData)
|
||||
na.ScoreMetaData = CopySliceNodeScoreMeta(na.ScoreMetaData)
|
||||
return na
|
||||
}
|
||||
|
||||
@@ -6579,52 +6586,55 @@ func (a *AllocMetric) ExhaustQuota(dimensions []string) {
|
||||
|
||||
// ScoreNode is used to gather top K scoring nodes in a heap
|
||||
func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) {
|
||||
if a.topScores == nil {
|
||||
a.topScores = make(map[string]*lib.ScoreHeap)
|
||||
if a.nodeScoreMeta == nil {
|
||||
a.nodeScoreMeta = &NodeScoreMeta{
|
||||
NodeID: node.ID,
|
||||
Scores: make(map[string]float64),
|
||||
}
|
||||
}
|
||||
scorerHeap, ok := a.topScores[name]
|
||||
if !ok {
|
||||
scorerHeap = lib.NewScoreHeap(MaxRetainedNodeScores)
|
||||
a.topScores[name] = scorerHeap
|
||||
if name == NormScorerName {
|
||||
a.nodeScoreMeta.NormScore = score
|
||||
// Once we have the normalized score we can push to the heap
|
||||
// that tracks top K by normalized score
|
||||
|
||||
// Create the heap if its not there already
|
||||
if a.topScores == nil {
|
||||
a.topScores = lib.NewScoreHeap(MaxRetainedNodeScores)
|
||||
}
|
||||
heap.Push(a.topScores, a.nodeScoreMeta)
|
||||
|
||||
// Clear out this entry because its now in the heap
|
||||
a.nodeScoreMeta = nil
|
||||
} else {
|
||||
a.nodeScoreMeta.Scores[name] = score
|
||||
}
|
||||
heap.Push(scorerHeap, &lib.HeapItem{
|
||||
Value: node.ID,
|
||||
Score: score,
|
||||
})
|
||||
}
|
||||
|
||||
// PopulateScoreMetaData populates a map of scorer to scoring metadata
|
||||
// The map is populated by popping elements from a heap of top K scores
|
||||
// maintained per scorer
|
||||
func (a *AllocMetric) PopulateScoreMetaData() {
|
||||
if a.ScoreMetaData == nil {
|
||||
a.ScoreMetaData = make(map[string][]*NodeScoreMeta)
|
||||
}
|
||||
if a.topScores != nil && len(a.ScoreMetaData) == 0 {
|
||||
for scorer, scoreHeap := range a.topScores {
|
||||
scoreMeta := make([]*NodeScoreMeta, scoreHeap.Len())
|
||||
i := scoreHeap.Len() - 1
|
||||
// Pop from the heap and store in reverse to get top K
|
||||
// scoring nodes in descending order
|
||||
for scoreHeap.Len() > 0 {
|
||||
item := heap.Pop(scoreHeap).(*lib.HeapItem)
|
||||
scoreMeta[i] = &NodeScoreMeta{
|
||||
NodeID: item.Value,
|
||||
Score: item.Score,
|
||||
}
|
||||
i--
|
||||
}
|
||||
a.ScoreMetaData[scorer] = scoreMeta
|
||||
if a.topScores != nil {
|
||||
if a.ScoreMetaData == nil {
|
||||
a.ScoreMetaData = make([]*NodeScoreMeta, a.topScores.Len())
|
||||
}
|
||||
i := a.topScores.Len() - 1
|
||||
// Pop from the heap and store in reverse to get top K
|
||||
// scoring nodes in descending order
|
||||
for a.topScores.Len() > 0 {
|
||||
item := heap.Pop(a.topScores).(*NodeScoreMeta)
|
||||
a.ScoreMetaData[i] = item
|
||||
i--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NodeScoreMeta captures scoring meta data derived from
|
||||
// different scoring factors. The meta data from top K scores
|
||||
// are displayed in the CLI
|
||||
// different scoring factors.
|
||||
type NodeScoreMeta struct {
|
||||
NodeID string
|
||||
Score float64
|
||||
NodeID string
|
||||
Scores map[string]float64
|
||||
NormScore float64
|
||||
}
|
||||
|
||||
func (s *NodeScoreMeta) Copy() *NodeScoreMeta {
|
||||
@@ -6637,7 +6647,15 @@ func (s *NodeScoreMeta) Copy() *NodeScoreMeta {
|
||||
}
|
||||
|
||||
func (s *NodeScoreMeta) String() string {
|
||||
return fmt.Sprintf("%s %v", s.NodeID, s.Score)
|
||||
return fmt.Sprintf("%s %f %v", s.NodeID, s.NormScore, s.Scores)
|
||||
}
|
||||
|
||||
func (s *NodeScoreMeta) Score() float64 {
|
||||
return s.NormScore
|
||||
}
|
||||
|
||||
func (s *NodeScoreMeta) Data() interface{} {
|
||||
return s
|
||||
}
|
||||
|
||||
// AllocDeploymentStatus captures the status of the allocation as part of the
|
||||
|
||||
@@ -140,7 +140,6 @@ func (iter *SpreadIterator) Next() *RankedNode {
|
||||
scoreBoost := evenSpreadScoreBoost(pset, option.Node)
|
||||
totalSpreadScore += scoreBoost
|
||||
} else {
|
||||
|
||||
// Get the desired count
|
||||
desiredCount, ok := spreadDetails.desiredCounts[nValue]
|
||||
if !ok {
|
||||
|
||||
@@ -311,8 +311,8 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) {
|
||||
if met.ClassExhausted["linux-medium-pci"] != 1 {
|
||||
t.Fatalf("bad: %#v", met)
|
||||
}
|
||||
// Expect two metrics, one with bin packing score and one with normalized score
|
||||
if len(met.ScoreMetaData) != 2 {
|
||||
// Expect score metadata for one node
|
||||
if len(met.ScoreMetaData) != 1 {
|
||||
t.Fatalf("bad: %#v", met)
|
||||
}
|
||||
}
|
||||
@@ -535,7 +535,7 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) {
|
||||
t.Fatalf("bad: %#v", met)
|
||||
}
|
||||
// Should have two scores, one from bin packing and one from normalization
|
||||
if len(met.ScoreMetaData) != 2 {
|
||||
if len(met.ScoreMetaData) != 1 {
|
||||
t.Fatalf("bad: %#v", met)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user