From f868be18ce3c112af405c3712c96ccfc863ee69a Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Wed, 5 Feb 2020 13:27:37 -0500
Subject: [PATCH] csi: implement releasing volume claims for terminal allocs
 (#7076)

When an alloc is marked terminal, and after node unstage/unpublish have
been called, the client will sync the terminal alloc state with the
server via `Node.UpdateAlloc` RPC.

This changeset implements releasing the volume claim for each volume
associated with the terminal alloc. It doesn't yet implement the RPC
call we need to make to the `ControllerUnpublishVolume` CSI RPC.
---
 nomad/node_endpoint.go      | 101 ++++++++++++++++++++++++----------
 nomad/node_endpoint_test.go | 106 ++++++++++++++++++++++++++++++++++++
 2 files changed, 177 insertions(+), 30 deletions(-)

diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index e26891349..6639c0c0d 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1081,39 +1081,54 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	now := time.Now()
 	var evals []*structs.Evaluation
 
-	for _, alloc := range args.Alloc {
-		alloc.ModifyTime = now.UTC().UnixNano()
+	for _, allocToUpdate := range args.Alloc {
+		allocToUpdate.ModifyTime = now.UTC().UnixNano()
+
+		if !allocToUpdate.TerminalStatus() {
+			continue
+		}
+
+		alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
+		if alloc == nil {
+			continue
+		}
+
+		job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
+		if err != nil {
+			n.logger.Error("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
+			continue
+		}
+		if job == nil {
+			n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID)
+			continue
+		}
+
+		taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
+		if taskGroup == nil {
+			continue
+		}
+
+		err = n.unclaimVolumesForTerminalAllocs(args, alloc, taskGroup)
+		if err != nil {
+			n.logger.Error("UpdateAlloc unable to release CSI volume",
+				"alloc", alloc.ID, "error", err)
+			continue
+		}
 
 		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
-		if alloc.ClientStatus == structs.AllocClientStatusFailed {
-			// Only create evaluations if this is an existing alloc,
-			// and eligible as per its task group's ReschedulePolicy
-			if existingAlloc, _ := n.srv.State().AllocByID(nil, alloc.ID); existingAlloc != nil {
-				job, err := n.srv.State().JobByID(nil, existingAlloc.Namespace, existingAlloc.JobID)
-				if err != nil {
-					n.logger.Error("UpdateAlloc unable to find job", "job", existingAlloc.JobID, "error", err)
-					continue
-				}
-				if job == nil {
-					n.logger.Debug("UpdateAlloc unable to find job", "job", existingAlloc.JobID)
-					continue
-				}
-				taskGroup := job.LookupTaskGroup(existingAlloc.TaskGroup)
-				if taskGroup != nil && existingAlloc.FollowupEvalID == "" && existingAlloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
-					eval := &structs.Evaluation{
-						ID:          uuid.Generate(),
-						Namespace:   existingAlloc.Namespace,
-						TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
-						JobID:       existingAlloc.JobID,
-						Type:        job.Type,
-						Priority:    job.Priority,
-						Status:      structs.EvalStatusPending,
-						CreateTime:  now.UTC().UnixNano(),
-						ModifyTime:  now.UTC().UnixNano(),
-					}
-					evals = append(evals, eval)
-				}
+		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
+			eval := &structs.Evaluation{
+				ID:          uuid.Generate(),
+				Namespace:   alloc.Namespace,
+				TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
+				JobID:       alloc.JobID,
+				Type:        job.Type,
+				Priority:    job.Priority,
+				Status:      structs.EvalStatusPending,
+				CreateTime:  now.UTC().UnixNano(),
+				ModifyTime:  now.UTC().UnixNano(),
 			}
+			evals = append(evals, eval)
 		}
 	}
 
@@ -1155,6 +1170,32 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	return nil
 }
 
+// unclaimVolumesForTerminalAllocs unpublishes and unclaims CSI volumes
+// that belong to the alloc if it is terminal.
+func (n *Node) unclaimVolumesForTerminalAllocs(args *structs.AllocUpdateRequest, alloc *structs.Allocation, taskGroup *structs.TaskGroup) error {
+	for _, volume := range taskGroup.Volumes {
+
+		// TODO(tgross): we also need to call ControllerUnpublishVolume CSI RPC here
+		// but the server-side CSI client + routing hasn't been implemented yet
+
+		req := &structs.CSIVolumeClaimRequest{
+			VolumeID:     volume.Source,
+			Allocation:   alloc,
+			Claim:        structs.CSIVolumeClaimRelease,
+			WriteRequest: args.WriteRequest,
+		}
+
+		resp, _, err := n.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
+		if err != nil {
+			return err
+		}
+		if respErr, ok := resp.(error); ok {
+			return respErr
+		}
+	}
+	return nil
+}
+
 // batchUpdate is used to update all the allocations
 func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
 	// Group pending evals by jobID to prevent creating unnecessary evals
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 3ff670887..7f5f04882 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -2312,6 +2312,112 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
 	}
 }
 
+func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
+	t.Parallel()
+	srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
+	defer shutdown()
+	testutil.WaitForLeader(t, srv.RPC)
+
+	codec := rpcClient(t, srv)
+	state := srv.fsm.State()
+	ws := memdb.NewWatchSet()
+
+	// Create a client node with a plugin
+	node := mock.Node()
+	node.CSINodePlugins = map[string]*structs.CSIInfo{
+		"csi-plugin-example": {PluginID: "csi-plugin-example",
+			Healthy:  true,
+			NodeInfo: &structs.CSINodeInfo{},
+		},
+	}
+	plugin := structs.NewCSIPlugin("csi-plugin-example", 1)
+	plugin.ControllerRequired = false
+	plugin.AddPlugin(node.ID, &structs.CSIInfo{})
+	err := state.UpsertNode(99, node)
+	require.NoError(t, err)
+
+	// Create the volume for the plugin
+	volId0 := uuid.Generate()
+	vols := []*structs.CSIVolume{{
+		ID:             volId0,
+		Namespace:      "notTheNamespace",
+		PluginID:       "csi-plugin-example",
+		AccessMode:     structs.CSIVolumeAccessModeMultiNodeSingleWriter,
+		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+		Topologies: []*structs.CSITopology{{
+			Segments: map[string]string{"foo": "bar"},
+		}},
+	}}
+	err = state.CSIVolumeRegister(4, vols)
+	require.NoError(t, err)
+
+	// Create a job with 2 allocations
+	job := mock.Job()
+	job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		"_": {
+			Name:     "someVolume",
+			Type:     "",
+			Source:   volId0,
+			ReadOnly: false,
+		},
+	}
+	err = state.UpsertJob(101, job)
+	require.NoError(t, err)
+
+	alloc1 := mock.Alloc()
+	alloc1.JobID = job.ID
+	alloc1.NodeID = node.ID
+	err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID))
+	require.NoError(t, err)
+	alloc1.TaskGroup = job.TaskGroups[0].Name
+
+	alloc2 := mock.Alloc()
+	alloc2.JobID = job.ID
+	alloc2.NodeID = node.ID
+	err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID))
+	require.NoError(t, err)
+	alloc2.TaskGroup = job.TaskGroups[0].Name
+
+	err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
+	require.NoError(t, err)
+
+	// Verify no claims are set
+	vol, err := state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 0)
+	require.Len(t, vol.WriteAllocs, 0)
+
+	// Claim the volumes and verify the claims were set
+	err = state.CSIVolumeClaim(105, volId0, alloc1, structs.CSIVolumeClaimWrite)
+	require.NoError(t, err)
+	err = state.CSIVolumeClaim(106, volId0, alloc2, structs.CSIVolumeClaimRead)
+	require.NoError(t, err)
+	vol, err = state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 1)
+	require.Len(t, vol.WriteAllocs, 1)
+
+	// Update the 1st alloc as terminal/failed
+	alloc1.ClientStatus = structs.AllocClientStatusFailed
+	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc",
+		&structs.AllocUpdateRequest{
+			Alloc:        []*structs.Allocation{alloc1},
+			WriteRequest: structs.WriteRequest{Region: "global"},
+		}, &structs.NodeAllocsResponse{})
+	require.NoError(t, err)
+
+	// Lookup the alloc and verify status was updated
+	out, err := state.AllocByID(ws, alloc1.ID)
+	require.NoError(t, err)
+	require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)
+
+	// Verify the claim was released
+	vol, err = state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 1)
+	require.Len(t, vol.WriteAllocs, 0)
+}
+
 func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
 	t.Parallel()
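
Reviewer note: as a companion to the diff above, here is a minimal, self-contained Go sketch of the claim-release flow this patch adds to Node.UpdateAlloc. The Volume, TaskGroup, and Alloc types and the releaseClaim callback below are hypothetical stand-ins invented for illustration, not Nomad's real structs or its Raft apply path; only the shape of the logic (skip non-terminal allocs, look up the alloc's task group, release one claim per volume in it) mirrors the change.

package main

import "fmt"

// Simplified, hypothetical stand-ins for the Nomad structs touched by the
// patch; illustration only, not the real API.
type Volume struct{ Source string }

type TaskGroup struct{ Volumes map[string]*Volume }

type Alloc struct {
	ID        string
	TaskGroup string
	Terminal  bool
}

// releaseClaims mirrors the control flow the patch adds to Node.UpdateAlloc:
// terminal allocs release one claim per volume in their task group.
func releaseClaims(allocs []*Alloc, groups map[string]*TaskGroup,
	releaseClaim func(volID, allocID string) error) error {

	for _, alloc := range allocs {
		if !alloc.Terminal {
			continue // only terminal allocs give up their claims
		}
		tg, ok := groups[alloc.TaskGroup]
		if !ok {
			continue // no task group found; nothing to release
		}
		for _, vol := range tg.Volumes {
			// In the patch this step is a Raft apply of a
			// CSIVolumeClaimRequest with a release claim.
			if err := releaseClaim(vol.Source, alloc.ID); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	groups := map[string]*TaskGroup{
		"web": {Volumes: map[string]*Volume{"data": {Source: "vol-0"}}},
	}
	allocs := []*Alloc{
		{ID: "alloc-1", TaskGroup: "web", Terminal: true},
		{ID: "alloc-2", TaskGroup: "web", Terminal: false}, // ignored: not terminal
	}

	err := releaseClaims(allocs, groups, func(volID, allocID string) error {
		fmt.Printf("release claim on %s held by %s\n", volID, allocID)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}

In the real endpoint the release is submitted as a structs.CSIVolumeClaimRequest through n.srv.raftApply, as shown in unclaimVolumesForTerminalAllocs above; the controller-side ControllerUnpublishVolume call is still a TODO in this changeset.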