Incorporated review comments for executor

This commit is contained in:
Diptanu Choudhury
2016-05-24 22:54:32 -07:00
parent bf6c034fec
commit 584c1e34fb
4 changed files with 48 additions and 22 deletions

View File

@@ -30,6 +30,9 @@ import (
)
const (
// pidScanInterval is the interval at which the executor scans the process
// tree for finding out the pids that the executor and it's child processes
// have forked
pidScanInterval = 5 * time.Second
)
@@ -130,8 +133,8 @@ type ProcessState struct {
Time time.Time
}
// NomadPid holds a pid and it's cpu percentage calculator
type NomadPid struct {
// nomadPid holds a pid and it's cpu percentage calculator
type nomadPid struct {
pid int
cpuStats *stats.CpuStats
}
@@ -160,7 +163,8 @@ type UniversalExecutor struct {
ctx *ExecutorContext
command *ExecCommand
pids []*NomadPid
pids []*nomadPid
pidLock sync.RWMutex
taskDir string
exitState *ProcessState
processExited chan interface{}
@@ -481,10 +485,14 @@ func (e *UniversalExecutor) DeregisterServices() error {
return nil
}
// PidStats returns the resource usage stats per pid
func (e *UniversalExecutor) PidStats() (map[string]*cstructs.ResourceUsage, error) {
// pidStats returns the resource usage stats per pid
func (e *UniversalExecutor) pidStats() (map[string]*cstructs.ResourceUsage, error) {
stats := make(map[string]*cstructs.ResourceUsage)
ts := time.Now()
e.pidLock.RLock()
pids := make([]*nomadPid, len(e.pids))
copy(pids, e.pids)
e.pidLock.RUnlock()
for _, pid := range e.pids {
p, err := process.NewProcess(int32(pid.pid))
if err != nil {
@@ -677,16 +685,18 @@ func (e *UniversalExecutor) interpolateServices(task *structs.Task) {
// collectPids collects the pids of the child processes that the executor is
// running every 5 seconds
func (e *UniversalExecutor) collectPids() {
timer := time.NewTimer(pidScanInterval)
for {
timer := time.NewTimer(pidScanInterval)
select {
case <-timer.C:
pids, err := e.getAllPids()
timer.Reset(pidScanInterval)
if err != nil {
e.logger.Printf("[DEBUG] executor: error collecting pids: %v", err)
}
e.pidLock.Lock()
e.pids = pids
e.pidLock.Unlock()
timer.Reset(pidScanInterval)
case <-e.processExited:
timer.Stop()
return
@@ -696,7 +706,7 @@ func (e *UniversalExecutor) collectPids() {
// scanPids scans all the pids on the machine running the current executor and
// returns the child processes of the executor.
func (e *UniversalExecutor) scanPids() ([]*NomadPid, error) {
func (e *UniversalExecutor) scanPids() ([]*nomadPid, error) {
processFamily := make(map[int]struct{})
processFamily[os.Getpid()] = struct{}{}
pids, err := ps.Processes()
@@ -710,18 +720,18 @@ func (e *UniversalExecutor) scanPids() ([]*NomadPid, error) {
processFamily[pid.Pid()] = struct{}{}
}
}
res := make([]*NomadPid, 0, len(processFamily))
res := make([]*nomadPid, 0, len(processFamily))
for pid := range processFamily {
res = append(res, &NomadPid{pid, stats.NewCpuStats()})
res = append(res, &nomadPid{pid, stats.NewCpuStats()})
}
return res, nil
}
// resourceUsagePids aggreagates the resources used by all the pids that are
// resourceUsagePids aggregates the resources used by all the pids that are
// spawned by the executor and the user process.
func (e *UniversalExecutor) resourceUsagePids() (*cstructs.TaskResourceUsage, error) {
ts := time.Now()
pidStats, err := e.PidStats()
pidStats, err := e.pidStats()
if err != nil {
return nil, err
}
@@ -750,6 +760,14 @@ func (e *UniversalExecutor) resourceUsagePids() (*cstructs.TaskResourceUsage, er
Swap: totalSwap,
}
resourceUsage := cstructs.ResourceUsage{MemoryStats: totalMemory, CpuStats: totalCPU, Timestamp: ts}
return &cstructs.TaskResourceUsage{ResourceUsage: &resourceUsage, Timestamp: ts, Pids: pidStats}, nil
resourceUsage := cstructs.ResourceUsage{
MemoryStats: totalMemory,
CpuStats: totalCPU,
Timestamp: ts,
}
return &cstructs.TaskResourceUsage{
ResourceUsage: &resourceUsage,
Timestamp: ts,
Pids: pidStats,
}, nil
}

View File

@@ -35,6 +35,6 @@ func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
return e.resourceUsagePids()
}
func (e *UniversalExecutor) getAllPids() ([]*NomadPid, error) {
func (e *UniversalExecutor) getAllPids() ([]*nomadPid, error) {
return e.scanPids()
}

View File

@@ -125,7 +125,9 @@ func (e *UniversalExecutor) configureCgroups(resources *structs.Resources) error
return nil
}
// Stats reports the resource utilization of the cgroup
// Stats reports the resource utilization of the cgroup. If there is no resource
// isolation we aggregate the resource utilization of all the pids launched by
// the executor.
func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
if !e.command.ResourceLimits {
return e.resourceUsagePids()
@@ -168,7 +170,14 @@ func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
if e.cpuStats != nil {
cs.Percent = e.cpuStats.Percent(float64(totalProcessCPUUsage / nanosecondsInSecond))
}
taskResUsage := cstructs.TaskResourceUsage{ResourceUsage: &cstructs.ResourceUsage{MemoryStats: ms, CpuStats: cs, Timestamp: ts}, Timestamp: ts}
taskResUsage := cstructs.TaskResourceUsage{
ResourceUsage: &cstructs.ResourceUsage{
MemoryStats: ms,
CpuStats: cs,
Timestamp: ts,
},
Timestamp: ts,
}
if pidStats, err := e.PidStats(); err == nil {
taskResUsage.Pids = pidStats
}

View File

@@ -31,20 +31,19 @@ func (c *CpuStats) Percent(currentProcessUsage float64) float64 {
return 0.0
}
numcpu := runtime.NumCPU()
delta := (now.Sub(c.prevTime).Seconds()) * float64(numcpu)
ret := c.calculatePercent(c.prevProcessUsage, currentProcessUsage, delta, numcpu)
delta := (now.Sub(c.prevTime).Seconds()) * float64(c.totalCpus)
ret := c.calculatePercent(c.prevProcessUsage, currentProcessUsage, delta)
c.prevProcessUsage = currentProcessUsage
c.prevTime = now
return ret
}
func (c *CpuStats) calculatePercent(t1, t2 float64, delta float64, numcpu int) float64 {
func (c *CpuStats) calculatePercent(t1, t2 float64, delta float64) float64 {
if delta == 0 {
return 0
}
delta_proc := t2 - t1
overall_percent := ((delta_proc / delta) * 100) * float64(numcpu)
overall_percent := ((delta_proc / delta) * 100) * float64(c.totalCpus)
return overall_percent
}