From 26d308dcb8b5848c5b90d1d34bd02e2662bf7e17 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 11 Feb 2016 14:44:35 -0800 Subject: [PATCH] Fixed an issue with purge --- client/driver/docker.go | 4 +- client/driver/logcollector/collector.go | 2 + client/driver/logrotator/logs.go | 70 ++++++++++++++++--------- client/driver/logrotator/logs_test.go | 15 ++++-- nomad/structs/structs.go | 14 ++--- 5 files changed, 68 insertions(+), 37 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 33d6184a3..60bce0fc9 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -697,7 +697,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { func (h *DockerHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout - h.logCollector.UpdateLogConfig(task.LogConfig) + if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil { + h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err) + } // Update is not possible return nil diff --git a/client/driver/logcollector/collector.go b/client/driver/logcollector/collector.go index 3fdc90896..fab901428 100644 --- a/client/driver/logcollector/collector.go +++ b/client/driver/logcollector/collector.go @@ -104,6 +104,7 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl if err != nil { return nil, err } + s.lro = lro go lro.Start(ro) re, we := io.Pipe() @@ -113,6 +114,7 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl if err != nil { return nil, err } + s.lre = lre go lre.Start(re) go func(channel syslog.LogPartsChannel) { diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go index 161db2dc1..a9eecf983 100644 --- a/client/driver/logrotator/logs.go +++ b/client/driver/logrotator/logs.go @@ -23,9 +23,11 @@ type LogRotator struct { path string // path where the rotated files are created fileName string // base file name of the rotated files - logFileIdx int // index to the current file + logFileIdx int // index to the current file + oldestLogFileIdx int // index to the oldest log file - logger *log.Logger + logger *log.Logger + purgeCh chan struct{} } // NewLogRotator configures and returns a new LogRotator @@ -51,14 +53,18 @@ func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, l } } - return &LogRotator{ + lr := &LogRotator{ MaxFiles: maxFiles, FileSize: fileSize, path: path, fileName: fileName, logFileIdx: logFileIdx, logger: logger, - }, nil + purgeCh: make(chan struct{}, 1), + } + go lr.PurgeOldFiles() + + return lr, nil } // Start reads from a Reader and writes them to files and rotates them when the @@ -122,6 +128,13 @@ func (l *LogRotator) Start(r io.Reader) error { totalWritten += nr } l.logFileIdx = l.logFileIdx + 1 + // Purge old files if we have more files than MaxFiles + if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles { + select { + case l.purgeCh <- struct{}{}: + default: + } + } } return nil } @@ -129,29 +142,36 @@ func (l *LogRotator) Start(r io.Reader) error { // PurgeOldFiles removes older files and keeps only the last N files rotated for // a file func (l *LogRotator) PurgeOldFiles() { - var fIndexes []int - files, err := ioutil.ReadDir(l.path) - if err != nil { - return - } - // Inserting all the rotated files in a slice - for _, f := range files { - if strings.HasPrefix(f.Name(), l.fileName) { - fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) - n, err := strconv.Atoi(fileIdx) + for { + select { + case <-l.purgeCh: + var fIndexes []int + files, err := ioutil.ReadDir(l.path) if err != nil { - continue + return } - fIndexes = append(fIndexes, n) + // Inserting all the rotated files in a slice + for _, f := range files { + if strings.HasPrefix(f.Name(), l.fileName) { + fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) + n, err := strconv.Atoi(fileIdx) + if err != nil { + continue + } + fIndexes = append(fIndexes, n) + } + } + + // Sorting the file indexes so that we can purge the older files and keep + // only the number of files as configured by the user + sort.Sort(sort.IntSlice(fIndexes)) + var toDelete []int + toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles] + for _, fIndex := range toDelete { + fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) + os.RemoveAll(fname) + } + l.oldestLogFileIdx = fIndexes[0] } } - - // Sorting the file indexes so that we can purge the older files and keep - // only the number of files as configured by the user - sort.Sort(sort.IntSlice(fIndexes)) - toDelete := fIndexes[l.MaxFiles-1 : len(fIndexes)-1] - for _, fIndex := range toDelete { - fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) - os.RemoveAll(fname) - } } diff --git a/client/driver/logrotator/logs_test.go b/client/driver/logrotator/logs_test.go index fd7252b80..b497ab993 100644 --- a/client/driver/logrotator/logs_test.go +++ b/client/driver/logrotator/logs_test.go @@ -1,12 +1,14 @@ package logrotator import ( + "fmt" "io" "io/ioutil" "log" "os" "path/filepath" "testing" + "time" ) var ( @@ -228,7 +230,10 @@ func TestLogRotator_PurgeDirs(t *testing.T) { r, w := io.Pipe() go func() { - w.Write([]byte("abcdefghijklmno")) + w.Write([]byte("abcdefghijklmnopqrxyz")) + time.Sleep(1 * time.Second) + l.MaxFiles = 1 + w.Write([]byte("abcdefghijklmnopqrxyz")) w.Close() }() @@ -236,14 +241,16 @@ func TestLogRotator_PurgeDirs(t *testing.T) { if err != nil && err != io.EOF { t.Fatalf("failure in logrotator start: %v", err) } - l.PurgeOldFiles() + // sleeping for a second because purging is async + time.Sleep(1 * time.Second) files, err := ioutil.ReadDir(path) if err != nil { t.Fatalf("err: %v", err) } - if len(files) != 2 { - t.Fatalf("expected number of files: %v, actual: %v", 2, len(files)) + expected := 1 + if len(files) != expected { + t.Fatalf("expected number of files: %v, actual: %v", expected, len(files)) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ba91d8fdc..f04e96fe2 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1458,15 +1458,15 @@ func DefaultLogConfig() *LogConfig { } } -// MeetsMinResources returns an error if the log config specified are less than +// Validate returns an error if the log config specified are less than // the minimum allowed. -func (l *LogConfig) MeetsMinResources() error { +func (l *LogConfig) Validate() error { var mErr multierror.Error - if l.MaxFiles < 10 { - mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 10; got %d", l.MaxFiles)) + if l.MaxFiles < 1 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 1; got %d", l.MaxFiles)) } - if l.MaxFileSizeMB < 10 { - mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 10MB; got %d", l.MaxFileSizeMB)) + if l.MaxFileSizeMB < 1 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 1MB; got %d", l.MaxFileSizeMB)) } return mErr.ErrorOrNil() } @@ -1667,7 +1667,7 @@ func (t *Task) Validate() error { // Validate the log config if t.LogConfig == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) - } else if err := t.LogConfig.MeetsMinResources(); err != nil { + } else if err := t.LogConfig.Validate(); err != nil { mErr.Errors = append(mErr.Errors, err) }