mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 17:35:43 +03:00
Reporting time series of stats
This commit is contained in:
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
@@ -29,6 +28,11 @@ const (
|
||||
// AllocStateUpdater is used to update the status of an allocation
|
||||
type AllocStateUpdater func(alloc *structs.Allocation)
|
||||
|
||||
type AllocStatsReporter interface {
|
||||
AllocStats() map[string]TaskStatsReporter
|
||||
TaskStats(task string) (TaskStatsReporter, error)
|
||||
}
|
||||
|
||||
// AllocRunner is used to wrap an allocation and provide the execution context.
|
||||
type AllocRunner struct {
|
||||
config *config.Config
|
||||
@@ -472,21 +476,25 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *AllocRunner) ResourceUsage() map[string]*cstructs.TaskResourceUsage {
|
||||
res := make(map[string]*cstructs.TaskResourceUsage)
|
||||
func (r *AllocRunner) StatsReporter() AllocStatsReporter {
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *AllocRunner) AllocStats() map[string]TaskStatsReporter {
|
||||
res := make(map[string]TaskStatsReporter)
|
||||
for task, tr := range r.tasks {
|
||||
res[task] = tr.ResourceUsage()
|
||||
res[task] = tr.StatsReporter()
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (r *AllocRunner) ResourceUsageOfTask(task string) (*cstructs.TaskResourceUsage, error) {
|
||||
func (r *AllocRunner) TaskStats(task string) (TaskStatsReporter, error) {
|
||||
tr, ok := r.tasks[task]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("task %q not running", task)
|
||||
}
|
||||
|
||||
return tr.ResourceUsage(), nil
|
||||
return tr.StatsReporter(), nil
|
||||
}
|
||||
|
||||
// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -395,20 +394,12 @@ func (c *Client) Node() *structs.Node {
|
||||
return c.config.Node
|
||||
}
|
||||
|
||||
func (c *Client) ResourceUsageOfAlloc(alloc string) (map[string]*cstructs.TaskResourceUsage, error) {
|
||||
func (c *Client) AllocStats(alloc string) (AllocStatsReporter, error) {
|
||||
ar, ok := c.allocs[alloc]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("allocation: %q not running on this client", alloc)
|
||||
}
|
||||
return ar.ResourceUsage(), nil
|
||||
}
|
||||
|
||||
func (c *Client) ResourceUsageOfTask(alloc string, task string) (*cstructs.TaskResourceUsage, error) {
|
||||
ar, ok := c.allocs[alloc]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("allocation: %q not running on this client", alloc)
|
||||
}
|
||||
return ar.ResourceUsageOfTask(task)
|
||||
return ar.StatsReporter(), nil
|
||||
}
|
||||
|
||||
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/opencontainers/runc/libcontainer/cgroups"
|
||||
@@ -157,7 +158,7 @@ func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
|
||||
ThrottledPeriods: stats.CpuStats.ThrottlingData.ThrottledPeriods,
|
||||
ThrottledTime: stats.CpuStats.ThrottlingData.ThrottledTime,
|
||||
}
|
||||
return &cstructs.TaskResourceUsage{MemoryStats: ms, CpuStats: cs}, nil
|
||||
return &cstructs.TaskResourceUsage{MemoryStats: ms, CpuStats: cs, Timestamp: time.Now()}, nil
|
||||
}
|
||||
|
||||
// runAs takes a user id as a string and looks up the user, and sets the command
|
||||
|
||||
@@ -104,4 +104,5 @@ type CpuUsage struct {
|
||||
type TaskResourceUsage struct {
|
||||
MemoryStats *MemoryStats
|
||||
CpuStats *CpuUsage
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/client/getter"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
"github.com/hashicorp/nomad/client/driver/env"
|
||||
@@ -34,6 +35,11 @@ const (
|
||||
killFailureLimit = 5
|
||||
)
|
||||
|
||||
type TaskStatsReporter interface {
|
||||
ResourceUsage() *cstructs.TaskResourceUsage
|
||||
ResourceUsageTS() []*cstructs.TaskResourceUsage
|
||||
}
|
||||
|
||||
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
|
||||
type TaskRunner struct {
|
||||
config *config.Config
|
||||
@@ -43,7 +49,7 @@ type TaskRunner struct {
|
||||
alloc *structs.Allocation
|
||||
restartTracker *RestartTracker
|
||||
|
||||
resourceUsage *cstructs.TaskResourceUsage
|
||||
resourceUsage *stats.RingBuff
|
||||
resourceUsageLock sync.RWMutex
|
||||
stopResourceMonitorCh chan struct{}
|
||||
|
||||
@@ -90,11 +96,18 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
|
||||
}
|
||||
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
|
||||
|
||||
resourceUsage, err := stats.NewRingBuff(60)
|
||||
if err != nil {
|
||||
logger.Printf("[ERR] client: can't create resource usage buffer: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
tc := &TaskRunner{
|
||||
config: config,
|
||||
updater: updater,
|
||||
logger: logger,
|
||||
restartTracker: restartTracker,
|
||||
resourceUsage: resourceUsage,
|
||||
ctx: ctx,
|
||||
alloc: alloc,
|
||||
task: task,
|
||||
@@ -455,7 +468,7 @@ func (r *TaskRunner) monitorUsage() {
|
||||
r.logger.Printf("[DEBUG] client.taskrunner: error fetching stats of task %v: %v", r.task.Name, err)
|
||||
}
|
||||
r.resourceUsageLock.RLock()
|
||||
r.resourceUsage = ru
|
||||
r.resourceUsage.Enqueue(ru)
|
||||
r.resourceUsageLock.RUnlock()
|
||||
next.Reset(1 * time.Second)
|
||||
case <-r.stopResourceMonitorCh:
|
||||
@@ -465,10 +478,29 @@ func (r *TaskRunner) monitorUsage() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *TaskRunner) StatsReporter() TaskStatsReporter {
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *TaskRunner) ResourceUsage() *cstructs.TaskResourceUsage {
|
||||
r.resourceUsageLock.RLock()
|
||||
defer r.resourceUsageLock.RUnlock()
|
||||
return r.resourceUsage
|
||||
val := r.resourceUsage.Peek()
|
||||
if val != nil {
|
||||
return val.(*cstructs.TaskResourceUsage)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *TaskRunner) ResourceUsageTS() []*cstructs.TaskResourceUsage {
|
||||
r.resourceUsageLock.RLock()
|
||||
defer r.resourceUsageLock.RUnlock()
|
||||
values := r.resourceUsage.Values()
|
||||
ts := make([]*cstructs.TaskResourceUsage, len(values))
|
||||
for index, val := range values {
|
||||
ts[index] = val.(*cstructs.TaskResourceUsage)
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
// handleUpdate takes an updated allocation and updates internal state to
|
||||
|
||||
@@ -13,8 +13,20 @@ func (s *HTTPServer) StatsRequest(resp http.ResponseWriter, req *http.Request) (
|
||||
if allocID = req.URL.Query().Get("allocation"); allocID == "" {
|
||||
return nil, fmt.Errorf("provide a valid alloc id")
|
||||
}
|
||||
if task = req.URL.Query().Get("task"); task != "" {
|
||||
return s.agent.client.ResourceUsageOfTask(allocID, task)
|
||||
statsReporter, err := s.agent.client.AllocStats(allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.agent.client.ResourceUsageOfAlloc(allocID)
|
||||
if task = req.URL.Query().Get("task"); task != "" {
|
||||
taskStatsReporter, err := statsReporter.TaskStats(task)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return taskStatsReporter.ResourceUsage(), nil
|
||||
}
|
||||
res := make(map[string]interface{})
|
||||
for task, sr := range statsReporter.AllocStats() {
|
||||
res[task] = sr.ResourceUsage()
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user