diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index 011f806c3..471860624 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -191,7 +191,8 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport) + pm := newImageProgressManager(image, cancel, d.handlePullInactivity, + d.handlePullProgressReport, d.handleSlowPullProgressReport) defer pm.stop() pullOptions := docker.PullImageOptions{ @@ -394,16 +395,16 @@ func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{} } } -func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time, interval int64) { +func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) { d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) } -func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time, interval int64) { +func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) { d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) +} - if interval%int64(dockerPullProgressEmitInterval.Seconds()/dockerImageProgressReportInterval.Seconds()) == 0 { - d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) - } +func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) { + d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) } // recoverablePullError wraps the error gotten when trying to pull and image if diff --git a/client/driver/docker_progress.go b/client/driver/docker_progress.go index 17098e71b..ed0da74ce 100644 --- a/client/driver/docker_progress.go +++ b/client/driver/docker_progress.go @@ -161,10 +161,9 @@ func (p *imageProgress) totalBytes() int64 { } // progressReporterFunc defines the method for handeling inactivity and report -// events from the imageProgressManager. The image name, current status message, -// timestamp of last received status update, timestamp of when the pull started -// and current report interation are passed in. -type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time, interval int64) +// events from the imageProgressManager. The image name, current status message +// and timestamp of last received status update are passed in. +type progressReporterFunc func(image string, msg string, timestamp time.Time) // imageProgressManager tracks the progress of pulling a docker image from an // image repository. @@ -172,20 +171,23 @@ type progressReporterFunc func(image string, msg string, timestamp time.Time, pu // client pull image method in order to receive status updates from the docker // engine api. type imageProgressManager struct { - imageProgress *imageProgress - image string - activityDeadline time.Duration - inactivityFunc progressReporterFunc - reportInterval time.Duration - reporter progressReporterFunc - cancel context.CancelFunc - stopCh chan struct{} - buf bytes.Buffer + imageProgress *imageProgress + image string + activityDeadline time.Duration + inactivityFunc progressReporterFunc + reportInterval time.Duration + reporter progressReporterFunc + slowReportInterval time.Duration + slowReporter progressReporterFunc + lastSlowReport time.Time + cancel context.CancelFunc + stopCh chan struct{} + buf bytes.Buffer } func newImageProgressManager( image string, cancel context.CancelFunc, - inactivityFunc, reporter progressReporterFunc) *imageProgressManager { + inactivityFunc, reporter, slowReporter progressReporterFunc) *imageProgressManager { pm := &imageProgressManager{ image: image, @@ -193,6 +195,7 @@ func newImageProgressManager( inactivityFunc: inactivityFunc, reportInterval: dockerImageProgressReportInterval, reporter: reporter, + slowReporter: slowReporter, imageProgress: &imageProgress{ timestamp: time.Now(), layers: make(map[string]*layerProgress), @@ -207,21 +210,26 @@ func newImageProgressManager( // start intiates the ticker to trigger the inactivity and reporter handlers func (pm *imageProgressManager) start() { - pm.imageProgress.pullStart = time.Now() + now := time.Now() + pm.imageProgress.pullStart = now + pm.lastSlowReport = now go func() { ticker := time.NewTicker(dockerImageProgressReportInterval) - var interval int64 for { - interval++ select { case <-ticker.C: - msg, timestamp := pm.imageProgress.get() - if time.Now().Sub(timestamp) > pm.activityDeadline { - pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) + msg, lastStatusTime := pm.imageProgress.get() + t := time.Now() + if t.Sub(lastStatusTime) > pm.activityDeadline { + pm.inactivityFunc(pm.image, msg, lastStatusTime) pm.cancel() return } - pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) + if t.Sub(pm.lastSlowReport) > pm.slowReportInterval { + pm.slowReporter(pm.image, msg, lastStatusTime) + pm.lastSlowReport = t + } + pm.reporter(pm.image, msg, lastStatusTime) case <-pm.stopCh: return }