mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
Fixed implementation of the docker stats
This commit is contained in:
@@ -123,6 +123,9 @@ type DockerHandle struct {
|
||||
version string
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
stats chan *docker.Stats
|
||||
doneMonitoring chan bool
|
||||
resourceUsage *cstructs.TaskResourceUsage
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
@@ -768,12 +771,16 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
||||
version: d.config.Version,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
stats: make(chan *docker.Stats),
|
||||
doneMonitoring: make(chan bool),
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.monitorUsage()
|
||||
go h.startMonitoring()
|
||||
go h.run()
|
||||
return h, nil
|
||||
}
|
||||
@@ -841,6 +848,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
|
||||
version: pid.Version,
|
||||
killTimeout: pid.KillTimeout,
|
||||
maxKillTimeout: pid.MaxKillTimeout,
|
||||
stats: make(chan *docker.Stats),
|
||||
doneMonitoring: make(chan bool),
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
@@ -909,16 +918,7 @@ func (h *DockerHandle) Kill() error {
|
||||
}
|
||||
|
||||
func (h *DockerHandle) Stats() (*cstructs.TaskResourceUsage, error) {
|
||||
stats := make(chan *docker.Stats)
|
||||
done := make(chan bool)
|
||||
statsOpts := docker.StatsOptions{ID: h.containerID, Done: done, Stats: stats, Stream: false, Timeout: 2 * time.Second}
|
||||
if err := h.client.Stats(statsOpts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
containerStats := <-stats
|
||||
close(done)
|
||||
resourceUsage := cstructs.TaskResourceUsage{MemoryStats: &cstructs.MemoryStats{RSS: containerStats.MemoryStats.Stats.Rss}}
|
||||
return &resourceUsage, nil
|
||||
return h.resourceUsage, nil
|
||||
}
|
||||
|
||||
func (h *DockerHandle) run() {
|
||||
@@ -933,6 +933,8 @@ func (h *DockerHandle) run() {
|
||||
}
|
||||
|
||||
close(h.doneCh)
|
||||
h.doneMonitoring <- true
|
||||
close(h.doneMonitoring)
|
||||
h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err)
|
||||
close(h.waitCh)
|
||||
|
||||
@@ -969,3 +971,30 @@ func (h *DockerHandle) run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DockerHandle) startMonitoring() {
|
||||
statsOpts := docker.StatsOptions{ID: h.containerID, Done: h.doneMonitoring, Stats: h.stats, Stream: true}
|
||||
if err := h.client.Stats(statsOpts); err != nil {
|
||||
h.logger.Printf("[DEBUG] driver.docker: error collecting stats from container %s: %v", h.containerID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DockerHandle) monitorUsage() {
|
||||
for {
|
||||
select {
|
||||
case s := <-h.stats:
|
||||
if s != nil {
|
||||
ms := &cstructs.MemoryStats{
|
||||
RSS: s.MemoryStats.Stats.Rss,
|
||||
Cache: s.MemoryStats.Stats.Cache,
|
||||
Swap: s.MemoryStats.Stats.Swap,
|
||||
MaxUsage: s.MemoryStats.MaxUsage,
|
||||
}
|
||||
h.resourceUsage = &cstructs.TaskResourceUsage{MemoryStats: ms, Timestamp: s.Read}
|
||||
}
|
||||
case <-h.doneMonitoring:
|
||||
case <-h.doneCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user