Making the stats collection interval and number of data points to keep in memory configurable

This commit is contained in:
Diptanu Choudhury
2016-05-24 22:30:10 -07:00
parent cf8861e3e6
commit bf6c034fec
9 changed files with 117 additions and 17 deletions

View File

@@ -72,10 +72,6 @@ const (
// consulSyncInterval is the interval at which the client syncs with consul
// to remove services and checks which are no longer valid
consulSyncInterval = 15 * time.Second
// hostStatsCollectorIntv is the interval on which we collect host resource
// usage stats
hostStatsCollectorIntv = 1 * time.Second
)
// DefaultConfig returns the default configuration
@@ -142,7 +138,7 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Create a logger
logger := log.New(cfg.LogOutput, "", log.LstdFlags)
resourceUsage, err := stats.NewRingBuff(60)
resourceUsage, err := stats.NewRingBuff(cfg.StatsDataPoints)
if err != nil {
return nil, err
}
@@ -1284,7 +1280,7 @@ func (c *Client) syncConsul() {
// collectHostStats collects host resource usage stats periodically
func (c *Client) collectHostStats() {
next := time.NewTimer(hostStatsCollectorIntv)
next := time.NewTimer(c.config.StatsCollectionInterval)
for {
select {
case <-next.C:
@@ -1296,7 +1292,7 @@ func (c *Client) collectHostStats() {
c.resourceUsageLock.RLock()
c.resourceUsage.Enqueue(ru)
c.resourceUsageLock.RUnlock()
next.Reset(1 * time.Second)
next.Reset(c.config.StatsCollectionInterval)
case <-c.shutdownCh:
next.Stop()
return

View File

@@ -112,6 +112,14 @@ type Config struct {
// ConsulConfig is the configuration to connect with Consul Agent
ConsulConfig *consul.ConsulConfig
// StatsDataPoints is the number of resource usage data points the Nomad
// client keeps in memory
StatsDataPoints int
// StatsCollectionInterval is the interval at which the Nomad client
// collects resource usage stats
StatsCollectionInterval time.Duration
}
func (c *Config) Copy() *Config {

View File

@@ -33,10 +33,6 @@ const (
// killFailureLimit is how many times we will attempt to kill a task before
// giving up and potentially leaking resources.
killFailureLimit = 5
// statsCollectionIntv is the time interval at which the task runner
// collects resource usage statistics from the driver
statsCollectionIntv = 1 * time.Second
)
// TaskStatsReporter exposes APIs to query resource usage of a Task
@@ -100,7 +96,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
resourceUsage, err := stats.NewRingBuff(60)
resourceUsage, err := stats.NewRingBuff(config.StatsDataPoints)
if err != nil {
logger.Printf("[ERR] client: can't create resource usage buffer: %v", err)
return nil
@@ -454,14 +450,14 @@ func (r *TaskRunner) startTask() error {
r.handleLock.Lock()
r.handle = handle
r.handleLock.Unlock()
go r.monitorResourceUsage()
go r.collectResourceUsageStats()
return nil
}
// monitorUsage starts collecting resource usage stats of a Task
func (r *TaskRunner) monitorResourceUsage() {
// collectResourceUsageStats starts collecting resource usage stats of a Task
func (r *TaskRunner) collectResourceUsageStats() {
for {
next := time.NewTimer(statsCollectionIntv)
next := time.NewTimer(r.config.StatsCollectionInterval)
select {
case <-next.C:
ru, err := r.handle.Stats()
@@ -471,7 +467,7 @@ func (r *TaskRunner) monitorResourceUsage() {
r.resourceUsageLock.Lock()
r.resourceUsage.Enqueue(ru)
r.resourceUsageLock.Unlock()
next.Reset(statsCollectionIntv)
next.Reset(r.config.StatsCollectionInterval)
case <-r.handle.WaitCh():
next.Stop()
return

View File

@@ -274,6 +274,9 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
conf.ConsulConfig = a.consulConfig
conf.StatsDataPoints = a.config.Client.StatsConfig.DataPoints
conf.StatsCollectionInterval = a.config.Client.StatsConfig.collectionInterval
return conf, nil
}

View File

@@ -188,6 +188,14 @@ func (c *Command) readConfig() *Config {
}
config.Server.retryInterval = dur
// Parse the stats collection interval
dur, err = time.ParseDuration(config.Client.StatsConfig.CollectionInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing stats collection interval: %s", err))
return nil
}
config.Client.StatsConfig.collectionInterval = dur
// Check that the server is running in at least one mode.
if !(config.Server.Enabled || config.Client.Enabled) {
c.Ui.Error("Must specify either server, client or dev mode for the agent.")

View File

@@ -45,6 +45,10 @@ client {
client_min_port = 1000
client_max_port = 2000
max_kill_timeout = "10s"
stats {
data_points = 35
collection_interval = "5s"
}
}
server {
enabled = true

View File

@@ -179,6 +179,17 @@ type ConsulConfig struct {
ClientAutoJoin bool `mapstructure:"client_auto_join"`
}
// StatsConfig determines behavior of resource usage stats collections
type StatsConfig struct {
// DataPoints is the number of data points Nomad client stores in-memory
DataPoints int `mapstructure:"data_points"`
// CollectionInterval is the interval of resource usage stats collection
CollectionInterval string `mapstructure:"collection_interval"`
collectionInterval time.Duration `mapstructure:"_"`
}
// ClientConfig is configuration specific to the client mode
type ClientConfig struct {
// Enabled controls if we are a client
@@ -226,6 +237,10 @@ type ClientConfig struct {
// be used to target a certain utilization or to prevent Nomad from using a
// particular set of ports.
Reserved *Resources `mapstructure:"reserved"`
// StatsConfig determines behavior of resource usage stats collection in
// Nomad client
StatsConfig *StatsConfig `mapstructure:"stats"`
}
// ServerConfig is configuration specific to the server mode
@@ -435,6 +450,10 @@ func DefaultConfig() *Config {
ClientMinPort: 14000,
ClientMaxPort: 14512,
Reserved: &Resources{},
StatsConfig: &StatsConfig{
DataPoints: 60,
CollectionInterval: "1s",
},
},
Server: &ServerConfig{
Enabled: false,
@@ -680,6 +699,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
if b.Reserved != nil {
result.Reserved = result.Reserved.Merge(b.Reserved)
}
if b.StatsConfig != nil {
result.StatsConfig = result.StatsConfig.Merge(b.StatsConfig)
}
// Add the servers
result.Servers = append(result.Servers, b.Servers...)
@@ -855,6 +877,18 @@ func (r *Resources) Merge(b *Resources) *Resources {
return &result
}
func (s *StatsConfig) Merge(b *StatsConfig) *StatsConfig {
result := *s
if b.DataPoints != 0 {
result.DataPoints = b.DataPoints
}
if b.CollectionInterval != "" {
result.CollectionInterval = b.CollectionInterval
result.collectionInterval = b.collectionInterval
}
return &result
}
// LoadConfig loads the configuration at the given path, regardless if
// its a file or directory.
func LoadConfig(path string) (*Config, error) {

View File

@@ -319,6 +319,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
"client_max_port",
"client_min_port",
"reserved",
"stats",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
@@ -332,6 +333,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
delete(m, "options")
delete(m, "meta")
delete(m, "reserved")
delete(m, "stats")
var config ClientConfig
if err := mapstructure.WeakDecode(m, &config); err != nil {
@@ -373,10 +375,55 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
}
}
// Parse stats config
if o := listVal.Filter("stats"); len(o.Items) > 0 {
if err := parseStats(&config.StatsConfig, o); err != nil {
return multierror.Prefix(err, "stats ->")
}
}
*result = &config
return nil
}
func parseStats(result **StatsConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'stats' block allowed")
}
// Get our stats object
obj := list.Items[0]
var listVal *ast.ObjectList
if ot, ok := obj.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("client value: should be an object")
}
// check for invalid keys
valid := []string{
"data_points",
"collection_interval",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var stats StatsConfig
if err := mapstructure.WeakDecode(m, &stats); err != nil {
return err
}
*result = &stats
return nil
}
func parseReserved(result **Resources, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {

View File

@@ -63,6 +63,10 @@ func TestConfig_Parse(t *testing.T) {
ReservedPorts: "1,100,10-12",
ParsedReservedPorts: []int{1, 10, 11, 12, 100},
},
StatsConfig: &StatsConfig{
DataPoints: 35,
CollectionInterval: "5s",
},
},
Server: &ServerConfig{
Enabled: true,