From b946906865003d429b92f0b3288bca03cbc39b21 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Sun, 5 Apr 2020 10:47:40 -0400
Subject: [PATCH] csi: make volume GC in job deregister safely async

The `Job.Deregister` call blocks on the client CSI controller RPCs
while the alloc still exists on the Nomad client node, so we need to
make volume claim reaping asynchronous from `Job.Deregister`. This
allows `nomad job stop` to return immediately.

To make this work, this changeset moves volume GC from a by-job basis
to a by-volume basis, so we no longer have to query the (possibly
deleted) job at the time of volume GC. We smuggle the volume ID and
whether it's a purge into the GC eval's JobID, the same way we
previously smuggled the job ID.
---
 client/csi_endpoint.go      |   4 +-
 nomad/core_sched.go         | 125 ++++++++++++++----------
 nomad/job_endpoint.go       | 107 ++++++++++++++++++++----------
 nomad/node_endpoint.go      |  21 +++---
 nomad/node_endpoint_test.go |   2 +-
 5 files changed, 133 insertions(+), 126 deletions(-)

diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go
index a4251e473..668bca303 100644
--- a/client/csi_endpoint.go
+++ b/client/csi_endpoint.go
@@ -143,12 +143,12 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
 	csiReq := req.ToCSIRequest()

 	// Submit the request for a volume to the CSI Plugin.
-	ctx, cancelFn := c.requestContext()
+	ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancelFn()
 	// CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and
 	// codes.ResourceExhausted are retried; all other errors are fatal.
 	_, err = plugin.ControllerUnpublishVolume(ctx, csiReq,
-		grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
+		grpc_retry.WithPerRetryTimeout(10*time.Second),
 		grpc_retry.WithMax(3),
 		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
 	if err != nil {
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index fa64df8af..934bab4c6 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -157,11 +157,6 @@ OUTER:
 	c.logger.Debug("job GC found eligible objects",
 		"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))

-	// Clean up any outstanding volume claims
-	if err := c.volumeClaimReap(gcJob, eval.LeaderACL); err != nil {
-		return err
-	}
-
 	// Reap the evals and allocs
 	if err := c.evalReap(gcEval, gcAlloc); err != nil {
 		return err
@@ -720,90 +715,64 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
 func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
 	c.logger.Trace("garbage collecting unclaimed CSI volume claims")

-	// JobID smuggled in with the eval's own JobID
-	var jobID string
-	evalJobID := strings.Split(eval.JobID, ":")
-	if len(evalJobID) != 2 {
-		c.logger.Error("volume gc called without jobID")
+	// Volume ID smuggled in with the eval's own JobID
+	evalVolID := strings.Split(eval.JobID, ":")
+	if len(evalVolID) != 3 {
+		c.logger.Error("volume gc called without volID")
 		return nil
 	}
-	jobID = evalJobID[1]
-	job, err := c.srv.State().JobByID(nil, eval.Namespace, jobID)
-	if err != nil || job == nil {
-		c.logger.Trace(
-			"cannot find job to perform volume claim GC. it may have been garbage collected",
-			"job", jobID)
-		return nil
-	}
-	return c.volumeClaimReap([]*structs.Job{job}, eval.LeaderACL)
+	volID := evalVolID[1]
+	runningAllocs := evalVolID[2] == "purge"
+	return volumeClaimReap(c.srv, volID, eval.Namespace,
+		c.srv.config.Region, eval.LeaderACL, runningAllocs)
 }

-// volumeClaimReap contacts the leader and releases volume claims from terminal allocs
-func (c *CoreScheduler) volumeClaimReap(jobs []*structs.Job, leaderACL string) error {
-	return volumeClaimReap(c.srv, c.logger, jobs, leaderACL, false)
-}
+func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {

-// volumeClaimReap contacts the leader and releases volume claims from terminal allocs
-func volumeClaimReap(srv *Server, logger log.Logger, jobs []*structs.Job, leaderACL string, runningAllocs bool) error {
 	ws := memdb.NewWatchSet()
+
+	vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
+	if err != nil {
+		return err
+	}
+	if vol == nil {
+		return nil
+	}
+	vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
+	if err != nil {
+		return err
+	}
+
+	plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
+	if err != nil {
+		return err
+	}
+
+	gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)
+
 	var result *multierror.Error
-
-	for _, job := range jobs {
-		logger.Trace("garbage collecting unclaimed CSI volume claims for job", "job", job.ID)
-		for _, taskGroup := range job.TaskGroups {
-			for _, tgVolume := range taskGroup.Volumes {
-				if tgVolume.Type != structs.VolumeTypeCSI {
-					continue // filter to just CSI volumes
-				}
-				volID := tgVolume.Source
-				vol, err := srv.State().CSIVolumeByID(ws, job.Namespace, volID)
-				if err != nil {
-					result = multierror.Append(result, err)
-					continue
-				}
-				if vol == nil {
-					logger.Trace("cannot find volume to be GC'd. it may have been deregistered",
-						"volume", volID)
-					continue
-				}
-				vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
-				if err != nil {
-					result = multierror.Append(result, err)
-					continue
-				}
-
-				plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
-				if err != nil {
-					result = multierror.Append(result, err)
-					continue
-				}
-
-				gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)
-
-				for _, claim := range gcClaims {
-					nodeClaims, err = volumeClaimReapImpl(srv,
-						&volumeClaimReapArgs{
-							vol:        vol,
-							plug:       plug,
-							allocID:    claim.allocID,
-							nodeID:     claim.nodeID,
-							mode:       claim.mode,
-							region:     job.Region,
-							namespace:  job.Namespace,
-							leaderACL:  leaderACL,
-							nodeClaims: nodeClaims,
-						},
-					)
-					if err != nil {
-						result = multierror.Append(result, err)
-						continue
-					}
-				}
-			}
+	for _, claim := range gcClaims {
+		nodeClaims, err = volumeClaimReapImpl(srv,
+			&volumeClaimReapArgs{
+				vol:        vol,
+				plug:       plug,
+				allocID:    claim.allocID,
+				nodeID:     claim.nodeID,
+				mode:       claim.mode,
+				namespace:  namespace,
+				region:     region,
+				leaderACL:  leaderACL,
+				nodeClaims: nodeClaims,
+			},
+		)
+		if err != nil {
+			result = multierror.Append(result, err)
+			continue
 		}
 	}
 	return result.ErrorOrNil()
+
 }

 type gcClaimRequest struct {
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 96b0276c9..5f06ccdde 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -707,9 +707,14 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 		return err
 	}

-	// For a job with volumes, run volume claim GC before deleting the job
-	if job != nil {
-		volumeClaimReap(j.srv, j.logger, []*structs.Job{job}, j.srv.getLeaderAcl(), true)
+	// For a job with volumes, find its volumes before deleting the job
+	volumesToGC := make(map[string]*structs.VolumeRequest)
+	for _, tg := range job.TaskGroups {
+		for _, vol := range tg.Volumes {
+			if vol.Type == structs.VolumeTypeCSI {
+				volumesToGC[vol.Source] = vol
+			}
+		}
 	}

 	// Commit this update via Raft
@@ -722,43 +727,75 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 	// Populate the reply with job information
 	reply.JobModifyIndex = index

-	// If the job is periodic or parameterized, we don't create an eval.
-	if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
-		return nil
-	}
-
-	// Create a new evaluation
-	// XXX: The job priority / type is strange for this, since it's not a high
-	// priority even if the job was.
+	evals := []*structs.Evaluation{}
 	now := time.Now().UTC().UnixNano()
-	eval := &structs.Evaluation{
-		ID:             uuid.Generate(),
-		Namespace:      args.RequestNamespace(),
-		Priority:       structs.JobDefaultPriority,
-		Type:           structs.JobTypeService,
-		TriggeredBy:    structs.EvalTriggerJobDeregister,
-		JobID:          args.JobID,
-		JobModifyIndex: index,
-		Status:         structs.EvalStatusPending,
-		CreateTime:     now,
-		ModifyTime:     now,
-	}
-	update := &structs.EvalUpdateRequest{
-		Evals:        []*structs.Evaluation{eval},
-		WriteRequest: structs.WriteRequest{Region: args.Region},
+
+	// Add an evaluation for garbage collecting the CSI volume claims
+	// of terminal allocs
+	for _, vol := range volumesToGC {
+		// we have to build this eval by hand rather than calling srv.CoreJob
+		// here because we need to use the volume's namespace
+
+		runningAllocs := ":ok"
+		if args.Purge {
+			runningAllocs = ":purge"
+		}
+
+		eval := &structs.Evaluation{
+			ID:          uuid.Generate(),
+			Namespace:   job.Namespace,
+			Priority:    structs.CoreJobPriority,
+			Type:        structs.JobTypeCore,
+			TriggeredBy: structs.EvalTriggerAllocStop,
+			JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source + runningAllocs,
+			LeaderACL:   j.srv.getLeaderAcl(),
+			Status:      structs.EvalStatusPending,
+			CreateTime:  now,
+			ModifyTime:  now,
+		}
+		evals = append(evals, eval)
 	}

-	// Commit this evaluation via Raft
-	_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
-	if err != nil {
-		j.logger.Error("eval create failed", "error", err, "method", "deregister")
-		return err
+	// If the job is periodic or parameterized, we don't create an eval
+	// for the job, but might still need one for the volumes
+	if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
+		// Create a new evaluation
+		// XXX: The job priority / type is strange for this, since it's not a high
+		// priority even if the job was.
+		eval := &structs.Evaluation{
+			ID:             uuid.Generate(),
+			Namespace:      args.RequestNamespace(),
+			Priority:       structs.JobDefaultPriority,
+			Type:           structs.JobTypeService,
+			TriggeredBy:    structs.EvalTriggerJobDeregister,
+			JobID:          args.JobID,
+			JobModifyIndex: index,
+			Status:         structs.EvalStatusPending,
+			CreateTime:     now,
+			ModifyTime:     now,
+		}
+		evals = append(evals, eval)
+	}
+
+	if len(evals) > 0 {
+		update := &structs.EvalUpdateRequest{
+			Evals:        evals,
+			WriteRequest: structs.WriteRequest{Region: args.Region},
+		}
+
+		// Commit this evaluation via Raft
+		_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
+		if err != nil {
+			j.logger.Error("eval create failed", "error", err, "method", "deregister")
+			return err
+		}
+
+		// Populate the reply with eval information
+		reply.EvalID = evals[len(evals)-1].ID
+		reply.EvalCreateIndex = evalIndex
+		reply.Index = evalIndex
 	}

-	// Populate the reply with eval information
-	reply.EvalID = eval.ID
-	reply.EvalCreateIndex = evalIndex
-	reply.Index = evalIndex
 	return nil
 }
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index d8348a79e..fcfbcfcc2 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	now := time.Now()
 	var evals []*structs.Evaluation

-	// A set of de-duplicated IDs for jobs that need volume claim GC.
-	// Later we'll create a gc eval for each job.
-	jobsWithVolumeGCs := make(map[string]*structs.Job)
+	// A set of de-duplicated volumes that need volume claim GC.
+	// Later we'll create a gc eval for each volume.
+	volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace]

 	for _, allocToUpdate := range args.Alloc {
 		allocToUpdate.ModifyTime = now.UTC().UnixNano()
@@ -1097,9 +1097,10 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			continue
 		}

+		// if the job has been purged, this will always return an error
 		job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
 		if err != nil {
-			n.logger.Error("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
+			n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
 			continue
 		}
 		if job == nil {
@@ -1116,7 +1117,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			// of jobs we're going to call volume claim GC on.
 			for _, vol := range taskGroup.Volumes {
 				if vol.Type == structs.VolumeTypeCSI {
-					jobsWithVolumeGCs[job.ID] = job
+					volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace}
 				}
 			}

@@ -1138,17 +1139,17 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	}

 	// Add an evaluation for garbage collecting the the CSI volume claims
-	// of jobs with terminal allocs
-	for _, job := range jobsWithVolumeGCs {
+	// of terminal allocs
+	for _, volAndNamespace := range volumesToGC {
 		// we have to build this eval by hand rather than calling srv.CoreJob
-		// here because we need to use the alloc's namespace
+		// here because we need to use the volume's namespace
 		eval := &structs.Evaluation{
 			ID:          uuid.Generate(),
-			Namespace:   job.Namespace,
+			Namespace:   volAndNamespace[1],
 			Priority:    structs.CoreJobPriority,
 			Type:        structs.JobTypeCore,
 			TriggeredBy: structs.EvalTriggerAllocStop,
-			JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + job.ID,
+			JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0] + ":no",
 			LeaderACL:   n.srv.getLeaderAcl(),
 			Status:      structs.EvalStatusPending,
 			CreateTime:  now.UTC().UnixNano(),
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index c8f5856ea..9c1c5746b 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -2406,7 +2406,7 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {

 	// Verify the eval for the claim GC was emitted
 	// Lookup the evaluations
-	eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+job.ID)
+	eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0+":no")
 	require.NotNil(t, eval)
 	require.Nil(t, err)
 }
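
Note on the eval JobID format: the "smuggling" described in the commit message
amounts to a three-segment string, structs.CoreJobCSIVolumeClaimGC + ":" +
volumeID + ":" + marker, where the marker is "no", "ok", or "purge" and only
"purge" makes csiVolumeClaimGC reap claims from running allocs. The standalone
sketch below illustrates that convention only; the helper names and the prefix
value used here are illustrative and are not part of this patch or of Nomad's
API.

package main

import (
	"fmt"
	"strings"
)

// Illustrative stand-in for structs.CoreJobCSIVolumeClaimGC; the real
// constant lives in the nomad/structs package.
const coreJobCSIVolumeClaimGC = "csi-volume-claim-gc"

// encodeVolumeGCEvalID builds the smuggled JobID from a volume ID and a
// marker saying whether claims on running allocs should also be reaped
// ("purge") or only claims from terminal allocs ("no" / "ok").
func encodeVolumeGCEvalID(volID, marker string) string {
	return coreJobCSIVolumeClaimGC + ":" + volID + ":" + marker
}

// decodeVolumeGCEvalID mirrors the strings.Split(eval.JobID, ":") check in
// csiVolumeClaimGC: exactly three segments, and only "purge" flips the
// runningAllocs flag. A volume ID containing ":" would break this scheme,
// which is what the len(parts) != 3 guard catches.
func decodeVolumeGCEvalID(jobID string) (volID string, runningAllocs bool, ok bool) {
	parts := strings.Split(jobID, ":")
	if len(parts) != 3 {
		return "", false, false
	}
	return parts[1], parts[2] == "purge", true
}

func main() {
	id := encodeVolumeGCEvalID("csi-volume-nfs0", "purge")
	volID, runningAllocs, ok := decodeVolumeGCEvalID(id)
	fmt.Println(id)                       // csi-volume-claim-gc:csi-volume-nfs0:purge
	fmt.Println(volID, runningAllocs, ok) // csi-volume-nfs0 true true
}

Because the eval carries the volume ID and purge flag itself, the GC path no
longer needs the (possibly already deleted) job in state, which is what lets
Job.Deregister enqueue the work and return immediately.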