From 4a01bd8bcd420aba6dad6905da6e5c5c90ebe043 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 8 Feb 2016 10:10:01 -0800 Subject: [PATCH] Implemented the UpdateLogConfig method --- client/driver/executor/executor.go | 21 +++++++++++++++++++++ client/driver/executor_plugin.go | 9 +++++++++ client/driver/logrotator/logs.go | 14 +++++++------- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 6684ca048..408668010 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -54,6 +54,7 @@ type Executor interface { Wait() (*ProcessState, error) ShutDown() error Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error } // UniversalExecutor is an implementation of the Executor which launches and @@ -67,6 +68,8 @@ type UniversalExecutor struct { groups *cgroupConfig.Cgroup exitState *ProcessState processExited chan interface{} + lre *logrotator.LogRotator + lro *logrotator.LogRotator logger *log.Logger lock sync.Mutex @@ -105,6 +108,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) } e.cmd.Stdout = stdow + e.lro = lro go lro.Start(stdor) stder, stdew := io.Pipe() @@ -113,6 +117,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) } e.cmd.Stderr = stdew + e.lre = lre go lre.Start(stder) e.cmd.Env = ctx.TaskEnv.EnvList() @@ -141,6 +146,22 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) { return e.exitState, nil } +func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error { + e.ctx.LogConfig = logConfig + if e.lro == nil { + return fmt.Errorf("log rotator for stdout doesn't exist") + } + e.lro.MaxFiles = logConfig.MaxFiles + e.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + + if e.lre == nil { + return fmt.Errorf("log rotator for stderr doesn't exist") + } + e.lre.MaxFiles = logConfig.MaxFiles + e.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + return nil +} + func (e *UniversalExecutor) wait() { defer close(e.processExited) err := e.cmd.Wait() diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 57a95e326..1d9af9bb4 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/nomad/structs" ) var HandshakeConfig = plugin.HandshakeConfig{ @@ -76,6 +77,10 @@ func (e *ExecutorRPC) Exit() error { return e.client.Call("Plugin.Exit", new(interface{}), new(interface{})) } +func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { + return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) +} + type ExecutorRPCServer struct { Impl executor.Executor } @@ -104,6 +109,10 @@ func (e *ExecutorRPCServer) Exit(args interface{}, resp *interface{}) error { return e.Impl.Exit() } +func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *interface{}) error { + return e.Impl.UpdateLogConfig(args) +} + type ExecutorPlugin struct { logger *log.Logger } diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go index 0442c8519..2cebf1e5d 100644 --- a/client/driver/logrotator/logs.go +++ b/client/driver/logrotator/logs.go @@ -18,8 +18,8 @@ const ( // LogRotator ingests data and writes out to a rotated set of files type LogRotator struct { - maxFiles int // maximum number of rotated files retained by the log rotator - fileSize int64 // maximum file size of a rotated file + MaxFiles int // maximum number of rotated files retained by the log rotator + FileSize int64 // maximum file size of a rotated file path string // path where the rotated files are created fileName string // base file name of the rotated files @@ -52,8 +52,8 @@ func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, l } return &LogRotator{ - maxFiles: maxFiles, - fileSize: fileSize, + MaxFiles: maxFiles, + FileSize: fileSize, path: path, fileName: fileName, logFileIdx: logFileIdx, @@ -67,7 +67,7 @@ func (l *LogRotator) Start(r io.Reader) error { buf := make([]byte, bufSize) for { logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx)) - remainingSize := l.fileSize + remainingSize := l.FileSize if f, err := os.Stat(logFileName); err == nil { // Skipping the current file if it happens to be a directory if f.IsDir() { @@ -75,7 +75,7 @@ func (l *LogRotator) Start(r io.Reader) error { continue } // Calculating the remaining capacity of the log file - remainingSize = l.fileSize - f.Size() + remainingSize = l.FileSize - f.Size() } f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { @@ -147,7 +147,7 @@ func (l *LogRotator) PurgeOldFiles() { // Sorting the file indexes so that we can purge the older files and keep // only the number of files as configured by the user sort.Sort(sort.IntSlice(fIndexes)) - toDelete := fIndexes[l.maxFiles-1 : len(fIndexes)-1] + toDelete := fIndexes[l.MaxFiles-1 : len(fIndexes)-1] for _, fIndex := range toDelete { fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) os.RemoveAll(fname)