From 41b853b44d88dcacbd9717b570f902e46445149e Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Tue, 31 Aug 2021 16:51:30 -0400
Subject: [PATCH] scheduler: warn when system jobs cannot place an alloc

When a system or sysbatch job specifies constraints that none of the
current nodes meet, report a warning to the user. Also, for sysbatch
jobs, mark the job as dead as a result.

A sample run would look like:

```
$ nomad job run ./example.nomad
==> 2021-08-31T16:57:35-04:00: Monitoring evaluation "b48e8882"
    2021-08-31T16:57:35-04:00: Evaluation triggered by job "example"
==> 2021-08-31T16:57:36-04:00: Monitoring evaluation "b48e8882"
    2021-08-31T16:57:36-04:00: Evaluation status changed: "pending" -> "complete"
==> 2021-08-31T16:57:36-04:00: Evaluation "b48e8882" finished with status "complete" but failed to place all allocations:
    2021-08-31T16:57:36-04:00: Task Group "cache" (failed to place 1 allocation):
      * Constraint "${meta.tag} = bar": 2 nodes excluded by filter
      * Constraint "${attr.kernel.name} = linux": 1 nodes excluded by filter

$ nomad job status example
ID            = example
Name          = example
Submit Date   = 2021-08-31T16:57:35-04:00
Type          = sysbatch
Priority      = 50
Datacenters   = dc1
Namespace     = default
Status        = dead
Periodic      = false
Parameterized = false

Summary
Task Group  Queued  Starting  Running  Failed  Complete  Lost
cache       0       0         0        0       0         0

Allocations
No allocations placed
```
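For illustration only (not part of the patch), the sketch below mirrors how
the new mergeNodeFiltered helper in the diff folds per-node filter metrics
into a single metric; the trimmed-down AllocMetric struct here is a stand-in
for structs.AllocMetric with just the fields this change accumulates:

```go
// Illustrative sketch: a reduced stand-in for structs.AllocMetric with only
// the fields this change touches, plus the same merge logic as the
// mergeNodeFiltered helper added in scheduler/scheduler_system.go below.
package main

import "fmt"

type AllocMetric struct {
	NodesEvaluated     int
	NodesFiltered      int
	ClassFiltered      map[string]int
	ConstraintFiltered map[string]int
}

// mergeNodeFiltered folds the filter metrics of one evaluated node into the
// running total for a task group.
func mergeNodeFiltered(acc, curr *AllocMetric) *AllocMetric {
	if acc == nil {
		v := *curr
		return &v
	}
	acc.NodesEvaluated += curr.NodesEvaluated
	acc.NodesFiltered += curr.NodesFiltered
	for k, v := range curr.ClassFiltered {
		acc.ClassFiltered[k] += v
	}
	for k, v := range curr.ConstraintFiltered {
		acc.ConstraintFiltered[k] += v
	}
	return acc
}

func main() {
	// Three nodes, each filtered by a constraint; the merged result is what
	// would surface as the task group's failure metric once the queued count
	// for the group drops to zero.
	perNode := []*AllocMetric{
		{NodesEvaluated: 1, NodesFiltered: 1, ConstraintFiltered: map[string]int{`${meta.tag} = bar`: 1}},
		{NodesEvaluated: 1, NodesFiltered: 1, ConstraintFiltered: map[string]int{`${meta.tag} = bar`: 1}},
		{NodesEvaluated: 1, NodesFiltered: 1, ConstraintFiltered: map[string]int{`${attr.kernel.name} = linux`: 1}},
	}

	var merged *AllocMetric
	for _, m := range perNode {
		merged = mergeNodeFiltered(merged, m)
	}
	fmt.Println(merged.NodesFiltered, merged.ConstraintFiltered)
	// Prints: 3 map[${attr.kernel.name} = linux:1 ${meta.tag} = bar:1]
}
```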
---
 scheduler/scheduler_sysbatch_test.go | 67 +++++++++++++++++++++++++---
 scheduler/scheduler_system.go        | 47 ++++++++++++++++---
 2 files changed, 102 insertions(+), 12 deletions(-)

diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go
index a8feeb8d2..aba43df98 100644
--- a/scheduler/scheduler_sysbatch_test.go
+++ b/scheduler/scheduler_sysbatch_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/kr/pretty"
 	"github.com/stretchr/testify/require"
 )
 
@@ -758,13 +759,17 @@ func TestSysBatch_RetryLimit(t *testing.T) {
 func TestSysBatch_Queued_With_Constraints(t *testing.T) {
 	h := NewHarness(t)
 
-	// Register a node
-	node := mock.Node()
-	node.Attributes["kernel.name"] = "darwin"
-	require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	nodes := createNodes(t, h, 3)
 
 	// Generate a sysbatch job which can't be placed on the node
 	job := mock.SystemBatchJob()
+	job.Constraints = []*structs.Constraint{
+		{
+			LTarget: "${attr.kernel.name}",
+			RTarget: "not_existing_os",
+			Operand: "=",
+		},
+	}
 	require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
 
 	// Create a mock evaluation to deal with the node update
@@ -772,9 +777,8 @@ func TestSysBatch_Queued_With_Constraints(t *testing.T) {
 		Namespace:   structs.DefaultNamespace,
 		ID:          uuid.Generate(),
 		Priority:    50,
-		TriggeredBy: structs.EvalTriggerNodeUpdate,
+		TriggeredBy: structs.EvalTriggerJobRegister,
 		JobID:       job.ID,
-		NodeID:      node.ID,
 		Status:      structs.EvalStatusPending,
 	}
 	require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
@@ -787,6 +791,57 @@ func TestSysBatch_Queued_With_Constraints(t *testing.T) {
 	val, ok := h.Evals[0].QueuedAllocations["pinger"]
 	require.True(t, ok)
 	require.Zero(t, val)
+
+	failedTGAllocs := h.Evals[0].FailedTGAllocs
+	pretty.Println(failedTGAllocs)
+	require.NotNil(t, failedTGAllocs)
+	require.Contains(t, failedTGAllocs, "pinger")
+	require.Equal(t, len(nodes), failedTGAllocs["pinger"].NodesEvaluated)
+	require.Equal(t, len(nodes), failedTGAllocs["pinger"].NodesFiltered)
+
+}
+
+func TestSysBatch_Queued_With_Constraints_PartialMatch(t *testing.T) {
+	h := NewHarness(t)
+
+	// linux machines
+	linux := createNodes(t, h, 3)
+	for i := 0; i < 3; i++ {
+		node := mock.Node()
+		node.Attributes["kernel.name"] = "darwin"
+		node.ComputeClass()
+		require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	}
+
+	// Generate a sysbatch job which can't be placed on the node
+	job := mock.SystemBatchJob()
+	require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
+
+	// Create a mock evaluation to deal with the node update
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    50,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSysBatchScheduler, eval)
+	require.NoError(t, err)
+
+	foundNodes := map[string]bool{}
+	for n := range h.Plans[0].NodeAllocation {
+		foundNodes[n] = true
+	}
+	expected := map[string]bool{}
+	for _, n := range linux {
+		expected[n.ID] = true
+	}
+
+	require.Equal(t, expected, foundNodes)
 }
 
 // This test ensures that the scheduler correctly ignores ineligible
diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go
index 304e86a8b..296c75f43 100644
--- a/scheduler/scheduler_system.go
+++ b/scheduler/scheduler_system.go
@@ -279,6 +279,24 @@ func (s *SystemScheduler) computeJobAllocs() error {
 	return s.computePlacements(diff.place)
 }
 
+func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
+	if acc == nil {
+		v := *curr
+		return &v
+	}
+
+	acc.NodesEvaluated += curr.NodesEvaluated
+	acc.NodesFiltered += curr.NodesFiltered
+	for k, v := range curr.ClassFiltered {
+		acc.ClassFiltered[k] += v
+	}
+	for k, v := range curr.ConstraintFiltered {
+		acc.ConstraintFiltered[k] += v
+	}
+	acc.AllocationTime += curr.AllocationTime
+	return acc
+}
+
 // computePlacements computes placements for allocations
 func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 	nodeByID := make(map[string]*structs.Node, len(s.nodes))
@@ -286,8 +304,13 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 		nodeByID[node.ID] = node
 	}
 
+	// track node filtering, to only report an error if all nodes have been filtered
+	var filteredMetrics map[string]*structs.AllocMetric
+
 	nodes := make([]*structs.Node, 1)
 	for _, missing := range place {
+		tgName := missing.TaskGroup.Name
+
 		node, ok := nodeByID[missing.Alloc.NodeID]
 		if !ok {
 			s.logger.Debug("could not find node %q", missing.Alloc.NodeID)
@@ -309,13 +332,25 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			// couldn't create an allocation then decrementing queued for that
 			// task group
 			if s.ctx.metrics.NodesFiltered > 0 {
-				s.queuedAllocs[missing.TaskGroup.Name] -= 1
+				queued := s.queuedAllocs[tgName] - 1
+				s.queuedAllocs[tgName] = queued
+				if filteredMetrics == nil {
+					filteredMetrics = map[string]*structs.AllocMetric{}
+				}
+				filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())
+
+				if queued <= 0 {
+					if s.failedTGAllocs == nil {
+						s.failedTGAllocs = make(map[string]*structs.AllocMetric)
+					}
+					s.failedTGAllocs[tgName] = filteredMetrics[tgName]
+				}
 
 				// If we are annotating the plan, then decrement the desired
 				// placements based on whether the node meets the constraints
 				if s.eval.AnnotatePlan && s.plan.Annotations != nil &&
 					s.plan.Annotations.DesiredTGUpdates != nil {
-					desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
+					desired := s.plan.Annotations.DesiredTGUpdates[tgName]
 					desired.Place -= 1
 				}
 
@@ -324,7 +359,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			}
 
 			// Check if this task group has already failed, reported to the user as a count
-			if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok {
+			if metric, ok := s.failedTGAllocs[tgName]; ok {
 				metric.CoalescedFailures += 1
 				metric.ExhaustResources(missing.TaskGroup)
 				continue
@@ -345,7 +380,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			s.ctx.Metrics().ExhaustResources(missing.TaskGroup)
 
 			// Actual failure to start this task on this candidate node, report it individually
-			s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
+			s.failedTGAllocs[tgName] = s.ctx.Metrics()
 
 			s.addBlocked(node)
 			continue
@@ -378,7 +413,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			EvalID:    s.eval.ID,
 			Name:      missing.Name,
 			JobID:     s.job.ID,
-			TaskGroup: missing.TaskGroup.Name,
+			TaskGroup: tgName,
 			Metrics:   s.ctx.Metrics(),
 			NodeID:    option.Node.ID,
 			NodeName:  option.Node.Name,
@@ -410,7 +445,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 				if s.eval.AnnotatePlan && s.plan.Annotations != nil {
 					s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub(nil))
 					if s.plan.Annotations.DesiredTGUpdates != nil {
-						desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
+						desired := s.plan.Annotations.DesiredTGUpdates[tgName]
 						desired.Preemptions += 1
 					}
 				}