Starting the syslog collector along with a docker container

This commit is contained in:
Diptanu Choudhury
2016-02-09 18:24:30 -08:00
parent a92a96336c
commit 7eedcbfcf6
8 changed files with 333 additions and 5 deletions

View File

@@ -6,6 +6,7 @@ import (
"log"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
@@ -14,10 +15,14 @@ import (
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/driver/syslog"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
@@ -75,6 +80,8 @@ type dockerPID struct {
}
type DockerHandle struct {
pluginClient *plugin.Client
logCollector syslog.LogCollector
client *docker.Client
logger *log.Logger
cleanupContainer bool
@@ -474,8 +481,41 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
}
}
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
syslogAddr, err := getFreePort(d.config.ClientMinPort, d.config.ClientMaxPort)
if err != nil {
return nil, fmt.Errorf("error creating the syslog plugin: %v", err)
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "syslog", pluginLogFile),
}
logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
logCollectorCtx := &syslog.LogCollectorContext{
TaskName: task.Name,
AllocDir: ctx.AllocDir,
LogConfig: task.LogConfig,
}
if _, err := logCollector.LaunchCollector(syslogAddr, logCollectorCtx); err != nil {
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
}
config, err := d.createContainer(ctx, task, &driverConfig)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
@@ -543,6 +583,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
// Return a driver handle
h := &DockerHandle{
client: client,
logCollector: logCollector,
pluginClient: pluginClient,
cleanupContainer: cleanupContainer,
cleanupImage: cleanupImage,
logger: d.logger,
@@ -706,4 +748,10 @@ func (h *DockerHandle) run() {
close(h.doneCh)
h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err)
close(h.waitCh)
// Shutdown the syslog collector
if err := h.logCollector.Exit(); err != nil {
h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err)
}
h.pluginClient.Kill()
}

View File

@@ -15,9 +15,15 @@ var HandshakeConfig = plugin.HandshakeConfig{
}
func GetPluginMap(w io.Writer) map[string]plugin.Plugin {
p := new(ExecutorPlugin)
p.logger = log.New(w, "", log.LstdFlags)
return map[string]plugin.Plugin{"executor": p}
e := new(ExecutorPlugin)
e.logger = log.New(w, "", log.LstdFlags)
s := new(SyslogCollectorPlugin)
s.logger = log.New(w, "", log.LstdFlags)
return map[string]plugin.Plugin{
"executor": e,
"syslogcollector": s,
}
}
// ExecutorReattachConfig is the config that we seralize and de-serialize and

View File

@@ -0,0 +1,106 @@
package syslog
import (
"fmt"
// "io"
"log"
"net"
// "path/filepath"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/logrotator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mcuadros/go-syslog"
)
type LogCollectorContext struct {
TaskName string
AllocDir *allocdir.AllocDir
LogConfig *structs.LogConfig
}
type SyslogCollectorState struct {
IsolationConfig *IsolationConfig
}
type LogCollector interface {
LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error)
Exit() error
UpdateLogConfig(logConfig *structs.LogConfig) error
}
type IsolationConfig struct {
}
type SyslogCollector struct {
addr net.Addr
logConfig *structs.LogConfig
ctx *LogCollectorContext
lro *logrotator.LogRotator
lre *logrotator.LogRotator
server *syslog.Server
taskDir string
logger *log.Logger
}
func NewSyslogCollector(logger *log.Logger) *SyslogCollector {
return &SyslogCollector{logger: logger}
}
func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) {
s.logger.Printf("sylog-server: launching syslog server on addr: %v", addr)
s.ctx = ctx
// configuring the task dir
if err := s.configureTaskDir(); err != nil {
return nil, err
}
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)
s.server = syslog.NewServer()
s.server.SetFormat(syslog.RFC5424)
s.server.SetHandler(handler)
s.server.ListenTCP(addr.String())
if err := s.server.Boot(); err != nil {
return nil, err
}
// r, w := io.Pipe()
// logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
// lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
// fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
// logFileSize, s.logger)
// if err != nil {
// return err
// }
// go lro.Start(r)
go func(channel syslog.LogPartsChannel) {
for logParts := range channel {
s.logger.Printf("logparts: %v", logParts)
// w.Write([]byte(logParts))
}
}(channel)
go s.server.Wait()
return &SyslogCollectorState{}, nil
}
func (s *SyslogCollector) Exit() error {
return nil
}
func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error {
return nil
}
// configureTaskDir sets the task dir in the executor
func (s *SyslogCollector) configureTaskDir() error {
taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName]
if !ok {
return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName)
}
s.taskDir = taskDir
return nil
}

View File

@@ -0,0 +1,75 @@
package driver
import (
"log"
"net"
"net/rpc"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/syslog"
"github.com/hashicorp/nomad/nomad/structs"
)
type SyslogCollectorRPC struct {
client *rpc.Client
}
type LaunchCollectorArgs struct {
AddrNet string
AddrName string
Ctx *syslog.LogCollectorContext
}
func (e *SyslogCollectorRPC) LaunchCollector(addr net.Addr,
ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) {
var ss *syslog.SyslogCollectorState
err := e.client.Call("Plugin.LaunchCollector",
LaunchCollectorArgs{AddrNet: addr.Network(), AddrName: addr.String(), Ctx: ctx}, &ss)
return ss, err
}
func (e *SyslogCollectorRPC) Exit() error {
return e.client.Call("Plugin.Exit", new(interface{}), new(interface{}))
}
func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error {
return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{}))
}
type SyslogCollectorRPCServer struct {
Impl syslog.LogCollector
}
func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs,
resp *syslog.SyslogCollectorState) error {
addr, _ := net.ResolveTCPAddr(args.AddrNet, args.AddrName)
ss, err := s.Impl.LaunchCollector(addr, args.Ctx)
if err != nil {
*resp = *ss
}
return err
}
func (s *SyslogCollectorRPCServer) Exit(args interface{}, resp *interface{}) error {
return s.Impl.Exit()
}
func (s *SyslogCollectorRPCServer) UpdateLogConfig(logConfig *structs.LogConfig, resp *interface{}) error {
return s.Impl.UpdateLogConfig(logConfig)
}
type SyslogCollectorPlugin struct {
logger *log.Logger
Impl *SyslogCollectorRPCServer
}
func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
if p.Impl == nil {
p.Impl = &SyslogCollectorRPCServer{Impl: syslog.NewSyslogCollector(p.logger)}
}
return p.Impl, nil
}
func (p *SyslogCollectorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &SyslogCollectorRPC{client: c}, nil
}

View File

@@ -3,17 +3,20 @@ package driver
import (
"fmt"
"io"
"net"
"os"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/client/driver/syslog"
)
// createExecutor launches an executor plugin and returns an instance of the
// Executor interface
func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) {
func createExecutor(config *plugin.ClientConfig, w io.Writer,
clientConfig *config.Config) (executor.Executor, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w)
config.MaxPort = clientConfig.ClientMaxPort
@@ -39,6 +42,48 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *conf
return executorPlugin, executorClient, nil
}
func createLogCollector(config *plugin.ClientConfig, w io.Writer,
clientConfig *config.Config) (syslog.LogCollector, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w)
config.MaxPort = clientConfig.ClientMaxPort
config.MinPort = clientConfig.ClientMinPort
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
syslogClient := plugin.NewClient(config)
rpcCLient, err := syslogClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for syslog plugin: %v", err)
}
raw, err := rpcCLient.Dispense("syslogcollector")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err)
}
logCollector := raw.(syslog.LogCollector)
return logCollector, syslogClient, nil
}
// getFreePort returns a free port ready to be listened on between upper and
// lower bounds
func getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) {
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
return nil, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
continue
}
defer l.Close()
return l.Addr(), nil
}
return nil, fmt.Errorf("No free port found")
}
// killProcess kills a process with the given pid
func killProcess(pid int) error {
proc, err := os.FindProcess(pid)

43
command/syslog_plugin.go Normal file
View File

@@ -0,0 +1,43 @@
package command
import (
"os"
"strings"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver"
)
type SyslogPluginCommand struct {
Meta
}
func (e *SyslogPluginCommand) Help() string {
helpText := `
This is a command used by Nomad internally to launch a syslog collector"
`
return strings.TrimSpace(helpText)
}
func (s *SyslogPluginCommand) Synopsis() string {
return "internal - lanch a syslog collector plugin"
}
func (s *SyslogPluginCommand) Run(args []string) int {
if len(args) == 0 {
s.Ui.Error("log output file isn't provided")
}
logFileName := args[0]
stdo, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
s.Ui.Error(err.Error())
return 1
}
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: driver.HandshakeConfig,
Plugins: driver.GetPluginMap(stdo),
})
return 0
}

View File

@@ -105,7 +105,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"syslog": func() (cli.Command, error) {
return &command.SyslogPluginCommand{
Meta: meta,
}, nil
},
"server-force-leave": func() (cli.Command, error) {
return &command.ServerForceLeaveCommand{
Meta: meta,

View File

@@ -33,6 +33,7 @@ func RunCustom(args []string, commands map[string]cli.CommandFactory) int {
for k, _ := range commands {
switch k {
case "executor":
case "syslog":
default:
commandsInclude = append(commandsInclude, k)
}