telemetry: fix excessive CPU consumption in executor (#25870)
Collecting metrics from processes is expensive, especially on platforms like Windows. The executor code has a 5s cache of stats to ensure that we don't thrash syscalls on nodes running many allocations. But the timestamp used to calculate the TTL of this cache was never being set, so we were always treating it as expired. This caused excess CPU utilization on client nodes. Ensure that when we fill the cache, we set the timestamp. In testing on Windows, this reduces executor CPU overhead by roughly 75%.

This changeset includes two other related items:

* The `telemetry.publish_allocation_metrics` field correctly prevents a node from publishing metrics, but the stats hook on the taskrunner still collects the metrics, which can be expensive. Thread the configuration value into the stats hook so that we don't collect if `telemetry.publish_allocation_metrics = false`.
* The `linuxProcStats` type in the executor's `procstats` package is misnamed as a result of a couple of rounds of refactoring. It's used by all task executors, not just Linux. Rename this and move a comment about how Windows processes are listed so that the comment is closer to where the logic is implemented.

Fixes: https://github.com/hashicorp/nomad/issues/23323
Fixes: https://hashicorp.atlassian.net/browse/NMD-455
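A minimal sketch of the caching pattern this change restores, with names following the diff below (the usage map is simplified to a placeholder and the process scan is elided); the key detail is the single `lps.at = now` assignment when the cache is filled:

```go
// Sketch only, not the full Nomad implementation: ProcUsages is simplified
// to a placeholder map, and the actual process scan is elided.
package procstats

import (
	"sync"
	"time"
)

// ProcUsages stands in for the per-process usage map.
type ProcUsages map[string]float64

type taskProcStats struct {
	cacheTTL time.Duration

	lock  sync.Mutex
	cache ProcUsages
	at    time.Time // zero until the cache is first filled
}

// expired reports whether the cached stats are older than the TTL. With a
// zero `at` this is always true, which was the bug: the cache never hit.
func (lps *taskProcStats) expired(t time.Time) bool {
	return t.After(lps.at.Add(lps.cacheTTL))
}

func (lps *taskProcStats) StatProcesses(now time.Time) ProcUsages {
	lps.lock.Lock()
	defer lps.lock.Unlock()

	if !lps.expired(now) {
		return lps.cache // cheap path: serve the cached stats
	}

	// expensive path: scan processes (elided here), then fill the cache
	result := make(ProcUsages)

	lps.cache = result
	lps.at = now // the missing assignment; without it every call pays the scan
	return result
}
```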
.changelog/25870.txt (new file, +7)
@@ -0,0 +1,7 @@
+```release-note:bug
+telemetry: Fix excess CPU consumption from alloc stats collection
+```
+
+```release-note:bug
+telemetry: Fixed a bug where alloc stats were still collected (but not published) if telemetry.publish_allocation_metrics=false.
+```
@@ -30,15 +30,20 @@ type statsHook struct {
 	// cancel is called by Exited
 	cancel context.CancelFunc

+	// stats collection is expensive, so if we're not going to publish the stats
+	// there's no reason to collect them
+	doPublish bool
+
 	mu sync.Mutex

 	logger hclog.Logger
 }

-func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook {
+func newStatsHook(su StatsUpdater, interval time.Duration, doPublish bool, logger hclog.Logger) *statsHook {
 	h := &statsHook{
-		updater:  su,
-		interval: interval,
+		updater:   su,
+		interval:  interval,
+		doPublish: doPublish,
 	}
 	h.logger = logger.Named(h.Name())
 	return h
@@ -51,6 +56,10 @@ func (*statsHook) Name() string {
 func (h *statsHook) Poststart(_ context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
+	if !h.doPublish {
+		h.logger.Trace("not collecting task stats")
+		return nil
+	}

 	// This shouldn't happen, but better safe than risk leaking a goroutine
 	if h.cancel != nil {
@@ -13,7 +13,7 @@ import (
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/helper/testlog"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )

 // Statically assert the stats hook implements the expected interfaces
@@ -88,7 +88,6 @@ func (m *mockDriverStats) Called() int {
 func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) {
 	ci.Parallel(t)

-	require := require.New(t)
 	logger := testlog.HCLogger(t)
 	su := newMockStatsUpdater()
 	ds := new(mockDriverStats)
@@ -96,23 +95,23 @@ func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) {
 	poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}

 	// Create hook
-	h := newStatsHook(su, time.Minute, logger)
+	h := newStatsHook(su, time.Minute, true, logger)

 	// Always call Exited to cleanup goroutines
 	defer h.Exited(context.Background(), nil, nil)

 	// Run prestart
-	require.NoError(h.Poststart(context.Background(), poststartReq, nil))
+	must.NoError(t, h.Poststart(context.Background(), poststartReq, nil))

 	// An initial stats collection should run and call the updater
 	select {
 	case ru := <-su.Ch:
-		require.Equal(uint64(1), ru.ResourceUsage.MemoryStats.RSS)
+		must.Eq(t, uint64(1), ru.ResourceUsage.MemoryStats.RSS)
 	case <-time.After(10 * time.Second):
 		t.Fatalf("timeout waiting for initial stats collection")
 	}

-	require.NoError(h.Exited(context.Background(), nil, nil))
+	must.NoError(t, h.Exited(context.Background(), nil, nil))
 }

 // TestTaskRunner_StatsHook_Periodic asserts the stats hook collects stats on
@@ -120,7 +119,6 @@ func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) {
 func TestTaskRunner_StatsHook_Periodic(t *testing.T) {
 	ci.Parallel(t)

-	require := require.New(t)
 	logger := testlog.HCLogger(t)
 	su := newMockStatsUpdater()

@@ -131,11 +129,11 @@ func TestTaskRunner_StatsHook_Periodic(t *testing.T) {
 	// Exited() can complete within the interval.
 	const interval = 500 * time.Millisecond

-	h := newStatsHook(su, interval, logger)
+	h := newStatsHook(su, interval, true, logger)
 	defer h.Exited(context.Background(), nil, nil)

 	// Run prestart
-	require.NoError(h.Poststart(context.Background(), poststartReq, nil))
+	must.NoError(t, h.Poststart(context.Background(), poststartReq, nil))

 	// An initial stats collection should run and call the updater
 	var firstrun int64
@@ -160,7 +158,7 @@ func TestTaskRunner_StatsHook_Periodic(t *testing.T) {
 	}

 	// Exiting should prevent further updates
-	require.NoError(h.Exited(context.Background(), nil, nil))
+	must.NoError(t, h.Exited(context.Background(), nil, nil))

 	// Should *not* get another update in ~500ms (see interval above)
 	// we may get a single update due to race with exit
@@ -185,7 +183,6 @@ WAITING:
 func TestTaskRunner_StatsHook_NotImplemented(t *testing.T) {
 	ci.Parallel(t)

-	require := require.New(t)
 	logger := testlog.HCLogger(t)
 	su := newMockStatsUpdater()
 	ds := &mockDriverStats{
@@ -194,11 +191,11 @@ func TestTaskRunner_StatsHook_NotImplemented(t *testing.T) {

 	poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}

-	h := newStatsHook(su, 1, logger)
+	h := newStatsHook(su, 1, true, logger)
 	defer h.Exited(context.Background(), nil, nil)

 	// Run prestart
-	require.NoError(h.Poststart(context.Background(), poststartReq, nil))
+	must.NoError(t, h.Poststart(context.Background(), poststartReq, nil))

 	// An initial stats collection should run and *not* call the updater
 	select {
@@ -220,11 +217,11 @@ func TestTaskRunner_StatsHook_Backoff(t *testing.T) {

 	poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}

-	h := newStatsHook(su, time.Minute, logger)
+	h := newStatsHook(su, time.Minute, true, logger)
 	defer h.Exited(context.Background(), nil, nil)

 	// Run prestart
-	require.NoError(t, h.Poststart(context.Background(), poststartReq, nil))
+	must.NoError(t, h.Poststart(context.Background(), poststartReq, nil))

 	timeout := time.After(500 * time.Millisecond)

@@ -237,5 +234,5 @@ DRAIN:
 		}
 	}

-	require.Equal(t, ds.Called(), 1)
+	must.Eq(t, ds.Called(), 1)
 }
@@ -70,7 +70,7 @@ func (tr *TaskRunner) initHooks() {
 		newDispatchHook(alloc, hookLogger),
 		newVolumeHook(tr, hookLogger),
 		newArtifactHook(tr, tr.getter, hookLogger),
-		newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
+		newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, tr.clientConfig.PublishAllocationMetrics, hookLogger),
 		newDeviceHook(tr.devicemanager, hookLogger),
 		newAPIHook(tr.shutdownCtx, tr.clientConfig.APIListenerRegistrar, hookLogger),
 		newWranglerHook(tr.wranglers, task.Name, alloc.ID, task.UsesCores(), hookLogger),
@@ -717,7 +717,7 @@ func (e *UniversalExecutor) handleStats(ch chan *cstructs.TaskResourceUsage, ctx
 			timer.Reset(interval)
 		}

-		stats := e.processStats.StatProcesses()
+		stats := e.processStats.StatProcesses(time.Now())

 		select {
 		case <-ctx.Done():
@@ -445,7 +445,7 @@ func (l *LibcontainerExecutor) handleStats(ch chan *cstructs.TaskResourceUsage,
 		stats := lstats.CgroupStats

 		// get the map of process pids in this container
-		pstats := l.processStats.StatProcesses()
+		pstats := l.processStats.StatProcesses(ts)

 		// Memory Related Stats
 		swap := stats.MemoryStats.SwapUsage
@@ -12,16 +12,14 @@ import (
 	"github.com/hashicorp/nomad/client/lib/cpustats"
 	"github.com/hashicorp/nomad/plugins/drivers"
 	"github.com/shirou/gopsutil/v3/process"
-	"oss.indeed.com/go/libtime"
 )

 func New(compute cpustats.Compute, pl ProcessList) ProcessStats {
 	const cacheTTL = 5 * time.Second
-	return &linuxProcStats{
+	return &taskProcStats{
 		cacheTTL: cacheTTL,
 		procList: pl,
 		compute:  compute,
-		clock:    libtime.SystemClock(),
 		latest:   make(map[ProcessID]*stats),
 		cache:    make(ProcUsages),
 	}
@@ -33,10 +31,9 @@ type stats struct {
 	SystemCPU *cpustats.Tracker
 }

-type linuxProcStats struct {
+type taskProcStats struct {
 	cacheTTL time.Duration
 	procList ProcessList
-	clock    libtime.Clock
 	compute  cpustats.Compute

 	lock sync.Mutex
@@ -45,14 +42,13 @@ type linuxProcStats struct {
 	at time.Time
 }

-func (lps *linuxProcStats) expired() bool {
-	age := lps.clock.Since(lps.at)
-	return age > lps.cacheTTL
+func (lps *taskProcStats) expired(t time.Time) bool {
+	return t.After(lps.at.Add(lps.cacheTTL))
 }

 // scanPIDs will update lps.latest with the set of detected live pids that make
 // up the task process tree / are in the tasks cgroup
-func (lps *linuxProcStats) scanPIDs() {
+func (lps *taskProcStats) scanPIDs() {
 	currentPIDs := lps.procList.ListProcesses()

 	// remove old pids no longer present
@@ -74,15 +70,11 @@ func (lps *linuxProcStats) scanPIDs() {
 	}
 }

-func (lps *linuxProcStats) cached() ProcUsages {
-	return lps.cache
-}
-
-func (lps *linuxProcStats) StatProcesses() ProcUsages {
+func (lps *taskProcStats) StatProcesses(now time.Time) ProcUsages {
 	lps.lock.Lock()
 	defer lps.lock.Unlock()

-	if !lps.expired() {
+	if !lps.expired(now) {
 		return lps.cache
 	}

@@ -131,5 +123,6 @@ func (lps *linuxProcStats) StatProcesses() ProcUsages {
 	}

 	lps.cache = result
+	lps.at = now
 	return result
 }
drivers/shared/executor/procstats/getstats_test.go (new file, +45)
@@ -0,0 +1,45 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package procstats
+
+import (
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-set/v3"
+	"github.com/hashicorp/nomad/client/lib/cpustats"
+	"github.com/shoenig/test/must"
+)
+
+type mockPL struct{}
+
+func (mockPL) ListProcesses() set.Collection[ProcessID] { return set.New[ProcessID](0) }
+
+func TestStatProcesses(t *testing.T) {
+	compute := cpustats.Compute{
+		TotalCompute: 1000,
+		NumCores:     1,
+	}
+	pl := mockPL{}
+
+	stats := &taskProcStats{
+		cacheTTL: 10 * time.Second,
+		procList: pl,
+		compute:  compute,
+		latest:   make(map[ProcessID]*stats),
+		cache:    make(ProcUsages),
+	}
+
+	now := time.Now()
+	stats.StatProcesses(now)
+	cachedAt := stats.at
+	must.NotEq(t, time.Time{}, cachedAt)
+
+	stats.StatProcesses(now)
+	must.Eq(t, cachedAt, stats.at, must.Sprint("cache should not have been updated"))
+
+	later := now.Add(30 * time.Second)
+	stats.StatProcesses(later)
+	must.Eq(t, later, stats.at, must.Sprint("cache should have been updated"))
+}
@@ -38,14 +38,14 @@ func genMockProcs(needles, haystack int) ([]ps.Process, []ProcessID) {
 	expect := []ProcessID{42}

 	// TODO: make this into a tree structure, not just a linear tree
-	for i := 0; i < needles; i++ {
+	for i := range needles {
 		parent := 42 + i
 		pid := parent + 1
 		procs = append(procs, mockProc(pid, parent))
 		expect = append(expect, pid)
 	}

-	for i := 0; i < haystack; i++ {
+	for i := range haystack {
 		parent := 200 + i
 		pid := parent + 1
 		procs = append(procs, mockProc(pid, parent))
@@ -12,19 +12,6 @@ import (

 // ListByPid will scan the process table and return a set of the process family
 // tree starting with executorPID as the root.
-//
-// The implementation here specifically avoids using more than one system
-// call. Unlike on Linux where we just read a cgroup, on Windows we must build
-// the tree manually. We do so knowing only the child->parent relationships.
-//
-// So this turns into a fun leet code problem, where we invert the tree using
-// only a bucket of edges pointing in the wrong direction. Basically we just
-// iterate every process, recursively follow its parent, and determine whether
-// executorPID is an ancestor.
-//
-// See https://github.com/hashicorp/nomad/issues/20042 as an example of what
-// happens when you use syscalls to work your way from the root down to its
-// descendants.
 func ListByPid(executorPID int) set.Collection[ProcessID] {
 	procs := list(executorPID, ps.Processes)
 	return procs
@@ -31,7 +31,7 @@ type ProcUsages map[string]*drivers.ResourceUsage
 // for gathering CPU and memory process stats for all processes associated with
 // a task.
 type ProcessStats interface {
-	StatProcesses() ProcUsages
+	StatProcesses(time.Time) ProcUsages
 }

 // A ProcessList is anything (i.e. a task driver) that implements ListProcesses
@@ -82,6 +82,22 @@ func Aggregate(systemStats *cpustats.Tracker, procStats ProcUsages) *drivers.Tas
 	}
 }

+// list will scan the process table and return a set of the process family tree
+// starting with executorPID as the root. This is only ever used on Windows, but
+// lives in the shared code so we can run its tests even on Linux.
+//
+// The implementation here specifically avoids using more than one system
+// call. Unlike on Linux where we just read a cgroup, on Windows we must build
+// the tree manually. We do so knowing only the child->parent relationships.
+//
+// So this turns into a fun leet code problem, where we invert the tree using
+// only a bucket of edges pointing in the wrong direction. Basically we just
+// iterate every process, recursively follow its parent, and determine whether
+// executorPID is an ancestor.
+//
+// See https://github.com/hashicorp/nomad/issues/20042 as an example of what
+// happens when you use syscalls to work your way from the root down to its
+// descendants.
 func list(executorPID int, processes func() ([]ps.Process, error)) set.Collection[ProcessID] {
 	processFamily := set.From([]ProcessID{executorPID})
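The moved comment above describes the Windows approach: with only child->parent edges available from the process table, membership in the executor's process family is decided by walking each process's parent chain. A rough, illustrative sketch of that idea follows; it is not Nomad's actual `list` implementation, and the `proc` record and `family` helper here are hypothetical stand-ins:

```go
// Illustrative only: keep a process if the executor PID turns out to be an
// ancestor, using nothing but child->parent edges from a single snapshot.
package main

import "fmt"

type proc struct {
	pid, ppid int
}

func family(executorPID int, procs []proc) map[int]bool {
	// index the child->parent edges
	parent := make(map[int]int, len(procs))
	for _, p := range procs {
		parent[p.pid] = p.ppid
	}

	members := map[int]bool{executorPID: true}
	for _, p := range procs {
		// follow parent pointers until we hit a known member or run off the tree
		var chain []int
		cur := p.pid
		for {
			if members[cur] {
				for _, c := range chain {
					members[c] = true
				}
				break
			}
			next, ok := parent[cur]
			if !ok || next == cur {
				break
			}
			chain = append(chain, cur)
			cur = next
		}
	}
	return members
}

func main() {
	procs := []proc{{pid: 42, ppid: 1}, {pid: 43, ppid: 42}, {pid: 44, ppid: 43}, {pid: 99, ppid: 1}}
	fmt.Println(family(42, procs)) // map[42:true 43:true 44:true]
}
```

This sketch re-walks processes outside the family on every pass; the point is only to show why a single process-table snapshot is enough to invert the tree without further syscalls.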