diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 0786af497..a3277f858 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -151,20 +151,17 @@ OUTER:
 		return err
 	}
 
-	// Call to the leader to deregister the jobs.
-	for _, job := range gcJob {
-		req := structs.JobDeregisterRequest{
-			JobID: job.ID,
-			Purge: true,
-			WriteRequest: structs.WriteRequest{
-				Region:    c.srv.config.Region,
-				Namespace: job.Namespace,
-				AuthToken: eval.LeaderACL,
-			},
-		}
-		var resp structs.JobDeregisterResponse
-		if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
-			c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
+	// Reap the jobs
+	return c.jobReap(gcJob, eval.LeaderACL)
+}
+
+// jobReap contacts the leader and issues a reap on the passed jobs
+func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
+	// Call to the leader to issue the reap
+	for _, req := range c.partitionJobReap(jobs, leaderACL) {
+		var resp structs.JobBatchDeregisterResponse
+		if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil {
+			c.srv.logger.Printf("[ERR] sched.core: batch job reap failed: %v", err)
 			return err
 		}
 	}
@@ -172,6 +169,44 @@ OUTER:
 	return nil
 }
 
+// partitionJobReap returns a list of JobBatchDeregisterRequests to make,
+// ensuring a single request does not contain too many jobs. This is necessary
+// to ensure that the Raft transaction does not become too large.
+func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) []*structs.JobBatchDeregisterRequest {
+	option := &structs.JobDeregisterOptions{Purge: true}
+	var requests []*structs.JobBatchDeregisterRequest
+	submittedJobs := 0
+	for submittedJobs != len(jobs) {
+		req := &structs.JobBatchDeregisterRequest{
+			Jobs: make(map[structs.NamespacedID]*structs.JobDeregisterOptions),
+			WriteRequest: structs.WriteRequest{
+				Region:    c.srv.config.Region,
+				AuthToken: leaderACL,
+			},
+		}
+		requests = append(requests, req)
+		available := maxIdsPerReap
+
+		if remaining := len(jobs) - submittedJobs; remaining > 0 {
+			if remaining <= available {
+				for _, job := range jobs[submittedJobs:] {
+					jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
+					req.Jobs[jns] = option
+				}
+				submittedJobs += remaining
+			} else {
+				for _, job := range jobs[submittedJobs : submittedJobs+available] {
+					jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
+					req.Jobs[jns] = option
+				}
+				submittedJobs += available
+			}
+		}
+	}
+
+	return requests
+}
+
 // evalGC is used to garbage collect old evaluations
 func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 	// Iterate over the evaluations
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index a1ab80754..68ac7dbc8 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -1767,6 +1767,33 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
 	}
 }
 
+func TestCoreScheduler_PartitionJobReap(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create a core scheduler
+	snap, err := s1.fsm.State().Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Set the max ids per reap to something lower.
+	maxIdsPerReap = 2
+
+	jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
+	requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
+	require.Len(requests, 2)
+
+	first := requests[0]
+	second := requests[1]
+	require.Len(first.Jobs, 2)
+	require.Len(second.Jobs, 1)
+}
+
 // Tests various scenarios when allocations are eligible to be GCed
 func TestAllocation_GCEligible(t *testing.T) {
 	type testCase struct {
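
Note on the patch: the batching pattern that partitionJobReap applies can be sketched in isolation. The snippet below is a minimal, self-contained illustration rather than part of the change itself; maxIDsPerBatch and the partition helper are hypothetical names standing in for maxIdsPerReap and the per-request job map, and plain string IDs replace structs.NamespacedID. It only demonstrates how a list is split into chunks no larger than the cap, which is what keeps each Raft transaction from growing too large.

package main

import "fmt"

// maxIDsPerBatch is a hypothetical stand-in for Nomad's maxIdsPerReap; it caps
// how many IDs a single batch request may carry.
const maxIDsPerBatch = 2

// partition splits ids into slices of at most maxIDsPerBatch entries,
// mirroring the submitted-counter loop used by partitionJobReap.
func partition(ids []string) [][]string {
	var batches [][]string
	for submitted := 0; submitted != len(ids); {
		take := maxIDsPerBatch
		if remaining := len(ids) - submitted; remaining < take {
			take = remaining
		}
		batches = append(batches, ids[submitted:submitted+take])
		submitted += take
	}
	return batches
}

func main() {
	// Three IDs with a cap of two yields two batches, [a b] and [c], matching
	// the 2-and-1 split asserted by TestCoreScheduler_PartitionJobReap.
	fmt.Println(partition([]string{"a", "b", "c"}))
}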