Files
nomad/vendor/github.com/DataDog/datadog-go/statsd/statsd.go
Seth Hoenig 15fb4c990a deps: Switch to Go modules for dependency management
This PR switches the Nomad repository from using govendor to Go modules
for managing dependencies. Aspects of the Nomad workflow remain pretty
much the same. The usual Makefile targets should continue to work as
they always did. The API submodule simply defers to the parent Nomad
version on the repository, keeping the semantics of API versioning that
currently exists.
2020-06-02 14:30:36 -05:00

504 lines
16 KiB
Go

// Copyright 2013 Ooyala, Inc.
/*
Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd,
adding tags and histograms and pushing upstream to Datadog.
Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD.
Example Usage:
// Create the client
c, err := statsd.New("127.0.0.1:8125")
if err != nil {
log.Fatal(err)
}
// Prefix every metric with the app name
c.Namespace = "flubber."
// Send the EC2 availability zone as a tag with every metric
c.Tags = append(c.Tags, "us-east-1a")
err = c.Gauge("request.duration", 1.2, nil, 1)
statsd is based on go-statsd-client.
*/
package statsd
import (
"fmt"
"math/rand"
"os"
"strings"
"sync"
"time"
)
/*
OptimalUDPPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes
is optimal for regular networks with an MTU of 1500 so datagrams don't get
fragmented. It's generally recommended not to fragment UDP datagrams as losing
a single fragment will cause the entire datagram to be lost.
*/
const OptimalUDPPayloadSize = 1432
/*
MaxUDPPayloadSize defines the maximum payload size for a UDP datagram.
Its value comes from the calculation: 65535 bytes Max UDP datagram size -
8byte UDP header - 60byte max IP headers
any number greater than that will see frames being cut out.
*/
const MaxUDPPayloadSize = 65467
// DefaultUDPBufferPoolSize is the default size of the buffer pool for UDP clients.
const DefaultUDPBufferPoolSize = 2048
// DefaultUDSBufferPoolSize is the default size of the buffer pool for UDS clients.
const DefaultUDSBufferPoolSize = 512
/*
DefaultMaxAgentPayloadSize is the default maximum payload size the agent
can receive. This can be adjusted by changing dogstatsd_buffer_size in the
agent configuration file datadog.yaml.
*/
const DefaultMaxAgentPayloadSize = 8192
/*
TelemetryInterval is the interval at which telemetry will be sent by the client.
*/
const TelemetryInterval = 10 * time.Second
/*
clientTelemetryTag is a tag identifying this specific client.
*/
var clientTelemetryTag = "client:go"
/*
UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket
traffic instead of UDP.
*/
const UnixAddressPrefix = "unix://"
// Client-side entity ID injection for container tagging
const (
entityIDEnvName = "DD_ENTITY_ID"
entityIDTagName = "dd.internal.entity_id"
)
type metricType int
const (
gauge metricType = iota
count
histogram
distribution
set
timing
event
serviceCheck
)
type metric struct {
metricType metricType
namespace string
globalTags []string
name string
fvalue float64
ivalue int64
svalue string
evalue *Event
scvalue *ServiceCheck
tags []string
rate float64
}
type noClientErr string
// ErrNoClient is returned if statsd reporting methods are invoked on
// a nil client.
const ErrNoClient = noClientErr("statsd client is nil")
func (e noClientErr) Error() string {
return string(e)
}
// ClientInterface is an interface that exposes the common client functions for the
// purpose of being able to provide a no-op client or even mocking. This can aid
// downstream users' with their testing.
type ClientInterface interface {
// Gauge measures the value of a metric at a particular time.
Gauge(name string, value float64, tags []string, rate float64) error
// Count tracks how many times something happened per second.
Count(name string, value int64, tags []string, rate float64) error
// Histogram tracks the statistical distribution of a set of values on each host.
Histogram(name string, value float64, tags []string, rate float64) error
// Distribution tracks the statistical distribution of a set of values across your infrastructure.
Distribution(name string, value float64, tags []string, rate float64) error
// Decr is just Count of -1
Decr(name string, tags []string, rate float64) error
// Incr is just Count of 1
Incr(name string, tags []string, rate float64) error
// Set counts the number of unique elements in a group.
Set(name string, value string, tags []string, rate float64) error
// Timing sends timing information, it is an alias for TimeInMilliseconds
Timing(name string, value time.Duration, tags []string, rate float64) error
// TimeInMilliseconds sends timing information in milliseconds.
// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
TimeInMilliseconds(name string, value float64, tags []string, rate float64) error
// Event sends the provided Event.
Event(e *Event) error
// SimpleEvent sends an event with the provided title and text.
SimpleEvent(title, text string) error
// ServiceCheck sends the provided ServiceCheck.
ServiceCheck(sc *ServiceCheck) error
// SimpleServiceCheck sends an serviceCheck with the provided name and status.
SimpleServiceCheck(name string, status ServiceCheckStatus) error
// Close the client connection.
Close() error
// Flush forces a flush of all the queued dogstatsd payloads.
Flush() error
// SetWriteTimeout allows the user to set a custom write timeout.
SetWriteTimeout(d time.Duration) error
}
// A Client is a handle for sending messages to dogstatsd. It is safe to
// use one Client from multiple goroutines simultaneously.
type Client struct {
// Sender handles the underlying networking protocol
sender *sender
// Namespace to prepend to all statsd calls
Namespace string
// Tags are global tags to be added to every statsd call
Tags []string
// skipErrors turns off error passing and allows UDS to emulate UDP behaviour
SkipErrors bool
flushTime time.Duration
bufferPool *bufferPool
buffer *statsdBuffer
telemetryTags []string
stop chan struct{}
sync.Mutex
}
// Verify that Client implements the ClientInterface.
// https://golang.org/doc/faq#guarantee_satisfies_interface
var _ ClientInterface = &Client{}
// New returns a pointer to a new Client given an addr in the format "hostname:port" or
// "unix:///path/to/socket".
func New(addr string, options ...Option) (*Client, error) {
var w statsdWriter
o, err := resolveOptions(options)
if err != nil {
return nil, err
}
var writerType string
optimalPayloadSize := OptimalUDPPayloadSize
defaultBufferPoolSize := DefaultUDPBufferPoolSize
if !strings.HasPrefix(addr, UnixAddressPrefix) {
w, err = newUDPWriter(addr)
writerType = "udp"
} else {
// FIXME: The agent has a performance pitfall preventing us from using better defaults here.
// Once it's fixed, use `DefaultMaxAgentPayloadSize` and `DefaultUDSBufferPoolSize` instead.
optimalPayloadSize = OptimalUDPPayloadSize
defaultBufferPoolSize = DefaultUDPBufferPoolSize
w, err = newUDSWriter(addr[len(UnixAddressPrefix)-1:])
writerType = "uds"
}
if err != nil {
return nil, err
}
if o.MaxBytesPerPayload == 0 {
o.MaxBytesPerPayload = optimalPayloadSize
}
if o.BufferPoolSize == 0 {
o.BufferPoolSize = defaultBufferPoolSize
}
if o.SenderQueueSize == 0 {
o.SenderQueueSize = defaultBufferPoolSize
}
return newWithWriter(w, o, writerType)
}
// NewWithWriter creates a new Client with given writer. Writer is a
// io.WriteCloser + SetWriteTimeout(time.Duration) error
func NewWithWriter(w statsdWriter, options ...Option) (*Client, error) {
o, err := resolveOptions(options)
if err != nil {
return nil, err
}
return newWithWriter(w, o, "custom")
}
func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, error) {
w.SetWriteTimeout(o.WriteTimeoutUDS)
c := Client{
Namespace: o.Namespace,
Tags: o.Tags,
telemetryTags: []string{clientTelemetryTag, "transport:" + writerName},
}
// Inject DD_ENTITY_ID as a constant tag if found
entityID := os.Getenv(entityIDEnvName)
if entityID != "" {
entityTag := fmt.Sprintf("%s:%s", entityIDTagName, entityID)
c.Tags = append(c.Tags, entityTag)
}
if o.MaxBytesPerPayload == 0 {
o.MaxBytesPerPayload = OptimalUDPPayloadSize
}
c.bufferPool = newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
c.buffer = c.bufferPool.borrowBuffer()
c.sender = newSender(w, o.SenderQueueSize, c.bufferPool)
c.flushTime = o.BufferFlushInterval
c.stop = make(chan struct{}, 1)
go c.watch()
go c.telemetry()
return &c, nil
}
// NewBuffered returns a Client that buffers its output and sends it in chunks.
// Buflen is the length of the buffer in number of commands.
//
// When addr is empty, the client will default to a UDP client and use the DD_AGENT_HOST
// and (optionally) the DD_DOGSTATSD_PORT environment variables to build the target address.
func NewBuffered(addr string, buflen int) (*Client, error) {
return New(addr, WithMaxMessagesPerPayload(buflen))
}
// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP.
func (c *Client) SetWriteTimeout(d time.Duration) error {
if c == nil {
return ErrNoClient
}
return c.sender.transport.SetWriteTimeout(d)
}
func (c *Client) watch() {
ticker := time.NewTicker(c.flushTime)
for {
select {
case <-ticker.C:
c.Lock()
c.flushUnsafe()
c.Unlock()
case <-c.stop:
ticker.Stop()
return
}
}
}
func (c *Client) telemetry() {
ticker := time.NewTicker(TelemetryInterval)
for {
select {
case <-ticker.C:
metrics := c.sender.flushMetrics()
c.telemetryCount("datadog.dogstatsd.client.packets_sent", int64(metrics.TotalSentPayloads), c.telemetryTags, 1)
c.telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(metrics.TotalSentBytes), c.telemetryTags, 1)
c.telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(metrics.TotalDroppedPayloads), c.telemetryTags, 1)
c.telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(metrics.TotalDroppedBytes), c.telemetryTags, 1)
c.telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(metrics.TotalDroppedPayloadsQueueFull), c.telemetryTags, 1)
c.telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(metrics.TotalDroppedBytesQueueFull), c.telemetryTags, 1)
c.telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(metrics.TotalDroppedPayloadsWriter), c.telemetryTags, 1)
c.telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(metrics.TotalDroppedBytesWriter), c.telemetryTags, 1)
case <-c.stop:
ticker.Stop()
return
}
}
}
// same as Count but without global namespace / tags
func (c *Client) telemetryCount(name string, value int64, tags []string, rate float64) {
c.addMetric(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate})
}
// Flush forces a flush of all the queued dogstatsd payloads
// This method is blocking and will not return until everything is sent
// through the network
func (c *Client) Flush() error {
if c == nil {
return ErrNoClient
}
c.Lock()
defer c.Unlock()
c.flushUnsafe()
c.sender.flush()
return nil
}
// flush the current buffer. Lock must be held by caller.
// flushed buffer written to the network asynchronously.
func (c *Client) flushUnsafe() {
if len(c.buffer.bytes()) > 0 {
c.sender.send(c.buffer)
c.buffer = c.bufferPool.borrowBuffer()
}
}
func (c *Client) shouldSample(rate float64) bool {
if rate < 1 && rand.Float64() > rate {
return true
}
return false
}
func (c *Client) globalTags() []string {
if c != nil {
return c.Tags
}
return nil
}
func (c *Client) namespace() string {
if c != nil {
return c.Namespace
}
return ""
}
func (c *Client) writeMetricUnsafe(m metric) error {
switch m.metricType {
case gauge:
return c.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
case count:
return c.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate)
case histogram:
return c.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
case distribution:
return c.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
case set:
return c.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate)
case timing:
return c.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
case event:
return c.buffer.writeEvent(*m.evalue, m.globalTags)
case serviceCheck:
return c.buffer.writeServiceCheck(*m.scvalue, m.globalTags)
default:
return nil
}
}
func (c *Client) addMetric(m metric) error {
if c == nil {
return ErrNoClient
}
if c.shouldSample(m.rate) {
return nil
}
c.Lock()
var err error
if err = c.writeMetricUnsafe(m); err == errBufferFull {
c.flushUnsafe()
err = c.writeMetricUnsafe(m)
}
c.Unlock()
return err
}
// Gauge measures the value of a metric at a particular time.
func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error {
return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate})
}
// Count tracks how many times something happened per second.
func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: count, name: name, ivalue: value, tags: tags, rate: rate})
}
// Histogram tracks the statistical distribution of a set of values on each host.
func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate})
}
// Distribution tracks the statistical distribution of a set of values across your infrastructure.
func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error {
return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate})
}
// Decr is just Count of -1
func (c *Client) Decr(name string, tags []string, rate float64) error {
return c.Count(name, -1, tags, rate)
}
// Incr is just Count of 1
func (c *Client) Incr(name string, tags []string, rate float64) error {
return c.Count(name, 1, tags, rate)
}
// Set counts the number of unique elements in a group.
func (c *Client) Set(name string, value string, tags []string, rate float64) error {
return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: set, name: name, svalue: value, tags: tags, rate: rate})
}
// Timing sends timing information, it is an alias for TimeInMilliseconds
func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error {
return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate)
}
// TimeInMilliseconds sends timing information in milliseconds.
// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error {
return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: timing, name: name, fvalue: value, tags: tags, rate: rate})
}
// Event sends the provided Event.
func (c *Client) Event(e *Event) error {
return c.addMetric(metric{globalTags: c.globalTags(), metricType: event, evalue: e, rate: 1})
}
// SimpleEvent sends an event with the provided title and text.
func (c *Client) SimpleEvent(title, text string) error {
e := NewEvent(title, text)
return c.Event(e)
}
// ServiceCheck sends the provided ServiceCheck.
func (c *Client) ServiceCheck(sc *ServiceCheck) error {
return c.addMetric(metric{globalTags: c.globalTags(), metricType: serviceCheck, scvalue: sc, rate: 1})
}
// SimpleServiceCheck sends an serviceCheck with the provided name and status.
func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error {
sc := NewServiceCheck(name, status)
return c.ServiceCheck(sc)
}
// Close the client connection.
func (c *Client) Close() error {
if c == nil {
return ErrNoClient
}
select {
case c.stop <- struct{}{}:
default:
}
c.Flush()
return c.sender.close()
}