Implemented stats for raw_exec

This commit is contained in:
Diptanu Choudhury
2016-05-11 12:56:47 -07:00
parent 72c60d6b99
commit 0fdff61d2d
4 changed files with 126 additions and 5 deletions

View File

@@ -15,7 +15,9 @@ import (
"time"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/go-ps"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
"github.com/shirou/gopsutil/process"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/consul"
@@ -146,6 +148,7 @@ type UniversalExecutor struct {
ctx *ExecutorContext
command *ExecCommand
pids []int
taskDir string
exitState *ProcessState
processExited chan interface{}
@@ -251,6 +254,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
if err := e.cmd.Start(); err != nil {
return nil, err
}
go e.collectPids()
go e.wait()
ic := &cstructs.IsolationConfig{Cgroup: e.groups, CgroupPaths: e.cgPaths}
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
@@ -619,3 +623,104 @@ func (e *UniversalExecutor) interpolateServices(task *structs.Task) {
service.Tags = e.ctx.TaskEnv.ParseAndReplace(service.Tags)
}
}
func (e *UniversalExecutor) collectPids() {
for {
timer := time.NewTimer(5 * time.Second)
select {
case <-timer.C:
pids, err := e.getAllPids()
e.logger.Printf("DIPTANU PIDS %#v", pids)
timer.Reset(5 * time.Second)
if err != nil {
e.logger.Printf("[DEBUG] executor: error collecting pids: %v", err)
}
e.pids = pids
case <-e.processExited:
timer.Stop()
return
}
}
}
func (e *UniversalExecutor) scanPids() ([]int, error) {
processFamily := make(map[int]struct{})
processFamily[os.Getpid()] = struct{}{}
pids, err := ps.Processes()
if err != nil {
return nil, err
}
for _, pid := range pids {
_, parentPid := processFamily[pid.Pid()]
_, childPid := processFamily[pid.PPid()]
if parentPid || childPid {
processFamily[pid.Pid()] = struct{}{}
}
}
res := make([]int, 0, len(processFamily))
for pid := range processFamily {
res = append(res, pid)
}
return res, nil
}
func (e *UniversalExecutor) resourceUsagePids() (*cstructs.TaskResourceUsage, error) {
resourceUsage := make(map[int]*cstructs.TaskResourceUsage)
ts := time.Now()
for _, pid := range e.pids {
p, err := process.NewProcess(int32(pid))
if err != nil {
e.logger.Printf("[DEBUG] executor: unable to create new process with pid: %v", pid)
}
memInfo, err := p.MemoryInfo()
if err != nil {
e.logger.Printf("[DEBUG] executor: unable to get memory stats for process: %v", pid)
}
cpuStats, err := p.Times()
if err != nil {
e.logger.Printf("[DEBUG] executor: unable to get cpu stats for process: %v", pid)
}
ms := &cstructs.MemoryStats{
RSS: memInfo.RSS,
Swap: memInfo.Swap,
}
percent, _ := p.Percent(0)
cs := &cstructs.CpuUsage{
SystemMode: cpuStats.System,
UserMode: cpuStats.User,
Percent: percent,
}
resourceUsage[pid] = &cstructs.TaskResourceUsage{MemoryStats: ms, CpuStats: cs, Timestamp: ts}
}
var (
systemModeCPU, userModeCPU, percent float64
totalRSS, totalSwap uint64
)
for _, rs := range resourceUsage {
systemModeCPU += rs.CpuStats.SystemMode
userModeCPU += rs.CpuStats.UserMode
percent += rs.CpuStats.Percent
totalRSS += rs.MemoryStats.RSS
totalSwap += rs.MemoryStats.Swap
}
totalCPU := &cstructs.CpuUsage{
SystemMode: systemModeCPU,
UserMode: userModeCPU,
Percent: percent,
}
totalMemory := &cstructs.MemoryStats{
RSS: totalRSS,
Swap: totalSwap,
}
return &cstructs.TaskResourceUsage{MemoryStats: totalMemory, CpuStats: totalCPU}, nil
}