From 1cae57aee9257edbd45d2c0085e0cb24992f9fa4 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 28 Apr 2016 16:06:01 -0700 Subject: [PATCH] Add the Stats api to driverhandle --- client/driver/docker.go | 13 +++++++++++++ client/driver/driver.go | 2 ++ client/driver/exec.go | 4 ++++ client/driver/executor/executor.go | 1 + client/driver/executor/executor_basic.go | 9 ++++++++- client/driver/executor/executor_linux.go | 11 +++++++++++ client/driver/executor_plugin.go | 15 +++++++++++++++ client/driver/java.go | 4 ++++ client/driver/qemu.go | 4 ++++ client/driver/raw_exec.go | 4 ++++ client/driver/rkt.go | 4 ++++ client/driver/structs/structs.go | 8 ++++++++ 12 files changed, 78 insertions(+), 1 deletion(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index b101dbcbd..59ba2da13 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -908,6 +908,19 @@ func (h *DockerHandle) Kill() error { return nil } +func (h *DockerHandle) Stats() (*cstructs.TaskResourceUsage, error) { + stats := make(chan *docker.Stats) + done := make(chan bool) + statsOpts := docker.StatsOptions{ID: h.containerID, Done: done, Stats: stats, Stream: false, Timeout: 2 * time.Second} + if err := h.client.Stats(statsOpts); err != nil { + return nil, err + } + containerStats := <-stats + close(done) + resourceUsage := cstructs.TaskResourceUsage{MemoryStats: &cstructs.MemoryStats{RSS: containerStats.MemoryStats.Stats.Rss}} + return &resourceUsage, nil +} + func (h *DockerHandle) run() { // Wait for it... exitCode, err := h.client.WaitContainer(h.containerID) diff --git a/client/driver/driver.go b/client/driver/driver.go index 68b3b46b9..1040ca094 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -113,6 +113,8 @@ type DriverHandle interface { // Kill is used to stop the task Kill() error + + Stats() (*cstructs.TaskResourceUsage, error) } // ExecContext is shared between drivers within an allocation diff --git a/client/driver/exec.go b/client/driver/exec.go index e5264c6fc..77456b028 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -280,6 +280,10 @@ func (h *execHandle) Kill() error { } } +func (h *execHandle) Stats() (*cstructs.TaskResourceUsage, error) { + return h.executor.Stats() +} + func (h *execHandle) run() { ps, err := h.executor.Wait() close(h.doneCh) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index c95437dca..8d7304fda 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -38,6 +38,7 @@ type Executor interface { SyncServices(ctx *ConsulContext) error DeregisterServices() error Version() (*ExecutorVersion, error) + Stats() (*cstructs.TaskResourceUsage, error) } // ConsulContext holds context to configure the consul client and run checks diff --git a/client/driver/executor/executor_basic.go b/client/driver/executor/executor_basic.go index db2479d82..65558aa91 100644 --- a/client/driver/executor/executor_basic.go +++ b/client/driver/executor/executor_basic.go @@ -2,7 +2,10 @@ package executor -import cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" +import ( + cstructs "github.com/hashicorp/nomad/client/driver/structs" + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" +) func (e *UniversalExecutor) configureChroot() error { return nil @@ -27,3 +30,7 @@ func (e *UniversalExecutor) applyLimits(pid int) error { func (e *UniversalExecutor) configureIsolation() error { return nil } + +func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) { + return nil, nil +} diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index c7f3b5b7d..9858e00fe 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -15,6 +15,7 @@ import ( cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" "github.com/hashicorp/nomad/client/allocdir" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -116,6 +117,16 @@ func (e *UniversalExecutor) configureCgroups(resources *structs.Resources) error return nil } +func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) { + manager := getCgroupManager(e.groups, e.cgPaths) + stats, err := manager.GetStats() + if err != nil { + return nil, err + } + e.logger.Printf("DIPTANU stats %#v", stats.MemoryStats.Stats) + return &cstructs.TaskResourceUsage{}, nil +} + // runAs takes a user id as a string and looks up the user, and sets the command // to execute as that user. func (e *UniversalExecutor) runAs(userid string) error { diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index decb359e3..a4edf2da7 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/driver/executor" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -88,6 +89,12 @@ func (e *ExecutorRPC) Version() (*executor.ExecutorVersion, error) { return &version, err } +func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) { + var resourceUsage cstructs.TaskResourceUsage + err := e.client.Call("Plugin.Stats", new(interface{}), &resourceUsage) + return &resourceUsage, err +} + type ExecutorRPCServer struct { Impl executor.Executor logger *log.Logger @@ -149,6 +156,14 @@ func (e *ExecutorRPCServer) Version(args interface{}, version *executor.Executor return err } +func (e *ExecutorRPCServer) Stats(args interface{}, resourceUsage *cstructs.TaskResourceUsage) error { + ru, err := e.Impl.Stats() + if ru != nil { + *resourceUsage = *ru + } + return err +} + type ExecutorPlugin struct { logger *log.Logger Impl *ExecutorRPCServer diff --git a/client/driver/java.go b/client/driver/java.go index acd2289a4..f18bdf842 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -380,6 +380,10 @@ func (h *javaHandle) Kill() error { } } +func (h *javaHandle) Stats() (*cstructs.TaskResourceUsage, error) { + return h.executor.Stats() +} + func (h *javaHandle) run() { ps, err := h.executor.Wait() close(h.doneCh) diff --git a/client/driver/qemu.go b/client/driver/qemu.go index aad12f77f..719ce41e7 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -375,6 +375,10 @@ func (h *qemuHandle) Kill() error { } } +func (h *qemuHandle) Stats() (*cstructs.TaskResourceUsage, error) { + return h.executor.Stats() +} + func (h *qemuHandle) run() { ps, err := h.executor.Wait() if ps.ExitCode == 0 && err != nil { diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 8a71a73c4..2aa0695cc 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -277,6 +277,10 @@ func (h *rawExecHandle) Kill() error { } } +func (h *rawExecHandle) Stats() (*cstructs.TaskResourceUsage, error) { + return h.executor.Stats() +} + func (h *rawExecHandle) run() { ps, err := h.executor.Wait() close(h.doneCh) diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 7aa6f8d00..137c643d7 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -399,6 +399,10 @@ func (h *rktHandle) Kill() error { } } +func (h *rktHandle) Stats() (*cstructs.TaskResourceUsage, error) { + return nil, fmt.Errorf("stats not implemented for rkt") +} + func (h *rktHandle) run() { ps, err := h.executor.Wait() close(h.doneCh) diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index 9059df6ec..8cecb9f7a 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -84,3 +84,11 @@ type CheckResult struct { // Err is the error that a check returned Err error } + +type MemoryStats struct { + RSS uint64 +} + +type TaskResourceUsage struct { + MemoryStats *MemoryStats +}