agent: allow configuration of in-memory telemetry sink. (#20166)

This change adds configuration options for setting the in-memory
telemetry sink collection and retention durations. This sink backs
the metrics JSON API and previously had hard-coded default values.

The new options are particularly useful when running development or
debug environments, where metrics collection is desired at a fast
and granular rate.
This commit is contained in:
James Rasell
2024-03-25 16:00:18 +01:00
committed by GitHub
parent 02d98b9357
commit facc3e8013
9 changed files with 193 additions and 34 deletions

View File

@@ -352,6 +352,11 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
return false
}
if err := config.Telemetry.Validate(); err != nil {
c.Ui.Error(fmt.Sprintf("telemetry block invalid: %v", err))
return false
}
// Set up the TLS configuration properly if we have one.
// XXX chelseakomlo: set up a TLSConfig New method which would wrap
// constructor-type actions like this.
@@ -1155,14 +1160,8 @@ func (c *Command) handleReload() {
}
}
// setupTelemetry is used ot setup the telemetry sub-systems
// setupTelemetry is used to set up the telemetry sub-systems.
func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
/* Setup telemetry
Aggregate on 10 second intervals for 1 minute. Expose the
metrics over stderr when there is a SIGUSR1 received.
*/
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
metrics.DefaultInmemSignal(inm)
var telConfig *Telemetry
if config.Telemetry == nil {
@@ -1171,6 +1170,9 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
telConfig = config.Telemetry
}
inm := metrics.NewInmemSink(telConfig.inMemoryCollectionInterval, telConfig.inMemoryRetentionPeriod)
metrics.DefaultInmemSignal(inm)
metricsConf := metrics.DefaultConfig("nomad")
metricsConf.EnableHostname = !telConfig.DisableHostname

View File

