csi: implement releasing volume claims for terminal allocs (#7076)

When an alloc is marked terminal, and after node unstage/unpublish
have been called, the client will sync the terminal alloc state with
the server via `Node.UpdateAlloc` RPC.

This changeset implements releasing the volume claim for each volume
associated with the terminal alloc. It does not yet make the
`ControllerUnpublishVolume` CSI RPC call that will also be needed.
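
For context, a minimal sketch of the client-to-server handoff this changeset hooks into. The helper name `syncTerminalAlloc`, the package name, and the codec plumbing are illustrative only (the test in this commit obtains a codec via `rpcClient`); the request and response types are the ones used in the diff below.

package clientsketch

import (
	"net/rpc"

	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
	"github.com/hashicorp/nomad/nomad/structs"
)

// syncTerminalAlloc is a hypothetical helper showing the RPC a client
// issues once an alloc is terminal. The Node.UpdateAlloc handler changed
// below releases the alloc's CSI volume claims while handling it.
func syncTerminalAlloc(codec rpc.ClientCodec, alloc *structs.Allocation) error {
	req := &structs.AllocUpdateRequest{
		// alloc.ClientStatus is already failed or complete at this point
		Alloc:        []*structs.Allocation{alloc},
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp structs.NodeAllocsResponse
	return msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", req, &resp)
}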
Author: Tim Gross
Date: 2020-02-05 13:27:37 -05:00
Committed by: Tim Gross
Parent: cdbeb9a0b1
Commit: f868be18ce
2 changed files with 177 additions and 30 deletions


@@ -1081,39 +1081,54 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	now := time.Now()
 	var evals []*structs.Evaluation
-	for _, alloc := range args.Alloc {
-		alloc.ModifyTime = now.UTC().UnixNano()
+	for _, allocToUpdate := range args.Alloc {
+		allocToUpdate.ModifyTime = now.UTC().UnixNano()
+
+		if !allocToUpdate.TerminalStatus() {
+			continue
+		}
+
+		alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
+		if alloc == nil {
+			continue
+		}
+
+		job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
+		if err != nil {
+			n.logger.Error("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
+			continue
+		}
+		if job == nil {
+			n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID)
+			continue
+		}
+
+		taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
+		if taskGroup == nil {
+			continue
+		}
+
+		err = n.unclaimVolumesForTerminalAllocs(args, alloc, taskGroup)
+		if err != nil {
+			n.logger.Error("UpdateAlloc unable to release CSI volume",
+				"alloc", alloc.ID, "error", err)
+			continue
+		}
+
 		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
-		if alloc.ClientStatus == structs.AllocClientStatusFailed {
-			// Only create evaluations if this is an existing alloc,
-			// and eligible as per its task group's ReschedulePolicy
-			if existingAlloc, _ := n.srv.State().AllocByID(nil, alloc.ID); existingAlloc != nil {
-				job, err := n.srv.State().JobByID(nil, existingAlloc.Namespace, existingAlloc.JobID)
-				if err != nil {
-					n.logger.Error("UpdateAlloc unable to find job", "job", existingAlloc.JobID, "error", err)
-					continue
-				}
-				if job == nil {
-					n.logger.Debug("UpdateAlloc unable to find job", "job", existingAlloc.JobID)
-					continue
-				}
-				taskGroup := job.LookupTaskGroup(existingAlloc.TaskGroup)
-				if taskGroup != nil && existingAlloc.FollowupEvalID == "" && existingAlloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
-					eval := &structs.Evaluation{
-						ID:          uuid.Generate(),
-						Namespace:   existingAlloc.Namespace,
-						TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
-						JobID:       existingAlloc.JobID,
-						Type:        job.Type,
-						Priority:    job.Priority,
-						Status:      structs.EvalStatusPending,
-						CreateTime:  now.UTC().UnixNano(),
-						ModifyTime:  now.UTC().UnixNano(),
-					}
-					evals = append(evals, eval)
-				}
-			}
-		}
+		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
+			eval := &structs.Evaluation{
+				ID:          uuid.Generate(),
+				Namespace:   alloc.Namespace,
+				TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
+				JobID:       alloc.JobID,
+				Type:        job.Type,
+				Priority:    job.Priority,
+				Status:      structs.EvalStatusPending,
+				CreateTime:  now.UTC().UnixNano(),
+				ModifyTime:  now.UTC().UnixNano(),
+			}
+			evals = append(evals, eval)
+		}
 	}
@@ -1155,6 +1170,32 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	return nil
 }
 
+// unclaimVolumesForTerminalAllocs unpublishes and unclaims CSI volumes
+// that belong to the alloc if it is terminal.
+func (n *Node) unclaimVolumesForTerminalAllocs(args *structs.AllocUpdateRequest, alloc *structs.Allocation, taskGroup *structs.TaskGroup) error {
+	for _, volume := range taskGroup.Volumes {
+		// TODO(tgross): we also need to call ControllerUnpublishVolume CSI RPC here
+		// but the server-side CSI client + routing hasn't been implemented yet
+		req := &structs.CSIVolumeClaimRequest{
+			VolumeID:     volume.Source,
+			Allocation:   alloc,
+			Claim:        structs.CSIVolumeClaimRelease,
+			WriteRequest: args.WriteRequest,
+		}
+		resp, _, err := n.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
+		if err != nil {
+			return err
+		}
+		if respErr, ok := resp.(error); ok {
+			return respErr
+		}
+	}
+	return nil
+}
+
 // batchUpdate is used to update all the allocations
 func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
 	// Group pending evals by jobID to prevent creating unnecessary evals

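The claim life cycle that the Raft apply above drives is observable through the state store. Below is a minimal sketch that mirrors the state-store calls made by the test that follows; it assumes a *state.StateStore as obtained from srv.fsm.State() in that test, and the helper name `claimAndRelease` and the Raft indexes are illustrative only.

package statesketch

import (
	"fmt"

	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// claimAndRelease takes a write claim on a volume for an alloc, releases
// it again, and checks that the volume no longer lists the alloc as a
// writer. This is the transition the new endpoint code performs for
// terminal allocs.
func claimAndRelease(store *state.StateStore, volID string, alloc *structs.Allocation) error {
	// Indexes 1000/1001 are arbitrary Raft indexes for this sketch.
	if err := store.CSIVolumeClaim(1000, volID, alloc, structs.CSIVolumeClaimWrite); err != nil {
		return err
	}
	if err := store.CSIVolumeClaim(1001, volID, alloc, structs.CSIVolumeClaimRelease); err != nil {
		return err
	}
	vol, err := store.CSIVolumeByID(memdb.NewWatchSet(), volID)
	if err != nil {
		return err
	}
	if len(vol.WriteAllocs) != 0 {
		return fmt.Errorf("expected write claim to be released, found %d writers", len(vol.WriteAllocs))
	}
	return nil
}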

@@ -2312,6 +2312,112 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
 	}
 }
 
+func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
+	t.Parallel()
+	srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
+	defer shutdown()
+	testutil.WaitForLeader(t, srv.RPC)
+
+	codec := rpcClient(t, srv)
+	state := srv.fsm.State()
+	ws := memdb.NewWatchSet()
+
+	// Create a client node with a plugin
+	node := mock.Node()
+	node.CSINodePlugins = map[string]*structs.CSIInfo{
+		"csi-plugin-example": {PluginID: "csi-plugin-example",
+			Healthy:  true,
+			NodeInfo: &structs.CSINodeInfo{},
+		},
+	}
+	plugin := structs.NewCSIPlugin("csi-plugin-example", 1)
+	plugin.ControllerRequired = false
+	plugin.AddPlugin(node.ID, &structs.CSIInfo{})
+	err := state.UpsertNode(99, node)
+	require.NoError(t, err)
+
+	// Create the volume for the plugin
+	volId0 := uuid.Generate()
+	vols := []*structs.CSIVolume{{
+		ID:             volId0,
+		Namespace:      "notTheNamespace",
+		PluginID:       "csi-plugin-example",
+		AccessMode:     structs.CSIVolumeAccessModeMultiNodeSingleWriter,
+		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+		Topologies: []*structs.CSITopology{{
+			Segments: map[string]string{"foo": "bar"},
+		}},
+	}}
+	err = state.CSIVolumeRegister(4, vols)
+	require.NoError(t, err)
+
+	// Create a job with 2 allocations
+	job := mock.Job()
+	job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		"_": {
+			Name:     "someVolume",
+			Type:     "",
+			Source:   volId0,
+			ReadOnly: false,
+		},
+	}
+	err = state.UpsertJob(101, job)
+	require.NoError(t, err)
+
+	alloc1 := mock.Alloc()
+	alloc1.JobID = job.ID
+	alloc1.NodeID = node.ID
+	err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID))
+	require.NoError(t, err)
+	alloc1.TaskGroup = job.TaskGroups[0].Name
+
+	alloc2 := mock.Alloc()
+	alloc2.JobID = job.ID
+	alloc2.NodeID = node.ID
+	err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID))
+	require.NoError(t, err)
+	alloc2.TaskGroup = job.TaskGroups[0].Name
+
+	err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
+	require.NoError(t, err)
+
+	// Verify no claims are set
+	vol, err := state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 0)
+	require.Len(t, vol.WriteAllocs, 0)
+
+	// Claim the volumes and verify the claims were set
+	err = state.CSIVolumeClaim(105, volId0, alloc1, structs.CSIVolumeClaimWrite)
+	require.NoError(t, err)
+	err = state.CSIVolumeClaim(106, volId0, alloc2, structs.CSIVolumeClaimRead)
+	require.NoError(t, err)
+	vol, err = state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 1)
+	require.Len(t, vol.WriteAllocs, 1)
+
+	// Update the 1st alloc as terminal/failed
+	alloc1.ClientStatus = structs.AllocClientStatusFailed
+	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc",
+		&structs.AllocUpdateRequest{
+			Alloc:        []*structs.Allocation{alloc1},
+			WriteRequest: structs.WriteRequest{Region: "global"},
+		}, &structs.NodeAllocsResponse{})
+	require.NoError(t, err)
+
+	// Lookup the alloc and verify status was updated
+	out, err := state.AllocByID(ws, alloc1.ID)
+	require.NoError(t, err)
+	require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)
+
+	// Verify the claim was released
+	vol, err = state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 1)
+	require.Len(t, vol.WriteAllocs, 0)
+}
+
 func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
 	t.Parallel()