mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
Merge pull request #4906 from hashicorp/f-metric-prefix-master
Port metric prefix filtering to master
This commit is contained in:
@@ -325,6 +325,7 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
|
||||
// Setup telemetry related config
|
||||
conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
|
||||
conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics
|
||||
conf.DisableDispatchedJobSummaryMetrics = agentConfig.Telemetry.DisableDispatchedJobSummaryMetrics
|
||||
conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics
|
||||
|
||||
return conf, nil
|
||||
|
||||
@@ -880,6 +880,18 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
|
||||
metricsConf.EnableHostname = true
|
||||
}
|
||||
|
||||
allowedPrefixes, blockedPrefixes, err := telConfig.PrefixFilters()
|
||||
if err != nil {
|
||||
return inm, err
|
||||
}
|
||||
|
||||
metricsConf.AllowedPrefixes = allowedPrefixes
|
||||
metricsConf.BlockedPrefixes = blockedPrefixes
|
||||
|
||||
if telConfig.FilterDefault != nil {
|
||||
metricsConf.FilterDefault = *telConfig.FilterDefault
|
||||
}
|
||||
|
||||
// Configure the statsite sink
|
||||
var fanout metrics.FanoutSink
|
||||
if telConfig.StatsiteAddr != "" {
|
||||
@@ -963,6 +975,7 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
|
||||
metricsConf.EnableHostname = false
|
||||
metrics.NewGlobal(metricsConf, inm)
|
||||
}
|
||||
|
||||
return inm, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -441,6 +441,19 @@ type Telemetry struct {
|
||||
// key/value structure as done in older versions of Nomad
|
||||
BackwardsCompatibleMetrics bool `mapstructure:"backwards_compatible_metrics"`
|
||||
|
||||
// PrefixFilter allows for filtering out metrics from being collected
|
||||
PrefixFilter []string `mapstructure:"prefix_filter"`
|
||||
|
||||
// FilterDefault controls whether to allow metrics that have not been specified
|
||||
// by the filter
|
||||
FilterDefault *bool `mapstructure:"filter_default"`
|
||||
|
||||
// DisableDispatchedJobSummaryMetrics allows ignoring dispatched jobs when
|
||||
// publishing Job summary metrics. This is useful in environments that produce
|
||||
// high numbers of single count dispatch jobs as the metrics for each take up
|
||||
// a small memory overhead.
|
||||
DisableDispatchedJobSummaryMetrics bool `mapstructure:"disable_dispatched_job_summary_metrics"`
|
||||
|
||||
// Circonus: see https://github.com/circonus-labs/circonus-gometrics
|
||||
// for more details on the various configuration options.
|
||||
// Valid configuration combinations:
|
||||
@@ -513,6 +526,24 @@ type Telemetry struct {
|
||||
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
|
||||
}
|
||||
|
||||
// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
|
||||
func (t *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
|
||||
for _, rule := range t.PrefixFilter {
|
||||
if rule == "" {
|
||||
continue
|
||||
}
|
||||
switch rule[0] {
|
||||
case '+':
|
||||
allowed = append(allowed, rule[1:])
|
||||
case '-':
|
||||
blocked = append(blocked, rule[1:])
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("Filter rule must begin with either '+' or '-': %q", rule)
|
||||
}
|
||||
}
|
||||
return allowed, blocked, nil
|
||||
}
|
||||
|
||||
// Ports encapsulates the various ports we bind to for network services. If any
|
||||
// are not specified then the defaults are used instead.
|
||||
type Ports struct {
|
||||
@@ -1289,6 +1320,18 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
|
||||
result.BackwardsCompatibleMetrics = b.BackwardsCompatibleMetrics
|
||||
}
|
||||
|
||||
if b.PrefixFilter != nil {
|
||||
result.PrefixFilter = b.PrefixFilter
|
||||
}
|
||||
|
||||
if b.FilterDefault != nil {
|
||||
result.FilterDefault = b.FilterDefault
|
||||
}
|
||||
|
||||
if b.DisableDispatchedJobSummaryMetrics {
|
||||
result.DisableDispatchedJobSummaryMetrics = b.DisableDispatchedJobSummaryMetrics
|
||||
}
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
|
||||
@@ -734,6 +734,9 @@ func parseTelemetry(result **Telemetry, list *ast.ObjectList) error {
|
||||
"circonus_broker_select_tag",
|
||||
"disable_tagged_metrics",
|
||||
"backwards_compatible_metrics",
|
||||
"prefix_filter",
|
||||
"filter_default",
|
||||
"disable_dispatched_job_summary_metrics",
|
||||
}
|
||||
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
|
||||
return err
|
||||
|
||||
@@ -77,6 +77,7 @@ func TestConfig_Merge(t *testing.T) {
|
||||
CirconusCheckTags: "cat1:tag1,cat2:tag2",
|
||||
CirconusBrokerID: "0",
|
||||
CirconusBrokerSelectTag: "dc:dc1",
|
||||
PrefixFilter: []string{"filter1", "filter2"},
|
||||
},
|
||||
Client: &ClientConfig{
|
||||
Enabled: false,
|
||||
@@ -225,6 +226,9 @@ func TestConfig_Merge(t *testing.T) {
|
||||
CirconusCheckTags: "cat1:tag1,cat2:tag2",
|
||||
CirconusBrokerID: "1",
|
||||
CirconusBrokerSelectTag: "dc:dc2",
|
||||
PrefixFilter: []string{"prefix1", "prefix2"},
|
||||
DisableDispatchedJobSummaryMetrics: true,
|
||||
FilterDefault: helper.BoolToPtr(false),
|
||||
},
|
||||
Client: &ClientConfig{
|
||||
Enabled: true,
|
||||
@@ -990,3 +994,68 @@ func TestMergeServerJoin(t *testing.T) {
|
||||
require.Equal(result.RetryInterval, retryInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTelemetry_PrefixFilters(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := []struct {
|
||||
in []string
|
||||
expAllow []string
|
||||
expBlock []string
|
||||
expErr bool
|
||||
}{
|
||||
{
|
||||
in: []string{"+foo"},
|
||||
expAllow: []string{"foo"},
|
||||
},
|
||||
{
|
||||
in: []string{"-foo"},
|
||||
expBlock: []string{"foo"},
|
||||
},
|
||||
{
|
||||
in: []string{"+a.b.c", "-x.y.z"},
|
||||
expAllow: []string{"a.b.c"},
|
||||
expBlock: []string{"x.y.z"},
|
||||
},
|
||||
{
|
||||
in: []string{"+foo", "bad", "-bar"},
|
||||
expErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for i, c := range cases {
|
||||
t.Run(fmt.Sprintf("PrefixCase%d", i), func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
tel := &Telemetry{
|
||||
PrefixFilter: c.in,
|
||||
}
|
||||
|
||||
allow, block, err := tel.PrefixFilters()
|
||||
require.Exactly(c.expAllow, allow)
|
||||
require.Exactly(c.expBlock, block)
|
||||
require.Equal(c.expErr, err != nil)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTelemetry_Parse(t *testing.T) {
|
||||
require := require.New(t)
|
||||
dir, err := ioutil.TempDir("", "nomad")
|
||||
require.NoError(err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
file1 := filepath.Join(dir, "config1.hcl")
|
||||
err = ioutil.WriteFile(file1, []byte(`telemetry{
|
||||
prefix_filter = ["+nomad.raft"]
|
||||
filter_default = false
|
||||
disable_dispatched_job_summary_metrics = true
|
||||
}`), 0600)
|
||||
require.NoError(err)
|
||||
|
||||
// Works on config dir
|
||||
config, err := LoadConfig(dir)
|
||||
require.NoError(err)
|
||||
|
||||
require.False(*config.Telemetry.FilterDefault)
|
||||
require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter)
|
||||
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
|
||||
}
|
||||
|
||||
@@ -280,6 +280,10 @@ type Config struct {
|
||||
// key/value/tag format, or simply a key/value format
|
||||
DisableTaggedMetrics bool
|
||||
|
||||
// DisableDispatchedJobSummaryMetrics allows ignoring dispatched jobs when
|
||||
// publishing Job summary metrics
|
||||
DisableDispatchedJobSummaryMetrics bool
|
||||
|
||||
// BackwardsCompatibleMetrics determines whether to show methods of
|
||||
// displaying metrics for older versions, or to only show the new format
|
||||
BackwardsCompatibleMetrics bool
|
||||
|
||||
122
nomad/leader.go
122
nomad/leader.go
@@ -624,68 +624,82 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
|
||||
break
|
||||
}
|
||||
summary := raw.(*structs.JobSummary)
|
||||
for name, tgSummary := range summary.Summary {
|
||||
if !s.config.DisableTaggedMetrics {
|
||||
labels := []metrics.Label{
|
||||
{
|
||||
Name: "job",
|
||||
Value: summary.JobID,
|
||||
},
|
||||
{
|
||||
Name: "task_group",
|
||||
Value: name,
|
||||
},
|
||||
}
|
||||
|
||||
if strings.Contains(summary.JobID, "/dispatch-") {
|
||||
jobInfo := strings.Split(summary.JobID, "/dispatch-")
|
||||
labels = append(labels, metrics.Label{
|
||||
Name: "parent_id",
|
||||
Value: jobInfo[0],
|
||||
}, metrics.Label{
|
||||
Name: "dispatch_id",
|
||||
Value: jobInfo[1],
|
||||
})
|
||||
}
|
||||
|
||||
if strings.Contains(summary.JobID, "/periodic-") {
|
||||
jobInfo := strings.Split(summary.JobID, "/periodic-")
|
||||
labels = append(labels, metrics.Label{
|
||||
Name: "parent_id",
|
||||
Value: jobInfo[0],
|
||||
}, metrics.Label{
|
||||
Name: "periodic_id",
|
||||
Value: jobInfo[1],
|
||||
})
|
||||
}
|
||||
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
|
||||
float32(tgSummary.Queued), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
|
||||
float32(tgSummary.Complete), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
|
||||
float32(tgSummary.Failed), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
|
||||
float32(tgSummary.Running), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
|
||||
float32(tgSummary.Starting), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
|
||||
float32(tgSummary.Lost), labels)
|
||||
if s.config.DisableDispatchedJobSummaryMetrics {
|
||||
job, err := state.JobByID(ws, summary.Namespace, summary.JobID)
|
||||
if err != nil {
|
||||
s.logger.Error("error getting job for summary", "error", err)
|
||||
continue
|
||||
}
|
||||
if s.config.BackwardsCompatibleMetrics {
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
|
||||
if job.Dispatched {
|
||||
continue
|
||||
}
|
||||
}
|
||||
s.iterateJobSummaryMetrics(summary)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
|
||||
for name, tgSummary := range summary.Summary {
|
||||
if !s.config.DisableTaggedMetrics {
|
||||
labels := []metrics.Label{
|
||||
{
|
||||
Name: "job",
|
||||
Value: summary.JobID,
|
||||
},
|
||||
{
|
||||
Name: "task_group",
|
||||
Value: name,
|
||||
},
|
||||
}
|
||||
|
||||
if strings.Contains(summary.JobID, "/dispatch-") {
|
||||
jobInfo := strings.Split(summary.JobID, "/dispatch-")
|
||||
labels = append(labels, metrics.Label{
|
||||
Name: "parent_id",
|
||||
Value: jobInfo[0],
|
||||
}, metrics.Label{
|
||||
Name: "dispatch_id",
|
||||
Value: jobInfo[1],
|
||||
})
|
||||
}
|
||||
|
||||
if strings.Contains(summary.JobID, "/periodic-") {
|
||||
jobInfo := strings.Split(summary.JobID, "/periodic-")
|
||||
labels = append(labels, metrics.Label{
|
||||
Name: "parent_id",
|
||||
Value: jobInfo[0],
|
||||
}, metrics.Label{
|
||||
Name: "periodic_id",
|
||||
Value: jobInfo[1],
|
||||
})
|
||||
}
|
||||
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
|
||||
float32(tgSummary.Queued), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
|
||||
float32(tgSummary.Complete), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
|
||||
float32(tgSummary.Failed), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
|
||||
float32(tgSummary.Running), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
|
||||
float32(tgSummary.Starting), labels)
|
||||
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
|
||||
float32(tgSummary.Lost), labels)
|
||||
}
|
||||
if s.config.BackwardsCompatibleMetrics {
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
|
||||
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// revokeLeadership is invoked once we step down as leader.
|
||||
// This is used to cleanup any state that may be specific to a leader.
|
||||
func (s *Server) revokeLeadership() error {
|
||||
|
||||
@@ -64,13 +64,36 @@ The following options are available on all telemetry configurations.
|
||||
only be added to tagged metrics. Note that this option is used to transition
|
||||
monitoring to tagged metrics and will eventually be deprecated.
|
||||
|
||||
|
||||
- `disable_tagged_metrics` `(bool: false)` - Specifies if Nomad should not emit
|
||||
tagged metrics and only emit metrics compatible with versions below Nomad
|
||||
0.7. Note that this option is used to transition monitoring to tagged
|
||||
metrics and will eventually be deprecated.
|
||||
|
||||
- `filter_default` `(bool: true)` - This controls whether to allow metrics that
|
||||
have not been specified by the filter. Defaults to true, which will allow all
|
||||
metrics when no filters are provided. When set to false with no filters, no
|
||||
metrics will be sent.
|
||||
|
||||
- `prefix_filter` `(list: [])` - This is a list of filter rules to apply for
|
||||
allowing/blocking metrics by prefix. A leading "<b>+</b>" will enable any
|
||||
metrics with the given prefix, and a leading "<b>-</b>" will block them. If
|
||||
there is overlap between two rules, the more specific rule will take
|
||||
precedence. Blocking will take priority if the same prefix is listed multiple
|
||||
times.
|
||||
|
||||
```javascript
|
||||
[
|
||||
"-nomad.raft",
|
||||
"+nomad.raft.apply",
|
||||
"-nomad.memberlist",
|
||||
]
|
||||
```
|
||||
|
||||
- `disable_dispatched_job_summary_metrics` `(bool: false)` - Specifies if Nomad
|
||||
should ignore jobs dispatched from a parameterized job when publishing job
|
||||
summary statistics. Since each job has a small memory overhead for tracking
|
||||
summary statistics, it is sometimes desirable to trade these statistics for
|
||||
more memory when dispatching high volumes of jobs.
|
||||
|
||||
### `statsite`
|
||||
|
||||
|
||||
Reference in New Issue
Block a user