only support latest and remove ring buffer

This commit is contained in:
Alex Dadgar
2016-06-12 09:32:38 -07:00
parent 1a7df4e7d2
commit 020f8b05d3
10 changed files with 34 additions and 329 deletions

View File

@@ -89,12 +89,11 @@ func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) {
if err != nil {
return nil, err
}
var resp []HostStats
var resp HostStats
if _, err := client.query("/v1/client/stats", &resp, nil); err != nil {
return nil, err
}
return &resp[0], nil
return &resp, nil
}
// Node is used to deserialize a node entry.

View File

@@ -30,6 +30,10 @@ const (
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)
type AllocStatsReporter interface {
LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
}
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
@@ -473,6 +477,12 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}
// StatsReporter returns an interface to query resource usage statistics of an
// allocation
func (r *AllocRunner) StatsReporter() AllocStatsReporter {
return r
}
// LatestAllocStats returns the latest allocation stats. If the optional taskFilter is set
// the allocation stats will only include the given task.
func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
@@ -524,13 +534,6 @@ func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.Resour
return summed
}
// AllocStatsSince returns the allocation stats collected since the passed unix
// nanosecond timestamp. If the optional taskFilter is set the allocation stats
// will only include the given task.
func (r *AllocRunner) AllocStatsSince(taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) {
return nil, nil
}
// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
// checks if the current running allocation is behind and should be updated.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {

View File

@@ -26,8 +26,6 @@ import (
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/hashstructure"
cstructs "github.com/hashicorp/nomad/client/structs"
)
const (
@@ -83,20 +81,12 @@ const (
// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
// Client
type ClientStatsReporter interface {
// LatestAllocStats returns the latest allocation resource usage optionally
// filtering by task name
LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error)
// AllocStatsSince returns the allocation resource usage collected since the
// passed timestamp optionally filtering by task name.
AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error)
// GetAllocStats returns the AllocStatsReporter for the passed allocation.
// If it does not exist an error is reported.
GetAllocStats(allocID string) (AllocStatsReporter, error)
// LatestHostStats returns the latest resource usage stats for the host
LatestHostStats() *stats.HostStats
// HostStatsSince returns the collect resource usage stats for the host
// since the passed unix nanosecond time stamp
HostStatsSince(since int64) []*stats.HostStats
}
// Client is used to implement the client interaction with Nomad. Clients
@@ -145,7 +135,7 @@ type Client struct {
// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
resourceUsage *stats.RingBuff
resourceUsage *stats.HostStats
resourceUsageLock sync.RWMutex
shutdown bool
@@ -158,11 +148,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error)
// Create a logger
logger := log.New(cfg.LogOutput, "", log.LstdFlags)
resourceUsage, err := stats.NewRingBuff(cfg.StatsDataPoints)
if err != nil {
return nil, err
}
// Create the client
c := &Client{
config: cfg,
@@ -171,7 +156,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error)
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(),
resourceUsage: resourceUsage,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
@@ -407,72 +391,21 @@ func (c *Client) StatsReporter() ClientStatsReporter {
return c
}
// LatestAllocStats returns the latest allocation resource usage optionally
// filtering by task name
func (c *Client) LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error) {
func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
ar, ok := c.allocs[allocID]
if !ok {
return nil, fmt.Errorf("unknown allocation ID %q", allocID)
}
c.allocLock.RUnlock()
return ar.LatestAllocStats(taskFilter)
}
// AllocStatsSince returns the allocation resource usage collected since the
// passed timestamp optionally filtering by task name.
func (c *Client) AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) {
c.allocLock.RLock()
ar, ok := c.allocs[allocID]
if !ok {
return nil, fmt.Errorf("unknown allocation ID %q", allocID)
}
c.allocLock.RUnlock()
return ar.AllocStatsSince(taskFilter, since)
return ar.StatsReporter(), nil
}
// HostStats returns all the stats related to a Nomad client
func (c *Client) LatestHostStats() *stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
val := c.resourceUsage.Peek()
ru, _ := val.(*stats.HostStats)
return ru
}
func (c *Client) HostStatsSince(since int64) []*stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
values := c.resourceUsage.Values()
low := 0
high := len(values) - 1
var idx int
for {
mid := (low + high) >> 1
midVal, _ := values[mid].(*stats.HostStats)
if midVal.Timestamp < since {
low = mid + 1
} else if midVal.Timestamp > since {
high = mid - 1
} else if midVal.Timestamp == since {
idx = mid
break
}
if low > high {
idx = low
break
}
}
values = values[idx:]
ts := make([]*stats.HostStats, len(values))
for index, val := range values {
ru, _ := val.(*stats.HostStats)
ts[index] = ru
}
return ts
return c.resourceUsage
}
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
@@ -1458,9 +1391,9 @@ func (c *Client) collectHostStats() {
continue
}
c.resourceUsageLock.RLock()
c.resourceUsage.Enqueue(ru)
c.resourceUsageLock.RUnlock()
c.resourceUsageLock.Lock()
c.resourceUsage = ru
c.resourceUsageLock.Unlock()
c.emitStats(ru)
case <-c.shutdownCh:
return

