diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index 72dea0073..5606d600f 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -14,7 +14,8 @@ import ( "time" ) -var ( +const ( + bufSize = 32768 flushDur = 100 * time.Millisecond ) @@ -31,6 +32,7 @@ type FileRotator struct { currentFile *os.File // currentFile is the file that is currently getting written currentWr int64 // currentWr is the number of bytes written to the current file bufw *bufio.Writer + bufLock sync.Mutex flushTicker *time.Ticker logger *log.Logger @@ -74,7 +76,7 @@ func (f *FileRotator) Write(p []byte) (n int, err error) { // Check if we still have space in the current file, otherwise close and // open the next file if f.currentWr >= f.FileSize { - f.bufw.Flush() + f.flushBuffer() f.currentFile.Close() if err := f.nextFile(); err != nil { f.logger.Printf("[ERROR] driver.rotator: error creating next file: %v", err) @@ -89,12 +91,10 @@ func (f *FileRotator) Write(p []byte) (n int, err error) { if remainingSize < int64(len(p[n:])) { // Write the number of bytes that we can write on the current file li := int64(n) + remainingSize - nw, err = f.bufw.Write(p[n:li]) - //nw, err = f.currentFile.Write(p[n:li]) + nw, err = f.writeToBuffer(p[n:li]) } else { // Write all the bytes in the current file - nw, err = f.bufw.Write(p[n:]) - //nw, err = f.currentFile.Write(p[n:]) + nw, err = f.writeToBuffer(p[n:]) } // Increment the number of bytes written so far in this method @@ -181,11 +181,7 @@ func (f *FileRotator) createFile() error { return err } f.currentWr = fi.Size() - if f.bufw == nil { - f.bufw = bufio.NewWriter(f.currentFile) - } else { - f.bufw.Reset(f.currentFile) - } + f.createOrResetBuffer() return nil } @@ -193,9 +189,7 @@ func (f *FileRotator) createFile() error { // file func (f *FileRotator) flushPeriodically() { for _ = range f.flushTicker.C { - if f.bufw != nil { - f.bufw.Flush() - } + f.flushBuffer() } } @@ -205,9 +199,7 @@ func (f *FileRotator) Close() { // Stop the ticker and flush for one last time f.flushTicker.Stop() - if f.bufw != nil { - f.bufw.Flush() - } + f.flushBuffer() // Stop the purge go routine if !f.closed { @@ -260,3 +252,32 @@ func (f *FileRotator) purgeOldFiles() { } } } + +// flushBuffer flushes the buffer +func (f *FileRotator) flushBuffer() error { + f.bufLock.Lock() + defer f.bufLock.Unlock() + if f.bufw != nil { + return f.bufw.Flush() + } + return nil +} + +// writeToBuffer writes the byte array to buffer +func (f *FileRotator) writeToBuffer(p []byte) (int, error) { + f.bufLock.Lock() + defer f.bufLock.Unlock() + return f.bufw.Write(p) +} + +// createOrResetBuffer creates a new buffer if we don't have one otherwise +// resets the buffer +func (f *FileRotator) createOrResetBuffer() { + f.bufLock.Lock() + defer f.bufLock.Unlock() + if f.bufw == nil { + f.bufw = bufio.NewWriterSize(f.currentFile, bufSize) + } else { + f.bufw.Reset(f.currentFile) + } +}