diff --git a/client/driver/docker.go b/client/driver/docker.go index 3841572eb..ac2f1149d 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -180,7 +180,8 @@ func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Ta } // createContainer initializes a struct needed to call docker.client.CreateContainer() -func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, driverConfig *DockerDriverConfig) (docker.CreateContainerOptions, error) { +func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, + driverConfig *DockerDriverConfig, syslogAddr string) (docker.CreateContainerOptions, error) { var c docker.CreateContainerOptions if task.Resources == nil { // Guard against missing resources. We should never have been able to @@ -238,10 +239,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri // used to share data between different tasks in the same task group. Binds: binds, LogConfig: docker.LogConfig{ - Type: "json-file", + Type: "syslog", Config: map[string]string{ - "max-size": fmt.Sprintf("%dm", task.LogConfig.MaxFileSizeMB), - "max-file": strconv.Itoa(task.LogConfig.MaxFiles), + "syslog-address": fmt.Sprintf("tcp://%v", syslogAddr), }, }, } @@ -489,11 +489,6 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) - syslogAddr, err := getFreePort(d.config.ClientMinPort, d.config.ClientMaxPort) - if err != nil { - return nil, fmt.Errorf("error creating the syslog plugin: %v", err) - } - bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) @@ -508,15 +503,19 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return nil, err } logCollectorCtx := &syslog.LogCollectorContext{ - TaskName: task.Name, - AllocDir: ctx.AllocDir, - LogConfig: task.LogConfig, + TaskName: task.Name, + AllocDir: ctx.AllocDir, + LogConfig: task.LogConfig, + PortLowerBound: d.config.ClientMinPort, + PortUpperBound: d.config.ClientMaxPort, } - if _, err := logCollector.LaunchCollector(syslogAddr, logCollectorCtx); err != nil { + ss, err := logCollector.LaunchCollector(logCollectorCtx) + if err != nil { return nil, fmt.Errorf("failed to start syslog collector: %v", err) } + d.logger.Printf("Started the syslog server at %v", ss.Addr) - config, err := d.createContainer(ctx, task, &driverConfig) + config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err) diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 44610537a..e59db74a7 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -2,10 +2,10 @@ package syslog import ( "fmt" - // "io" + "io" "log" "net" - // "path/filepath" + "path/filepath" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/logrotator" @@ -14,17 +14,20 @@ import ( ) type LogCollectorContext struct { - TaskName string - AllocDir *allocdir.AllocDir - LogConfig *structs.LogConfig + TaskName string + AllocDir *allocdir.AllocDir + LogConfig *structs.LogConfig + PortUpperBound uint + PortLowerBound uint } type SyslogCollectorState struct { IsolationConfig *IsolationConfig + Addr string } type LogCollector interface { - LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) + LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) Exit() error UpdateLogConfig(logConfig *structs.LogConfig) error } @@ -49,7 +52,11 @@ func NewSyslogCollector(logger *log.Logger) *SyslogCollector { return &SyslogCollector{logger: logger} } -func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) { +func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) { + addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound) + if err != nil { + return nil, err + } s.logger.Printf("sylog-server: launching syslog server on addr: %v", addr) s.ctx = ctx // configuring the task dir @@ -61,30 +68,30 @@ func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContex handler := syslog.NewChannelHandler(channel) s.server = syslog.NewServer() - s.server.SetFormat(syslog.RFC5424) + s.server.SetFormat(&CustomParser{logger: s.logger}) s.server.SetHandler(handler) s.server.ListenTCP(addr.String()) if err := s.server.Boot(); err != nil { return nil, err } - // r, w := io.Pipe() - // logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - // lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), - // fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, - // logFileSize, s.logger) - // if err != nil { - // return err - // } - // go lro.Start(r) + r, w := io.Pipe() + logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), + fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, + logFileSize, s.logger) + if err != nil { + return nil, err + } + go lro.Start(r) go func(channel syslog.LogPartsChannel) { for logParts := range channel { - s.logger.Printf("logparts: %v", logParts) - // w.Write([]byte(logParts)) + message := logParts["content"].(string) + w.Write([]byte(message)) } }(channel) go s.server.Wait() - return &SyslogCollectorState{}, nil + return &SyslogCollectorState{Addr: addr.String()}, nil } func (s *SyslogCollector) Exit() error { @@ -104,3 +111,21 @@ func (s *SyslogCollector) configureTaskDir() error { s.taskDir = taskDir return nil } + +// getFreePort returns a free port ready to be listened on between upper and +// lower bounds +func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { + for i := lowerBound; i <= upperBound; i++ { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) + if err != nil { + return nil, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + continue + } + defer l.Close() + return l.Addr(), nil + } + return nil, fmt.Errorf("No free port found") +} diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go new file mode 100644 index 000000000..6c43fb986 --- /dev/null +++ b/client/driver/syslog/parser.go @@ -0,0 +1,44 @@ +package syslog + +import ( + "bufio" + "log" + "time" + + "github.com/jeromer/syslogparser" +) + +type DockerLogParser struct { + line []byte + + log *log.Logger +} + +func NewDockerLogParser(line []byte) *DockerLogParser { + return &DockerLogParser{line: line} +} + +func (d *DockerLogParser) Parse() error { + return nil +} + +func (d *DockerLogParser) Dump() syslogparser.LogParts { + return map[string]interface{}{ + "content": string(d.line), + } +} + +func (d *DockerLogParser) Location(location *time.Location) { +} + +type CustomParser struct { + logger *log.Logger +} + +func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser { + return NewDockerLogParser(line) +} + +func (c *CustomParser) GetSplitFunc() bufio.SplitFunc { + return nil +} diff --git a/client/driver/syslog_plugin.go b/client/driver/syslog_plugin.go index f999ef545..2a0f104b7 100644 --- a/client/driver/syslog_plugin.go +++ b/client/driver/syslog_plugin.go @@ -2,7 +2,6 @@ package driver import ( "log" - "net" "net/rpc" "github.com/hashicorp/go-plugin" @@ -15,16 +14,12 @@ type SyslogCollectorRPC struct { } type LaunchCollectorArgs struct { - AddrNet string - AddrName string - Ctx *syslog.LogCollectorContext + Ctx *syslog.LogCollectorContext } -func (e *SyslogCollectorRPC) LaunchCollector(addr net.Addr, - ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) { +func (e *SyslogCollectorRPC) LaunchCollector(ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) { var ss *syslog.SyslogCollectorState - err := e.client.Call("Plugin.LaunchCollector", - LaunchCollectorArgs{AddrNet: addr.Network(), AddrName: addr.String(), Ctx: ctx}, &ss) + err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss) return ss, err } @@ -42,9 +37,8 @@ type SyslogCollectorRPCServer struct { func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs, resp *syslog.SyslogCollectorState) error { - addr, _ := net.ResolveTCPAddr(args.AddrNet, args.AddrName) - ss, err := s.Impl.LaunchCollector(addr, args.Ctx) - if err != nil { + ss, err := s.Impl.LaunchCollector(args.Ctx) + if ss != nil { *resp = *ss } return err diff --git a/client/driver/utils.go b/client/driver/utils.go index c3e253eda..21b83d033 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -3,7 +3,6 @@ package driver import ( "fmt" "io" - "net" "os" "github.com/hashicorp/go-multierror" @@ -66,24 +65,6 @@ func createLogCollector(config *plugin.ClientConfig, w io.Writer, return logCollector, syslogClient, nil } -// getFreePort returns a free port ready to be listened on between upper and -// lower bounds -func getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { - for i := lowerBound; i <= upperBound; i++ { - addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) - if err != nil { - return nil, err - } - l, err := net.ListenTCP("tcp", addr) - if err != nil { - continue - } - defer l.Close() - return l.Addr(), nil - } - return nil, fmt.Errorf("No free port found") -} - // killProcess kills a process with the given pid func killProcess(pid int) error { proc, err := os.FindProcess(pid)