From 757bc663211386bbc7f5efc9a8f5f9b4bd15f9a8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 31 May 2016 23:09:05 +0000 Subject: [PATCH 1/2] fix wait result being nil and some panics in the cli --- client/driver/executor/executor.go | 2 +- client/task_runner.go | 22 ++++++++++++++++------ command/alloc_status.go | 2 +- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 604ee50bd..582a4cefd 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -492,7 +492,7 @@ func (e *UniversalExecutor) pidStats() (map[string]*cstructs.ResourceUsage, erro pids := make([]*nomadPid, len(e.pids)) copy(pids, e.pids) e.pidLock.RUnlock() - for _, pid := range e.pids { + for _, pid := range pids { p, err := process.NewProcess(int32(pid.pid)) if err != nil { e.logger.Printf("[DEBUG] executor: unable to create new process with pid: %v", pid.pid) diff --git a/client/task_runner.go b/client/task_runner.go index 370775801..8482bc4e9 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -307,6 +307,7 @@ func (r *TaskRunner) validateTask() error { func (r *TaskRunner) run() { // Predeclare things so we an jump to the RESTART var handleEmpty bool + var stopCollection chan struct{} for { // Download the task's artifacts @@ -339,8 +340,10 @@ func (r *TaskRunner) run() { r.handleLock.Lock() handleEmpty = r.handle == nil r.handleLock.Unlock() + if handleEmpty { - startErr := r.startTask() + stopCollection = make(chan struct{}) + startErr := r.startTask(stopCollection) r.restartTracker.SetStartError(startErr) if startErr != nil { r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr)) @@ -360,6 +363,9 @@ func (r *TaskRunner) run() { panic("nil wait") } + // Stop collection the task's resource usage + close(stopCollection) + // Log whether the task was successful or not. r.restartTracker.SetWaitResult(waitRes) r.setState(structs.TaskStateDead, r.waitErrorToEvent(waitRes)) @@ -435,7 +441,10 @@ func (r *TaskRunner) run() { } } -func (r *TaskRunner) startTask() error { +// startTask creates the driver and start the task. Resource usage is also +// collect in a launched goroutine. Collection ends when the passed channel is +// closed +func (r *TaskRunner) startTask(stopCollection <-chan struct{}) error { // Create a driver driver, err := r.createDriver() if err != nil { @@ -455,12 +464,13 @@ func (r *TaskRunner) startTask() error { r.handleLock.Lock() r.handle = handle r.handleLock.Unlock() - go r.collectResourceUsageStats() + go r.collectResourceUsageStats(stopCollection) return nil } -// collectResourceUsageStats starts collecting resource usage stats of a Task -func (r *TaskRunner) collectResourceUsageStats() { +// collectResourceUsageStats starts collecting resource usage stats of a Task. +// Collection ends when the passed channel is closed +func (r *TaskRunner) collectResourceUsageStats(stopCollection <-chan struct{}) { // start collecting the stats right away and then start collecting every // collection interval next := time.NewTimer(0) @@ -476,7 +486,7 @@ func (r *TaskRunner) collectResourceUsageStats() { r.resourceUsage.Enqueue(ru) r.resourceUsageLock.Unlock() next.Reset(r.config.StatsCollectionInterval) - case <-r.handle.WaitCh(): + case <-stopCollection: return } } diff --git a/command/alloc_status.go b/command/alloc_status.go index e3e8a4c41..de5958afb 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -347,7 +347,7 @@ func (c *AllocStatusCommand) taskResources(alloc *api.Allocation, stats map[stri } cpuUsage := strconv.Itoa(resource.CPU) memUsage := strconv.Itoa(resource.MemoryMB) - if ru, ok := stats[task]; ok { + if ru, ok := stats[task]; ok && ru != nil && ru.ResourceUsage != nil { cpuStats := ru.ResourceUsage.CpuStats memoryStats := ru.ResourceUsage.MemoryStats cpuUsage = fmt.Sprintf("%v/%v", (cpuStats.SystemMode + cpuStats.UserMode), resource.CPU) From 1c60397f268f7d03248752950950a82a25b84746 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 31 May 2016 23:50:14 +0000 Subject: [PATCH 2/2] flaky test --- nomad/eval_endpoint_test.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 3fa270ce8..993587807 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -669,13 +669,21 @@ func TestEvalEndpoint_Reblock(t *testing.T) { t.Fatal(err) } - out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out == nil { - t.Fatalf("missing eval") - } + var out *structs.Evaluation + var token string + var err error + testutil.WaitForResult(func() (bool, error) { + out, token, err = s1.evalBroker.Dequeue(defaultSched, time.Second) + if err != nil { + return false, err + } + if out == nil { + return false, nil + } + return true, nil + }, func(err error) { + t.Fatal(err) + }) get := &structs.EvalUpdateRequest{ Evals: []*structs.Evaluation{eval1},