Mirror of https://github.com/kemko/nomad.git (synced 2026-01-09 11:55:42 +03:00).
Commit: "Limit GC size". This commit is contained in:
@@ -10,6 +10,13 @@ import (
|
||||
"github.com/hashicorp/nomad/scheduler"
|
||||
)
|
||||
|
||||
var (
	// maxIdsPerReap bounds how many evaluation and allocation ids may be
	// reaped in one Raft transaction, keeping the Raft message from growing
	// too large. With 36-byte UUID strings this budgets roughly 0.5 MB of
	// ids per request.
	maxIdsPerReap = (1024 * 512) / 36 // 0.5 MB of ids.
)
|
||||
|
||||
// CoreScheduler is a special "scheduler" that is registered
|
||||
// as "_core". It is used to run various administrative work
|
||||
// across the cluster.
|
||||
@@ -232,22 +239,62 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
|
||||
// allocs.
|
||||
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
|
||||
// Call to the leader to issue the reap
|
||||
req := structs.EvalDeleteRequest{
|
||||
Evals: evals,
|
||||
Allocs: allocs,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.srv.config.Region,
|
||||
},
|
||||
}
|
||||
var resp structs.GenericResponse
|
||||
if err := c.srv.RPC("Eval.Reap", &req, &resp); err != nil {
|
||||
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
|
||||
return err
|
||||
for _, req := range c.partitionReap(evals, allocs) {
|
||||
var resp structs.GenericResponse
|
||||
if err := c.srv.RPC("Eval.Reap", req, &resp); err != nil {
|
||||
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// partitionReap returns a list of EvalDeleteRequest to make, ensuring a single
|
||||
// request does not contain too many allocations and evaluations. This is
|
||||
// necessary to ensure that the Raft transaction does not become too large.
|
||||
func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDeleteRequest {
|
||||
var requests []*structs.EvalDeleteRequest
|
||||
var submittedEvals, submittedAllocs int
|
||||
for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
|
||||
req := &structs.EvalDeleteRequest{
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.srv.config.Region,
|
||||
},
|
||||
}
|
||||
requests = append(requests, req)
|
||||
available := maxIdsPerReap
|
||||
|
||||
// Add the evals first
|
||||
if remaining := len(evals) - submittedEvals; remaining > 0 {
|
||||
if remaining <= available {
|
||||
req.Evals = evals[submittedEvals:]
|
||||
available -= remaining
|
||||
submittedEvals += remaining
|
||||
} else {
|
||||
req.Evals = evals[submittedEvals : submittedEvals+available]
|
||||
submittedEvals += available
|
||||
|
||||
// Exhausted space so skip adding allocs
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Add the allocs
|
||||
if remaining := len(allocs) - submittedAllocs; remaining > 0 {
|
||||
if remaining <= available {
|
||||
req.Allocs = allocs[submittedAllocs:]
|
||||
submittedAllocs += remaining
|
||||
} else {
|
||||
req.Allocs = allocs[submittedAllocs : submittedAllocs+available]
|
||||
submittedAllocs += available
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return requests
|
||||
}
|
||||
|
||||
// nodeGC is used to garbage collect old nodes
|
||||
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
|
||||
// Iterate over the evaluations
|
||||
|
||||
@@ -534,3 +534,41 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoreScheduler_PartitionReap(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create a core scheduler
|
||||
snap, err := s1.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
core := NewCoreScheduler(s1, snap)
|
||||
|
||||
// Set the max ids per reap to something lower.
|
||||
maxIdsPerReap = 2
|
||||
|
||||
evals := []string{"a", "b", "c"}
|
||||
allocs := []string{"1", "2", "3"}
|
||||
requests := core.(*CoreScheduler).partitionReap(evals, allocs)
|
||||
if len(requests) != 3 {
|
||||
t.Fatalf("Expected 3 requests got: %v", requests)
|
||||
}
|
||||
|
||||
first := requests[0]
|
||||
if len(first.Evals) != 2 && len(first.Allocs) != 0 {
|
||||
t.Fatalf("Unexpected first request: %v", first)
|
||||
}
|
||||
|
||||
second := requests[1]
|
||||
if len(second.Evals) != 1 && len(second.Allocs) != 1 {
|
||||
t.Fatalf("Unexpected second request: %v", second)
|
||||
}
|
||||
|
||||
third := requests[2]
|
||||
if len(third.Evals) != 0 && len(third.Allocs) != 2 {
|
||||
t.Fatalf("Unexpected third request: %v", third)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user