diff --git a/api/nodes.go b/api/nodes.go index 9a418c2c5..c9ade5506 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -89,12 +89,11 @@ func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) { if err != nil { return nil, err } - var resp []HostStats + var resp HostStats if _, err := client.query("/v1/client/stats", &resp, nil); err != nil { return nil, err } - - return &resp[0], nil + return &resp, nil } // Node is used to deserialize a node entry. diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 4c3214552..dc649f57c 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -30,6 +30,10 @@ const ( // AllocStateUpdater is used to update the status of an allocation type AllocStateUpdater func(alloc *structs.Allocation) +type AllocStatsReporter interface { + LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) +} + // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { config *config.Config @@ -473,6 +477,12 @@ func (r *AllocRunner) Update(update *structs.Allocation) { } } +// StatsReporter returns an interface to query resource usage statistics of an +// allocation +func (r *AllocRunner) StatsReporter() AllocStatsReporter { + return r +} + // LatestAllocStats returns the latest allocation stats. If the optional taskFilter is set // the allocation stats will only include the given task. func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) { @@ -524,13 +534,6 @@ func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.Resour return summed } -// AllocStatsSince returns the allocation stats collected since the passed unix -// nanosecond timestamp. If the optional taskFilter is set the allocation stats -// will only include the given task. -func (r *AllocRunner) AllocStatsSince(taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) { - return nil, nil -} - // shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and // checks if the current running allocation is behind and should be updated. func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool { diff --git a/client/client.go b/client/client.go index 4ec1314fd..020c5bbba 100644 --- a/client/client.go +++ b/client/client.go @@ -26,8 +26,6 @@ import ( "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/hashstructure" - - cstructs "github.com/hashicorp/nomad/client/structs" ) const ( @@ -83,20 +81,12 @@ const ( // ClientStatsReporter exposes all the APIs related to resource usage of a Nomad // Client type ClientStatsReporter interface { - // LatestAllocStats returns the latest allocation resource usage optionally - // filtering by task name - LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error) - - // AllocStatsSince returns the allocation resource usage collected since the - // passed timestamp optionally filtering by task name. - AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) + // GetAllocStats returns the AllocStatsReporter for the passed allocation. + // If it does not exist an error is reported. + GetAllocStats(allocID string) (AllocStatsReporter, error) // LatestHostStats returns the latest resource usage stats for the host LatestHostStats() *stats.HostStats - - // HostStatsSince returns the collect resource usage stats for the host - // since the passed unix nanosecond time stamp - HostStatsSince(since int64) []*stats.HostStats } // Client is used to implement the client interaction with Nomad. Clients @@ -145,7 +135,7 @@ type Client struct { // HostStatsCollector collects host resource usage stats hostStatsCollector *stats.HostStatsCollector - resourceUsage *stats.RingBuff + resourceUsage *stats.HostStats resourceUsageLock sync.RWMutex shutdown bool @@ -158,11 +148,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error) // Create a logger logger := log.New(cfg.LogOutput, "", log.LstdFlags) - resourceUsage, err := stats.NewRingBuff(cfg.StatsDataPoints) - if err != nil { - return nil, err - } - // Create the client c := &Client{ config: cfg, @@ -171,7 +156,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error) connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil), logger: logger, hostStatsCollector: stats.NewHostStatsCollector(), - resourceUsage: resourceUsage, allocs: make(map[string]*AllocRunner), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), @@ -407,72 +391,21 @@ func (c *Client) StatsReporter() ClientStatsReporter { return c } -// LatestAllocStats returns the latest allocation resource usage optionally -// filtering by task name -func (c *Client) LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error) { +func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) { c.allocLock.RLock() + defer c.allocLock.RUnlock() ar, ok := c.allocs[allocID] if !ok { return nil, fmt.Errorf("unknown allocation ID %q", allocID) } - c.allocLock.RUnlock() - return ar.LatestAllocStats(taskFilter) -} - -// AllocStatsSince returns the allocation resource usage collected since the -// passed timestamp optionally filtering by task name. -func (c *Client) AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) { - c.allocLock.RLock() - ar, ok := c.allocs[allocID] - if !ok { - return nil, fmt.Errorf("unknown allocation ID %q", allocID) - } - c.allocLock.RUnlock() - return ar.AllocStatsSince(taskFilter, since) + return ar.StatsReporter(), nil } // HostStats returns all the stats related to a Nomad client func (c *Client) LatestHostStats() *stats.HostStats { c.resourceUsageLock.RLock() defer c.resourceUsageLock.RUnlock() - val := c.resourceUsage.Peek() - ru, _ := val.(*stats.HostStats) - return ru -} - -func (c *Client) HostStatsSince(since int64) []*stats.HostStats { - c.resourceUsageLock.RLock() - defer c.resourceUsageLock.RUnlock() - - values := c.resourceUsage.Values() - low := 0 - high := len(values) - 1 - var idx int - - for { - mid := (low + high) >> 1 - midVal, _ := values[mid].(*stats.HostStats) - if midVal.Timestamp < since { - low = mid + 1 - } else if midVal.Timestamp > since { - high = mid - 1 - } else if midVal.Timestamp == since { - idx = mid - break - } - if low > high { - idx = low - break - } - } - values = values[idx:] - ts := make([]*stats.HostStats, len(values)) - for index, val := range values { - ru, _ := val.(*stats.HostStats) - ts[index] = ru - } - return ts - + return c.resourceUsage } // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation @@ -1458,9 +1391,9 @@ func (c *Client) collectHostStats() { continue } - c.resourceUsageLock.RLock() - c.resourceUsage.Enqueue(ru) - c.resourceUsageLock.RUnlock() + c.resourceUsageLock.Lock() + c.resourceUsage = ru + c.resourceUsageLock.Unlock() c.emitStats(ru) case <-c.shutdownCh: return diff --git a/client/config/config.go b/client/config/config.go index d2ae39e4d..65b4affd3 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -114,10 +114,6 @@ type Config struct { // ConsulConfig is this Agent's Consul configuration ConsulConfig *config.ConsulConfig - // StatsDataPoints is the number of resource usage data points the Nomad - // client keeps in memory - StatsDataPoints int - // StatsCollectionInterval is the interval at which the Nomad client // collects resource usage stats StatsCollectionInterval time.Duration @@ -143,7 +139,6 @@ func DefaultConfig() *Config { }, LogOutput: os.Stderr, Region: "global", - StatsDataPoints: 60, StatsCollectionInterval: 1 * time.Second, } } diff --git a/client/stats/ring.go b/client/stats/ring.go deleted file mode 100644 index 8ee8ac679..000000000 --- a/client/stats/ring.go +++ /dev/null @@ -1,67 +0,0 @@ -package stats - -import ( - "fmt" - "sync" -) - -var ( - // The default size of the ring buffer - defaultCap = 60 -) - -// RingBuff is a data structure which is a circular list based on slices -type RingBuff struct { - head int - buff []interface{} - - lock sync.RWMutex -} - -// NewRingBuff creates a new ring buffer of the specified size -func NewRingBuff(capacity int) (*RingBuff, error) { - if capacity < 1 { - return nil, fmt.Errorf("can not create a ring buffer with capacity: %v", capacity) - } - return &RingBuff{buff: make([]interface{}, 0, capacity), head: -1}, nil -} - -// Enqueue queues a new value in the ring buffer. This operation would -// over-write an older value if the list has reached it's capacity -func (r *RingBuff) Enqueue(value interface{}) { - r.lock.Lock() - defer r.lock.Unlock() - if len(r.buff) < cap(r.buff) { - r.buff = append(r.buff, struct{}{}) - } - r.head += 1 - if r.head == cap(r.buff) { - r.head = 0 - } - r.buff[r.head] = value -} - -// Peek returns the last value enqueued in the ring buffer -func (r *RingBuff) Peek() interface{} { - r.lock.RLock() - defer r.lock.RUnlock() - if r.head == -1 { - return nil - } - return r.buff[r.head] -} - -// Values returns all the values in the buffer. -func (r *RingBuff) Values() []interface{} { - r.lock.RLock() - defer r.lock.RUnlock() - if r.head == len(r.buff)-1 { - vals := make([]interface{}, len(r.buff)) - copy(vals, r.buff) - return vals - } - - slice1 := r.buff[r.head+1:] - slice2 := r.buff[:r.head+1] - return append(slice1, slice2...) -} diff --git a/client/stats/ring_test.go b/client/stats/ring_test.go deleted file mode 100644 index 11e4028e2..000000000 --- a/client/stats/ring_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package stats - -import ( - "testing" -) - -func TestRingBuffInvalid(t *testing.T) { - if _, err := NewRingBuff(0); err == nil { - t.Fatalf("expected err") - } -} - -func TestRingBuffEnqueue(t *testing.T) { - rb, err := NewRingBuff(3) - if err != nil { - t.Fatalf("err: %v", err) - } - - rb.Enqueue(1) - rb.Enqueue(2) - rb.Enqueue(3) - if val := rb.Peek(); val != 3 { - t.Fatalf("expected: %v, actual: %v", 3, val) - } - - rb.Enqueue(4) - rb.Enqueue(5) - if val := rb.Peek(); val != 5 { - t.Fatalf("expected: %v, actual: %v", 5, val) - } -} - -func TestRingBuffValues(t *testing.T) { - rb, err := NewRingBuff(3) - if err != nil { - t.Fatalf("err: %v", err) - } - rb.Enqueue(1) - rb.Enqueue(2) - rb.Enqueue(3) - rb.Enqueue(4) - - expected := []interface{}{2, 3, 4} - if !sliceEq(expected, rb.Values()) { - t.Fatalf("expected: %v, actual: %v", expected, rb.Values()) - } - - rb.Enqueue(5) - expected = []interface{}{3, 4, 5} - if !sliceEq(expected, rb.Values()) { - t.Fatalf("expected: %v, actual: %v", expected, rb.Values()) - } - - rb.Enqueue(6) - expected = []interface{}{4, 5, 6} - if !sliceEq(expected, rb.Values()) { - t.Fatalf("expected: %v, actual: %v", expected, rb.Values()) - } - -} - -func sliceEq(slice1, slice2 []interface{}) bool { - - if slice1 == nil && slice2 == nil { - return true - } - - if slice1 == nil || slice2 == nil { - return false - } - - if len(slice1) != len(slice2) { - return false - } - - for i := range slice1 { - if slice1[i] != slice2[i] { - return false - } - } - - return true -} diff --git a/client/task_runner.go b/client/task_runner.go index 310216a23..26679803f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/getter" - "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/client/driver/env" @@ -52,7 +51,7 @@ type TaskRunner struct { running bool runningLock sync.Mutex - resourceUsage *stats.RingBuff + resourceUsage *cstructs.TaskResourceUsage resourceUsageLock sync.RWMutex task *structs.Task @@ -99,18 +98,11 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, } restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) - resourceUsage, err := stats.NewRingBuff(config.StatsDataPoints) - if err != nil { - logger.Printf("[ERR] client: can't create resource usage buffer: %v", err) - return nil - } - tc := &TaskRunner{ config: config, updater: updater, logger: logger, restartTracker: restartTracker, - resourceUsage: resourceUsage, ctx: ctx, alloc: alloc, task: task, @@ -502,7 +494,7 @@ func (r *TaskRunner) collectResourceUsageStats(stopCollection <-chan struct{}) { } r.resourceUsageLock.Lock() - r.resourceUsage.Enqueue(ru) + r.resourceUsage = ru r.resourceUsageLock.Unlock() r.emitStats(ru) case <-stopCollection: @@ -523,45 +515,7 @@ func (r *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage { return nil } - val := r.resourceUsage.Peek() - ru, _ := val.(*cstructs.TaskResourceUsage) - return ru -} - -// ResourceUsageSince returns the list of all the resource utilization datapoints -// collected since the given timestamp -func (r *TaskRunner) ResourceUsageSince(since int64) []*cstructs.TaskResourceUsage { - r.resourceUsageLock.RLock() - defer r.resourceUsageLock.RUnlock() - - values := r.resourceUsage.Values() - low := 0 - high := len(values) - 1 - var idx int - - for { - mid := (low + high) / 2 - midVal, _ := values[mid].(*cstructs.TaskResourceUsage) - if midVal.Timestamp < since { - low = mid + 1 - } else if midVal.Timestamp > since { - high = mid - 1 - } else if midVal.Timestamp == since { - idx = mid - break - } - if low > high { - idx = low - break - } - } - values = values[idx:] - ts := make([]*cstructs.TaskResourceUsage, len(values)) - for index, val := range values { - ru, _ := val.(*cstructs.TaskResourceUsage) - ts[index] = ru - } - return ts + return r.resourceUsage } // handleUpdate takes an updated allocation and updates internal state to diff --git a/command/agent/agent.go b/command/agent/agent.go index 194e3848a..840144b29 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -341,7 +341,6 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { conf.ConsulConfig = a.config.Consul - conf.StatsDataPoints = a.config.Client.StatsConfig.DataPoints conf.StatsCollectionInterval = a.config.Client.StatsConfig.collectionInterval return conf, nil diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 887f50229..ff3ee69be 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -1,9 +1,7 @@ package agent import ( - "fmt" "net/http" - "strconv" "strings" "github.com/hashicorp/nomad/nomad/structs" @@ -75,19 +73,13 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ } allocID := tokens[0] + // Get the stats reporter clientStats := s.agent.client.StatsReporter() - task := req.URL.Query().Get("task") - var since int - var err error - if sinceTime := req.URL.Query().Get("since"); sinceTime != "" { - since, err = strconv.Atoi(sinceTime) - if err != nil { - return nil, CodedError(400, fmt.Sprintf("can't read the since query parameter: %v", err)) - } + aStats, err := clientStats.GetAllocStats(allocID) + if err != nil { + return nil, err } - if since > 0 { - return clientStats.AllocStatsSince(allocID, task, int64(since)) - } - return clientStats.LatestAllocStats(allocID, task) + task := req.URL.Query().Get("task") + return aStats.LatestAllocStats(task) } diff --git a/command/agent/stats_endpoint.go b/command/agent/stats_endpoint.go index c725d373d..e7e6928ae 100644 --- a/command/agent/stats_endpoint.go +++ b/command/agent/stats_endpoint.go @@ -1,12 +1,6 @@ package agent -import ( - "fmt" - "net/http" - "strconv" - - "github.com/hashicorp/nomad/client/stats" -) +import "net/http" const ( invalidSinceErrPrefix = "can't read the since query parameter" @@ -17,20 +11,6 @@ func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Requ return nil, clientNotRunning } - var since int - var err error - ts := false - if sinceTime := req.URL.Query().Get("since"); sinceTime != "" { - ts = true - since, err = strconv.Atoi(sinceTime) - if err != nil { - return nil, CodedError(400, fmt.Sprintf("%s: %v", invalidSinceErrPrefix, err)) - } - } - clientStats := s.agent.client.StatsReporter() - if ts { - return clientStats.HostStatsSince(int64(since)), nil - } - return []*stats.HostStats{clientStats.LatestHostStats()}, nil + return clientStats.LatestHostStats(), nil }