From 584c1e34fbbb41396fe181fbc0f5f1ebe5cb0c82 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 24 May 2016 22:54:32 -0700 Subject: [PATCH] Incorporated review comments for executor --- client/driver/executor/executor.go | 46 ++++++++++++++++-------- client/driver/executor/executor_basic.go | 2 +- client/driver/executor/executor_linux.go | 13 +++++-- client/stats/cpu.go | 9 +++-- 4 files changed, 48 insertions(+), 22 deletions(-) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index b4136e83e..f0e4f98b9 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -30,6 +30,9 @@ import ( ) const ( + // pidScanInterval is the interval at which the executor scans the process + // tree for finding out the pids that the executor and it's child processes + // have forked pidScanInterval = 5 * time.Second ) @@ -130,8 +133,8 @@ type ProcessState struct { Time time.Time } -// NomadPid holds a pid and it's cpu percentage calculator -type NomadPid struct { +// nomadPid holds a pid and it's cpu percentage calculator +type nomadPid struct { pid int cpuStats *stats.CpuStats } @@ -160,7 +163,8 @@ type UniversalExecutor struct { ctx *ExecutorContext command *ExecCommand - pids []*NomadPid + pids []*nomadPid + pidLock sync.RWMutex taskDir string exitState *ProcessState processExited chan interface{} @@ -481,10 +485,14 @@ func (e *UniversalExecutor) DeregisterServices() error { return nil } -// PidStats returns the resource usage stats per pid -func (e *UniversalExecutor) PidStats() (map[string]*cstructs.ResourceUsage, error) { +// pidStats returns the resource usage stats per pid +func (e *UniversalExecutor) pidStats() (map[string]*cstructs.ResourceUsage, error) { stats := make(map[string]*cstructs.ResourceUsage) ts := time.Now() + e.pidLock.RLock() + pids := make([]*nomadPid, len(e.pids)) + copy(pids, e.pids) + e.pidLock.RUnlock() for _, pid := range e.pids { p, err := process.NewProcess(int32(pid.pid)) if err != nil { @@ -677,16 +685,18 @@ func (e *UniversalExecutor) interpolateServices(task *structs.Task) { // collectPids collects the pids of the child processes that the executor is // running every 5 seconds func (e *UniversalExecutor) collectPids() { + timer := time.NewTimer(pidScanInterval) for { - timer := time.NewTimer(pidScanInterval) select { case <-timer.C: pids, err := e.getAllPids() - timer.Reset(pidScanInterval) if err != nil { e.logger.Printf("[DEBUG] executor: error collecting pids: %v", err) } + e.pidLock.Lock() e.pids = pids + e.pidLock.Unlock() + timer.Reset(pidScanInterval) case <-e.processExited: timer.Stop() return @@ -696,7 +706,7 @@ func (e *UniversalExecutor) collectPids() { // scanPids scans all the pids on the machine running the current executor and // returns the child processes of the executor. -func (e *UniversalExecutor) scanPids() ([]*NomadPid, error) { +func (e *UniversalExecutor) scanPids() ([]*nomadPid, error) { processFamily := make(map[int]struct{}) processFamily[os.Getpid()] = struct{}{} pids, err := ps.Processes() @@ -710,18 +720,18 @@ func (e *UniversalExecutor) scanPids() ([]*NomadPid, error) { processFamily[pid.Pid()] = struct{}{} } } - res := make([]*NomadPid, 0, len(processFamily)) + res := make([]*nomadPid, 0, len(processFamily)) for pid := range processFamily { - res = append(res, &NomadPid{pid, stats.NewCpuStats()}) + res = append(res, &nomadPid{pid, stats.NewCpuStats()}) } return res, nil } -// resourceUsagePids aggreagates the resources used by all the pids that are +// resourceUsagePids aggregates the resources used by all the pids that are // spawned by the executor and the user process. func (e *UniversalExecutor) resourceUsagePids() (*cstructs.TaskResourceUsage, error) { ts := time.Now() - pidStats, err := e.PidStats() + pidStats, err := e.pidStats() if err != nil { return nil, err } @@ -750,6 +760,14 @@ func (e *UniversalExecutor) resourceUsagePids() (*cstructs.TaskResourceUsage, er Swap: totalSwap, } - resourceUsage := cstructs.ResourceUsage{MemoryStats: totalMemory, CpuStats: totalCPU, Timestamp: ts} - return &cstructs.TaskResourceUsage{ResourceUsage: &resourceUsage, Timestamp: ts, Pids: pidStats}, nil + resourceUsage := cstructs.ResourceUsage{ + MemoryStats: totalMemory, + CpuStats: totalCPU, + Timestamp: ts, + } + return &cstructs.TaskResourceUsage{ + ResourceUsage: &resourceUsage, + Timestamp: ts, + Pids: pidStats, + }, nil } diff --git a/client/driver/executor/executor_basic.go b/client/driver/executor/executor_basic.go index b458ea549..ca0120dc7 100644 --- a/client/driver/executor/executor_basic.go +++ b/client/driver/executor/executor_basic.go @@ -35,6 +35,6 @@ func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) { return e.resourceUsagePids() } -func (e *UniversalExecutor) getAllPids() ([]*NomadPid, error) { +func (e *UniversalExecutor) getAllPids() ([]*nomadPid, error) { return e.scanPids() } diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index dc29f2d1a..e5b0376ad 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -125,7 +125,9 @@ func (e *UniversalExecutor) configureCgroups(resources *structs.Resources) error return nil } -// Stats reports the resource utilization of the cgroup +// Stats reports the resource utilization of the cgroup. If there is no resource +// isolation we aggregate the resource utilization of all the pids launched by +// the executor. func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) { if !e.command.ResourceLimits { return e.resourceUsagePids() @@ -168,7 +170,14 @@ func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) { if e.cpuStats != nil { cs.Percent = e.cpuStats.Percent(float64(totalProcessCPUUsage / nanosecondsInSecond)) } - taskResUsage := cstructs.TaskResourceUsage{ResourceUsage: &cstructs.ResourceUsage{MemoryStats: ms, CpuStats: cs, Timestamp: ts}, Timestamp: ts} + taskResUsage := cstructs.TaskResourceUsage{ + ResourceUsage: &cstructs.ResourceUsage{ + MemoryStats: ms, + CpuStats: cs, + Timestamp: ts, + }, + Timestamp: ts, + } if pidStats, err := e.PidStats(); err == nil { taskResUsage.Pids = pidStats } diff --git a/client/stats/cpu.go b/client/stats/cpu.go index d84f0c2b4..ca92c0e73 100644 --- a/client/stats/cpu.go +++ b/client/stats/cpu.go @@ -31,20 +31,19 @@ func (c *CpuStats) Percent(currentProcessUsage float64) float64 { return 0.0 } - numcpu := runtime.NumCPU() - delta := (now.Sub(c.prevTime).Seconds()) * float64(numcpu) - ret := c.calculatePercent(c.prevProcessUsage, currentProcessUsage, delta, numcpu) + delta := (now.Sub(c.prevTime).Seconds()) * float64(c.totalCpus) + ret := c.calculatePercent(c.prevProcessUsage, currentProcessUsage, delta) c.prevProcessUsage = currentProcessUsage c.prevTime = now return ret } -func (c *CpuStats) calculatePercent(t1, t2 float64, delta float64, numcpu int) float64 { +func (c *CpuStats) calculatePercent(t1, t2 float64, delta float64) float64 { if delta == 0 { return 0 } delta_proc := t2 - t1 - overall_percent := ((delta_proc / delta) * 100) * float64(numcpu) + overall_percent := ((delta_proc / delta) * 100) * float64(c.totalCpus) return overall_percent }