From b60cc07e5a8401d2991ce58ee2b1e1bf44d116ae Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 5 Aug 2015 16:45:50 -0700 Subject: [PATCH] nomad: cleanup stats goroutines --- nomad/eval_broker.go | 20 ++++++++++++-------- nomad/plan_queue.go | 12 ++++++++---- nomad/server.go | 4 ++-- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 0e42471df..5fdd79e5a 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -395,16 +395,20 @@ func (b *EvalBroker) Stats() *BrokerStats { } // EmitStats is used to export metrics about the broker while enabled -func (b *EvalBroker) EmitStats(period time.Duration) { +func (b *EvalBroker) EmitStats(period time.Duration, stopCh chan struct{}) { for { - <-time.After(period) + select { + case <-time.After(period): + stats := b.Stats() + metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady)) + metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) + for sched, schedStats := range stats.ByScheduler { + metrics.SetGauge([]string{"nomad", "broker", sched, "ready"}, float32(schedStats.Ready)) + metrics.SetGauge([]string{"nomad", "broker", sched, "unacked"}, float32(schedStats.Unacked)) + } - stats := b.Stats() - metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady)) - metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) - for sched, schedStats := range stats.ByScheduler { - metrics.SetGauge([]string{"nomad", "broker", sched, "ready"}, float32(schedStats.Ready)) - metrics.SetGauge([]string{"nomad", "broker", sched, "unacked"}, float32(schedStats.Unacked)) + case <-stopCh: + return } } } diff --git a/nomad/plan_queue.go b/nomad/plan_queue.go index cb2e5eab5..d679b9371 100644 --- a/nomad/plan_queue.go +++ b/nomad/plan_queue.go @@ -195,12 +195,16 @@ func (q *PlanQueue) Stats() *QueueStats { } // EmitStats is used to export metrics about the broker while enabled -func (q *PlanQueue) EmitStats(period time.Duration) { +func (q *PlanQueue) EmitStats(period time.Duration, stopCh chan struct{}) { for { - <-time.After(period) + select { + case <-time.After(period): + stats := q.Stats() + metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth)) - stats := q.Stats() - metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth)) + case <-stopCh: + return + } } } diff --git a/nomad/server.go b/nomad/server.go index 86b8a6792..1b4501f97 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -188,10 +188,10 @@ func NewServer(config *Config) (*Server, error) { go s.listen() // Emit metrics for the eval broker - go evalBroker.EmitStats(time.Second) + go evalBroker.EmitStats(time.Second, s.shutdownCh) // Emit metrics for the plan queue - go planQueue.EmitStats(time.Second) + go planQueue.EmitStats(time.Second, s.shutdownCh) // Done return s, nil