tests: update disconnected client scheduler tests to avoid blocking (#20615)
While working on #20462, I discovered that some of the scheduler tests for disconnected clients were making long blocking queries. The tests used `testutil.WaitForResult` to wait for an evaluation to be written to the state store. The evaluation was never written, but the tests did not correctly return an error for an empty query, so each wait blocked for 5s and then continued anyway. In practice, the evaluation is never written to the state store by the test harness `Process` method, so this assertion was meaningless. Remove the broken assertion from the two top-level tests that used it, and upgrade these tests to use `shoenig/test` in the process. This will save us ~50s per test run.
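For reference, this is the removed polling pattern, annotated with why it can never fail the test: when the eval is simply absent, the poll function returns `(false, nil)`, so `WaitForResult` times out with a nil error and the `require.NoError` in the failure callback passes.

```go
// Removed pattern (annotated). "Not found" is reported as a nil error,
// so the failure callback below is a no-op after the ~5s timeout.
testutil.WaitForResult(func() (bool, error) {
	found, err := h.State.EvalByID(ws, e.ID)
	if err != nil {
		return false, err // only a real state-store error is propagated
	}
	if found == nil {
		return false, nil // missing eval: retry until timeout, err stays nil
	}
	return true, nil
}, func(err error) {
	require.NoError(t, err) // err is nil on timeout, so this always passes
})
```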
@@ -18,7 +18,6 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/testutil"
 	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
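Most of the churn in the hunks below is mechanical: testify's `require` assertions are swapped for `shoenig/test/must`, which also fails the test immediately. The main difference to watch for is argument order, shown in this minimal standalone sketch (the `items` slice is a placeholder, not code from this diff):

```go
package example

import (
	"testing"

	"github.com/shoenig/test/must"
	"github.com/stretchr/testify/require"
)

func TestAssertionStyles(t *testing.T) {
	items := []string{"a", "b"}

	require.Len(t, items, 2) // testify: collection first, then expected length
	must.Len(t, 2, items)    // shoenig/test: expected length first

	require.Equal(t, "a", items[0]) // both equality helpers are (t, expected, actual)
	must.Eq(t, "a", items[0])
}
```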
@@ -3644,6 +3643,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 	ci.Parallel(t)
 
 	cases := []struct {
+		name        string
 		jobSpecFn   func(*structs.Job)
 		when        time.Time
 		rescheduled bool
@@ -3651,13 +3651,15 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 		// Test using stop_after_client_disconnect, remove after its deprecated in favor
 		// of Disconnect.StopOnClientAfter introduced in 1.8.0.
 		{
-			rescheduled: true,
+			name: "legacy no stop_after_client_disconnect with reschedule",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].StopAfterClientDisconnect = nil
 			},
+			rescheduled: true,
 		},
 		{
+			name: "legacy stop_after_client_disconnect without reschedule",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
@@ -3665,6 +3667,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 			rescheduled: false,
 		},
 		{
+			name: "legacy stop_after_client_disconnect with reschedule",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
@@ -3673,6 +3676,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 			rescheduled: true,
 		},
 		{
+			name: "legacy stop_after_client_disconnect reschedule later",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].StopAfterClientDisconnect = pointer.Of(1 * time.Second)
@@ -3682,15 +3686,17 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 		},
 		// Tests using the new disconnect block
 		{
-			rescheduled: true,
+			name: "no StopOnClientAfter with reschedule",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
 					StopOnClientAfter: nil,
 				}
 			},
+			rescheduled: true,
 		},
 		{
+			name: "StopOnClientAfter without reschedule",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
@@ -3700,6 +3706,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 			rescheduled: false,
 		},
 		{
+			name: "StopOnClientAfter with reschedule",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
@@ -3710,6 +3717,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 			rescheduled: true,
 		},
 		{
+			name: "StopOnClientAfter reschedule later",
 			jobSpecFn: func(job *structs.Job) {
 				job.TaskGroups[0].Count = 1
 				job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
@@ -3722,18 +3730,18 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 	}
 
 	for i, tc := range cases {
-		t.Run(fmt.Sprintf(""), func(t *testing.T) {
+		t.Run(tc.name, func(t *testing.T) {
 			h := NewHarness(t)
 
 			// Node, which is down
 			node := mock.Node()
 			node.Status = structs.NodeStatusDown
-			require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
 
 			job := mock.Job()
 
 			tc.jobSpecFn(job)
-			require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+			must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
 
 			// Alloc for the running group
 			alloc := mock.Alloc()
@@ -3751,7 +3759,7 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 				}}
 			}
 			allocs := []*structs.Allocation{alloc}
-			require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
+			must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
 
 			// Create a mock evaluation to deal with drain
 			evals := []*structs.Evaluation{{
@@ -3764,93 +3772,69 @@ func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
 				Status: structs.EvalStatusPending,
 			}}
 			eval := evals[0]
-			require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
+			must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
 
 			// Process the evaluation
 			err := h.Process(NewServiceScheduler, eval)
-			require.NoError(t, err)
-			require.Equal(t, h.Evals[0].Status, structs.EvalStatusComplete)
-			require.Len(t, h.Plans, 1, "plan")
+			must.NoError(t, err)
+			must.Eq(t, h.Evals[0].Status, structs.EvalStatusComplete)
+			must.Len(t, 1, h.Plans, must.Sprint("expected a plan"))
 
 			// One followup eval created, either delayed or blocked
-			require.Len(t, h.CreateEvals, 1)
+			must.Len(t, 1, h.CreateEvals)
 			e := h.CreateEvals[0]
-			require.Equal(t, eval.ID, e.PreviousEval)
+			must.Eq(t, eval.ID, e.PreviousEval)
 
 			if tc.rescheduled {
-				require.Equal(t, "blocked", e.Status)
+				must.Eq(t, "blocked", e.Status)
 			} else {
-				require.Equal(t, "pending", e.Status)
-				require.NotEmpty(t, e.WaitUntil)
+				must.Eq(t, "pending", e.Status)
+				must.NotEq(t, time.Time{}, e.WaitUntil)
 			}
 
-			// This eval is still being inserted in the state store
-			ws := memdb.NewWatchSet()
-			testutil.WaitForResult(func() (bool, error) {
-				found, err := h.State.EvalByID(ws, e.ID)
-				if err != nil {
-					return false, err
-				}
-				if found == nil {
-					return false, nil
-				}
-				return true, nil
-			}, func(err error) {
-				require.NoError(t, err)
-			})
-
-			alloc, err = h.State.AllocByID(ws, alloc.ID)
-			require.NoError(t, err)
+			alloc, err = h.State.AllocByID(nil, alloc.ID)
+			must.NoError(t, err)
 
 			// Allocations have been transitioned to lost
-			require.Equal(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
-			require.Equal(t, structs.AllocClientStatusLost, alloc.ClientStatus)
+			must.Eq(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
+			must.Eq(t, structs.AllocClientStatusLost, alloc.ClientStatus)
 			// At least 1, 2 if we manually set the tc.when
-			require.NotEmpty(t, alloc.AllocStates)
+			must.SliceNotEmpty(t, alloc.AllocStates)
 
 			if tc.rescheduled {
 				// Register a new node, leave it up, process the followup eval
 				node = mock.Node()
-				require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
-				require.NoError(t, h.Process(NewServiceScheduler, eval))
+				must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+				must.NoError(t, h.Process(NewServiceScheduler, eval))
 
-				as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
-				require.NoError(t, err)
-
-				testutil.WaitForResult(func() (bool, error) {
-					as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
-					if err != nil {
-						return false, err
-					}
-					return len(as) == 2, nil
-				}, func(err error) {
-					require.NoError(t, err)
-				})
+				as, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
+				must.NoError(t, err)
+				must.Len(t, 2, as)
 
 				a2 := as[0]
 				if a2.ID == alloc.ID {
					a2 = as[1]
 				}
 
-				require.Equal(t, structs.AllocClientStatusPending, a2.ClientStatus)
-				require.Equal(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
-				require.Equal(t, node.ID, a2.NodeID)
+				must.Eq(t, structs.AllocClientStatusPending, a2.ClientStatus)
+				must.Eq(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
+				must.Eq(t, node.ID, a2.NodeID)
 
 				// No blocked evals
-				require.Empty(t, h.ReblockEvals)
-				require.Len(t, h.CreateEvals, 1)
-				require.Equal(t, h.CreateEvals[0].ID, e.ID)
+				must.SliceEmpty(t, h.ReblockEvals)
+				must.Len(t, 1, h.CreateEvals)
+				must.Eq(t, h.CreateEvals[0].ID, e.ID)
 			} else {
 				// No new alloc was created
-				as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
-				require.NoError(t, err)
+				as, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
+				must.NoError(t, err)
 
-				require.Len(t, as, 1)
+				must.Len(t, 1, as)
 				old := as[0]
 
-				require.Equal(t, alloc.ID, old.ID)
-				require.Equal(t, structs.AllocClientStatusLost, old.ClientStatus)
-				require.Equal(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
+				must.Eq(t, alloc.ID, old.ID)
+				must.Eq(t, structs.AllocClientStatusLost, old.ClientStatus)
+				must.Eq(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
 			}
 		})
 	}
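The `WaitForResult` blocks deleted above, and the two deleted below in `TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals`, all share the same trap. A self-contained toy reproduction (hypothetical code, not from Nomad; the timeout is shortened from the real helper's ~5s) of how a poll that reports "not found" as a nil error silently eats its whole timeout:

```go
package example

import (
	"testing"
	"time"
)

// waitForResult mimics the shape of testutil.WaitForResult with a short timeout.
func waitForResult(test func() (bool, error), onErr func(error)) {
	deadline := time.Now().Add(100 * time.Millisecond)
	var err error
	for time.Now().Before(deadline) {
		var ok bool
		ok, err = test()
		if ok {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	onErr(err) // err is nil when the condition simply never became true
}

func TestSilentTimeout(t *testing.T) {
	store := map[string]string{} // the "eval" is never written

	waitForResult(func() (bool, error) {
		_, found := store["eval-id"]
		return found, nil // "missing" reported as (false, nil)
	}, func(err error) {
		if err != nil { // err is nil, so the test continues as if nothing failed
			t.Fatal(err)
		}
	})
	// Execution reaches here only after burning the entire timeout.
}
```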
@@ -7307,14 +7291,14 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
 
 			job := version.jobSpec(maxClientDisconnect)
 			job.TaskGroups[0].Count = count
-			require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+			must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
 
 			disconnectedNode, job, unknownAllocs := initNodeAndAllocs(t, h, job,
 				structs.NodeStatusReady, structs.AllocClientStatusRunning)
 
 			// Now disconnect the node
 			disconnectedNode.Status = structs.NodeStatusDisconnected
-			require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))
 
 			// Create an evaluation triggered by the disconnect
 			evals := []*structs.Evaluation{{
@@ -7328,88 +7312,49 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
 			}}
 
 			nodeStatusUpdateEval := evals[0]
-			require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
+			must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
 
 			// Process the evaluation
 			err := h.Process(NewServiceScheduler, nodeStatusUpdateEval)
-			require.NoError(t, err)
-			require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
-			require.Len(t, h.Plans, 1, "plan")
+			must.NoError(t, err)
+			must.Eq(t, structs.EvalStatusComplete, h.Evals[0].Status)
+			must.Len(t, 1, h.Plans, must.Sprint("expected a plan"))
 
 			// Two followup delayed eval created
-			require.Len(t, h.CreateEvals, 2)
+			must.Len(t, 2, h.CreateEvals)
 			followUpEval1 := h.CreateEvals[0]
-			require.Equal(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
-			require.Equal(t, "pending", followUpEval1.Status)
-			require.NotEmpty(t, followUpEval1.WaitUntil)
+			must.Eq(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
+			must.Eq(t, "pending", followUpEval1.Status)
+			must.NotEq(t, time.Time{}, followUpEval1.WaitUntil)
 
 			followUpEval2 := h.CreateEvals[1]
-			require.Equal(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
-			require.Equal(t, "pending", followUpEval2.Status)
-			require.NotEmpty(t, followUpEval2.WaitUntil)
-
-			// Insert eval1 in the state store
-			testutil.WaitForResult(func() (bool, error) {
-				found, err := h.State.EvalByID(nil, followUpEval1.ID)
-				if err != nil {
-					return false, err
-				}
-				if found == nil {
-					return false, nil
-				}
-
-				require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
-				require.Equal(t, "pending", found.Status)
-				require.NotEmpty(t, found.WaitUntil)
-
-				return true, nil
-			}, func(err error) {
-
-				require.NoError(t, err)
-			})
-
-			// Insert eval2 in the state store
-			testutil.WaitForResult(func() (bool, error) {
-				found, err := h.State.EvalByID(nil, followUpEval2.ID)
-				if err != nil {
-					return false, err
-				}
-				if found == nil {
-					return false, nil
-				}
-
-				require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
-				require.Equal(t, "pending", found.Status)
-				require.NotEmpty(t, found.WaitUntil)
-
-				return true, nil
-			}, func(err error) {
-
-				require.NoError(t, err)
-			})
+			must.Eq(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
+			must.Eq(t, "pending", followUpEval2.Status)
+			must.NotEq(t, time.Time{}, followUpEval2.WaitUntil)
 
 			// Validate that the ClientStatus updates are part of the plan.
-			require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
-			// Pending update should have unknown status.
+			must.Len(t, count, h.Plans[0].NodeAllocation[disconnectedNode.ID])
+
+			// Pending update should have unknown status.
 			for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
 				require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
 			}
 
 			// Simulate that NodeAllocation got processed.
-			err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), h.Plans[0].NodeAllocation[disconnectedNode.ID])
-			require.NoError(t, err, "plan.NodeUpdate")
+			must.NoError(t, h.State.UpsertAllocs(
				structs.MsgTypeTestSetup, h.NextIndex(),
				h.Plans[0].NodeAllocation[disconnectedNode.ID]))
 
 			// Validate that the StateStore Upsert applied the ClientStatus we specified.
 
 			for _, alloc := range unknownAllocs {
 				alloc, err = h.State.AllocByID(nil, alloc.ID)
-				require.NoError(t, err)
-				require.Equal(t, alloc.ClientStatus, structs.AllocClientStatusUnknown)
+				must.NoError(t, err)
+				must.Eq(t, alloc.ClientStatus, structs.AllocClientStatusUnknown)
 
 				// Allocations have been transitioned to unknown
-				require.Equal(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus)
-				require.Equal(t, structs.AllocClientStatusUnknown, alloc.ClientStatus)
+				must.Eq(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus)
+				must.Eq(t, structs.AllocClientStatusUnknown, alloc.ClientStatus)
 			}
 		})
 	}