csi: release claims via csi_hook postrun unpublish RPC (#8580)

Add a Postrun hook that sends the `CSIVolume.Unpublish` RPC to the server. The
server may in turn forward client RPCs to the node plugins or to the controller
plugins, depending on whether other allocations on this node still have claims
on this volume.

By making clients responsible for sending the `CSIVolume.Unpublish` RPC (and
making the RPC available to a `nomad volume detach` command), the
volumewatcher is now used only by the core GC job, and we no longer need
async volume GC on job deregister and node update.
Tim Gross authored this commit on 2020-08-06 14:51:46 -04:00; committed by GitHub.
parent 713b5f5007
commit 9384b1f77e
9 changed files with 47 additions and 294 deletions
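
As context for the hook change in the first file below, the client side only
needs a narrow RPC surface to release its claims. A minimal sketch, assuming an
`RPCer` interface with a single `RPC` method (the request and claim field names
are taken from the diff below; the helper function is illustrative):

```go
import "github.com/hashicorp/nomad/nomad/structs"

// RPCer is the narrow client interface the hook depends on; this
// single-method shape is an assumption for illustration.
type RPCer interface {
	RPC(method string, args interface{}, reply interface{}) error
}

// releaseClaim shows the call the new Postrun hook makes once per
// claimed volume.
func releaseClaim(rpc RPCer, volID, allocID, nodeID, region, ns string) error {
	req := &structs.CSIVolumeUnpublishRequest{
		VolumeID: volID,
		Claim: &structs.CSIVolumeClaim{
			AllocationID: allocID,
			NodeID:       nodeID,
			Mode:         structs.CSIVolumeClaimRelease,
		},
		WriteRequest: structs.WriteRequest{Region: region, Namespace: ns},
	}
	return rpc.RPC("CSIVolume.Unpublish", req, &structs.CSIVolumeUnpublishResponse{})
}
```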


@@ -5,6 +5,7 @@ import (
"fmt"
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
@@ -15,12 +16,13 @@ import (
//
// It is a noop for allocs that do not depend on CSI Volumes.
type csiHook struct {
ar *allocRunner
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager
rpcClient RPCer
updater hookResourceSetter
ar *allocRunner
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager
rpcClient RPCer
updater hookResourceSetter
volumeRequests map[string]*volumeAndRequest
}
func (c *csiHook) Name() string {
@@ -43,6 +45,7 @@ func (c *csiHook) Prerun() error {
if err != nil {
return fmt.Errorf("claim volumes: %v", err)
}
c.volumeRequests = volumes
mounts := make(map[string]*csimanager.MountInfo, len(volumes))
for alias, pair := range volumes {
@@ -73,6 +76,37 @@ func (c *csiHook) Prerun() error {
return nil
}
// Postrun sends an RPC to the server to unpublish the volume. This may
// forward client RPCs to the node plugins or to the controller plugins,
// depending on whether other allocations on this node have claims on this
// volume.
func (c *csiHook) Postrun() error {
if !c.shouldRun() {
return nil
}
var mErr *multierror.Error
for _, pair := range c.volumeRequests {
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: pair.request.Source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: structs.CSIVolumeClaimRelease,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region, Namespace: c.alloc.Job.Namespace},
}
err := c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})
if err != nil {
mErr = multierror.Append(mErr, err)
}
}
return mErr.ErrorOrNil()
}
type volumeAndRequest struct {
volume *structs.CSIVolume
request *structs.VolumeRequest
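
A rough sketch of how this Postrun path could be exercised in a unit test with
a stub RPC client; the `fakeRPCer` type and the stated expectation are
illustrative, not part of this commit:

```go
// fakeRPCer records the server RPCs the hook issues; illustrative only.
type fakeRPCer struct {
	methods []string
}

func (f *fakeRPCer) RPC(method string, args interface{}, reply interface{}) error {
	f.methods = append(f.methods, method)
	return nil
}

// A csiHook wired with this fake should record exactly one
// "CSIVolume.Unpublish" call per entry in volumeRequests after
// Postrun returns.
```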


@@ -402,7 +402,7 @@ func DefaultConfig() *Config {
CSIPluginGCInterval: 5 * time.Minute,
CSIPluginGCThreshold: 1 * time.Hour,
CSIVolumeClaimGCInterval: 5 * time.Minute,
CSIVolumeClaimGCThreshold: 1 * time.Hour,
CSIVolumeClaimGCThreshold: 5 * time.Minute,
EvalNackTimeout: 60 * time.Second,
EvalDeliveryLimit: 3,
EvalNackInitialReenqueueDelay: 1 * time.Second,


@@ -783,7 +783,7 @@ NEXT_VOLUME:
if err != nil {
return err
}
if alloc == nil {
if alloc == nil || alloc.TerminalStatus() {
err = gcClaims(vol.Namespace, vol.ID)
if err != nil {
return err
@@ -796,7 +796,7 @@ NEXT_VOLUME:
if err != nil {
return err
}
if alloc == nil {
if alloc == nil || alloc.TerminalStatus() {
err = gcClaims(vol.Namespace, vol.ID)
if err != nil {
return err


@@ -1,76 +0,0 @@
package nomad
import (
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)
// csiBatchRelease is a helper for any time we need to release a bunch
// of volume claims at once. It de-duplicates the volumes and batches
// the raft messages into manageable chunks. Intended for use by RPCs
// that have already been forwarded to the leader.
type csiBatchRelease struct {
srv *Server
logger log.Logger
maxBatchSize int
seen map[string]struct{}
batches []*structs.CSIVolumeClaimBatchRequest
}
func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease {
return &csiBatchRelease{
srv: srv,
logger: logger,
maxBatchSize: max,
seen: map[string]struct{}{},
batches: []*structs.CSIVolumeClaimBatchRequest{{}},
}
}
// add the volume ID + namespace to the deduplicated batches
func (c *csiBatchRelease) add(vol, namespace string) {
id := vol + "\x00" + namespace
// ignore duplicates
_, seen := c.seen[id]
if seen {
return
}
c.seen[id] = struct{}{}
req := structs.CSIVolumeClaimRequest{
VolumeID: vol,
Claim: structs.CSIVolumeClaimRelease,
}
req.Namespace = namespace
req.Region = c.srv.config.Region
for _, batch := range c.batches {
// otherwise append to the first non-full batch
if len(batch.Claims) < c.maxBatchSize {
batch.Claims = append(batch.Claims, req)
return
}
}
// no non-full batch found, make a new one
newBatch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{req}}
c.batches = append(c.batches, newBatch)
}
// apply flushes the batches to raft
func (c *csiBatchRelease) apply() error {
var result *multierror.Error
for _, batch := range c.batches {
if len(batch.Claims) > 0 {
_, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
if err != nil {
c.logger.Error("csi raft apply failed", "error", err, "method", "claim")
result = multierror.Append(result, err)
}
}
}
return result.ErrorOrNil()
}
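
For reference, a condensed sketch of the call pattern this helper supported,
drawn from the job-deregister hunk removed further down; the wrapper function
itself is illustrative:

```go
import "github.com/hashicorp/nomad/nomad/structs"

// releaseJobVolumeClaims condenses the pattern removed from
// Job.Deregister: collect a job's CSI volumes into the batcher
// (built elsewhere with newCSIBatchRelease(srv, logger, 100)),
// then flush the batched claim releases through raft.
func releaseJobVolumeClaims(batcher *csiBatchRelease, job *structs.Job) error {
	for _, tg := range job.TaskGroups {
		for _, vol := range tg.Volumes {
			if vol.Type == structs.VolumeTypeCSI {
				batcher.add(vol.Source, job.Namespace)
			}
		}
	}
	return batcher.apply()
}
```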


@@ -1,34 +0,0 @@
package nomad
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestCSI_Batcher(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
batcher := newCSIBatchRelease(srv, nil, 5)
batcher.add("vol0", "global")
batcher.add("vol", "0global")
batcher.add("vol1", "global")
batcher.add("vol1", "global")
batcher.add("vol2", "global")
batcher.add("vol2", "other")
batcher.add("vol3", "global")
batcher.add("vol4", "global")
batcher.add("vol5", "global")
batcher.add("vol6", "global")
require.Len(t, batcher.batches, 2)
require.Len(t, batcher.batches[0].Claims, 5, "first batch")
require.Equal(t, batcher.batches[0].Claims[4].VolumeID, "vol2")
require.Equal(t, batcher.batches[0].Claims[4].Namespace, "other")
require.Len(t, batcher.batches[1].Claims, 4, "second batch")
}


@@ -780,19 +780,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
// For a job with volumes, find its volumes before deleting the job.
// Later we'll apply this raft.
volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)
if job != nil {
for _, tg := range job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC.add(vol.Source, job.Namespace)
}
}
}
}
var eval *structs.Evaluation
// The job priority / type is strange for this, since it's not a high
@@ -832,13 +819,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
reply.EvalCreateIndex = index
reply.Index = index
// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
err = volumesToGC.apply()
if err != nil {
result = multierror.Append(result, err)
}
// COMPAT(1.1.0) - Remove entire conditional block
// 0.12.1 introduced atomic job deregistration eval
if eval != nil && args.Eval == nil {
@@ -852,16 +832,15 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
return err
}
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}
return result.ErrorOrNil()
return nil
}
// BatchDeregister is used to remove a set of jobs from the cluster.


@@ -1083,10 +1083,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
now := time.Now()
var evals []*structs.Evaluation
// A set of de-duplicated volumes that need their volume claims released.
// Later we'll apply this raft.
volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)
for _, allocToUpdate := range args.Alloc {
allocToUpdate.ModifyTime = now.UTC().UnixNano()
@@ -1115,14 +1111,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}
// If the terminal alloc has CSI volumes, add the volumes to the batch
// of volumes we'll release the claims of.
for _, vol := range taskGroup.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC.add(vol.Source, alloc.Namespace)
}
}
// Add an evaluation if this is a failed alloc that is eligible for rescheduling
if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
eval := &structs.Evaluation{
@@ -1140,13 +1128,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
}
}
// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
err := volumesToGC.apply()
if err != nil {
result = multierror.Append(result, err)
}
// Add this to the batch
n.updatesLock.Lock()
n.updates = append(n.updates, args.Alloc...)
@@ -1177,13 +1158,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
// Wait for the future
if err := future.Wait(); err != nil {
result = multierror.Append(result, err)
return result.ErrorOrNil()
return err
}
// Setup the response
reply.Index = future.Index()
return result.ErrorOrNil()
return nil
}
// batchUpdate is used to update all the allocations


@@ -2313,132 +2313,6 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
}
}
func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
codec := rpcClient(t, srv)
state := srv.fsm.State()
index := uint64(0)
ws := memdb.NewWatchSet()
// Create a client node, plugin, and volume
node := mock.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version
node.CSINodePlugins = map[string]*structs.CSIInfo{
"csi-plugin-example": {PluginID: "csi-plugin-example",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
ControllerInfo: &structs.CSIControllerInfo{},
},
}
index++
err := state.UpsertNode(index, node)
require.NoError(t, err)
volId0 := uuid.Generate()
ns := structs.DefaultNamespace
vols := []*structs.CSIVolume{{
ID: volId0,
Namespace: ns,
PluginID: "csi-plugin-example",
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}
index++
err = state.CSIVolumeRegister(index, vols)
require.NoError(t, err)
vol, err := state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 0)
require.Len(t, vol.WriteAllocs, 0)
// Create a job with 2 allocations
job := mock.Job()
job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"_": {
Name: "someVolume",
Type: structs.VolumeTypeCSI,
Source: volId0,
ReadOnly: false,
},
}
index++
err = state.UpsertJob(index, job)
require.NoError(t, err)
alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.NodeID = node.ID
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))
require.NoError(t, err)
alloc1.TaskGroup = job.TaskGroups[0].Name
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.NodeID = node.ID
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))
require.NoError(t, err)
alloc2.TaskGroup = job.TaskGroups[0].Name
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)
// Claim the volumes and verify the claims were set. We need to
// apply this through the FSM so that we make sure the index is
// properly updated to test later
batch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{
{
VolumeID: volId0,
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Claim: structs.CSIVolumeClaimWrite,
},
{
VolumeID: volId0,
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Claim: structs.CSIVolumeClaimRead,
},
}}
_, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
require.NoError(t, err)
vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
require.Len(t, vol.WriteAllocs, 1)
// Update the 1st alloc as terminal/failed
alloc1.ClientStatus = structs.AllocClientStatusFailed
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc",
&structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc1},
WriteRequest: structs.WriteRequest{Region: "global"},
}, &structs.NodeAllocsResponse{})
require.NoError(t, err)
// Lookup the alloc and verify status was updated
out, err := state.AllocByID(ws, alloc1.ID)
require.NoError(t, err)
require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)
// Verify the index has been updated to trigger a volume claim release
req := &structs.CSIVolumeGetRequest{ID: volId0}
req.Region = "global"
getResp := &structs.CSIVolumeGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp)
require.NoError(t, err)
require.Greater(t, getResp.Volume.ModifyIndex, lastIndex)
}
func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
t.Parallel()


@@ -168,10 +168,6 @@ func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool {
func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error {
if len(vol.PastClaims) == 0 {
return nil
}
// PastClaims written by a volume GC core job will have no allocation,
// so we need to find out which allocs are eligible for cleanup.
for _, claim := range vol.PastClaims {