diff --git a/api/allocations.go b/api/allocations.go index 18d1d4d6e..b7db0f29a 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -43,7 +43,7 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query return &resp, qm, nil } -func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*TaskResourceUsage, error) { +func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, q) if err != nil { return nil, err @@ -58,13 +58,9 @@ func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*Tas if err != nil { return nil, err } - resp := make(map[string][]*TaskResourceUsage) + var resp AllocResourceUsage _, err = client.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil) - res := make(map[string]*TaskResourceUsage) - for task, ru := range resp { - res[task] = ru[0] - } - return res, err + return &resp, err } // Allocation is used for serialization of allocations. diff --git a/api/tasks.go b/api/tasks.go index 33834b2a9..ca1ff41d8 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -40,6 +40,14 @@ type TaskResourceUsage struct { Pids map[string]*ResourceUsage } +// AllocResourceUsage holds the aggregated task resource usage of the +// allocation. +type AllocResourceUsage struct { + ResourceUsage *ResourceUsage + Tasks map[string]*TaskResourceUsage + Timestamp int64 +} + // RestartPolicy defines how the Nomad client restarts // tasks in a taskgroup when they fail type RestartPolicy struct { diff --git a/client/alloc_runner.go b/client/alloc_runner.go index b796ccbe4..4c3214552 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -13,6 +13,8 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/structs" ) const ( @@ -28,11 +30,6 @@ const ( // AllocStateUpdater is used to update the status of an allocation type AllocStateUpdater func(alloc *structs.Allocation) -// AllocStatsReporter exposes stats related APIs of an allocation runner -type AllocStatsReporter interface { - AllocStats() map[string]TaskStatsReporter -} - // AllocRunner is used to wrap an allocation and provide the execution context. type AllocRunner struct { config *config.Config @@ -476,33 +473,62 @@ func (r *AllocRunner) Update(update *structs.Allocation) { } } -// StatsReporter returns an interface to query resource usage statistics of an -// allocation -func (r *AllocRunner) StatsReporter() AllocStatsReporter { - return r -} - -// AllocStats returns the stats reporter of all the tasks running in the -// allocation -func (r *AllocRunner) AllocStats() map[string]TaskStatsReporter { +// LatestAllocStats returns the latest allocation stats. If the optional taskFilter is set +// the allocation stats will only include the given task. +func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) { r.taskLock.RLock() defer r.taskLock.RUnlock() - res := make(map[string]TaskStatsReporter) - for task, tr := range r.tasks { - res[task] = tr.StatsReporter() + + astat := &cstructs.AllocResourceUsage{ + Tasks: make(map[string]*cstructs.TaskResourceUsage), } - return res + + var flat []*cstructs.TaskResourceUsage + if taskFilter != "" { + tr, ok := r.tasks[taskFilter] + if !ok { + return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter) + } + l := tr.LatestResourceUsage() + if l != nil { + astat.Tasks[taskFilter] = l + flat = []*cstructs.TaskResourceUsage{l} + astat.Timestamp = l.Timestamp + } + } else { + for task, tr := range r.tasks { + l := tr.LatestResourceUsage() + if l != nil { + astat.Tasks[task] = l + flat = append(flat, l) + if l.Timestamp > astat.Timestamp { + astat.Timestamp = l.Timestamp + } + } + } + } + + astat.ResourceUsage = sumTaskResourceUsage(flat) + return astat, nil } -// TaskStats returns the stats reporter for a specific task running in the -// allocation -func (r *AllocRunner) TaskStats(task string) (TaskStatsReporter, error) { - tr, ok := r.tasks[task] - if !ok { - return nil, fmt.Errorf("task %q not running in allocation %v", task, r.alloc.ID) +// sumTaskResourceUsage takes a set of task resources and sums their resources +func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.ResourceUsage { + summed := &cstructs.ResourceUsage{ + MemoryStats: &cstructs.MemoryStats{}, + CpuStats: &cstructs.CpuStats{}, } + for _, usage := range usages { + summed.Add(usage.ResourceUsage) + } + return summed +} - return tr.StatsReporter(), nil +// AllocStatsSince returns the allocation stats collected since the passed unix +// nanosecond timestamp. If the optional taskFilter is set the allocation stats +// will only include the given task. +func (r *AllocRunner) AllocStatsSince(taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) { + return nil, nil } // shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and diff --git a/client/client.go b/client/client.go index deaa78233..4ec1314fd 100644 --- a/client/client.go +++ b/client/client.go @@ -26,6 +26,8 @@ import ( "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/hashstructure" + + cstructs "github.com/hashicorp/nomad/client/structs" ) const ( @@ -81,15 +83,20 @@ const ( // ClientStatsReporter exposes all the APIs related to resource usage of a Nomad // Client type ClientStatsReporter interface { - // AllocStats returns a map of alloc ids and their corresponding stats - // collector - AllocStats() map[string]AllocStatsReporter + // LatestAllocStats returns the latest allocation resource usage optionally + // filtering by task name + LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error) - // HostStats returns resource usage stats for the host - HostStats() []*stats.HostStats + // AllocStatsSince returns the allocation resource usage collected since the + // passed timestamp optionally filtering by task name. + AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) - // HostStatsTS returns a time series of host resource usage stats - HostStatsTS(since int64) []*stats.HostStats + // LatestHostStats returns the latest resource usage stats for the host + LatestHostStats() *stats.HostStats + + // HostStatsSince returns the collect resource usage stats for the host + // since the passed unix nanosecond time stamp + HostStatsSince(since int64) []*stats.HostStats } // Client is used to implement the client interaction with Nomad. Clients @@ -400,26 +407,40 @@ func (c *Client) StatsReporter() ClientStatsReporter { return c } -// AllocStats returns all the stats reporter of the allocations running on a -// Nomad client -func (c *Client) AllocStats() map[string]AllocStatsReporter { - res := make(map[string]AllocStatsReporter) - for alloc, ar := range c.getAllocRunners() { - res[alloc] = ar +// LatestAllocStats returns the latest allocation resource usage optionally +// filtering by task name +func (c *Client) LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error) { + c.allocLock.RLock() + ar, ok := c.allocs[allocID] + if !ok { + return nil, fmt.Errorf("unknown allocation ID %q", allocID) } - return res + c.allocLock.RUnlock() + return ar.LatestAllocStats(taskFilter) +} + +// AllocStatsSince returns the allocation resource usage collected since the +// passed timestamp optionally filtering by task name. +func (c *Client) AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) { + c.allocLock.RLock() + ar, ok := c.allocs[allocID] + if !ok { + return nil, fmt.Errorf("unknown allocation ID %q", allocID) + } + c.allocLock.RUnlock() + return ar.AllocStatsSince(taskFilter, since) } // HostStats returns all the stats related to a Nomad client -func (c *Client) HostStats() []*stats.HostStats { +func (c *Client) LatestHostStats() *stats.HostStats { c.resourceUsageLock.RLock() defer c.resourceUsageLock.RUnlock() val := c.resourceUsage.Peek() ru, _ := val.(*stats.HostStats) - return []*stats.HostStats{ru} + return ru } -func (c *Client) HostStatsTS(since int64) []*stats.HostStats { +func (c *Client) HostStatsSince(since int64) []*stats.HostStats { c.resourceUsageLock.RLock() defer c.resourceUsageLock.RUnlock() diff --git a/client/driver/docker.go b/client/driver/docker.go index 972cb6000..a200a571b 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -21,7 +21,8 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/helper/fields" shelpers "github.com/hashicorp/nomad/helper/stats" @@ -140,7 +141,7 @@ type DockerHandle struct { maxKillTimeout time.Duration resourceUsageLock sync.RWMutex resourceUsage *cstructs.TaskResourceUsage - waitCh chan *cstructs.WaitResult + waitCh chan *dstructs.WaitResult doneCh chan bool } @@ -541,7 +542,7 @@ func (d *DockerDriver) recoverablePullError(err error, image string) error { if imageNotFoundMatcher.MatchString(err.Error()) { recoverable = false } - return cstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable) + return dstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable) } func (d *DockerDriver) Periodic() (bool, time.Duration) { @@ -797,7 +798,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle killTimeout: GetKillTimeout(task.KillTimeout, maxKill), maxKillTimeout: maxKill, doneCh: make(chan bool), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil { d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err) @@ -872,7 +873,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er killTimeout: pid.KillTimeout, maxKillTimeout: pid.MaxKillTimeout, doneCh: make(chan bool), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := exec.SyncServices(consulContext(d.config, pid.ContainerID)); err != nil { h.logger.Printf("[ERR] driver.docker: error registering services with consul: %v", err) @@ -904,7 +905,7 @@ func (h *DockerHandle) ContainerID() string { return h.containerID } -func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { +func (h *DockerHandle) WaitCh() chan *dstructs.WaitResult { return h.waitCh } @@ -961,7 +962,7 @@ func (h *DockerHandle) run() { } close(h.doneCh) - h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err) + h.waitCh <- dstructs.NewWaitResult(exitCode, 0, err) close(h.waitCh) // Remove services diff --git a/client/driver/driver.go b/client/driver/driver.go index 9e8513073..112626585 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -12,7 +12,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" + cstructs "github.com/hashicorp/nomad/client/structs" ) // BuiltinDrivers contains the built in registered drivers @@ -105,7 +106,7 @@ type DriverHandle interface { ID() string // WaitCh is used to return a channel used wait for task completion - WaitCh() chan *cstructs.WaitResult + WaitCh() chan *dstructs.WaitResult // Update is used to update the task if possible and update task related // configurations. diff --git a/client/driver/exec.go b/client/driver/exec.go index 77456b028..82e657ef5 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -14,7 +14,8 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/helper/fields" "github.com/hashicorp/nomad/nomad/structs" @@ -42,13 +43,13 @@ type ExecDriverConfig struct { type execHandle struct { pluginClient *plugin.Client executor executor.Executor - isolationConfig *cstructs.IsolationConfig + isolationConfig *dstructs.IsolationConfig userPid int allocDir *allocdir.AllocDir killTimeout time.Duration maxKillTimeout time.Duration logger *log.Logger - waitCh chan *cstructs.WaitResult + waitCh chan *dstructs.WaitResult doneCh chan struct{} version string } @@ -153,7 +154,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, logger: d.logger, version: d.config.Version, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := exec.SyncServices(consulContext(d.config, "")); err != nil { d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %q: %v", task.Name, err) @@ -169,7 +170,7 @@ type execId struct { UserPid int TaskDir string AllocDir *allocdir.AllocDir - IsolationConfig *cstructs.IsolationConfig + IsolationConfig *dstructs.IsolationConfig PluginConfig *PluginReattachConfig } @@ -217,7 +218,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro killTimeout: id.KillTimeout, maxKillTimeout: id.MaxKillTimeout, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := exec.SyncServices(consulContext(d.config, "")); err != nil { d.logger.Printf("[ERR] driver.exec: error registering services with consul: %v", err) @@ -244,7 +245,7 @@ func (h *execHandle) ID() string { return string(data) } -func (h *execHandle) WaitCh() chan *cstructs.WaitResult { +func (h *execHandle) WaitCh() chan *dstructs.WaitResult { return h.waitCh } @@ -305,7 +306,7 @@ func (h *execHandle) run() { h.logger.Printf("[ERR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e) } } - h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, ps.Signal, err) + h.waitCh <- dstructs.NewWaitResult(ps.ExitCode, ps.Signal, err) close(h.waitCh) // Remove services if err := h.executor.DeregisterServices(); err != nil { diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 8644b5aa0..565927ccb 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -23,11 +23,13 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/env" "github.com/hashicorp/nomad/client/driver/logging" - cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" + + dstructs "github.com/hashicorp/nomad/client/driver/structs" + cstructs "github.com/hashicorp/nomad/client/structs" ) const ( @@ -140,7 +142,7 @@ type ProcessState struct { Pid int ExitCode int Signal int - IsolationConfig *cstructs.IsolationConfig + IsolationConfig *dstructs.IsolationConfig Time time.Time } @@ -155,7 +157,7 @@ type nomadPid struct { // SyslogServerState holds the address and islation information of a launched // syslog server type SyslogServerState struct { - IsolationConfig *cstructs.IsolationConfig + IsolationConfig *dstructs.IsolationConfig Addr string } @@ -296,7 +298,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } go e.collectPids() go e.wait() - ic := &cstructs.IsolationConfig{Cgroup: e.groups, CgroupPaths: e.cgPaths} + ic := &dstructs.IsolationConfig{Cgroup: e.groups, CgroupPaths: e.cgPaths} return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil } @@ -371,7 +373,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { func (e *UniversalExecutor) wait() { defer close(e.processExited) err := e.cmd.Wait() - ic := &cstructs.IsolationConfig{Cgroup: e.groups, CgroupPaths: e.cgPaths} + ic := &dstructs.IsolationConfig{Cgroup: e.groups, CgroupPaths: e.cgPaths} if err == nil { e.exitState = &ProcessState{Pid: 0, ExitCode: 0, IsolationConfig: ic, Time: time.Now()} return diff --git a/client/driver/executor/executor_basic.go b/client/driver/executor/executor_basic.go index d945721fb..e0f7bb659 100644 --- a/client/driver/executor/executor_basic.go +++ b/client/driver/executor/executor_basic.go @@ -5,9 +5,9 @@ package executor import ( "os" - cstructs "github.com/hashicorp/nomad/client/driver/structs" - + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/mitchellh/go-ps" + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index 812d1a001..b0a91e861 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -18,8 +18,8 @@ import ( "github.com/opencontainers/runc/libcontainer/system" "github.com/hashicorp/nomad/client/allocdir" - cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/stats" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" ) diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index a4edf2da7..1fc9d7e45 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -7,7 +7,7 @@ import ( "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/driver/executor" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" ) diff --git a/client/driver/java.go b/client/driver/java.go index f18bdf842..954131881 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -19,8 +19,9 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/helper/fields" "github.com/hashicorp/nomad/nomad/structs" @@ -50,7 +51,7 @@ type javaHandle struct { pluginClient *plugin.Client userPid int executor executor.Executor - isolationConfig *cstructs.IsolationConfig + isolationConfig *dstructs.IsolationConfig taskDir string allocDir *allocdir.AllocDir @@ -58,7 +59,7 @@ type javaHandle struct { maxKillTimeout time.Duration version string logger *log.Logger - waitCh chan *cstructs.WaitResult + waitCh chan *dstructs.WaitResult doneCh chan struct{} } @@ -241,7 +242,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, version: d.config.Version, logger: d.logger, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %q: %v", task.Name, err) @@ -262,7 +263,7 @@ type javaId struct { KillTimeout time.Duration MaxKillTimeout time.Duration PluginConfig *PluginReattachConfig - IsolationConfig *cstructs.IsolationConfig + IsolationConfig *dstructs.IsolationConfig TaskDir string AllocDir *allocdir.AllocDir UserPid int @@ -315,7 +316,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro killTimeout: id.KillTimeout, maxKillTimeout: id.MaxKillTimeout, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { d.logger.Printf("[ERR] driver.java: error registering services with consul: %v", err) @@ -344,7 +345,7 @@ func (h *javaHandle) ID() string { return string(data) } -func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { +func (h *javaHandle) WaitCh() chan *dstructs.WaitResult { return h.waitCh } @@ -403,7 +404,7 @@ func (h *javaHandle) run() { h.logger.Printf("[ERR] driver.java: unmounting dev,proc and alloc dirs failed: %v", e) } } - h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: ps.Signal, Err: err} + h.waitCh <- &dstructs.WaitResult{ExitCode: ps.ExitCode, Signal: ps.Signal, Err: err} close(h.waitCh) // Remove services diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 719ce41e7..7f577dfbf 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -15,8 +15,9 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/helper/fields" "github.com/hashicorp/nomad/nomad/structs" @@ -57,7 +58,7 @@ type qemuHandle struct { maxKillTimeout time.Duration logger *log.Logger version string - waitCh chan *cstructs.WaitResult + waitCh chan *dstructs.WaitResult doneCh chan struct{} } @@ -260,7 +261,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, version: d.config.Version, logger: d.logger, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { @@ -311,7 +312,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro maxKillTimeout: id.MaxKillTimeout, version: id.Version, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.qemu: error registering services: %v", err) @@ -337,7 +338,7 @@ func (h *qemuHandle) ID() string { return string(data) } -func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { +func (h *qemuHandle) WaitCh() chan *dstructs.WaitResult { return h.waitCh } @@ -390,7 +391,7 @@ func (h *qemuHandle) run() { } } close(h.doneCh) - h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: ps.Signal, Err: err} + h.waitCh <- &dstructs.WaitResult{ExitCode: ps.ExitCode, Signal: ps.Signal, Err: err} close(h.waitCh) // Remove services if err := h.executor.DeregisterServices(); err != nil { diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 2aa0695cc..684632b06 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -13,8 +13,9 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/helper/fields" "github.com/hashicorp/nomad/nomad/structs" @@ -48,7 +49,7 @@ type rawExecHandle struct { maxKillTimeout time.Duration allocDir *allocdir.AllocDir logger *log.Logger - waitCh chan *cstructs.WaitResult + waitCh chan *dstructs.WaitResult doneCh chan struct{} } @@ -165,7 +166,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl version: d.config.Version, logger: d.logger, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul for task: %q: %v", task.Name, err) @@ -215,7 +216,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e allocDir: id.AllocDir, version: id.Version, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul: %v", err) @@ -241,7 +242,7 @@ func (h *rawExecHandle) ID() string { return string(data) } -func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { +func (h *rawExecHandle) WaitCh() chan *dstructs.WaitResult { return h.waitCh } @@ -292,7 +293,7 @@ func (h *rawExecHandle) run() { h.logger.Printf("[ERR] driver.raw_exec: unmounting dev,proc and alloc dirs failed: %v", e) } } - h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: ps.Signal, Err: err} + h.waitCh <- &dstructs.WaitResult{ExitCode: ps.ExitCode, Signal: ps.Signal, Err: err} close(h.waitCh) // Remove services if err := h.executor.DeregisterServices(); err != nil { diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 137c643d7..36795446f 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -19,8 +19,9 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/helper/fields" "github.com/hashicorp/nomad/nomad/structs" @@ -70,7 +71,7 @@ type rktHandle struct { logger *log.Logger killTimeout time.Duration maxKillTimeout time.Duration - waitCh chan *cstructs.WaitResult + waitCh chan *dstructs.WaitResult doneCh chan struct{} } @@ -308,7 +309,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e killTimeout: GetKillTimeout(task.KillTimeout, maxKill), maxKillTimeout: maxKill, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.rkt: error registering services for task: %q: %v", task.Name, err) @@ -349,7 +350,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error killTimeout: id.KillTimeout, maxKillTimeout: id.MaxKillTimeout, doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + waitCh: make(chan *dstructs.WaitResult, 1), } if h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.rkt: error registering services: %v", err) @@ -374,7 +375,7 @@ func (h *rktHandle) ID() string { return fmt.Sprintf("Rkt:%s", string(data)) } -func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { +func (h *rktHandle) WaitCh() chan *dstructs.WaitResult { return h.waitCh } @@ -414,7 +415,7 @@ func (h *rktHandle) run() { h.logger.Printf("[ERROR] driver.rkt: unmounting dev,proc and alloc dirs failed: %v", e) } } - h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err) + h.waitCh <- dstructs.NewWaitResult(ps.ExitCode, 0, err) close(h.waitCh) // Remove services if err := h.executor.DeregisterServices(); err != nil { diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index 2410f371f..9059df6ec 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -84,43 +84,3 @@ type CheckResult struct { // Err is the error that a check returned Err error } - -// MemoryStats holds memory usage related stats -type MemoryStats struct { - RSS uint64 - Cache uint64 - Swap uint64 - MaxUsage uint64 - KernelUsage uint64 - KernelMaxUsage uint64 - - // A list of fields whose values were actually sampled - Measured []string -} - -// CpuStats holds cpu usage related stats -type CpuStats struct { - SystemMode float64 - UserMode float64 - TotalTicks float64 - ThrottledPeriods uint64 - ThrottledTime uint64 - Percent float64 - - // A list of fields whose values were actually sampled - Measured []string -} - -// ResourceUsage holds information related to cpu and memory stats -type ResourceUsage struct { - MemoryStats *MemoryStats - CpuStats *CpuStats -} - -// TaskResourceUsage holds aggregated resource usage of all processes in a Task -// and the resource usage of the individual pids -type TaskResourceUsage struct { - ResourceUsage *ResourceUsage - Timestamp int64 - Pids map[string]*ResourceUsage -} diff --git a/client/structs/structs.go b/client/structs/structs.go new file mode 100644 index 000000000..724084dc8 --- /dev/null +++ b/client/structs/structs.go @@ -0,0 +1,97 @@ +package structs + +// MemoryStats holds memory usage related stats +type MemoryStats struct { + RSS uint64 + Cache uint64 + Swap uint64 + MaxUsage uint64 + KernelUsage uint64 + KernelMaxUsage uint64 + + // A list of fields whose values were actually sampled + Measured []string +} + +func (ms *MemoryStats) Add(other *MemoryStats) { + ms.RSS += other.RSS + ms.Cache += other.Cache + ms.Swap += other.Swap + ms.MaxUsage += other.MaxUsage + ms.KernelUsage += other.KernelUsage + ms.KernelMaxUsage += other.KernelMaxUsage + ms.Measured = joinStringSet(ms.Measured, other.Measured) +} + +// CpuStats holds cpu usage related stats +type CpuStats struct { + SystemMode float64 + UserMode float64 + TotalTicks float64 + ThrottledPeriods uint64 + ThrottledTime uint64 + Percent float64 + + // A list of fields whose values were actually sampled + Measured []string +} + +func (cs *CpuStats) Add(other *CpuStats) { + cs.SystemMode += other.SystemMode + cs.UserMode += other.UserMode + cs.TotalTicks += other.TotalTicks + cs.ThrottledPeriods += other.ThrottledPeriods + cs.ThrottledTime += other.ThrottledTime + cs.Percent += other.Percent + cs.Measured = joinStringSet(cs.Measured, other.Measured) +} + +// ResourceUsage holds information related to cpu and memory stats +type ResourceUsage struct { + MemoryStats *MemoryStats + CpuStats *CpuStats +} + +func (ru *ResourceUsage) Add(other *ResourceUsage) { + ru.MemoryStats.Add(other.MemoryStats) + ru.CpuStats.Add(other.CpuStats) +} + +// TaskResourceUsage holds aggregated resource usage of all processes in a Task +// and the resource usage of the individual pids +type TaskResourceUsage struct { + ResourceUsage *ResourceUsage + Timestamp int64 + Pids map[string]*ResourceUsage +} + +// AllocResourceUsage holds the aggregated task resource usage of the +// allocation. +type AllocResourceUsage struct { + // ResourceUsage is the summation of the task resources + ResourceUsage *ResourceUsage + + // Tasks contains the resource usage of each task + Tasks map[string]*TaskResourceUsage + + // The max timestamp of all the Tasks + Timestamp int64 +} + +// joinStringSet takes two slices of strings and joins them +func joinStringSet(s1, s2 []string) []string { + lookup := make(map[string]struct{}, len(s1)) + j := make([]string, 0, len(s1)) + for _, s := range s1 { + j = append(j, s) + lookup[s] = struct{}{} + } + + for _, s := range s2 { + if _, ok := lookup[s]; !ok { + j = append(j, s) + } + } + + return j +} diff --git a/client/task_runner.go b/client/task_runner.go index a6d77b886..310216a23 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -21,7 +21,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/client/driver/env" - cstructs "github.com/hashicorp/nomad/client/driver/structs" + dstructs "github.com/hashicorp/nomad/client/driver/structs" + cstructs "github.com/hashicorp/nomad/client/structs" ) const ( @@ -38,17 +39,6 @@ const ( killFailureLimit = 5 ) -// TaskStatsReporter exposes APIs to query resource usage of a Task -type TaskStatsReporter interface { - // ResourceUsage returns the latest resource usage data point collected for - // the task - ResourceUsage() []*cstructs.TaskResourceUsage - - // ResourceUsageTS returns all the resource usage data points since a given - // time - ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage -} - // TaskRunner is used to wrap a task within an allocation and provide the execution context. type TaskRunner struct { config *config.Config @@ -58,12 +48,17 @@ type TaskRunner struct { alloc *structs.Allocation restartTracker *RestartTracker + // running marks whether the task is running + running bool + runningLock sync.Mutex + resourceUsage *stats.RingBuff resourceUsageLock sync.RWMutex - task *structs.Task - taskEnv *env.TaskEnvironment - updateCh chan *structs.Allocation + task *structs.Task + taskEnv *env.TaskEnvironment + updateCh chan *structs.Allocation + handle driver.DriverHandle handleLock sync.Mutex @@ -329,7 +324,7 @@ func (r *TaskRunner) run() { if err := getter.GetArtifact(r.taskEnv, artifact, taskDir, r.logger); err != nil { r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err)) - r.restartTracker.SetStartError(cstructs.NewRecoverableError(err, true)) + r.restartTracker.SetStartError(dstructs.NewRecoverableError(err, true)) goto RESTART } } @@ -354,6 +349,9 @@ func (r *TaskRunner) run() { // Mark the task as started r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) + r.runningLock.Lock() + r.running = true + r.runningLock.Unlock() } if stopCollection == nil { @@ -370,6 +368,10 @@ func (r *TaskRunner) run() { panic("nil wait") } + r.runningLock.Lock() + r.running = false + r.runningLock.Unlock() + // Stop collection of the task's resource usage close(stopCollection) @@ -509,23 +511,26 @@ func (r *TaskRunner) collectResourceUsageStats(stopCollection <-chan struct{}) { } } -// TaskStatsReporter returns the stats reporter of the task -func (r *TaskRunner) StatsReporter() TaskStatsReporter { - return r -} - -// ResourceUsage returns the last resource utilization datapoint collected -func (r *TaskRunner) ResourceUsage() []*cstructs.TaskResourceUsage { +// LatestResourceUsage returns the last resource utilization datapoint collected +func (r *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage { r.resourceUsageLock.RLock() defer r.resourceUsageLock.RUnlock() + r.runningLock.Lock() + defer r.runningLock.Unlock() + + // If the task is not running there can be no latest resource + if !r.running { + return nil + } + val := r.resourceUsage.Peek() ru, _ := val.(*cstructs.TaskResourceUsage) - return []*cstructs.TaskResourceUsage{ru} + return ru } -// ResourceUsageTS returns the list of all the resource utilization datapoints -// collected -func (r *TaskRunner) ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage { +// ResourceUsageSince returns the list of all the resource utilization datapoints +// collected since the given timestamp +func (r *TaskRunner) ResourceUsageSince(since int64) []*cstructs.TaskResourceUsage { r.resourceUsageLock.RLock() defer r.resourceUsageLock.RUnlock() @@ -629,7 +634,7 @@ func (r *TaskRunner) handleDestroy() (destroyed bool, err error) { } // Helper function for converting a WaitResult into a TaskTerminated event. -func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent { +func (r *TaskRunner) waitErrorToEvent(res *dstructs.WaitResult) *structs.TaskEvent { return structs.NewTaskEvent(structs.TaskTerminated). SetExitCode(res.ExitCode). SetSignal(res.Signal). diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 8fe63ce79..887f50229 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -76,42 +76,18 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ allocID := tokens[0] clientStats := s.agent.client.StatsReporter() - allocStats, ok := clientStats.AllocStats()[allocID] - if !ok { - return nil, CodedError(404, "alloc not running on node") - } - + task := req.URL.Query().Get("task") var since int var err error - ts := false if sinceTime := req.URL.Query().Get("since"); sinceTime != "" { - ts = true since, err = strconv.Atoi(sinceTime) if err != nil { return nil, CodedError(400, fmt.Sprintf("can't read the since query parameter: %v", err)) } } - if task := req.URL.Query().Get("task"); task != "" { - taskStats, ok := allocStats.AllocStats()[task] - if !ok { - return nil, CodedError(404, "task not present in allocation") - } - if ts { - return taskStats.ResourceUsageTS(int64(since)), nil - } - return taskStats.ResourceUsage(), nil + if since > 0 { + return clientStats.AllocStatsSince(allocID, task, int64(since)) } - - // Return the resource usage of all the tasks in an allocation if task name - // is not specified - res := make(map[string]interface{}) - for task, taskStats := range allocStats.AllocStats() { - if ts { - res[task] = taskStats.ResourceUsageTS(int64(since)) - } else { - res[task] = taskStats.ResourceUsage() - } - } - return res, nil + return clientStats.LatestAllocStats(allocID, task) } diff --git a/command/agent/stats_endpoint.go b/command/agent/stats_endpoint.go index 0cf5b8dc5..c725d373d 100644 --- a/command/agent/stats_endpoint.go +++ b/command/agent/stats_endpoint.go @@ -4,6 +4,8 @@ import ( "fmt" "net/http" "strconv" + + "github.com/hashicorp/nomad/client/stats" ) const ( @@ -28,7 +30,7 @@ func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Requ clientStats := s.agent.client.StatsReporter() if ts { - return clientStats.HostStatsTS(int64(since)), nil + return clientStats.HostStatsSince(int64(since)), nil } - return clientStats.HostStats(), nil + return []*stats.HostStats{clientStats.LatestHostStats()}, nil } diff --git a/command/alloc_status.go b/command/alloc_status.go index ebdcfdf46..623923a45 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -131,7 +131,7 @@ func (c *AllocStatusCommand) Run(args []string) int { } var statsErr error - var stats map[string]*api.TaskResourceUsage + var stats *api.AllocResourceUsage stats, statsErr = client.Allocations().Stats(alloc, nil) // Format the allocation data @@ -322,7 +322,7 @@ func (c *AllocStatusCommand) allocResources(alloc *api.Allocation) { } // taskResources prints out the tasks current resource usage -func (c *AllocStatusCommand) taskResources(alloc *api.Allocation, stats map[string]*api.TaskResourceUsage, displayStats bool) { +func (c *AllocStatusCommand) taskResources(alloc *api.Allocation, stats *api.AllocResourceUsage, displayStats bool) { if len(alloc.TaskResources) == 0 { return } @@ -358,9 +358,11 @@ func (c *AllocStatusCommand) taskResources(alloc *api.Allocation, stats map[stri if len(addr) > 0 { firstAddr = addr[0] } + + // Display the rolled up stats. If possible prefer the live stastics cpuUsage := strconv.Itoa(resource.CPU) memUsage := strconv.Itoa(resource.MemoryMB) - if ru, ok := stats[task]; ok && ru != nil && ru.ResourceUsage != nil { + if ru, ok := stats.Tasks[task]; ok && ru != nil && ru.ResourceUsage != nil { if cs := ru.ResourceUsage.CpuStats; cs != nil { cpuUsage = fmt.Sprintf("%v/%v", math.Floor(cs.TotalTicks), resource.CPU) } @@ -379,7 +381,7 @@ func (c *AllocStatusCommand) taskResources(alloc *api.Allocation, stats map[stri } c.Ui.Output(formatListWithSpaces(resourcesOutput)) - if ru, ok := stats[task]; ok && ru != nil && displayStats && ru.ResourceUsage != nil { + if ru, ok := stats.Tasks[task]; ok && ru != nil && displayStats && ru.ResourceUsage != nil { c.Ui.Output("") c.printTaskResourceUsage(task, ru.ResourceUsage) }