From 7eedcbfcf6b2b91c1b17edcf44e8fd5818559500 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 9 Feb 2016 18:24:30 -0800 Subject: [PATCH] Starting the syslog collector along with a docker container --- client/driver/docker.go | 48 ++++++++++++++ client/driver/plugins.go | 12 +++- client/driver/syslog/collector.go | 106 ++++++++++++++++++++++++++++++ client/driver/syslog_plugin.go | 75 +++++++++++++++++++++ client/driver/utils.go | 47 ++++++++++++- command/syslog_plugin.go | 43 ++++++++++++ commands.go | 6 +- main.go | 1 + 8 files changed, 333 insertions(+), 5 deletions(-) create mode 100644 client/driver/syslog/collector.go create mode 100644 client/driver/syslog_plugin.go create mode 100644 command/syslog_plugin.go diff --git a/client/driver/docker.go b/client/driver/docker.go index a71e9f060..3841572eb 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -6,6 +6,7 @@ import ( "log" "net" "os" + "os/exec" "path/filepath" "strconv" "strings" @@ -14,10 +15,14 @@ import ( docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/driver/structs" + "github.com/hashicorp/nomad/client/driver/syslog" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" ) @@ -75,6 +80,8 @@ type dockerPID struct { } type DockerHandle struct { + pluginClient *plugin.Client + logCollector syslog.LogCollector client *docker.Client logger *log.Logger cleanupContainer bool @@ -474,8 +481,41 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) } } + + taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } + d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) + syslogAddr, err := getFreePort(d.config.ClientMinPort, d.config.ClientMaxPort) + if err != nil { + return nil, fmt.Errorf("error creating the syslog plugin: %v", err) + } + + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "syslog", pluginLogFile), + } + + logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + return nil, err + } + logCollectorCtx := &syslog.LogCollectorContext{ + TaskName: task.Name, + AllocDir: ctx.AllocDir, + LogConfig: task.LogConfig, + } + if _, err := logCollector.LaunchCollector(syslogAddr, logCollectorCtx); err != nil { + return nil, fmt.Errorf("failed to start syslog collector: %v", err) + } + config, err := d.createContainer(ctx, task, &driverConfig) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) @@ -543,6 +583,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle // Return a driver handle h := &DockerHandle{ client: client, + logCollector: logCollector, + pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, logger: d.logger, @@ -706,4 +748,10 @@ func (h *DockerHandle) run() { close(h.doneCh) h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err) close(h.waitCh) + + // Shutdown the syslog collector + if err := h.logCollector.Exit(); err != nil { + h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err) + } + h.pluginClient.Kill() } diff --git a/client/driver/plugins.go b/client/driver/plugins.go index acff16105..4808d81e1 100644 --- a/client/driver/plugins.go +++ b/client/driver/plugins.go @@ -15,9 +15,15 @@ var HandshakeConfig = plugin.HandshakeConfig{ } func GetPluginMap(w io.Writer) map[string]plugin.Plugin { - p := new(ExecutorPlugin) - p.logger = log.New(w, "", log.LstdFlags) - return map[string]plugin.Plugin{"executor": p} + e := new(ExecutorPlugin) + e.logger = log.New(w, "", log.LstdFlags) + + s := new(SyslogCollectorPlugin) + s.logger = log.New(w, "", log.LstdFlags) + return map[string]plugin.Plugin{ + "executor": e, + "syslogcollector": s, + } } // ExecutorReattachConfig is the config that we seralize and de-serialize and diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go new file mode 100644 index 000000000..44610537a --- /dev/null +++ b/client/driver/syslog/collector.go @@ -0,0 +1,106 @@ +package syslog + +import ( + "fmt" + // "io" + "log" + "net" + // "path/filepath" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/driver/logrotator" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mcuadros/go-syslog" +) + +type LogCollectorContext struct { + TaskName string + AllocDir *allocdir.AllocDir + LogConfig *structs.LogConfig +} + +type SyslogCollectorState struct { + IsolationConfig *IsolationConfig +} + +type LogCollector interface { + LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) + Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error +} + +type IsolationConfig struct { +} + +type SyslogCollector struct { + addr net.Addr + logConfig *structs.LogConfig + ctx *LogCollectorContext + + lro *logrotator.LogRotator + lre *logrotator.LogRotator + server *syslog.Server + taskDir string + + logger *log.Logger +} + +func NewSyslogCollector(logger *log.Logger) *SyslogCollector { + return &SyslogCollector{logger: logger} +} + +func (s *SyslogCollector) LaunchCollector(addr net.Addr, ctx *LogCollectorContext) (*SyslogCollectorState, error) { + s.logger.Printf("sylog-server: launching syslog server on addr: %v", addr) + s.ctx = ctx + // configuring the task dir + if err := s.configureTaskDir(); err != nil { + return nil, err + } + + channel := make(syslog.LogPartsChannel) + handler := syslog.NewChannelHandler(channel) + + s.server = syslog.NewServer() + s.server.SetFormat(syslog.RFC5424) + s.server.SetHandler(handler) + s.server.ListenTCP(addr.String()) + if err := s.server.Boot(); err != nil { + return nil, err + } + // r, w := io.Pipe() + // logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + // lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), + // fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, + // logFileSize, s.logger) + // if err != nil { + // return err + // } + // go lro.Start(r) + + go func(channel syslog.LogPartsChannel) { + for logParts := range channel { + s.logger.Printf("logparts: %v", logParts) + // w.Write([]byte(logParts)) + } + }(channel) + go s.server.Wait() + return &SyslogCollectorState{}, nil +} + +func (s *SyslogCollector) Exit() error { + return nil +} + +func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error { + return nil +} + +// configureTaskDir sets the task dir in the executor +func (s *SyslogCollector) configureTaskDir() error { + taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName] + if !ok { + return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName) + } + s.taskDir = taskDir + return nil +} diff --git a/client/driver/syslog_plugin.go b/client/driver/syslog_plugin.go new file mode 100644 index 000000000..f999ef545 --- /dev/null +++ b/client/driver/syslog_plugin.go @@ -0,0 +1,75 @@ +package driver + +import ( + "log" + "net" + "net/rpc" + + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/driver/syslog" + "github.com/hashicorp/nomad/nomad/structs" +) + +type SyslogCollectorRPC struct { + client *rpc.Client +} + +type LaunchCollectorArgs struct { + AddrNet string + AddrName string + Ctx *syslog.LogCollectorContext +} + +func (e *SyslogCollectorRPC) LaunchCollector(addr net.Addr, + ctx *syslog.LogCollectorContext) (*syslog.SyslogCollectorState, error) { + var ss *syslog.SyslogCollectorState + err := e.client.Call("Plugin.LaunchCollector", + LaunchCollectorArgs{AddrNet: addr.Network(), AddrName: addr.String(), Ctx: ctx}, &ss) + return ss, err +} + +func (e *SyslogCollectorRPC) Exit() error { + return e.client.Call("Plugin.Exit", new(interface{}), new(interface{})) +} + +func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { + return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) +} + +type SyslogCollectorRPCServer struct { + Impl syslog.LogCollector +} + +func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs, + resp *syslog.SyslogCollectorState) error { + addr, _ := net.ResolveTCPAddr(args.AddrNet, args.AddrName) + ss, err := s.Impl.LaunchCollector(addr, args.Ctx) + if err != nil { + *resp = *ss + } + return err +} + +func (s *SyslogCollectorRPCServer) Exit(args interface{}, resp *interface{}) error { + return s.Impl.Exit() +} + +func (s *SyslogCollectorRPCServer) UpdateLogConfig(logConfig *structs.LogConfig, resp *interface{}) error { + return s.Impl.UpdateLogConfig(logConfig) +} + +type SyslogCollectorPlugin struct { + logger *log.Logger + Impl *SyslogCollectorRPCServer +} + +func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { + if p.Impl == nil { + p.Impl = &SyslogCollectorRPCServer{Impl: syslog.NewSyslogCollector(p.logger)} + } + return p.Impl, nil +} + +func (p *SyslogCollectorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { + return &SyslogCollectorRPC{client: c}, nil +} diff --git a/client/driver/utils.go b/client/driver/utils.go index 188e467d2..c3e253eda 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -3,17 +3,20 @@ package driver import ( "fmt" "io" + "net" "os" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/client/driver/syslog" ) // createExecutor launches an executor plugin and returns an instance of the // Executor interface -func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) { +func createExecutor(config *plugin.ClientConfig, w io.Writer, + clientConfig *config.Config) (executor.Executor, *plugin.Client, error) { config.HandshakeConfig = HandshakeConfig config.Plugins = GetPluginMap(w) config.MaxPort = clientConfig.ClientMaxPort @@ -39,6 +42,48 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *conf return executorPlugin, executorClient, nil } +func createLogCollector(config *plugin.ClientConfig, w io.Writer, + clientConfig *config.Config) (syslog.LogCollector, *plugin.Client, error) { + config.HandshakeConfig = HandshakeConfig + config.Plugins = GetPluginMap(w) + config.MaxPort = clientConfig.ClientMaxPort + config.MinPort = clientConfig.ClientMinPort + if config.Cmd != nil { + isolateCommand(config.Cmd) + } + + syslogClient := plugin.NewClient(config) + rpcCLient, err := syslogClient.Client() + if err != nil { + return nil, nil, fmt.Errorf("error creating rpc client for syslog plugin: %v", err) + } + + raw, err := rpcCLient.Dispense("syslogcollector") + if err != nil { + return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err) + } + logCollector := raw.(syslog.LogCollector) + return logCollector, syslogClient, nil +} + +// getFreePort returns a free port ready to be listened on between upper and +// lower bounds +func getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { + for i := lowerBound; i <= upperBound; i++ { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) + if err != nil { + return nil, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + continue + } + defer l.Close() + return l.Addr(), nil + } + return nil, fmt.Errorf("No free port found") +} + // killProcess kills a process with the given pid func killProcess(pid int) error { proc, err := os.FindProcess(pid) diff --git a/command/syslog_plugin.go b/command/syslog_plugin.go new file mode 100644 index 000000000..b10d6217d --- /dev/null +++ b/command/syslog_plugin.go @@ -0,0 +1,43 @@ +package command + +import ( + "os" + "strings" + + "github.com/hashicorp/go-plugin" + + "github.com/hashicorp/nomad/client/driver" +) + +type SyslogPluginCommand struct { + Meta +} + +func (e *SyslogPluginCommand) Help() string { + helpText := ` + This is a command used by Nomad internally to launch a syslog collector" + ` + return strings.TrimSpace(helpText) +} + +func (s *SyslogPluginCommand) Synopsis() string { + return "internal - lanch a syslog collector plugin" +} + +func (s *SyslogPluginCommand) Run(args []string) int { + if len(args) == 0 { + s.Ui.Error("log output file isn't provided") + } + logFileName := args[0] + stdo, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + s.Ui.Error(err.Error()) + return 1 + } + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: driver.HandshakeConfig, + Plugins: driver.GetPluginMap(stdo), + }) + + return 0 +} diff --git a/commands.go b/commands.go index 6ad1ba3c3..c77c8482a 100644 --- a/commands.go +++ b/commands.go @@ -105,7 +105,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, - + "syslog": func() (cli.Command, error) { + return &command.SyslogPluginCommand{ + Meta: meta, + }, nil + }, "server-force-leave": func() (cli.Command, error) { return &command.ServerForceLeaveCommand{ Meta: meta, diff --git a/main.go b/main.go index 1e3558658..11d237709 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ func RunCustom(args []string, commands map[string]cli.CommandFactory) int { for k, _ := range commands { switch k { case "executor": + case "syslog": default: commandsInclude = append(commandsInclude, k) }