scheduler: allow canaries to be migrated on node drain (#25726)

When a node that has not-yet-healthy canaries is drained, the canaries may not
be migrated properly and the deployment halts. This happens only if there are
more than `migrate.max_parallel` canaries on the node and the canaries are not
yet healthy (e.g. they have a long `update.min_healthy_time`). In this
circumstance, the drainer correctly marks the first batch of canaries for
migration. But the reconciler then counts these migrated canaries against the
total number of expected canaries and stops making progress on the deployment.
Because an insufficient number of allocations have reported they're healthy,
the deployment cannot be promoted.
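
For illustration, here is a minimal sketch (not part of the changed files; the
helper name and values are hypothetical) of the group-level `update` and
`migrate` settings that put a job into this state. Only the relationship
between the values matters: more canaries than the drainer migrates per pass,
and a `min_healthy_time` that outlasts the drain.

```go
package example

import (
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// susceptibleGroup (hypothetical helper) configures a task group so that a
// drain can hit the stuck-deployment case described above: more canaries than
// the drainer will migrate in one pass, none of which are healthy yet when
// the drain starts. The values are illustrative only.
func susceptibleGroup(tg *structs.TaskGroup) {
	tg.Update = &structs.UpdateStrategy{
		Canary:         2,               // two canaries per deployment
		MaxParallel:    1,
		MinHealthyTime: 5 * time.Minute, // canaries stay unhealthy for a while
	}
	tg.Migrate = &structs.MigrateStrategy{
		MaxParallel: 1, // drainer marks only one allocation per pass
	}
}
```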

When the reconciler looks for canaries to cancel, it leaves any canaries that
are already terminal in the list (because there should be no work left to do
for them). But this ends up skipping the creation of a new canary to replace a
terminal canary that has been marked for migration. Add a conditional for this
case that removes the canary from the list of active canaries so we can
replace it.
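
As a rough sketch of what the new conditional keys on (the helper functions
below are hypothetical; the real change lives in the allocation filter shown
in the diff further down), a terminal canary that the drainer has marked for
migration is now routed to the migrate bucket so a replacement gets placed:

```go
package example

import (
	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/nomad/structs"
)

// terminalMigratingCanary (hypothetical helper) builds an allocation in the
// state the commit message describes: stopped on its draining node, flagged
// as a canary in its deployment status, and marked for migration.
func terminalMigratingCanary() *structs.Allocation {
	return &structs.Allocation{
		DesiredStatus: structs.AllocDesiredStatusStop,
		ClientStatus:  structs.AllocClientStatusComplete,
		DeploymentStatus: &structs.AllocDeploymentStatus{
			Canary:  true,
			Healthy: pointer.Of(false),
		},
		DesiredTransition: structs.DesiredTransition{
			Migrate: pointer.Of(true),
		},
	}
}

// needsReplacement (hypothetical helper) mirrors the condition added in the
// diff below: such an allocation should be treated as migrating rather than
// as a still-active canary.
func needsReplacement(alloc *structs.Allocation) bool {
	return alloc.DeploymentStatus.IsCanary() && alloc.DesiredTransition.ShouldMigrate()
}
```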

Ref: https://hashicorp.atlassian.net/browse/NMD-560
Fixes: https://github.com/hashicorp/nomad/issues/17842
Author: Tim Gross
Date: 2025-04-24 09:24:28 -04:00
Committed by: GitHub
Parent: 3ad0df71a8
Commit: 5208ad4c2c
4 changed files with 142 additions and 17 deletions

.changelog/25726.txt

@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug where draining a node with canaries could result in a stuck deployment
```


@@ -2024,7 +2024,7 @@ func TestServiceSched_JobModify(t *testing.T) {
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2417,7 +2417,7 @@ func TestServiceSched_JobModify_Datacenters(t *testing.T) {
job2.Datacenters = []string{"dc1", "dc2"}
require.NoError(h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2490,7 +2490,7 @@ func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) {
job2.TaskGroups[0].Count = 3
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2601,7 +2601,7 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) {
job2.TaskGroups[0].Count = 0
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2699,7 +2699,7 @@ func TestServiceSched_JobModify_Rolling(t *testing.T) {
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2930,7 +2930,7 @@ func TestServiceSched_JobModify_Canaries(t *testing.T) {
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -3073,7 +3073,7 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) {
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -3714,7 +3714,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
allocs := []*structs.Allocation{alloc}
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -4173,6 +4173,117 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_NodeDrain_Canaries(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
n1 := mock.Node()
n2 := mock.DrainNode()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), n1))
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), n2))
job := mock.Job()
job.TaskGroups[0].Count = 2
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// previous version allocations
var allocs []*structs.Allocation
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n1.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
t.Logf("prev alloc=%q", alloc.ID)
}
// canaries on draining node
job = job.Copy()
job.Meta["owner"] = "changed"
job.Version++
var canaries []string
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n2.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(false),
Canary: true,
}
alloc.DesiredTransition = structs.DesiredTransition{
Migrate: pointer.Of(true),
}
allocs = append(allocs, alloc)
canaries = append(canaries, alloc.ID)
t.Logf("stopped canary alloc=%q", alloc.ID)
}
// first canary placed from previous drainer eval
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n2.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.PreviousAllocation = canaries[0]
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(false),
Canary: true,
}
allocs = append(allocs, alloc)
canaries = append(canaries, alloc.ID)
t.Logf("new canary alloc=%q", alloc.ID)
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
deployment := mock.Deployment()
deployment.JobID = job.ID
deployment.JobVersion = job.Version
deployment.JobCreateIndex = job.CreateIndex
deployment.JobSpecModifyIndex = job.JobModifyIndex
deployment.TaskGroups["web"] = &structs.DeploymentState{
AutoRevert: false,
AutoPromote: false,
Promoted: false,
PlacedCanaries: canaries,
DesiredCanaries: 2,
DesiredTotal: 2,
PlacedAllocs: 3,
HealthyAllocs: 0,
UnhealthyAllocs: 0,
}
must.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment))
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: n2.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))
must.NoError(t, h.Process(NewServiceScheduler, eval))
must.Len(t, 1, h.Plans)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
must.MapLen(t, 0, h.Plans[0].NodeAllocation)
must.MapLen(t, 1, h.Plans[0].NodeUpdate)
must.Len(t, 2, h.Plans[0].NodeUpdate[n2.ID])
for _, alloc := range h.Plans[0].NodeUpdate[n2.ID] {
must.SliceContains(t, canaries, alloc.ID)
}
}
func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
ci.Parallel(t)


@@ -3226,6 +3226,8 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
allocs = append(allocs, alloc)
}
n := mock.DrainNode()
// Create two canaries for the new job
handled := make(map[string]allocUpdateType)
for i := 0; i < 2; i++ {
@@ -3233,7 +3235,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
canary := mock.Alloc()
canary.Job = job
canary.JobID = job.ID
canary.NodeID = uuid.Generate()
canary.NodeID = n.ID
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.DeploymentID = d.ID
@@ -3244,8 +3246,9 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
// Build a map of tainted nodes that contains the last canary
tainted := make(map[string]*structs.Node, 1)
n := mock.DrainNode()
n.ID = allocs[11].NodeID
// This is what drainer sets for draining allocations
allocs[10].DesiredTransition.Migrate = pointer.Of(true)
allocs[11].DesiredTransition.Migrate = pointer.Of(true)
tainted[n.ID] = n
@@ -3258,18 +3261,18 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 1,
place: 2,
inplace: 0,
stop: 1,
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Canary: 1,
Ignore: 11,
Canary: 2,
Ignore: 10,
},
},
})
assertNamesHaveIndexes(t, intRange(1, 1), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(1, 1), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place))
}
// Tests the reconciler handles migrating a canary correctly on a lost node


@@ -307,6 +307,14 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
continue
}
// Terminal canaries that have been marked for migration need to be
// migrated, otherwise we block deployments from progressing by
// counting them as running canaries.
if alloc.DeploymentStatus.IsCanary() && alloc.DesiredTransition.ShouldMigrate() {
migrate[alloc.ID] = alloc
continue
}
// Terminal allocs, if not reconnect, are always untainted as they
// should never be migrated.
untainted[alloc.ID] = alloc