View File

@@ -114,10 +114,6 @@ type Config struct {
// ConsulConfig is this Agent's Consul configuration
ConsulConfig *config.ConsulConfig
// StatsDataPoints is the number of resource usage data points the Nomad
// client keeps in memory
StatsDataPoints int
// StatsCollectionInterval is the interval at which the Nomad client
// collects resource usage stats
StatsCollectionInterval time.Duration
@@ -143,7 +139,6 @@ func DefaultConfig() *Config {
},
LogOutput: os.Stderr,
Region: "global",
StatsDataPoints: 60,
StatsCollectionInterval: 1 * time.Second,
}
}

View File

@@ -1,67 +0,0 @@
package stats
import (
"fmt"
"sync"
)
var (
// The default size of the ring buffer
defaultCap = 60
)
// RingBuff is a data structure which is a circular list based on slices
type RingBuff struct {
head int
buff []interface{}
lock sync.RWMutex
}
// NewRingBuff creates a new ring buffer of the specified size
func NewRingBuff(capacity int) (*RingBuff, error) {
if capacity < 1 {
return nil, fmt.Errorf("can not create a ring buffer with capacity: %v", capacity)
}
return &RingBuff{buff: make([]interface{}, 0, capacity), head: -1}, nil
}
// Enqueue queues a new value in the ring buffer. This operation would
// over-write an older value if the list has reached it's capacity
func (r *RingBuff) Enqueue(value interface{}) {
r.lock.Lock()
defer r.lock.Unlock()
if len(r.buff) < cap(r.buff) {
r.buff = append(r.buff, struct{}{})
}
r.head += 1
if r.head == cap(r.buff) {
r.head = 0
}
r.buff[r.head] = value
}
// Peek returns the last value enqueued in the ring buffer
func (r *RingBuff) Peek() interface{} {
r.lock.RLock()
defer r.lock.RUnlock()
if r.head == -1 {
return nil
}
return r.buff[r.head]
}
// Values returns all the values in the buffer.
func (r *RingBuff) Values() []interface{} {
r.lock.RLock()
defer r.lock.RUnlock()
if r.head == len(r.buff)-1 {
vals := make([]interface{}, len(r.buff))
copy(vals, r.buff)
return vals
}
slice1 := r.buff[r.head+1:]
slice2 := r.buff[:r.head+1]
return append(slice1, slice2...)
}

View File

@@ -1,83 +0,0 @@
package stats
import (
"testing"
)
func TestRingBuffInvalid(t *testing.T) {
if _, err := NewRingBuff(0); err == nil {
t.Fatalf("expected err")
}
}
func TestRingBuffEnqueue(t *testing.T) {
rb, err := NewRingBuff(3)
if err != nil {
t.Fatalf("err: %v", err)
}
rb.Enqueue(1)
rb.Enqueue(2)
rb.Enqueue(3)
if val := rb.Peek(); val != 3 {
t.Fatalf("expected: %v, actual: %v", 3, val)
}
rb.Enqueue(4)
rb.Enqueue(5)
if val := rb.Peek(); val != 5 {
t.Fatalf("expected: %v, actual: %v", 5, val)
}
}
func TestRingBuffValues(t *testing.T) {
rb, err := NewRingBuff(3)
if err != nil {
t.Fatalf("err: %v", err)
}
rb.Enqueue(1)
rb.Enqueue(2)
rb.Enqueue(3)
rb.Enqueue(4)
expected := []interface{}{2, 3, 4}
if !sliceEq(expected, rb.Values()) {
t.Fatalf("expected: %v, actual: %v", expected, rb.Values())
}
rb.Enqueue(5)
expected = []interface{}{3, 4, 5}
if !sliceEq(expected, rb.Values()) {
t.Fatalf("expected: %v, actual: %v", expected, rb.Values())
}
rb.Enqueue(6)
expected = []interface{}{4, 5, 6}
if !sliceEq(expected, rb.Values()) {
t.Fatalf("expected: %v, actual: %v", expected, rb.Values())
}
}
func sliceEq(slice1, slice2 []interface{}) bool {
if slice1 == nil && slice2 == nil {
return true
}
if slice1 == nil || slice2 == nil {
return false
}
if len(slice1) != len(slice2) {
return false
}
for i := range slice1 {
if slice1[i] != slice2[i] {
return false
}
}
return true
}

