stateful deployments: find feasible node for sticky host volumes (#24558)

This changeset implements node feasibility checks for sticky host volumes. Allocations now record the host volume IDs they claim (Allocation.HostVolumeIDs), the HostVolumeChecker only admits nodes carrying those volumes when an allocation with prior claims is rescheduled, and the generic scheduler prefers the previous node while it still hosts a claimed volume.
Piotr Kazmierczak
2024-12-18 19:52:07 +01:00
committed by Tim Gross
parent 2adf6d5208
commit 8cbb74786c
8 changed files with 314 additions and 15 deletions
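
The core rule the new checker enforces can be summarized in a few lines. The following is a hedged, self-contained sketch; stickyFeasible and its arguments are hypothetical stand-ins for illustration, while the real logic lives in HostVolumeChecker.hasVolumes in the diff below. A sticky request is feasible on a node if the allocation has never claimed a volume, or if one of the node's volumes matches a previously claimed ID.

package main

import (
	"fmt"
	"slices"
)

// stickyFeasible is a hypothetical, simplified stand-in for the sticky branch
// of HostVolumeChecker.hasVolumes: a node-local volume satisfies a sticky
// request if the allocation has no prior claims (first placement) or if the
// node's volume is one the allocation already claimed.
func stickyFeasible(claimedIDs []string, nodeVolumeID string) bool {
	if len(claimedIDs) == 0 {
		return true // first placement: any node that otherwise satisfies the request works
	}
	return slices.Contains(claimedIDs, nodeVolumeID)
}

func main() {
	fmt.Println(stickyFeasible(nil, "vol-1"))               // true: nothing claimed yet
	fmt.Println(stickyFeasible([]string{"vol-1"}, "vol-1")) // true: same volume as before
	fmt.Println(stickyFeasible([]string{"vol-1"}, "vol-2")) // false: must follow the claimed volume
}
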

View File

@@ -278,6 +278,8 @@ type Allocation struct {
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
HostVolumeIDs []string
CSIVolumeIDs []string
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
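
For API consumers, the new field surfaces directly on the returned allocation. A minimal sketch of reading it, assuming a reachable Nomad agent; the allocation ID below is a placeholder, not a value from this changeset:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// "example-alloc-id" is a placeholder; substitute a real allocation ID.
	alloc, _, err := client.Allocations().Info("example-alloc-id", nil)
	if err != nil {
		log.Fatal(err)
	}

	// HostVolumeIDs lists the sticky host volumes this allocation has claimed.
	fmt.Println(alloc.HostVolumeIDs)
}
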

View File

@@ -11114,6 +11114,13 @@ type Allocation struct {
// AllocatedResources is the total resources allocated for the task group.
AllocatedResources *AllocatedResources
// HostVolumeIDs is a list of host volume IDs that this allocation
// has claimed.
HostVolumeIDs []string
// CSIVolumeIDs is a list of CSI volume IDs that this allocation has claimed.
CSIVolumeIDs []string
// Metrics associated with this allocation
Metrics *AllocMetric

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"reflect"
"regexp"
"slices"
"strconv"
"strings"
@@ -137,23 +138,28 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
- ctx Context
- volumeReqs []*structs.VolumeRequest
- namespace string
+ ctx Context
+ volumeReqs []*structs.VolumeRequest
+ hostVolumeIDs []string
+ namespace string
}
// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
return &HostVolumeChecker{
- ctx: ctx,
- volumeReqs: []*structs.VolumeRequest{},
+ ctx: ctx,
+ volumeReqs: []*structs.VolumeRequest{},
+ hostVolumeIDs: []string{},
}
}
// SetVolumes takes the volumes required by a task group and updates the checker.
- func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) {
+ func (h *HostVolumeChecker) SetVolumes(
+ allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
+ ) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
h.hostVolumeIDs = allocHostVolumeIDs
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue // filter CSI volumes
@@ -181,7 +187,6 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
}
func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
// Fast path: Requested no volumes. No need to check further.
if len(h.volumeReqs) == 0 {
return true
@@ -216,6 +221,15 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
if !capOk {
return false
}
if req.Sticky {
if slices.Contains(h.hostVolumeIDs, vol.ID) || len(h.hostVolumeIDs) == 0 {
return true
}
return false
}
} else if !req.ReadOnly {
// this is a static host volume and can only be mounted ReadOnly,
// validate that no requests for it are ReadWrite.

View File

@@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) {
alloc.NodeID = nodes[2].ID
for i, c := range cases {
- checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes)
+ checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
@@ -359,7 +359,116 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
- checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes)
+ checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
}
}
func TestHostVolumeChecker_Sticky(t *testing.T) {
ci.Parallel(t)
store, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
hostVolCapsReadWrite := []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
}
dhv := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[1].ID,
RequestedCapabilities: hostVolCapsReadWrite,
State: structs.HostVolumeStateReady,
}
nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{}
nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {ID: dhv.ID}}
for _, node := range nodes {
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
}
must.NoError(t, store.UpsertHostVolume(1000, dhv))
stickyRequest := map[string]*structs.VolumeRequest{
"foo": {
Type: "host",
Source: "foo",
Sticky: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
}
checker := NewHostVolumeChecker(ctx)
// alloc0 wants a previously registered volume ID that's available on node1
alloc0 := mock.Alloc()
alloc0.NodeID = nodes[1].ID
alloc0.HostVolumeIDs = []string{dhv.ID}
// alloc1 wants a volume ID that's available on node1 but hasn't used it
// before
alloc1 := mock.Alloc()
alloc1.NodeID = nodes[1].ID
// alloc2 wants a volume ID that's unrelated
alloc2 := mock.Alloc()
alloc2.NodeID = nodes[1].ID
alloc2.HostVolumeIDs = []string{uuid.Generate()}
// insert all the allocs into the state
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc0, alloc1, alloc2}))
cases := []struct {
name string
node *structs.Node
alloc *structs.Allocation
expect bool
}{
{
"alloc asking for a sticky volume on an infeasible node",
nodes[0],
alloc0,
false,
},
{
"alloc asking for a sticky volume on a feasible node",
nodes[1],
alloc0,
true,
},
{
"alloc asking for a sticky volume on a feasible node for the first time",
nodes[1],
alloc1,
true,
},
{
"alloc asking for an unrelated volume",
nodes[1],
alloc2,
false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})

View File

@@ -6,6 +6,7 @@ package scheduler
import (
"fmt"
"runtime/debug"
"slices"
"sort"
"time"
@@ -657,6 +658,18 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
"old_alloc_name", oldAllocName, "new_alloc_name", newAllocName)
}
// Are there sticky volumes requested by the task group for the first time? If
// yes, make sure the allocation stores their IDs for future reschedules.
var newHostVolumeIDs []string
for _, v := range tg.Volumes {
if v.Sticky {
if missing.PreviousAllocation() != nil && len(missing.PreviousAllocation().HostVolumeIDs) > 0 {
continue
}
newHostVolumeIDs = append(newHostVolumeIDs, option.Node.HostVolumes[v.Source].ID)
}
}
// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
@@ -681,6 +694,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
},
}
if len(newHostVolumeIDs) > 0 {
alloc.HostVolumeIDs = newHostVolumeIDs
}
// If the new allocation is replacing an older allocation then we
// record the older allocation ID so that they are chained
if prevAllocation != nil {
@@ -689,6 +706,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
updateRescheduleTracker(alloc, prevAllocation, now)
}
if len(prevAllocation.HostVolumeIDs) > 0 {
alloc.HostVolumeIDs = prevAllocation.HostVolumeIDs
}
// If the allocation has task handles,
// copy them to the new allocation
propagateTaskState(alloc, prevAllocation, missing.PreviousLost())
@@ -838,6 +859,10 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
}
}
selectOptions.PenaltyNodeIDs = penaltyNodes
if prevAllocation.HostVolumeIDs != nil {
selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
}
}
if preferredNode != nil {
selectOptions.PreferredNodes = []*structs.Node{preferredNode}
@@ -910,6 +935,29 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
return preferredNode, nil
}
}
for _, vol := range place.TaskGroup().Volumes {
if !vol.Sticky {
continue
}
var preferredNode *structs.Node
preferredNode, err := s.state.NodeByID(nil, prev.NodeID)
if err != nil {
return nil, err
}
if preferredNode != nil && preferredNode.Ready() {
// if this node has at least one of the allocation volumes, it's a
// preferred one
for _, vol := range preferredNode.HostVolumes {
if slices.Contains(prev.HostVolumeIDs, vol.ID) {
return preferredNode, nil
}
}
}
}
return nil, nil
}
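
Taken together with the computePlacements changes above, reschedules behave roughly as follows: a first placement records the volume IDs claimed from the chosen node, a replacement inherits the previous allocation's IDs, and the previous node is preferred while it still hosts one of them. Below is a hedged sketch of that preference using hypothetical simplified types (allocSketch, nodeSketch, preferPreviousNode), not the real scheduler structures:

package main

import (
	"fmt"
	"slices"
)

// allocSketch and nodeSketch are hypothetical stand-ins for Nomad's
// allocation and node types, reduced to the fields this logic needs.
type allocSketch struct {
	NodeID        string
	HostVolumeIDs []string // volume IDs claimed on a previous placement
}

type nodeSketch struct {
	ID            string
	Ready         bool
	HostVolumeIDs []string // volume IDs present on the node
}

// preferPreviousNode mirrors the shape of the findPreferredNode addition:
// if the previous allocation claimed sticky volumes and its node is ready
// and still hosts one of them, keep placing on that node.
func preferPreviousNode(prev *allocSketch, nodes map[string]*nodeSketch) *nodeSketch {
	if prev == nil || len(prev.HostVolumeIDs) == 0 {
		return nil
	}
	node, ok := nodes[prev.NodeID]
	if !ok || !node.Ready {
		return nil
	}
	for _, volID := range node.HostVolumeIDs {
		if slices.Contains(prev.HostVolumeIDs, volID) {
			return node
		}
	}
	return nil
}

func main() {
	nodes := map[string]*nodeSketch{
		"node-1": {ID: "node-1", Ready: true, HostVolumeIDs: []string{"vol-1"}},
	}
	prev := &allocSketch{NodeID: "node-1", HostVolumeIDs: []string{"vol-1"}}
	fmt.Println(preferPreviousNode(prev, nodes).ID) // node-1: the sticky claim keeps the placement there
}
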

View File

@@ -218,6 +218,121 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
}
}
func TestServiceSched_JobRegister_StickyVolumes(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
hostVolCapsReadWrite := []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
}
dhv := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[1].ID,
RequestedCapabilities: hostVolCapsReadWrite,
State: structs.HostVolumeStateReady,
}
nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{}
nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {ID: dhv.ID}}
for _, node := range nodes {
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
}
must.NoError(t, h.State.UpsertHostVolume(1000, dhv))
stickyRequest := map[string]*structs.VolumeRequest{
"foo": {
Type: "host",
Source: "foo",
Sticky: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
}
// Create a job
job := mock.Job()
job.TaskGroups[0].Volumes = stickyRequest
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
must.NoError(t, h.Process(NewServiceScheduler, eval))
// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
must.MapLen(t, 10, planned)
// Ensure that the allocations got the host volume ID added
for _, p := range planned {
must.Eq(t, p.PreviousAllocation, "")
must.Eq(t, p.HostVolumeIDs[0], dhv.ID)
}
// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
must.NoError(t, h.Process(NewServiceScheduler, eval))
// Ensure a second plan was created for the update
must.SliceLen(t, 2, h.Plans)
plan = h.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
must.SliceLen(t, 10, newPlanned)
// Ensure that the new allocations retain the host volume ID
for _, new := range newPlanned {
must.Eq(t, new.HostVolumeIDs[0], dhv.ID)
}
}
func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) {
ci.Parallel(t)

View File

@@ -118,8 +118,11 @@ type State interface {
// CSIVolumeByID fetch CSI volumes, containing controller jobs
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)
// HostVolumeByID fetches a host volume by its ID
HostVolumeByID(memdb.WatchSet, string, string, bool) (*structs.HostVolume, error)
// HostVolumesByNodeID gets an iterator with all the volumes attached to a
// given node
HostVolumesByNodeID(memdb.WatchSet, string, state.SortOption) (memdb.ResultIterator, error)
// LatestIndex returns the greatest index value for all indexes.

View File

@@ -35,10 +35,11 @@ type Stack interface {
}
type SelectOptions struct {
- PenaltyNodeIDs map[string]struct{}
- PreferredNodes []*structs.Node
- Preempt bool
- AllocName string
+ PenaltyNodeIDs map[string]struct{}
+ PreferredNodes []*structs.Node
+ Preempt bool
+ AllocName string
+ AllocationHostVolumeIDs []string
}
// GenericStack is the Stack used for the Generic scheduler. It is
@@ -156,7 +157,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
- s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes)
+ s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
@@ -349,7 +350,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
- s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes)
+ s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])