mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
Merge pull request #4960 from hashicorp/dani/b-gc-tests
Re-enable Client GC tests
This commit is contained in:
@@ -2,79 +2,16 @@ package allocrunner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocwatcher"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulapi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// MockStateUpdater implements the AllocStateHandler interface and records
|
||||
// alloc updates.
|
||||
type MockStateUpdater struct {
|
||||
Updates []*structs.Allocation
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// AllocStateUpdated implements the AllocStateHandler interface and records an
|
||||
// alloc update.
|
||||
func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
|
||||
m.mu.Lock()
|
||||
m.Updates = append(m.Updates, alloc)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// Last returns a copy of the last alloc (or nil) update. Safe for concurrent
|
||||
// access with updates.
|
||||
func (m *MockStateUpdater) Last() *structs.Allocation {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
n := len(m.Updates)
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
return m.Updates[n-1].Copy()
|
||||
}
|
||||
|
||||
// Reset resets the recorded alloc updates.
|
||||
func (m *MockStateUpdater) Reset() {
|
||||
m.mu.Lock()
|
||||
m.Updates = nil
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// testAllocRunnerConfig returns a new allocrunner.Config with mocks and noop
|
||||
// versions of dependencies along with a cleanup func.
|
||||
func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) {
|
||||
pluginLoader := catalog.TestPluginLoader(t)
|
||||
clientConf, cleanup := config.TestClientConfig(t)
|
||||
conf := &Config{
|
||||
// Copy the alloc in case the caller edits and reuses it
|
||||
Alloc: alloc.Copy(),
|
||||
Logger: clientConf.Logger,
|
||||
ClientConfig: clientConf,
|
||||
StateDB: state.NoopDB{},
|
||||
Consul: consulapi.NewMockConsulServiceClient(t, clientConf.Logger),
|
||||
Vault: vaultclient.NewMockVaultClient(),
|
||||
StateUpdater: &MockStateUpdater{},
|
||||
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
|
||||
PluginSingletonLoader: singleton.NewSingletonLoader(clientConf.Logger, pluginLoader),
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
}
|
||||
return conf, cleanup
|
||||
}
|
||||
|
||||
// TestAllocRunner_AllocState_Initialized asserts that getting TaskStates via
|
||||
// AllocState() are initialized even before the AllocRunner has run.
|
||||
func TestAllocRunner_AllocState_Initialized(t *testing.T) {
|
||||
|
||||
81
client/allocrunner/testing.go
Normal file
81
client/allocrunner/testing.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package allocrunner
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocwatcher"
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/devicemanager"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/shared/catalog"
|
||||
"github.com/hashicorp/nomad/plugins/shared/singleton"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// MockStateUpdater implements the AllocStateHandler interface and records
|
||||
// alloc updates.
|
||||
type MockStateUpdater struct {
|
||||
Updates []*structs.Allocation
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// AllocStateUpdated implements the AllocStateHandler interface and records an
|
||||
// alloc update.
|
||||
func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
|
||||
m.mu.Lock()
|
||||
m.Updates = append(m.Updates, alloc)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// Last returns a copy of the last alloc (or nil) update. Safe for concurrent
|
||||
// access with updates.
|
||||
func (m *MockStateUpdater) Last() *structs.Allocation {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
n := len(m.Updates)
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
return m.Updates[n-1].Copy()
|
||||
}
|
||||
|
||||
// Reset resets the recorded alloc updates.
|
||||
func (m *MockStateUpdater) Reset() {
|
||||
m.mu.Lock()
|
||||
m.Updates = nil
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) {
|
||||
pluginLoader := catalog.TestPluginLoader(t)
|
||||
clientConf, cleanup := clientconfig.TestClientConfig(t)
|
||||
conf := &Config{
|
||||
// Copy the alloc in case the caller edits and reuses it
|
||||
Alloc: alloc.Copy(),
|
||||
Logger: clientConf.Logger,
|
||||
ClientConfig: clientConf,
|
||||
StateDB: state.NoopDB{},
|
||||
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
|
||||
Vault: vaultclient.NewMockVaultClient(),
|
||||
StateUpdater: &MockStateUpdater{},
|
||||
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
|
||||
PluginSingletonLoader: singleton.NewSingletonLoader(clientConf.Logger, pluginLoader),
|
||||
DeviceManager: devicemanager.NoopMockManager(),
|
||||
}
|
||||
return conf, cleanup
|
||||
}
|
||||
|
||||
func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation) (*allocRunner, func()) {
|
||||
t.Helper()
|
||||
cfg, cleanup := testAllocRunnerConfig(t, alloc)
|
||||
ar, err := NewAllocRunner(cfg)
|
||||
if err != nil {
|
||||
require.NoError(t, err, "Failed to setup AllocRunner")
|
||||
}
|
||||
|
||||
return ar, cleanup
|
||||
}
|
||||
@@ -1,7 +1,5 @@
|
||||
package client
|
||||
|
||||
/*
|
||||
TODO(clientv2)
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
@@ -11,9 +9,11 @@ import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func gcConfig() *GCConfig {
|
||||
@@ -28,7 +28,7 @@ func gcConfig() *GCConfig {
|
||||
|
||||
// exitAllocRunner is a helper that updates the allocs on the given alloc
|
||||
// runners to be terminal
|
||||
func exitAllocRunner(runners ...*allocrunner.AllocRunner) {
|
||||
func exitAllocRunner(runners ...AllocRunner) {
|
||||
for _, ar := range runners {
|
||||
terminalAlloc := ar.Alloc()
|
||||
terminalAlloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
@@ -40,15 +40,19 @@ func TestIndexedGCAllocPQ(t *testing.T) {
|
||||
t.Parallel()
|
||||
pq := NewIndexedGCAllocPQ()
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar3 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar4 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
ar3, cleanup3 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup3()
|
||||
ar4, cleanup4 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup4()
|
||||
|
||||
pq.Push(ar1)
|
||||
pq.Push(ar2)
|
||||
pq.Push(ar3)
|
||||
pq.Push(ar4)
|
||||
pq.Push(ar1.Alloc().ID, ar1)
|
||||
pq.Push(ar2.Alloc().ID, ar2)
|
||||
pq.Push(ar3.Alloc().ID, ar3)
|
||||
pq.Push(ar4.Alloc().ID, ar4)
|
||||
|
||||
allocID := pq.Pop().allocRunner.Alloc().ID
|
||||
if allocID != ar1.Alloc().ID {
|
||||
@@ -119,11 +123,13 @@ func (m *MockStatsCollector) Stats() *stats.HostStats {
|
||||
|
||||
func TestAllocGarbageCollector_MarkForCollection(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
gc.MarkForCollection(ar1)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
|
||||
gcAlloc := gc.allocRunners.Pop()
|
||||
if gcAlloc == nil || gcAlloc.allocRunner != ar1 {
|
||||
@@ -133,16 +139,19 @@ func TestAllocGarbageCollector_MarkForCollection(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_Collect(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
|
||||
go ar1.Run()
|
||||
go ar2.Run()
|
||||
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
// Exit the alloc runners
|
||||
exitAllocRunner(ar1, ar2)
|
||||
@@ -156,13 +165,16 @@ func TestAllocGarbageCollector_Collect(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_CollectAll(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
gc.CollectAll()
|
||||
gcAlloc := gc.allocRunners.Pop()
|
||||
@@ -173,19 +185,22 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
|
||||
go ar1.Run()
|
||||
go ar2.Run()
|
||||
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
// Exit the alloc runners
|
||||
exitAllocRunner(ar1, ar2)
|
||||
@@ -212,19 +227,22 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T)
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
|
||||
go ar1.Run()
|
||||
go ar2.Run()
|
||||
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
// Exit the alloc runners
|
||||
exitAllocRunner(ar1, ar2)
|
||||
@@ -252,19 +270,22 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
|
||||
go ar1.Run()
|
||||
go ar2.Run()
|
||||
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
// Exit the alloc runners
|
||||
exitAllocRunner(ar1, ar2)
|
||||
@@ -288,19 +309,22 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
cleanup2()
|
||||
|
||||
go ar1.Run()
|
||||
go ar2.Run()
|
||||
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
// Exit the alloc runners
|
||||
exitAllocRunner(ar1, ar2)
|
||||
@@ -321,151 +345,163 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAllocGarbageCollector_MaxAllocs asserts that when making room for new
|
||||
// TestAllocGarbageCollector_MakeRoomFor_MaxAllocs asserts that when making room for new
|
||||
// allocs, terminal allocs are GC'd until old_allocs + new_allocs <= limit
|
||||
func TestAllocGarbageCollector_MaxAllocs(t *testing.T) {
|
||||
t.Parallel()
|
||||
func TestAllocGarbageCollector_MakeRoomFor_MaxAllocs(t *testing.T) {
|
||||
const maxAllocs = 6
|
||||
require := require.New(t)
|
||||
|
||||
server, serverAddr := testServer(t, nil)
|
||||
defer server.Shutdown()
|
||||
testutil.WaitForLeader(t, server.RPC)
|
||||
|
||||
const maxAllocs = 6
|
||||
client := TestClient(t, func(c *config.Config) {
|
||||
client, cleanup := TestClient(t, func(c *config.Config) {
|
||||
c.GCMaxAllocs = maxAllocs
|
||||
c.GCDiskUsageThreshold = 100
|
||||
c.GCInodeUsageThreshold = 100
|
||||
c.GCParallelDestroys = 1
|
||||
c.GCInterval = time.Hour
|
||||
|
||||
c.RPCHandler = server
|
||||
c.Servers = []string{serverAddr}
|
||||
c.ConsulConfig.ClientAutoJoin = new(bool) // squelch logs
|
||||
c.ConsulConfig.ClientAutoJoin = new(bool)
|
||||
})
|
||||
defer client.Shutdown()
|
||||
defer cleanup()
|
||||
waitTilNodeReady(client, t)
|
||||
|
||||
callN := 0
|
||||
assertAllocs := func(expectedAll, expectedDestroyed int) {
|
||||
// Wait for allocs to be started
|
||||
callN++
|
||||
client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed)
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
all, destroyed := 0, 0
|
||||
for _, ar := range client.getAllocRunners() {
|
||||
all++
|
||||
if ar.IsDestroyed() {
|
||||
destroyed++
|
||||
}
|
||||
}
|
||||
return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf(
|
||||
"expected %d allocs (found %d); expected %d destroy (found %d)",
|
||||
expectedAll, all, expectedDestroyed, destroyed,
|
||||
)
|
||||
}, func(err error) {
|
||||
client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
|
||||
t.Fatalf("%d alloc state: %v", callN, err)
|
||||
})
|
||||
client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
|
||||
}
|
||||
|
||||
// Create a job
|
||||
state := server.State()
|
||||
job := mock.Job()
|
||||
job.TaskGroups[0].Count = 1
|
||||
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
|
||||
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
|
||||
"run_for": "30s",
|
||||
}
|
||||
nodeID := client.Node().ID
|
||||
if err := state.UpsertJob(98, job); err != nil {
|
||||
t.Fatalf("error upserting job: %v", err)
|
||||
}
|
||||
if err := state.UpsertJobSummary(99, mock.JobSummary(job.ID)); err != nil {
|
||||
t.Fatalf("error upserting job summary: %v", err)
|
||||
|
||||
index := uint64(98)
|
||||
nextIndex := func() uint64 {
|
||||
index++
|
||||
return index
|
||||
}
|
||||
|
||||
newAlloc := func() *structs.Allocation {
|
||||
upsertJobFn := func(server *nomad.Server, j *structs.Job) {
|
||||
state := server.State()
|
||||
require.NoError(state.UpsertJob(nextIndex(), j))
|
||||
require.NoError(state.UpsertJobSummary(nextIndex(), mock.JobSummary(j.ID)))
|
||||
}
|
||||
|
||||
// Insert the Job
|
||||
upsertJobFn(server, job)
|
||||
|
||||
upsertAllocFn := func(server *nomad.Server, a *structs.Allocation) {
|
||||
state := server.State()
|
||||
require.NoError(state.UpsertAllocs(nextIndex(), []*structs.Allocation{a}))
|
||||
}
|
||||
|
||||
upsertNewAllocFn := func(server *nomad.Server, j *structs.Job) *structs.Allocation {
|
||||
alloc := mock.Alloc()
|
||||
alloc.JobID = job.ID
|
||||
alloc.Job = job
|
||||
alloc.NodeID = nodeID
|
||||
return alloc
|
||||
alloc.Job = j
|
||||
alloc.JobID = j.ID
|
||||
alloc.NodeID = client.NodeID()
|
||||
|
||||
upsertAllocFn(server, alloc)
|
||||
|
||||
return alloc.Copy()
|
||||
}
|
||||
|
||||
// Create the allocations
|
||||
allocs := make([]*structs.Allocation, 7)
|
||||
for i := 0; i < len(allocs); i++ {
|
||||
allocs[i] = newAlloc()
|
||||
var allocations []*structs.Allocation
|
||||
|
||||
// Fill the node with allocations
|
||||
for i := 0; i < maxAllocs; i++ {
|
||||
allocations = append(allocations, upsertNewAllocFn(server, job))
|
||||
}
|
||||
|
||||
// Upsert a copy of the allocs as modifying the originals later would
|
||||
// cause a race
|
||||
{
|
||||
allocsCopy := make([]*structs.Allocation, len(allocs))
|
||||
for i, a := range allocs {
|
||||
allocsCopy[i] = a.Copy()
|
||||
// Wait until the allocations are ready
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
ar := len(client.getAllocRunners())
|
||||
|
||||
return ar == maxAllocs, fmt.Errorf("Expected %d allocs, got %d", maxAllocs, ar)
|
||||
}, func(err error) {
|
||||
t.Fatalf("Allocs did not start: %v", err)
|
||||
})
|
||||
|
||||
// Mark the first three as terminal
|
||||
for i := 0; i < 3; i++ {
|
||||
allocations[i].DesiredStatus = structs.AllocDesiredStatusStop
|
||||
upsertAllocFn(server, allocations[i].Copy())
|
||||
}
|
||||
|
||||
// Wait until the allocations are stopped
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
ar := client.getAllocRunners()
|
||||
stopped := 0
|
||||
for _, r := range ar {
|
||||
if r.Alloc().TerminalStatus() {
|
||||
stopped++
|
||||
}
|
||||
}
|
||||
if err := state.UpsertAllocs(100, allocsCopy); err != nil {
|
||||
t.Fatalf("error upserting initial allocs: %v", err)
|
||||
|
||||
return stopped == 3, fmt.Errorf("Expected %d terminal allocs, got %d", 3, stopped)
|
||||
}, func(err error) {
|
||||
t.Fatalf("Allocs did not terminate: %v", err)
|
||||
})
|
||||
|
||||
// Upsert a new allocation
|
||||
// This does not get appended to `allocations` as we do not use them again.
|
||||
upsertNewAllocFn(server, job)
|
||||
|
||||
// A single allocation should be GC'd
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
ar := client.getAllocRunners()
|
||||
destroyed := 0
|
||||
for _, r := range ar {
|
||||
if r.IsDestroyed() {
|
||||
destroyed++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 7 total, 0 GC'd
|
||||
assertAllocs(7, 0)
|
||||
return destroyed == 1, fmt.Errorf("Expected %d gc'd ars, got %d", 1, destroyed)
|
||||
}, func(err error) {
|
||||
t.Fatalf("Allocs did not get GC'd: %v", err)
|
||||
})
|
||||
|
||||
// Set the first few as terminal so they're marked for gc
|
||||
const terminalN = 4
|
||||
for i := 0; i < terminalN; i++ {
|
||||
// Copy the alloc so the pointers aren't shared
|
||||
alloc := allocs[i].Copy()
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
allocs[i] = alloc
|
||||
}
|
||||
if err := state.UpsertAllocs(101, allocs[:terminalN]); err != nil {
|
||||
t.Fatalf("error upserting stopped allocs: %v", err)
|
||||
}
|
||||
// Upsert a new allocation
|
||||
// This does not get appended to `allocations` as we do not use them again.
|
||||
upsertNewAllocFn(server, job)
|
||||
|
||||
// 7 total, 1 GC'd to get down to limit of 6
|
||||
assertAllocs(7, 1)
|
||||
// 2 allocations should be GC'd
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
ar := client.getAllocRunners()
|
||||
destroyed := 0
|
||||
for _, r := range ar {
|
||||
if r.IsDestroyed() {
|
||||
destroyed++
|
||||
}
|
||||
}
|
||||
|
||||
// Add one more alloc
|
||||
if err := state.UpsertAllocs(102, []*structs.Allocation{newAlloc()}); err != nil {
|
||||
t.Fatalf("error upserting new alloc: %v", err)
|
||||
}
|
||||
return destroyed == 2, fmt.Errorf("Expected %d gc'd ars, got %d", 2, destroyed)
|
||||
}, func(err error) {
|
||||
t.Fatalf("Allocs did not get GC'd: %v", err)
|
||||
})
|
||||
|
||||
// 8 total, 1 GC'd to get down to limit of 6
|
||||
// If this fails it may be due to the gc's Run and MarkRoomFor methods
|
||||
// gc'ing concurrently. May have to disable gc's run loop if this test
|
||||
// is flaky.
|
||||
assertAllocs(8, 2)
|
||||
|
||||
// Add new allocs to cause the gc of old terminal ones
|
||||
newAllocs := make([]*structs.Allocation, 4)
|
||||
for i := 0; i < len(newAllocs); i++ {
|
||||
newAllocs[i] = newAlloc()
|
||||
}
|
||||
if err := state.UpsertAllocs(200, newAllocs); err != nil {
|
||||
t.Fatalf("error upserting %d new allocs: %v", len(newAllocs), err)
|
||||
}
|
||||
|
||||
// 12 total, 4 GC'd total because all other allocs are alive
|
||||
assertAllocs(12, 4)
|
||||
require.Len(client.getAllocRunners(), 8)
|
||||
}
|
||||
|
||||
func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
|
||||
go ar1.Run()
|
||||
go ar2.Run()
|
||||
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
// Exit the alloc runners
|
||||
exitAllocRunner(ar1, ar2)
|
||||
@@ -489,19 +525,22 @@ func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
ar1, cleanup1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup1()
|
||||
ar2, cleanup2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc())
|
||||
defer cleanup2()
|
||||
|
||||
go ar1.Run()
|
||||
go ar2.Run()
|
||||
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
gc.MarkForCollection(ar1.Alloc().ID, ar1)
|
||||
gc.MarkForCollection(ar2.Alloc().ID, ar2)
|
||||
|
||||
// Exit the alloc runners
|
||||
exitAllocRunner(ar1, ar2)
|
||||
@@ -524,4 +563,3 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
|
||||
t.Fatalf("gcAlloc: %v", gcAlloc)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user