From 4c9ab7b76049895e09cb1f3f6adf0343a73f1a91 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 30 Oct 2017 12:19:11 -0700 Subject: [PATCH 1/4] Publishing metrics for job summary --- command/agent/agent.go | 3 +++ nomad/config.go | 4 ++++ nomad/leader.go | 49 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/command/agent/agent.go b/command/agent/agent.go index 0c8072629..e47e8be3e 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -233,6 +233,9 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi // Set the TLS config conf.TLSConfig = agentConfig.TLSConfig + // Setup telemetry related config + conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval + return conf, nil } diff --git a/nomad/config.go b/nomad/config.go index db4939106..fecac1dfa 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -245,6 +245,10 @@ type Config struct { // SentinelConfig is this Agent's Sentinel configuration SentinelConfig *config.SentinelConfig + + // StatsCollectionInterval is the interval at which the Nomad server + // publishes metrics which are periodic in nature like updating gauges + StatsCollectionInterval time.Duration } // CheckVersion is used to check if the ProtocolVersion is valid diff --git a/nomad/leader.go b/nomad/leader.go index 8a4d2dbab..97b52c2de 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -180,6 +180,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Periodically unblock failed allocations go s.periodicUnblockFailedEvals(stopCh) + // Periodically publish job summary metrics + go s.publishJobSummaryMetrics(stopCh) + // Setup the heartbeat timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node, effectively this means all the timers are renewed at the time of failover. @@ -519,6 +522,52 @@ func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { } } +// publishJobSummaryMetrics publishes the job summaries as metrics +func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { + // Using a timer instead of a ticker so that we can publish after the + // current batch of metrics have been published + timer := time.NewTimer(0) + defer timer.Stop() + + for { + select { + case <-stopCh: + return + case <-timer.C: + state, err := s.State().Snapshot() + if err != nil { + timer.Reset(s.config.StatsCollectionInterval) + s.logger.Printf("[ERR] nomad: failed to get state: %v", err) + continue + } + ws := memdb.NewWatchSet() + iter, err := state.JobSummaries(ws) + if err != nil { + timer.Reset(s.config.StatsCollectionInterval) + s.logger.Printf("[ERR] nomad: failed to get job summaries: %v", err) + continue + } + + for { + raw := iter.Next() + if raw == nil { + break + } + summary := raw.(*structs.JobSummary) + for name, tgSummary := range summary.Summary { + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost)) + } + } + timer.Reset(s.config.StatsCollectionInterval) + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { From 7687b8b8159474bde220e5952ef1bc3aa7bfe220 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 30 Oct 2017 12:25:37 -0700 Subject: [PATCH 2/4] Setting the default stats collection interval --- nomad/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nomad/config.go b/nomad/config.go index fecac1dfa..91a5fcc55 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -307,6 +307,7 @@ func DefaultConfig() *Config { TLSConfig: &config.TLSConfig{}, ReplicationBackoff: 30 * time.Second, SentinelGCInterval: 30 * time.Second, + StatsCollectionInterval: 1 * time.Minute, } // Enable all known schedulers by default From 3e1d24876ebe84201c958b11adc8491973204b96 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 30 Oct 2017 16:10:58 -0700 Subject: [PATCH 3/4] Adding support for tagged metrics --- command/agent/agent.go | 2 ++ nomad/config.go | 8 ++++++++ nomad/leader.go | 38 ++++++++++++++++++++++++++++++++------ 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index e47e8be3e..60907a0a8 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -235,6 +235,8 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi // Setup telemetry related config conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval + conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics + conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics return conf, nil } diff --git a/nomad/config.go b/nomad/config.go index 91a5fcc55..a96345b84 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -249,6 +249,14 @@ type Config struct { // StatsCollectionInterval is the interval at which the Nomad server // publishes metrics which are periodic in nature like updating gauges StatsCollectionInterval time.Duration + + // DisableTaggedMetrics determines whether metrics will be displayed via a + // key/value/tag format, or simply a key/value format + DisableTaggedMetrics bool + + // BackwardsCompatibleMetrics determines whether to show methods of + // displaying metrics for older verions, or to only show the new format + BackwardsCompatibleMetrics bool } // CheckVersion is used to check if the ProtocolVersion is valid diff --git a/nomad/leader.go b/nomad/leader.go index 97b52c2de..6c8e2e72f 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -555,12 +555,38 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { } summary := raw.(*structs.JobSummary) for name, tgSummary := range summary.Summary { - metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued)) - metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete)) - metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed)) - metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running)) - metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting)) - metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost)) + if !s.config.DisableTaggedMetrics { + labels := []metrics.Label{ + { + Name: "job", + Value: summary.JobID, + }, + { + Name: "task_group", + Value: name, + }, + } + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"}, + float32(tgSummary.Queued), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"}, + float32(tgSummary.Complete), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"}, + float32(tgSummary.Failed), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"}, + float32(tgSummary.Running), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"}, + float32(tgSummary.Starting), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"}, + float32(tgSummary.Lost), labels) + } + if s.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost)) + } } } timer.Reset(s.config.StatsCollectionInterval) From 5351b10ccdb37fd107865043ed127d5ae1d60954 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 1 Nov 2017 13:14:44 -0700 Subject: [PATCH 4/4] Resetting the timer at the beginning of the loop --- nomad/leader.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index 6c8e2e72f..d3fad7940 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -524,8 +524,6 @@ func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { // publishJobSummaryMetrics publishes the job summaries as metrics func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { - // Using a timer instead of a ticker so that we can publish after the - // current batch of metrics have been published timer := time.NewTimer(0) defer timer.Stop() @@ -534,16 +532,15 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { case <-stopCh: return case <-timer.C: + timer.Reset(s.config.StatsCollectionInterval) state, err := s.State().Snapshot() if err != nil { - timer.Reset(s.config.StatsCollectionInterval) s.logger.Printf("[ERR] nomad: failed to get state: %v", err) continue } ws := memdb.NewWatchSet() iter, err := state.JobSummaries(ws) if err != nil { - timer.Reset(s.config.StatsCollectionInterval) s.logger.Printf("[ERR] nomad: failed to get job summaries: %v", err) continue } @@ -589,7 +586,6 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { } } } - timer.Reset(s.config.StatsCollectionInterval) } } }