Making the log rotator a writer

This commit is contained in:
Diptanu Choudhury
2016-02-19 14:01:07 -08:00
parent 3ba626df72
commit 387fcc36c8
8 changed files with 43 additions and 521 deletions

View File

@@ -38,7 +38,7 @@ type ExecDriverConfig struct {
type execHandle struct {
pluginClient *plugin.Client
executor executor.Executor
isolationConfig *executor.IsolationConfig
isolationConfig *cstructs.IsolationConfig
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
@@ -153,7 +153,7 @@ type execId struct {
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
IsolationConfig *executor.IsolationConfig
IsolationConfig *cstructs.IsolationConfig
PluginConfig *PluginReattachConfig
}

View File

@@ -2,7 +2,6 @@ package executor
import (
"fmt"
"io"
"log"
"os"
"os/exec"
@@ -18,7 +17,8 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/driver/logrotator"
"github.com/hashicorp/nomad/client/driver/logging"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -61,18 +61,12 @@ type ExecCommand struct {
Args []string
}
// IsolationConfig has information about the isolation mechanism the executor
// uses to put resource constraints and isolation on the user process
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}
// ProcessState holds information about the state of a user process.
type ProcessState struct {
Pid int
ExitCode int
Signal int
IsolationConfig *IsolationConfig
IsolationConfig *cstructs.IsolationConfig
Time time.Time
}
@@ -97,8 +91,8 @@ type UniversalExecutor struct {
groups *cgroupConfig.Cgroup
exitState *ProcessState
processExited chan interface{}
lre *logrotator.LogRotator
lro *logrotator.LogRotator
lre *logging.FileRotator
lro *logging.FileRotator
logger *log.Logger
lock sync.Mutex
@@ -135,28 +129,23 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
}
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
path := filepath.Join(e.taskDir, allocdir.TaskLocal)
lro, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, e.logger)
stdor, stdow := io.Pipe()
lro, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, e.logger)
if err != nil {
return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err)
}
e.cmd.Stdout = stdow
e.cmd.Stdout = lro
e.lro = lro
go lro.Start(stdor)
stder, stdew := io.Pipe()
lre, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, e.logger)
lre, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, e.logger)
if err != nil {
return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err)
}
e.cmd.Stderr = stdew
e.cmd.Stderr = lre
e.lre = lre
go lre.Start(stder)
// setting the env, path and args for the command
e.ctx.TaskEnv.Build()
@@ -175,7 +164,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
return nil, fmt.Errorf("error starting command: %v", err)
}
go e.wait()
ic := &IsolationConfig{Cgroup: e.groups}
ic := &cstructs.IsolationConfig{Cgroup: e.groups}
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
}

View File

