CSI: reorder controller volume detachment (#12387)
In #12112 and #12113 we solved the problem of races in releasing volume claims, but there was a case that we missed. During a node drain with a controller attach/detach, we can hit a race where we call controller publish before the unpublish has completed. This is discouraged in the spec, but plugins are supposed to handle it safely. If the storage provider's API is slow enough and the plugin doesn't handle the case safely, however, the volume can get "locked" into a state where the provider's API won't detach it cleanly.

Check the claim before making any external controller publish RPC calls, so that Nomad is responsible for the canonical information about whether a volume is currently claimed.

This has a couple of side effects that also had to get fixed here:

* Changing the order means that the volume will have a past claim without a valid external node ID, because that claim came from the client. This uncovered a separate bug where we didn't assert the external node ID was valid before returning it; fall through to getting the ID from the plugins in the state store in this case. We avoided this originally because of concerns around plugins getting lost during node drain, but now that we've fixed that, we may want to revisit it in future work.

* We should make sure we're handling `FailedPrecondition` cases from the controller plugin the same way we handle other retryable cases.

* Several tests had to be updated because they were assuming we fail in a particular order that we're no longer doing.
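To make the reordering concrete before reading the diff: the idea is that the claim is committed through Nomad's own state first, and only a claim that Nomad accepts ever reaches the storage provider. The following is a minimal, self-contained sketch of that shape; `claimRequest`, `server`, and their methods are hypothetical stand-ins for Nomad's real structs and RPC plumbing, not the actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for Nomad's server and claim types; they exist
// only to make the ordering of the fix concrete.
type claimRequest struct{ volumeID, nodeID string }

type server struct {
	claims map[string]string // volume ID -> node ID holding the claim
}

// applyClaim stands in for committing the claim through raft: it rejects
// the request if the volume is still claimed by another node.
func (s *server) applyClaim(req claimRequest) error {
	if holder, ok := s.claims[req.volumeID]; ok && holder != req.nodeID {
		return errors.New("volume is in use")
	}
	s.claims[req.volumeID] = req.nodeID
	return nil
}

// controllerPublish stands in for the external ControllerPublishVolume RPC.
func (s *server) controllerPublish(req claimRequest) error {
	fmt.Printf("ControllerPublishVolume %s -> %s\n", req.volumeID, req.nodeID)
	return nil
}

// claim shows the fixed ordering: Nomad's own claim state gates the call,
// so a racing publish never reaches the storage provider's API.
func (s *server) claim(req claimRequest) error {
	if err := s.applyClaim(req); err != nil {
		return err
	}
	return s.controllerPublish(req)
}

func main() {
	s := &server{claims: map[string]string{"vol-1": "node-a"}}
	// node-b races ahead of node-a's unpublish: the claim is rejected
	// before any external RPC is made, instead of wedging the provider.
	fmt.Println(s.claim(claimRequest{volumeID: "vol-1", nodeID: "node-b"}))
}
```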
@@ -272,7 +272,7 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C
 			}
 		}
 		c.logger.Debug(
-			"volume could not be claimed because it is in use, retrying in %v", backoff)
+			"volume could not be claimed because it is in use", "retry_in", backoff)
 		t.Reset(backoff)
 	}
 	return &resp, err
@@ -377,8 +377,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
 				backoff = c.maxBackoffInterval
 			}
 		}
-		c.logger.Debug(
-			"volume could not be unmounted, retrying in %v", backoff)
+		c.logger.Debug("volume could not be unmounted", "retry_in", backoff)
 		t.Reset(backoff)
 	}
 	return nil
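The two hunks above also fix a logging bug as a side effect: `c.logger` takes a message followed by structured key/value pairs rather than a Printf format string, so the old calls would log a literal `%v` and leave `backoff` as a dangling argument. A minimal sketch of the difference, assuming the standard `github.com/hashicorp/go-hclog` package that Nomad's loggers are built on:

```go
package main

import (
	"time"

	hclog "github.com/hashicorp/go-hclog"
)

func main() {
	logger := hclog.New(&hclog.LoggerOptions{
		Name:  "csi_hook",
		Level: hclog.Debug,
	})
	backoff := 5 * time.Second

	// Broken: hclog does not expand Printf verbs, so "%v" is printed
	// literally and backoff becomes an unpaired trailing value.
	logger.Debug("volume could not be claimed because it is in use, retrying in %v", backoff)

	// Fixed: the message stays constant and backoff is a structured
	// key/value pair, which is also easier to filter in log pipelines.
	logger.Debug("volume could not be claimed because it is in use", "retry_in", backoff)
}
```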
@@ -2384,21 +2384,17 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
 	c := core.(*CoreScheduler)
 	require.NoError(c.csiVolumeClaimGC(gc))

-	// TODO(tgross): the condition below means this test doesn't tell
-	// us much; ideally we should be intercepting the claim request
-	// and verifying that we send the expected claims but we don't
-	// have test infra in place to do that for server RPCs
-
 	// sending the GC claim will trigger the volumewatcher's normal
-	// code path. but the volumewatcher will hit an error here
-	// because there's no path to the node, so we shouldn't see
-	// the WriteClaims removed
+	// code path. the volumewatcher will hit an error here because
+	// there's no path to the node, but this is a node-only plugin so
+	// we accept that the node has been GC'd and there's no point
+	// holding onto the claim
 	require.Eventually(func() bool {
 		vol, _ := state.CSIVolumeByID(ws, ns, volID)
-		return len(vol.WriteClaims) == 1 &&
-			len(vol.WriteAllocs) == 1 &&
-			len(vol.PastClaims) == 1
-	}, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly")
+		return len(vol.WriteClaims) == 0 &&
+			len(vol.WriteAllocs) == 0 &&
+			len(vol.PastClaims) == 0
+	}, time.Second*2, 10*time.Millisecond, "claims were not released")

 }
@@ -472,15 +472,6 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
 		args.NodeID = alloc.NodeID
 	}

-	if isNewClaim {
-		// if this is a new claim, add a Volume and PublishContext from the
-		// controller (if any) to the reply
-		err = v.controllerPublishVolume(args, reply)
-		if err != nil {
-			return fmt.Errorf("controller publish: %v", err)
-		}
-	}
-
 	resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
 	if err != nil {
 		v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
@@ -490,6 +481,15 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
 		return respErr
 	}

+	if isNewClaim {
+		// if this is a new claim, add a Volume and PublishContext from the
+		// controller (if any) to the reply
+		err = v.controllerPublishVolume(args, reply)
+		if err != nil {
+			return fmt.Errorf("controller publish: %v", err)
+		}
+	}
+
 	reply.Index = index
 	v.srv.setQueryMeta(&reply.QueryMeta)
 	return nil
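The two hunks above are the heart of the fix: `controllerPublishVolume` now runs only after the raft apply has recorded the claim, so Nomad's state store, not the storage provider's API, holds the canonical answer to whether a volume is claimed. The trade-off, called out in the commit message, is that a claim can now be recorded before the controller fills in an external node ID, which is what the `lookupExternalNodeID` change further down compensates for.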
@@ -570,7 +570,10 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,

 	err = v.srv.RPC(method, cReq, cResp)
 	if err != nil {
-		return fmt.Errorf("attach volume: %v", err)
+		if strings.Contains(err.Error(), "FailedPrecondition") {
+			return fmt.Errorf("%v: %v", structs.ErrCSIClientRPCRetryable, err)
+		}
+		return err
 	}
 	resp.PublishContext = cResp.PublishContext
 	return nil
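The hunk above folds gRPC `FailedPrecondition` errors from the controller plugin into the same retryable class as other transient failures. A simplified sketch of the pattern, with a hypothetical sentinel standing in for `structs.ErrCSIClientRPCRetryable` (errors arrive flattened to strings across Nomad's RPC boundary, hence the substring matching on both sides):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Hypothetical sentinel standing in for structs.ErrCSIClientRPCRetryable.
var errCSIClientRPCRetryable = errors.New("CSI client error (retryable)")

// wrapIfRetryable mirrors the hunk above: a FailedPrecondition status from
// the controller plugin is marked retryable instead of failing the claim.
func wrapIfRetryable(err error) error {
	if err == nil {
		return nil
	}
	if strings.Contains(err.Error(), "FailedPrecondition") {
		return fmt.Errorf("%v: %v", errCSIClientRPCRetryable, err)
	}
	return err
}

// isRetryable is how a caller's backoff loop can classify the error; the
// check is by substring because the error was flattened in transit.
func isRetryable(err error) bool {
	return err != nil && strings.Contains(err.Error(), errCSIClientRPCRetryable.Error())
}

func main() {
	err := wrapIfRetryable(errors.New("rpc error: code = FailedPrecondition desc = volume busy"))
	fmt.Println(isRetryable(err)) // true: retry with backoff
	fmt.Println(isRetryable(wrapIfRetryable(errors.New("volume not found")))) // false: hard error
}
```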
@@ -725,6 +728,11 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
 }

 func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
+	if claim.AccessMode == structs.CSIVolumeAccessModeUnknown {
+		// claim has already been released client-side
+		return nil
+	}
+
 	req := &cstructs.ClientCSINodeDetachVolumeRequest{
 		PluginID: vol.PluginID,
 		VolumeID: vol.ID,
@@ -820,17 +828,17 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
 // and GC'd by this point, so looking there is the last resort.
 func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) (string, error) {
 	for _, rClaim := range vol.ReadClaims {
-		if rClaim.NodeID == claim.NodeID {
+		if rClaim.NodeID == claim.NodeID && rClaim.ExternalNodeID != "" {
 			return rClaim.ExternalNodeID, nil
 		}
 	}
 	for _, wClaim := range vol.WriteClaims {
-		if wClaim.NodeID == claim.NodeID {
+		if wClaim.NodeID == claim.NodeID && wClaim.ExternalNodeID != "" {
 			return wClaim.ExternalNodeID, nil
 		}
 	}
 	for _, pClaim := range vol.PastClaims {
-		if pClaim.NodeID == claim.NodeID {
+		if pClaim.NodeID == claim.NodeID && pClaim.ExternalNodeID != "" {
 			return pClaim.ExternalNodeID, nil
 		}
 	}
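The `ExternalNodeID != ""` guards above skip claims that were recorded client-side without an external node ID, letting the lookup fall through to the plugin data in the state store, as the commit message describes. A simplified, self-contained sketch of that shape, with hypothetical trimmed-down types in place of Nomad's volume and plugin structs:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical trimmed-down stand-ins for Nomad's CSI volume and plugin structs.
type claim struct {
	NodeID         string
	ExternalNodeID string
}

type volume struct {
	// read, write, and past claims flattened into one slice for brevity
	claims []claim
}

type plugin struct {
	// external IDs reported by the plugin on each node, keyed by Nomad node ID
	externalIDs map[string]string
}

// lookupExternalNodeID prefers the ID recorded on a matching claim, but
// falls through to the plugin data when the claim carries an empty one.
func lookupExternalNodeID(vol *volume, plug *plugin, nodeID string) (string, error) {
	for _, c := range vol.claims {
		if c.NodeID == nodeID && c.ExternalNodeID != "" {
			return c.ExternalNodeID, nil
		}
	}
	if id, ok := plug.externalIDs[nodeID]; ok && id != "" {
		return id, nil
	}
	return "", errors.New("no external node ID found")
}

func main() {
	// A past claim recorded by the client has no external node ID, so the
	// lookup falls through to the plugin's fingerprint of the node.
	vol := &volume{claims: []claim{{NodeID: "node-a", ExternalNodeID: ""}}}
	plug := &plugin{externalIDs: map[string]string{"node-a": "i-0abc123"}}
	fmt.Println(lookupExternalNodeID(vol, plug, "node-a"))
}
```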
@@ -256,7 +256,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	}
 	claimResp := &structs.CSIVolumeClaimResponse{}
 	err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
-	require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0),
+	require.EqualError(t, err, fmt.Sprintf("volume not found: %s", id0),
 		"expected 'volume not found' error because volume hasn't yet been created")

 	// Create a plugin and volume
@@ -449,14 +449,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
 	claimResp := &structs.CSIVolumeClaimResponse{}
 	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
 	// Because the node is not registered
-	require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
+	require.EqualError(t, err, "controller publish: controller attach volume: No path to node")

 	// The node SecretID is authorized for all policies
 	claimReq.AuthToken = node.SecretID
 	claimReq.Namespace = ""
 	claimResp = &structs.CSIVolumeClaimResponse{}
 	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
-	require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
+	require.EqualError(t, err, "controller publish: controller attach volume: No path to node")
 }

 func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
@@ -515,7 +515,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 		{
 			name:           "first unpublish",
 			startingState:  structs.CSIVolumeClaimStateTaken,
-			expectedErrMsg: "could not detach from node: No path to node",
+			expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
 		},
 	}