diff --git a/.changelog/25870.txt b/.changelog/25870.txt new file mode 100644 index 000000000..9829fdbb1 --- /dev/null +++ b/.changelog/25870.txt @@ -0,0 +1,7 @@ +```release-note:bug +telemetry: Fix excess CPU consumption from alloc stats collection +``` + +```release-note:bug +telemetry: Fixed a bug where alloc stats were still collected (but not published) if telemetry.publish_allocation_metrics=false. +``` diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 41c57577c..212bc078e 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -30,15 +30,20 @@ type statsHook struct { // cancel is called by Exited cancel context.CancelFunc + // stats collection is expensive, so if we're not going to publish the stats + // there's no reason to collect them + doPublish bool + mu sync.Mutex logger hclog.Logger } -func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook { +func newStatsHook(su StatsUpdater, interval time.Duration, doPublish bool, logger hclog.Logger) *statsHook { h := &statsHook{ - updater: su, - interval: interval, + updater: su, + interval: interval, + doPublish: doPublish, } h.logger = logger.Named(h.Name()) return h @@ -51,6 +56,10 @@ func (*statsHook) Name() string { func (h *statsHook) Poststart(_ context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error { h.mu.Lock() defer h.mu.Unlock() + if !h.doPublish { + h.logger.Trace("not collecting task stats") + return nil + } // This shouldn't happen, but better safe than risk leaking a goroutine if h.cancel != nil { diff --git a/client/allocrunner/taskrunner/stats_hook_test.go b/client/allocrunner/taskrunner/stats_hook_test.go index 4b1a08181..cc527c53b 100644 --- a/client/allocrunner/taskrunner/stats_hook_test.go +++ b/client/allocrunner/taskrunner/stats_hook_test.go @@ -13,7 +13,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/interfaces" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/testlog" - "github.com/stretchr/testify/require" + "github.com/shoenig/test/must" ) // Statically assert the stats hook implements the expected interfaces @@ -88,7 +88,6 @@ func (m *mockDriverStats) Called() int { func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) { ci.Parallel(t) - require := require.New(t) logger := testlog.HCLogger(t) su := newMockStatsUpdater() ds := new(mockDriverStats) @@ -96,23 +95,23 @@ func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) { poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds} // Create hook - h := newStatsHook(su, time.Minute, logger) + h := newStatsHook(su, time.Minute, true, logger) // Always call Exited to cleanup goroutines defer h.Exited(context.Background(), nil, nil) // Run prestart - require.NoError(h.Poststart(context.Background(), poststartReq, nil)) + must.NoError(t, h.Poststart(context.Background(), poststartReq, nil)) // An initial stats collection should run and call the updater select { case ru := <-su.Ch: - require.Equal(uint64(1), ru.ResourceUsage.MemoryStats.RSS) + must.Eq(t, uint64(1), ru.ResourceUsage.MemoryStats.RSS) case <-time.After(10 * time.Second): t.Fatalf("timeout waiting for initial stats collection") } - require.NoError(h.Exited(context.Background(), nil, nil)) + must.NoError(t, h.Exited(context.Background(), nil, nil)) } // TestTaskRunner_StatsHook_Periodic asserts the stats hook collects stats on @@ -120,7 +119,6 @@ func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) { func TestTaskRunner_StatsHook_Periodic(t *testing.T) { ci.Parallel(t) - require := require.New(t) logger := testlog.HCLogger(t) su := newMockStatsUpdater() @@ -131,11 +129,11 @@ func TestTaskRunner_StatsHook_Periodic(t *testing.T) { // Exited() can complete within the interval. const interval = 500 * time.Millisecond - h := newStatsHook(su, interval, logger) + h := newStatsHook(su, interval, true, logger) defer h.Exited(context.Background(), nil, nil) // Run prestart - require.NoError(h.Poststart(context.Background(), poststartReq, nil)) + must.NoError(t, h.Poststart(context.Background(), poststartReq, nil)) // An initial stats collection should run and call the updater var firstrun int64 @@ -160,7 +158,7 @@ func TestTaskRunner_StatsHook_Periodic(t *testing.T) { } // Exiting should prevent further updates - require.NoError(h.Exited(context.Background(), nil, nil)) + must.NoError(t, h.Exited(context.Background(), nil, nil)) // Should *not* get another update in ~500ms (see interval above) // we may get a single update due to race with exit @@ -185,7 +183,6 @@ WAITING: func TestTaskRunner_StatsHook_NotImplemented(t *testing.T) { ci.Parallel(t) - require := require.New(t) logger := testlog.HCLogger(t) su := newMockStatsUpdater() ds := &mockDriverStats{ @@ -194,11 +191,11 @@ func TestTaskRunner_StatsHook_NotImplemented(t *testing.T) { poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds} - h := newStatsHook(su, 1, logger) + h := newStatsHook(su, 1, true, logger) defer h.Exited(context.Background(), nil, nil) // Run prestart - require.NoError(h.Poststart(context.Background(), poststartReq, nil)) + must.NoError(t, h.Poststart(context.Background(), poststartReq, nil)) // An initial stats collection should run and *not* call the updater select { @@ -220,11 +217,11 @@ func TestTaskRunner_StatsHook_Backoff(t *testing.T) { poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds} - h := newStatsHook(su, time.Minute, logger) + h := newStatsHook(su, time.Minute, true, logger) defer h.Exited(context.Background(), nil, nil) // Run prestart - require.NoError(t, h.Poststart(context.Background(), poststartReq, nil)) + must.NoError(t, h.Poststart(context.Background(), poststartReq, nil)) timeout := time.After(500 * time.Millisecond) @@ -237,5 +234,5 @@ DRAIN: } } - require.Equal(t, ds.Called(), 1) + must.Eq(t, ds.Called(), 1) } diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 296221f16..04faa353e 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -70,7 +70,7 @@ func (tr *TaskRunner) initHooks() { newDispatchHook(alloc, hookLogger), newVolumeHook(tr, hookLogger), newArtifactHook(tr, tr.getter, hookLogger), - newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), + newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, tr.clientConfig.PublishAllocationMetrics, hookLogger), newDeviceHook(tr.devicemanager, hookLogger), newAPIHook(tr.shutdownCtx, tr.clientConfig.APIListenerRegistrar, hookLogger), newWranglerHook(tr.wranglers, task.Name, alloc.ID, task.UsesCores(), hookLogger), diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index 7f42ec06c..66af5c29f 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -717,7 +717,7 @@ func (e *UniversalExecutor) handleStats(ch chan *cstructs.TaskResourceUsage, ctx timer.Reset(interval) } - stats := e.processStats.StatProcesses() + stats := e.processStats.StatProcesses(time.Now()) select { case <-ctx.Done(): diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index ed2fc6b44..215c5224c 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -445,7 +445,7 @@ func (l *LibcontainerExecutor) handleStats(ch chan *cstructs.TaskResourceUsage, stats := lstats.CgroupStats // get the map of process pids in this container - pstats := l.processStats.StatProcesses() + pstats := l.processStats.StatProcesses(ts) // Memory Related Stats swap := stats.MemoryStats.SwapUsage diff --git a/drivers/shared/executor/procstats/getstats.go b/drivers/shared/executor/procstats/getstats.go index 690942110..57181fde0 100644 --- a/drivers/shared/executor/procstats/getstats.go +++ b/drivers/shared/executor/procstats/getstats.go @@ -12,16 +12,14 @@ import ( "github.com/hashicorp/nomad/client/lib/cpustats" "github.com/hashicorp/nomad/plugins/drivers" "github.com/shirou/gopsutil/v3/process" - "oss.indeed.com/go/libtime" ) func New(compute cpustats.Compute, pl ProcessList) ProcessStats { const cacheTTL = 5 * time.Second - return &linuxProcStats{ + return &taskProcStats{ cacheTTL: cacheTTL, procList: pl, compute: compute, - clock: libtime.SystemClock(), latest: make(map[ProcessID]*stats), cache: make(ProcUsages), } @@ -33,10 +31,9 @@ type stats struct { SystemCPU *cpustats.Tracker } -type linuxProcStats struct { +type taskProcStats struct { cacheTTL time.Duration procList ProcessList - clock libtime.Clock compute cpustats.Compute lock sync.Mutex @@ -45,14 +42,13 @@ type linuxProcStats struct { at time.Time } -func (lps *linuxProcStats) expired() bool { - age := lps.clock.Since(lps.at) - return age > lps.cacheTTL +func (lps *taskProcStats) expired(t time.Time) bool { + return t.After(lps.at.Add(lps.cacheTTL)) } // scanPIDs will update lps.latest with the set of detected live pids that make // up the task process tree / are in the tasks cgroup -func (lps *linuxProcStats) scanPIDs() { +func (lps *taskProcStats) scanPIDs() { currentPIDs := lps.procList.ListProcesses() // remove old pids no longer present @@ -74,15 +70,11 @@ func (lps *linuxProcStats) scanPIDs() { } } -func (lps *linuxProcStats) cached() ProcUsages { - return lps.cache -} - -func (lps *linuxProcStats) StatProcesses() ProcUsages { +func (lps *taskProcStats) StatProcesses(now time.Time) ProcUsages { lps.lock.Lock() defer lps.lock.Unlock() - if !lps.expired() { + if !lps.expired(now) { return lps.cache } @@ -131,5 +123,6 @@ func (lps *linuxProcStats) StatProcesses() ProcUsages { } lps.cache = result + lps.at = now return result } diff --git a/drivers/shared/executor/procstats/getstats_test.go b/drivers/shared/executor/procstats/getstats_test.go new file mode 100644 index 000000000..900ac1296 --- /dev/null +++ b/drivers/shared/executor/procstats/getstats_test.go @@ -0,0 +1,45 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package procstats + +import ( + "testing" + "time" + + "github.com/hashicorp/go-set/v3" + "github.com/hashicorp/nomad/client/lib/cpustats" + "github.com/shoenig/test/must" +) + +type mockPL struct{} + +func (mockPL) ListProcesses() set.Collection[ProcessID] { return set.New[ProcessID](0) } + +func TestStatProcesses(t *testing.T) { + compute := cpustats.Compute{ + TotalCompute: 1000, + NumCores: 1, + } + pl := mockPL{} + + stats := &taskProcStats{ + cacheTTL: 10 * time.Second, + procList: pl, + compute: compute, + latest: make(map[ProcessID]*stats), + cache: make(ProcUsages), + } + + now := time.Now() + stats.StatProcesses(now) + cachedAt := stats.at + must.NotEq(t, time.Time{}, cachedAt) + + stats.StatProcesses(now) + must.Eq(t, cachedAt, stats.at, must.Sprint("cache should not have been updated")) + + later := now.Add(30 * time.Second) + stats.StatProcesses(later) + must.Eq(t, later, stats.at, must.Sprint("cache should have been updated")) +} diff --git a/drivers/shared/executor/procstats/list_test.go b/drivers/shared/executor/procstats/list_test.go index 4e34e6e3a..17fe2b6c0 100644 --- a/drivers/shared/executor/procstats/list_test.go +++ b/drivers/shared/executor/procstats/list_test.go @@ -38,14 +38,14 @@ func genMockProcs(needles, haystack int) ([]ps.Process, []ProcessID) { expect := []ProcessID{42} // TODO: make this into a tree structure, not just a linear tree - for i := 0; i < needles; i++ { + for i := range needles { parent := 42 + i pid := parent + 1 procs = append(procs, mockProc(pid, parent)) expect = append(expect, pid) } - for i := 0; i < haystack; i++ { + for i := range haystack { parent := 200 + i pid := parent + 1 procs = append(procs, mockProc(pid, parent)) diff --git a/drivers/shared/executor/procstats/list_windows.go b/drivers/shared/executor/procstats/list_windows.go index 721aabbc7..66d0ddeb8 100644 --- a/drivers/shared/executor/procstats/list_windows.go +++ b/drivers/shared/executor/procstats/list_windows.go @@ -12,19 +12,6 @@ import ( // ListByPid will scan the process table and return a set of the process family // tree starting with executorPID as the root. -// -// The implementation here specifically avoids using more than one system -// call. Unlike on Linux where we just read a cgroup, on Windows we must build -// the tree manually. We do so knowing only the child->parent relationships. -// -// So this turns into a fun leet code problem, where we invert the tree using -// only a bucket of edges pointing in the wrong direction. Basically we just -// iterate every process, recursively follow its parent, and determine whether -// executorPID is an ancestor. -// -// See https://github.com/hashicorp/nomad/issues/20042 as an example of what -// happens when you use syscalls to work your way from the root down to its -// descendants. func ListByPid(executorPID int) set.Collection[ProcessID] { procs := list(executorPID, ps.Processes) return procs diff --git a/drivers/shared/executor/procstats/procstats.go b/drivers/shared/executor/procstats/procstats.go index a70f7d117..8bf8d469d 100644 --- a/drivers/shared/executor/procstats/procstats.go +++ b/drivers/shared/executor/procstats/procstats.go @@ -31,7 +31,7 @@ type ProcUsages map[string]*drivers.ResourceUsage // for gathering CPU and memory process stats for all processes associated with // a task. type ProcessStats interface { - StatProcesses() ProcUsages + StatProcesses(time.Time) ProcUsages } // A ProcessList is anything (i.e. a task driver) that implements ListProcesses @@ -82,6 +82,22 @@ func Aggregate(systemStats *cpustats.Tracker, procStats ProcUsages) *drivers.Tas } } +// list will scan the process table and return a set of the process family tree +// starting with executorPID as the root. This is only ever used on Windows, but +// lives in the shared code so we can run its tests even on Linux. +// +// The implementation here specifically avoids using more than one system +// call. Unlike on Linux where we just read a cgroup, on Windows we must build +// the tree manually. We do so knowing only the child->parent relationships. +// +// So this turns into a fun leet code problem, where we invert the tree using +// only a bucket of edges pointing in the wrong direction. Basically we just +// iterate every process, recursively follow its parent, and determine whether +// executorPID is an ancestor. +// +// See https://github.com/hashicorp/nomad/issues/20042 as an example of what +// happens when you use syscalls to work your way from the root down to its +// descendants. func list(executorPID int, processes func() ([]ps.Process, error)) set.Collection[ProcessID] { processFamily := set.From([]ProcessID{executorPID})