From f390261cd002dc6a213aeac96f8568c52d5c8e28 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 May 2016 07:57:26 -0700 Subject: [PATCH] Reporting time series of stats --- client/alloc_runner.go | 20 +++++++++---- client/client.go | 13 ++------ client/driver/executor/executor_linux.go | 3 +- client/driver/structs/structs.go | 1 + client/task_runner.go | 38 ++++++++++++++++++++++-- command/agent/stats_endpoint.go | 18 +++++++++-- 6 files changed, 69 insertions(+), 24 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 2e3ced2db..073d38315 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -12,7 +12,6 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" - cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -29,6 +28,11 @@ const ( // AllocStateUpdater is used to update the status of an allocation type AllocStateUpdater func(alloc *structs.Allocation) +type AllocStatsReporter interface { + AllocStats() map[string]TaskStatsReporter + TaskStats(task string) (TaskStatsReporter, error) +} + // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { config *config.Config @@ -472,21 +476,25 @@ func (r *AllocRunner) Update(update *structs.Allocation) { } } -func (r *AllocRunner) ResourceUsage() map[string]*cstructs.TaskResourceUsage { - res := make(map[string]*cstructs.TaskResourceUsage) +func (r *AllocRunner) StatsReporter() AllocStatsReporter { + return r +} + +func (r *AllocRunner) AllocStats() map[string]TaskStatsReporter { + res := make(map[string]TaskStatsReporter) for task, tr := range r.tasks { - res[task] = tr.ResourceUsage() + res[task] = tr.StatsReporter() } return res } -func (r *AllocRunner) ResourceUsageOfTask(task string) (*cstructs.TaskResourceUsage, error) { +func (r *AllocRunner) TaskStats(task string) (TaskStatsReporter, error) { tr, ok := r.tasks[task] if !ok { return nil, fmt.Errorf("task %q not running", task) } - return tr.ResourceUsage(), nil + return tr.StatsReporter(), nil } // shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and diff --git a/client/client.go b/client/client.go index 8928bb1eb..6e60e6bd8 100644 --- a/client/client.go +++ b/client/client.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/driver" - cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" @@ -395,20 +394,12 @@ func (c *Client) Node() *structs.Node { return c.config.Node } -func (c *Client) ResourceUsageOfAlloc(alloc string) (map[string]*cstructs.TaskResourceUsage, error) { +func (c *Client) AllocStats(alloc string) (AllocStatsReporter, error) { ar, ok := c.allocs[alloc] if !ok { return nil, fmt.Errorf("allocation: %q not running on this client", alloc) } - return ar.ResourceUsage(), nil -} - -func (c *Client) ResourceUsageOfTask(alloc string, task string) (*cstructs.TaskResourceUsage, error) { - ar, ok := c.allocs[alloc] - if !ok { - return nil, fmt.Errorf("allocation: %q not running on this client", alloc) - } - return ar.ResourceUsageOfTask(task) + return ar.StatsReporter(), nil } // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index bb170af55..2aa92fe31 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "syscall" + "time" "github.com/hashicorp/go-multierror" "github.com/opencontainers/runc/libcontainer/cgroups" @@ -157,7 +158,7 @@ func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) { ThrottledPeriods: stats.CpuStats.ThrottlingData.ThrottledPeriods, ThrottledTime: stats.CpuStats.ThrottlingData.ThrottledTime, } - return &cstructs.TaskResourceUsage{MemoryStats: ms, CpuStats: cs}, nil + return &cstructs.TaskResourceUsage{MemoryStats: ms, CpuStats: cs, Timestamp: time.Now()}, nil } // runAs takes a user id as a string and looks up the user, and sets the command diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index 5ec4f211d..89e0ed40b 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -104,4 +104,5 @@ type CpuUsage struct { type TaskResourceUsage struct { MemoryStats *MemoryStats CpuStats *CpuUsage + Timestamp time.Time } diff --git a/client/task_runner.go b/client/task_runner.go index 7e6175c83..307b85cd4 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/getter" + "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/client/driver/env" @@ -34,6 +35,11 @@ const ( killFailureLimit = 5 ) +type TaskStatsReporter interface { + ResourceUsage() *cstructs.TaskResourceUsage + ResourceUsageTS() []*cstructs.TaskResourceUsage +} + // TaskRunner is used to wrap a task within an allocation and provide the execution context. type TaskRunner struct { config *config.Config @@ -43,7 +49,7 @@ type TaskRunner struct { alloc *structs.Allocation restartTracker *RestartTracker - resourceUsage *cstructs.TaskResourceUsage + resourceUsage *stats.RingBuff resourceUsageLock sync.RWMutex stopResourceMonitorCh chan struct{} @@ -90,11 +96,18 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, } restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) + resourceUsage, err := stats.NewRingBuff(60) + if err != nil { + logger.Printf("[ERR] client: can't create resource usage buffer: %v", err) + return nil + } + tc := &TaskRunner{ config: config, updater: updater, logger: logger, restartTracker: restartTracker, + resourceUsage: resourceUsage, ctx: ctx, alloc: alloc, task: task, @@ -455,7 +468,7 @@ func (r *TaskRunner) monitorUsage() { r.logger.Printf("[DEBUG] client.taskrunner: error fetching stats of task %v: %v", r.task.Name, err) } r.resourceUsageLock.RLock() - r.resourceUsage = ru + r.resourceUsage.Enqueue(ru) r.resourceUsageLock.RUnlock() next.Reset(1 * time.Second) case <-r.stopResourceMonitorCh: @@ -465,10 +478,29 @@ func (r *TaskRunner) monitorUsage() { } } +func (r *TaskRunner) StatsReporter() TaskStatsReporter { + return r +} + func (r *TaskRunner) ResourceUsage() *cstructs.TaskResourceUsage { r.resourceUsageLock.RLock() defer r.resourceUsageLock.RUnlock() - return r.resourceUsage + val := r.resourceUsage.Peek() + if val != nil { + return val.(*cstructs.TaskResourceUsage) + } + return nil +} + +func (r *TaskRunner) ResourceUsageTS() []*cstructs.TaskResourceUsage { + r.resourceUsageLock.RLock() + defer r.resourceUsageLock.RUnlock() + values := r.resourceUsage.Values() + ts := make([]*cstructs.TaskResourceUsage, len(values)) + for index, val := range values { + ts[index] = val.(*cstructs.TaskResourceUsage) + } + return ts } // handleUpdate takes an updated allocation and updates internal state to diff --git a/command/agent/stats_endpoint.go b/command/agent/stats_endpoint.go index 594b97109..66c64217f 100644 --- a/command/agent/stats_endpoint.go +++ b/command/agent/stats_endpoint.go @@ -13,8 +13,20 @@ func (s *HTTPServer) StatsRequest(resp http.ResponseWriter, req *http.Request) ( if allocID = req.URL.Query().Get("allocation"); allocID == "" { return nil, fmt.Errorf("provide a valid alloc id") } - if task = req.URL.Query().Get("task"); task != "" { - return s.agent.client.ResourceUsageOfTask(allocID, task) + statsReporter, err := s.agent.client.AllocStats(allocID) + if err != nil { + return nil, err } - return s.agent.client.ResourceUsageOfAlloc(allocID) + if task = req.URL.Query().Get("task"); task != "" { + taskStatsReporter, err := statsReporter.TaskStats(task) + if err != nil { + return nil, err + } + return taskStatsReporter.ResourceUsage(), nil + } + res := make(map[string]interface{}) + for task, sr := range statsReporter.AllocStats() { + res[task] = sr.ResourceUsage() + } + return res, nil }