@@ -942,6 +942,20 @@ func (s *ServerConfig) EncryptBytes() ([]byte, error) {
// Telemetry is the telemetry configuration for the server
type Telemetry struct {
// InMemoryCollectionInterval configures the in-memory sink collection
// interval. This sink is always configured and backs the JSON metrics API
// endpoint. This option is particularly useful for debugging or
// development.
InMemoryCollectionInterval string `hcl:"in_memory_collection_interval"`
inMemoryCollectionInterval time.Duration `hcl:"-"`
// InMemoryRetentionPeriod configures the in-memory sink retention period.
// This sink is always configured and backs the JSON metrics API endpoint.
// This option is particularly useful for debugging or development.
InMemoryRetentionPeriod string `hcl:"in_memory_retention_period"`
inMemoryRetentionPeriod time.Duration `hcl:"-"`
StatsiteAddr string `hcl:"statsite_address"`
StatsdAddr string `hcl:"statsd_address"`
DataDogAddr string `hcl:"datadog_address"`
@@ -1062,8 +1076,8 @@ func (t *Telemetry) Copy() *Telemetry {
}
// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
func (a *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range a.PrefixFilter {
func (t *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range t.PrefixFilter {
if rule == "" {
continue
}
@@ -1079,6 +1093,30 @@ func (a *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
return allowed, blocked, nil
}
// Validate checks the in-memory sink durations held by the telemetry
// configuration. The agent uses these options regardless of mode, which is why
// the check lives here rather than in a structs package. A nil receiver is
// accepted and reported as valid, so callers need no nil check of their own.
//
// It returns an error when either duration is zero or negative, or when the
// collection interval exceeds the retention period.
func (t *Telemetry) Validate() error {
	if t == nil {
		return nil
	}

	interval := t.inMemoryCollectionInterval
	retention := t.inMemoryRetentionPeriod

	// Cases are evaluated top to bottom, so the positivity checks run before
	// the comparison between the two durations.
	switch {
	case interval <= 0:
		return errors.New("telemetry in-memory collection interval must be greater than zero")
	case retention <= 0:
		return errors.New("telemetry in-memory retention period must be greater than zero")
	case interval > retention:
		return errors.New("telemetry in-memory collection interval cannot be greater than retention period")
	default:
		return nil
	}
}
// Ports encapsulates the various ports we bind to for network services. If any
// are not specified then the defaults are used instead.
type Ports struct {
@@ -1391,8 +1429,12 @@ func DefaultConfig() *Config {
},
SyslogFacility: "LOCAL0",
Telemetry: &Telemetry{
CollectionInterval: "1s",
collectionInterval: 1 * time.Second,
InMemoryCollectionInterval: "10s",
inMemoryCollectionInterval: 10 * time.Second,
InMemoryRetentionPeriod: "1m",
inMemoryRetentionPeriod: 1 * time.Minute,
CollectionInterval: "1s",
collectionInterval: 1 * time.Second,
},
TLSConfig: &config.TLSConfig{},
Sentinel: &config.SentinelConfig{},
@@ -2371,9 +2413,21 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
}
// Merge is used to merge two telemetry configs together
func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
result := *a
func (t *Telemetry) Merge(b *Telemetry) *Telemetry {
result := *t
if b.InMemoryCollectionInterval != "" {
result.InMemoryCollectionInterval = b.InMemoryCollectionInterval
}
if b.inMemoryCollectionInterval != 0 {
result.inMemoryCollectionInterval = b.inMemoryCollectionInterval
}
if b.InMemoryRetentionPeriod != "" {
result.InMemoryRetentionPeriod = b.InMemoryRetentionPeriod
}
if b.inMemoryRetentionPeriod != 0 {
result.inMemoryRetentionPeriod = b.inMemoryRetentionPeriod
}
if b.StatsiteAddr != "" {
result.StatsiteAddr = b.StatsiteAddr
}

View File

@@ -109,6 +109,8 @@ func ParseConfigFile(path string) (*Config, error) {
{"server.server_join.retry_interval", &c.Server.ServerJoin.RetryInterval, &c.Server.ServerJoin.RetryIntervalHCL, nil},
{"autopilot.server_stabilization_time", &c.Autopilot.ServerStabilizationTime, &c.Autopilot.ServerStabilizationTimeHCL, nil},
{"autopilot.last_contact_threshold", &c.Autopilot.LastContactThreshold, &c.Autopilot.LastContactThresholdHCL, nil},
{"telemetry.in_memory_collection_interval", &c.Telemetry.inMemoryCollectionInterval, &c.Telemetry.InMemoryCollectionInterval, nil},
{"telemetry.in_memory_retention_period", &c.Telemetry.inMemoryRetentionPeriod, &c.Telemetry.InMemoryRetentionPeriod, nil},
{"telemetry.collection_interval", &c.Telemetry.collectionInterval, &c.Telemetry.CollectionInterval, nil},
{"client.template.block_query_wait", nil, &c.Client.TemplateConfig.BlockQueryWaitTimeHCL,
func(d *time.Duration) {

View File

@@ -197,15 +197,19 @@ var basicConfig = &Config{
},
},
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.1:1234",
StatsdAddr: "127.0.0.1:2345",
PrometheusMetrics: true,
DisableHostname: true,
UseNodeName: false,
CollectionInterval: "3s",
collectionInterval: 3 * time.Second,
PublishAllocationMetrics: true,
PublishNodeMetrics: true,
StatsiteAddr: "127.0.0.1:1234",
StatsdAddr: "127.0.0.1:2345",
PrometheusMetrics: true,
DisableHostname: true,
UseNodeName: false,
InMemoryCollectionInterval: "1m",
inMemoryCollectionInterval: 1 * time.Minute,
InMemoryRetentionPeriod: "24h",
inMemoryRetentionPeriod: 24 * time.Hour,
CollectionInterval: "3s",
collectionInterval: 3 * time.Second,
PublishAllocationMetrics: true,
PublishNodeMetrics: true,
},
LeaveOnInt: true,
LeaveOnTerm: true,
@@ -1080,3 +1084,23 @@ func TestConfig_MultipleConsul(t *testing.T) {
})
}
}
// TestConfig_Telemetry checks how Telemetry.Merge treats the in-memory sink
// durations: values from the default configuration survive a merge with an
// overlay that leaves them unset, and are replaced when the overlay sets them.
func TestConfig_Telemetry(t *testing.T) {
	ci.Parallel(t)

	// The overlay only toggles Prometheus metrics, so the default in-memory
	// durations are expected to carry through the merge untouched.
	defaults := DefaultConfig().Telemetry
	inherited := defaults.Merge(&Telemetry{PrometheusMetrics: true})
	must.Eq(t, inherited.inMemoryCollectionInterval, 10*time.Second)
	must.Eq(t, inherited.inMemoryRetentionPeriod, 1*time.Minute)

	// Durations supplied by a later overlay take the place of the inherited
	// ones.
	userOverlay := &Telemetry{
		inMemoryCollectionInterval: 1 * time.Second,
		inMemoryRetentionPeriod:    10 * time.Second,
	}
	overridden := inherited.Merge(userOverlay)
	must.Eq(t, overridden.inMemoryCollectionInterval, 1*time.Second)
	must.Eq(t, overridden.inMemoryRetentionPeriod, 10*time.Second)
}

View File

@@ -4,6 +4,7 @@
package agent
import (
"errors"
"fmt"
"net"
"os"
@@ -1385,6 +1386,63 @@ func TestTelemetry_PrefixFilters(t *testing.T) {
}
}
// TestTelemetry_Validate exercises Telemetry.Validate with a table of
// in-memory sink duration combinations. Each case names the condition under
// test and the exact error expected, or nil when the configuration should be
// accepted.
func TestTelemetry_Validate(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		name           string
		inputTelemetry *Telemetry
		expectedError  error
	}{
		{
			// Validate is documented as safe to call on a nil receiver.
			name:           "nil",
			inputTelemetry: nil,
			expectedError:  nil,
		},
		{
			name: "collection interval greater than retention period",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 10 * time.Second,
				inMemoryRetentionPeriod:    1 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory collection interval cannot be greater than retention period"),
		},
		{
			name: "valid",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 1 * time.Second,
				inMemoryRetentionPeriod:    10 * time.Second,
			},
			expectedError: nil,
		},
		{
			// Only a strictly greater interval is rejected, so equal
			// durations are accepted.
			name: "collection interval equal to retention period",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 10 * time.Second,
				inMemoryRetentionPeriod:    10 * time.Second,
			},
			expectedError: nil,
		},
		{
			name: "missing in-memory collection interval",
			inputTelemetry: &Telemetry{
				inMemoryRetentionPeriod: 10 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory collection interval must be greater than zero"),
		},
		{
			name: "missing in-memory retention period",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 10 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory retention period must be greater than zero"),
		},
		{
			name: "negative in-memory collection interval",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: -1 * time.Second,
				inMemoryRetentionPeriod:    10 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory collection interval must be greater than zero"),
		},
		{
			name: "negative in-memory retention period",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 10 * time.Second,
				inMemoryRetentionPeriod:    -1 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory retention period must be greater than zero"),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			actualError := tc.inputTelemetry.Validate()

			if tc.expectedError == nil {
				must.NoError(t, actualError)
				return
			}
			must.EqError(t, actualError, tc.expectedError.Error())
		})
	}
}
func TestTelemetry_Parse(t *testing.T) {
ci.Parallel(t)

View File

@@ -200,13 +200,15 @@ audit {
}
telemetry {
statsite_address = "127.0.0.1:1234"
statsd_address = "127.0.0.1:2345"
prometheus_metrics = true
disable_hostname = true
collection_interval = "3s"
publish_allocation_metrics = true
publish_node_metrics = true
in_memory_collection_interval = "1m"
in_memory_retention_period = "24h"
statsite_address = "127.0.0.1:1234"
statsd_address = "127.0.0.1:2345"
prometheus_metrics = true
disable_hostname = true
collection_interval = "3s"
publish_allocation_metrics = true
publish_node_metrics = true
}
leave_on_interrupt = true

View File

@@ -170,7 +170,6 @@
"server_service_name": "nomad",
"service_auth_method": "nomad-services",
"task_auth_method": "nomad-tasks",
"service_identity": {
"aud": [
"consul.io",
@@ -361,6 +360,8 @@
"syslog_facility": "LOCAL1",
"telemetry": [
{
"in_memory_collection_interval": "1m",
"in_memory_retention_period": "24h",
"collection_interval": "3s",
"disable_hostname": true,
"prometheus_metrics": true,
@@ -394,7 +395,10 @@
"cert_file": "/path/to/cert/file",
"create_from_role": "test_role",
"default_identity": {
"aud": ["vault.io", "nomad.io"],
"aud": [
"vault.io",
"nomad.io"
],
"env": false,
"file": true,
"ttl": "3h"
@@ -408,9 +412,9 @@
"token": "12345"
}
],
"reporting":{
"license":{
"enabled":"true"
"reporting": {
"license": {
"enabled": "true"
}
}
}