diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 3f6673635..88eaddaec 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -86,12 +86,19 @@ func (c *csiHook) Postrun() error { var mErr *multierror.Error for _, pair := range c.volumeRequests { + + mode := structs.CSIVolumeClaimRead + if !pair.request.ReadOnly { + mode = structs.CSIVolumeClaimWrite + } + req := &structs.CSIVolumeUnpublishRequest{ VolumeID: pair.request.Source, Claim: &structs.CSIVolumeClaim{ AllocationID: c.alloc.ID, NodeID: c.alloc.NodeID, - Mode: structs.CSIVolumeClaimRelease, + Mode: mode, + State: structs.CSIVolumeClaimStateUnpublishing, }, WriteRequest: structs.WriteRequest{ Region: c.alloc.Job.Region, diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index d3f6a2c42..028a5ccee 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -187,7 +187,7 @@ func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *h VolumeID: id, Claim: &structs.CSIVolumeClaim{ NodeID: nodeID, - Mode: structs.CSIVolumeClaimRelease, + Mode: structs.CSIVolumeClaimGC, }, } s.parseWriteRequest(req, &args.WriteRequest) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 1ac135d0a..1b67f25d6 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -723,7 +723,8 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { gcClaims := func(ns, volID string) error { req := &structs.CSIVolumeClaimRequest{ VolumeID: volID, - Claim: structs.CSIVolumeClaimRelease, + Claim: structs.CSIVolumeClaimGC, + State: structs.CSIVolumeClaimStateUnpublishing, WriteRequest: structs.WriteRequest{ Namespace: ns, Region: c.srv.Region(), diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 78655f4a8..a88d24c0a 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -368,10 +368,13 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS return fmt.Errorf("missing volume ID") } + isNewClaim := args.Claim != structs.CSIVolumeClaimGC && + args.State == structs.CSIVolumeClaimStateTaken + // COMPAT(1.0): the NodeID field was added after 0.11.0 and so we // need to ensure it's been populated during upgrades from 0.11.0 // to later patch versions. Remove this block in 1.0 - if args.Claim != structs.CSIVolumeClaimRelease && args.NodeID == "" { + if isNewClaim && args.NodeID == "" { state := v.srv.fsm.State() ws := memdb.NewWatchSet() alloc, err := state.AllocByID(ws, args.AllocationID) @@ -385,7 +388,7 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS args.NodeID = alloc.NodeID } - if args.Claim != structs.CSIVolumeClaimRelease { + if isNewClaim { // if this is a new claim, add a Volume and PublishContext from the // controller (if any) to the reply err = v.controllerPublishVolume(args, reply) @@ -548,7 +551,6 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st } claim := args.Claim - claim.Mode = structs.CSIVolumeClaimRelease // previous checkpoints may have set the past claim state already. // in practice we should never see CSIVolumeClaimStateControllerDetached diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 09fa5db9b..87c453ba8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2227,7 +2227,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s } var alloc *structs.Allocation - if claim.Mode != structs.CSIVolumeClaimRelease { + if claim.State == structs.CSIVolumeClaimStateTaken { alloc, err = s.AllocByID(ws, claim.AllocationID) if err != nil { s.logger.Error("AllocByID failed", "error", err) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index e104850b5..982080b21 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2951,7 +2951,7 @@ func TestStateStore_CSIVolume(t *testing.T) { // Claims r := structs.CSIVolumeClaimRead w := structs.CSIVolumeClaimWrite - u := structs.CSIVolumeClaimRelease + u := structs.CSIVolumeClaimGC claim0 := &structs.CSIVolumeClaim{ AllocationID: a0.ID, NodeID: node.ID, diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index b0c5796ca..79d3c9cab 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -226,6 +226,7 @@ const ( CSIVolumeClaimStateNodeDetached CSIVolumeClaimStateControllerDetached CSIVolumeClaimStateReadyToFree + CSIVolumeClaimStateUnpublishing ) // CSIVolume is the full representation of a CSI Volume @@ -443,15 +444,17 @@ func (v *CSIVolume) Copy() *CSIVolume { // Claim updates the allocations and changes the volume state func (v *CSIVolume) Claim(claim *CSIVolumeClaim, alloc *Allocation) error { - switch claim.Mode { - case CSIVolumeClaimRead: - return v.ClaimRead(claim, alloc) - case CSIVolumeClaimWrite: - return v.ClaimWrite(claim, alloc) - case CSIVolumeClaimRelease: - return v.ClaimRelease(claim) + + if claim.State == CSIVolumeClaimStateTaken { + switch claim.Mode { + case CSIVolumeClaimRead: + return v.ClaimRead(claim, alloc) + case CSIVolumeClaimWrite: + return v.ClaimWrite(claim, alloc) + } } - return nil + // either GC or a Unpublish checkpoint + return v.ClaimRelease(claim) } // ClaimRead marks an allocation as using a volume read-only @@ -633,7 +636,11 @@ type CSIVolumeClaimMode int const ( CSIVolumeClaimRead CSIVolumeClaimMode = iota CSIVolumeClaimWrite - CSIVolumeClaimRelease + + // for GC we don't have a specific claim to set the state on, so instead we + // create a new claim for GC in order to bump the ModifyIndex and trigger + // volumewatcher + CSIVolumeClaimGC ) type CSIVolumeClaimBatchRequest struct { diff --git a/nomad/volumewatcher/volume_watcher_test.go b/nomad/volumewatcher/volume_watcher_test.go index 2d348ffe8..848ca58b9 100644 --- a/nomad/volumewatcher/volume_watcher_test.go +++ b/nomad/volumewatcher/volume_watcher_test.go @@ -44,7 +44,7 @@ func TestVolumeWatch_Reap(t *testing.T) { vol.PastClaims = map[string]*structs.CSIVolumeClaim{ alloc.ID: { NodeID: node.ID, - Mode: structs.CSIVolumeClaimRelease, + Mode: structs.CSIVolumeClaimRead, State: structs.CSIVolumeClaimStateNodeDetached, }, } @@ -56,7 +56,7 @@ func TestVolumeWatch_Reap(t *testing.T) { vol.PastClaims = map[string]*structs.CSIVolumeClaim{ "": { NodeID: node.ID, - Mode: structs.CSIVolumeClaimRelease, + Mode: structs.CSIVolumeClaimGC, }, } err = w.volumeReapImpl(vol) @@ -68,7 +68,7 @@ func TestVolumeWatch_Reap(t *testing.T) { vol.PastClaims = map[string]*structs.CSIVolumeClaim{ "": { NodeID: node.ID, - Mode: structs.CSIVolumeClaimRelease, + Mode: structs.CSIVolumeClaimRead, }, } err = w.volumeReapImpl(vol) diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 760836774..1b767367e 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -35,7 +35,10 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(err) - claim := &structs.CSIVolumeClaim{Mode: structs.CSIVolumeClaimRelease} + claim := &structs.CSIVolumeClaim{ + Mode: structs.CSIVolumeClaimGC, + State: structs.CSIVolumeClaimStateNodeDetached, + } index++ err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim) require.NoError(err) @@ -147,7 +150,6 @@ func TestVolumeWatch_StartStop(t *testing.T) { claim = &structs.CSIVolumeClaim{ AllocationID: alloc1.ID, NodeID: node.ID, - Mode: structs.CSIVolumeClaimRelease, } index++ err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)