From 387fcc36c89c3bddb414b641ab487ae0d6d82ea7 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 19 Feb 2016 14:01:07 -0800 Subject: [PATCH] Making the log rotator a writer --- client/driver/exec.go | 4 +- client/driver/executor/executor.go | 37 +-- client/driver/java.go | 4 +- client/driver/logging/rotator.go | 3 +- client/driver/logging/universal_collector.go | 34 +-- client/driver/logrotator/logs.go | 176 ----------- client/driver/logrotator/logs_test.go | 295 ------------------- client/driver/structs/structs.go | 11 +- 8 files changed, 43 insertions(+), 521 deletions(-) delete mode 100644 client/driver/logrotator/logs.go delete mode 100644 client/driver/logrotator/logs_test.go diff --git a/client/driver/exec.go b/client/driver/exec.go index c00a483b9..c9de4144e 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -38,7 +38,7 @@ type ExecDriverConfig struct { type execHandle struct { pluginClient *plugin.Client executor executor.Executor - isolationConfig *executor.IsolationConfig + isolationConfig *cstructs.IsolationConfig userPid int allocDir *allocdir.AllocDir killTimeout time.Duration @@ -153,7 +153,7 @@ type execId struct { UserPid int TaskDir string AllocDir *allocdir.AllocDir - IsolationConfig *executor.IsolationConfig + IsolationConfig *cstructs.IsolationConfig PluginConfig *PluginReattachConfig } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 7c383adfd..104edd757 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -2,7 +2,6 @@ package executor import ( "fmt" - "io" "log" "os" "os/exec" @@ -18,7 +17,8 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/env" - "github.com/hashicorp/nomad/client/driver/logrotator" + "github.com/hashicorp/nomad/client/driver/logging" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -61,18 +61,12 @@ type ExecCommand struct { Args []string } -// IsolationConfig has information about the isolation mechanism the executor -// uses to put resource constraints and isolation on the user process -type IsolationConfig struct { - Cgroup *cgroupConfig.Cgroup -} - // ProcessState holds information about the state of a user process. type ProcessState struct { Pid int ExitCode int Signal int - IsolationConfig *IsolationConfig + IsolationConfig *cstructs.IsolationConfig Time time.Time } @@ -97,8 +91,8 @@ type UniversalExecutor struct { groups *cgroupConfig.Cgroup exitState *ProcessState processExited chan interface{} - lre *logrotator.LogRotator - lro *logrotator.LogRotator + lre *logging.FileRotator + lro *logging.FileRotator logger *log.Logger lock sync.Mutex @@ -135,28 +129,23 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + path := filepath.Join(e.taskDir, allocdir.TaskLocal) + lro, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, e.logger) - stdor, stdow := io.Pipe() - lro, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, e.logger) if err != nil { return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) } - e.cmd.Stdout = stdow + e.cmd.Stdout = lro e.lro = lro - go lro.Start(stdor) - stder, stdew := io.Pipe() - lre, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, e.logger) + lre, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, e.logger) if err != nil { return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) } - e.cmd.Stderr = stdew + e.cmd.Stderr = lre e.lre = lre - go lre.Start(stder) // setting the env, path and args for the command e.ctx.TaskEnv.Build() @@ -175,7 +164,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return nil, fmt.Errorf("error starting command: %v", err) } go e.wait() - ic := &IsolationConfig{Cgroup: e.groups} + ic := &cstructs.IsolationConfig{Cgroup: e.groups} return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil } diff --git a/client/driver/java.go b/client/driver/java.go index f7d724efc..af67dcdfe 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -45,7 +45,7 @@ type javaHandle struct { pluginClient *plugin.Client userPid int executor executor.Executor - isolationConfig *executor.IsolationConfig + isolationConfig *cstructs.IsolationConfig taskDir string allocDir *allocdir.AllocDir @@ -198,7 +198,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, type javaId struct { KillTimeout time.Duration PluginConfig *PluginReattachConfig - IsolationConfig *executor.IsolationConfig + IsolationConfig *cstructs.IsolationConfig TaskDir string AllocDir *allocdir.AllocDir UserPid int diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index 91a04a085..e3d218d2c 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -33,7 +33,8 @@ type FileRotator struct { purgeCh chan interface{} } -func NewFileRotator(path string, baseFile string, maxFiles int, fileSize int64, logger *log.Logger) (*FileRotator, error) { +func NewFileRotator(path string, baseFile string, maxFiles int, + fileSize int64, logger *log.Logger) (*FileRotator, error) { rotator := &FileRotator{ MaxFiles: maxFiles, FileSize: fileSize, diff --git a/client/driver/logging/universal_collector.go b/client/driver/logging/universal_collector.go index 718d4d37d..f280cd3ef 100644 --- a/client/driver/logging/universal_collector.go +++ b/client/driver/logging/universal_collector.go @@ -4,15 +4,13 @@ package logging import ( "fmt" - "io" "log" "log/syslog" "net" "path/filepath" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/driver/executor" - "github.com/hashicorp/nomad/client/driver/logrotator" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -40,7 +38,7 @@ type LogCollectorContext struct { // SyslogCollectorState holds the address and islation information of a launched // syslog server type SyslogCollectorState struct { - IsolationConfig *executor.IsolationConfig + IsolationConfig *cstructs.IsolationConfig Addr string } @@ -59,8 +57,8 @@ type SyslogCollector struct { logConfig *structs.LogConfig ctx *LogCollectorContext - lro *logrotator.LogRotator - lre *logrotator.LogRotator + lro *FileRotator + lre *FileRotator server *SyslogServer taskDir string @@ -92,36 +90,32 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl go syslogServer.Start() logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - ro, wo := io.Pipe() - lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, s.logger) + path := filepath.Join(s.taskDir, allocdir.TaskLocal) + lro, err := NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, s.logger) + if err != nil { return nil, err } s.lro = lro - go lro.Start(ro) - re, we := io.Pipe() - lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, s.logger) + lre, err := NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, s.logger) if err != nil { return nil, err } s.lre = lre - go lre.Start(re) go func(channel chan *SyslogMessage) { for logParts := range channel { // If the severity of the log line is err then we write to stderr // otherwise all messages go to stdout if logParts.Severity == syslog.LOG_ERR { - we.Write(logParts.Message) - we.Write([]byte("\n")) + s.lre.Write(logParts.Message) + s.lre.Write([]byte("\n")) } else { - wo.Write(logParts.Message) - wo.Write([]byte("\n")) + s.lro.Write(logParts.Message) + s.lro.Write([]byte("\n")) } } }(channel) diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go deleted file mode 100644 index 026c196be..000000000 --- a/client/driver/logrotator/logs.go +++ /dev/null @@ -1,176 +0,0 @@ -package logrotator - -import ( - "fmt" - "io" - "io/ioutil" - "log" - "os" - "path/filepath" - "sort" - "strconv" - "strings" -) - -const ( - bufSize = 32 * 1024 // Max number of bytes read from a buffer -) - -// LogRotator ingests data and writes out to a rotated set of files -type LogRotator struct { - MaxFiles int // maximum number of rotated files retained by the log rotator - FileSize int64 // maximum file size of a rotated file - path string // path where the rotated files are created - fileName string // base file name of the rotated files - - logFileIdx int // index to the current file - oldestLogFileIdx int // index to the oldest log file - - logger *log.Logger - purgeCh chan struct{} -} - -// NewLogRotator configures and returns a new LogRotator -func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, logger *log.Logger) (*LogRotator, error) { - files, err := ioutil.ReadDir(path) - if err != nil { - return nil, err - } - - // Finding out the log file with the largest index - logFileIdx := 0 - prefix := fmt.Sprintf("%s.", fileName) - for _, f := range files { - if strings.HasPrefix(f.Name(), prefix) { - fileIdx := strings.TrimPrefix(f.Name(), prefix) - n, err := strconv.Atoi(fileIdx) - if err != nil { - continue - } - if n > logFileIdx { - logFileIdx = n - } - } - } - - lr := &LogRotator{ - MaxFiles: maxFiles, - FileSize: fileSize, - path: path, - fileName: fileName, - logFileIdx: logFileIdx, - logger: logger, - purgeCh: make(chan struct{}, 1), - } - go lr.PurgeOldFiles() - - return lr, nil -} - -// Start reads from a Reader and writes them to files and rotates them when the -// size of the file becomes equal to the max size configured -func (l *LogRotator) Start(r io.Reader) error { - buf := make([]byte, bufSize) - for { - logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx)) - var fileSize int64 - if f, err := os.Stat(logFileName); err == nil { - // Skipping the current file if it happens to be a directory - if f.IsDir() { - l.logFileIdx += 1 - continue - } - fileSize = f.Size() - // Calculating the remaining capacity of the log file - } - f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - return err - } - l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName) - - // Closing the current log file if it doesn't have any more capacity - if fileSize >= l.FileSize { - l.logFileIdx += 1 - f.Close() - continue - } - - // Reading from the reader and writing into the current log file as long - // as it has capacity or the reader closes - totalWritten := 0 - for { - if l.FileSize-(fileSize+int64(totalWritten)) < 1 { - f.Close() - break - } - var nr int - var err error - remainingSize := l.FileSize - (int64(totalWritten) + fileSize) - if remainingSize < bufSize { - nr, err = r.Read(buf[0:remainingSize]) - } else { - nr, err = r.Read(buf) - } - if err != nil { - f.Close() - return err - } - nw, err := f.Write(buf[:nr]) - if err != nil { - f.Close() - return err - } - if nr != nw { - f.Close() - return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw) - } - totalWritten += nr - } - l.logFileIdx = l.logFileIdx + 1 - // Purge old files if we have more files than MaxFiles - if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles { - select { - case l.purgeCh <- struct{}{}: - default: - } - } - } -} - -// PurgeOldFiles removes older files and keeps only the last N files rotated for -// a file -func (l *LogRotator) PurgeOldFiles() { - for { - select { - case <-l.purgeCh: - var fIndexes []int - files, err := ioutil.ReadDir(l.path) - if err != nil { - return - } - // Inserting all the rotated files in a slice - for _, f := range files { - if strings.HasPrefix(f.Name(), l.fileName) { - fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) - n, err := strconv.Atoi(fileIdx) - if err != nil { - continue - } - fIndexes = append(fIndexes, n) - } - } - - // Sorting the file indexes so that we can purge the older files and keep - // only the number of files as configured by the user - sort.Sort(sort.IntSlice(fIndexes)) - var toDelete []int - toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles] - for _, fIndex := range toDelete { - fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) - os.RemoveAll(fname) - } - l.oldestLogFileIdx = fIndexes[0] - } - } -} diff --git a/client/driver/logrotator/logs_test.go b/client/driver/logrotator/logs_test.go deleted file mode 100644 index b6a62216e..000000000 --- a/client/driver/logrotator/logs_test.go +++ /dev/null @@ -1,295 +0,0 @@ -package logrotator - -import ( - "io" - "io/ioutil" - "log" - "os" - "path/filepath" - "testing" - "time" -) - -var ( - logger = log.New(os.Stdout, "", log.LstdFlags) - pathPrefix = "logrotator" -) - -func TestLogRotator_InvalidPath(t *testing.T) { - invalidPath := "/foo" - - if _, err := NewLogRotator(invalidPath, "redis.stdout", 10, 10, logger); err == nil { - t.Fatal("expected err") - } -} - -func TestLogRotator_FindCorrectIndex(t *testing.T) { - var path string - var err error - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - defer os.RemoveAll(path) - - fname := filepath.Join(path, "redis.stdout.1") - if f, err := os.Create(fname); err == nil { - f.Close() - } - - fname = filepath.Join(path, "redis.stdout.2") - if f, err := os.Create(fname); err == nil { - f.Close() - } - - r, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - if r.logFileIdx != 2 { - t.Fatalf("Expected log file idx: %v, actual: %v", 2, r.logFileIdx) - } -} - -func TestLogRotator_AppendToCurrentFile(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - fname := filepath.Join(path, "redis.stdout.0") - if f, err := os.Create(fname); err == nil { - f.WriteString("abcde") - f.Close() - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) - if err != nil && err != io.EOF { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("fg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatal(err) - } - finfo, err := os.Stat(fname) - if err != nil { - t.Fatal(err) - } - if finfo.Size() != 6 { - t.Fatalf("Expected size of file: %v, actual: %v", 6, finfo.Size()) - } -} - -func TestLogRotator_RotateFiles(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - fname := filepath.Join(path, "redis.stdout.0") - if f, err := os.Create(fname); err == nil { - f.WriteString("abcde") - f.Close() - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - // This should make the current log file rotate - w.Write([]byte("fg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")); err == nil { - if finfo.Size() != 1 { - t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size()) - } - } else { - t.Fatal("expected file redis.stdout.1") - } - - if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err == nil { - if finfo.Size() != 6 { - t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size()) - } - } else { - t.Fatal("expected file redis.stdout.0") - } -} - -func TestLogRotator_StartFromEmptyDir(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("abcdefg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")) - if err != nil { - t.Fatal(err) - } - if finfo.Size() != 7 { - t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size()) - } - -} - -func TestLogRotator_SetPathAsFile(t *testing.T) { - var f *os.File - var err error - var path string - defer os.RemoveAll(path) - if f, err = ioutil.TempFile("", pathPrefix); err != nil { - t.Fatalf("test setup problem: %v", err) - } - path = f.Name() - if _, err = NewLogRotator(f.Name(), "redis.stdout", 10, 10, logger); err == nil { - t.Fatal("expected error") - } -} - -func TestLogRotator_ExcludeDirs(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - if err := os.Mkdir(filepath.Join(path, "redis.stdout.0"), os.ModeDir|os.ModePerm); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("fg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")) - if err != nil { - t.Fatal("expected rotator to create redis.stdout.1") - } - if finfo.Size() != 2 { - t.Fatalf("expected size: %v, actual: %v", 2, finfo.Size()) - } -} - -func TestLogRotator_PurgeDirs(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 2, 4, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("abcdefghijklmnopqrxyz")) - time.Sleep(1 * time.Second) - l.MaxFiles = 1 - w.Write([]byte("abcdefghijklmnopqrxyz")) - w.Close() - }() - - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("failure in logrotator start: %v", err) - } - - // sleeping for a second because purging is async - time.Sleep(1 * time.Second) - files, err := ioutil.ReadDir(path) - if err != nil { - t.Fatalf("err: %v", err) - } - expected := 1 - if len(files) != expected { - t.Fatalf("expected number of files: %v, actual: %v", expected, len(files)) - } -} - -func TestLogRotator_UpdateConfig(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("abcdefg")) - l.FileSize = 5 - w.Write([]byte("hijklmnojkp")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")) - finfo1, err1 := os.Stat(filepath.Join(path, "redis.stdout.1")) - if err != nil { - t.Fatal(err) - } - if finfo.Size() != 10 { - t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size()) - } - if err1 != nil { - t.Fatal(err) - } - if finfo1.Size() != 5 { - t.Fatalf("expected size of file: %v, actual: %v", 5, finfo.Size()) - } -} diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index a60daf8df..912253457 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -1,6 +1,9 @@ package structs -import "fmt" +import ( + "fmt" + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" +) // WaitResult stores the result of a Wait operation. type WaitResult struct { @@ -25,3 +28,9 @@ func (r *WaitResult) String() string { return fmt.Sprintf("Wait returned exit code %v, signal %v, and error %v", r.ExitCode, r.Signal, r.Err) } + +// IsolationConfig has information about the isolation mechanism the executor +// uses to put resource constraints and isolation on the user process +type IsolationConfig struct { + Cgroup *cgroupConfig.Cgroup +}