Merge pull request #3467 from hashicorp/f-publish-job-summary-metrics

Publishing metrics for job summary
This commit is contained in:
Diptanu Choudhury
2017-11-01 13:16:24 -07:00
committed by GitHub
3 changed files with 89 additions and 0 deletions

View File

@@ -233,6 +233,11 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
// Set the TLS config
conf.TLSConfig = agentConfig.TLSConfig
// Setup telemetry related config
conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics
conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics
return conf, nil
}

View File

@@ -245,6 +245,18 @@ type Config struct {
// SentinelConfig is this Agent's Sentinel configuration
SentinelConfig *config.SentinelConfig
// StatsCollectionInterval is the interval at which the Nomad server
// publishes metrics which are periodic in nature like updating gauges
StatsCollectionInterval time.Duration
// DisableTaggedMetrics determines whether metrics will be displayed via a
// key/value/tag format, or simply a key/value format
DisableTaggedMetrics bool
// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older verions, or to only show the new format
BackwardsCompatibleMetrics bool
}
// CheckVersion is used to check if the ProtocolVersion is valid
@@ -303,6 +315,7 @@ func DefaultConfig() *Config {
TLSConfig: &config.TLSConfig{},
ReplicationBackoff: 30 * time.Second,
SentinelGCInterval: 30 * time.Second,
StatsCollectionInterval: 1 * time.Minute,
}
// Enable all known schedulers by default

View File

@@ -180,6 +180,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Periodically unblock failed allocations
go s.periodicUnblockFailedEvals(stopCh)
// Periodically publish job summary metrics
go s.publishJobSummaryMetrics(stopCh)
// Setup the heartbeat timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node, effectively this means all the timers are renewed at the time of failover.
@@ -519,6 +522,74 @@ func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) {
}
}
// publishJobSummaryMetrics publishes the job summaries as metrics
func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-stopCh:
return
case <-timer.C:
timer.Reset(s.config.StatsCollectionInterval)
state, err := s.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to get state: %v", err)
continue
}
ws := memdb.NewWatchSet()
iter, err := state.JobSummaries(ws)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to get job summaries: %v", err)
continue
}
for {
raw := iter.Next()
if raw == nil {
break
}
summary := raw.(*structs.JobSummary)
for name, tgSummary := range summary.Summary {
if !s.config.DisableTaggedMetrics {
labels := []metrics.Label{
{
Name: "job",
Value: summary.JobID,
},
{
Name: "task_group",
Value: name,
},
}
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
float32(tgSummary.Queued), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
float32(tgSummary.Complete), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
float32(tgSummary.Failed), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
float32(tgSummary.Running), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
}
if s.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
}
}
}
}
}
}
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {