Writing log lines from docker to files

This commit is contained in:
Diptanu Choudhury
2016-02-10 07:52:15 -08:00
parent 7eedcbfcf6
commit 4963026eab
5 changed files with 107 additions and 64 deletions

View File

@@ -180,7 +180,8 @@ func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Ta
}
// createContainer initializes a struct needed to call docker.client.CreateContainer()
func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, driverConfig *DockerDriverConfig) (docker.CreateContainerOptions, error) {
func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,
driverConfig *DockerDriverConfig, syslogAddr string) (docker.CreateContainerOptions, error) {
var c docker.CreateContainerOptions
if task.Resources == nil {
// Guard against missing resources. We should never have been able to
@@ -238,10 +239,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
// used to share data between different tasks in the same task group.
Binds: binds,
LogConfig: docker.LogConfig{
Type: "json-file",
Type: "syslog",
Config: map[string]string{
"max-size": fmt.Sprintf("%dm", task.LogConfig.MaxFileSizeMB),
"max-file": strconv.Itoa(task.LogConfig.MaxFiles),
"syslog-address": fmt.Sprintf("tcp://%v", syslogAddr),
},
},
}
@@ -489,11 +489,6 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
syslogAddr, err := getFreePort(d.config.ClientMinPort, d.config.ClientMaxPort)
if err != nil {
return nil, fmt.Errorf("error creating the syslog plugin: %v", err)
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
@@ -508,15 +503,19 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, err
}
logCollectorCtx := &syslog.LogCollectorContext{
TaskName: task.Name,
AllocDir: ctx.AllocDir,
LogConfig: task.LogConfig,
TaskName: task.Name,
AllocDir: ctx.AllocDir,
LogConfig: task.LogConfig,
PortLowerBound: d.config.ClientMinPort,
PortUpperBound: d.config.ClientMaxPort,
}
if _, err := logCollector.LaunchCollector(syslogAddr, logCollectorCtx); err != nil {
ss, err := logCollector.LaunchCollector(logCollectorCtx)
if err != nil {
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
}
d.logger.Printf("Started the syslog server at %v", ss.Addr)
config, err := d.createContainer(ctx, task, &driverConfig)
config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err)

View File

@@ -2,10 +2,10 @@ package syslog
import (
"fmt"
// "io"
"io"
"log"
"net"
// "path/filepath"
"path/filepath"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/logrotator"
@@ -14,17 +14,20 @@ import (
)
type LogCollectorContext struct {
TaskName string
AllocDir *allocdir.AllocDir
LogConfig *structs.LogConfig
TaskName string
AllocDir *allocdir.AllocDir
LogConfig *structs.LogConfig
PortUpperBound uint
PortLowerBound uint
}
type SyslogCollectorState struct {
IsolationConfig *IsolationConfig
Addr string
}
type LogCollector interface {
LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error)
LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error)
Exit() error
UpdateLogConfig(logConfig *structs.LogConfig) error
}
@@ -49,7 +52,11 @@ func NewSyslogCollector(logger *log.Logger) *SyslogCollector {
return &SyslogCollector{logger: logger}
}
func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) {
func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) {
addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound)
if err != nil {
return nil, err
}
s.logger.Printf("sylog-server: launching syslog server on addr: %v", addr)
s.ctx = ctx
// configuring the task dir
@@ -61,30 +68,30 @@ func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContex
handler := syslog.NewChannelHandler(channel)
s.server = syslog.NewServer()
s.server.SetFormat(syslog.RFC5424)
s.server.SetFormat(&CustomParser{logger: s.logger})
s.server.SetHandler(handler)
s.server.ListenTCP(addr.String())
if err := s.server.Boot(); err != nil {
return nil, err
}
// r, w := io.Pipe()
// logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
// lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
// fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
// logFileSize, s.logger)
// if err != nil {
// return err
// }
// go lro.Start(r)
r, w := io.Pipe()
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, s.logger)
if err != nil {
return nil, err
}
go lro.Start(r)
go func(channel syslog.LogPartsChannel) {
for logParts := range channel {
s.logger.Printf("logparts: %v", logParts)
// w.Write([]byte(logParts))
message := logParts["content"].(string)
w.Write([]byte(message))
}
}(channel)
go s.server.Wait()
return &SyslogCollectorState{}, nil
return &SyslogCollectorState{Addr: addr.String()}, nil
}
func (s *SyslogCollector) Exit() error {
@@ -104,3 +111,21 @@ func (s *SyslogCollector) configureTaskDir() error {
s.taskDir = taskDir
return nil
}
// getFreePort returns a free port ready to be listened on between upper and
// lower bounds
func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) {
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
return nil, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
continue
}
defer l.Close()
return l.Addr(), nil
}
return nil, fmt.Errorf("No free port found")
}

View File

@@ -0,0 +1,44 @@
package syslog
import (
"bufio"
"log"
"time"
"github.com/jeromer/syslogparser"
)
type DockerLogParser struct {
line []byte
log *log.Logger
}
func NewDockerLogParser(line []byte) *DockerLogParser {
return &DockerLogParser{line: line}
}
func (d *DockerLogParser) Parse() error {
return nil
}
func (d *DockerLogParser) Dump() syslogparser.LogParts {
return map[string]interface{}{
"content": string(d.line),
}
}
func (d *DockerLogParser) Location(location *time.Location) {
}
type CustomParser struct {
logger *log.Logger
}
func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser {
return NewDockerLogParser(line)
}
func (c *CustomParser) GetSplitFunc() bufio.SplitFunc {
return nil
}

View File

@@ -2,7 +2,6 @@ package driver
import (
"log"
"net"
"net/rpc"
"github.com/hashicorp/go-plugin"
@@ -15,16 +14,12 @@ type SyslogCollectorRPC struct {
}
type LaunchCollectorArgs struct {
AddrNet string
AddrName string
Ctx *syslog.LogCollectorContext
Ctx *syslog.LogCollectorContext
}
func (e *SyslogCollectorRPC) LaunchCollector(addr net.Addr,
ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) {
func (e *SyslogCollectorRPC) LaunchCollector(ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) {
var ss *syslog.SyslogCollectorState
err := e.client.Call("Plugin.LaunchCollector",
LaunchCollectorArgs{AddrNet: addr.Network(), AddrName: addr.String(), Ctx: ctx}, &ss)
err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss)
return ss, err
}
@@ -42,9 +37,8 @@ type SyslogCollectorRPCServer struct {
func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs,
resp *syslog.SyslogCollectorState) error {
addr, _ := net.ResolveTCPAddr(args.AddrNet, args.AddrName)
ss, err := s.Impl.LaunchCollector(addr, args.Ctx)
if err != nil {
ss, err := s.Impl.LaunchCollector(args.Ctx)
if ss != nil {
*resp = *ss
}
return err

View File

@@ -3,7 +3,6 @@ package driver
import (
"fmt"
"io"
"net"
"os"
"github.com/hashicorp/go-multierror"
@@ -66,24 +65,6 @@ func createLogCollector(config *plugin.ClientConfig, w io.Writer,
return logCollector, syslogClient, nil
}
// getFreePort returns a free port ready to be listened on between upper and
// lower bounds
func getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) {
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
return nil, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
continue
}
defer l.Close()
return l.Addr(), nil
}
return nil, fmt.Errorf("No free port found")
}
// killProcess kills a process with the given pid
func killProcess(pid int) error {
proc, err := os.FindProcess(pid)