View File

@@ -17,7 +17,6 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/client/driver/env"
@@ -52,7 +51,7 @@ type TaskRunner struct {
running bool
runningLock sync.Mutex
resourceUsage *stats.RingBuff
resourceUsage *cstructs.TaskResourceUsage
resourceUsageLock sync.RWMutex
task *structs.Task
@@ -99,18 +98,11 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
resourceUsage, err := stats.NewRingBuff(config.StatsDataPoints)
if err != nil {
logger.Printf("[ERR] client: can't create resource usage buffer: %v", err)
return nil
}
tc := &TaskRunner{
config: config,
updater: updater,
logger: logger,
restartTracker: restartTracker,
resourceUsage: resourceUsage,
ctx: ctx,
alloc: alloc,
task: task,
@@ -502,7 +494,7 @@ func (r *TaskRunner) collectResourceUsageStats(stopCollection <-chan struct{}) {
}
r.resourceUsageLock.Lock()
r.resourceUsage.Enqueue(ru)
r.resourceUsage = ru
r.resourceUsageLock.Unlock()
r.emitStats(ru)
case <-stopCollection:
@@ -523,45 +515,7 @@ func (r *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
return nil
}
val := r.resourceUsage.Peek()
ru, _ := val.(*cstructs.TaskResourceUsage)
return ru
}
// ResourceUsageSince returns the list of all the resource utilization datapoints
// collected since the given timestamp
func (r *TaskRunner) ResourceUsageSince(since int64) []*cstructs.TaskResourceUsage {
r.resourceUsageLock.RLock()
defer r.resourceUsageLock.RUnlock()
values := r.resourceUsage.Values()
low := 0
high := len(values) - 1
var idx int
for {
mid := (low + high) / 2
midVal, _ := values[mid].(*cstructs.TaskResourceUsage)
if midVal.Timestamp < since {
low = mid + 1
} else if midVal.Timestamp > since {
high = mid - 1
} else if midVal.Timestamp == since {
idx = mid
break
}
if low > high {
idx = low
break
}
}
values = values[idx:]
ts := make([]*cstructs.TaskResourceUsage, len(values))
for index, val := range values {
ru, _ := val.(*cstructs.TaskResourceUsage)
ts[index] = ru
}
return ts
return r.resourceUsage
}
// handleUpdate takes an updated allocation and updates internal state to

View File

@@ -341,7 +341,6 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
conf.ConsulConfig = a.config.Consul
conf.StatsDataPoints = a.config.Client.StatsConfig.DataPoints
conf.StatsCollectionInterval = a.config.Client.StatsConfig.collectionInterval
return conf, nil

View File

@@ -1,9 +1,7 @@
package agent
import (
"fmt"
"net/http"
"strconv"
"strings"
"github.com/hashicorp/nomad/nomad/structs"
@@ -75,19 +73,13 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
}
allocID := tokens[0]
// Get the stats reporter
clientStats := s.agent.client.StatsReporter()
task := req.URL.Query().Get("task")
var since int
var err error
if sinceTime := req.URL.Query().Get("since"); sinceTime != "" {
since, err = strconv.Atoi(sinceTime)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("can't read the since query parameter: %v", err))
}
aStats, err := clientStats.GetAllocStats(allocID)
if err != nil {
return nil, err
}
if since > 0 {
return clientStats.AllocStatsSince(allocID, task, int64(since))
}
return clientStats.LatestAllocStats(allocID, task)
task := req.URL.Query().Get("task")
return aStats.LatestAllocStats(task)
}

View File

@@ -1,12 +1,6 @@
package agent
import (
"fmt"
"net/http"
"strconv"
"github.com/hashicorp/nomad/client/stats"
)
import "net/http"
const (
invalidSinceErrPrefix = "can't read the since query parameter"
@@ -17,20 +11,6 @@ func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Requ
return nil, clientNotRunning
}
var since int
var err error
ts := false
if sinceTime := req.URL.Query().Get("since"); sinceTime != "" {
ts = true
since, err = strconv.Atoi(sinceTime)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("%s: %v", invalidSinceErrPrefix, err))
}
}
clientStats := s.agent.client.StatsReporter()
if ts {
return clientStats.HostStatsSince(int64(since)), nil
}
return []*stats.HostStats{clientStats.LatestHostStats()}, nil
return clientStats.LatestHostStats(), nil
}