diff --git a/command/agent/agent.go b/command/agent/agent.go index 0c8072629..60907a0a8 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -233,6 +233,11 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi // Set the TLS config conf.TLSConfig = agentConfig.TLSConfig + // Setup telemetry related config + conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval + conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics + conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics + return conf, nil } diff --git a/nomad/config.go b/nomad/config.go index db4939106..a96345b84 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -245,6 +245,18 @@ type Config struct { // SentinelConfig is this Agent's Sentinel configuration SentinelConfig *config.SentinelConfig + + // StatsCollectionInterval is the interval at which the Nomad server + // publishes metrics which are periodic in nature like updating gauges + StatsCollectionInterval time.Duration + + // DisableTaggedMetrics determines whether metrics will be displayed via a + // key/value/tag format, or simply a key/value format + DisableTaggedMetrics bool + + // BackwardsCompatibleMetrics determines whether to show methods of + // displaying metrics for older versions, or to only show the new format + BackwardsCompatibleMetrics bool } // CheckVersion is used to check if the ProtocolVersion is valid @@ -303,6 +315,7 @@ func DefaultConfig() *Config { TLSConfig: &config.TLSConfig{}, ReplicationBackoff: 30 * time.Second, SentinelGCInterval: 30 * time.Second, + StatsCollectionInterval: 1 * time.Minute, } // Enable all known schedulers by default diff --git a/nomad/leader.go b/nomad/leader.go index 8a4d2dbab..d3fad7940 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -180,6 +180,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Periodically unblock failed allocations go 
s.periodicUnblockFailedEvals(stopCh) + // Periodically publish job summary metrics + go s.publishJobSummaryMetrics(stopCh) + // Setup the heartbeat timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node, effectively this means all the timers are renewed at the time of failover. @@ -519,6 +522,74 @@ func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { } } +// publishJobSummaryMetrics publishes the job summaries as metrics +func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { + timer := time.NewTimer(0) + defer timer.Stop() + + for { + select { + case <-stopCh: + return + case <-timer.C: + timer.Reset(s.config.StatsCollectionInterval) + state, err := s.State().Snapshot() + if err != nil { + s.logger.Printf("[ERR] nomad: failed to get state: %v", err) + continue + } + ws := memdb.NewWatchSet() + iter, err := state.JobSummaries(ws) + if err != nil { + s.logger.Printf("[ERR] nomad: failed to get job summaries: %v", err) + continue + } + + for { + raw := iter.Next() + if raw == nil { + break + } + summary := raw.(*structs.JobSummary) + for name, tgSummary := range summary.Summary { + if !s.config.DisableTaggedMetrics { + labels := []metrics.Label{ + { + Name: "job", + Value: summary.JobID, + }, + { + Name: "task_group", + Value: name, + }, + } + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"}, + float32(tgSummary.Queued), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"}, + float32(tgSummary.Complete), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"}, + float32(tgSummary.Failed), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"}, + float32(tgSummary.Running), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"}, + float32(tgSummary.Starting), labels) + metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"}, + 
float32(tgSummary.Lost), labels) + } + if s.config.BackwardsCompatibleMetrics { + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost)) + } + } + } + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error {