diff --git a/client/logmon/logging/syslog_parser.go b/client/logmon/logging/syslog_parser.go deleted file mode 100644 index 09f1adcfa..000000000 --- a/client/logmon/logging/syslog_parser.go +++ /dev/null @@ -1,159 +0,0 @@ -// +build darwin dragonfly freebsd linux netbsd openbsd solaris windows - -package logging - -import ( - "fmt" - "strconv" - - syslog "github.com/RackSec/srslog" - hclog "github.com/hashicorp/go-hclog" -) - -// Errors related to parsing priority -var ( - ErrPriorityNoStart = fmt.Errorf("No start char found for priority") - ErrPriorityEmpty = fmt.Errorf("Priority field empty") - ErrPriorityNoEnd = fmt.Errorf("No end char found for priority") - ErrPriorityTooShort = fmt.Errorf("Priority field too short") - ErrPriorityTooLong = fmt.Errorf("Priority field too long") - ErrPriorityNonDigit = fmt.Errorf("Non digit found in priority") -) - -// Priority header and ending characters -const ( - PRI_PART_START = '<' - PRI_PART_END = '>' -) - -// SyslogMessage represents a log line received -type SyslogMessage struct { - Message []byte - Severity syslog.Priority -} - -// Priority holds all the priority bits in a syslog log line -type Priority struct { - Pri int - Facility syslog.Priority - Severity syslog.Priority -} - -// DockerLogParser parses a line of log message that the docker daemon ships -type DockerLogParser struct { - logger hclog.Logger -} - -// NewDockerLogParser creates a new DockerLogParser -func NewDockerLogParser(logger hclog.Logger) *DockerLogParser { - return &DockerLogParser{logger: logger} -} - -// Parse parses a syslog log line -func (d *DockerLogParser) Parse(line []byte) *SyslogMessage { - pri, _, _ := d.parsePriority(line) - msgIdx := d.logContentIndex(line) - - // Create a copy of the line so that subsequent Scans do not override the - // message - lineCopy := make([]byte, len(line[msgIdx:])) - copy(lineCopy, line[msgIdx:]) - - return &SyslogMessage{ - Severity: pri.Severity, - Message: lineCopy, - } -} - -// logContentIndex finds out the index of the start index of the content in a -// syslog line -func (d *DockerLogParser) logContentIndex(line []byte) int { - cursor := 0 - numSpace := 0 - numColons := 0 - // first look for at least 2 colons. This matches into the date that has no more spaces in it - // DefaultFormatter log line look: '<30>2016-07-06T15:13:11Z00:00 hostname docker/9648c64f5037[16200]' - // UnixFormatter log line look: '<30>Jul 6 15:13:11 docker/9648c64f5037[16200]' - for i := 0; i < len(line); i++ { - if line[i] == ':' { - numColons += 1 - if numColons == 2 { - cursor = i - break - } - } - } - // then look for the next space - for i := cursor; i < len(line); i++ { - if line[i] == ' ' { - numSpace += 1 - if numSpace == 1 { - cursor = i - break - } - } - } - // then the colon is what separates it, followed by a space - for i := cursor; i < len(line); i++ { - if line[i] == ':' && i+1 < len(line) && line[i+1] == ' ' { - cursor = i + 1 - break - } - } - // return the cursor to the next character - return cursor + 1 -} - -// parsePriority parses the priority in a syslog message -func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) { - cursor := 0 - pri := d.newPriority(0) - if len(line) <= 0 { - return pri, cursor, ErrPriorityEmpty - } - if line[cursor] != PRI_PART_START { - return pri, cursor, ErrPriorityNoStart - } - i := 1 - priDigit := 0 - for i < len(line) { - if i >= 5 { - return pri, cursor, ErrPriorityTooLong - } - c := line[i] - if c == PRI_PART_END { - if i == 1 { - return pri, cursor, ErrPriorityTooShort - } - cursor = i + 1 - return d.newPriority(priDigit), cursor, nil - } - if d.isDigit(c) { - v, e := strconv.Atoi(string(c)) - if e != nil { - return pri, cursor, e - } - priDigit = (priDigit * 10) + v - } else { - return pri, cursor, ErrPriorityNonDigit - } - i++ - } - return pri, cursor, ErrPriorityNoEnd -} - -// isDigit checks if a byte is a numeric char -func (d *DockerLogParser) isDigit(c byte) bool { - return c >= '0' && c <= '9' -} - -// newPriority creates a new default priority -func (d *DockerLogParser) newPriority(p int) Priority { - // The Priority value is calculated by first multiplying the Facility - // number by 8 and then adding the numerical value of the Severity. - return Priority{ - Pri: p, - Facility: syslog.Priority(p / 8), - Severity: syslog.Priority(p % 8), - } -} diff --git a/client/logmon/logging/syslog_parser_unix_test.go b/client/logmon/logging/syslog_parser_unix_test.go deleted file mode 100644 index ebb337fb1..000000000 --- a/client/logmon/logging/syslog_parser_unix_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// +build darwin dragonfly freebsd linux netbsd openbsd solaris - -package logging - -import ( - "bytes" - "testing" - - syslog "github.com/RackSec/srslog" - "github.com/hashicorp/nomad/helper/testlog" -) - -func TestLogParser_Priority(t *testing.T) { - t.Parallel() - line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf") - d := NewDockerLogParser(testlog.HCLogger(t)) - p, _, err := d.parsePriority(line) - if err != nil { - t.Fatalf("got an err: %v", err) - } - if p.Severity != syslog.LOG_INFO { - t.Fatalf("expected severity: %v, got: %v", syslog.LOG_INFO, p.Severity) - } - - idx := d.logContentIndex(line) - expected := bytes.Index(line, []byte("1:C 10 Feb 18:16:43.391")) - if idx != expected { - t.Fatalf("expected idx: %v, got: %v", expected, idx) - } -} - -func TestLogParser_Priority_UnixFormatter(t *testing.T) { - t.Parallel() - line := []byte("<30>Feb 6, 10:16:43 docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf") - d := NewDockerLogParser(testlog.HCLogger(t)) - p, _, err := d.parsePriority(line) - if err != nil { - t.Fatalf("got an err: %v", err) - } - if p.Severity != syslog.LOG_INFO { - t.Fatalf("expected severity: %v, got: %v", syslog.LOG_INFO, p.Severity) - } - - idx := d.logContentIndex(line) - expected := bytes.Index(line, []byte("1:C 10 Feb 18:16:43.391")) - if idx != expected { - t.Fatalf("expected idx: %v, got: %v", expected, idx) - } -} diff --git a/client/logmon/logging/syslog_server.go b/client/logmon/logging/syslog_server.go deleted file mode 100644 index d7c7a5283..000000000 --- a/client/logmon/logging/syslog_server.go +++ /dev/null @@ -1,93 +0,0 @@ -package logging - -import ( - "bufio" - "net" - "sync" - - hclog "github.com/hashicorp/go-hclog" -) - -// SyslogServer is a server which listens to syslog messages and parses them -type SyslogServer struct { - listener net.Listener - messages chan *SyslogMessage - parser *DockerLogParser - - doneCh chan interface{} - done bool - doneLock sync.Mutex - - logger hclog.Logger -} - -// NewSyslogServer creates a new syslog server -func NewSyslogServer(l net.Listener, messages chan *SyslogMessage, logger hclog.Logger) *SyslogServer { - logger = logger.Named("logcollector.server") - parser := NewDockerLogParser(logger) - return &SyslogServer{ - listener: l, - messages: messages, - parser: parser, - logger: logger, - doneCh: make(chan interface{}), - } -} - -// Start starts accepting syslog connections -func (s *SyslogServer) Start() { - for { - select { - case <-s.doneCh: - return - default: - connection, err := s.listener.Accept() - if err != nil { - s.doneLock.Lock() - done := s.done - s.doneLock.Unlock() - if done { - return - } - - s.logger.Error("error in accepting connection", "err", err) - continue - } - go s.read(connection) - } - } -} - -// read reads the bytes from a connection -func (s *SyslogServer) read(connection net.Conn) { - defer connection.Close() - scanner := bufio.NewScanner(bufio.NewReader(connection)) - - for { - select { - case <-s.doneCh: - return - default: - } - if scanner.Scan() { - b := scanner.Bytes() - msg := s.parser.Parse(b) - s.messages <- msg - } else { - return - } - } -} - -// Shutdown the syslog server -func (s *SyslogServer) Shutdown() { - s.doneLock.Lock() - defer s.doneLock.Unlock() - - if !s.done { - close(s.doneCh) - close(s.messages) - s.done = true - s.listener.Close() - } -} diff --git a/client/logmon/logging/syslog_server_unix_test.go b/client/logmon/logging/syslog_server_unix_test.go deleted file mode 100644 index d71e32ee9..000000000 --- a/client/logmon/logging/syslog_server_unix_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package logging - -import ( - "io/ioutil" - "net" - "os" - "path" - "testing" - "time" - - "github.com/hashicorp/nomad/helper/testlog" -) - -func TestSyslogServer_Start_Shutdown(t *testing.T) { - t.Parallel() - dir, err := ioutil.TempDir("", "sock") - if err != nil { - t.Fatalf("Failed to create temporary directory: %v", err) - } - - sock := path.Join(dir, "socket") - defer os.Remove(sock) - - l, err := net.Listen("unix", sock) - if err != nil { - t.Fatalf("Failed to listen unix socket: %v", err) - } - - s := NewSyslogServer(l, make(chan *SyslogMessage, 2048), testlog.HCLogger(t)) - - go s.Start() - if s.done { - t.Fatalf("expected running SyslogServer, but not running") - } - - received := false - go func() { - for range s.messages { - received = true - } - }() - - conn, err := net.Dial("unix", sock) - if err != nil { - t.Fatalf("expected access to SyslogServer, but %v", err) - } - - _, err = conn.Write([]byte("syslog server test\n")) - if err != nil { - t.Fatalf("expected send data to SyslogServer but: %v", err) - } - - // Need to wait until SyslogServer received the data certainly - time.Sleep(1000 * time.Millisecond) - - if !received { - t.Fatalf("expected SyslogServer received data, but not received") - } - - defer conn.Close() - - s.Shutdown() - if !s.done { - t.Fatalf("expected SyslogServer done, but running") - } -} diff --git a/client/logmon/logging/universal_collector.go b/client/logmon/logging/universal_collector.go deleted file mode 100644 index e3399228f..000000000 --- a/client/logmon/logging/universal_collector.go +++ /dev/null @@ -1,206 +0,0 @@ -// +build darwin dragonfly freebsd linux netbsd openbsd solaris windows - -package logging - -import ( - "fmt" - "io" - "io/ioutil" - "net" - "os" - "runtime" - - syslog "github.com/RackSec/srslog" - hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/nomad/structs" -) - -// LogCollectorContext holds context to configure the syslog server -type LogCollectorContext struct { - // TaskName is the name of the Task - TaskName string - - // AllocDir is the handle to do operations on the alloc dir of - // the task - AllocDir *allocdir.AllocDir - - // LogConfig provides configuration related to log rotation - LogConfig *structs.LogConfig - - // PortUpperBound is the upper bound of the ports that we can use to start - // the syslog server - PortUpperBound uint - - // PortLowerBound is the lower bound of the ports that we can use to start - // the syslog server - PortLowerBound uint -} - -// SyslogCollectorState holds the address and isolation information of a launched -// syslog server -type SyslogCollectorState struct { - Addr string -} - -// LogCollector is an interface which allows a driver to launch a log server -// and update log configuration -type LogCollector interface { - LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) - Exit() error - UpdateLogConfig(logConfig *structs.LogConfig) error -} - -// SyslogCollector is a LogCollector which starts a syslog server and does -// rotation to incoming stream -type SyslogCollector struct { - ctx *LogCollectorContext - - lro *FileRotator - lre *FileRotator - server *SyslogServer - syslogChan chan *SyslogMessage - taskDir string - - logger hclog.Logger -} - -// NewSyslogCollector returns an implementation of the SyslogCollector -func NewSyslogCollector(logger hclog.Logger) *SyslogCollector { - return &SyslogCollector{logger: logger.Named("syslog-server"), - syslogChan: make(chan *SyslogMessage, 2048)} -} - -// LaunchCollector launches a new syslog server and starts writing log lines to -// files and rotates them -func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) { - l, err := s.getListener(ctx.PortLowerBound, ctx.PortUpperBound) - if err != nil { - return nil, err - } - s.logger.Debug("launching syslog server on addr", "addr", l.Addr().String()) - s.ctx = ctx - // configuring the task dir - if err := s.configureTaskDir(); err != nil { - return nil, err - } - - s.server = NewSyslogServer(l, s.syslogChan, s.logger) - go s.server.Start() - logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - - //FIXME There's an easier way to get this - logdir := ctx.AllocDir.TaskDirs[ctx.TaskName].LogDir - lro, err := NewFileRotator(logdir, fmt.Sprintf("%v.stdout", ctx.TaskName), - ctx.LogConfig.MaxFiles, logFileSize, s.logger) - - if err != nil { - return nil, err - } - s.lro = lro - - lre, err := NewFileRotator(logdir, fmt.Sprintf("%v.stderr", ctx.TaskName), - ctx.LogConfig.MaxFiles, logFileSize, s.logger) - if err != nil { - return nil, err - } - s.lre = lre - - go s.collectLogs(lre, lro) - syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String()) - return &SyslogCollectorState{Addr: syslogAddr}, nil -} - -func (s *SyslogCollector) collectLogs(we io.Writer, wo io.Writer) { - for logParts := range s.syslogChan { - // If the severity of the log line is err then we write to stderr - // otherwise all messages go to stdout - if logParts.Severity == syslog.LOG_ERR { - s.lre.Write(logParts.Message) - s.lre.Write([]byte{'\n'}) - } else { - s.lro.Write(logParts.Message) - s.lro.Write([]byte{'\n'}) - } - } -} - -// Exit kills the syslog server -func (s *SyslogCollector) Exit() error { - s.server.Shutdown() - s.lre.Close() - s.lro.Close() - return nil -} - -// UpdateLogConfig updates the log configuration -func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error { - s.ctx.LogConfig = logConfig - if s.lro == nil { - return fmt.Errorf("log rotator for stdout doesn't exist") - } - s.lro.MaxFiles = logConfig.MaxFiles - s.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) - - if s.lre == nil { - return fmt.Errorf("log rotator for stderr doesn't exist") - } - s.lre.MaxFiles = logConfig.MaxFiles - s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) - return nil -} - -// configureTaskDir sets the task dir in the SyslogCollector -func (s *SyslogCollector) configureTaskDir() error { - taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName] - if !ok { - return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName) - } - s.taskDir = taskDir.Dir - return nil -} - -// getFreePort returns a free port ready to be listened on between upper and -// lower bounds -func (s *SyslogCollector) getListener(lowerBound uint, upperBound uint) (net.Listener, error) { - if runtime.GOOS == "windows" { - return s.listenerTCP(lowerBound, upperBound) - } - - return s.listenerUnix() -} - -// listenerTCP creates a TCP listener using an unused port between an upper and -// lower bound -func (s *SyslogCollector) listenerTCP(lowerBound uint, upperBound uint) (net.Listener, error) { - for i := lowerBound; i <= upperBound; i++ { - addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) - if err != nil { - return nil, err - } - l, err := net.ListenTCP("tcp", addr) - if err != nil { - continue - } - return l, nil - } - return nil, fmt.Errorf("No free port found") -} - -// listenerUnix creates a Unix domain socket -func (s *SyslogCollector) listenerUnix() (net.Listener, error) { - f, err := ioutil.TempFile("", "plugin") - if err != nil { - return nil, err - } - path := f.Name() - - if err := f.Close(); err != nil { - return nil, err - } - if err := os.Remove(path); err != nil { - return nil, err - } - - return net.Listen("unix", path) -}