scheduler: retain eval metrics on port collision (#19933)

When an allocation can't be placed because of a port collision the
resulting blocked eval is expected to have a metric reporting the port
that caused the conflict, but this metric was not being emitted when
preemption was enabled.
This commit is contained in:
Luiz Aoqui
2024-02-09 18:18:48 -05:00
committed by GitHub
parent b52a44717e
commit 4a8b01430b
4 changed files with 146 additions and 0 deletions

3
.changelog/19933.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug that caused blocked evaluations due to a port conflict to not have a reason explaining why the evaluation was blocked
```

View File

@@ -2489,6 +2489,90 @@ func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) {
}
}
// TestJobEndpoint_Register_PortCollistion verifies that registering a job
// whose static port conflicts with an existing allocation produces a blocked
// eval annotated with the exhausted port dimension, both with and without
// preemption enabled.
//
// NOTE(review): "Collistion" is a typo for "Collision"; the name is kept to
// avoid churning the test's identity.
func TestJobEndpoint_Register_PortCollistion(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		name     string
		configFn func(c *Config)
	}{
		{
			name: "no preemption",
			configFn: func(c *Config) {
				c.DefaultSchedulerConfig = structs.SchedulerConfiguration{
					PreemptionConfig: structs.PreemptionConfig{
						ServiceSchedulerEnabled: false,
					},
				}
			},
		},
		{
			name: "with preemption",
			configFn: func(c *Config) {
				c.DefaultSchedulerConfig = structs.SchedulerConfiguration{
					PreemptionConfig: structs.PreemptionConfig{
						ServiceSchedulerEnabled: true,
					},
				}
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			s1, cleanupS1 := TestServer(t, tc.configFn)
			defer cleanupS1()
			state := s1.fsm.State()

			// Create test node.
			node := mock.Node()
			must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node))

			// Create test job with a static port so a second copy is
			// guaranteed to collide on the same node.
			job := mock.Job()
			job.TaskGroups[0].Count = 1
			job.TaskGroups[0].Networks[0].DynamicPorts = nil
			job.TaskGroups[0].Networks[0].ReservedPorts = []structs.Port{
				{Label: "http", Value: 80},
			}
			job.TaskGroups[0].Tasks[0].Services = nil

			testutil.RegisterJob(t, s1.RPC, job)
			testutil.WaitForJobAllocStatus(t, s1.RPC, job, map[string]int{
				structs.AllocClientStatusPending: 1,
			})

			// Register second job with port conflict.
			job2 := job.Copy()
			job2.ID = fmt.Sprintf("conflict-%s", uuid.Generate())
			job2.Name = job2.ID
			testutil.RegisterJob(t, s1.RPC, job2)

			// Wait for job registration eval to complete.
			evals := testutil.WaitForJobEvalStatus(t, s1.RPC, job2, map[string]int{
				structs.EvalStatusComplete: 1,
				structs.EvalStatusBlocked:  1,
			})
			var blockedEval *structs.Evaluation
			for _, e := range evals {
				if e.Status == structs.EvalStatusBlocked {
					blockedEval = e
					break
				}
			}
			// Fail with a clear message instead of a nil-pointer panic if the
			// blocked eval was not found in the returned set.
			must.NotNil(t, blockedEval)

			// Ensure blocked eval is properly annotated.
			must.MapLen(t, 1, blockedEval.FailedTGAllocs)
			must.NotNil(t, blockedEval.FailedTGAllocs["web"])
			must.Eq(t, map[string]int{
				"network: reserved port collision http=80": 1,
			}, blockedEval.FailedTGAllocs["web"].DimensionExhausted)
		})
	}
}
func TestJobEndpoint_Revert(t *testing.T) {
ci.Parallel(t)

View File

@@ -322,6 +322,8 @@ OUTER:
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
if netPreemptions == nil {
iter.ctx.Logger().Named("binpack").Debug("preemption not possible ", "network_resource", ask)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
@@ -339,6 +341,8 @@ OUTER:
offer, err = netIdx.AssignPorts(ask)
if err != nil {
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
@@ -392,6 +396,8 @@ OUTER:
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
if netPreemptions == nil {
iter.ctx.Logger().Named("binpack").Debug("preemption not possible ", "network_resource", ask)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
@@ -409,6 +415,8 @@ OUTER:
offer, err = netIdx.AssignTaskNetwork(ask)
if offer == nil {
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}

View File

@@ -377,6 +377,57 @@ func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, a
return allocs
}
// WaitForJobEvalStatus blocks until the job's evals match the status described
// in the map of <Eval.Status>: <count>.
func WaitForJobEvalStatus(t testing.TB, rpc rpcFn, job *structs.Job, evalStatus map[string]int) []*structs.Evaluation {
	return WaitForJobEvalStatusWithToken(t, rpc, job, evalStatus, "")
}
// WaitForJobEvalStatusWithToken is the same as WaitForJobEvalStatus with ACL
// enabled: it polls Job.Evaluations with the given auth token until the
// per-status eval counts exactly match evalStatus, then returns the evals.
func WaitForJobEvalStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, evalStatus map[string]int, token string) []*structs.Evaluation {
	var evals []*structs.Evaluation

	// check runs one Job.Evaluations query and reports whether the observed
	// status counts match the expected map.
	check := func() error {
		req := &structs.JobSpecificRequest{
			JobID: job.ID,
			QueryOptions: structs.QueryOptions{
				AuthToken: token,
				Namespace: job.Namespace,
				Region:    job.Region,
			},
		}

		var resp structs.JobEvaluationsResponse
		if err := rpc("Job.Evaluations", req, &resp); err != nil {
			return fmt.Errorf("failed to call Job.Evaluations RPC: %w", err)
		}

		got := map[string]int{}
		for _, e := range resp.Evaluations {
			got[e.Status]++
		}
		if diff := cmp.Diff(evalStatus, got); diff != "" {
			return fmt.Errorf("eval status mismatch (-want +got):\n%s", diff)
		}

		// Capture the matching evals for the caller.
		evals = resp.Evaluations
		return nil
	}

	must.Wait(t,
		wait.InitialSuccess(
			wait.ErrorFunc(check),
			wait.Timeout(time.Duration(TestMultiplier())*time.Second),
			wait.Gap(10*time.Millisecond),
		),
		must.Sprintf("failed to wait for job %s eval status", job.ID),
	)
	return evals
}
// WaitForFiles blocks until all the files in the slice are present
func WaitForFiles(t testing.TB, files []string) {
WaitForResult(func() (bool, error) {