csi: the scheduler allows a job with a volume write claim to be updated (#7438)
* nomad/structs/csi: split CanWrite into health, in use
* scheduler/scheduler: expose AllocByID in the state interface
* nomad/state/state_store_test
* scheduler/stack: SetJobID on the matcher
* scheduler/feasible: when a volume writer is in use, check if it's us
* scheduler/feasible: remove SetJob
* nomad/state/state_store: denormalize allocs before Claim
* nomad/structs/csi: return errors on claim, with context
* nomad/csi_endpoint_test: new alloc doesn't look like an update
* nomad/state/state_store_test: change test reference to CanWrite
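The substance of the change is in nomad/structs/csi: the old CanWrite check, which mixed volume health with claim accounting, is split into WriteSchedulable (health only) and WriteFreeClaims (free claim slots only), and the Claim methods now return errors instead of booleans. A write claim that would otherwise be refused is accepted when every blocking writer belongs to the same namespace and job, which is what lets an updated job re-claim the volume it already holds. The sketch below is illustrative only and is not part of the commit: the allocation IDs, namespace, and job names are invented, and the volume is built by hand with its existing write claim already denormalized (in the real path the state store calls CSIVolumeDenormalize before Claim so the blocking writer's job identity is available).

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A single-writer volume that is healthy and schedulable.
	vol := &structs.CSIVolume{
		AccessMode:  structs.CSIVolumeAccessModeMultiNodeSingleWriter,
		Schedulable: true,
		ReadAllocs:  map[string]*structs.Allocation{},
		WriteAllocs: map[string]*structs.Allocation{},
	}

	// Hypothetical allocations: job "web" already holds the write claim.
	current := &structs.Allocation{ID: "a1", Namespace: "default", JobID: "web"}
	updated := &structs.Allocation{ID: "a2", Namespace: "default", JobID: "web"}
	other := &structs.Allocation{ID: "a3", Namespace: "default", JobID: "batch"}

	// Pretend the volume has been denormalized, so the existing write
	// claim carries its job identity instead of a nil placeholder.
	vol.WriteAllocs[current.ID] = current

	// A writer from a different job is refused outright.
	fmt.Println(vol.ClaimWrite(other)) // volume max claim reached

	// A replacement allocation from the same job may take the claim,
	// which is what lets a job update proceed.
	fmt.Println(vol.ClaimWrite(updated)) // <nil>
}

The scheduler-side counterpart in scheduler/feasible.go makes the same comparison during placement through the State interface's new AllocByID method, since the volume it fetches only carries allocation IDs rather than full allocations.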
@@ -248,17 +248,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	err = state.CSIVolumeRegister(1003, vols)
 	require.NoError(t, err)

-	// Upsert the job and alloc
-	alloc.NodeID = node.ID
-	summary := mock.JobSummary(alloc.JobID)
-	require.NoError(t, state.UpsertJobSummary(1004, summary))
-	require.NoError(t, state.UpsertAllocs(1005, []*structs.Allocation{alloc}))
-
-	// Now our claim should succeed
-	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
-	require.NoError(t, err)
-
-	// Verify the claim was set
+	// Verify that the volume exists, and is healthy
 	volGetReq := &structs.CSIVolumeGetRequest{
 		ID: id0,
 		QueryOptions: structs.QueryOptions{
@@ -270,11 +260,30 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", volGetReq, volGetResp)
 	require.NoError(t, err)
 	require.Equal(t, id0, volGetResp.Volume.ID)
+	require.True(t, volGetResp.Volume.Schedulable)
 	require.Len(t, volGetResp.Volume.ReadAllocs, 0)
+	require.Len(t, volGetResp.Volume.WriteAllocs, 0)
+
+	// Upsert the job and alloc
+	alloc.NodeID = node.ID
+	summary := mock.JobSummary(alloc.JobID)
+	require.NoError(t, state.UpsertJobSummary(1004, summary))
+	require.NoError(t, state.UpsertAllocs(1005, []*structs.Allocation{alloc}))
+
+	// Now our claim should succeed
+	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
+	require.NoError(t, err)
+
+	// Verify the claim was set
+	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", volGetReq, volGetResp)
+	require.NoError(t, err)
+	require.Equal(t, id0, volGetResp.Volume.ID)
+	require.Len(t, volGetResp.Volume.ReadAllocs, 0)
 	require.Len(t, volGetResp.Volume.WriteAllocs, 1)

-	// Make another writer claim for a different alloc
+	// Make another writer claim for a different job
 	alloc2 := mock.Alloc()
+	alloc2.JobID = uuid.Generate()
 	summary = mock.JobSummary(alloc2.JobID)
 	require.NoError(t, state.UpsertJobSummary(1005, summary))
 	require.NoError(t, state.UpsertAllocs(1006, []*structs.Allocation{alloc2}))
@@ -1793,8 +1793,15 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *s
 	if err != nil {
 		return err
 	}
-	if !volume.Claim(claim, alloc) {
-		return fmt.Errorf("volume max claim reached")
+
+	volume, err = s.CSIVolumeDenormalize(ws, volume)
+	if err != nil {
+		return err
+	}
+
+	err = volume.Claim(claim, alloc)
+	if err != nil {
+		return err
 	}

 	volume.ModifyIndex = index
@@ -2947,8 +2947,8 @@ func TestStateStore_CSIVolume(t *testing.T) {
 	require.Equal(t, 1, len(vs))

 	// Claims
-	a0 := &structs.Allocation{ID: "al"}
-	a1 := &structs.Allocation{ID: "gator"}
+	a0 := &structs.Allocation{ID: uuid.Generate()}
+	a1 := &structs.Allocation{ID: uuid.Generate()}
 	r := structs.CSIVolumeClaimRead
 	w := structs.CSIVolumeClaimWrite
 	u := structs.CSIVolumeClaimRelease
@@ -2964,7 +2964,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
 	iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
 	require.NoError(t, err)
 	vs = slurp(iter)
-	require.False(t, vs[0].CanWrite())
+	require.False(t, vs[0].WriteFreeClaims())

 	err = state.CSIVolumeClaim(2, ns, vol0, a0, u)
 	require.NoError(t, err)
@@ -2972,7 +2972,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
 	iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
 	require.NoError(t, err)
 	vs = slurp(iter)
-	require.True(t, vs[0].CanReadOnly())
+	require.True(t, vs[0].ReadSchedulable())
 }

 // TestStateStore_CSIPluginNodes uses the state from jobs, and uses node fingerprinting to update health
@@ -296,7 +296,7 @@ func (v *CSIVolume) Stub() *CSIVolListStub {
 	return &stub
 }

-func (v *CSIVolume) CanReadOnly() bool {
+func (v *CSIVolume) ReadSchedulable() bool {
 	if !v.Schedulable {
 		return false
 	}
@@ -304,21 +304,31 @@ func (v *CSIVolume) CanReadOnly() bool {
 	return v.ResourceExhausted == time.Time{}
 }

-func (v *CSIVolume) CanWrite() bool {
+// WriteSchedulable determines if the volume is schedulable for writes, considering only
+// volume health
+func (v *CSIVolume) WriteSchedulable() bool {
 	if !v.Schedulable {
 		return false
 	}

 	switch v.AccessMode {
-	case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter:
-		return len(v.WriteAllocs) == 0
-	case CSIVolumeAccessModeMultiNodeMultiWriter:
+	case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter, CSIVolumeAccessModeMultiNodeMultiWriter:
 		return v.ResourceExhausted == time.Time{}
 	default:
 		return false
 	}
 }

+// WriteFreeClaims determines if there are any free write claims available
+func (v *CSIVolume) WriteFreeClaims() bool {
+	switch v.AccessMode {
+	case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter, CSIVolumeAccessModeMultiNodeMultiWriter:
+		return len(v.WriteAllocs) == 0
+	default:
+		return false
+	}
+}
+
 // Copy returns a copy of the volume, which shares only the Topologies slice
 func (v *CSIVolume) Copy() *CSIVolume {
 	copy := *v
@@ -337,7 +347,7 @@ func (v *CSIVolume) Copy() *CSIVolume {
 }

 // Claim updates the allocations and changes the volume state
-func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool {
+func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) error {
 	switch claim {
 	case CSIVolumeClaimRead:
 		return v.ClaimRead(alloc)
@@ -346,46 +356,57 @@ func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool {
 	case CSIVolumeClaimRelease:
 		return v.ClaimRelease(alloc)
 	}
-	return false
+	return nil
 }

 // ClaimRead marks an allocation as using a volume read-only
-func (v *CSIVolume) ClaimRead(alloc *Allocation) bool {
+func (v *CSIVolume) ClaimRead(alloc *Allocation) error {
 	if _, ok := v.ReadAllocs[alloc.ID]; ok {
-		return true
+		return nil
 	}

-	if !v.CanReadOnly() {
-		return false
+	if !v.ReadSchedulable() {
+		return fmt.Errorf("unschedulable")
 	}

 	// Allocations are copy on write, so we want to keep the id but don't need the
 	// pointer. We'll get it from the db in denormalize.
 	v.ReadAllocs[alloc.ID] = nil
 	delete(v.WriteAllocs, alloc.ID)
-	return true
+	return nil
 }

 // ClaimWrite marks an allocation as using a volume as a writer
-func (v *CSIVolume) ClaimWrite(alloc *Allocation) bool {
+func (v *CSIVolume) ClaimWrite(alloc *Allocation) error {
 	if _, ok := v.WriteAllocs[alloc.ID]; ok {
-		return true
+		return nil
 	}

-	if !v.CanWrite() {
-		return false
+	if !v.WriteSchedulable() {
+		return fmt.Errorf("unschedulable")
 	}

+	if !v.WriteFreeClaims() {
+		// Check the blocking allocations to see if they belong to this job
+		for _, a := range v.WriteAllocs {
+			if a.Namespace != alloc.Namespace || a.JobID != alloc.JobID {
+				return fmt.Errorf("volume max claim reached")
+			}
+		}
+	}
+
+	// Allocations are copy on write, so we want to keep the id but don't need the
+	// pointer. We'll get it from the db in denormalize.
 	v.WriteAllocs[alloc.ID] = nil
 	delete(v.ReadAllocs, alloc.ID)
-	return true
+	return nil
 }

 // ClaimRelease is called when the allocation has terminated and already stopped using the volume
-func (v *CSIVolume) ClaimRelease(alloc *Allocation) bool {
+func (v *CSIVolume) ClaimRelease(alloc *Allocation) error {
 	delete(v.ReadAllocs, alloc.ID)
 	delete(v.WriteAllocs, alloc.ID)
-	return true
+	return nil
 }

 // Equality by value
@@ -11,20 +11,18 @@ func TestCSIVolumeClaim(t *testing.T) {
 	vol.AccessMode = CSIVolumeAccessModeMultiNodeSingleWriter
 	vol.Schedulable = true

-	alloc := &Allocation{ID: "a1"}
-	alloc2 := &Allocation{ID: "a2"}
+	alloc := &Allocation{ID: "a1", Namespace: "n", JobID: "j"}

-	vol.ClaimRead(alloc)
-	require.True(t, vol.CanReadOnly())
-	require.True(t, vol.CanWrite())
-	require.True(t, vol.ClaimRead(alloc))
+	require.NoError(t, vol.ClaimRead(alloc))
+	require.True(t, vol.ReadSchedulable())
+	require.True(t, vol.WriteSchedulable())
+	require.NoError(t, vol.ClaimRead(alloc))

-	vol.ClaimWrite(alloc)
-	require.True(t, vol.CanReadOnly())
-	require.False(t, vol.CanWrite())
-	require.False(t, vol.ClaimWrite(alloc2))
+	require.NoError(t, vol.ClaimWrite(alloc))
+	require.True(t, vol.ReadSchedulable())
+	require.False(t, vol.WriteFreeClaims())

 	vol.ClaimRelease(alloc)
-	require.True(t, vol.CanReadOnly())
-	require.True(t, vol.CanWrite())
+	require.True(t, vol.ReadSchedulable())
+	require.True(t, vol.WriteFreeClaims())
 }
@@ -15,14 +15,16 @@ import (
 )

 const (
-	FilterConstraintHostVolumes = "missing compatible host volumes"
-	FilterConstraintCSIPlugins = "missing CSI plugins"
-	FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed"
-	FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s"
-	FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s has exhausted its available reader claims"
-	FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s has exhausted its available writer claims or is read-only"
-	FilterConstraintDrivers = "missing drivers"
-	FilterConstraintDevices = "missing devices"
+	FilterConstraintHostVolumes = "missing compatible host volumes"
+	FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s"
+	FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s"
+	FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed"
+	FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s"
+	FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims"
+	FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only"
+	FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" //
+	FilterConstraintDrivers = "missing drivers"
+	FilterConstraintDevices = "missing devices"
 )

 // FeasibleIterator is used to iteratively yield nodes that
@@ -191,6 +193,7 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 type CSIVolumeChecker struct {
 	ctx       Context
 	namespace string
+	jobID     string
 	volumes   map[string]*structs.VolumeRequest
 }

@@ -200,6 +203,10 @@ func NewCSIVolumeChecker(ctx Context) *CSIVolumeChecker {
 	}
 }

+func (c *CSIVolumeChecker) SetJobID(jobID string) {
+	c.jobID = jobID
+}
+
 func (c *CSIVolumeChecker) SetNamespace(namespace string) {
 	c.namespace = namespace
 }
@@ -231,7 +238,7 @@ func (c *CSIVolumeChecker) Feasible(n *structs.Node) bool {
 func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
 	// We can mount the volume if
 	// - if required, a healthy controller plugin is running the driver
-	// - the volume has free claims
+	// - the volume has free claims, or this job owns the claims
 	// - this node is running the node plugin, implies matching topology

 	// Fast path: Requested no volumes. No need to check further.
@@ -241,8 +248,6 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {

 	ws := memdb.NewWatchSet()
 	for _, req := range c.volumes {
-		// Get the volume to check that it's healthy (there's a healthy controller
-		// and the volume hasn't encountered an error or been marked for GC
 		vol, err := c.ctx.State().CSIVolumeByID(ws, c.namespace, req.Source)
 		if err != nil {
 			return false, FilterConstraintCSIVolumesLookupFailed
@@ -253,15 +258,32 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {

 		// Check that this node has a healthy running plugin with the right PluginID
 		plugin, ok := n.CSINodePlugins[vol.PluginID]
-		if !(ok && plugin.Healthy) {
-			return false, FilterConstraintCSIPlugins
+		if !ok {
+			return false, fmt.Sprintf(FilterConstraintCSIPluginTemplate, vol.PluginID, n.ID)
 		}
+		if !plugin.Healthy {
+			return false, fmt.Sprintf(FilterConstraintCSIPluginUnhealthyTemplate, vol.PluginID, n.ID)
+		}

 		if req.ReadOnly {
-			if !vol.CanReadOnly() {
+			if !vol.ReadSchedulable() {
 				return false, fmt.Sprintf(FilterConstraintCSIVolumeNoReadTemplate, vol.ID)
 			}
-		} else if !vol.CanWrite() {
-			return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID)
+		} else {
+			if !vol.WriteSchedulable() {
+				return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID)
+			}
+			if vol.WriteFreeClaims() {
+				return true, ""
+			}
+
+			// Check the blocking allocations to see if they belong to this job
+			for id := range vol.WriteAllocs {
+				a, err := c.ctx.State().AllocByID(ws, id)
+				if err != nil || a.Namespace != c.namespace || a.JobID != c.jobID {
+					return false, fmt.Sprintf(FilterConstraintCSIVolumeInUseTemplate, vol.ID)
+				}
+			}
 		}
 	}

@@ -76,6 +76,9 @@ type State interface {
 	// AllocsByNode returns all the allocations by node
 	AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error)

+	// AllocByID returns the allocation
+	AllocByID(ws memdb.WatchSet, allocID string) (*structs.Allocation, error)
+
 	// AllocsByNodeTerminal returns all the allocations by node filtering by terminal status
 	AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error)

@@ -97,6 +97,7 @@ func (s *GenericStack) SetJob(job *structs.Job) {
 	s.spread.SetJob(job)
 	s.ctx.Eligibility().SetJob(job)
 	s.taskGroupCSIVolumes.SetNamespace(job.Namespace)
+	s.taskGroupCSIVolumes.SetJobID(job.ID)

 	if contextual, ok := s.quota.(ContextualIterator); ok {
 		contextual.SetJob(job)