mirror of
https://github.com/kemko/nomad.git
synced 2026-01-09 03:45:41 +03:00
Merge pull request #4282 from hashicorp/f-rotator
Avoid splitting log line across two files
This commit is contained in:
@@ -2,6 +2,7 @@ package logging
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
@@ -15,8 +16,20 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
bufSize = 32768
|
||||
flushDur = 100 * time.Millisecond
|
||||
// logBufferSize is the size of the buffer.
|
||||
logBufferSize = 32 * 1024
|
||||
|
||||
// bufferFlushDuration is the duration at which we flush the buffer.
|
||||
bufferFlushDuration = 100 * time.Millisecond
|
||||
|
||||
// lineScanLimit is the number of bytes we will attempt to scan for new
|
||||
// lines when approaching the end of the file to avoid a log line being
|
||||
// split between two files. Any single line that is greater than this limit
|
||||
// may be split.
|
||||
lineScanLimit = 16 * 1024
|
||||
|
||||
// newLineDelimiter is the delimiter used for new lines.
|
||||
newLineDelimiter = '\n'
|
||||
)
|
||||
|
||||
// FileRotator writes bytes to a rotated set of files
|
||||
@@ -53,7 +66,7 @@ func NewFileRotator(path string, baseFile string, maxFiles int,
|
||||
path: path,
|
||||
baseFileName: baseFile,
|
||||
|
||||
flushTicker: time.NewTicker(flushDur),
|
||||
flushTicker: time.NewTicker(bufferFlushDuration),
|
||||
logger: logger,
|
||||
purgeCh: make(chan struct{}, 1),
|
||||
doneCh: make(chan struct{}, 1),
|
||||
@@ -70,12 +83,13 @@ func NewFileRotator(path string, baseFile string, maxFiles int,
|
||||
// equal to the maximum size the user has defined.
|
||||
func (f *FileRotator) Write(p []byte) (n int, err error) {
|
||||
n = 0
|
||||
var nw int
|
||||
var forceRotate bool
|
||||
|
||||
for n < len(p) {
|
||||
// Check if we still have space in the current file, otherwise close and
|
||||
// open the next file
|
||||
if f.currentWr >= f.FileSize {
|
||||
if forceRotate || f.currentWr >= f.FileSize {
|
||||
forceRotate = false
|
||||
f.flushBuffer()
|
||||
f.currentFile.Close()
|
||||
if err := f.nextFile(); err != nil {
|
||||
@@ -83,15 +97,38 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
// Calculate the remaining size on this file
|
||||
remainingSize := f.FileSize - f.currentWr
|
||||
// Calculate the remaining size on this file and how much we have left
|
||||
// to write
|
||||
remainingSpace := f.FileSize - f.currentWr
|
||||
remainingToWrite := int64(len(p[n:]))
|
||||
|
||||
// Check if the number of bytes that we have to write is less than the
|
||||
// remaining size of the file
|
||||
if remainingSize < int64(len(p[n:])) {
|
||||
// Write the number of bytes that we can write on the current file
|
||||
li := int64(n) + remainingSize
|
||||
nw, err = f.writeToBuffer(p[n:li])
|
||||
// Check if we are near the end of the file. If we are we attempt to
|
||||
// avoid a log line being split between two files.
|
||||
var nw int
|
||||
if (remainingSpace - lineScanLimit) < remainingToWrite {
|
||||
// Scan for new line and if the data up to new line fits in current
|
||||
// file, write to buffer
|
||||
idx := bytes.IndexByte(p[n:], newLineDelimiter)
|
||||
if idx >= 0 && (remainingSpace-int64(idx)-1) >= 0 {
|
||||
// We have space so write it to buffer
|
||||
nw, err = f.writeToBuffer(p[n : n+idx+1])
|
||||
} else if idx >= 0 {
|
||||
// We found a new line but don't have space so just force rotate
|
||||
forceRotate = true
|
||||
} else if remainingToWrite > f.FileSize || f.FileSize-lineScanLimit < 0 {
|
||||
// There is no new line remaining but there is no point in
|
||||
// rotating since the remaining data will not even fit in the
|
||||
// next file either so just fill this one up.
|
||||
li := int64(n) + remainingSpace
|
||||
if remainingSpace > remainingToWrite {
|
||||
li = int64(n) + remainingToWrite
|
||||
}
|
||||
nw, err = f.writeToBuffer(p[n:li])
|
||||
} else {
|
||||
// There is no new line in the data remaining for us to write
|
||||
// and it will fit in the next file so rotate.
|
||||
forceRotate = true
|
||||
}
|
||||
} else {
|
||||
// Write all the bytes in the current file
|
||||
nw, err = f.writeToBuffer(p[n:])
|
||||
@@ -283,7 +320,7 @@ func (f *FileRotator) createOrResetBuffer() {
|
||||
f.bufLock.Lock()
|
||||
defer f.bufLock.Unlock()
|
||||
if f.bufw == nil {
|
||||
f.bufw = bufio.NewWriterSize(f.currentFile, bufSize)
|
||||
f.bufw = bufio.NewWriterSize(f.currentFile, logBufferSize)
|
||||
} else {
|
||||
f.bufw.Reset(f.currentFile)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@@ -168,6 +169,65 @@ func TestFileRotator_RotateFiles(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileRotator_RotateFiles_Boundary(t *testing.T) {
|
||||
t.Parallel()
|
||||
var path string
|
||||
var err error
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(path)
|
||||
|
||||
fr, err := NewFileRotator(path, baseFileName, 10, 5, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
// We will write three times:
|
||||
// 1st: Write with new lines spanning two files
|
||||
// 2nd: Write long string with no new lines
|
||||
// 3rd: Write a single new line
|
||||
expectations := [][]byte{
|
||||
[]byte("ab\n"),
|
||||
[]byte("cdef\n"),
|
||||
[]byte("12345"),
|
||||
[]byte("67890"),
|
||||
[]byte("\n"),
|
||||
}
|
||||
|
||||
for _, str := range []string{"ab\ncdef\n", "1234567890", "\n"} {
|
||||
nw, err := fr.Write([]byte(str))
|
||||
if err != nil {
|
||||
t.Fatalf("got error while writing: %v", err)
|
||||
}
|
||||
|
||||
if nw != len(str) {
|
||||
t.Fatalf("expected %v, got %v", len(str), nw)
|
||||
}
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
|
||||
for i, exp := range expectations {
|
||||
fname := filepath.Join(path, fmt.Sprintf("redis.stdout.%d", i))
|
||||
fi, err := os.Stat(fname)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
return false, nil
|
||||
}
|
||||
if int(fi.Size()) != len(exp) {
|
||||
lastErr = fmt.Errorf("expected size: %v, actual: %v", len(exp), fi.Size())
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("%v", lastErr)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileRotator_WriteRemaining(t *testing.T) {
|
||||
t.Parallel()
|
||||
var path string
|
||||
@@ -289,3 +349,48 @@ func TestFileRotator_PurgeOldFiles(t *testing.T) {
|
||||
t.Fatalf("%v", lastErr)
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkRotator(b *testing.B) {
|
||||
kb := 1024
|
||||
for _, inputSize := range []int{kb, 2 * kb, 4 * kb, 8 * kb, 16 * kb, 32 * kb, 64 * kb, 128 * kb, 256 * kb} {
|
||||
b.Run(fmt.Sprintf("%dKB", inputSize/kb), func(b *testing.B) {
|
||||
benchmarkRotatorWithInputSize(inputSize, b)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkRotatorWithInputSize(size int, b *testing.B) {
|
||||
var path string
|
||||
var err error
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
b.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(path)
|
||||
|
||||
fr, err := NewFileRotator(path, baseFileName, 5, 1024*1024, logger)
|
||||
if err != nil {
|
||||
b.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
// run the Fib function b.N times
|
||||
for n := 0; n < b.N; n++ {
|
||||
// Generate some input
|
||||
data := make([]byte, size)
|
||||
_, err := rand.Read(data)
|
||||
if err != nil {
|
||||
b.Fatalf("Error generating date: %v", err)
|
||||
}
|
||||
|
||||
// Insert random new lines
|
||||
for i := 0; i < 100; i++ {
|
||||
index := rand.Intn(size)
|
||||
data[index] = '\n'
|
||||
}
|
||||
|
||||
// Write the data
|
||||
if _, err := fr.Write(data); err != nil {
|
||||
b.Fatalf("Failed to write data: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user