From d95a3766aed73b04c37ab1185dd1e799b60e2a39 Mon Sep 17 00:00:00 2001
From: Michael Smithhisler
Date: Wed, 19 Mar 2025 14:32:46 -0400
Subject: [PATCH] client: fix client blocking during garbage collection (#25123)

This change removes blocking calls to destroyAllocRunner, which caused
Nomad clients to block when running allocations in certain scenarios. In
addition, this change consolidates client GC by removing the MakeRoomFor
method, which is redundant with keepUsageBelowThreshold.

---------

Co-authored-by: Tim Gross
---
 .changelog/25123.txt |   3 +
 client/client.go     |   6 -
 client/gc.go         | 110 +----------
 client/gc_test.go    | 462 +++++++------------------------------------
 4 files changed, 74 insertions(+), 507 deletions(-)
 create mode 100644 .changelog/25123.txt

diff --git a/.changelog/25123.txt b/.changelog/25123.txt
new file mode 100644
index 000000000..3ab8d9b6d
--- /dev/null
+++ b/.changelog/25123.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+client: remove blocking call during client gc
+```
diff --git a/client/client.go b/client/client.go
index 34d292c8a..f0d77bf77 100644
--- a/client/client.go
+++ b/client/client.go
@@ -2615,12 +2615,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
 		c.updateAlloc(update)
 	}

-	// Make room for new allocations before running
-	if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil {
-		c.logger.Error("error making room for new allocations", "error", err)
-		errs++
-	}
-
 	// Start the new allocations
 	for _, add := range diff.added {
 		migrateToken := update.migrateTokens[add.ID]
diff --git a/client/gc.go b/client/gc.go
index 7c4327891..154f8e711 100644
--- a/client/gc.go
+++ b/client/gc.go
@@ -12,7 +12,6 @@ import (
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/hoststats"
-	"github.com/hashicorp/nomad/nomad/structs"
 )

 const (
@@ -165,8 +164,7 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
 			break
 		}

-		// Destroy the alloc runner and wait until it exits
-		a.destroyAllocRunner(gcAlloc.allocID, gcAlloc.allocRunner, reason)
+		go a.destroyAllocRunner(gcAlloc.allocID, gcAlloc.allocRunner, reason)
 	}
 	return nil
 }
@@ -232,112 +230,6 @@ func (a *AllocGarbageCollector) CollectAll() {
 	}
 }

-// MakeRoomFor garbage collects enough number of allocations in the terminal
-// state to make room for new allocations
-func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
-	if len(allocations) == 0 {
-		// Nothing to make room for!
-		return nil
-	}
-
-	// GC allocs until below the max limit + the new allocations
-	max := a.config.MaxAllocs - len(allocations)
-	for a.allocCounter.NumAllocs() > max {
-		select {
-		case <-a.shutdownCh:
-			return nil
-		default:
-		}
-
-		gcAlloc := a.allocRunners.Pop()
-		if gcAlloc == nil {
-			// It's fine if we can't lower below the limit here as
-			// we'll keep trying to drop below the limit with each
-			// periodic gc
-			break
-		}
-
-		// Destroy the alloc runner and wait until it exits
-		a.destroyAllocRunner(gcAlloc.allocID, gcAlloc.allocRunner, fmt.Sprintf("new allocations and over max (%d)", a.config.MaxAllocs))
-	}
-
-	totalResource := &structs.AllocatedSharedResources{}
-	for _, alloc := range allocations {
-		// COMPAT(0.11): Remove in 0.11
-		if alloc.AllocatedResources != nil {
-			totalResource.Add(&alloc.AllocatedResources.Shared)
-		} else {
-			totalResource.DiskMB += int64(alloc.Resources.DiskMB)
-		}
-	}
-
-	// If the host has enough free space to accommodate the new allocations then
-	// we don't need to garbage collect terminated allocations
-	if hostStats := a.statsCollector.Stats(); hostStats != nil {
-		var availableForAllocations uint64
-		if hostStats.AllocDirStats.Available < uint64(a.config.ReservedDiskMB*MB) {
-			availableForAllocations = 0
-		} else {
-			availableForAllocations = hostStats.AllocDirStats.Available - uint64(a.config.ReservedDiskMB*MB)
-		}
-		if uint64(totalResource.DiskMB*MB) < availableForAllocations {
-			return nil
-		}
-	}
-
-	var diskCleared int64
-	for {
-		select {
-		case <-a.shutdownCh:
-			return nil
-		default:
-		}
-
-		// Collect host stats and see if we still need to remove older
-		// allocations
-		var allocDirStats *hoststats.DiskStats
-		if err := a.statsCollector.Collect(); err == nil {
-			if hostStats := a.statsCollector.Stats(); hostStats != nil {
-				allocDirStats = hostStats.AllocDirStats
-			}
-		}
-
-		if allocDirStats != nil {
-			if allocDirStats.Available >= uint64(totalResource.DiskMB*MB) {
-				break
-			}
-		} else {
-			// Falling back to a simpler model to know if we have enough disk
-			// space if stats collection fails
-			if diskCleared >= totalResource.DiskMB {
-				break
-			}
-		}
-
-		gcAlloc := a.allocRunners.Pop()
-		if gcAlloc == nil {
-			break
-		}
-
-		ar := gcAlloc.allocRunner
-		alloc := ar.Alloc()
-
-		// COMPAT(0.11): Remove in 0.11
-		var allocDiskMB int64
-		if alloc.AllocatedResources != nil {
-			allocDiskMB = alloc.AllocatedResources.Shared.DiskMB
-		} else {
-			allocDiskMB = int64(alloc.Resources.DiskMB)
-		}
-
-		// Destroy the alloc runner and wait until it exits
-		a.destroyAllocRunner(gcAlloc.allocID, ar, fmt.Sprintf("freeing %d MB for new allocations", allocDiskMB))
-
-		diskCleared += allocDiskMB
-	}
-	return nil
-}
-
 // MarkForCollection starts tracking an allocation for Garbage Collection
 func (a *AllocGarbageCollector) MarkForCollection(allocID string, ar interfaces.AllocRunner) {
 	if a.allocRunners.Push(allocID, ar) {
diff --git a/client/gc_test.go b/client/gc_test.go
index fa7ebdffb..33d61e579 100644
--- a/client/gc_test.go
+++ b/client/gc_test.go
@@ -4,21 +4,17 @@
 package client

 import (
-	"fmt"
 	"testing"
 	"time"

 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/client/allocrunner"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
-	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/hoststats"
 	"github.com/hashicorp/nomad/helper/testlog"
-	"github.com/hashicorp/nomad/nomad"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/testutil"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )

 func gcConfig() *GCConfig {
@@ -192,400 +188,82 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) {
 	}
 }

-func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) {
+func TestAllocGarbageCollector_KeepUsageBelowThreshold(t *testing.T) {
 	ci.Parallel(t)

-	logger := testlog.HCLogger(t)
-	statsCollector := &MockStatsCollector{}
-	conf := gcConfig()
-	conf.ReservedDiskMB = 20
-	gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
-
-	ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup1()
-	ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup2()
-
-	go ar1.Run()
-	go ar2.Run()
-
-	gc.MarkForCollection(ar1.Alloc().ID, ar1)
-	gc.MarkForCollection(ar2.Alloc().ID, ar2)
-
-	// Exit the alloc runners
-	exitAllocRunner(ar1, ar2)
-
-	// Make stats collector report 200MB free out of which 20MB is reserved
-	statsCollector.availableValues = []uint64{200 * MB}
-	statsCollector.usedPercents = []float64{0}
-	statsCollector.inodePercents = []float64{0}
-
-	alloc := mock.Alloc()
-	alloc.AllocatedResources.Shared.DiskMB = 150
-	if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
-		t.Fatalf("err: %v", err)
+	testCases := []struct {
+		name    string
+		counter *MockAllocCounter
+		stats   *MockStatsCollector
+		expGC   bool
+	}{
+		{
+			name:    "garbage collects alloc when disk usage above threshold",
+			counter: &MockAllocCounter{},
+			stats: &MockStatsCollector{
+				availableValues: []uint64{0, 0},
+				usedPercents:    []float64{85, 85}, // above threshold
+				inodePercents:   []float64{0, 0},
+			},
+			expGC: true,
+		},
+		{
+			name:    "garbage collects alloc when inode usage above threshold",
+			counter: &MockAllocCounter{},
+			stats: &MockStatsCollector{
+				availableValues: []uint64{0, 0},
+				usedPercents:    []float64{0, 0},
+				inodePercents:   []float64{90, 90}, // above threshold
+			},
+			expGC: true,
+		},
+		{
+			name: "garbage collects alloc when liveAllocs above maxAllocs threshold",
+			counter: &MockAllocCounter{
+				allocs: 150, // above threshold
+			},
+			stats: &MockStatsCollector{
+				availableValues: []uint64{0, 0},
+				usedPercents:    []float64{0, 0},
+				inodePercents:   []float64{0, 0},
+			},
+			expGC: true,
+		},
+		{
+			name: "exits when there is no reason to GC",
+			counter: &MockAllocCounter{
+				allocs: 0,
+			},
+			stats: &MockStatsCollector{
+				availableValues: []uint64{0, 0},
+				usedPercents:    []float64{0, 0},
+				inodePercents:   []float64{0, 0},
+			},
+			expGC: false,
+		},
 	}

-	// When we have enough disk available and don't need to do any GC so we
-	// should have two ARs in the GC queue
-	for i := 0; i < 2; i++ {
-		if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
-			t.Fatalf("err: %v", gcAlloc)
-		}
-	}
-}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			logger := testlog.HCLogger(t)
+			gc := NewAllocGarbageCollector(logger, tc.stats, tc.counter, gcConfig())

-func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
-	ci.Parallel(t)
+			// add a single alloc for garbage collection
+			ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
+			defer cleanup1()
+			exitAllocRunner(ar1)
+			gc.MarkForCollection(ar1.Alloc().ID, ar1)

-	logger := testlog.HCLogger(t)
-	statsCollector := &MockStatsCollector{}
-	conf := gcConfig()
-	conf.ReservedDiskMB = 20
-	gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
+			// run the garbage collector
+			err := gc.keepUsageBelowThreshold()
+			must.NoError(t, err)

-	ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup1()
-	ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup2()
-
-	go ar1.Run()
-	go ar2.Run()
-
-	gc.MarkForCollection(ar1.Alloc().ID, ar1)
-	gc.MarkForCollection(ar2.Alloc().ID, ar2)
-
-	// Exit the alloc runners
-	exitAllocRunner(ar1, ar2)
-
-	// Make stats collector report 80MB and 175MB free in subsequent calls
-	statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 175 * MB}
-	statsCollector.usedPercents = []float64{0, 0, 0}
-	statsCollector.inodePercents = []float64{0, 0, 0}
-
-	alloc := mock.Alloc()
-	alloc.AllocatedResources.Shared.DiskMB = 150
-	if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// We should be GC-ing one alloc
-	if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
-		t.Fatalf("err: %v", gcAlloc)
-	}
-
-	if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
-		t.Fatalf("gcAlloc: %v", gcAlloc)
-	}
-}
-
-func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
-	ci.Parallel(t)
-
-	logger := testlog.HCLogger(t)
-	statsCollector := &MockStatsCollector{}
-	conf := gcConfig()
-	conf.ReservedDiskMB = 20
-	gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
-
-	ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup1()
-	ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup2()
-
-	go ar1.Run()
-	go ar2.Run()
-
-	gc.MarkForCollection(ar1.Alloc().ID, ar1)
-	gc.MarkForCollection(ar2.Alloc().ID, ar2)
-
-	// Exit the alloc runners
-	exitAllocRunner(ar1, ar2)
-
-	// Make stats collector report 80MB and 95MB free in subsequent calls
-	statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 95 * MB}
-	statsCollector.usedPercents = []float64{0, 0, 0}
-	statsCollector.inodePercents = []float64{0, 0, 0}
-
-	alloc := mock.Alloc()
-	alloc.AllocatedResources.Shared.DiskMB = 150
-	if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// We should be GC-ing all the alloc runners
-	if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
-		t.Fatalf("gcAlloc: %v", gcAlloc)
-	}
-}
-
-func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) {
-	ci.Parallel(t)
-
-	logger := testlog.HCLogger(t)
-	statsCollector := &MockStatsCollector{}
-	conf := gcConfig()
-	conf.ReservedDiskMB = 20
-	gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
-
-	ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	cleanup1()
-	ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	cleanup2()
-
-	go ar1.Run()
-	go ar2.Run()
-
-	gc.MarkForCollection(ar1.Alloc().ID, ar1)
-	gc.MarkForCollection(ar2.Alloc().ID, ar2)
-
-	// Exit the alloc runners
-	exitAllocRunner(ar1, ar2)
-
-	alloc := mock.Alloc()
-	alloc.AllocatedResources.Shared.DiskMB = 150
-	if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// We should be GC-ing one alloc
-	if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
-		t.Fatalf("err: %v", gcAlloc)
-	}
-
-	if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
-		t.Fatalf("gcAlloc: %v", gcAlloc)
-	}
-}
-
-// TestAllocGarbageCollector_MakeRoomFor_MaxAllocs asserts that when making room for new
-// allocs, terminal allocs are GC'd until old_allocs + new_allocs <= limit
-func TestAllocGarbageCollector_MakeRoomFor_MaxAllocs(t *testing.T) {
-	ci.Parallel(t)
-
-	const maxAllocs = 6
-	require := require.New(t)
-
-	server, serverAddr, cleanupS := testServer(t, nil)
-	defer cleanupS()
-	testutil.WaitForLeader(t, server.RPC)
-
-	client, cleanup := TestClient(t, func(c *config.Config) {
-		c.GCMaxAllocs = maxAllocs
-		c.GCDiskUsageThreshold = 100
-		c.GCInodeUsageThreshold = 100
-		c.GCParallelDestroys = 1
-		c.GCInterval = time.Hour
-		c.RPCHandler = server
-		c.Servers = []string{serverAddr}
-		c.GetDefaultConsul().ClientAutoJoin = new(bool)
-	})
-	defer cleanup()
-	waitTilNodeReady(client, t)
-
-	job := mock.Job()
-	job.TaskGroups[0].Count = 1
-	job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
-	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
-		"run_for": "30s",
-	}
-
-	index := uint64(98)
-	nextIndex := func() uint64 {
-		index++
-		return index
-	}
-
-	upsertJobFn := func(server *nomad.Server, j *structs.Job) {
-		state := server.State()
-		require.NoError(state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(), nil, j))
-		require.NoError(state.UpsertJobSummary(nextIndex(), mock.JobSummary(j.ID)))
-	}
-
-	// Insert the Job
-	upsertJobFn(server, job)
-
-	upsertAllocFn := func(server *nomad.Server, a *structs.Allocation) {
-		state := server.State()
-		require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, nextIndex(), []*structs.Allocation{a}))
-	}
-
-	upsertNewAllocFn := func(server *nomad.Server, j *structs.Job) *structs.Allocation {
-		alloc := mock.Alloc()
-		alloc.Job = j
-		alloc.JobID = j.ID
-		alloc.NodeID = client.NodeID()
-
-		upsertAllocFn(server, alloc)
-
-		return alloc.Copy()
-	}
-
-	var allocations []*structs.Allocation
-
-	// Fill the node with allocations
-	for i := 0; i < maxAllocs; i++ {
-		allocations = append(allocations, upsertNewAllocFn(server, job))
-	}
-
-	// Wait until the allocations are ready
-	testutil.WaitForResult(func() (bool, error) {
-		ar := len(client.getAllocRunners())
-
-		return ar == maxAllocs, fmt.Errorf("Expected %d allocs, got %d", maxAllocs, ar)
-	}, func(err error) {
-		t.Fatalf("Allocs did not start: %v", err)
-	})
-
-	// Mark the first three as terminal
-	for i := 0; i < 3; i++ {
-		allocations[i].DesiredStatus = structs.AllocDesiredStatusStop
-		upsertAllocFn(server, allocations[i].Copy())
-	}
-
-	// Wait until the allocations are stopped
-	testutil.WaitForResult(func() (bool, error) {
-		ar := client.getAllocRunners()
-		stopped := 0
-		for _, r := range ar {
-			if r.Alloc().TerminalStatus() {
-				stopped++
-			}
-		}
-
-		return stopped == 3, fmt.Errorf("Expected %d terminal allocs, got %d", 3, stopped)
-	}, func(err error) {
-		t.Fatalf("Allocs did not terminate: %v", err)
-	})
-
-	// Upsert a new allocation
-	// This does not get appended to `allocations` as we do not use them again.
-	upsertNewAllocFn(server, job)
-
-	// A single allocation should be GC'd
-	testutil.WaitForResult(func() (bool, error) {
-		ar := client.getAllocRunners()
-		destroyed := 0
-		for _, r := range ar {
-			if r.IsDestroyed() {
-				destroyed++
-			}
-		}
-
-		return destroyed == 1, fmt.Errorf("Expected %d gc'd ars, got %d", 1, destroyed)
-	}, func(err error) {
-		t.Fatalf("Allocs did not get GC'd: %v", err)
-	})
-
-	// Upsert a new allocation
-	// This does not get appended to `allocations` as we do not use them again.
-	upsertNewAllocFn(server, job)
-
-	// 2 allocations should be GC'd
-	testutil.WaitForResult(func() (bool, error) {
-		ar := client.getAllocRunners()
-		destroyed := 0
-		for _, r := range ar {
-			if r.IsDestroyed() {
-				destroyed++
-			}
-		}
-
-		return destroyed == 2, fmt.Errorf("Expected %d gc'd ars, got %d", 2, destroyed)
-	}, func(err error) {
-		t.Fatalf("Allocs did not get GC'd: %v", err)
-	})
-
-	// check that all 8 get run eventually
-	testutil.WaitForResult(func() (bool, error) {
-		ar := client.getAllocRunners()
-		if len(ar) != 8 {
-			return false, fmt.Errorf("expected 8 ARs, found %d: %v", len(ar), ar)
-		}
-		return true, nil
-	}, func(err error) {
-		require.NoError(err)
-	})
-}
-
-func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
-	ci.Parallel(t)
-
-	logger := testlog.HCLogger(t)
-	statsCollector := &MockStatsCollector{}
-	conf := gcConfig()
-	conf.ReservedDiskMB = 20
-	gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
-
-	ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup1()
-	ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup2()
-
-	go ar1.Run()
-	go ar2.Run()
-
-	gc.MarkForCollection(ar1.Alloc().ID, ar1)
-	gc.MarkForCollection(ar2.Alloc().ID, ar2)
-
-	// Exit the alloc runners
-	exitAllocRunner(ar1, ar2)
-
-	statsCollector.availableValues = []uint64{1000}
-	statsCollector.usedPercents = []float64{20}
-	statsCollector.inodePercents = []float64{10}
-
-	if err := gc.keepUsageBelowThreshold(); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// We shouldn't GC any of the allocs since the used percent values are below
-	// threshold
-	for i := 0; i < 2; i++ {
-		if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
-			t.Fatalf("err: %v", gcAlloc)
-		}
-	}
-}
-
-func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
-	ci.Parallel(t)
-
-	logger := testlog.HCLogger(t)
-	statsCollector := &MockStatsCollector{}
-	conf := gcConfig()
-	conf.ReservedDiskMB = 20
-	gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
-
-	ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup1()
-	ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
-	defer cleanup2()
-
-	go ar1.Run()
-	go ar2.Run()
-
-	gc.MarkForCollection(ar1.Alloc().ID, ar1)
-	gc.MarkForCollection(ar2.Alloc().ID, ar2)
-
-	// Exit the alloc runners
-	exitAllocRunner(ar1, ar2)
-
-	statsCollector.availableValues = []uint64{1000, 800}
-	statsCollector.usedPercents = []float64{85, 60}
-	statsCollector.inodePercents = []float64{50, 30}
-
-	if err := gc.keepUsageBelowThreshold(); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	// We should be GC-ing only one of the alloc runners since the second time
-	// used percent returns a number below threshold.
-	if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
-		t.Fatalf("err: %v", gcAlloc)
-	}
-
-	if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
-		t.Fatalf("gcAlloc: %v", gcAlloc)
+			gcAlloc := gc.allocRunners.Pop()
+			if tc.expGC {
+				must.Nil(t, gcAlloc)
+			} else {
+				must.NotNil(t, gcAlloc)
+			}
+		})
 	}
 }
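
Note on the fix itself: the behavioral core of this patch is the one-line change in keepUsageBelowThreshold, where destroyAllocRunner is now dispatched on a goroutine instead of being waited on inline, so a slow runner teardown can no longer stall the GC pass (and, through it, the client's runAllocs path). Below is a minimal, self-contained sketch of that dispatch pattern; all names are hypothetical stand-ins, not Nomad's actual types, and the real client additionally bounds destroy concurrency (compare the GCParallelDestroys setting visible in the removed test config, whose exact semantics are assumed here).

package main

import (
	"fmt"
	"sync"
	"time"
)

// gcItem stands in for a terminal alloc runner queued for cleanup
// (hypothetical type, for illustration only).
type gcItem struct{ id string }

// destroy simulates a potentially slow runner teardown; in the old code
// the GC loop waited for each of these calls to return.
func destroy(it gcItem) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("destroyed", it.id)
}

func main() {
	queue := []gcItem{{"a"}, {"b"}, {"c"}, {"d"}}

	// Bound how many destroys run at once, in the spirit of a
	// GCParallelDestroys-style limit.
	sem := make(chan struct{}, 2)
	var wg sync.WaitGroup

	for _, it := range queue {
		it := it // capture the loop variable (pre-Go 1.22 semantics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot inside the goroutine,
			defer func() { <-sem }() // so the dispatch loop itself never blocks
			destroy(it)
		}()
	}

	// The dispatch loop returns immediately; only shutdown-style code
	// needs to wait for outstanding destroys to finish.
	wg.Wait()
}

The trade-off is visible in the rewritten test above: once destruction is asynchronous, assertions can only rely on the runner having been popped from the GC queue (gc.allocRunners.Pop() returning nil), not on the destroy itself having completed.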