diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 37a4c1df4..225530561 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -726,21 +727,11 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return fmt.Errorf("failed to list entries: %v", err) } - logEntry, idx, err := findClosest(entries, nextIdx, task, logType) + logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType) if err != nil { return err } - // Apply the offset we should open at. Handling the negative case is - // only for the first time. - openOffset := offset - if openOffset < 0 { - openOffset = logEntry.Size + openOffset - if openOffset < 0 { - openOffset = 0 - } - } - p := filepath.Join(logPath, logEntry.Name) nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) nextExists := fs.BlockUntilExists(nextPath, &t) @@ -763,26 +754,36 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return nil } -func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, - task, logType string) (*allocdir.AllocFileInfo, int64, error) { +// indexTuple and indexTupleArray are used to find the correct log entry to +// start streaming logs from +type indexTuple struct { + idx int64 + entry *allocdir.AllocFileInfo +} - if len(entries) == 0 { - return nil, 0, fmt.Errorf("no file entries found") - } +type indexTupleArray []indexTuple +func (a indexTupleArray) Len() int { return len(a) } +func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx } +func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// findClosest takes a list of entries, the desired log index and desired log +// offset (which can be negative, treated as offset from end), task name and log +// type and returns the log entry, the log index, the offset to read from and a +// potential error. +func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64, + task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) { + + // Build the matching indexes + var indexes []indexTuple prefix := fmt.Sprintf("%s.%s.", task, logType) - - var closest *allocdir.AllocFileInfo - closestIdx := int64(math.MaxInt64) - closestDist := int64(math.MaxInt64) for _, entry := range entries { if entry.IsDir { continue } - idxStr := strings.TrimPrefix(entry.Name, prefix) - // If nothing was trimmed, then it is not a match + idxStr := strings.TrimPrefix(entry.Name, prefix) if idxStr == entry.Name { continue } @@ -790,25 +791,68 @@ func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, // Convert to an int idx, err := strconv.Atoi(idxStr) if err != nil { - return nil, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) + return nil, 0, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) } - // Determine distance to desired - d := desiredIdx - int64(idx) - if d < 0 { - d *= -1 - } + indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry}) + } + + if len(indexes) == 0 { + return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) + } + + // Binary search the indexes to get the desiredIdx + sort.Sort(indexTupleArray(indexes)) + i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx }) + l := len(indexes) + if i == l { + // Use the last index if the number is bigger than all of them. + i = l - 1 + } + + // Get to the correct offset + offset := desiredOffset + idx := int64(i) + for { + s := indexes[idx].entry.Size + + // Base case + if offset == 0 { + break + } else if offset < 0 { + // Going backwards + if newOffset := s + offset; newOffset >= 0 { + // Current file works + offset = newOffset + break + } else if idx == 0 { + // Already at the end + offset = 0 + break + } else { + // Try the file before + offset = newOffset + idx -= 1 + continue + } + } else { + // Going forward + if offset <= s { + // Current file works + break + } else if idx == int64(l-1) { + // Already at the end + offset = s + break + } else { + // Try the next file + offset = offset - s + idx += 1 + continue + } - if d <= closestDist && (int64(idx) < closestIdx || int64(idx) == desiredIdx) { - closestDist = d - closest = entry - closestIdx = int64(idx) } } - if closest == nil { - return nil, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) - } - - return closest, closestIdx, nil + return indexes[idx].entry, indexes[idx].idx, offset, nil } diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index c8c6d827f..ced3708af 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -4,6 +4,7 @@ import ( "bytes" "io" "io/ioutil" + "math" "net/http" "net/http/httptest" "os" @@ -743,3 +744,208 @@ func TestHTTP_Stream_Delete(t *testing.T) { }) } + +func TestLogs_findClosest(t *testing.T) { + task := "foo" + entries := []*allocdir.AllocFileInfo{ + { + Name: "foo.stdout.0", + Size: 100, + }, + { + Name: "foo.stdout.1", + Size: 100, + }, + { + Name: "foo.stdout.2", + Size: 100, + }, + { + Name: "foo.stdout.3", + Size: 100, + }, + { + Name: "foo.stderr.0", + Size: 100, + }, + { + Name: "foo.stderr.1", + Size: 100, + }, + { + Name: "foo.stderr.2", + Size: 100, + }, + } + + cases := []struct { + Entries []*allocdir.AllocFileInfo + DesiredIdx int64 + DesiredOffset int64 + Task string + LogType string + ExpectedFile string + ExpectedIdx int64 + ExpectedOffset int64 + Error bool + }{ + // Test error cases + { + Entries: nil, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + Error: true, + }, + { + Entries: entries[0:3], + DesiredIdx: 0, + Task: task, + LogType: "stderr", + Error: true, + }, + + // Test begining cases + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 0, + DesiredOffset: -100, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 1, + DesiredOffset: -1000, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stderr", + ExpectedFile: entries[4].Name, + ExpectedIdx: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + + // Test middle cases + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 110, + Task: task, + LogType: "stdout", + ExpectedFile: entries[2].Name, + ExpectedIdx: 2, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stderr", + ExpectedFile: entries[5].Name, + ExpectedIdx: 1, + }, + // Test end cases + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: math.MaxInt64, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 100, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: -10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 90, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stderr", + ExpectedFile: entries[6].Name, + ExpectedIdx: 2, + }, + } + + for i, c := range cases { + entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType) + if err != nil { + if !c.Error { + t.Fatalf("case %d: Unexpected error: %v", i, err) + } + continue + } + + if entry.Name != c.ExpectedFile { + t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile) + } + if idx != c.ExpectedIdx { + t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx) + } + if offset != c.ExpectedOffset { + t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset) + } + } +} diff --git a/command/logs.go b/command/logs.go index 25edc65db..282783d8d 100644 --- a/command/logs.go +++ b/command/logs.go @@ -159,7 +159,7 @@ func (l *LogsCommand) Run(args []string) int { var r io.ReadCloser var readErr error if !tail { - r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0, -1) + r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0) if readErr != nil { readErr = fmt.Errorf("Error reading file: %v", readErr) } @@ -178,7 +178,7 @@ func (l *LogsCommand) Run(args []string) int { numLines = defaultTailLines } - r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset, numLines) + r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset) // If numLines is set, wrap the reader if numLines != -1 { @@ -201,9 +201,9 @@ func (l *LogsCommand) Run(args []string) int { } // followFile outputs the contents of the file to stdout relative to the end of -// the file. If numLines does not equal -1, then tail -n behavior is used. +// the file. func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, - task, logType, origin string, offset, numLines int64) (io.ReadCloser, error) { + task, logType, origin string, offset int64) (io.ReadCloser, error) { cancel := make(chan struct{}) frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil) @@ -218,11 +218,6 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, frameReader := api.NewFrameReader(frames, cancel) r = frameReader - // If numLines is set, wrap the reader - if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) - } - go func() { <-signalCh