From 4d68d935e4737e3f1d8d18b0b64c5facd64203ba Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 8 Aug 2018 09:41:56 -0500 Subject: [PATCH] Use heap to store top K scoring nodes. Scoring metadata is now aggregated by scorer type to make it easier to parse when reading it in the CLI. --- api/allocations.go | 8 ++++ command/alloc_status_test.go | 54 ++++++++++++++++++++++++ command/monitor.go | 13 +++++- nomad/structs/funcs.go | 26 ++++++++++++ nomad/structs/structs.go | 82 +++++++++++++++++++++++++++++++++--- scheduler/generic_sched.go | 3 ++ scheduler/stack_test.go | 6 ++- 7 files changed, 182 insertions(+), 10 deletions(-) diff --git a/api/allocations.go b/api/allocations.go index 8d0279127..cc266150b 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -112,6 +112,14 @@ type AllocationMetric struct { Scores map[string]float64 AllocationTime time.Duration CoalescedFailures int + ScoreMetaData map[string][]*NodeScoreMeta +} + +// NodeScoreMeta is used to serialize node scoring metadata +// displayed in the CLI during verbose mode +type NodeScoreMeta struct { + NodeID string + Score float64 } // AllocationListStub is used to return a subset of an allocation diff --git a/command/alloc_status_test.go b/command/alloc_status_test.go index 65f3ee54c..88c05e370 100644 --- a/command/alloc_status_test.go +++ b/command/alloc_status_test.go @@ -224,6 +224,60 @@ func TestAllocStatusCommand_RescheduleInfo(t *testing.T) { require.Regexp(regexp.MustCompile(".*Reschedule Attempts\\s*=\\s*1/2"), out) } +func TestAllocStatusCommand_ScoreMetrics(t *testing.T) { + t.Parallel() + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + // Wait for a node to be ready + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + for _, node := range nodes { + if node.Status == structs.NodeStatusReady { + return true, nil + } + } + return false, fmt.Errorf("no ready nodes") + }, func(err 
error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	ui := new(cli.MockUi)
+	cmd := &AllocStatusCommand{Meta: Meta{Ui: ui}}
+	// Upsert an allocation whose metrics carry per-scorer node scores
+	require := require.New(t)
+	state := srv.Agent.Server().State()
+	a := mock.Alloc()
+	mockNode1 := mock.Node()
+	mockNode2 := mock.Node()
+	a.Metrics = &structs.AllocMetric{
+		ScoreMetaData: map[string][]*structs.NodeScoreMeta{
+			"binpack": []*structs.NodeScoreMeta{
+				{NodeID: mockNode1.ID, Score: 0.77},
+				{NodeID: mockNode2.ID, Score: 0.75},
+			},
+			"node-affinity": []*structs.NodeScoreMeta{
+				{NodeID: mockNode1.ID, Score: 0.5},
+				{NodeID: mockNode2.ID, Score: 0.33},
+			},
+		},
+	}
+	require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
+
+	if code := cmd.Run([]string{"-address=" + url, "-verbose", a.ID}); code != 0 {
+		t.Fatalf("expected exit 0, got: %d", code)
+	}
+	out := ui.OutputWriter.String()
+	require.Contains(out, "Placement Metrics")
+	require.Contains(out, fmt.Sprintf("Scorer %q, Node %q", "binpack", mockNode1.ID))
+	require.Contains(out, fmt.Sprintf("Scorer %q, Node %q", "binpack", mockNode2.ID))
+	require.Contains(out, fmt.Sprintf("Scorer %q, Node %q", "node-affinity", mockNode1.ID))
+	require.Contains(out, fmt.Sprintf("Scorer %q, Node %q", "node-affinity", mockNode2.ID))
+}
+
 func TestAllocStatusCommand_AutocompleteArgs(t *testing.T) {
 	assert := assert.New(t)
 	t.Parallel()
diff --git a/command/monitor.go b/command/monitor.go
index 5449186a2..05978e366 100644
--- a/command/monitor.go
+++ b/command/monitor.go
@@ -373,8 +373,17 @@ func formatAllocMetrics(metrics *api.AllocationMetric, scores bool, prefix strin
 	// Print scores
 	if scores {
-		for name, score := range metrics.Scores {
-			out += fmt.Sprintf("%s* Score %q = %f\n", prefix, name, score)
+		if len(metrics.ScoreMetaData) > 0 {
+			for scorer, scoreMeta := range metrics.ScoreMetaData {
+				for _, nodeScore := range scoreMeta {
+					out += fmt.Sprintf("%s* Scorer %q, Node %q = %f\n", prefix, scorer, nodeScore.NodeID, nodeScore.Score)
+				}
+			}
+		} else {
+			// 
Backwards compatibility for old allocs + for name, score := range metrics.Scores { + out += fmt.Sprintf("%s* Score %q = %f\n", prefix, name, score) + } } } diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 2e4910181..6f85a4dc1 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -247,6 +247,32 @@ func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget { return c } +func CopyNodeScoreMetaMap(m map[string][]*NodeScoreMeta) map[string][]*NodeScoreMeta { + l := len(m) + if l == 0 { + return nil + } + + cm := make(map[string][]*NodeScoreMeta, l) + for k, v := range m { + cm[k] = CopySliceNodeScoreMeta(v) + } + return cm +} + +func CopySliceNodeScoreMeta(s []*NodeScoreMeta) []*NodeScoreMeta { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*NodeScoreMeta, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + // VaultPoliciesSet takes the structure returned by VaultPolicies and returns // the set of required policies func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 613478d66..906fccaf3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -24,6 +24,9 @@ import ( "golang.org/x/crypto/blake2b" + "container/heap" + "math" + "github.com/gorhill/cronexpr" "github.com/hashicorp/consul/api" multierror "github.com/hashicorp/go-multierror" @@ -31,12 +34,11 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/args" + "github.com/hashicorp/nomad/helper/lib" "github.com/hashicorp/nomad/helper/uuid" "github.com/mitchellh/copystructure" "github.com/ugorji/go/codec" - "math" - hcodec "github.com/hashicorp/go-msgpack/codec" ) @@ -133,6 +135,10 @@ const ( // MaxRetainedNodeEvents is the maximum number of node events that will be // retained for a single node MaxRetainedNodeEvents = 10 + + // MaxRetainedNodeScores is the number of top scoring nodes for 
which we + // retain scoring metadata + MaxRetainedNodeScores = 5 ) // Context defines the scope in which a search for Nomad object operates, and @@ -6489,8 +6495,16 @@ type AllocMetric struct { // Scores is the scores of the final few nodes remaining // for placement. The top score is typically selected. + // Deprecated: Replaced by ScoreMetaData in Nomad 0.9 Scores map[string]float64 + // ScoreMetaData is a slice of top scoring nodes per scorer + ScoreMetaData map[string][]*NodeScoreMeta + + // topScores is used to maintain a heap of the top K scoring nodes per + // scorer type + topScores map[string]*lib.ScoreHeap + // AllocationTime is a measure of how long the allocation // attempt took. This can affect performance and SLAs. AllocationTime time.Duration @@ -6515,6 +6529,7 @@ func (a *AllocMetric) Copy() *AllocMetric { na.DimensionExhausted = helper.CopyMapStringInt(na.DimensionExhausted) na.QuotaExhausted = helper.CopySliceString(na.QuotaExhausted) na.Scores = helper.CopyMapStringFloat64(na.Scores) + na.ScoreMetaData = CopyNodeScoreMetaMap(na.ScoreMetaData) return na } @@ -6562,12 +6577,67 @@ func (a *AllocMetric) ExhaustQuota(dimensions []string) { a.QuotaExhausted = append(a.QuotaExhausted, dimensions...) 
} +// ScoreNode is used to gather top K scoring nodes in a heap func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) { - if a.Scores == nil { - a.Scores = make(map[string]float64) + if a.topScores == nil { + a.topScores = make(map[string]*lib.ScoreHeap) } - key := fmt.Sprintf("%s.%s", node.ID, name) - a.Scores[key] = score + scorerHeap, ok := a.topScores[name] + if !ok { + scorerHeap = lib.NewScoreHeap(MaxRetainedNodeScores) + a.topScores[name] = scorerHeap + } + heap.Push(scorerHeap, &lib.HeapItem{ + Value: node.ID, + Score: score, + }) +} + +// PopulateScoreMetaData populates a map of scorer to scoring metadata +// The map is populated by popping elements from a heap of top K scores +// maintained per scorer +func (a *AllocMetric) PopulateScoreMetaData() { + if a.ScoreMetaData == nil { + a.ScoreMetaData = make(map[string][]*NodeScoreMeta) + } + if a.topScores != nil && len(a.ScoreMetaData) == 0 { + for scorer, scoreHeap := range a.topScores { + scoreMeta := make([]*NodeScoreMeta, scoreHeap.Len()) + i := scoreHeap.Len() - 1 + // Pop from the heap and store in reverse to get top K + // scoring nodes in descending order + for scoreHeap.Len() > 0 { + item := heap.Pop(scoreHeap).(*lib.HeapItem) + scoreMeta[i] = &NodeScoreMeta{ + NodeID: item.Value, + Score: item.Score, + } + i-- + } + a.ScoreMetaData[scorer] = scoreMeta + } + } +} + +// NodeScoreMeta captures scoring meta data derived from +// different scoring factors. 
The meta data from top K scores +// are displayed in the CLI +type NodeScoreMeta struct { + NodeID string + Score float64 +} + +func (s *NodeScoreMeta) Copy() *NodeScoreMeta { + if s == nil { + return nil + } + ns := new(NodeScoreMeta) + *ns = *s + return ns +} + +func (s *NodeScoreMeta) String() string { + return fmt.Sprintf("%s %v", s.NodeID, s.Score) } // AllocDeploymentStatus captures the status of the allocation as part of the diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 35909dedc..59c1233b6 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -470,6 +470,9 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Store the available nodes by datacenter s.ctx.Metrics().NodesAvailable = byDC + // Compute top K scoring node metadata + s.ctx.Metrics().PopulateScoreMetaData() + // Set fields based on if we found an allocation option if option != nil { // Create an allocation for this diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index c94aa9208..453ed9f9e 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -295,6 +295,7 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { stack.SetJob(job) selectOptions := &SelectOptions{} node, _ := stack.Select(job.TaskGroups[0], selectOptions) + ctx.Metrics().PopulateScoreMetaData() if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -311,7 +312,7 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { t.Fatalf("bad: %#v", met) } // Expect two metrics, one with bin packing score and one with normalized score - if len(met.Scores) != 2 { + if len(met.ScoreMetaData) != 2 { t.Fatalf("bad: %#v", met) } } @@ -517,6 +518,7 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { selectOptions := &SelectOptions{} node, _ := stack.Select(job.TaskGroups[0], selectOptions) + ctx.Metrics().PopulateScoreMetaData() if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ 
-533,7 +535,7 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { t.Fatalf("bad: %#v", met) } // Should have two scores, one from bin packing and one from normalization - if len(met.Scores) != 2 { + if len(met.ScoreMetaData) != 2 { t.Fatalf("bad: %#v", met) } }