mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
csi: Postrun hook should not change mode (#9323)
The unpublish workflow requires that we know the mode (RW vs RO) if we want to unpublish the node. Update the hook and the Unpublish RPC so that we mark the claim for release in a new state but leave the mode alone. This fixes a bug where RO claims were failing node unpublish. The core job GC doesn't know the mode, but we don't need it for that workflow, so add a mode specifically for GC; the volumewatcher uses this as a sentinel to check whether claims (with their specific RW vs RO modes) need to be claimed.
This commit is contained in:
@@ -86,12 +86,19 @@ func (c *csiHook) Postrun() error {
|
||||
var mErr *multierror.Error
|
||||
|
||||
for _, pair := range c.volumeRequests {
|
||||
|
||||
mode := structs.CSIVolumeClaimRead
|
||||
if !pair.request.ReadOnly {
|
||||
mode = structs.CSIVolumeClaimWrite
|
||||
}
|
||||
|
||||
req := &structs.CSIVolumeUnpublishRequest{
|
||||
VolumeID: pair.request.Source,
|
||||
Claim: &structs.CSIVolumeClaim{
|
||||
AllocationID: c.alloc.ID,
|
||||
NodeID: c.alloc.NodeID,
|
||||
Mode: structs.CSIVolumeClaimRelease,
|
||||
Mode: mode,
|
||||
State: structs.CSIVolumeClaimStateUnpublishing,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.alloc.Job.Region,
|
||||
|
||||
@@ -187,7 +187,7 @@ func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *h
|
||||
VolumeID: id,
|
||||
Claim: &structs.CSIVolumeClaim{
|
||||
NodeID: nodeID,
|
||||
Mode: structs.CSIVolumeClaimRelease,
|
||||
Mode: structs.CSIVolumeClaimGC,
|
||||
},
|
||||
}
|
||||
s.parseWriteRequest(req, &args.WriteRequest)
|
||||
|
||||
@@ -723,7 +723,8 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
|
||||
gcClaims := func(ns, volID string) error {
|
||||
req := &structs.CSIVolumeClaimRequest{
|
||||
VolumeID: volID,
|
||||
Claim: structs.CSIVolumeClaimRelease,
|
||||
Claim: structs.CSIVolumeClaimGC,
|
||||
State: structs.CSIVolumeClaimStateUnpublishing,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Namespace: ns,
|
||||
Region: c.srv.Region(),
|
||||
|
||||
@@ -368,10 +368,13 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
|
||||
return fmt.Errorf("missing volume ID")
|
||||
}
|
||||
|
||||
isNewClaim := args.Claim != structs.CSIVolumeClaimGC &&
|
||||
args.State == structs.CSIVolumeClaimStateTaken
|
||||
|
||||
// COMPAT(1.0): the NodeID field was added after 0.11.0 and so we
|
||||
// need to ensure it's been populated during upgrades from 0.11.0
|
||||
// to later patch versions. Remove this block in 1.0
|
||||
if args.Claim != structs.CSIVolumeClaimRelease && args.NodeID == "" {
|
||||
if isNewClaim && args.NodeID == "" {
|
||||
state := v.srv.fsm.State()
|
||||
ws := memdb.NewWatchSet()
|
||||
alloc, err := state.AllocByID(ws, args.AllocationID)
|
||||
@@ -385,7 +388,7 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
|
||||
args.NodeID = alloc.NodeID
|
||||
}
|
||||
|
||||
if args.Claim != structs.CSIVolumeClaimRelease {
|
||||
if isNewClaim {
|
||||
// if this is a new claim, add a Volume and PublishContext from the
|
||||
// controller (if any) to the reply
|
||||
err = v.controllerPublishVolume(args, reply)
|
||||
@@ -548,7 +551,6 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
|
||||
}
|
||||
|
||||
claim := args.Claim
|
||||
claim.Mode = structs.CSIVolumeClaimRelease
|
||||
|
||||
// previous checkpoints may have set the past claim state already.
|
||||
// in practice we should never see CSIVolumeClaimStateControllerDetached
|
||||
|
||||
@@ -2227,7 +2227,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s
|
||||
}
|
||||
|
||||
var alloc *structs.Allocation
|
||||
if claim.Mode != structs.CSIVolumeClaimRelease {
|
||||
if claim.State == structs.CSIVolumeClaimStateTaken {
|
||||
alloc, err = s.AllocByID(ws, claim.AllocationID)
|
||||
if err != nil {
|
||||
s.logger.Error("AllocByID failed", "error", err)
|
||||
|
||||
@@ -2951,7 +2951,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
|
||||
// Claims
|
||||
r := structs.CSIVolumeClaimRead
|
||||
w := structs.CSIVolumeClaimWrite
|
||||
u := structs.CSIVolumeClaimRelease
|
||||
u := structs.CSIVolumeClaimGC
|
||||
claim0 := &structs.CSIVolumeClaim{
|
||||
AllocationID: a0.ID,
|
||||
NodeID: node.ID,
|
||||
|
||||
@@ -226,6 +226,7 @@ const (
|
||||
CSIVolumeClaimStateNodeDetached
|
||||
CSIVolumeClaimStateControllerDetached
|
||||
CSIVolumeClaimStateReadyToFree
|
||||
CSIVolumeClaimStateUnpublishing
|
||||
)
|
||||
|
||||
// CSIVolume is the full representation of a CSI Volume
|
||||
@@ -443,15 +444,17 @@ func (v *CSIVolume) Copy() *CSIVolume {
|
||||
|
||||
// Claim updates the allocations and changes the volume state
|
||||
func (v *CSIVolume) Claim(claim *CSIVolumeClaim, alloc *Allocation) error {
|
||||
switch claim.Mode {
|
||||
case CSIVolumeClaimRead:
|
||||
return v.ClaimRead(claim, alloc)
|
||||
case CSIVolumeClaimWrite:
|
||||
return v.ClaimWrite(claim, alloc)
|
||||
case CSIVolumeClaimRelease:
|
||||
return v.ClaimRelease(claim)
|
||||
|
||||
if claim.State == CSIVolumeClaimStateTaken {
|
||||
switch claim.Mode {
|
||||
case CSIVolumeClaimRead:
|
||||
return v.ClaimRead(claim, alloc)
|
||||
case CSIVolumeClaimWrite:
|
||||
return v.ClaimWrite(claim, alloc)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
// either GC or a Unpublish checkpoint
|
||||
return v.ClaimRelease(claim)
|
||||
}
|
||||
|
||||
// ClaimRead marks an allocation as using a volume read-only
|
||||
@@ -633,7 +636,11 @@ type CSIVolumeClaimMode int
|
||||
const (
|
||||
CSIVolumeClaimRead CSIVolumeClaimMode = iota
|
||||
CSIVolumeClaimWrite
|
||||
CSIVolumeClaimRelease
|
||||
|
||||
// for GC we don't have a specific claim to set the state on, so instead we
|
||||
// create a new claim for GC in order to bump the ModifyIndex and trigger
|
||||
// volumewatcher
|
||||
CSIVolumeClaimGC
|
||||
)
|
||||
|
||||
type CSIVolumeClaimBatchRequest struct {
|
||||
|
||||
@@ -44,7 +44,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
|
||||
vol.PastClaims = map[string]*structs.CSIVolumeClaim{
|
||||
alloc.ID: {
|
||||
NodeID: node.ID,
|
||||
Mode: structs.CSIVolumeClaimRelease,
|
||||
Mode: structs.CSIVolumeClaimRead,
|
||||
State: structs.CSIVolumeClaimStateNodeDetached,
|
||||
},
|
||||
}
|
||||
@@ -56,7 +56,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
|
||||
vol.PastClaims = map[string]*structs.CSIVolumeClaim{
|
||||
"": {
|
||||
NodeID: node.ID,
|
||||
Mode: structs.CSIVolumeClaimRelease,
|
||||
Mode: structs.CSIVolumeClaimGC,
|
||||
},
|
||||
}
|
||||
err = w.volumeReapImpl(vol)
|
||||
@@ -68,7 +68,7 @@ func TestVolumeWatch_Reap(t *testing.T) {
|
||||
vol.PastClaims = map[string]*structs.CSIVolumeClaim{
|
||||
"": {
|
||||
NodeID: node.ID,
|
||||
Mode: structs.CSIVolumeClaimRelease,
|
||||
Mode: structs.CSIVolumeClaimRead,
|
||||
},
|
||||
}
|
||||
err = w.volumeReapImpl(vol)
|
||||
|
||||
@@ -35,7 +35,10 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
|
||||
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
|
||||
require.NoError(err)
|
||||
|
||||
claim := &structs.CSIVolumeClaim{Mode: structs.CSIVolumeClaimRelease}
|
||||
claim := &structs.CSIVolumeClaim{
|
||||
Mode: structs.CSIVolumeClaimGC,
|
||||
State: structs.CSIVolumeClaimStateNodeDetached,
|
||||
}
|
||||
index++
|
||||
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
|
||||
require.NoError(err)
|
||||
@@ -147,7 +150,6 @@ func TestVolumeWatch_StartStop(t *testing.T) {
|
||||
claim = &structs.CSIVolumeClaim{
|
||||
AllocationID: alloc1.ID,
|
||||
NodeID: node.ID,
|
||||
Mode: structs.CSIVolumeClaimRelease,
|
||||
}
|
||||
index++
|
||||
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
|
||||
|
||||
Reference in New Issue
Block a user