mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
Fixed an issue with purge
This commit is contained in:
@@ -697,7 +697,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
|
||||
func (h *DockerHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.logCollector.UpdateLogConfig(task.LogConfig)
|
||||
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
|
||||
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
|
||||
}
|
||||
|
||||
// Update is not possible
|
||||
return nil
|
||||
|
||||
@@ -104,6 +104,7 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.lro = lro
|
||||
go lro.Start(ro)
|
||||
|
||||
re, we := io.Pipe()
|
||||
@@ -113,6 +114,7 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.lre = lre
|
||||
go lre.Start(re)
|
||||
|
||||
go func(channel syslog.LogPartsChannel) {
|
||||
|
||||
@@ -23,9 +23,11 @@ type LogRotator struct {
|
||||
path string // path where the rotated files are created
|
||||
fileName string // base file name of the rotated files
|
||||
|
||||
logFileIdx int // index to the current file
|
||||
logFileIdx int // index to the current file
|
||||
oldestLogFileIdx int // index to the oldest log file
|
||||
|
||||
logger *log.Logger
|
||||
logger *log.Logger
|
||||
purgeCh chan struct{}
|
||||
}
|
||||
|
||||
// NewLogRotator configures and returns a new LogRotator
|
||||
@@ -51,14 +53,18 @@ func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, l
|
||||
}
|
||||
}
|
||||
|
||||
return &LogRotator{
|
||||
lr := &LogRotator{
|
||||
MaxFiles: maxFiles,
|
||||
FileSize: fileSize,
|
||||
path: path,
|
||||
fileName: fileName,
|
||||
logFileIdx: logFileIdx,
|
||||
logger: logger,
|
||||
}, nil
|
||||
purgeCh: make(chan struct{}, 1),
|
||||
}
|
||||
go lr.PurgeOldFiles()
|
||||
|
||||
return lr, nil
|
||||
}
|
||||
|
||||
// Start reads from a Reader and writes them to files and rotates them when the
|
||||
@@ -122,6 +128,13 @@ func (l *LogRotator) Start(r io.Reader) error {
|
||||
totalWritten += nr
|
||||
}
|
||||
l.logFileIdx = l.logFileIdx + 1
|
||||
// Purge old files if we have more files than MaxFiles
|
||||
if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles {
|
||||
select {
|
||||
case l.purgeCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -129,29 +142,36 @@ func (l *LogRotator) Start(r io.Reader) error {
|
||||
// PurgeOldFiles removes older files and keeps only the last N files rotated for
|
||||
// a file
|
||||
func (l *LogRotator) PurgeOldFiles() {
|
||||
var fIndexes []int
|
||||
files, err := ioutil.ReadDir(l.path)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Inserting all the rotated files in a slice
|
||||
for _, f := range files {
|
||||
if strings.HasPrefix(f.Name(), l.fileName) {
|
||||
fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName))
|
||||
n, err := strconv.Atoi(fileIdx)
|
||||
for {
|
||||
select {
|
||||
case <-l.purgeCh:
|
||||
var fIndexes []int
|
||||
files, err := ioutil.ReadDir(l.path)
|
||||
if err != nil {
|
||||
continue
|
||||
return
|
||||
}
|
||||
fIndexes = append(fIndexes, n)
|
||||
// Inserting all the rotated files in a slice
|
||||
for _, f := range files {
|
||||
if strings.HasPrefix(f.Name(), l.fileName) {
|
||||
fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName))
|
||||
n, err := strconv.Atoi(fileIdx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fIndexes = append(fIndexes, n)
|
||||
}
|
||||
}
|
||||
|
||||
// Sorting the file indexes so that we can purge the older files and keep
|
||||
// only the number of files as configured by the user
|
||||
sort.Sort(sort.IntSlice(fIndexes))
|
||||
var toDelete []int
|
||||
toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles]
|
||||
for _, fIndex := range toDelete {
|
||||
fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex))
|
||||
os.RemoveAll(fname)
|
||||
}
|
||||
l.oldestLogFileIdx = fIndexes[0]
|
||||
}
|
||||
}
|
||||
|
||||
// Sorting the file indexes so that we can purge the older files and keep
|
||||
// only the number of files as configured by the user
|
||||
sort.Sort(sort.IntSlice(fIndexes))
|
||||
toDelete := fIndexes[l.MaxFiles-1 : len(fIndexes)-1]
|
||||
for _, fIndex := range toDelete {
|
||||
fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex))
|
||||
os.RemoveAll(fname)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package logrotator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -228,7 +230,10 @@ func TestLogRotator_PurgeDirs(t *testing.T) {
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
w.Write([]byte("abcdefghijklmno"))
|
||||
w.Write([]byte("abcdefghijklmnopqrxyz"))
|
||||
time.Sleep(1 * time.Second)
|
||||
l.MaxFiles = 1
|
||||
w.Write([]byte("abcdefghijklmnopqrxyz"))
|
||||
w.Close()
|
||||
}()
|
||||
|
||||
@@ -236,14 +241,16 @@ func TestLogRotator_PurgeDirs(t *testing.T) {
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatalf("failure in logrotator start: %v", err)
|
||||
}
|
||||
l.PurgeOldFiles()
|
||||
|
||||
// sleeping for a second because purging is async
|
||||
time.Sleep(1 * time.Second)
|
||||
files, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(files) != 2 {
|
||||
t.Fatalf("expected number of files: %v, actual: %v", 2, len(files))
|
||||
expected := 1
|
||||
if len(files) != expected {
|
||||
t.Fatalf("expected number of files: %v, actual: %v", expected, len(files))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1458,15 +1458,15 @@ func DefaultLogConfig() *LogConfig {
|
||||
}
|
||||
}
|
||||
|
||||
// MeetsMinResources returns an error if the log config specified are less than
|
||||
// Validate returns an error if the log config specified are less than
|
||||
// the minimum allowed.
|
||||
func (l *LogConfig) MeetsMinResources() error {
|
||||
func (l *LogConfig) Validate() error {
|
||||
var mErr multierror.Error
|
||||
if l.MaxFiles < 10 {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 10; got %d", l.MaxFiles))
|
||||
if l.MaxFiles < 1 {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 1; got %d", l.MaxFiles))
|
||||
}
|
||||
if l.MaxFileSizeMB < 10 {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 10MB; got %d", l.MaxFileSizeMB))
|
||||
if l.MaxFileSizeMB < 1 {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 1MB; got %d", l.MaxFileSizeMB))
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
@@ -1667,7 +1667,7 @@ func (t *Task) Validate() error {
|
||||
// Validate the log config
|
||||
if t.LogConfig == nil {
|
||||
mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config"))
|
||||
} else if err := t.LogConfig.MeetsMinResources(); err != nil {
|
||||
} else if err := t.LogConfig.Validate(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user