diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index a4251e473..668bca303 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -143,12 +143,12 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum csiReq := req.ToCSIRequest() // Submit the request for a volume to the CSI Plugin. - ctx, cancelFn := c.requestContext() + ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second) defer cancelFn() // CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and // codes.ResourceExhausted are retried; all other errors are fatal. _, err = plugin.ControllerUnpublishVolume(ctx, csiReq, - grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout), + grpc_retry.WithPerRetryTimeout(10*time.Second), grpc_retry.WithMax(3), grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond))) if err != nil { diff --git a/nomad/core_sched.go b/nomad/core_sched.go index fa64df8af..934bab4c6 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -157,11 +157,6 @@ OUTER: c.logger.Debug("job GC found eligible objects", "jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc)) - // Clean up any outstanding volume claims - if err := c.volumeClaimReap(gcJob, eval.LeaderACL); err != nil { - return err - } - // Reap the evals and allocs if err := c.evalReap(gcEval, gcAlloc); err != nil { return err @@ -720,90 +715,64 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time, func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { c.logger.Trace("garbage collecting unclaimed CSI volume claims") - // JobID smuggled in with the eval's own JobID - var jobID string - evalJobID := strings.Split(eval.JobID, ":") - if len(evalJobID) != 2 { - c.logger.Error("volume gc called without jobID") + // Volume ID smuggled in with the eval's own JobID + evalVolID := strings.Split(eval.JobID, ":") + if len(evalVolID) != 3 { + c.logger.Error("volume gc called 
without volID") return nil } - jobID = evalJobID[1] - job, err := c.srv.State().JobByID(nil, eval.Namespace, jobID) - if err != nil || job == nil { - c.logger.Trace( - "cannot find job to perform volume claim GC. it may have been garbage collected", - "job", jobID) - return nil - } - return c.volumeClaimReap([]*structs.Job{job}, eval.LeaderACL) + volID := evalVolID[1] + runningAllocs := evalVolID[2] == "purge" + return volumeClaimReap(c.srv, volID, eval.Namespace, + c.srv.config.Region, eval.LeaderACL, runningAllocs) } -// volumeClaimReap contacts the leader and releases volume claims from terminal allocs -func (c *CoreScheduler) volumeClaimReap(jobs []*structs.Job, leaderACL string) error { - return volumeClaimReap(c.srv, c.logger, jobs, leaderACL, false) -} +func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error { -// volumeClaimReap contacts the leader and releases volume claims from terminal allocs -func volumeClaimReap(srv *Server, logger log.Logger, jobs []*structs.Job, leaderACL string, runningAllocs bool) error { ws := memdb.NewWatchSet() + + vol, err := srv.State().CSIVolumeByID(ws, namespace, volID) + if err != nil { + return err + } + if vol == nil { + return nil + } + vol, err = srv.State().CSIVolumeDenormalize(ws, vol) + if err != nil { + return err + } + + plug, err := srv.State().CSIPluginByID(ws, vol.PluginID) + if err != nil { + return err + } + + gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs) + var result *multierror.Error - - for _, job := range jobs { - logger.Trace("garbage collecting unclaimed CSI volume claims for job", "job", job.ID) - for _, taskGroup := range job.TaskGroups { - for _, tgVolume := range taskGroup.Volumes { - if tgVolume.Type != structs.VolumeTypeCSI { - continue // filter to just CSI volumes - } - volID := tgVolume.Source - vol, err := srv.State().CSIVolumeByID(ws, job.Namespace, volID) - if err != nil { - result = multierror.Append(result, err) - 
continue - } - if vol == nil { - logger.Trace("cannot find volume to be GC'd. it may have been deregistered", - "volume", volID) - continue - } - vol, err = srv.State().CSIVolumeDenormalize(ws, vol) - if err != nil { - result = multierror.Append(result, err) - continue - } - - plug, err := srv.State().CSIPluginByID(ws, vol.PluginID) - if err != nil { - result = multierror.Append(result, err) - continue - } - - gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs) - - for _, claim := range gcClaims { - nodeClaims, err = volumeClaimReapImpl(srv, - &volumeClaimReapArgs{ - vol: vol, - plug: plug, - allocID: claim.allocID, - nodeID: claim.nodeID, - mode: claim.mode, - region: job.Region, - namespace: job.Namespace, - leaderACL: leaderACL, - nodeClaims: nodeClaims, - }, - ) - if err != nil { - result = multierror.Append(result, err) - continue - } - } - } + for _, claim := range gcClaims { + nodeClaims, err = volumeClaimReapImpl(srv, + &volumeClaimReapArgs{ + vol: vol, + plug: plug, + allocID: claim.allocID, + nodeID: claim.nodeID, + mode: claim.mode, + namespace: namespace, + region: region, + leaderACL: leaderACL, + nodeClaims: nodeClaims, + }, + ) + if err != nil { + result = multierror.Append(result, err) + continue } } return result.ErrorOrNil() + } type gcClaimRequest struct { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 96b0276c9..5f06ccdde 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -707,9 +707,17 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD return err } - // For a job with volumes, run volume claim GC before deleting the job - if job != nil { - volumeClaimReap(j.srv, j.logger, []*structs.Job{job}, j.srv.getLeaderAcl(), true) + // For a job with volumes, find its volumes before deleting the job + // job may be nil here, so guard the dereference + volumesToGC := make(map[string]*structs.VolumeRequest) + if job != nil { + for _, tg := range job.TaskGroups { + for _, vol := range tg.Volumes { + if vol.Type == structs.VolumeTypeCSI { + 
volumesToGC[vol.Source] = vol + } + } + } } // Commit this update via Raft @@ -722,43 +730,75 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Populate the reply with job information reply.JobModifyIndex = index - // If the job is periodic or parameterized, we don't create an eval. - if job != nil && (job.IsPeriodic() || job.IsParameterized()) { - return nil - } - - // Create a new evaluation - // XXX: The job priority / type is strange for this, since it's not a high - // priority even if the job was. + evals := []*structs.Evaluation{} now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: structs.JobDefaultPriority, - Type: structs.JobTypeService, - TriggeredBy: structs.EvalTriggerJobDeregister, - JobID: args.JobID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, + + // Add an evaluation for garbage collecting the CSI volume claims + // of terminal allocs + for _, vol := range volumesToGC { + // we have to build this eval by hand rather than calling srv.CoreJob + // here because we need to use the volume's namespace + + runningAllocs := ":ok" + if args.Purge { + runningAllocs = ":purge" + } + + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: job.Namespace, + Priority: structs.CoreJobPriority, + Type: structs.JobTypeCore, + TriggeredBy: structs.EvalTriggerAllocStop, + JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source + runningAllocs, + LeaderACL: j.srv.getLeaderAcl(), + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + evals = append(evals, eval) } - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval 
create failed", "error", err, "method", "deregister") - return err + // If the job is periodic or parameterized, we don't create an eval + // for the job, but might still need one for the volumes + if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { + // Create a new evaluation + // XXX: The job priority / type is strange for this, since it's not a high + // priority even if the job was. + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: structs.JobDefaultPriority, + Type: structs.JobTypeService, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: args.JobID, + JobModifyIndex: index, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + evals = append(evals, eval) + } + + if len(evals) > 0 { + update := &structs.EvalUpdateRequest{ + Evals: evals, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } + + // Commit this evaluation via Raft + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + j.logger.Error("eval create failed", "error", err, "method", "deregister") + return err + } + + // Populate the reply with eval information + reply.EvalID = evals[len(evals)-1].ID + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex } - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex return nil } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index d8348a79e..fcfbcfcc2 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene now := time.Now() var evals []*structs.Evaluation - // A set of de-duplicated IDs for jobs that need volume claim GC. - // Later we'll create a gc eval for each job. - jobsWithVolumeGCs := make(map[string]*structs.Job) + // A set of de-duplicated volumes that need volume claim GC. 
+ // Later we'll create a gc eval for each volume. + volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace] for _, allocToUpdate := range args.Alloc { allocToUpdate.ModifyTime = now.UTC().UnixNano() @@ -1097,9 +1097,10 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene continue } + // if the job has been purged, this will always return error job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID) if err != nil { - n.logger.Error("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err) + n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err) continue } if job == nil { @@ -1116,7 +1117,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene // of jobs we're going to call volume claim GC on. for _, vol := range taskGroup.Volumes { if vol.Type == structs.VolumeTypeCSI { - jobsWithVolumeGCs[job.ID] = job + volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace} } } @@ -1138,17 +1139,17 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene } // Add an evaluation for garbage collecting the the CSI volume claims - // of jobs with terminal allocs - for _, job := range jobsWithVolumeGCs { + // of terminal allocs + for _, volAndNamespace := range volumesToGC { // we have to build this eval by hand rather than calling srv.CoreJob - // here because we need to use the alloc's namespace + // here because we need to use the volume's namespace eval := &structs.Evaluation{ ID: uuid.Generate(), - Namespace: job.Namespace, + Namespace: volAndNamespace[1], Priority: structs.CoreJobPriority, Type: structs.JobTypeCore, TriggeredBy: structs.EvalTriggerAllocStop, - JobID: structs.CoreJobCSIVolumeClaimGC + ":" + job.ID, + JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0] + ":no", LeaderACL: n.srv.getLeaderAcl(), Status: structs.EvalStatusPending, CreateTime: now.UTC().UnixNano(), diff 
--git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index c8f5856ea..9c1c5746b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2406,7 +2406,7 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { // Verify the eval for the claim GC was emitted // Lookup the evaluations - eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+job.ID) + eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0+":no") require.NotNil(t, eval) require.Nil(t, err) }