@@ -45,7 +45,7 @@ type javaHandle struct {
pluginClient *plugin.Client
userPid int
executor executor.Executor
isolationConfig *executor.IsolationConfig
isolationConfig *cstructs.IsolationConfig
taskDir string
allocDir *allocdir.AllocDir
@@ -198,7 +198,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
type javaId struct {
KillTimeout time.Duration
PluginConfig *PluginReattachConfig
IsolationConfig *executor.IsolationConfig
IsolationConfig *cstructs.IsolationConfig
TaskDir string
AllocDir *allocdir.AllocDir
UserPid int

View File

@@ -33,7 +33,8 @@ type FileRotator struct {
purgeCh chan interface{}
}
func NewFileRotator(path string, baseFile string, maxFiles int, fileSize int64, logger *log.Logger) (*FileRotator, error) {
func NewFileRotator(path string, baseFile string, maxFiles int,
fileSize int64, logger *log.Logger) (*FileRotator, error) {
rotator := &FileRotator{
MaxFiles: maxFiles,
FileSize: fileSize,

View File

@@ -4,15 +4,13 @@ package logging
import (
"fmt"
"io"
"log"
"log/syslog"
"net"
"path/filepath"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/client/driver/logrotator"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -40,7 +38,7 @@ type LogCollectorContext struct {
// SyslogCollectorState holds the address and islation information of a launched
// syslog server
type SyslogCollectorState struct {
IsolationConfig *executor.IsolationConfig
IsolationConfig *cstructs.IsolationConfig
Addr string
}
@@ -59,8 +57,8 @@ type SyslogCollector struct {
logConfig *structs.LogConfig
ctx *LogCollectorContext
lro *logrotator.LogRotator
lre *logrotator.LogRotator
lro *FileRotator
lre *FileRotator
server *SyslogServer
taskDir string
@@ -92,36 +90,32 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl
go syslogServer.Start()
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
ro, wo := io.Pipe()
lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, s.logger)
path := filepath.Join(s.taskDir, allocdir.TaskLocal)
lro, err := NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lro = lro
go lro.Start(ro)
re, we := io.Pipe()
lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, s.logger)
lre, err := NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lre = lre
go lre.Start(re)
go func(channel chan *SyslogMessage) {
for logParts := range channel {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
if logParts.Severity == syslog.LOG_ERR {
we.Write(logParts.Message)
we.Write([]byte("\n"))
s.lre.Write(logParts.Message)
s.lre.Write([]byte("\n"))
} else {
wo.Write(logParts.Message)
wo.Write([]byte("\n"))
s.lro.Write(logParts.Message)
s.lro.Write([]byte("\n"))
}
}
}(channel)

View File

@@ -1,176 +0,0 @@
package logrotator
import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
)
const (
bufSize = 32 * 1024 // Max number of bytes read from a buffer
)
// LogRotator ingests data and writes out to a rotated set of files
type LogRotator struct {
MaxFiles int // maximum number of rotated files retained by the log rotator
FileSize int64 // maximum file size of a rotated file
path string // path where the rotated files are created
fileName string // base file name of the rotated files
logFileIdx int // index to the current file
oldestLogFileIdx int // index to the oldest log file
logger *log.Logger
purgeCh chan struct{}
}
// NewLogRotator configures and returns a new LogRotator
func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, logger *log.Logger) (*LogRotator, error) {
files, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}
// Finding out the log file with the largest index
logFileIdx := 0
prefix := fmt.Sprintf("%s.", fileName)
for _, f := range files {
if strings.HasPrefix(f.Name(), prefix) {
fileIdx := strings.TrimPrefix(f.Name(), prefix)
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
if n > logFileIdx {
logFileIdx = n
}
}
}
lr := &LogRotator{
MaxFiles: maxFiles,
FileSize: fileSize,
path: path,
fileName: fileName,
logFileIdx: logFileIdx,
logger: logger,
purgeCh: make(chan struct{}, 1),
}
go lr.PurgeOldFiles()
return lr, nil
}
// Start reads from a Reader and writes them to files and rotates them when the
// size of the file becomes equal to the max size configured
func (l *LogRotator) Start(r io.Reader) error {
buf := make([]byte, bufSize)
for {
logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx))
var fileSize int64
if f, err := os.Stat(logFileName); err == nil {
// Skipping the current file if it happens to be a directory
if f.IsDir() {
l.logFileIdx += 1
continue
}
fileSize = f.Size()
// Calculating the remaining capacity of the log file
}
f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return err
}
l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName)
// Closing the current log file if it doesn't have any more capacity
if fileSize >= l.FileSize {
l.logFileIdx += 1
f.Close()
continue
}
// Reading from the reader and writing into the current log file as long
// as it has capacity or the reader closes
totalWritten := 0
for {
if l.FileSize-(fileSize+int64(totalWritten)) < 1 {
f.Close()
break
}
var nr int
var err error
remainingSize := l.FileSize - (int64(totalWritten) + fileSize)
if remainingSize < bufSize {
nr, err = r.Read(buf[0:remainingSize])
} else {
nr, err = r.Read(buf)
}
if err != nil {
f.Close()
return err
}
nw, err := f.Write(buf[:nr])
if err != nil {
f.Close()
return err
}
if nr != nw {
f.Close()
return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw)
}
totalWritten += nr
}
l.logFileIdx = l.logFileIdx + 1
// Purge old files if we have more files than MaxFiles
if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles {
select {
case l.purgeCh <- struct{}{}:
default:
}
}
}
}
// PurgeOldFiles removes older files and keeps only the last N files rotated for
// a file
func (l *LogRotator) PurgeOldFiles() {
for {
select {
case <-l.purgeCh:
var fIndexes []int
files, err := ioutil.ReadDir(l.path)
if err != nil {
return
}
// Inserting all the rotated files in a slice
for _, f := range files {
if strings.HasPrefix(f.Name(), l.fileName) {
fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName))
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
fIndexes = append(fIndexes, n)
}
}
// Sorting the file indexes so that we can purge the older files and keep
// only the number of files as configured by the user
sort.Sort(sort.IntSlice(fIndexes))
var toDelete []int
toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles]
for _, fIndex := range toDelete {
fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex))
os.RemoveAll(fname)
}
l.oldestLogFileIdx = fIndexes[0]
}
}
}

View File

