Merge pull request #2425 from hashicorp/f-client-metrics

Add metrics to show allocations on the client

Author: Alex Dadgar
Date: 2017-03-09 21:05:53 -08:00
Committed by: GitHub

4 changed files with 81 additions and 37 deletions

api/evaluations.go

@@ -54,24 +54,28 @@ func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*Allocation
 // Evaluation is used to serialize an evaluation.
 type Evaluation struct {
-    ID                string
-    Priority          int
-    Type              string
-    TriggeredBy       string
-    JobID             string
-    JobModifyIndex    uint64
-    NodeID            string
-    NodeModifyIndex   uint64
-    Status            string
-    StatusDescription string
-    Wait              time.Duration
-    NextEval          string
-    PreviousEval      string
-    BlockedEval       string
-    FailedTGAllocs    map[string]*AllocationMetric
-    QueuedAllocations map[string]int
-    CreateIndex       uint64
-    ModifyIndex       uint64
+    ID                   string
+    Priority             int
+    Type                 string
+    TriggeredBy          string
+    JobID                string
+    JobModifyIndex       uint64
+    NodeID               string
+    NodeModifyIndex      uint64
+    Status               string
+    StatusDescription    string
+    Wait                 time.Duration
+    NextEval             string
+    PreviousEval         string
+    BlockedEval          string
+    FailedTGAllocs       map[string]*AllocationMetric
+    ClassEligibility     map[string]bool
+    EscapedComputedClass bool
+    AnnotatePlan         bool
+    QueuedAllocations    map[string]int
+    SnapshotIndex        uint64
+    CreateIndex          uint64
+    ModifyIndex          uint64
 }

 // EvalIndexSort is a wrapper to sort evaluations by CreateIndex.
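
A side note for API consumers: once this lands, the extra fields are serialized to anyone reading evaluations through the Go api package. A minimal sketch of reading them back; the evaluation ID below is a made-up placeholder, and the agent address comes from api.DefaultConfig (NOMAD_ADDR or localhost):

    package main

    import (
        "fmt"
        "log"

        "github.com/hashicorp/nomad/api"
    )

    func main() {
        client, err := api.NewClient(api.DefaultConfig())
        if err != nil {
            log.Fatal(err)
        }

        // Hypothetical evaluation ID; in practice it comes from a job
        // registration response or `nomad eval-status`.
        eval, _, err := client.Evaluations().Info("example-eval-id", nil)
        if err != nil {
            log.Fatal(err)
        }

        // Fields this commit starts exposing through the API.
        fmt.Println("snapshot index:", eval.SnapshotIndex)
        for tg, n := range eval.QueuedAllocations {
            fmt.Printf("task group %q has %d queued allocations\n", tg, n)
        }
    }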

client/alloc_runner.go

@@ -574,15 +574,21 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
 // handleDestroy blocks till the AllocRunner should be destroyed and does the
 // necessary cleanup.
 func (r *AllocRunner) handleDestroy() {
-    select {
-    case <-r.destroyCh:
-        if err := r.DestroyContext(); err != nil {
-            r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
-                r.alloc.ID, err)
-        }
-        if err := r.DestroyState(); err != nil {
-            r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
-                r.alloc.ID, err)
+    for {
+        select {
+        case <-r.destroyCh:
+            if err := r.DestroyContext(); err != nil {
+                r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
+                    r.alloc.ID, err)
+            }
+            if err := r.DestroyState(); err != nil {
+                r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
+                    r.alloc.ID, err)
+            }
+            return
+        case <-r.updateCh:
+            r.logger.Printf("[ERR] client: dropping update to terminal alloc '%s'", r.alloc.ID)
         }
     }
 }
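
The shape of this fix is the standard for/select drain pattern: previously handleDestroy received a single value and returned out of the select, so once the alloc turned terminal nothing ever read updateCh again and late updates would pile up unread (or block their sender, depending on buffering). A generic, self-contained sketch of the pattern; the channel names and messages are illustrative, not Nomad's:

    package main

    import "fmt"

    // drainUntilDone keeps receiving on both channels until done fires,
    // discarding late updates instead of leaving them stranded.
    func drainUntilDone(done, updates <-chan struct{}) {
        for {
            select {
            case <-done:
                fmt.Println("destroyed; cleanup would run here")
                return
            case <-updates:
                fmt.Println("dropping update to terminal consumer")
            }
        }
    }

    func main() {
        done := make(chan struct{})
        updates := make(chan struct{}, 1)
        updates <- struct{}{} // a late update that must not wedge anything
        close(done)
        drainUntilDone(done, updates)
    }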

client/client.go

@@ -310,7 +310,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
     go c.run()

     // Start collecting stats
-    go c.collectHostStats()
+    go c.emitStats()

     c.logger.Printf("[INFO] client: Node ID %q", c.Node().ID)
     return c, nil
@@ -2170,8 +2170,8 @@ func (c *Client) consulReaperImpl() error {
     return c.consulSyncer.ReapUnmatched(domains)
 }

-// collectHostStats collects host resource usage stats periodically
-func (c *Client) collectHostStats() {
+// emitStats collects host resource usage stats periodically
+func (c *Client) emitStats() {
     // Start collecting host stats right away and then keep collecting every
     // collection interval
     next := time.NewTimer(0)
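
The time.NewTimer(0) idiom above makes the first collection fire immediately, after which the loop re-arms the timer once per interval. A standalone sketch of that collect-now-then-periodically loop; the 200ms interval and one-second shutdown are illustrative stand-ins for Nomad's configured values:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        interval := 200 * time.Millisecond
        shutdown := make(chan struct{})
        time.AfterFunc(time.Second, func() { close(shutdown) })

        // A zero-duration timer fires at once, so the first collection is
        // not delayed by a full interval.
        next := time.NewTimer(0)
        defer next.Stop()
        for {
            select {
            case <-next.C:
                fmt.Println("collect stats at", time.Now())
                next.Reset(interval)
            case <-shutdown:
                return
            }
        }
    }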
@@ -2188,16 +2188,18 @@ func (c *Client) collectHostStats() {
             // Publish Node metrics if operator has opted in
             if c.config.PublishNodeMetrics {
-                c.emitStats(c.hostStatsCollector.Stats())
+                c.emitHostStats(c.hostStatsCollector.Stats())
             }
+
+            c.emitClientMetrics()
         case <-c.shutdownCh:
             return
         }
     }
 }

-// emitStats pushes host resource usage stats to remote metrics collection sinks
-func (c *Client) emitStats(hStats *stats.HostStats) {
+// emitHostStats pushes host resource usage stats to remote metrics collection sinks
+func (c *Client) emitHostStats(hStats *stats.HostStats) {
     nodeID := c.Node().ID
     metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
     metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
@@ -2263,6 +2265,38 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
     }
 }

+// emitClientMetrics emits lower volume client metrics
+func (c *Client) emitClientMetrics() {
+    nodeID := c.Node().ID
+
+    // Emit allocation metrics
+    c.migratingAllocsLock.Lock()
+    migrating := len(c.migratingAllocs)
+    c.migratingAllocsLock.Unlock()
+
+    c.blockedAllocsLock.Lock()
+    blocked := len(c.blockedAllocations)
+    c.blockedAllocsLock.Unlock()
+
+    pending, running, terminal := 0, 0, 0
+    for _, ar := range c.getAllocRunners() {
+        switch ar.Alloc().ClientStatus {
+        case structs.AllocClientStatusPending:
+            pending++
+        case structs.AllocClientStatusRunning:
+            running++
+        case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed:
+            terminal++
+        }
+    }
+
+    metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating))
+    metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked))
+    metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending))
+    metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running))
+    metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal))
+}
+
 func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {
     // Unfortunately the allocs only have IP so we need to match them to the
     // device
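
For reference, go-metrics joins key parts with dots and prepends the configured service name, so these gauges reach sinks under names shaped like nomad.client.allocations.running.<node-id> (the exact prefix depends on the agent's telemetry settings). A hedged sketch of shipping such a gauge to a statsd agent; the address, node ID, and EnableHostname choice are assumptions for the example, not Nomad's actual wiring:

    package main

    import (
        "log"
        "time"

        metrics "github.com/armon/go-metrics"
    )

    func main() {
        sink, err := metrics.NewStatsdSink("127.0.0.1:8125")
        if err != nil {
            log.Fatal(err)
        }
        cfg := metrics.DefaultConfig("nomad")
        cfg.EnableHostname = false // keep the key shape stable for the example
        if _, err := metrics.NewGlobal(cfg, sink); err != nil {
            log.Fatal(err)
        }

        // Same key shape as the gauges added in this commit.
        metrics.SetGauge([]string{"client", "allocations", "blocked", "node-1"}, 0)

        // Give the sink's background flusher a moment to push the datagram,
        // which arrives roughly as: nomad.client.allocations.blocked.node-1:0|g
        time.Sleep(time.Second)
    }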

nomad/structs/structs.go

@@ -3827,15 +3827,15 @@ type Evaluation struct {
     // during the evaluation. This should not be set during normal operations.
     AnnotatePlan bool

-    // QueuedAllocations is the number of unplaced allocations at the time the
-    // evaluation was processed. The map is keyed by Task Group names.
-    QueuedAllocations map[string]int
-
     // SnapshotIndex is the Raft index of the snapshot used to process the
     // evaluation. As such it will only be set once it has gone through the
     // scheduler.
     SnapshotIndex uint64

+    // QueuedAllocations is the number of unplaced allocations at the time the
+    // evaluation was processed. The map is keyed by Task Group names.
+    QueuedAllocations map[string]int
+
     // Raft Indexes
     CreateIndex uint64
     ModifyIndex uint64
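
One consequence of the comment on SnapshotIndex: since it is only assigned once an evaluation has gone through the scheduler, and Raft indexes start at 1, a zero value can be read as "not yet processed". A speculative helper sketch built on that documented invariant; processed is not a helper that exists in the codebase:

    package main

    import (
        "fmt"

        "github.com/hashicorp/nomad/nomad/structs"
    )

    // processed reports whether an evaluation has been through the scheduler,
    // leaning on the invariant that SnapshotIndex is only set at that point.
    func processed(eval *structs.Evaluation) bool {
        return eval.SnapshotIndex != 0
    }

    func main() {
        eval := &structs.Evaluation{ID: "example"}
        fmt.Println(processed(eval)) // false until the scheduler snapshots state
    }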