diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index eec6ffc8a..14ae17a7c 100644
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -248,17 +248,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	err = state.CSIVolumeRegister(1003, vols)
 	require.NoError(t, err)
 
-	// Upsert the job and alloc
-	alloc.NodeID = node.ID
-	summary := mock.JobSummary(alloc.JobID)
-	require.NoError(t, state.UpsertJobSummary(1004, summary))
-	require.NoError(t, state.UpsertAllocs(1005, []*structs.Allocation{alloc}))
-
-	// Now our claim should succeed
-	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
-	require.NoError(t, err)
-
-	// Verify the claim was set
+	// Verify that the volume exists, and is healthy
 	volGetReq := &structs.CSIVolumeGetRequest{
 		ID: id0,
 		QueryOptions: structs.QueryOptions{
@@ -270,11 +260,30 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", volGetReq, volGetResp)
 	require.NoError(t, err)
 	require.Equal(t, id0, volGetResp.Volume.ID)
+	require.True(t, volGetResp.Volume.Schedulable)
+	require.Len(t, volGetResp.Volume.ReadAllocs, 0)
+	require.Len(t, volGetResp.Volume.WriteAllocs, 0)
+
+	// Upsert the job and alloc
+	alloc.NodeID = node.ID
+	summary := mock.JobSummary(alloc.JobID)
+	require.NoError(t, state.UpsertJobSummary(1004, summary))
+	require.NoError(t, state.UpsertAllocs(1005, []*structs.Allocation{alloc}))
+
+	// Now our claim should succeed
+	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
+	require.NoError(t, err)
+
+	// Verify the claim was set
+	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", volGetReq, volGetResp)
+	require.NoError(t, err)
+	require.Equal(t, id0, volGetResp.Volume.ID)
 	require.Len(t, volGetResp.Volume.ReadAllocs, 0)
 	require.Len(t, volGetResp.Volume.WriteAllocs, 1)
 
-	// Make another writer claim for a different alloc
+	// Make another writer claim for a different job
 	alloc2 := mock.Alloc()
+	alloc2.JobID = uuid.Generate()
 	summary = mock.JobSummary(alloc2.JobID)
 	require.NoError(t, state.UpsertJobSummary(1005, summary))
 	require.NoError(t, state.UpsertAllocs(1006, []*structs.Allocation{alloc2}))
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index ad2493d39..04fae83ad 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1793,8 +1793,15 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, alloc *s
 	if err != nil {
 		return err
 	}
-	if !volume.Claim(claim, alloc) {
-		return fmt.Errorf("volume max claim reached")
+
+	volume, err = s.CSIVolumeDenormalize(ws, volume)
+	if err != nil {
+		return err
+	}
+
+	err = volume.Claim(claim, alloc)
+	if err != nil {
+		return err
 	}
 
 	volume.ModifyIndex = index
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 97522e2a7..13dc6093b 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -2947,8 +2947,8 @@ func TestStateStore_CSIVolume(t *testing.T) {
 	require.Equal(t, 1, len(vs))
 
 	// Claims
-	a0 := &structs.Allocation{ID: "al"}
-	a1 := &structs.Allocation{ID: "gator"}
+	a0 := &structs.Allocation{ID: uuid.Generate()}
+	a1 := &structs.Allocation{ID: uuid.Generate()}
 	r := structs.CSIVolumeClaimRead
 	w := structs.CSIVolumeClaimWrite
 	u := structs.CSIVolumeClaimRelease
@@ -2964,7 +2964,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
 	iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
 	require.NoError(t, err)
 	vs = slurp(iter)
-	require.False(t, vs[0].CanWrite())
+	require.False(t, vs[0].WriteFreeClaims())
 
 	err = state.CSIVolumeClaim(2, ns, vol0, a0, u)
 	require.NoError(t, err)
@@ -2972,7 +2972,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
 	iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
 	require.NoError(t, err)
 	vs = slurp(iter)
-	require.True(t, vs[0].CanReadOnly())
+	require.True(t, vs[0].ReadSchedulable())
 }
 
 // TestStateStore_CSIPluginNodes uses the state from jobs, and uses node fingerprinting to update health
diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go
index bbd19b99a..e0d6e5cf9 100644
--- a/nomad/structs/csi.go
+++ b/nomad/structs/csi.go
@@ -296,7 +296,7 @@ func (v *CSIVolume) Stub() *CSIVolListStub {
 	return &stub
 }
 
-func (v *CSIVolume) CanReadOnly() bool {
+func (v *CSIVolume) ReadSchedulable() bool {
 	if !v.Schedulable {
 		return false
 	}
@@ -304,21 +304,31 @@ func (v *CSIVolume) CanReadOnly() bool {
 	return v.ResourceExhausted == time.Time{}
 }
 
-func (v *CSIVolume) CanWrite() bool {
+// WriteSchedulable determines if the volume is schedulable for writes, considering only
+// volume health
+func (v *CSIVolume) WriteSchedulable() bool {
 	if !v.Schedulable {
 		return false
 	}
 
 	switch v.AccessMode {
-	case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter:
-		return len(v.WriteAllocs) == 0
-	case CSIVolumeAccessModeMultiNodeMultiWriter:
+	case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter, CSIVolumeAccessModeMultiNodeMultiWriter:
 		return v.ResourceExhausted == time.Time{}
 	default:
 		return false
 	}
 }
 
+// WriteFreeClaims determines if there are any free write claims available
+func (v *CSIVolume) WriteFreeClaims() bool {
+	switch v.AccessMode {
+	case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter, CSIVolumeAccessModeMultiNodeMultiWriter:
+		return len(v.WriteAllocs) == 0
+	default:
+		return false
+	}
+}
+
 // Copy returns a copy of the volume, which shares only the Topologies slice
 func (v *CSIVolume) Copy() *CSIVolume {
 	copy := *v
@@ -337,7 +347,7 @@ func (v *CSIVolume) Copy() *CSIVolume {
 }
 
 // Claim updates the allocations and changes the volume state
-func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool {
+func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) error {
 	switch claim {
 	case CSIVolumeClaimRead:
 		return v.ClaimRead(alloc)
@@ -346,46 +356,57 @@ func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool {
 	case CSIVolumeClaimRelease:
 		return v.ClaimRelease(alloc)
 	}
-	return false
+	return nil
 }
 
 // ClaimRead marks an allocation as using a volume read-only
-func (v *CSIVolume) ClaimRead(alloc *Allocation) bool {
+func (v *CSIVolume) ClaimRead(alloc *Allocation) error {
 	if _, ok := v.ReadAllocs[alloc.ID]; ok {
-		return true
+		return nil
 	}
-	if !v.CanReadOnly() {
-		return false
+	if !v.ReadSchedulable() {
+		return fmt.Errorf("unschedulable")
 	}
+
 	// Allocations are copy on write, so we want to keep the id but don't need the
 	// pointer. We'll get it from the db in denormalize.
 	v.ReadAllocs[alloc.ID] = nil
 	delete(v.WriteAllocs, alloc.ID)
-	return true
+	return nil
 }
 
 // ClaimWrite marks an allocation as using a volume as a writer
-func (v *CSIVolume) ClaimWrite(alloc *Allocation) bool {
+func (v *CSIVolume) ClaimWrite(alloc *Allocation) error {
 	if _, ok := v.WriteAllocs[alloc.ID]; ok {
-		return true
+		return nil
	}
-	if !v.CanWrite() {
-		return false
+	if !v.WriteSchedulable() {
+		return fmt.Errorf("unschedulable")
 	}
+
+	if !v.WriteFreeClaims() {
+		// Check the blocking allocations to see if they belong to this job
+		for _, a := range v.WriteAllocs {
+			if a.Namespace != alloc.Namespace || a.JobID != alloc.JobID {
+				return fmt.Errorf("volume max claim reached")
+			}
+		}
+	}
+
 	// Allocations are copy on write, so we want to keep the id but don't need the
 	// pointer. We'll get it from the db in denormalize.
 	v.WriteAllocs[alloc.ID] = nil
 	delete(v.ReadAllocs, alloc.ID)
-	return true
+	return nil
 }
 
 // ClaimRelease is called when the allocation has terminated and already stopped using the volume
-func (v *CSIVolume) ClaimRelease(alloc *Allocation) bool {
+func (v *CSIVolume) ClaimRelease(alloc *Allocation) error {
 	delete(v.ReadAllocs, alloc.ID)
 	delete(v.WriteAllocs, alloc.ID)
-	return true
+	return nil
 }
 
 // Equality by value
diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go
index 2978baf49..5e9fedfc0 100644
--- a/nomad/structs/csi_test.go
+++ b/nomad/structs/csi_test.go
@@ -11,20 +11,18 @@ func TestCSIVolumeClaim(t *testing.T) {
 	vol.AccessMode = CSIVolumeAccessModeMultiNodeSingleWriter
 	vol.Schedulable = true
 
-	alloc := &Allocation{ID: "a1"}
-	alloc2 := &Allocation{ID: "a2"}
+	alloc := &Allocation{ID: "a1", Namespace: "n", JobID: "j"}
 
-	vol.ClaimRead(alloc)
-	require.True(t, vol.CanReadOnly())
-	require.True(t, vol.CanWrite())
-	require.True(t, vol.ClaimRead(alloc))
+	require.NoError(t, vol.ClaimRead(alloc))
+	require.True(t, vol.ReadSchedulable())
+	require.True(t, vol.WriteSchedulable())
+	require.NoError(t, vol.ClaimRead(alloc))
 
-	vol.ClaimWrite(alloc)
-	require.True(t, vol.CanReadOnly())
-	require.False(t, vol.CanWrite())
-	require.False(t, vol.ClaimWrite(alloc2))
+	require.NoError(t, vol.ClaimWrite(alloc))
+	require.True(t, vol.ReadSchedulable())
+	require.False(t, vol.WriteFreeClaims())
 
 	vol.ClaimRelease(alloc)
-	require.True(t, vol.CanReadOnly())
-	require.True(t, vol.CanWrite())
+	require.True(t, vol.ReadSchedulable())
+	require.True(t, vol.WriteFreeClaims())
 }
diff --git a/scheduler/feasible.go b/scheduler/feasible.go
index 35ce4f57c..afb4c84e9 100644
--- a/scheduler/feasible.go
+++ b/scheduler/feasible.go
@@ -15,14 +15,16 @@ import (
 )
 
 const (
-	FilterConstraintHostVolumes               = "missing compatible host volumes"
-	FilterConstraintCSIPlugins                = "missing CSI plugins"
-	FilterConstraintCSIVolumesLookupFailed    = "CSI volume lookup failed"
-	FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s"
-	FilterConstraintCSIVolumeNoReadTemplate   = "CSI volume %s has exhausted its available reader claims"
-	FilterConstraintCSIVolumeNoWriteTemplate  = "CSI volume %s has exhausted its available writer claims or is read-only"
-	FilterConstraintDrivers                   = "missing drivers"
-	FilterConstraintDevices                   = "missing devices"
+	FilterConstraintHostVolumes                = "missing compatible host volumes"
+	FilterConstraintCSIPluginTemplate          = "CSI plugin %s is missing from client %s"
+	FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s"
+	FilterConstraintCSIVolumesLookupFailed     = "CSI volume lookup failed"
+	FilterConstraintCSIVolumeNotFoundTemplate  = "missing CSI Volume %s"
+	FilterConstraintCSIVolumeNoReadTemplate    = "CSI volume %s is unschedulable or has exhausted its available reader claims"
+	FilterConstraintCSIVolumeNoWriteTemplate   = "CSI volume %s is unschedulable or is read-only"
+	FilterConstraintCSIVolumeInUseTemplate     = "CSI volume %s has exhausted its available writer claims" //
+	FilterConstraintDrivers                    = "missing drivers"
+	FilterConstraintDevices                    = "missing devices"
 )
 
 // FeasibleIterator is used to iteratively yield nodes that
@@ -191,6 +193,7 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 type CSIVolumeChecker struct {
 	ctx       Context
 	namespace string
+	jobID     string
 	volumes   map[string]*structs.VolumeRequest
 }
 
@@ -200,6 +203,10 @@ func NewCSIVolumeChecker(ctx Context) *CSIVolumeChecker {
 	}
 }
 
+func (c *CSIVolumeChecker) SetJobID(jobID string) {
+	c.jobID = jobID
+}
+
 func (c *CSIVolumeChecker) SetNamespace(namespace string) {
 	c.namespace = namespace
 }
@@ -231,7 +238,7 @@ func (c *CSIVolumeChecker) Feasible(n *structs.Node) bool {
 func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
 	// We can mount the volume if
 	// - if required, a healthy controller plugin is running the driver
-	// - the volume has free claims
+	// - the volume has free claims, or this job owns the claims
 	// - this node is running the node plugin, implies matching topology
 
 	// Fast path: Requested no volumes. No need to check further.
@@ -241,8 +248,6 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
 
 	ws := memdb.NewWatchSet()
 	for _, req := range c.volumes {
-		// Get the volume to check that it's healthy (there's a healthy controller
-		// and the volume hasn't encountered an error or been marked for GC
 		vol, err := c.ctx.State().CSIVolumeByID(ws, c.namespace, req.Source)
 		if err != nil {
 			return false, FilterConstraintCSIVolumesLookupFailed
@@ -253,15 +258,32 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
 
 		// Check that this node has a healthy running plugin with the right PluginID
 		plugin, ok := n.CSINodePlugins[vol.PluginID]
-		if !(ok && plugin.Healthy) {
-			return false, FilterConstraintCSIPlugins
+		if !ok {
+			return false, fmt.Sprintf(FilterConstraintCSIPluginTemplate, vol.PluginID, n.ID)
 		}
+		if !plugin.Healthy {
+			return false, fmt.Sprintf(FilterConstraintCSIPluginUnhealthyTemplate, vol.PluginID, n.ID)
+		}
+
 		if req.ReadOnly {
-			if !vol.CanReadOnly() {
+			if !vol.ReadSchedulable() {
 				return false, fmt.Sprintf(FilterConstraintCSIVolumeNoReadTemplate, vol.ID)
 			}
-		} else if !vol.CanWrite() {
-			return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID)
+		} else {
+			if !vol.WriteSchedulable() {
+				return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID)
+			}
+			if vol.WriteFreeClaims() {
+				return true, ""
+			}
+
+			// Check the blocking allocations to see if they belong to this job
+			for id := range vol.WriteAllocs {
+				a, err := c.ctx.State().AllocByID(ws, id)
+				if err != nil || a.Namespace != c.namespace || a.JobID != c.jobID {
+					return false, fmt.Sprintf(FilterConstraintCSIVolumeInUseTemplate, vol.ID)
+				}
+			}
 		}
 	}
 
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 1b7b68635..66974bc32 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -76,6 +76,9 @@ type State interface {
 	// AllocsByNode returns all the allocations by node
 	AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error)
 
+	// AllocByID returns the allocation
+	AllocByID(ws memdb.WatchSet, allocID string) (*structs.Allocation, error)
+
 	// AllocsByNodeTerminal returns all the allocations by node filtering by terminal status
 	AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error)
 
diff --git a/scheduler/stack.go b/scheduler/stack.go
index c673870b0..73d66ba13 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -97,6 +97,7 @@ func (s *GenericStack) SetJob(job *structs.Job) {
 	s.spread.SetJob(job)
 	s.ctx.Eligibility().SetJob(job)
 	s.taskGroupCSIVolumes.SetNamespace(job.Namespace)
+	s.taskGroupCSIVolumes.SetJobID(job.ID)
 
 	if contextual, ok := s.quota.(ContextualIterator); ok {
 		contextual.SetJob(job)
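Not part of the diff above: a minimal standalone sketch of the claim semantics this change introduces, built only from the CSIVolume fields and methods shown in nomad/structs/csi.go. The test name, the explicit map initialization, the namespace and job values, and the manual WriteAllocs assignments (standing in for what CSIVolumeDenormalize does in the state store before Claim is called) are illustrative assumptions, not code from this change.

package structs_test

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

// TestCSIVolumeClaim_sameJobSketch (illustrative only) shows that a volume with a
// single-writer access mode accepts additional write claims from allocations of the
// same namespace and job, but rejects a write claim from a different job.
func TestCSIVolumeClaim_sameJobSketch(t *testing.T) {
	vol := &structs.CSIVolume{
		AccessMode:  structs.CSIVolumeAccessModeMultiNodeSingleWriter,
		Schedulable: true,
		ReadAllocs:  map[string]*structs.Allocation{},
		WriteAllocs: map[string]*structs.Allocation{},
	}

	a1 := &structs.Allocation{ID: "a1", Namespace: "default", JobID: "example"}
	a2 := &structs.Allocation{ID: "a2", Namespace: "default", JobID: "example"}
	other := &structs.Allocation{ID: "a3", Namespace: "default", JobID: "other-job"}

	// The first write claim consumes the single writer slot.
	require.NoError(t, vol.ClaimWrite(a1))
	require.False(t, vol.WriteFreeClaims())

	// ClaimWrite stores nil for the allocation and relies on denormalization to
	// restore the pointer before the next claim; fake that step here so the
	// job-ownership check has allocations to compare against.
	vol.WriteAllocs[a1.ID] = a1

	// A second claim from the same namespace and job is accepted.
	require.NoError(t, vol.ClaimWrite(a2))
	vol.WriteAllocs[a2.ID] = a2

	// A claim from a different job hits "volume max claim reached".
	require.Error(t, vol.ClaimWrite(other))
}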