From e4f4e35c801c33be836417f9bffe66f4930b28ec Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 4 Feb 2016 23:28:01 -0800 Subject: [PATCH 01/31] Added parsing logic for the logrotator --- jobspec/parse.go | 19 +++++++++++++++++++ jobspec/parse_test.go | 16 ++++++++++++++++ jobspec/test-fixtures/basic.hcl | 4 ++++ nomad/structs/structs.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+) diff --git a/jobspec/parse.go b/jobspec/parse.go index 8623a7e43..db052942c 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -401,6 +401,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l delete(m, "service") delete(m, "meta") delete(m, "resources") + delete(m, "logs") // Build the task var t structs.Task @@ -484,6 +485,24 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l t.Resources = &r } + // If we have logs then parse that + logConfig := &structs.LogConfig{MaxFiles: 10, MaxFileSizeMB: 10} + if o := listVal.Filter("logs"); len(o.Items) > 0 { + if len(o.Items) > 1 { + return fmt.Errorf("only one logs block is allowed in a Task. Number of logs block found: %d", len(o.Items)) + } + var m map[string]interface{} + logsBlock := o.Items[0] + if err := hcl.DecodeObject(&m, logsBlock.Val); err != nil { + return err + } + + if err := mapstructure.WeakDecode(m, &logConfig); err != nil { + return err + } + } + t.LogConfig = logConfig + *result = append(*result, &t) } diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 399bb4a32..00e5476c7 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -58,6 +58,10 @@ func TestParse(t *testing.T) { Meta: map[string]string{ "my-cool-key": "foobar", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, }, }, }, @@ -123,6 +127,10 @@ func TestParse(t *testing.T) { }, }, KillTimeout: 22 * time.Second, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 100, + }, }, &structs.Task{ Name: "storagelocker", @@ -143,6 +151,10 @@ func TestParse(t *testing.T) { Operand: "=", }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, }, }, }, @@ -284,6 +296,10 @@ func TestParse(t *testing.T) { }, }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, }, }, }, diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 352aeb7cd..84413e3c1 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -42,6 +42,10 @@ job "binstore-storagelocker" { config { image = "hashicorp/binstore" } + logs { + max_files = 10 + max_file_size = 100 + } env { HELLO = "world" LOREM = "ipsum" diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f2e5e356b..95d64e05b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1438,6 +1438,25 @@ const ( DefaultKillTimeout = 5 * time.Second ) +// LogConfig provides configuration for log rotation +type LogConfig struct { + MaxFiles int `mapstructure:"max_files"` + MaxFileSizeMB int `mapstructure:"max_file_size"` +} + +// MeetsMinResources returns an error if the log config specified are less than +// the minimum allowed. +func (l *LogConfig) MeetsMinResources() error { + var mErr multierror.Error + if l.MaxFiles < 10 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 10; got %d", l.MaxFiles)) + } + if l.MaxFileSizeMB < 10 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 10MB; got %d", l.MaxFileSizeMB)) + } + return mErr.ErrorOrNil() +} + // Task is a single process typically that is executed as part of a task group. type Task struct { // Name of the task @@ -1469,6 +1488,9 @@ type Task struct { // KillTimeout is the time between signaling a task that it will be // killed and killing it. KillTimeout time.Duration `mapstructure:"kill_timeout"` + + // LogConfig provides configuration for log rotation + LogConfig *LogConfig `mapstructure:"logs"` } // InitFields initializes fields in the task. @@ -1628,6 +1650,13 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } + // Validate the log config + if t.LogConfig == nil { + mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) + } else if err := t.LogConfig.MeetsMinResources(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + for idx, constr := range t.Constraints { if err := constr.Validate(); err != nil { outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err) From 7bf254c9afee7ec2433aa29a64e07b74c950201a Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 4 Feb 2016 23:54:15 -0800 Subject: [PATCH 02/31] Implemented log rotator for exec based drivers --- api/tasks.go | 7 ++++++ client/driver/exec.go | 1 + client/driver/executor/executor.go | 27 +++++++++++++-------- client/driver/java.go | 1 + client/driver/{ => logrotator}/logs.go | 2 +- client/driver/{ => logrotator}/logs_test.go | 2 +- client/driver/qemu.go | 1 + client/driver/raw_exec.go | 1 + 8 files changed, 30 insertions(+), 12 deletions(-) rename client/driver/{ => logrotator}/logs.go (99%) rename client/driver/{ => logrotator}/logs_test.go (99%) diff --git a/api/tasks.go b/api/tasks.go index 7fa476b72..04056da21 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -74,6 +74,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup { return g } +// LogConfig provides configuration for log rotation +type LogConfig struct { + MaxFiles int `mapstructure:"max_files"` + MaxFileSizeMB int `mapstructure:"max_file_size"` +} + // Task is a single process in a task group. type Task struct { Name string @@ -85,6 +91,7 @@ type Task struct { Resources *Resources Meta map[string]string KillTimeout time.Duration + LogConfig *LogConfig } // NewTask creates and initializes a new Task. diff --git a/client/driver/exec.go b/client/driver/exec.go index 5cfb0fcac..a1ca8ded6 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -116,6 +116,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, + LogConfig: task.LogConfig, ResourceLimits: true, FSIsolation: true, UnprivilegedUser: true, diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index cdf153cb3..6684ca048 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "fmt" + "io" "log" "os" "os/exec" @@ -15,6 +16,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/client/driver/logrotator" "github.com/hashicorp/nomad/nomad/structs" ) @@ -25,6 +27,7 @@ type ExecutorContext struct { AllocDir *allocdir.AllocDir TaskName string TaskResources *structs.Resources + LogConfig *structs.LogConfig FSIsolation bool ResourceLimits bool UnprivilegedUser bool @@ -94,19 +97,23 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } } - stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.TaskName)) - stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) - if err != nil { - return nil, err - } - e.cmd.Stdout = stdo + logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.TaskName)) - stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + stdor, stdow := io.Pipe() + lro, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, logFileSize, e.logger) if err != nil { - return nil, err + return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) } - e.cmd.Stderr = stde + e.cmd.Stdout = stdow + go lro.Start(stdor) + + stder, stdew := io.Pipe() + lre, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, logFileSize, e.logger) + if err != nil { + return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) + } + e.cmd.Stderr = stdew + go lre.Start(stder) e.cmd.Env = ctx.TaskEnv.EnvList() diff --git a/client/driver/java.go b/client/driver/java.go index 494916324..3b807de0f 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -162,6 +162,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, + LogConfig: task.LogConfig, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx) if err != nil { diff --git a/client/driver/logs.go b/client/driver/logrotator/logs.go similarity index 99% rename from client/driver/logs.go rename to client/driver/logrotator/logs.go index 7297382cd..0442c8519 100644 --- a/client/driver/logs.go +++ b/client/driver/logrotator/logs.go @@ -1,4 +1,4 @@ -package driver +package logrotator import ( "fmt" diff --git a/client/driver/logs_test.go b/client/driver/logrotator/logs_test.go similarity index 99% rename from client/driver/logs_test.go rename to client/driver/logrotator/logs_test.go index d45ae5fdd..12128fb96 100644 --- a/client/driver/logs_test.go +++ b/client/driver/logrotator/logs_test.go @@ -1,4 +1,4 @@ -package driver +package logrotator import ( "io" diff --git a/client/driver/qemu.go b/client/driver/qemu.go index f1bed809a..06709d71a 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -206,6 +206,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, + LogConfig: task.LogConfig, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx) if err != nil { diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 784e7292e..c67623367 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -112,6 +112,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, + LogConfig: task.LogConfig, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) if err != nil { From 9d6b09c378893ea73bd1a07329b14a4cad12a4f4 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Fri, 5 Feb 2016 00:22:31 -0800 Subject: [PATCH 03/31] Passing the log rotation parameters to docker daemon --- client/driver/docker.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client/driver/docker.go b/client/driver/docker.go index ba2d49407..78601817c 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -229,6 +229,13 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri // local directory for storage and a shared alloc directory that can be // used to share data between different tasks in the same task group. Binds: binds, + LogConfig: docker.LogConfig{ + Type: "json-file", + Config: map[string]string{ + "max-size": fmt.Sprintf("%dm", task.LogConfig.MaxFileSizeMB), + "max-file": strconv.Itoa(task.LogConfig.MaxFiles), + }, + }, } d.logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"]) From 4a01bd8bcd420aba6dad6905da6e5c5c90ebe043 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 8 Feb 2016 10:10:01 -0800 Subject: [PATCH 04/31] Implemented the UpdateLogConfig method --- client/driver/executor/executor.go | 21 +++++++++++++++++++++ client/driver/executor_plugin.go | 9 +++++++++ client/driver/logrotator/logs.go | 14 +++++++------- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 6684ca048..408668010 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -54,6 +54,7 @@ type Executor interface { Wait() (*ProcessState, error) ShutDown() error Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error } // UniversalExecutor is an implementation of the Executor which launches and @@ -67,6 +68,8 @@ type UniversalExecutor struct { groups *cgroupConfig.Cgroup exitState *ProcessState processExited chan interface{} + lre *logrotator.LogRotator + lro *logrotator.LogRotator logger *log.Logger lock sync.Mutex @@ -105,6 +108,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) } e.cmd.Stdout = stdow + e.lro = lro go lro.Start(stdor) stder, stdew := io.Pipe() @@ -113,6 +117,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) } e.cmd.Stderr = stdew + e.lre = lre go lre.Start(stder) e.cmd.Env = ctx.TaskEnv.EnvList() @@ -141,6 +146,22 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) { return e.exitState, nil } +func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error { + e.ctx.LogConfig = logConfig + if e.lro == nil { + return fmt.Errorf("log rotator for stdout doesn't exist") + } + e.lro.MaxFiles = logConfig.MaxFiles + e.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + + if e.lre == nil { + return fmt.Errorf("log rotator for stderr doesn't exist") + } + e.lre.MaxFiles = logConfig.MaxFiles + e.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + return nil +} + func (e *UniversalExecutor) wait() { defer close(e.processExited) err := e.cmd.Wait() diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 57a95e326..1d9af9bb4 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/nomad/structs" ) var HandshakeConfig = plugin.HandshakeConfig{ @@ -76,6 +77,10 @@ func (e *ExecutorRPC) Exit() error { return e.client.Call("Plugin.Exit", new(interface{}), new(interface{})) } +func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { + return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) +} + type ExecutorRPCServer struct { Impl executor.Executor } @@ -104,6 +109,10 @@ func (e *ExecutorRPCServer) Exit(args interface{}, resp *interface{}) error { return e.Impl.Exit() } +func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *interface{}) error { + return e.Impl.UpdateLogConfig(args) +} + type ExecutorPlugin struct { logger *log.Logger } diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go index 0442c8519..2cebf1e5d 100644 --- a/client/driver/logrotator/logs.go +++ b/client/driver/logrotator/logs.go @@ -18,8 +18,8 @@ const ( // LogRotator ingests data and writes out to a rotated set of files type LogRotator struct { - maxFiles int // maximum number of rotated files retained by the log rotator - fileSize int64 // maximum file size of a rotated file + MaxFiles int // maximum number of rotated files retained by the log rotator + FileSize int64 // maximum file size of a rotated file path string // path where the rotated files are created fileName string // base file name of the rotated files @@ -52,8 +52,8 @@ func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, l } return &LogRotator{ - maxFiles: maxFiles, - fileSize: fileSize, + MaxFiles: maxFiles, + FileSize: fileSize, path: path, fileName: fileName, logFileIdx: logFileIdx, @@ -67,7 +67,7 @@ func (l *LogRotator) Start(r io.Reader) error { buf := make([]byte, bufSize) for { logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx)) - remainingSize := l.fileSize + remainingSize := l.FileSize if f, err := os.Stat(logFileName); err == nil { // Skipping the current file if it happens to be a directory if f.IsDir() { @@ -75,7 +75,7 @@ func (l *LogRotator) Start(r io.Reader) error { continue } // Calculating the remaining capacity of the log file - remainingSize = l.fileSize - f.Size() + remainingSize = l.FileSize - f.Size() } f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { @@ -147,7 +147,7 @@ func (l *LogRotator) PurgeOldFiles() { // Sorting the file indexes so that we can purge the older files and keep // only the number of files as configured by the user sort.Sort(sort.IntSlice(fIndexes)) - toDelete := fIndexes[l.maxFiles-1 : len(fIndexes)-1] + toDelete := fIndexes[l.MaxFiles-1 : len(fIndexes)-1] for _, fIndex := range toDelete { fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) os.RemoveAll(fname) From a92a96336c9b8bf22ef7e3497b1eda7293175e06 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 9 Feb 2016 12:59:05 -0800 Subject: [PATCH 05/31] Renamed the ExecutorPluginConfig --- client/driver/exec.go | 4 +-- client/driver/executor_plugin.go | 38 --------------------------- client/driver/java.go | 4 +-- client/driver/plugins.go | 45 ++++++++++++++++++++++++++++++++ client/driver/qemu.go | 4 +-- client/driver/raw_exec.go | 4 +-- 6 files changed, 53 insertions(+), 46 deletions(-) create mode 100644 client/driver/plugins.go diff --git a/client/driver/exec.go b/client/driver/exec.go index 9647b4a94..eb5337dc3 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -154,7 +154,7 @@ type execId struct { TaskDir string AllocDir *allocdir.AllocDir IsolationConfig *executor.IsolationConfig - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig } func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -204,7 +204,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *execHandle) ID() string { id := execId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, AllocDir: h.allocDir, IsolationConfig: h.isolationConfig, diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index ece9c0428..1189c19dd 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -1,9 +1,7 @@ package driver import ( - "io" "log" - "net" "net/rpc" "github.com/hashicorp/go-plugin" @@ -11,42 +9,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -var HandshakeConfig = plugin.HandshakeConfig{ - ProtocolVersion: 1, - MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", - MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", -} - -func GetPluginMap(w io.Writer) map[string]plugin.Plugin { - p := new(ExecutorPlugin) - p.logger = log.New(w, "", log.LstdFlags) - return map[string]plugin.Plugin{"executor": p} -} - -// ExecutorReattachConfig is the config that we seralize and de-serialize and -// store in disk -type ExecutorReattachConfig struct { - Pid int - AddrNet string - AddrName string -} - -// PluginConfig returns a config from an ExecutorReattachConfig -func (c *ExecutorReattachConfig) PluginConfig() *plugin.ReattachConfig { - var addr net.Addr - switch c.AddrNet { - case "unix", "unixgram", "unixpacket": - addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName) - case "tcp", "tcp4", "tcp6": - addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName) - } - return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr} -} - -func NewExecutorReattachConfig(c *plugin.ReattachConfig) *ExecutorReattachConfig { - return &ExecutorReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()} -} - type ExecutorRPC struct { client *rpc.Client } diff --git a/client/driver/java.go b/client/driver/java.go index 6d8a15e7d..7ac3611a5 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -197,7 +197,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, type javaId struct { KillTimeout time.Duration - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig IsolationConfig *executor.IsolationConfig TaskDir string AllocDir *allocdir.AllocDir @@ -254,7 +254,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *javaHandle) ID() string { id := javaId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, TaskDir: h.taskDir, AllocDir: h.allocDir, diff --git a/client/driver/plugins.go b/client/driver/plugins.go new file mode 100644 index 000000000..acff16105 --- /dev/null +++ b/client/driver/plugins.go @@ -0,0 +1,45 @@ +package driver + +import ( + "io" + "log" + "net" + + "github.com/hashicorp/go-plugin" +) + +var HandshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", + MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", +} + +func GetPluginMap(w io.Writer) map[string]plugin.Plugin { + p := new(ExecutorPlugin) + p.logger = log.New(w, "", log.LstdFlags) + return map[string]plugin.Plugin{"executor": p} +} + +// ExecutorReattachConfig is the config that we seralize and de-serialize and +// store in disk +type PluginReattachConfig struct { + Pid int + AddrNet string + AddrName string +} + +// PluginConfig returns a config from an ExecutorReattachConfig +func (c *PluginReattachConfig) PluginConfig() *plugin.ReattachConfig { + var addr net.Addr + switch c.AddrNet { + case "unix", "unixgram", "unixpacket": + addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName) + case "tcp", "tcp4", "tcp6": + addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName) + } + return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr} +} + +func NewPluginReattachConfig(c *plugin.ReattachConfig) *PluginReattachConfig { + return &PluginReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()} +} diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 68b7182ab..cd0cc0fd2 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -236,7 +236,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, type qemuId struct { KillTimeout time.Duration UserPid int - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig AllocDir *allocdir.AllocDir } @@ -277,7 +277,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *qemuHandle) ID() string { id := qemuId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, AllocDir: h.allocDir, } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 9b89cad48..ed84f3180 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -141,7 +141,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl type rawExecId struct { KillTimeout time.Duration UserPid int - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig AllocDir *allocdir.AllocDir } @@ -181,7 +181,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e func (h *rawExecHandle) ID() string { id := rawExecId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, AllocDir: h.allocDir, } From 7eedcbfcf6b2b91c1b17edcf44e8fd5818559500 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 9 Feb 2016 18:24:30 -0800 Subject: [PATCH 06/31] Starting the syslog collector along with a docker container --- client/driver/docker.go | 48 ++++++++++++++ client/driver/plugins.go | 12 +++- client/driver/syslog/collector.go | 106 ++++++++++++++++++++++++++++++ client/driver/syslog_plugin.go | 75 +++++++++++++++++++++ client/driver/utils.go | 47 ++++++++++++- command/syslog_plugin.go | 43 ++++++++++++ commands.go | 6 +- main.go | 1 + 8 files changed, 333 insertions(+), 5 deletions(-) create mode 100644 client/driver/syslog/collector.go create mode 100644 client/driver/syslog_plugin.go create mode 100644 command/syslog_plugin.go diff --git a/client/driver/docker.go b/client/driver/docker.go index a71e9f060..3841572eb 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -6,6 +6,7 @@ import ( "log" "net" "os" + "os/exec" "path/filepath" "strconv" "strings" @@ -14,10 +15,14 @@ import ( docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/driver/structs" + "github.com/hashicorp/nomad/client/driver/syslog" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" ) @@ -75,6 +80,8 @@ type dockerPID struct { } type DockerHandle struct { + pluginClient *plugin.Client + logCollector syslog.LogCollector client *docker.Client logger *log.Logger cleanupContainer bool @@ -474,8 +481,41 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) } } + + taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } + d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) + syslogAddr, err := getFreePort(d.config.ClientMinPort, d.config.ClientMaxPort) + if err != nil { + return nil, fmt.Errorf("error creating the syslog plugin: %v", err) + } + + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "syslog", pluginLogFile), + } + + logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + return nil, err + } + logCollectorCtx := &syslog.LogCollectorContext{ + TaskName: task.Name, + AllocDir: ctx.AllocDir, + LogConfig: task.LogConfig, + } + if _, err := logCollector.LaunchCollector(syslogAddr, logCollectorCtx); err != nil { + return nil, fmt.Errorf("failed to start syslog collector: %v", err) + } + config, err := d.createContainer(ctx, task, &driverConfig) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) @@ -543,6 +583,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle // Return a driver handle h := &DockerHandle{ client: client, + logCollector: logCollector, + pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, logger: d.logger, @@ -706,4 +748,10 @@ func (h *DockerHandle) run() { close(h.doneCh) h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err) close(h.waitCh) + + // Shutdown the syslog collector + if err := h.logCollector.Exit(); err != nil { + h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err) + } + h.pluginClient.Kill() } diff --git a/client/driver/plugins.go b/client/driver/plugins.go index acff16105..4808d81e1 100644 --- a/client/driver/plugins.go +++ b/client/driver/plugins.go @@ -15,9 +15,15 @@ var HandshakeConfig = plugin.HandshakeConfig{ } func GetPluginMap(w io.Writer) map[string]plugin.Plugin { - p := new(ExecutorPlugin) - p.logger = log.New(w, "", log.LstdFlags) - return map[string]plugin.Plugin{"executor": p} + e := new(ExecutorPlugin) + e.logger = log.New(w, "", log.LstdFlags) + + s := new(SyslogCollectorPlugin) + s.logger = log.New(w, "", log.LstdFlags) + return map[string]plugin.Plugin{ + "executor": e, + "syslogcollector": s, + } } // ExecutorReattachConfig is the config that we seralize and de-serialize and diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go new file mode 100644 index 000000000..44610537a --- /dev/null +++ b/client/driver/syslog/collector.go @@ -0,0 +1,106 @@ +package syslog + +import ( + "fmt" + // "io" + "log" + "net" + // "path/filepath" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/driver/logrotator" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mcuadros/go-syslog" +) + +type LogCollectorContext struct { + TaskName string + AllocDir *allocdir.AllocDir + LogConfig *structs.LogConfig +} + +type SyslogCollectorState struct { + IsolationConfig *IsolationConfig +} + +type LogCollector interface { + LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) + Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error +} + +type IsolationConfig struct { +} + +type SyslogCollector struct { + addr net.Addr + logConfig *structs.LogConfig + ctx *LogCollectorContext + + lro *logrotator.LogRotator + lre *logrotator.LogRotator + server *syslog.Server + taskDir string + + logger *log.Logger +} + +func NewSyslogCollector(logger *log.Logger) *SyslogCollector { + return &SyslogCollector{logger: logger} +} + +func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) { + s.logger.Printf("sylog-server: launching syslog server on addr: %v", addr) + s.ctx = ctx + // configuring the task dir + if err := s.configureTaskDir(); err != nil { + return nil, err + } + + channel := make(syslog.LogPartsChannel) + handler := syslog.NewChannelHandler(channel) + + s.server = syslog.NewServer() + s.server.SetFormat(syslog.RFC5424) + s.server.SetHandler(handler) + s.server.ListenTCP(addr.String()) + if err := s.server.Boot(); err != nil { + return nil, err + } + // r, w := io.Pipe() + // logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + // lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), + // fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, + // logFileSize, s.logger) + // if err != nil { + // return err + // } + // go lro.Start(r) + + go func(channel syslog.LogPartsChannel) { + for logParts := range channel { + s.logger.Printf("logparts: %v", logParts) + // w.Write([]byte(logParts)) + } + }(channel) + go s.server.Wait() + return &SyslogCollectorState{}, nil +} + +func (s *SyslogCollector) Exit() error { + return nil +} + +func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error { + return nil +} + +// configureTaskDir sets the task dir in the executor +func (s *SyslogCollector) configureTaskDir() error { + taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName] + if !ok { + return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName) + } + s.taskDir = taskDir + return nil +} diff --git a/client/driver/syslog_plugin.go b/client/driver/syslog_plugin.go new file mode 100644 index 000000000..f999ef545 --- /dev/null +++ b/client/driver/syslog_plugin.go @@ -0,0 +1,75 @@ +package driver + +import ( + "log" + "net" + "net/rpc" + + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/driver/syslog" + "github.com/hashicorp/nomad/nomad/structs" +) + +type SyslogCollectorRPC struct { + client *rpc.Client +} + +type LaunchCollectorArgs struct { + AddrNet string + AddrName string + Ctx *syslog.LogCollectorContext +} + +func (e *SyslogCollectorRPC) LaunchCollector(addr net.Addr, + ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) { + var ss *syslog.SyslogCollectorState + err := e.client.Call("Plugin.LaunchCollector", + LaunchCollectorArgs{AddrNet: addr.Network(), AddrName: addr.String(), Ctx: ctx}, &ss) + return ss, err +} + +func (e *SyslogCollectorRPC) Exit() error { + return e.client.Call("Plugin.Exit", new(interface{}), new(interface{})) +} + +func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { + return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) +} + +type SyslogCollectorRPCServer struct { + Impl syslog.LogCollector +} + +func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs, + resp *syslog.SyslogCollectorState) error { + addr, _ := net.ResolveTCPAddr(args.AddrNet, args.AddrName) + ss, err := s.Impl.LaunchCollector(addr, args.Ctx) + if err != nil { + *resp = *ss + } + return err +} + +func (s *SyslogCollectorRPCServer) Exit(args interface{}, resp *interface{}) error { + return s.Impl.Exit() +} + +func (s *SyslogCollectorRPCServer) UpdateLogConfig(logConfig *structs.LogConfig, resp *interface{}) error { + return s.Impl.UpdateLogConfig(logConfig) +} + +type SyslogCollectorPlugin struct { + logger *log.Logger + Impl *SyslogCollectorRPCServer +} + +func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { + if p.Impl == nil { + p.Impl = &SyslogCollectorRPCServer{Impl: syslog.NewSyslogCollector(p.logger)} + } + return p.Impl, nil +} + +func (p *SyslogCollectorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { + return &SyslogCollectorRPC{client: c}, nil +} diff --git a/client/driver/utils.go b/client/driver/utils.go index 188e467d2..c3e253eda 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -3,17 +3,20 @@ package driver import ( "fmt" "io" + "net" "os" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/client/driver/syslog" ) // createExecutor launches an executor plugin and returns an instance of the // Executor interface -func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) { +func createExecutor(config *plugin.ClientConfig, w io.Writer, + clientConfig *config.Config) (executor.Executor, *plugin.Client, error) { config.HandshakeConfig = HandshakeConfig config.Plugins = GetPluginMap(w) config.MaxPort = clientConfig.ClientMaxPort @@ -39,6 +42,48 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *conf return executorPlugin, executorClient, nil } +func createLogCollector(config *plugin.ClientConfig, w io.Writer, + clientConfig *config.Config) (syslog.LogCollector, *plugin.Client, error) { + config.HandshakeConfig = HandshakeConfig + config.Plugins = GetPluginMap(w) + config.MaxPort = clientConfig.ClientMaxPort + config.MinPort = clientConfig.ClientMinPort + if config.Cmd != nil { + isolateCommand(config.Cmd) + } + + syslogClient := plugin.NewClient(config) + rpcCLient, err := syslogClient.Client() + if err != nil { + return nil, nil, fmt.Errorf("error creating rpc client for syslog plugin: %v", err) + } + + raw, err := rpcCLient.Dispense("syslogcollector") + if err != nil { + return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err) + } + logCollector := raw.(syslog.LogCollector) + return logCollector, syslogClient, nil +} + +// getFreePort returns a free port ready to be listened on between upper and +// lower bounds +func getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { + for i := lowerBound; i <= upperBound; i++ { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) + if err != nil { + return nil, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + continue + } + defer l.Close() + return l.Addr(), nil + } + return nil, fmt.Errorf("No free port found") +} + // killProcess kills a process with the given pid func killProcess(pid int) error { proc, err := os.FindProcess(pid) diff --git a/command/syslog_plugin.go b/command/syslog_plugin.go new file mode 100644 index 000000000..b10d6217d --- /dev/null +++ b/command/syslog_plugin.go @@ -0,0 +1,43 @@ +package command + +import ( + "os" + "strings" + + "github.com/hashicorp/go-plugin" + + "github.com/hashicorp/nomad/client/driver" +) + +type SyslogPluginCommand struct { + Meta +} + +func (e *SyslogPluginCommand) Help() string { + helpText := ` + This is a command used by Nomad internally to launch a syslog collector" + ` + return strings.TrimSpace(helpText) +} + +func (s *SyslogPluginCommand) Synopsis() string { + return "internal - lanch a syslog collector plugin" +} + +func (s *SyslogPluginCommand) Run(args []string) int { + if len(args) == 0 { + s.Ui.Error("log output file isn't provided") + } + logFileName := args[0] + stdo, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + s.Ui.Error(err.Error()) + return 1 + } + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: driver.HandshakeConfig, + Plugins: driver.GetPluginMap(stdo), + }) + + return 0 +} diff --git a/commands.go b/commands.go index 6ad1ba3c3..c77c8482a 100644 --- a/commands.go +++ b/commands.go @@ -105,7 +105,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, - + "syslog": func() (cli.Command, error) { + return &command.SyslogPluginCommand{ + Meta: meta, + }, nil + }, "server-force-leave": func() (cli.Command, error) { return &command.ServerForceLeaveCommand{ Meta: meta, diff --git a/main.go b/main.go index 1e3558658..11d237709 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ func RunCustom(args []string, commands map[string]cli.CommandFactory) int { for k, _ := range commands { switch k { case "executor": + case "syslog": default: commandsInclude = append(commandsInclude, k) } From 4963026eab0f648f16753e8bf6d2fb2582abc576 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 07:52:15 -0800 Subject: [PATCH 07/31] Writing log lines from docker to files --- client/driver/docker.go | 27 +++++++------ client/driver/syslog/collector.go | 65 +++++++++++++++++++++---------- client/driver/syslog/parser.go | 44 +++++++++++++++++++++ client/driver/syslog_plugin.go | 16 +++----- client/driver/utils.go | 19 --------- 5 files changed, 107 insertions(+), 64 deletions(-) create mode 100644 client/driver/syslog/parser.go diff --git a/client/driver/docker.go b/client/driver/docker.go index 3841572eb..ac2f1149d 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -180,7 +180,8 @@ func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Ta } // createContainer initializes a struct needed to call docker.client.CreateContainer() -func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, driverConfig *DockerDriverConfig) (docker.CreateContainerOptions, error) { +func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, + driverConfig *DockerDriverConfig, syslogAddr string) (docker.CreateContainerOptions, error) { var c docker.CreateContainerOptions if task.Resources == nil { // Guard against missing resources. We should never have been able to @@ -238,10 +239,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri // used to share data between different tasks in the same task group. Binds: binds, LogConfig: docker.LogConfig{ - Type: "json-file", + Type: "syslog", Config: map[string]string{ - "max-size": fmt.Sprintf("%dm", task.LogConfig.MaxFileSizeMB), - "max-file": strconv.Itoa(task.LogConfig.MaxFiles), + "syslog-address": fmt.Sprintf("tcp://%v", syslogAddr), }, }, } @@ -489,11 +489,6 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) - syslogAddr, err := getFreePort(d.config.ClientMinPort, d.config.ClientMaxPort) - if err != nil { - return nil, fmt.Errorf("error creating the syslog plugin: %v", err) - } - bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) @@ -508,15 +503,19 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return nil, err } logCollectorCtx := &syslog.LogCollectorContext{ - TaskName: task.Name, - AllocDir: ctx.AllocDir, - LogConfig: task.LogConfig, + TaskName: task.Name, + AllocDir: ctx.AllocDir, + LogConfig: task.LogConfig, + PortLowerBound: d.config.ClientMinPort, + PortUpperBound: d.config.ClientMaxPort, } - if _, err := logCollector.LaunchCollector(syslogAddr, logCollectorCtx); err != nil { + ss, err := logCollector.LaunchCollector(logCollectorCtx) + if err != nil { return nil, fmt.Errorf("failed to start syslog collector: %v", err) } + d.logger.Printf("Started the syslog server at %v", ss.Addr) - config, err := d.createContainer(ctx, task, &driverConfig) + config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err) diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 44610537a..e59db74a7 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -2,10 +2,10 @@ package syslog import ( "fmt" - // "io" + "io" "log" "net" - // "path/filepath" + "path/filepath" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/logrotator" @@ -14,17 +14,20 @@ import ( ) type LogCollectorContext struct { - TaskName string - AllocDir *allocdir.AllocDir - LogConfig *structs.LogConfig + TaskName string + AllocDir *allocdir.AllocDir + LogConfig *structs.LogConfig + PortUpperBound uint + PortLowerBound uint } type SyslogCollectorState struct { IsolationConfig *IsolationConfig + Addr string } type LogCollector interface { - LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) + LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) Exit() error UpdateLogConfig(logConfig *structs.LogConfig) error } @@ -49,7 +52,11 @@ func NewSyslogCollector(logger *log.Logger) *SyslogCollector { return &SyslogCollector{logger: logger} } -func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) { +func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) { + addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound) + if err != nil { + return nil, err + } s.logger.Printf("sylog-server: launching syslog server on addr: %v", addr) s.ctx = ctx // configuring the task dir @@ -61,30 +68,30 @@ func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContex handler := syslog.NewChannelHandler(channel) s.server = syslog.NewServer() - s.server.SetFormat(syslog.RFC5424) + s.server.SetFormat(&CustomParser{logger: s.logger}) s.server.SetHandler(handler) s.server.ListenTCP(addr.String()) if err := s.server.Boot(); err != nil { return nil, err } - // r, w := io.Pipe() - // logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - // lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), - // fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, - // logFileSize, s.logger) - // if err != nil { - // return err - // } - // go lro.Start(r) + r, w := io.Pipe() + logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), + fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, + logFileSize, s.logger) + if err != nil { + return nil, err + } + go lro.Start(r) go func(channel syslog.LogPartsChannel) { for logParts := range channel { - s.logger.Printf("logparts: %v", logParts) - // w.Write([]byte(logParts)) + message := logParts["content"].(string) + w.Write([]byte(message)) } }(channel) go s.server.Wait() - return &SyslogCollectorState{}, nil + return &SyslogCollectorState{Addr: addr.String()}, nil } func (s *SyslogCollector) Exit() error { @@ -104,3 +111,21 @@ func (s *SyslogCollector) configureTaskDir() error { s.taskDir = taskDir return nil } + +// getFreePort returns a free port ready to be listened on between upper and +// lower bounds +func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { + for i := lowerBound; i <= upperBound; i++ { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) + if err != nil { + return nil, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + continue + } + defer l.Close() + return l.Addr(), nil + } + return nil, fmt.Errorf("No free port found") +} diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go new file mode 100644 index 000000000..6c43fb986 --- /dev/null +++ b/client/driver/syslog/parser.go @@ -0,0 +1,44 @@ +package syslog + +import ( + "bufio" + "log" + "time" + + "github.com/jeromer/syslogparser" +) + +type DockerLogParser struct { + line []byte + + log *log.Logger +} + +func NewDockerLogParser(line []byte) *DockerLogParser { + return &DockerLogParser{line: line} +} + +func (d *DockerLogParser) Parse() error { + return nil +} + +func (d *DockerLogParser) Dump() syslogparser.LogParts { + return map[string]interface{}{ + "content": string(d.line), + } +} + +func (d *DockerLogParser) Location(location *time.Location) { +} + +type CustomParser struct { + logger *log.Logger +} + +func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser { + return NewDockerLogParser(line) +} + +func (c *CustomParser) GetSplitFunc() bufio.SplitFunc { + return nil +} diff --git a/client/driver/syslog_plugin.go b/client/driver/syslog_plugin.go index f999ef545..2a0f104b7 100644 --- a/client/driver/syslog_plugin.go +++ b/client/driver/syslog_plugin.go @@ -2,7 +2,6 @@ package driver import ( "log" - "net" "net/rpc" "github.com/hashicorp/go-plugin" @@ -15,16 +14,12 @@ type SyslogCollectorRPC struct { } type LaunchCollectorArgs struct { - AddrNet string - AddrName string - Ctx *syslog.LogCollectorContext + Ctx *syslog.LogCollectorContext } -func (e *SyslogCollectorRPC) LaunchCollector(addr net.Addr, - ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) { +func (e *SyslogCollectorRPC) LaunchCollector(ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) { var ss *syslog.SyslogCollectorState - err := e.client.Call("Plugin.LaunchCollector", - LaunchCollectorArgs{AddrNet: addr.Network(), AddrName: addr.String(), Ctx: ctx}, &ss) + err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss) return ss, err } @@ -42,9 +37,8 @@ type SyslogCollectorRPCServer struct { func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs, resp *syslog.SyslogCollectorState) error { - addr, _ := net.ResolveTCPAddr(args.AddrNet, args.AddrName) - ss, err := s.Impl.LaunchCollector(addr, args.Ctx) - if err != nil { + ss, err := s.Impl.LaunchCollector(args.Ctx) + if ss != nil { *resp = *ss } return err diff --git a/client/driver/utils.go b/client/driver/utils.go index c3e253eda..21b83d033 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -3,7 +3,6 @@ package driver import ( "fmt" "io" - "net" "os" "github.com/hashicorp/go-multierror" @@ -66,24 +65,6 @@ func createLogCollector(config *plugin.ClientConfig, w io.Writer, return logCollector, syslogClient, nil } -// getFreePort returns a free port ready to be listened on between upper and -// lower bounds -func getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { - for i := lowerBound; i <= upperBound; i++ { - addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) - if err != nil { - return nil, err - } - l, err := net.ListenTCP("tcp", addr) - if err != nil { - continue - } - defer l.Close() - return l.Addr(), nil - } - return nil, fmt.Errorf("No free port found") -} - // killProcess kills a process with the given pid func killProcess(pid int) error { proc, err := os.FindProcess(pid) From 55730424af60c693702683958581e605bd6bedd3 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 08:03:31 -0800 Subject: [PATCH 08/31] Simplified the logic of conversion of interface{} to byte array --- client/driver/syslog/collector.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index e59db74a7..467add13d 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -1,6 +1,8 @@ package syslog import ( + "bytes" + "encoding/gob" "fmt" "io" "log" @@ -86,8 +88,11 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl go func(channel syslog.LogPartsChannel) { for logParts := range channel { - message := logParts["content"].(string) - w.Write([]byte(message)) + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(logParts["content"]); err == nil { + w.Write(buf.Bytes()) + } } }(channel) go s.server.Wait() From 93d283c3a2048c61224751df9eb01c4e690e1bd2 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 08:13:08 -0800 Subject: [PATCH 09/31] Updated comments --- client/driver/executor/executor.go | 1 + client/driver/syslog/collector.go | 46 +++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 08d5dc834..7ab5bc96e 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -185,6 +185,7 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) { return e.exitState, nil } +// UpdateLogConfig updates the log configuration func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error { e.ctx.LogConfig = logConfig if e.lro == nil { diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 467add13d..0e86b3c5f 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -15,28 +15,48 @@ import ( "github.com/mcuadros/go-syslog" ) +// LogCollectorContext holds context to configure the syslog server type LogCollectorContext struct { - TaskName string - AllocDir *allocdir.AllocDir - LogConfig *structs.LogConfig + // TaskName is the name of the Task + TaskName string + + // AllocDir is the handle to do operations on the alloc dir of + // the task + AllocDir *allocdir.AllocDir + + // LogConfig provides configuration related to log rotation + LogConfig *structs.LogConfig + + // PortUpperBound is the upper bound of the ports that we can use to start + // the syslog server PortUpperBound uint + + // PortLowerBound is the lower bound of the ports that we can use to start + // the syslog server PortLowerBound uint } +// SyslogCollectorState holds the address and islation information of a launched +// syslog server type SyslogCollectorState struct { IsolationConfig *IsolationConfig Addr string } +// LogCollector is an interface which allows a driver to launch a log server +// and update log configuration type LogCollector interface { LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) Exit() error UpdateLogConfig(logConfig *structs.LogConfig) error } +// IsolationConfig has the cgroup related information of a syslog server type IsolationConfig struct { } +// SyslogCollector is a LogCollector which starts a syslog server and does +// rotation to incoming stream type SyslogCollector struct { addr net.Addr logConfig *structs.LogConfig @@ -50,10 +70,13 @@ type SyslogCollector struct { logger *log.Logger } +// NewSyslogCollector returns an implementation of the SyslogCollector func NewSyslogCollector(logger *log.Logger) *SyslogCollector { return &SyslogCollector{logger: logger} } +// LaunchCollector launches a new syslog server and starts writing log lines to +// files and rotates them func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) { addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound) if err != nil { @@ -99,12 +122,27 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl return &SyslogCollectorState{Addr: addr.String()}, nil } +// Exit kills the syslog server func (s *SyslogCollector) Exit() error { - return nil + return s.server.Kill() } +// UpdateLogConfig updates the log configuration func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error { + s.ctx.LogConfig = logConfig + if s.lro == nil { + return fmt.Errorf("log rotator for stdout doesn't exist") + } + s.lro.MaxFiles = logConfig.MaxFiles + s.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + + if s.lre == nil { + return fmt.Errorf("log rotator for stderr doesn't exist") + } + s.lre.MaxFiles = logConfig.MaxFiles + s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) return nil + } // configureTaskDir sets the task dir in the executor From 045a653bb0573a9a4d8ec7cb4b8a31c352d81fc8 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 10:18:14 -0800 Subject: [PATCH 10/31] Adding newlines to loglines --- client/driver/docker.go | 30 ++++++++++++++++++++++-------- client/driver/syslog/collector.go | 10 +++------- client/driver/syslog/parser.go | 2 +- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index ac2f1149d..8bd348b39 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -74,9 +74,10 @@ func (c *DockerDriverConfig) Validate() error { } type dockerPID struct { - ImageID string - ContainerID string - KillTimeout time.Duration + ImageID string + ContainerID string + KillTimeout time.Duration + PluginConfig *PluginReattachConfig } type DockerHandle struct { @@ -493,7 +494,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } - pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name)) pluginConfig := &plugin.ClientConfig{ Cmd: exec.Command(bin, "syslog", pluginLogFile), } @@ -518,6 +519,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err) } // Create a container @@ -536,12 +538,14 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle }) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to query list of containers matching name:%s", config.Name) + pluginClient.Kill() return nil, fmt.Errorf("Failed to query list of containers: %s", err) } // Couldn't find any matching containers if len(containers) == 0 { d.logger.Printf("[ERR] driver.docker: failed to get id for container %s: %#v", config.Name, containers) + pluginClient.Kill() return nil, fmt.Errorf("Failed to get id for container %s", config.Name) } @@ -553,6 +557,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle }) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to purge container %s", container.ID) + pluginClient.Kill() return nil, fmt.Errorf("Failed to purge container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: purged container %s", container.ID) @@ -561,11 +566,13 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle container, err = client.CreateContainer(config) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to re-create container %s; aborting", config.Name) + pluginClient.Kill() return nil, fmt.Errorf("Failed to re-create container %s; aborting", config.Name) } } else { // We failed to create the container for some other reason. d.logger.Printf("[ERR] driver.docker: failed to create container from image %s: %s", image, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to create container from image %s: %s", image, err) } } @@ -575,6 +582,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle err = client.StartContainer(container.ID, container.HostConfig) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: started container %s", container.ID) @@ -608,8 +616,11 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } d.logger.Printf("[INFO] driver.docker: re-attaching to docker process: %s", handleID) + pluginConfig := &plugin.ClientConfig{ + Reattach: pid.PluginConfig.PluginConfig(), + } - // Initialize docker API client + logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) client, err := d.dockerClient() if err != nil { return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) @@ -638,6 +649,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er // Return a driver handle h := &DockerHandle{ client: client, + logCollector: logCollector, + pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, logger: d.logger, @@ -654,9 +667,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er func (h *DockerHandle) ID() string { // Return a handle to the PID pid := dockerPID{ - ImageID: h.imageID, - ContainerID: h.containerID, - KillTimeout: h.killTimeout, + ImageID: h.imageID, + ContainerID: h.containerID, + KillTimeout: h.killTimeout, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), } data, err := json.Marshal(pid) if err != nil { diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 0e86b3c5f..91b1f27c3 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -1,8 +1,6 @@ package syslog import ( - "bytes" - "encoding/gob" "fmt" "io" "log" @@ -109,13 +107,11 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl } go lro.Start(r) + // map[string]interface{} go func(channel syslog.LogPartsChannel) { for logParts := range channel { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(logParts["content"]); err == nil { - w.Write(buf.Bytes()) - } + w.Write(logParts["content"].([]byte)) + w.Write([]byte("\n")) } }(channel) go s.server.Wait() diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go index 6c43fb986..4c4371a75 100644 --- a/client/driver/syslog/parser.go +++ b/client/driver/syslog/parser.go @@ -24,7 +24,7 @@ func (d *DockerLogParser) Parse() error { func (d *DockerLogParser) Dump() syslogparser.LogParts { return map[string]interface{}{ - "content": string(d.line), + "content": d.line, } } From 9f26b7606c3be2bbaf25875d4edb91b68afc89d0 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 12:09:07 -0800 Subject: [PATCH 11/31] Parsing the severity of the log lines --- client/driver/syslog/collector.go | 25 +++++++-- client/driver/syslog/parser.go | 78 ++++++++++++++++++++++++++++- client/driver/syslog/parser_test.go | 18 +++++++ 3 files changed, 115 insertions(+), 6 deletions(-) create mode 100644 client/driver/syslog/parser_test.go diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 91b1f27c3..cb62b72a4 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + s1 "log/syslog" "net" "path/filepath" @@ -97,21 +98,35 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl if err := s.server.Boot(); err != nil { return nil, err } - r, w := io.Pipe() logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + + ro, wo := io.Pipe() lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, logFileSize, s.logger) if err != nil { return nil, err } - go lro.Start(r) + go lro.Start(ro) + + re, we := io.Pipe() + lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), + fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, + logFileSize, s.logger) + if err != nil { + return nil, err + } + go lre.Start(re) - // map[string]interface{} go func(channel syslog.LogPartsChannel) { for logParts := range channel { - w.Write(logParts["content"].([]byte)) - w.Write([]byte("\n")) + s := logParts["severity"].(s1.Priority) + if s == s1.LOG_ERR { + we.Write(logParts["content"].([]byte)) + } else { + wo.Write(logParts["content"].([]byte)) + } + wo.Write([]byte("\n")) } }(channel) go s.server.Wait() diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go index 4c4371a75..95d52593e 100644 --- a/client/driver/syslog/parser.go +++ b/client/driver/syslog/parser.go @@ -2,12 +2,35 @@ package syslog import ( "bufio" + "fmt" "log" + "log/syslog" + "strconv" "time" "github.com/jeromer/syslogparser" ) +var ( + ErrPriorityNoStart = fmt.Errorf("No start char found for priority") + ErrPriorityEmpty = fmt.Errorf("Priority field empty") + ErrPriorityNoEnd = fmt.Errorf("No end char found for priority") + ErrPriorityTooShort = fmt.Errorf("Priority field too short") + ErrPriorityTooLong = fmt.Errorf("Priority field too long") + ErrPriorityNonDigit = fmt.Errorf("Non digit found in priority") +) + +const ( + PRI_PART_START = '<' + PRI_PART_END = '>' +) + +type Priority struct { + P syslog.Priority + F syslog.Priority + S syslog.Priority +} + type DockerLogParser struct { line []byte @@ -23,8 +46,61 @@ func (d *DockerLogParser) Parse() error { } func (d *DockerLogParser) Dump() syslogparser.LogParts { + severity, idx, _ := d.parsePriority(d.line) return map[string]interface{}{ - "content": d.line, + "content": d.line[idx:], + "severity": severity.S, + } +} + +func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) { + cursor := 0 + pri := d.newPriority(0) + if len(line) <= 0 { + return pri, cursor, ErrPriorityEmpty + } + if line[cursor] != PRI_PART_START { + return pri, cursor, ErrPriorityNoStart + } + i := 1 + priDigit := 0 + for i < len(line) { + if i >= 5 { + return pri, cursor, ErrPriorityTooLong + } + c := line[i] + if c == PRI_PART_END { + if i == 1 { + return pri, cursor, ErrPriorityTooShort + } + cursor = i + 1 + return d.newPriority(priDigit), cursor, nil + } + if d.isDigit(c) { + v, e := strconv.Atoi(string(c)) + if e != nil { + return pri, cursor, e + } + priDigit = (priDigit * 10) + v + } else { + return pri, cursor, ErrPriorityNonDigit + } + i++ + } + return pri, cursor, ErrPriorityNoEnd +} + +func (d *DockerLogParser) isDigit(c byte) bool { + return c >= '0' && c <= '9' +} + +func (d *DockerLogParser) newPriority(p int) Priority { + // The Priority value is calculated by first multiplying the Facility + // number by 8 and then adding the numerical value of the Severity. + return Priority{ + P: syslog.Priority(p), + F: syslog.Priority(p / 8), + S: syslog.Priority(p % 8), } } diff --git a/client/driver/syslog/parser_test.go b/client/driver/syslog/parser_test.go new file mode 100644 index 000000000..8b0e8b2b6 --- /dev/null +++ b/client/driver/syslog/parser_test.go @@ -0,0 +1,18 @@ +package syslog + +import ( + "log/syslog" + "testing" +) + +func TestLogParser_Priority(t *testing.T) { + line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf") + d := NewDockerLogParser(line) + p, err := d.parsePriority(line) + if err != nil { + t.Fatalf("got an err: %v", err) + } + if p.S != syslog.LOG_INFO { + t.Fatalf("expected serverity: %v, got: %v", syslog.LOG_INFO, p.S) + } +} From e2768f49a3c26af4edb32c37d8ce4ab4687eaabe Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 13:18:10 -0800 Subject: [PATCH 12/31] Fixed the docker handle id test --- client/driver/docker_test.go | 37 ++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 0eb09ce2e..58f2c0953 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -4,6 +4,8 @@ import ( "fmt" "io/ioutil" "math/rand" + "os" + "os/exec" "path/filepath" "reflect" "runtime/debug" @@ -11,9 +13,11 @@ import ( "time" docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/env" cstructs "github.com/hashicorp/nomad/client/driver/structs" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" ) @@ -125,16 +129,37 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle func TestDockerDriver_Handle(t *testing.T) { t.Parallel() + + bin, err := discover.NomadExecutable() + if err != nil { + t.Fatalf("got an err: %v", err) + } + + f, _ := ioutil.TempFile(os.TempDir(), "") + defer f.Close() + defer os.Remove(f.Name()) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "syslog", f.Name()), + } + logCollector, pluginClient, err := createLogCollector(pluginConfig, os.Stdout, &config.Config{}) + if err != nil { + t.Fatalf("got an err: %v", err) + } + defer pluginClient.Kill() + h := &DockerHandle{ - imageID: "imageid", - containerID: "containerid", - killTimeout: 5 * time.Nanosecond, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + imageID: "imageid", + logCollector: logCollector, + pluginClient: pluginClient, + containerID: "containerid", + killTimeout: 5 * time.Nanosecond, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() - expected := `DOCKER:{"ImageID":"imageid","ContainerID":"containerid","KillTimeout":5}` + expected := fmt.Sprintf("DOCKER:{\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}", + pluginClient.ReattachConfig().Pid, pluginClient.ReattachConfig().Addr.String()) if actual != expected { t.Errorf("Expected `%s`, found `%s`", expected, actual) } From ffd2bc59a1a18ed94210f2e6e5d44ab5b4735030 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 13:29:06 -0800 Subject: [PATCH 13/31] Fixed docker tests --- client/driver/docker_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 58f2c0953..281076818 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -71,6 +71,10 @@ func dockerTask() (*structs.Task, int, int) { Config: map[string]interface{}{ "image": "redis", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ MemoryMB: 256, CPU: 512, @@ -197,6 +201,10 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { Config: map[string]interface{}{ "image": "redis", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -236,6 +244,10 @@ func TestDockerDriver_Start_Wait(t *testing.T) { MemoryMB: 256, CPU: 512, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, } _, handle, cleanup := dockerSetup(t, task) @@ -279,6 +291,10 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { string(exp), env.AllocDir, file), }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ MemoryMB: 256, CPU: 512, @@ -328,6 +344,10 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"10"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -462,6 +482,10 @@ func TestDockerHostNet(t *testing.T) { MemoryMB: 256, CPU: 512, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, } client, handle, cleanup := dockerSetup(t, task) From adf86c2f7868e338f4ca3fadff47c4b7d4631205 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 13:36:47 -0800 Subject: [PATCH 14/31] Fixing the api tests --- api/tasks.go | 6 ++++++ api/util_test.go | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/api/tasks.go b/api/tasks.go index 04056da21..8b2ae3927 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -133,6 +133,12 @@ func (t *Task) Constrain(c *Constraint) *Task { return t } +// SetLogConfig sets a log config to a task +func (t *Task) SetLogConfig(l *LogConfig) *Task { + t.LogConfig = l + return t +} + // TaskState tracks the current state of a task and events that caused state // transistions. type TaskState struct { diff --git a/api/util_test.go b/api/util_test.go index 30feafdd3..190e0f3e7 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -26,6 +26,10 @@ func testJob() *Job { MemoryMB: 256, DiskMB: 25, IOPS: 10, + }). + SetLogConfig(&LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, }) group := NewTaskGroup("group1", 1). From d94c28b39ccfd9e8abee9925b87976be82c31b35 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 13:54:54 -0800 Subject: [PATCH 15/31] Fixed more tests --- client/driver/exec_test.go | 28 ++++++++++++++++++++++++++++ client/driver/java_test.go | 12 ++++++++++++ client/driver/qemu_test.go | 8 ++++++++ client/driver/raw_exec_test.go | 24 ++++++++++++++++++++++++ 4 files changed, 72 insertions(+) diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 85da6bfa5..dcfe56604 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -50,6 +50,10 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"5"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -87,6 +91,10 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { "command": "/bin/sleep", "args": []string{"1000000"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -144,6 +152,10 @@ func TestExecDriver_Start_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"2"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -188,6 +200,10 @@ func TestExecDriver_Start_Artifact_basic(t *testing.T) { "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s?checksum=%s", file, checksum), "command": file, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -232,6 +248,10 @@ func TestExecDriver_Start_Artifact_expanded(t *testing.T) { "command": "/bin/bash", "args": []string{"-c", fmt.Sprintf("/bin/sleep 1 && %s", file)}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -278,6 +298,10 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { fmt.Sprintf(`sleep 1; echo -n %s > ${%s}/%s`, string(exp), env.AllocDir, file), }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -324,6 +348,10 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"100"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, KillTimeout: 10 * time.Second, } diff --git a/client/driver/java_test.go b/client/driver/java_test.go index a546c611f..7b3fe5076 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -58,6 +58,10 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { "jvm_options": []string{"-Xmx64m", "-Xms32m"}, "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -103,6 +107,10 @@ func TestJavaDriver_Start_Wait(t *testing.T) { "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -148,6 +156,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { Config: map[string]interface{}{ "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go index 151fcf7f9..ff163f975 100644 --- a/client/driver/qemu_test.go +++ b/client/driver/qemu_test.go @@ -49,6 +49,10 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) { "web": 8080, }}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ CPU: 500, MemoryMB: 512, @@ -101,6 +105,10 @@ func TestQemuDriver_RequiresMemory(t *testing.T) { "checksum": "sha256:a5e836985934c3392cbbd9b26db55a7d35a8d7ae1deb7ca559dd9c0159572544", // ssh u/p would be here }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, } driverCtx, execCtx := testDriverContexts(task) diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index a09010d3e..942780ae6 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -61,6 +61,10 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { "command": testtask.Path(), "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -109,6 +113,10 @@ func TestRawExecDriver_Start_Artifact_basic(t *testing.T) { "command": file, "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -156,6 +164,10 @@ func TestRawExecDriver_Start_Artifact_expanded(t *testing.T) { "command": file, "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -197,6 +209,10 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { "command": testtask.Path(), "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -243,6 +259,10 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { "write", string(exp), outPath, }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -289,6 +309,10 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { "command": testtask.Path(), "args": []string{"sleep", "45s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) From f04b945a35f8eeb0824226f8dc427494ba2cf2e0 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 14:40:26 -0800 Subject: [PATCH 16/31] Stripping hostname, timestamps etc from docker syslog messages --- client/driver/syslog/parser.go | 26 ++++++++++++++++++++++++-- client/driver/syslog/parser_test.go | 8 +++++++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go index 95d52593e..dae46f5d9 100644 --- a/client/driver/syslog/parser.go +++ b/client/driver/syslog/parser.go @@ -46,13 +46,35 @@ func (d *DockerLogParser) Parse() error { } func (d *DockerLogParser) Dump() syslogparser.LogParts { - severity, idx, _ := d.parsePriority(d.line) + severity, _, _ := d.parsePriority(d.line) + msgIdx := d.logContentIndex(d.line) return map[string]interface{}{ - "content": d.line[idx:], + "content": d.line[msgIdx:], "severity": severity.S, } } +func (d *DockerLogParser) logContentIndex(line []byte) int { + cursor := 0 + numSpace := 0 + for i := 0; i < len(line); i++ { + if line[i] == ' ' { + numSpace += 1 + if numSpace == 1 { + cursor = i + break + } + } + } + for i := cursor; i < len(line); i++ { + if line[i] == ':' { + cursor = i + break + } + } + return cursor + 1 +} + func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) { cursor := 0 pri := d.newPriority(0) diff --git a/client/driver/syslog/parser_test.go b/client/driver/syslog/parser_test.go index 8b0e8b2b6..89f5b866b 100644 --- a/client/driver/syslog/parser_test.go +++ b/client/driver/syslog/parser_test.go @@ -8,11 +8,17 @@ import ( func TestLogParser_Priority(t *testing.T) { line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf") d := NewDockerLogParser(line) - p, err := d.parsePriority(line) + p, _, err := d.parsePriority(line) if err != nil { t.Fatalf("got an err: %v", err) } if p.S != syslog.LOG_INFO { t.Fatalf("expected serverity: %v, got: %v", syslog.LOG_INFO, p.S) } + + idx := d.logContentIndex(line) + expected := 68 + if idx != expected { + t.Fatalf("expected idx: %v, got: %v", expected, idx) + } } From dc25fd42d6a5fec3bfaa037ebd7fad59bc1ae230 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 15:04:41 -0800 Subject: [PATCH 17/31] Updating log configs --- client/driver/docker.go | 1 + client/driver/exec.go | 1 + client/driver/java.go | 1 + client/driver/qemu.go | 1 + client/driver/raw_exec.go | 1 + 5 files changed, 5 insertions(+) diff --git a/client/driver/docker.go b/client/driver/docker.go index 8bd348b39..44531e4ee 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -690,6 +690,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { func (h *DockerHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.logCollector.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/exec.go b/client/driver/exec.go index eb5337dc3..c00a483b9 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -224,6 +224,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult { func (h *execHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/java.go b/client/driver/java.go index 7ac3611a5..f7d724efc 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -275,6 +275,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { func (h *javaHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/qemu.go b/client/driver/qemu.go index cd0cc0fd2..54e28bbbd 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -296,6 +296,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { func (h *qemuHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index ed84f3180..6c8adace2 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -200,6 +200,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { func (h *rawExecHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil From c6daed2c003587b3845abe610c889b7f1c3cff75 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 15:23:25 -0800 Subject: [PATCH 18/31] Added some comments --- client/driver/syslog/parser.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go index dae46f5d9..f7159e3d3 100644 --- a/client/driver/syslog/parser.go +++ b/client/driver/syslog/parser.go @@ -11,6 +11,7 @@ import ( "github.com/jeromer/syslogparser" ) +// Errors related to parsing priority var ( ErrPriorityNoStart = fmt.Errorf("No start char found for priority") ErrPriorityEmpty = fmt.Errorf("Priority field empty") @@ -20,40 +21,52 @@ var ( ErrPriorityNonDigit = fmt.Errorf("Non digit found in priority") ) +// Priority header and ending characters const ( PRI_PART_START = '<' PRI_PART_END = '>' ) +// Priority holds all the priority bits in a syslog log line type Priority struct { P syslog.Priority F syslog.Priority S syslog.Priority } +// DockerLogParser parses a line of log message that the docker daemon ships type DockerLogParser struct { - line []byte + line []byte + content []byte + severity Priority log *log.Logger } +// NewDockerLogParser creates a new DockerLogParser func NewDockerLogParser(line []byte) *DockerLogParser { return &DockerLogParser{line: line} } +// Parse parses a syslog log line func (d *DockerLogParser) Parse() error { + severity, _, _ := d.parsePriority(d.line) + msgIdx := d.logContentIndex(d.line) + d.severity = severity + d.content = d.line[msgIdx:] return nil } +// Dump creates a map of the parsed log line and severity func (d *DockerLogParser) Dump() syslogparser.LogParts { - severity, _, _ := d.parsePriority(d.line) - msgIdx := d.logContentIndex(d.line) return map[string]interface{}{ - "content": d.line[msgIdx:], - "severity": severity.S, + "content": d.content, + "severity": d.severity, } } +// logContentIndex finds out the index of the start index of the content in a +// syslog line func (d *DockerLogParser) logContentIndex(line []byte) int { cursor := 0 numSpace := 0 @@ -75,6 +88,7 @@ func (d *DockerLogParser) logContentIndex(line []byte) int { return cursor + 1 } +// parsePriority parses the priority in a syslog message func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) { cursor := 0 pri := d.newPriority(0) @@ -112,10 +126,12 @@ func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) { return pri, cursor, ErrPriorityNoEnd } +// isDigit checks if a byte is a numeric char func (d *DockerLogParser) isDigit(c byte) bool { return c >= '0' && c <= '9' } +// newPriority creates a new default priority func (d *DockerLogParser) newPriority(p int) Priority { // The Priority value is calculated by first multiplying the Facility // number by 8 and then adding the numerical value of the Severity. @@ -129,6 +145,7 @@ func (d *DockerLogParser) newPriority(p int) Priority { func (d *DockerLogParser) Location(location *time.Location) { } +// CustomParser is a parser to parse docker syslog lines type CustomParser struct { logger *log.Logger } From 97ece8595e81aeb008fb0680c22d1de577f040e4 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 15:27:40 -0800 Subject: [PATCH 19/31] Sharing the isolationconfig of syslog collector with executor --- client/driver/syslog/collector.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index cb62b72a4..0ecc8385f 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -9,6 +9,7 @@ import ( "path/filepath" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/driver/executor" "github.com/hashicorp/nomad/client/driver/logrotator" "github.com/hashicorp/nomad/nomad/structs" "github.com/mcuadros/go-syslog" @@ -38,7 +39,7 @@ type LogCollectorContext struct { // SyslogCollectorState holds the address and islation information of a launched // syslog server type SyslogCollectorState struct { - IsolationConfig *IsolationConfig + IsolationConfig *executor.IsolationConfig Addr string } @@ -50,10 +51,6 @@ type LogCollector interface { UpdateLogConfig(logConfig *structs.LogConfig) error } -// IsolationConfig has the cgroup related information of a syslog server -type IsolationConfig struct { -} - // SyslogCollector is a LogCollector which starts a syslog server and does // rotation to incoming stream type SyslogCollector struct { From c08f9d7731161a6c351c9c0feabc86a47e524b14 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 16:13:13 -0800 Subject: [PATCH 20/31] Changed the logic of the logrotator to make updates easier --- client/driver/logrotator/logs.go | 14 ++++++++------ client/driver/syslog/collector.go | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go index 2cebf1e5d..ae7833021 100644 --- a/client/driver/logrotator/logs.go +++ b/client/driver/logrotator/logs.go @@ -67,15 +67,15 @@ func (l *LogRotator) Start(r io.Reader) error { buf := make([]byte, bufSize) for { logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx)) - remainingSize := l.FileSize + var fileSize int64 if f, err := os.Stat(logFileName); err == nil { // Skipping the current file if it happens to be a directory if f.IsDir() { l.logFileIdx += 1 continue } + fileSize = f.Size() // Calculating the remaining capacity of the log file - remainingSize = l.FileSize - f.Size() } f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { @@ -84,17 +84,19 @@ func (l *LogRotator) Start(r io.Reader) error { l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName) // Closing the current log file if it doesn't have any more capacity - if remainingSize <= 0 { - l.logFileIdx = l.logFileIdx + 1 + if fileSize >= l.FileSize { + l.logFileIdx += 1 f.Close() continue } // Reading from the reader and writing into the current log file as long // as it has capacity or the reader closes + totalWritten := 0 for { var nr int var err error + remainingSize := l.FileSize - (int64(totalWritten) + fileSize) if remainingSize < bufSize { nr, err = r.Read(buf[0:remainingSize]) } else { @@ -113,8 +115,8 @@ func (l *LogRotator) Start(r io.Reader) error { f.Close() return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw) } - remainingSize -= int64(nr) - if remainingSize < 1 { + totalWritten += nw + if l.FileSize-(fileSize+int64(totalWritten)) < 1 { f.Close() break } diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 0ecc8385f..510c69813 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -153,7 +153,7 @@ func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error { } -// configureTaskDir sets the task dir in the executor +// configureTaskDir sets the task dir in the SyslogCollector func (s *SyslogCollector) configureTaskDir() error { taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName] if !ok { From 002dee9b3e26803fc3b1f1f6c9830987212e9a0e Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 16:40:36 -0800 Subject: [PATCH 21/31] Handling errors when client can't re-attach to syslog collector --- api/tasks.go | 4 ++-- client/driver/docker.go | 9 ++++++++- client/driver/syslog/collector.go | 4 +++- client/driver/syslog/parser.go | 12 ++++++------ 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 8b2ae3927..cdcf4e5d4 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -76,8 +76,8 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup { // LogConfig provides configuration for log rotation type LogConfig struct { - MaxFiles int `mapstructure:"max_files"` - MaxFileSizeMB int `mapstructure:"max_file_size"` + MaxFiles int + MaxFileSizeMB int } // Task is a single process in a task group. diff --git a/client/driver/docker.go b/client/driver/docker.go index 44531e4ee..5aab357aa 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -620,7 +620,6 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er Reattach: pid.PluginConfig.PluginConfig(), } - logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) client, err := d.dockerClient() if err != nil { return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) @@ -645,6 +644,14 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er if !found { return nil, fmt.Errorf("Failed to find container %s: %v", pid.ContainerID, err) } + logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + d.logger.Printf("[INFO] driver.docker: couldn't re-attach to the plugin process: %v", err) + if e := client.StopContainer(pid.ContainerID, uint(pid.KillTimeout*time.Second)); e != nil { + d.logger.Printf("[DEBUG] driver.docker: couldn't stop container: %v", e) + } + return nil, err + } // Return a driver handle h := &DockerHandle{ diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 510c69813..c4e173e8f 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -78,7 +78,7 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl if err != nil { return nil, err } - s.logger.Printf("sylog-server: launching syslog server on addr: %v", addr) + s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", addr) s.ctx = ctx // configuring the task dir if err := s.configureTaskDir(); err != nil { @@ -117,6 +117,8 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl go func(channel syslog.LogPartsChannel) { for logParts := range channel { + // If the severity of the log line is err then we write to stderr + // otherwise all messages go to stdout s := logParts["severity"].(s1.Priority) if s == s1.LOG_ERR { we.Write(logParts["content"].([]byte)) diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go index f7159e3d3..807701c04 100644 --- a/client/driver/syslog/parser.go +++ b/client/driver/syslog/parser.go @@ -29,9 +29,9 @@ const ( // Priority holds all the priority bits in a syslog log line type Priority struct { - P syslog.Priority - F syslog.Priority - S syslog.Priority + Pri int + Facility syslog.Priority + Severity syslog.Priority } // DockerLogParser parses a line of log message that the docker daemon ships @@ -136,9 +136,9 @@ func (d *DockerLogParser) newPriority(p int) Priority { // The Priority value is calculated by first multiplying the Facility // number by 8 and then adding the numerical value of the Severity. return Priority{ - P: syslog.Priority(p), - F: syslog.Priority(p / 8), - S: syslog.Priority(p % 8), + Pri: p, + Facility: syslog.Priority(p / 8), + Severity: syslog.Priority(p % 8), } } From 6f9a67c6389a051047d525c28d6534c0b80ff18d Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 16:44:31 -0800 Subject: [PATCH 22/31] Made a DefaultLogConfig method --- jobspec/parse.go | 2 +- nomad/structs/structs.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/jobspec/parse.go b/jobspec/parse.go index db052942c..ffef6bfef 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -486,7 +486,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l } // If we have logs then parse that - logConfig := &structs.LogConfig{MaxFiles: 10, MaxFileSizeMB: 10} + logConfig := structs.DefaultLogConfig() if o := listVal.Filter("logs"); len(o.Items) > 0 { if len(o.Items) > 1 { return fmt.Errorf("only one logs block is allowed in a Task. Number of logs block found: %d", len(o.Items)) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 88991884c..a8caefd2b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1451,6 +1451,13 @@ type LogConfig struct { MaxFileSizeMB int `mapstructure:"max_file_size"` } +func DefaultLogConfig() *LogConfig { + return &LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + } +} + // MeetsMinResources returns an error if the log config specified are less than // the minimum allowed. func (l *LogConfig) MeetsMinResources() error { From 1d79ba7a18ffa179bbe17081f3f1c6badf7b438e Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 17:34:14 -0800 Subject: [PATCH 23/31] Added a test to check if file size update catches up with rotation --- client/driver/logrotator/logs.go | 10 +++---- client/driver/logrotator/logs_test.go | 41 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go index ae7833021..161db2dc1 100644 --- a/client/driver/logrotator/logs.go +++ b/client/driver/logrotator/logs.go @@ -94,6 +94,10 @@ func (l *LogRotator) Start(r io.Reader) error { // as it has capacity or the reader closes totalWritten := 0 for { + if l.FileSize-(fileSize+int64(totalWritten)) < 1 { + f.Close() + break + } var nr int var err error remainingSize := l.FileSize - (int64(totalWritten) + fileSize) @@ -115,11 +119,7 @@ func (l *LogRotator) Start(r io.Reader) error { f.Close() return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw) } - totalWritten += nw - if l.FileSize-(fileSize+int64(totalWritten)) < 1 { - f.Close() - break - } + totalWritten += nr } l.logFileIdx = l.logFileIdx + 1 } diff --git a/client/driver/logrotator/logs_test.go b/client/driver/logrotator/logs_test.go index 12128fb96..fd7252b80 100644 --- a/client/driver/logrotator/logs_test.go +++ b/client/driver/logrotator/logs_test.go @@ -246,3 +246,44 @@ func TestLogRotator_PurgeDirs(t *testing.T) { t.Fatalf("expected number of files: %v, actual: %v", 2, len(files)) } } + +func TestLogRotator_UpdateConfig(t *testing.T) { + var path string + var err error + defer os.RemoveAll(path) + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + + l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + r, w := io.Pipe() + go func() { + w.Write([]byte("abcdefg")) + l.FileSize = 5 + w.Write([]byte("hijklmnojkp")) + w.Close() + }() + err = l.Start(r) + if err != nil && err != io.EOF { + t.Fatalf("Failure in logrotator start %v", err) + } + + finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")) + finfo1, err1 := os.Stat(filepath.Join(path, "redis.stdout.1")) + if err != nil { + t.Fatal(err) + } + if finfo.Size() != 10 { + t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size()) + } + if err1 != nil { + t.Fatal(err) + } + if finfo1.Size() != 5 { + t.Fatalf("expected size of file: %v, actual: %v", 5, finfo.Size()) + } +} From f3261ac9be229fb668b4c363cf27e7d685ae52a3 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 17:46:35 -0800 Subject: [PATCH 24/31] Added a LogConfig to the mock tasks --- nomad/mock/mock.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index c0d6705ef..233bad589 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -102,6 +102,7 @@ func Job() *structs.Job { PortLabel: "admin", }, }, + LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, @@ -177,6 +178,7 @@ func SystemJob() *structs.Job { }, }, }, + LogConfig: structs.DefaultLogConfig(), }, }, }, From 6c27ae6b9139c4543759fcde1ddd700142bcffa2 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 18:47:46 -0800 Subject: [PATCH 25/31] Renamed the syslog package to logcollector --- client/driver/docker.go | 6 +++--- .../driver/{syslog => logcollector}/collector.go | 6 +++--- client/driver/{syslog => logcollector}/parser.go | 2 +- .../driver/{syslog => logcollector}/parser_test.go | 6 +++--- client/driver/syslog_plugin.go | 14 +++++++------- client/driver/utils.go | 6 +++--- 6 files changed, 20 insertions(+), 20 deletions(-) rename client/driver/{syslog => logcollector}/collector.go (98%) rename client/driver/{syslog => logcollector}/parser.go (99%) rename client/driver/{syslog => logcollector}/parser_test.go (90%) diff --git a/client/driver/docker.go b/client/driver/docker.go index 5aab357aa..33d6184a3 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -19,8 +19,8 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/logcollector" cstructs "github.com/hashicorp/nomad/client/driver/structs" - "github.com/hashicorp/nomad/client/driver/syslog" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" @@ -82,7 +82,7 @@ type dockerPID struct { type DockerHandle struct { pluginClient *plugin.Client - logCollector syslog.LogCollector + logCollector logcollector.LogCollector client *docker.Client logger *log.Logger cleanupContainer bool @@ -503,7 +503,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle if err != nil { return nil, err } - logCollectorCtx := &syslog.LogCollectorContext{ + logCollectorCtx := &logcollector.LogCollectorContext{ TaskName: task.Name, AllocDir: ctx.AllocDir, LogConfig: task.LogConfig, diff --git a/client/driver/syslog/collector.go b/client/driver/logcollector/collector.go similarity index 98% rename from client/driver/syslog/collector.go rename to client/driver/logcollector/collector.go index c4e173e8f..e833465f5 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/logcollector/collector.go @@ -1,4 +1,4 @@ -package syslog +package logcollector import ( "fmt" @@ -119,8 +119,8 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl for logParts := range channel { // If the severity of the log line is err then we write to stderr // otherwise all messages go to stdout - s := logParts["severity"].(s1.Priority) - if s == s1.LOG_ERR { + s := logParts["severity"].(Priority) + if s.Severity == s1.LOG_ERR { we.Write(logParts["content"].([]byte)) } else { wo.Write(logParts["content"].([]byte)) diff --git a/client/driver/syslog/parser.go b/client/driver/logcollector/parser.go similarity index 99% rename from client/driver/syslog/parser.go rename to client/driver/logcollector/parser.go index 807701c04..eeb910a6f 100644 --- a/client/driver/syslog/parser.go +++ b/client/driver/logcollector/parser.go @@ -1,4 +1,4 @@ -package syslog +package logcollector import ( "bufio" diff --git a/client/driver/syslog/parser_test.go b/client/driver/logcollector/parser_test.go similarity index 90% rename from client/driver/syslog/parser_test.go rename to client/driver/logcollector/parser_test.go index 89f5b866b..2c62049c5 100644 --- a/client/driver/syslog/parser_test.go +++ b/client/driver/logcollector/parser_test.go @@ -1,4 +1,4 @@ -package syslog +package logcollector import ( "log/syslog" @@ -12,8 +12,8 @@ func TestLogParser_Priority(t *testing.T) { if err != nil { t.Fatalf("got an err: %v", err) } - if p.S != syslog.LOG_INFO { - t.Fatalf("expected serverity: %v, got: %v", syslog.LOG_INFO, p.S) + if p.Severity != syslog.LOG_INFO { + t.Fatalf("expected serverity: %v, got: %v", syslog.LOG_INFO, p.Severity) } idx := d.logContentIndex(line) diff --git a/client/driver/syslog_plugin.go b/client/driver/syslog_plugin.go index 2a0f104b7..b4cee69fb 100644 --- a/client/driver/syslog_plugin.go +++ b/client/driver/syslog_plugin.go @@ -5,7 +5,7 @@ import ( "net/rpc" "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/client/driver/syslog" + "github.com/hashicorp/nomad/client/driver/logcollector" "github.com/hashicorp/nomad/nomad/structs" ) @@ -14,11 +14,11 @@ type SyslogCollectorRPC struct { } type LaunchCollectorArgs struct { - Ctx *syslog.LogCollectorContext + Ctx *logcollector.LogCollectorContext } -func (e *SyslogCollectorRPC) LaunchCollector(ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) { - var ss *syslog.SyslogCollectorState +func (e *SyslogCollectorRPC) LaunchCollector(ctx *logcollector.LogCollectorContext) (*logcollector.SyslogCollectorState, error) { + var ss *logcollector.SyslogCollectorState err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss) return ss, err } @@ -32,11 +32,11 @@ func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error } type SyslogCollectorRPCServer struct { - Impl syslog.LogCollector + Impl logcollector.LogCollector } func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs, - resp *syslog.SyslogCollectorState) error { + resp *logcollector.SyslogCollectorState) error { ss, err := s.Impl.LaunchCollector(args.Ctx) if ss != nil { *resp = *ss @@ -59,7 +59,7 @@ type SyslogCollectorPlugin struct { func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { if p.Impl == nil { - p.Impl = &SyslogCollectorRPCServer{Impl: syslog.NewSyslogCollector(p.logger)} + p.Impl = &SyslogCollectorRPCServer{Impl: logcollector.NewSyslogCollector(p.logger)} } return p.Impl, nil } diff --git a/client/driver/utils.go b/client/driver/utils.go index 21b83d033..bfb923e1b 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -9,7 +9,7 @@ import ( "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - "github.com/hashicorp/nomad/client/driver/syslog" + "github.com/hashicorp/nomad/client/driver/logcollector" ) // createExecutor launches an executor plugin and returns an instance of the @@ -42,7 +42,7 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer, } func createLogCollector(config *plugin.ClientConfig, w io.Writer, - clientConfig *config.Config) (syslog.LogCollector, *plugin.Client, error) { + clientConfig *config.Config) (logcollector.LogCollector, *plugin.Client, error) { config.HandshakeConfig = HandshakeConfig config.Plugins = GetPluginMap(w) config.MaxPort = clientConfig.ClientMaxPort @@ -61,7 +61,7 @@ func createLogCollector(config *plugin.ClientConfig, w io.Writer, if err != nil { return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err) } - logCollector := raw.(syslog.LogCollector) + logCollector := raw.(logcollector.LogCollector) return logCollector, syslogClient, nil } From 38ec33c508d30b924f87c9f83554ab7431a7e481 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 11 Feb 2016 10:42:56 -0800 Subject: [PATCH 26/31] Fixed some tests --- client/driver/logcollector/collector.go | 1 - command/util_test.go | 4 ++++ nomad/structs/structs_test.go | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/client/driver/logcollector/collector.go b/client/driver/logcollector/collector.go index e833465f5..3fdc90896 100644 --- a/client/driver/logcollector/collector.go +++ b/client/driver/logcollector/collector.go @@ -152,7 +152,6 @@ func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error { s.lre.MaxFiles = logConfig.MaxFiles s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) return nil - } // configureTaskDir sets the task dir in the SyslogCollector diff --git a/command/util_test.go b/command/util_test.go index c9bd741f6..a94497c37 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -44,6 +44,10 @@ func testJob(jobID string) *api.Job { MemoryMB: 256, DiskMB: 20, CPU: 100, + }). + SetLogConfig(&api.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, }) group := api.NewTaskGroup("group1", 1). diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index dfbca2ad1..51a3603bf 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -258,6 +258,7 @@ func TestTask_Validate(t *testing.T) { MemoryMB: 100, IOPS: 10, }, + LogConfig: DefaultLogConfig(), } err = task.Validate() if err != nil { From 95284307c54ee1728e20844ee87de6d68573ac02 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 11 Feb 2016 10:58:18 -0800 Subject: [PATCH 27/31] Fixed the executor test --- client/driver/executor/executor_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go index 26876e7e9..948af699f 100644 --- a/client/driver/executor/executor_test.go +++ b/client/driver/executor/executor_test.go @@ -50,6 +50,7 @@ func testExecutorContext(t *testing.T) *ExecutorContext { TaskName: taskName, AllocDir: allocDir, TaskResources: constraint, + LogConfig: structs.DefaultLogConfig(), } return ctx } From 961abeef503369190973ccff3451405d26fc1a66 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 11 Feb 2016 11:13:45 -0800 Subject: [PATCH 28/31] Fixed more client executor tests --- client/driver/executor/executor_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go index 948af699f..609e7e64a 100644 --- a/client/driver/executor/executor_test.go +++ b/client/driver/executor/executor_test.go @@ -85,7 +85,7 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) { func TestExecutor_Start_Wait(t *testing.T) { execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}} ctx := testExecutorContext(t) - defer ctx.AllocDir.Destroy() + //defer ctx.AllocDir.Destroy() executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags)) ps, err := executor.LaunchCmd(&execCmd, ctx) if err != nil { @@ -106,7 +106,7 @@ func TestExecutor_Start_Wait(t *testing.T) { } expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "web.stdout") + file := filepath.Join(allocdir.TaskLocal, "web.stdout.0") absFilePath := filepath.Join(taskDir, file) output, err := ioutil.ReadFile(absFilePath) if err != nil { @@ -150,7 +150,7 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) { } expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "web.stdout") + file := filepath.Join(allocdir.TaskLocal, "web.stdout.0") absFilePath := filepath.Join(taskDir, file) output, err := ioutil.ReadFile(absFilePath) if err != nil { @@ -186,7 +186,7 @@ func TestExecutor_Start_Kill(t *testing.T) { t.Fatalf("No task directory found for task %v", task) } - file := filepath.Join(allocdir.TaskLocal, "web.stdout") + file := filepath.Join(allocdir.TaskLocal, "web.stdout.0") absFilePath := filepath.Join(taskDir, file) time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second) From 2d9ea67359e9681649491659ef965382a267e45f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 11 Feb 2016 12:21:19 -0800 Subject: [PATCH 29/31] Added the log config to init example --- command/init.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/command/init.go b/command/init.go index a0b7d988f..9b24a2f36 100644 --- a/command/init.go +++ b/command/init.go @@ -158,7 +158,13 @@ job "example" { } } } - + + # Specify configuration related to log rotation + # logs { + # max_files = 10 + # max_file_size = 15 + # } + # Controls the timeout between signalling a task it will be killed # and killing the task. If not set a default is used. # kill_timeout = "20s" From 8f17f00a315efbcdb9fae07bdff5df632e769e20 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 11 Feb 2016 12:30:47 -0800 Subject: [PATCH 30/31] Added a validator for log storage --- nomad/structs/structs.go | 4 ++++ nomad/structs/structs_test.go | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a8caefd2b..ba91d8fdc 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1683,6 +1683,10 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } } + + if t.Resources.DiskMB <= (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB) { + mErr.Errors = append(mErr.Errors, fmt.Errorf("log storage exceeds requested disk capacity")) + } return mErr.ErrorOrNil() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 51a3603bf..b11578728 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -266,6 +266,21 @@ func TestTask_Validate(t *testing.T) { } } +func TestTask_Validate_LogConfig(t *testing.T) { + task := &Task{ + LogConfig: DefaultLogConfig(), + Resources: &Resources{ + DiskMB: 1, + }, + } + + err := task.Validate() + mErr := err.(*multierror.Error) + if !strings.Contains(mErr.Errors[3].Error(), "log storage") { + t.Fatalf("err: %s", err) + } +} + func TestConstraint_Validate(t *testing.T) { c := &Constraint{} err := c.Validate() From 26d308dcb8b5848c5b90d1d34bd02e2662bf7e17 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 11 Feb 2016 14:44:35 -0800 Subject: [PATCH 31/31] Fixed an issue with purge --- client/driver/docker.go | 4 +- client/driver/logcollector/collector.go | 2 + client/driver/logrotator/logs.go | 70 ++++++++++++++++--------- client/driver/logrotator/logs_test.go | 15 ++++-- nomad/structs/structs.go | 14 ++--- 5 files changed, 68 insertions(+), 37 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 33d6184a3..60bce0fc9 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -697,7 +697,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { func (h *DockerHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout - h.logCollector.UpdateLogConfig(task.LogConfig) + if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil { + h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err) + } // Update is not possible return nil diff --git a/client/driver/logcollector/collector.go b/client/driver/logcollector/collector.go index 3fdc90896..fab901428 100644 --- a/client/driver/logcollector/collector.go +++ b/client/driver/logcollector/collector.go @@ -104,6 +104,7 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl if err != nil { return nil, err } + s.lro = lro go lro.Start(ro) re, we := io.Pipe() @@ -113,6 +114,7 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl if err != nil { return nil, err } + s.lre = lre go lre.Start(re) go func(channel syslog.LogPartsChannel) { diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go index 161db2dc1..a9eecf983 100644 --- a/client/driver/logrotator/logs.go +++ b/client/driver/logrotator/logs.go @@ -23,9 +23,11 @@ type LogRotator struct { path string // path where the rotated files are created fileName string // base file name of the rotated files - logFileIdx int // index to the current file + logFileIdx int // index to the current file + oldestLogFileIdx int // index to the oldest log file - logger *log.Logger + logger *log.Logger + purgeCh chan struct{} } // NewLogRotator configures and returns a new LogRotator @@ -51,14 +53,18 @@ func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, l } } - return &LogRotator{ + lr := &LogRotator{ MaxFiles: maxFiles, FileSize: fileSize, path: path, fileName: fileName, logFileIdx: logFileIdx, logger: logger, - }, nil + purgeCh: make(chan struct{}, 1), + } + go lr.PurgeOldFiles() + + return lr, nil } // Start reads from a Reader and writes them to files and rotates them when the @@ -122,6 +128,13 @@ func (l *LogRotator) Start(r io.Reader) error { totalWritten += nr } l.logFileIdx = l.logFileIdx + 1 + // Purge old files if we have more files than MaxFiles + if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles { + select { + case l.purgeCh <- struct{}{}: + default: + } + } } return nil } @@ -129,29 +142,36 @@ func (l *LogRotator) Start(r io.Reader) error { // PurgeOldFiles removes older files and keeps only the last N files rotated for // a file func (l *LogRotator) PurgeOldFiles() { - var fIndexes []int - files, err := ioutil.ReadDir(l.path) - if err != nil { - return - } - // Inserting all the rotated files in a slice - for _, f := range files { - if strings.HasPrefix(f.Name(), l.fileName) { - fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) - n, err := strconv.Atoi(fileIdx) + for { + select { + case <-l.purgeCh: + var fIndexes []int + files, err := ioutil.ReadDir(l.path) if err != nil { - continue + return } - fIndexes = append(fIndexes, n) + // Inserting all the rotated files in a slice + for _, f := range files { + if strings.HasPrefix(f.Name(), l.fileName) { + fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) + n, err := strconv.Atoi(fileIdx) + if err != nil { + continue + } + fIndexes = append(fIndexes, n) + } + } + + // Sorting the file indexes so that we can purge the older files and keep + // only the number of files as configured by the user + sort.Sort(sort.IntSlice(fIndexes)) + var toDelete []int + toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles] + for _, fIndex := range toDelete { + fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) + os.RemoveAll(fname) + } + l.oldestLogFileIdx = fIndexes[0] } } - - // Sorting the file indexes so that we can purge the older files and keep - // only the number of files as configured by the user - sort.Sort(sort.IntSlice(fIndexes)) - toDelete := fIndexes[l.MaxFiles-1 : len(fIndexes)-1] - for _, fIndex := range toDelete { - fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) - os.RemoveAll(fname) - } } diff --git a/client/driver/logrotator/logs_test.go b/client/driver/logrotator/logs_test.go index fd7252b80..b497ab993 100644 --- a/client/driver/logrotator/logs_test.go +++ b/client/driver/logrotator/logs_test.go @@ -1,12 +1,14 @@ package logrotator import ( + "fmt" "io" "io/ioutil" "log" "os" "path/filepath" "testing" + "time" ) var ( @@ -228,7 +230,10 @@ func TestLogRotator_PurgeDirs(t *testing.T) { r, w := io.Pipe() go func() { - w.Write([]byte("abcdefghijklmno")) + w.Write([]byte("abcdefghijklmnopqrxyz")) + time.Sleep(1 * time.Second) + l.MaxFiles = 1 + w.Write([]byte("abcdefghijklmnopqrxyz")) w.Close() }() @@ -236,14 +241,16 @@ func TestLogRotator_PurgeDirs(t *testing.T) { if err != nil && err != io.EOF { t.Fatalf("failure in logrotator start: %v", err) } - l.PurgeOldFiles() + // sleeping for a second because purging is async + time.Sleep(1 * time.Second) files, err := ioutil.ReadDir(path) if err != nil { t.Fatalf("err: %v", err) } - if len(files) != 2 { - t.Fatalf("expected number of files: %v, actual: %v", 2, len(files)) + expected := 1 + if len(files) != expected { + t.Fatalf("expected number of files: %v, actual: %v", expected, len(files)) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ba91d8fdc..f04e96fe2 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1458,15 +1458,15 @@ func DefaultLogConfig() *LogConfig { } } -// MeetsMinResources returns an error if the log config specified are less than +// Validate returns an error if the log config specified are less than // the minimum allowed. -func (l *LogConfig) MeetsMinResources() error { +func (l *LogConfig) Validate() error { var mErr multierror.Error - if l.MaxFiles < 10 { - mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 10; got %d", l.MaxFiles)) + if l.MaxFiles < 1 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 1; got %d", l.MaxFiles)) } - if l.MaxFileSizeMB < 10 { - mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 10MB; got %d", l.MaxFileSizeMB)) + if l.MaxFileSizeMB < 1 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 1MB; got %d", l.MaxFileSizeMB)) } return mErr.ErrorOrNil() } @@ -1667,7 +1667,7 @@ func (t *Task) Validate() error { // Validate the log config if t.LogConfig == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) - } else if err := t.LogConfig.MeetsMinResources(); err != nil { + } else if err := t.LogConfig.Validate(); err != nil { mErr.Errors = append(mErr.Errors, err) }