diff --git a/client/driver/logs.go b/client/driver/logs.go new file mode 100644 index 000000000..7297382cd --- /dev/null +++ b/client/driver/logs.go @@ -0,0 +1,155 @@ +package driver + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "sort" + "strconv" + "strings" +) + +const ( + bufSize = 32 * 1024 // Max number of bytes read from a buffer +) + +// LogRotator ingests data and writes out to a rotated set of files +type LogRotator struct { + maxFiles int // maximum number of rotated files retained by the log rotator + fileSize int64 // maximum file size of a rotated file + path string // path where the rotated files are created + fileName string // base file name of the rotated files + + logFileIdx int // index to the current file + + logger *log.Logger +} + +// NewLogRotator configures and returns a new LogRotator +func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, logger *log.Logger) (*LogRotator, error) { + files, err := ioutil.ReadDir(path) + if err != nil { + return nil, err + } + + // Finding out the log file with the largest index + logFileIdx := 0 + prefix := fmt.Sprintf("%s.", fileName) + for _, f := range files { + if strings.HasPrefix(f.Name(), prefix) { + fileIdx := strings.TrimPrefix(f.Name(), prefix) + n, err := strconv.Atoi(fileIdx) + if err != nil { + continue + } + if n > logFileIdx { + logFileIdx = n + } + } + } + + return &LogRotator{ + maxFiles: maxFiles, + fileSize: fileSize, + path: path, + fileName: fileName, + logFileIdx: logFileIdx, + logger: logger, + }, nil +} + +// Start reads from a Reader and writes them to files and rotates them when the +// size of the file becomes equal to the max size configured +func (l *LogRotator) Start(r io.Reader) error { + buf := make([]byte, bufSize) + for { + logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx)) + remainingSize := l.fileSize + if f, err := os.Stat(logFileName); err == nil { + // Skipping the current file if it happens to be a directory + if f.IsDir() { + l.logFileIdx += 1 + continue + } + // Calculating the remaining capacity of the log file + remainingSize = l.fileSize - f.Size() + } + f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return err + } + l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName) + + // Closing the current log file if it doesn't have any more capacity + if remainingSize <= 0 { + l.logFileIdx = l.logFileIdx + 1 + f.Close() + continue + } + + // Reading from the reader and writing into the current log file as long + // as it has capacity or the reader closes + for { + var nr int + var err error + if remainingSize < bufSize { + nr, err = r.Read(buf[0:remainingSize]) + } else { + nr, err = r.Read(buf) + } + if err != nil { + f.Close() + return err + } + nw, err := f.Write(buf[:nr]) + if err != nil { + f.Close() + return err + } + if nr != nw { + f.Close() + return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw) + } + remainingSize -= int64(nr) + if remainingSize < 1 { + f.Close() + break + } + } + l.logFileIdx = l.logFileIdx + 1 + } + return nil +} + +// PurgeOldFiles removes older files and keeps only the last N files rotated for +// a file +func (l *LogRotator) PurgeOldFiles() { + var fIndexes []int + files, err := ioutil.ReadDir(l.path) + if err != nil { + return + } + // Inserting all the rotated files in a slice + for _, f := range files { + if strings.HasPrefix(f.Name(), l.fileName) { + fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) + n, err := strconv.Atoi(fileIdx) + if err != nil { + continue + } + fIndexes = append(fIndexes, n) + } + } + + // Sorting the file indexes so that we can purge the older files and keep + // only the number of files as configured by the user + sort.Sort(sort.IntSlice(fIndexes)) + toDelete := fIndexes[l.maxFiles-1 : len(fIndexes)-1] + for _, fIndex := range toDelete { + fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) + os.RemoveAll(fname) + } +} diff --git a/client/driver/logs_test.go b/client/driver/logs_test.go new file mode 100644 index 000000000..d45ae5fdd --- /dev/null +++ b/client/driver/logs_test.go @@ -0,0 +1,248 @@ +package driver + +import ( + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "testing" +) + +var ( + logger = log.New(os.Stdout, "", log.LstdFlags) + pathPrefix = "logrotator" +) + +func TestLogRotator_InvalidPath(t *testing.T) { + invalidPath := "/foo" + + if _, err := NewLogRotator(invalidPath, "redis.stdout", 10, 10, logger); err == nil { + t.Fatal("expected err") + } +} + +func TestLogRotator_FindCorrectIndex(t *testing.T) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fname := filepath.Join(path, "redis.stdout.1") + if f, err := os.Create(fname); err == nil { + f.Close() + } + + fname = filepath.Join(path, "redis.stdout.2") + if f, err := os.Create(fname); err == nil { + f.Close() + } + + r, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + if r.logFileIdx != 2 { + t.Fatalf("Expected log file idx: %v, actual: %v", 2, r.logFileIdx) + } +} + +func TestLogRotator_AppendToCurrentFile(t *testing.T) { + var path string + var err error + defer os.RemoveAll(path) + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + fname := filepath.Join(path, "redis.stdout.0") + if f, err := os.Create(fname); err == nil { + f.WriteString("abcde") + f.Close() + } + + l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) + if err != nil && err != io.EOF { + t.Fatalf("test setup err: %v", err) + } + + r, w := io.Pipe() + go func() { + w.Write([]byte("fg")) + w.Close() + }() + err = l.Start(r) + if err != nil && err != io.EOF { + t.Fatal(err) + } + finfo, err := os.Stat(fname) + if err != nil { + t.Fatal(err) + } + if finfo.Size() != 6 { + t.Fatalf("Expected size of file: %v, actual: %v", 6, finfo.Size()) + } +} + +func TestLogRotator_RotateFiles(t *testing.T) { + var path string + var err error + defer os.RemoveAll(path) + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + fname := filepath.Join(path, "redis.stdout.0") + if f, err := os.Create(fname); err == nil { + f.WriteString("abcde") + f.Close() + } + + l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + r, w := io.Pipe() + go func() { + // This should make the current log file rotate + w.Write([]byte("fg")) + w.Close() + }() + err = l.Start(r) + if err != nil && err != io.EOF { + t.Fatalf("Failure in logrotator start %v", err) + } + + if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")); err == nil { + if finfo.Size() != 1 { + t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size()) + } + } else { + t.Fatal("expected file redis.stdout.1") + } + + if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err == nil { + if finfo.Size() != 6 { + t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size()) + } + } else { + t.Fatal("expected file redis.stdout.0") + } +} + +func TestLogRotator_StartFromEmptyDir(t *testing.T) { + var path string + var err error + defer os.RemoveAll(path) + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + + l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + r, w := io.Pipe() + go func() { + w.Write([]byte("abcdefg")) + w.Close() + }() + err = l.Start(r) + if err != nil && err != io.EOF { + t.Fatalf("Failure in logrotator start %v", err) + } + + finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")) + if err != nil { + t.Fatal(err) + } + if finfo.Size() != 7 { + t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size()) + } + +} + +func TestLogRotator_SetPathAsFile(t *testing.T) { + var f *os.File + var err error + var path string + defer os.RemoveAll(path) + if f, err = ioutil.TempFile("", pathPrefix); err != nil { + t.Fatalf("test setup problem: %v", err) + } + path = f.Name() + if _, err = NewLogRotator(f.Name(), "redis.stdout", 10, 10, logger); err == nil { + t.Fatal("expected error") + } +} + +func TestLogRotator_ExcludeDirs(t *testing.T) { + var path string + var err error + defer os.RemoveAll(path) + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + if err := os.Mkdir(filepath.Join(path, "redis.stdout.0"), os.ModeDir|os.ModePerm); err != nil { + t.Fatalf("test setup err: %v", err) + } + + l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + r, w := io.Pipe() + go func() { + w.Write([]byte("fg")) + w.Close() + }() + err = l.Start(r) + if err != nil && err != io.EOF { + t.Fatalf("Failure in logrotator start %v", err) + } + + finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")) + if err != nil { + t.Fatal("expected rotator to create redis.stdout.1") + } + if finfo.Size() != 2 { + t.Fatalf("expected size: %v, actual: %v", 2, finfo.Size()) + } +} + +func TestLogRotator_PurgeDirs(t *testing.T) { + var path string + var err error + defer os.RemoveAll(path) + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + + l, err := NewLogRotator(path, "redis.stdout", 2, 4, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + r, w := io.Pipe() + go func() { + w.Write([]byte("abcdefghijklmno")) + w.Close() + }() + + err = l.Start(r) + if err != nil && err != io.EOF { + t.Fatalf("failure in logrotator start: %v", err) + } + l.PurgeOldFiles() + + files, err := ioutil.ReadDir(path) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(files) != 2 { + t.Fatalf("expected number of files: %v, actual: %v", 2, len(files)) + } +}