@@ -1,295 +0,0 @@
package logrotator
import (
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
)
var (
logger = log.New(os.Stdout, "", log.LstdFlags)
pathPrefix = "logrotator"
)
func TestLogRotator_InvalidPath(t *testing.T) {
invalidPath := "/foo"
if _, err := NewLogRotator(invalidPath, "redis.stdout", 10, 10, logger); err == nil {
t.Fatal("expected err")
}
}
func TestLogRotator_FindCorrectIndex(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
fname := filepath.Join(path, "redis.stdout.1")
if f, err := os.Create(fname); err == nil {
f.Close()
}
fname = filepath.Join(path, "redis.stdout.2")
if f, err := os.Create(fname); err == nil {
f.Close()
}
r, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
if r.logFileIdx != 2 {
t.Fatalf("Expected log file idx: %v, actual: %v", 2, r.logFileIdx)
}
}
func TestLogRotator_AppendToCurrentFile(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
fname := filepath.Join(path, "redis.stdout.0")
if f, err := os.Create(fname); err == nil {
f.WriteString("abcde")
f.Close()
}
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
if err != nil && err != io.EOF {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("fg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatal(err)
}
finfo, err := os.Stat(fname)
if err != nil {
t.Fatal(err)
}
if finfo.Size() != 6 {
t.Fatalf("Expected size of file: %v, actual: %v", 6, finfo.Size())
}
}
func TestLogRotator_RotateFiles(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
fname := filepath.Join(path, "redis.stdout.0")
if f, err := os.Create(fname); err == nil {
f.WriteString("abcde")
f.Close()
}
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
// This should make the current log file rotate
w.Write([]byte("fg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")); err == nil {
if finfo.Size() != 1 {
t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size())
}
} else {
t.Fatal("expected file redis.stdout.1")
}
if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err == nil {
if finfo.Size() != 6 {
t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size())
}
} else {
t.Fatal("expected file redis.stdout.0")
}
}
func TestLogRotator_StartFromEmptyDir(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0"))
if err != nil {
t.Fatal(err)
}
if finfo.Size() != 7 {
t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size())
}
}
func TestLogRotator_SetPathAsFile(t *testing.T) {
var f *os.File
var err error
var path string
defer os.RemoveAll(path)
if f, err = ioutil.TempFile("", pathPrefix); err != nil {
t.Fatalf("test setup problem: %v", err)
}
path = f.Name()
if _, err = NewLogRotator(f.Name(), "redis.stdout", 10, 10, logger); err == nil {
t.Fatal("expected error")
}
}
func TestLogRotator_ExcludeDirs(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
if err := os.Mkdir(filepath.Join(path, "redis.stdout.0"), os.ModeDir|os.ModePerm); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("fg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1"))
if err != nil {
t.Fatal("expected rotator to create redis.stdout.1")
}
if finfo.Size() != 2 {
t.Fatalf("expected size: %v, actual: %v", 2, finfo.Size())
}
}
func TestLogRotator_PurgeDirs(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 2, 4, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefghijklmnopqrxyz"))
time.Sleep(1 * time.Second)
l.MaxFiles = 1
w.Write([]byte("abcdefghijklmnopqrxyz"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("failure in logrotator start: %v", err)
}
// sleeping for a second because purging is async
time.Sleep(1 * time.Second)
files, err := ioutil.ReadDir(path)
if err != nil {
t.Fatalf("err: %v", err)
}
expected := 1
if len(files) != expected {
t.Fatalf("expected number of files: %v, actual: %v", expected, len(files))
}
}
func TestLogRotator_UpdateConfig(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefg"))
l.FileSize = 5
w.Write([]byte("hijklmnojkp"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0"))
finfo1, err1 := os.Stat(filepath.Join(path, "redis.stdout.1"))
if err != nil {
t.Fatal(err)
}
if finfo.Size() != 10 {
t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size())
}
if err1 != nil {
t.Fatal(err)
}
if finfo1.Size() != 5 {
t.Fatalf("expected size of file: %v, actual: %v", 5, finfo.Size())
}
}

View File

@@ -1,6 +1,9 @@
package structs
import "fmt"
import (
"fmt"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
// WaitResult stores the result of a Wait operation.
type WaitResult struct {
@@ -25,3 +28,9 @@ func (r *WaitResult) String() string {
return fmt.Sprintf("Wait returned exit code %v, signal %v, and error %v",
r.ExitCode, r.Signal, r.Err)
}
// IsolationConfig has information about the isolation mechanism the executor
// uses to put resource constraints and isolation on the user process
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}