From b65fd2624e0f872636b8cccdebfc2ba007341884 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 20 Jul 2016 10:18:05 -0700 Subject: [PATCH] Support non-following logs --- api/fs.go | 8 ++++-- command/agent/fs_endpoint.go | 56 ++++++++++++++++++++++++++++++------ command/logs.go | 15 ++++++---- 3 files changed, 63 insertions(+), 16 deletions(-) diff --git a/api/fs.go b/api/fs.go index 42ab3e579..706353028 100644 --- a/api/fs.go +++ b/api/fs.go @@ -279,15 +279,16 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, // Logs streams the content of a tasks logs blocking on EOF. // The parameters are: // * allocation: the allocation to stream from. +// * follow: Whether the logs should be followed. // * task: the tasks name to stream logs for. // * logType: Either "stdout" or "stderr" -// * offset: The offset to start streaming data at. // * origin: Either "start" or "end" and defines from where the offset is applied. +// * offset: The offset to start streaming data at. // * cancel: A channel that when closed, streaming will end. // // The return value is a channel that will emit StreamFrames as they are read. -func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset int64, - cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) { +func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, + offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, q) if err != nil { @@ -303,6 +304,7 @@ func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset i Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), } v := url.Values{} + v.Set("follow", strconv.FormatBool(follow)) v.Set("task", task) v.Set("type", logType) v.Set("origin", origin) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 225530561..2eba9f456 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -350,17 +350,16 @@ OUTER: } // Flush any existing frames - s.l.Lock() select { case o := <-s.outbound: // Send the frame and then clear the current working frame if err = s.enc.Encode(o); err != nil { - s.l.Unlock() return } default: } + s.l.Lock() if s.f != nil { s.f.Data = s.readData() s.enc.Encode(s.f) @@ -622,7 +621,11 @@ OUTER: continue OUTER case <-framer.ExitCh(): return nil - case err := <-eofCancelCh: + case err, ok := <-eofCancelCh: + if !ok { + return nil + } + return err } } @@ -634,11 +637,13 @@ OUTER: // Logs streams the content of a log blocking on EOF. The parameters are: // * task: task name to stream logs for. // * type: stdout/stderr to stream. +// * follow: A boolean of whether to follow the logs. // * offset: The offset to start streaming data at, defaults to zero. // * origin: Either "start" or "end" and defines from where the offset is // applied. Defaults to "start". func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, task, logType string + var follow bool var err error q := req.URL.Query() @@ -651,6 +656,10 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac return nil, taskNotPresentErr } + if follow, err = strconv.ParseBool(q.Get("follow")); err != nil { + return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + } + logType = q.Get("type") switch logType { case "stdout", "stderr": @@ -684,10 +693,13 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.logs(offset, origin, task, logType, fs, output) + return nil, s.logs(follow, offset, origin, task, logType, fs, output) } -func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { +func (s *HTTPServer) logs(follow bool, offset int64, + origin, task, logType string, + fs allocdir.AllocDirFS, output io.WriteCloser) error { + // Create the framer framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() @@ -727,15 +739,39 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return fmt.Errorf("failed to list entries: %v", err) } + // If we are not following logs, determine the max index for the logs we are + // interested in so we can stop there. + maxIndex := int64(math.MaxInt64) + if !follow { + _, idx, _, err := findClosest(entries, maxIndex, 0, task, logType) + if err != nil { + return err + } + maxIndex = idx + } + logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType) if err != nil { return err } + var eofCancelCh chan error + exitAfter := false + if !follow && idx > maxIndex { + // Exceeded what was there initially so return + return nil + } else if !follow && idx == maxIndex { + // At the end + eofCancelCh = make(chan error) + close(eofCancelCh) + exitAfter = true + } else { + nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) + eofCancelCh = fs.BlockUntilExists(nextPath, &t) + } + p := filepath.Join(logPath, logEntry.Name) - nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) - nextExists := fs.BlockUntilExists(nextPath, &t) - err = s.stream(openOffset, p, fs, framer, nextExists) + err = s.stream(openOffset, p, fs, framer, eofCancelCh) // Check if there was an error where the file does not exist. That means // it got rotated out from under us. @@ -746,6 +782,10 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return err } + if exitAfter { + return nil + } + //Since we successfully streamed, update the overall offset/idx. offset = int64(0) nextIdx = idx + 1 diff --git a/command/logs.go b/command/logs.go index 158858993..e5aa6297a 100644 --- a/command/logs.go +++ b/command/logs.go @@ -34,6 +34,10 @@ Logs Specific Options: -job Use a random allocation from a specified job-id. + -f + Causes the output to not stop when the end of the logs are reached, but + rather to wait for additional output. + -tail Show the files contents with offsets relative to the end of the file. If no offset is given, -n is defaulted to 10. @@ -53,7 +57,7 @@ func (l *LogsCommand) Synopsis() string { } func (l *LogsCommand) Run(args []string) int { - var verbose, job, tail, stderr bool + var verbose, job, tail, stderr, follow bool var numLines, numBytes int64 flags := l.Meta.FlagSet("logs-list", FlagSetClient) @@ -61,6 +65,7 @@ func (l *LogsCommand) Run(args []string) int { flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&job, "job", false, "") flags.BoolVar(&tail, "tail", false, "") + flags.BoolVar(&follow, "f", false, "") flags.BoolVar(&stderr, "stderr", false, "") flags.Int64Var(&numLines, "n", -1, "") flags.Int64Var(&numBytes, "c", -1, "") @@ -187,7 +192,7 @@ func (l *LogsCommand) Run(args []string) int { var r io.ReadCloser var readErr error if !tail { - r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0) + r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginStart, 0) if readErr != nil { readErr = fmt.Errorf("Error reading file: %v", readErr) } @@ -206,7 +211,7 @@ func (l *LogsCommand) Run(args []string) int { numLines = defaultTailLines } - r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset) + r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginEnd, offset) // If numLines is set, wrap the reader if numLines != -1 { @@ -231,10 +236,10 @@ func (l *LogsCommand) Run(args []string) int { // followFile outputs the contents of the file to stdout relative to the end of // the file. func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, - task, logType, origin string, offset int64) (io.ReadCloser, error) { + follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) { cancel := make(chan struct{}) - frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil) + frames, _, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil) if err != nil { panic(err.Error()) return nil, err