Mirror of https://github.com/kemko/nomad.git, synced 2026-01-01 16:05:42 +03:00
Reconciler mutation improvements (#26325)
Refactors the `computeGroup` code in the reconciler to make understanding its mutations more manageable. Some of this work makes mutation more consistent, but more importantly it's intended to make mutation readily _detectable_ while still being readable; a short sketch of the convention follows the list. Includes:

* In the `computeCanaries` function, we mutate the dstate and the result, and then the return values are used to further mutate the result in the caller. Move all this mutation into the function.
* In the `computeMigrations` function, we mutate the result, and then the return values are used to further mutate the result in the caller. Move all this mutation into the function.
* In the `cancelUnneededCanaries` function, we mutate the result, and then the return values are used to further mutate the result in the caller. Move all this mutation into the function, and annotate which `allocSet`s are mutated by taking a pointer to the set.
* The `createRescheduleLaterEvals` function currently mutates the results and returns updates used to mutate the results in the caller. Move all this mutation into the function to help clean up `computeGroup`.
* Extract a `computeReconnecting` method from `computeGroup`. There's some tangled logic in `computeGroup` for determining changes to make for reconnecting allocations. Pull this out into its own function, annotate mutability by passing pointers to `allocSet` where needed, and mutate the result to update counts. Rename the old `computeReconnecting` method to `appendReconnectingUpdates` to mirror the naming of the similar logic for disconnects.
* Extract a `computeDisconnecting` method from `computeGroup`. There's some tangled logic in `computeGroup` for determining changes to make for disconnected allocations. Pull this out into its own function, annotate mutability by passing pointers to `allocSet` where needed, and mutate the result to update counts.
* The `appendUnknownDisconnectingUpdates` method used to create updates for disconnected allocations mutates one of its `allocSet` arguments so that the reschedule-now set points at the updated allocations. Pull this update out into the caller.
* A handful of small docstring and helper function fixes.

Ref: https://hashicorp.atlassian.net/browse/NMD-819
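To make the convention concrete, here is a minimal, runnable sketch of the before/after calling styles. The `allocSet` and `reconcileResults` types below are illustrative stand-ins, not the reconciler's real types:

```go
package main

import "fmt"

// Illustrative stand-ins for the reconciler's types.
type allocSet map[string]string

type reconcileResults struct {
	stop []string
}

// Old style: the helper returns values and the caller splices them into
// the result, so one logical mutation is spread across two call frames.
func stopAllReturning(all allocSet) []string {
	stopped := make([]string, 0, len(all))
	for id := range all {
		stopped = append(stopped, id)
	}
	return stopped
}

// New style: the helper takes pointers to everything it mutates, so the
// signature alone tells a reader which state changes.
func stopAll(all *allocSet, result *reconcileResults) {
	for id := range *all {
		result.stop = append(result.stop, id)
	}
	*all = allocSet{} // rebinds the caller's set, like *all = all.difference(...)
}

func main() {
	result := &reconcileResults{}

	before := allocSet{"a1": "web", "a2": "web"}
	result.stop = append(result.stop, stopAllReturning(before)...) // caller finishes the mutation

	after := allocSet{"a3": "web"}
	stopAll(&after, result) // all mutation happens behind the pointers

	fmt.Println(len(after), len(result.stop)) // 0 3
}
```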
@@ -218,6 +218,18 @@ func (set allocSet) fromKeys(keys []string) allocSet {
 	return from
 }
 
+// update returns a new allocSet which copies this set but with updates for any
+// alloc that's also in the "other" set
+func (set allocSet) update(other allocSet) (updated allocSet) {
+	updated = updated.union(set)
+	for id, alloc := range other {
+		if _, ok := updated[id]; ok {
+			updated[id] = alloc
+		}
+	}
+	return updated
+}
+
 // AllocNameIndex is used to select allocation names for placement or removal
 // given an existing set of placed allocations.
 type AllocNameIndex struct {
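One subtlety in the new `update` helper worth noting: it only overwrites allocations whose IDs already exist in the receiver; entries present only in `other` are dropped, not added. A runnable sketch of that behavior, using stand-in types rather than the real `structs.Allocation`:

```go
package main

import "fmt"

type alloc struct{ ID, status string }

// Stand-in for the scheduler's allocSet, with a minimal union.
type allocSet map[string]*alloc

func (set allocSet) union(others ...allocSet) allocSet {
	out := make(allocSet, len(set))
	for id, a := range set {
		out[id] = a
	}
	for _, other := range others {
		for id, a := range other {
			out[id] = a
		}
	}
	return out
}

// update mirrors the method added above: copy the receiver, then
// overwrite only the entries that also appear in "other".
func (set allocSet) update(other allocSet) (updated allocSet) {
	updated = updated.union(set)
	for id, a := range other {
		if _, ok := updated[id]; ok {
			updated[id] = a
		}
	}
	return updated
}

func main() {
	current := allocSet{"a1": {ID: "a1", status: "running"}}
	fresh := allocSet{
		"a1": {ID: "a1", status: "unknown"}, // overwrites a1
		"a2": {ID: "a2", status: "unknown"}, // ignored: a2 is not in the receiver
	}
	out := current.update(fresh)
	fmt.Println(out["a1"].status, len(out)) // unknown 1
}
```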
@@ -18,7 +18,6 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	metrics "github.com/hashicorp/go-metrics/compat"
 
-	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
 	sstructs "github.com/hashicorp/nomad/scheduler/structs"
@@ -461,44 +460,23 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
 	all, ignore := all.filterOldTerminalAllocs(a.jobState)
 	result.DesiredTGUpdates[group].Ignore += uint64(len(ignore))
 
-	var canaries allocSet
-	canaries, all, result.Stop = a.cancelUnneededCanaries(all, result.DesiredTGUpdates[group])
+	canaries := a.cancelUnneededCanaries(&all, group, result)
 
 	// Determine what set of allocations are on tainted nodes
 	untainted, migrate, lost, disconnecting, reconnecting, ignore, expiring := all.filterByTainted(a.clusterState)
 	result.DesiredTGUpdates[group].Ignore += uint64(len(ignore))
 
 	// Determine what set of terminal allocations need to be rescheduled
-	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.jobState.JobIsBatch, false, a.clusterState.Now, a.jobState.EvalID, a.jobState.DeploymentCurrent)
+	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(
+		a.jobState.JobIsBatch, false, a.clusterState.Now,
+		a.jobState.EvalID, a.jobState.DeploymentCurrent)
 
-	// If there are allocations reconnecting we need to reconcile them and
-	// their replacements first because there is specific logic when deciding
-	// which ones to keep that can only be applied when the client reconnects.
+	// If there are allocations reconnecting we need to reconcile them and their
+	// replacements first because there is specific logic when deciding which
+	// ones to keep that can only be applied when the client reconnects.
 	if len(reconnecting) > 0 {
-		// Pass all allocations because the replacements we need to find may be
-		// in any state, including themselves being reconnected.
-		reconnect, stopAllocSet, stopAllocResult := a.reconcileReconnecting(reconnecting, all, tg)
-		result.Stop = append(result.Stop, stopAllocResult...)
-
-		// Stop the reconciled allocations and remove them from untainted, migrate, lost
-		// and disconnecting sets, since they have been already handled.
-		result.DesiredTGUpdates[group].Stop += uint64(len(stopAllocSet))
-
-		untainted = untainted.difference(stopAllocSet)
-		migrate = migrate.difference(stopAllocSet)
-		lost = lost.difference(stopAllocSet)
-		disconnecting = disconnecting.difference(stopAllocSet)
-
-		// Validate and add reconnecting allocations to the plan so they are
-		// logged.
-		if len(reconnect) > 0 {
-			result.ReconnectUpdates = a.computeReconnecting(reconnect)
-			result.DesiredTGUpdates[tg.Name].Reconnect = uint64(len(result.ReconnectUpdates))
-
-			// The rest of the reconnecting allocations is now untainted and will
-			// be further reconciled below.
-			untainted = untainted.union(reconnect)
-		}
+		a.computeReconnecting(&untainted, &migrate, &lost, &disconnecting,
+			reconnecting, all, tg, result)
 	}
 
 	if len(expiring) > 0 {
@@ -512,35 +490,23 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
 	result.DesiredFollowupEvals = map[string][]*structs.Evaluation{}
 	result.DisconnectUpdates = make(allocSet)
 
-	// Determine what set of disconnecting allocations need to be rescheduled now,
-	// which ones later and which ones can't be rescheduled at all.
+	// Determine what set of disconnecting allocations need to be rescheduled
+	// now, which ones later and which ones can't be rescheduled at all.
 	timeoutLaterEvals := map[string]string{}
 	if len(disconnecting) > 0 {
-		if tg.GetDisconnectLostTimeout() != 0 {
-			untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting := disconnecting.filterByRescheduleable(
-				a.jobState.JobIsBatch, true, a.clusterState.Now, a.jobState.EvalID, a.jobState.DeploymentCurrent)
-
-			rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
-			untainted = untainted.union(untaintedDisconnecting)
-			rescheduleLater = append(rescheduleLater, laterDisconnecting...)
-
-			// Find delays for any disconnecting allocs that have
-			// disconnect.lost_after, create followup evals, and update the
-			// ClientStatus to unknown.
-			var followupEvals []*structs.Evaluation
-			timeoutLaterEvals, followupEvals = a.createTimeoutLaterEvals(disconnecting, tg.Name)
-			result.DesiredFollowupEvals[tg.Name] = append(result.DesiredFollowupEvals[tg.Name], followupEvals...)
-		}
-
-		updates := appendUnknownDisconnectingUpdates(disconnecting, timeoutLaterEvals, rescheduleNow)
-		maps.Copy(result.DisconnectUpdates, updates)
-		result.DesiredTGUpdates[tg.Name].Disconnect = uint64(len(result.DisconnectUpdates))
-		result.DesiredTGUpdates[tg.Name].RescheduleNow = uint64(len(rescheduleNow))
+		timeoutLaterEvals = a.computeDisconnecting(
+			disconnecting,
+			&untainted,
+			&rescheduleNow,
+			&rescheduleLater,
+			tg,
+			result,
+		)
 	}
 
 	// Find delays for any lost allocs that have disconnect.stop_on_client_after
 	lostLaterEvals := map[string]string{}
-	lostLater := []*delayedRescheduleInfo{}
+	lostLater := []*delayedRescheduleInfo{} // guards computePlacements
 
 	if len(lost) > 0 {
 		lostLater = lost.delayByStopAfter()
@@ -549,17 +515,15 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
 		result.DesiredFollowupEvals[tg.Name] = append(result.DesiredFollowupEvals[tg.Name], followupEvals...)
 	}
 
-	// Merge disconnecting with the disconnect.stop_on_client_after set into the
-	// lostLaterEvals so that computeStop can add them to the stop set.
-	lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals)
+	// Merge evals for disconnecting with the disconnect.stop_on_client_after
+	// set into the lostLaterEvals so that computeStop can add them to the stop
+	// set.
+	maps.Copy(lostLaterEvals, timeoutLaterEvals)
 
 	if len(rescheduleLater) > 0 {
 		// Create batched follow-up evaluations for allocations that are
 		// reschedulable later and mark the allocations for in place updating
-		var followups []*structs.Evaluation
-		followups, result.AttributeUpdates = a.createRescheduleLaterEvals(rescheduleLater, all, result.DisconnectUpdates)
-		result.DesiredFollowupEvals[tg.Name] = append(result.DesiredFollowupEvals[tg.Name], followups...)
-		result.DesiredTGUpdates[tg.Name].RescheduleLater = uint64(len(rescheduleLater))
+		a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name, result)
 	}
 	// Create a structure for choosing names. Seed with the taken names
 	// which is the union of untainted, rescheduled, allocs on migrating
@@ -597,8 +561,7 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
 	}
 	requiresCanaries := requiresCanaries(tg, dstate, destructive, canaries)
 	if requiresCanaries {
-		placeCanaries := a.computeCanaries(tg, dstate, destructive, canaries, result.DesiredTGUpdates[group], nameIndex)
-		result.Place = append(result.Place, placeCanaries...)
+		a.computeCanaries(tg, dstate, destructive, canaries, nameIndex, group, result)
 	}
 
 	// Determine how many non-canary allocs we can place
@@ -634,9 +597,7 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
 		result.DesiredTGUpdates[group].Ignore += uint64(len(destructive))
 	}
 
-	stopMigrations, placeMigrations := a.computeMigrations(result.DesiredTGUpdates[group], migrate, tg, isCanarying)
-	result.Stop = append(result.Stop, stopMigrations...)
-	result.Place = append(result.Place, placeMigrations...)
+	a.computeMigrations(result, migrate, tg, isCanarying)
 	result.Deployment = a.createDeployment(
 		tg.Name, tg.Update, existingDeployment, dstate, all, destructive, int(result.DesiredTGUpdates[group].InPlaceUpdate))
 
@@ -740,15 +701,23 @@ func requiresCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState, de
 
 // computeCanaries returns the set of new canaries to place. It mutates the
 // Canary field on the DesiredUpdates and the DesiredCanaries on the dstate
-func (a *AllocReconciler) computeCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState,
-	destructive, canaries allocSet, desiredChanges *structs.DesiredUpdates, nameIndex *AllocNameIndex) []AllocPlaceResult {
+func (a *AllocReconciler) computeCanaries(
+	tg *structs.TaskGroup, dstate *structs.DeploymentState,
+	destructive, canaries allocSet,
+	nameIndex *AllocNameIndex,
+	group string,
+	result *ReconcileResults,
+) {
 
 	dstate.DesiredCanaries = tg.Update.Canary
 
 	placementResult := []AllocPlaceResult{}
 
 	if !a.jobState.DeploymentPaused && !a.jobState.DeploymentFailed {
-		desiredChanges.Canary += uint64(tg.Update.Canary - len(canaries))
-		for _, name := range nameIndex.NextCanaries(uint(desiredChanges.Canary), canaries, destructive) {
+		result.DesiredTGUpdates[group].Canary += uint64(tg.Update.Canary - len(canaries))
+		total := uint(result.DesiredTGUpdates[group].Canary)
+
+		for _, name := range nameIndex.NextCanaries(total, canaries, destructive) {
 			placementResult = append(placementResult, AllocPlaceResult{
 				name:   name,
 				canary: true,
@@ -757,19 +726,18 @@ func (a *AllocReconciler) computeCanaries(tg *structs.TaskGroup, dstate *structs
 		}
 	}
 
-	return placementResult
+	result.Place = append(result.Place, placementResult...)
 }
 
 // cancelUnneededCanaries handles the canaries for the group by stopping the
 // unneeded ones and returning the current set of canaries and the updated total
 // set of allocs for the group
-func (a *AllocReconciler) cancelUnneededCanaries(original allocSet, desiredChanges *structs.DesiredUpdates) (
-	canaries, all allocSet, allocsToStop []AllocStopResult) {
+func (a *AllocReconciler) cancelUnneededCanaries(all *allocSet, group string, result *ReconcileResults) (
+	canaries allocSet) {
 
 	// Stop any canary from an older deployment or from a failed one
 	var stop []string
 
-	all = original
-
 	// Cancel any non-promoted canaries from the older deployment
 	if a.jobState.DeploymentOld != nil {
 		for _, dstate := range a.jobState.DeploymentOld.TaskGroups {
@@ -791,9 +759,9 @@ func (a *AllocReconciler) cancelUnneededCanaries(original allocSet, desiredChang
 	// stopSet is the allocSet that contains the canaries we desire to stop from
 	// above.
 	stopSet := all.fromKeys(stop)
-	allocsToStop = markStop(stopSet, "", sstructs.StatusAllocNotNeeded)
-	desiredChanges.Stop += uint64(len(stopSet))
-	all = all.difference(stopSet)
+	allocsToStop := markStop(stopSet, "", sstructs.StatusAllocNotNeeded)
+	result.DesiredTGUpdates[group].Stop += uint64(len(stopSet))
+	*all = all.difference(stopSet)
 
 	// Capture our current set of canaries and handle any migrations that are
 	// needed by just stopping them.
@@ -815,10 +783,11 @@ func (a *AllocReconciler) cancelUnneededCanaries(original allocSet, desiredChang
 		)
 
 		canaries = untainted
-		all = all.difference(migrate, lost)
+		*all = all.difference(migrate, lost)
 	}
 
-	return
+	result.Stop = allocsToStop
+	return canaries
 }
 
 // computeUnderProvisionedBy returns the number of allocs that still need to be
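A note on the `*allocSet` parameters used here: Go maps are reference-like, so a callee receiving a map by value can already mutate its entries, but it cannot rebind the caller's variable to the brand-new map that `difference` returns. The pointer is what makes `*all = all.difference(stopSet)` stick. A runnable sketch with a toy set type:

```go
package main

import "fmt"

type allocSet map[string]bool

// difference is a toy version returning a new set without the removed keys.
func (s allocSet) difference(remove allocSet) allocSet {
	out := make(allocSet)
	for k := range s {
		if !remove[k] {
			out[k] = true
		}
	}
	return out
}

// Receiving the map by value: rebinding is invisible to the caller.
func dropByValue(all allocSet, remove allocSet) {
	all = all.difference(remove)
	_ = all // the caller's variable still points at the old map
}

// Receiving *allocSet: the rebinding sticks, as in this commit.
func dropByPointer(all *allocSet, remove allocSet) {
	*all = all.difference(remove)
}

func main() {
	all := allocSet{"a1": true, "a2": true}

	dropByValue(all, allocSet{"a1": true})
	fmt.Println(len(all)) // 2: unchanged

	dropByPointer(&all, allocSet{"a1": true})
	fmt.Println(len(all)) // 1
}
```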
@@ -1033,21 +1002,19 @@ func (a *AllocReconciler) computeDestructiveUpdates(destructive allocSet, underP
 	return destructiveResult
 }
 
-// computeMigrations returns the stops and placements for the allocs marked for
-// migration. It mutates the Migrate field on the DesiredUpdates counts
-func (a *AllocReconciler) computeMigrations(desiredChanges *structs.DesiredUpdates, migrate allocSet,
-	tg *structs.TaskGroup, isCanarying bool) ([]AllocStopResult, []AllocPlaceResult) {
+// computeMigrations updates the result with the stops and placements required
+// for migration.
+func (a *AllocReconciler) computeMigrations(result *ReconcileResults, migrate allocSet,
+	tg *structs.TaskGroup, isCanarying bool) {
 
-	allocsToStop := []AllocStopResult{}
-	allocsToPlace := []AllocPlaceResult{}
+	result.DesiredTGUpdates[tg.Name].Migrate += uint64(len(migrate))
 
-	desiredChanges.Migrate += uint64(len(migrate))
 	for _, alloc := range migrate.nameOrder() {
-		allocsToStop = append(allocsToStop, AllocStopResult{
+		result.Stop = append(result.Stop, AllocStopResult{
 			Alloc:             alloc,
 			StatusDescription: sstructs.StatusAllocMigrating,
 		})
-		allocsToPlace = append(allocsToPlace, AllocPlaceResult{
+		result.Place = append(result.Place, AllocPlaceResult{
 			name:          alloc.Name,
 			canary:        alloc.DeploymentStatus.IsCanary(),
 			taskGroup:     tg,
@@ -1057,8 +1024,6 @@ func (a *AllocReconciler) computeMigrations(desiredChanges *structs.DesiredUpdat
 			minJobVersion: alloc.Job.Version,
 		})
 	}
-
-	return allocsToStop, allocsToPlace
 }
 
 // createDeployment creates a new deployment if necessary.
@@ -1244,6 +1209,39 @@ func (a *AllocReconciler) computeStop(group *structs.TaskGroup, nameIndex *Alloc
 	return stopAllocSet, stopAllocResult
 }
 
+// If there are allocations reconnecting we need to reconcile them and their
+// replacements first because there is specific logic when deciding which ones
+// to keep that can only be applied when the client reconnects.
+func (a *AllocReconciler) computeReconnecting(
+	untainted, migrate, lost, disconnecting *allocSet, reconnecting, all allocSet,
+	tg *structs.TaskGroup, result *ReconcileResults) {
+
+	// Pass all allocations because the replacements we need to find may be in
+	// any state, including themselves being reconnected.
+	reconnect, stopAllocSet, stopAllocResult := a.reconcileReconnecting(reconnecting, all, tg)
+	result.Stop = append(result.Stop, stopAllocResult...)
+
+	// Stop the reconciled allocations and remove them from untainted, migrate,
+	// lost and disconnecting sets, since they have been already handled.
+	result.DesiredTGUpdates[tg.Name].Stop += uint64(len(stopAllocSet))
+
+	*untainted = untainted.difference(stopAllocSet)
+	*migrate = migrate.difference(stopAllocSet)
+	*lost = lost.difference(stopAllocSet)
+	*disconnecting = disconnecting.difference(stopAllocSet)
+
+	// Validate and add reconnecting allocations to the plan so they are
+	// logged.
+	if len(reconnect) > 0 {
+		result.ReconnectUpdates = a.appendReconnectingUpdates(reconnect)
+		result.DesiredTGUpdates[tg.Name].Reconnect = uint64(len(result.ReconnectUpdates))
+
+		// The rest of the reconnecting allocations are now untainted and will
+		// be further reconciled below.
+		*untainted = untainted.union(reconnect)
+	}
+}
+
 // reconcileReconnecting receives the set of allocations that are reconnecting
 // and all other allocations for the same group and determines which ones to
 // reconnect, which ones to stop, and the stop results for the latter.
@@ -1390,11 +1388,15 @@ func (a *AllocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
 	return
 }
 
-// createRescheduleLaterEvals creates batched followup evaluations with the WaitUntil field
-// set for allocations that are eligible to be rescheduled later, and marks the alloc with
-// the followupEvalID. this function modifies disconnectUpdates in place.
-func (a *AllocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet,
-	disconnectUpdates allocSet) ([]*structs.Evaluation, allocSet) {
+// createRescheduleLaterEvals creates batched followup evaluations with the
+// WaitUntil field set for allocations that are eligible to be rescheduled
+// later, and marks the alloc with the FollowupEvalID in the result.
+// TODO(tgross): this needs a better name?
+func (a *AllocReconciler) createRescheduleLaterEvals(
+	rescheduleLater []*delayedRescheduleInfo,
+	all allocSet,
+	group string,
+	result *ReconcileResults) {
 
 	// followupEvals are created in the same way as for delayed lost allocs
 	allocIDToFollowupEvalID, followupEvals := a.createLostLaterEvals(rescheduleLater)
@@ -1408,22 +1410,25 @@ func (a *AllocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedR
 		updatedAlloc.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]
 
 		// Can't updated an allocation that is disconnected
-		if _, ok := disconnectUpdates[laterAlloc.allocID]; !ok {
+		if d, ok := result.DisconnectUpdates[laterAlloc.allocID]; !ok {
 			attributeUpdates[laterAlloc.allocID] = updatedAlloc
 		} else {
-			disconnectUpdates[laterAlloc.allocID].FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]
+			d.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]
 		}
 	}
 
-	return followupEvals, attributeUpdates
+	result.AttributeUpdates = attributeUpdates
+	result.DesiredFollowupEvals[group] = append(
+		result.DesiredFollowupEvals[group], followupEvals...)
+	result.DesiredTGUpdates[group].RescheduleLater = uint64(len(rescheduleLater))
 }
 
-// computeReconnecting copies existing allocations in the unknown state, but
-// whose nodes have been identified as ready. The Allocations DesiredStatus is
-// set to running, and these allocs are appended to the Plan as non-destructive
-// updates. Clients are responsible for reconciling the DesiredState with the
-// actual state as the node comes back online.
-func (a *AllocReconciler) computeReconnecting(reconnecting allocSet) allocSet {
+// appendReconnectingUpdates copies existing allocations in the unknown state,
+// but whose nodes have been identified as ready. The Allocations DesiredStatus
+// is set to running, and these allocs are appended to the Plan as
+// non-destructive updates. Clients are responsible for reconciling the
+// DesiredState with the actual state as the node comes back online.
+func (a *AllocReconciler) appendReconnectingUpdates(reconnecting allocSet) allocSet {
 
 	reconnectingUpdates := make(allocSet)
 
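The `else` branch in the hunk above works because `result.DisconnectUpdates` stores pointers: `d` aliases the same `*structs.Allocation` held by the map, so setting `d.FollowupEvalID` updates the stored entry without writing back to the map. A minimal illustration with a toy type:

```go
package main

import "fmt"

type alloc struct{ FollowupEvalID string }

func main() {
	updates := map[string]*alloc{"a1": {}}

	// d aliases the *alloc stored in the map, so mutating through it
	// changes the map's entry in place; no re-assignment is needed.
	if d, ok := updates["a1"]; ok {
		d.FollowupEvalID = "eval-123"
	}

	fmt.Println(updates["a1"].FollowupEvalID) // eval-123
}
```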
@@ -1458,9 +1463,9 @@ func (a *AllocReconciler) computeReconnecting(reconnecting allocSet) allocSet {
 	return reconnectingUpdates
 }
 
-// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
-// lost allocations. followupEvals are appended to a.result as a side effect, we return a
-// map of alloc IDs to their followupEval IDs.
+// createLostLaterEvals creates batched followup evaluations with the WaitUntil
+// field set for lost allocations. followupEvals are appended to a.result as a
+// side effect, we return a map of alloc IDs to their followupEval IDs.
 func (a *AllocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo) (map[string]string, []*structs.Evaluation) {
 	if len(rescheduleLater) == 0 {
 		return map[string]string{}, nil
@@ -1528,8 +1533,7 @@ func (a *AllocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName
 
 	timeoutDelays, err := disconnecting.delayByLostAfter(a.clusterState.Now)
 	if err != nil {
-		a.logger.Error("error for task_group",
-			"task_group", tgName, "error", err)
+		a.logger.Error("error for task_group", "task_group", tgName, "error", err)
 		return map[string]string{}, nil
 	}
 
@@ -1589,11 +1593,51 @@ func (a *AllocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName
 	return allocIDToFollowupEvalID, evals
 }
 
-// Create updates that will be applied to the allocs to mark the FollowupEvalID
-// and the unknown ClientStatus and AllocState.
+// computeDisconnecting returns an allocSet disconnecting allocs that need to be
+// rescheduled now, a set to reschedule later, a set of follow-up evals, and
+// those allocations which can't be rescheduled.
+func (a *AllocReconciler) computeDisconnecting(
+	disconnecting allocSet,
+	untainted, rescheduleNow *allocSet,
+	rescheduleLater *[]*delayedRescheduleInfo, tg *structs.TaskGroup,
+	result *ReconcileResults,
+) (
+	timeoutLaterEvals map[string]string,
+) {
+	timeoutLaterEvals = make(map[string]string)
+
+	if tg.GetDisconnectLostTimeout() != 0 {
+		untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting := disconnecting.filterByRescheduleable(
+			a.jobState.JobIsBatch, true, a.clusterState.Now, a.jobState.EvalID, a.jobState.DeploymentCurrent)
+
+		*rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
+		*untainted = untainted.union(untaintedDisconnecting)
+		*rescheduleLater = append(*rescheduleLater, laterDisconnecting...)
+
+		// Find delays for any disconnecting allocs that have
+		// disconnect.lost_after, create followup evals, and update the
+		// ClientStatus to unknown.
+		var followupEvals []*structs.Evaluation
+		timeoutLaterEvals, followupEvals = a.createTimeoutLaterEvals(disconnecting, tg.Name)
+		result.DesiredFollowupEvals[tg.Name] = append(result.DesiredFollowupEvals[tg.Name], followupEvals...)
+	}
+
+	updates := appendUnknownDisconnectingUpdates(disconnecting, timeoutLaterEvals)
+	*rescheduleNow = rescheduleNow.update(updates)
+
+	maps.Copy(result.DisconnectUpdates, updates)
+	result.DesiredTGUpdates[tg.Name].Disconnect = uint64(len(result.DisconnectUpdates))
+	result.DesiredTGUpdates[tg.Name].RescheduleNow = uint64(len(*rescheduleNow))
+
+	return timeoutLaterEvals
+}
+
+// appendUnknownDisconnectingUpdates returns a new allocSet of allocations with
+// updates to mark the FollowupEvalID and and the unknown ClientStatus and
+// AllocState.
 func appendUnknownDisconnectingUpdates(disconnecting allocSet,
-	allocIDToFollowupEvalID map[string]string, rescheduleNow allocSet) map[string]*structs.Allocation {
-	resultingDisconnectUpdates := map[string]*structs.Allocation{}
+	allocIDToFollowupEvalID map[string]string) allocSet {
+	resultingDisconnectUpdates := make(allocSet)
 	for id, alloc := range disconnecting {
 		updatedAlloc := alloc.Copy()
 		updatedAlloc.ClientStatus = structs.AllocClientStatusUnknown
@@ -1601,14 +1645,6 @@ func appendUnknownDisconnectingUpdates(disconnecting allocSet,
 		updatedAlloc.ClientDescription = sstructs.StatusAllocUnknown
 		updatedAlloc.FollowupEvalID = allocIDToFollowupEvalID[id]
 		resultingDisconnectUpdates[updatedAlloc.ID] = updatedAlloc
-
-		// update the reschedule set so that any placements holding onto this
-		// pointer are using the right pointer for PreviousAllocation()
-		for i, alloc := range rescheduleNow {
-			if alloc.ID == updatedAlloc.ID {
-				rescheduleNow[i] = updatedAlloc
-			}
-		}
 	}
 
 	return resultingDisconnectUpdates
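Moving the reschedule-set fix-up out of `appendUnknownDisconnectingUpdates` and into the caller (the `*rescheduleNow = rescheduleNow.update(updates)` line in `computeDisconnecting`) keeps the helper free of surprise side effects while still repointing `rescheduleNow` at the copied allocations. The repointing matters because, per the removed comment, later placements resolve `PreviousAllocation()` through that set and need the copies carrying the unknown `ClientStatus`, not the stale originals. A self-contained sketch of the stale-pointer hazard, with stand-in types:

```go
package main

import "fmt"

type alloc struct{ ID, clientStatus string }

type allocSet map[string]*alloc

// copyWithStatus mimics appendUnknownDisconnectingUpdates: it returns fresh
// copies with an updated status and leaves the originals untouched.
func copyWithStatus(in allocSet, status string) allocSet {
	out := make(allocSet, len(in))
	for id, a := range in {
		c := *a
		c.clientStatus = status
		out[id] = &c
	}
	return out
}

// update mirrors allocSet.update from this commit: overwrite entries that
// exist in both sets so the caller ends up holding the fresh pointers.
func (set allocSet) update(other allocSet) allocSet {
	updated := make(allocSet, len(set))
	for id, a := range set {
		updated[id] = a
	}
	for id, a := range other {
		if _, ok := updated[id]; ok {
			updated[id] = a
		}
	}
	return updated
}

func main() {
	rescheduleNow := allocSet{"a1": {ID: "a1", clientStatus: "running"}}

	updates := copyWithStatus(rescheduleNow, "unknown")

	// Without the update, rescheduleNow still holds the stale pointer.
	fmt.Println(rescheduleNow["a1"].clientStatus) // running

	rescheduleNow = rescheduleNow.update(updates)
	fmt.Println(rescheduleNow["a1"].clientStatus) // unknown
}
```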
@@ -228,9 +228,13 @@ func TestAllocReconciler_cancelUnneededCanaries(t *testing.T) {
 			group := job.TaskGroups[0].Name
 			all := m[group] // <-- allocset of all allocs for tg
 			all, _ = all.filterOldTerminalAllocs(jobState)
+			original := all
+
+			result := new(ReconcileResults)
+			result.DesiredTGUpdates = map[string]*structs.DesiredUpdates{group: {}}
 
 			// runs the method under test
-			canaries, _, stopAllocs := ar.cancelUnneededCanaries(all, new(structs.DesiredUpdates))
+			canaries := ar.cancelUnneededCanaries(&all, group, result)
 
 			expectedStopped := []string{}
 			if jobState.DeploymentOld != nil {
@@ -247,8 +251,8 @@ func TestAllocReconciler_cancelUnneededCanaries(t *testing.T) {
 					}
 				}
 			}
-			stopSet := all.fromKeys(expectedStopped)
-			all = all.difference(stopSet)
+			stopSet := original.fromKeys(expectedStopped)
+			all = original.difference(stopSet)
 
 			expectedCanaries := []string{}
 			if jobState.DeploymentCurrent != nil {
@@ -261,7 +265,7 @@ func TestAllocReconciler_cancelUnneededCanaries(t *testing.T) {
 
 			stopSet = stopSet.union(migrate, lost)
 
-			must.Eq(t, len(stopAllocs), len(stopSet))
+			must.Eq(t, len(result.Stop), len(stopSet))
 			must.Eq(t, len(canaries), len(canariesOnUntaintedNodes))
 		})
 	}