scheduler: warn when system jobs cannot place an alloc
When a system or sysbatch job specifies constraints that none of the
current nodes meet, report a warning to the user.

Also, for sysbatch jobs, mark the job as dead as a result.
A sample run would look like:
```
$ nomad job run ./example.nomad
==> 2021-08-31T16:57:35-04:00: Monitoring evaluation "b48e8882"
    2021-08-31T16:57:35-04:00: Evaluation triggered by job "example"
==> 2021-08-31T16:57:36-04:00: Monitoring evaluation "b48e8882"
    2021-08-31T16:57:36-04:00: Evaluation status changed: "pending" -> "complete"
==> 2021-08-31T16:57:36-04:00: Evaluation "b48e8882" finished with status "complete" but failed to place all allocations:
    2021-08-31T16:57:36-04:00: Task Group "cache" (failed to place 1 allocation):
      * Constraint "${meta.tag} = bar": 2 nodes excluded by filter
      * Constraint "${attr.kernel.name} = linux": 1 nodes excluded by filter
$ nomad job status example
ID            = example
Name          = example
Submit Date   = 2021-08-31T16:57:35-04:00
Type          = sysbatch
Priority      = 50
Datacenters   = dc1
Namespace     = default
Status        = dead
Periodic      = false
Parameterized = false

Summary
Task Group  Queued  Starting  Running  Failed  Complete  Lost
cache       0       0         0        0       0         0

Allocations
No allocations placed
```
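For reference, a jobspec along these lines would trigger the behavior shown above. This is only an illustrative sketch, not the actual contents of `./example.nomad`; the task body and driver are placeholders, and the constraint value is assumed to match no node in the cluster:

```hcl
# Hypothetical sketch of a sysbatch job whose constraint no node can satisfy.
job "example" {
  datacenters = ["dc1"]
  type        = "sysbatch"

  group "cache" {
    # No node exposes meta.tag = "bar", so every node is filtered out and
    # the scheduler reports the failed-placement warning shown above.
    constraint {
      attribute = "${meta.tag}"
      value     = "bar"
    }

    # Placeholder task; any task definition works for the example.
    task "redis" {
      driver = "docker"

      config {
        image = "redis:6"
      }
    }
  }
}
```

Because the job is sysbatch, an evaluation that filters out every node now also marks the job dead instead of leaving it pending.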
```diff
@@ -10,6 +10,7 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/kr/pretty"
 	"github.com/stretchr/testify/require"
 )
 
@@ -758,13 +759,17 @@ func TestSysBatch_RetryLimit(t *testing.T) {
 func TestSysBatch_Queued_With_Constraints(t *testing.T) {
 	h := NewHarness(t)
 
 	// Register a node
 	node := mock.Node()
 	node.Attributes["kernel.name"] = "darwin"
 	require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	nodes := createNodes(t, h, 3)
 
 	// Generate a sysbatch job which can't be placed on the node
 	job := mock.SystemBatchJob()
+	job.Constraints = []*structs.Constraint{
+		{
+			LTarget: "${attr.kernel.name}",
+			RTarget: "not_existing_os",
+			Operand: "=",
+		},
+	}
 	require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
 
 	// Create a mock evaluation to deal with the node update
@@ -772,9 +777,8 @@ func TestSysBatch_Queued_With_Constraints(t *testing.T) {
 		Namespace:   structs.DefaultNamespace,
 		ID:          uuid.Generate(),
 		Priority:    50,
-		TriggeredBy: structs.EvalTriggerNodeUpdate,
+		TriggeredBy: structs.EvalTriggerJobRegister,
 		JobID:       job.ID,
-		NodeID:      node.ID,
 		Status:      structs.EvalStatusPending,
 	}
 	require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
@@ -787,6 +791,57 @@ func TestSysBatch_Queued_With_Constraints(t *testing.T) {
 	val, ok := h.Evals[0].QueuedAllocations["pinger"]
 	require.True(t, ok)
 	require.Zero(t, val)
+
+	failedTGAllocs := h.Evals[0].FailedTGAllocs
+	pretty.Println(failedTGAllocs)
+	require.NotNil(t, failedTGAllocs)
+	require.Contains(t, failedTGAllocs, "pinger")
+	require.Equal(t, len(nodes), failedTGAllocs["pinger"].NodesEvaluated)
+	require.Equal(t, len(nodes), failedTGAllocs["pinger"].NodesFiltered)
+
 }
 
+func TestSysBatch_Queued_With_Constraints_PartialMatch(t *testing.T) {
+	h := NewHarness(t)
+
+	// linux machines
+	linux := createNodes(t, h, 3)
+	for i := 0; i < 3; i++ {
+		node := mock.Node()
+		node.Attributes["kernel.name"] = "darwin"
+		node.ComputeClass()
+		require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	}
+
+	// Generate a sysbatch job which can't be placed on the node
+	job := mock.SystemBatchJob()
+	require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
+
+	// Create a mock evaluation to deal with the node update
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    50,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSysBatchScheduler, eval)
+	require.NoError(t, err)
+
+	foundNodes := map[string]bool{}
+	for n := range h.Plans[0].NodeAllocation {
+		foundNodes[n] = true
+	}
+	expected := map[string]bool{}
+	for _, n := range linux {
+		expected[n.ID] = true
+	}
+
+	require.Equal(t, expected, foundNodes)
+}
+
 // This test ensures that the scheduler correctly ignores ineligible
```
```diff
@@ -279,6 +279,24 @@ func (s *SystemScheduler) computeJobAllocs() error {
 	return s.computePlacements(diff.place)
 }
 
+func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
+	if acc == nil {
+		v := *curr
+		return &v
+	}
+
+	acc.NodesEvaluated += curr.NodesEvaluated
+	acc.NodesFiltered += curr.NodesFiltered
+	for k, v := range curr.ClassFiltered {
+		acc.ClassFiltered[k] += v
+	}
+	for k, v := range curr.ConstraintFiltered {
+		acc.ConstraintFiltered[k] += v
+	}
+	acc.AllocationTime += curr.AllocationTime
+	return acc
+}
+
 // computePlacements computes placements for allocations
 func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 	nodeByID := make(map[string]*structs.Node, len(s.nodes))
@@ -286,8 +304,13 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 		nodeByID[node.ID] = node
 	}
 
+	// track node filtering, to only report an error if all nodes have been filtered
+	var filteredMetrics map[string]*structs.AllocMetric
+
 	nodes := make([]*structs.Node, 1)
 	for _, missing := range place {
+		tgName := missing.TaskGroup.Name
+
 		node, ok := nodeByID[missing.Alloc.NodeID]
 		if !ok {
 			s.logger.Debug("could not find node %q", missing.Alloc.NodeID)
@@ -309,13 +332,25 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			// couldn't create an allocation then decrementing queued for that
 			// task group
 			if s.ctx.metrics.NodesFiltered > 0 {
-				s.queuedAllocs[missing.TaskGroup.Name] -= 1
+				queued := s.queuedAllocs[tgName] - 1
+				s.queuedAllocs[tgName] = queued
+				if filteredMetrics == nil {
+					filteredMetrics = map[string]*structs.AllocMetric{}
+				}
+				filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())
+
+				if queued <= 0 {
+					if s.failedTGAllocs == nil {
+						s.failedTGAllocs = make(map[string]*structs.AllocMetric)
+					}
+					s.failedTGAllocs[tgName] = filteredMetrics[tgName]
+				}
 
 				// If we are annotating the plan, then decrement the desired
 				// placements based on whether the node meets the constraints
 				if s.eval.AnnotatePlan && s.plan.Annotations != nil &&
 					s.plan.Annotations.DesiredTGUpdates != nil {
-					desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
+					desired := s.plan.Annotations.DesiredTGUpdates[tgName]
 					desired.Place -= 1
 				}
 
@@ -324,7 +359,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 		}
 
 		// Check if this task group has already failed, reported to the user as a count
-		if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok {
+		if metric, ok := s.failedTGAllocs[tgName]; ok {
 			metric.CoalescedFailures += 1
 			metric.ExhaustResources(missing.TaskGroup)
 			continue
@@ -345,7 +380,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			s.ctx.Metrics().ExhaustResources(missing.TaskGroup)
 
 			// Actual failure to start this task on this candidate node, report it individually
-			s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
+			s.failedTGAllocs[tgName] = s.ctx.Metrics()
 			s.addBlocked(node)
 
 			continue
@@ -378,7 +413,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			EvalID:    s.eval.ID,
 			Name:      missing.Name,
 			JobID:     s.job.ID,
-			TaskGroup: missing.TaskGroup.Name,
+			TaskGroup: tgName,
 			Metrics:   s.ctx.Metrics(),
 			NodeID:    option.Node.ID,
 			NodeName:  option.Node.Name,
@@ -410,7 +445,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 		if s.eval.AnnotatePlan && s.plan.Annotations != nil {
 			s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub(nil))
 			if s.plan.Annotations.DesiredTGUpdates != nil {
-				desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
+				desired := s.plan.Annotations.DesiredTGUpdates[tgName]
 				desired.Preemptions += 1
 			}
 		}
```