Read from correct offset

Alex Dadgar
2016-07-19 15:58:02 -07:00
parent 53f198932f
commit 4cf65d7944
3 changed files with 291 additions and 46 deletions

View File

@@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
@@ -726,21 +727,11 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi
return fmt.Errorf("failed to list entries: %v", err)
}
- logEntry, idx, err := findClosest(entries, nextIdx, task, logType)
+ logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
if err != nil {
return err
}
- // Apply the offset we should open at. Handling the negative case is
- // only for the first time.
- openOffset := offset
- if openOffset < 0 {
- openOffset = logEntry.Size + openOffset
- if openOffset < 0 {
- openOffset = 0
- }
- }
p := filepath.Join(logPath, logEntry.Name)
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1))
nextExists := fs.BlockUntilExists(nextPath, &t)
@@ -763,26 +754,36 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi
return nil
}
- func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64,
- task, logType string) (*allocdir.AllocFileInfo, int64, error) {
// indexTuple and indexTupleArray are used to find the correct log entry to
// start streaming logs from
type indexTuple struct {
idx int64
entry *allocdir.AllocFileInfo
}
- if len(entries) == 0 {
- return nil, 0, fmt.Errorf("no file entries found")
- }
type indexTupleArray []indexTuple
func (a indexTupleArray) Len() int { return len(a) }
func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx }
func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// findClosest takes a list of entries, the desired log index and desired log
// offset (which can be negative, treated as offset from end), task name and log
// type and returns the log entry, the log index, the offset to read from and a
// potential error.
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64,
task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) {
// Build the matching indexes
var indexes []indexTuple
prefix := fmt.Sprintf("%s.%s.", task, logType)
- var closest *allocdir.AllocFileInfo
- closestIdx := int64(math.MaxInt64)
- closestDist := int64(math.MaxInt64)
for _, entry := range entries {
if entry.IsDir {
continue
}
- idxStr := strings.TrimPrefix(entry.Name, prefix)
// If nothing was trimmed, then it is not a match
+ idxStr := strings.TrimPrefix(entry.Name, prefix)
if idxStr == entry.Name {
continue
}
@@ -790,25 +791,68 @@ func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64,
// Convert to an int
idx, err := strconv.Atoi(idxStr)
if err != nil {
- return nil, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
+ return nil, 0, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
}
- // Determine distance to desired
- d := desiredIdx - int64(idx)
- if d < 0 {
- d *= -1
- }
+ indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry})
}
if len(indexes) == 0 {
return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
}
// Binary search the indexes to get the desiredIdx
sort.Sort(indexTupleArray(indexes))
i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx })
l := len(indexes)
if i == l {
// Use the last index if the number is bigger than all of them.
i = l - 1
}
// Get to the correct offset
offset := desiredOffset
idx := int64(i)
for {
s := indexes[idx].entry.Size
// Base case
if offset == 0 {
break
} else if offset < 0 {
// Going backwards
if newOffset := s + offset; newOffset >= 0 {
// Current file works
offset = newOffset
break
} else if idx == 0 {
// Already at the end
offset = 0
break
} else {
// Try the file before
offset = newOffset
idx -= 1
continue
}
} else {
// Going forward
if offset <= s {
// Current file works
break
} else if idx == int64(l-1) {
// Already at the end
offset = s
break
} else {
// Try the next file
offset = offset - s
idx += 1
continue
}
}
- if d <= closestDist && (int64(idx) < closestIdx || int64(idx) == desiredIdx) {
- closestDist = d
- closest = entry
- closestIdx = int64(idx)
- }
}
- if closest == nil {
- return nil, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
- }
- return closest, closestIdx, nil
+ return indexes[idx].entry, indexes[idx].idx, offset, nil
}
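
To make the offset walk above easier to follow, here is a minimal, self-contained sketch of the same forward/backward carry logic. The logFile type and resolveOffset function are illustrative stand-ins chosen for this example, not Nomad's allocdir.AllocFileInfo or findClosest, and the fixture names are hypothetical.

package main

import "fmt"

// logFile is a simplified stand-in for a rotated log file entry: a name and
// a size in bytes.
type logFile struct {
	name string
	size int64
}

// resolveOffset mirrors the offset walk in findClosest above: starting at
// files[start], it applies desiredOffset (negative means "from the end of
// that file") and walks backwards or forwards across files until the offset
// falls inside one of them, clamping at the first and last file. It assumes
// files is non-empty.
func resolveOffset(files []logFile, start int, desiredOffset int64) (int, int64) {
	idx := start
	offset := desiredOffset
	for {
		s := files[idx].size
		if offset == 0 {
			break
		} else if offset < 0 {
			// Going backwards from the end of the current file.
			if newOffset := s + offset; newOffset >= 0 {
				offset = newOffset
				break
			} else if idx == 0 {
				// No earlier file to borrow from; clamp to the start.
				offset = 0
				break
			} else {
				// Still negative; carry the remainder into the previous file.
				offset = newOffset
				idx--
			}
		} else {
			// Going forward from the start of the current file.
			if offset <= s {
				break
			} else if idx == len(files)-1 {
				// Past the last file; clamp to its end.
				offset = s
				break
			} else {
				// Consume this file and move to the next one.
				offset -= s
				idx++
			}
		}
	}
	return idx, offset
}

func main() {
	files := []logFile{
		{"foo.stdout.0", 100},
		{"foo.stdout.1", 100},
		{"foo.stdout.2", 100},
	}

	// Start at foo.stdout.2 and read the last 150 bytes: lands 50 bytes into
	// foo.stdout.1.
	idx, off := resolveOffset(files, 2, -150)
	fmt.Println(files[idx].name, off)

	// Start at foo.stdout.0 and skip 120 bytes forward: lands 20 bytes into
	// foo.stdout.1.
	idx, off = resolveOffset(files, 0, 120)
	fmt.Println(files[idx].name, off)
}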

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"io"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"os"
@@ -743,3 +744,208 @@ func TestHTTP_Stream_Delete(t *testing.T) {
})
}
func TestLogs_findClosest(t *testing.T) {
task := "foo"
entries := []*allocdir.AllocFileInfo{
{
Name: "foo.stdout.0",
Size: 100,
},
{
Name: "foo.stdout.1",
Size: 100,
},
{
Name: "foo.stdout.2",
Size: 100,
},
{
Name: "foo.stdout.3",
Size: 100,
},
{
Name: "foo.stderr.0",
Size: 100,
},
{
Name: "foo.stderr.1",
Size: 100,
},
{
Name: "foo.stderr.2",
Size: 100,
},
}
cases := []struct {
Entries []*allocdir.AllocFileInfo
DesiredIdx int64
DesiredOffset int64
Task string
LogType string
ExpectedFile string
ExpectedIdx int64
ExpectedOffset int64
Error bool
}{
// Test error cases
{
Entries: nil,
DesiredIdx: 0,
Task: task,
LogType: "stdout",
Error: true,
},
{
Entries: entries[0:3],
DesiredIdx: 0,
Task: task,
LogType: "stderr",
Error: true,
},
// Test beginning cases
{
Entries: entries,
DesiredIdx: 0,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
},
{
// Desired offset should be ignored at edges
Entries: entries,
DesiredIdx: 0,
DesiredOffset: -100,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
ExpectedOffset: 0,
},
{
// Desired offset should be ignored at edges
Entries: entries,
DesiredIdx: 1,
DesiredOffset: -1000,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
ExpectedOffset: 0,
},
{
Entries: entries,
DesiredIdx: 0,
Task: task,
LogType: "stderr",
ExpectedFile: entries[4].Name,
ExpectedIdx: 0,
},
{
Entries: entries,
DesiredIdx: 0,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
},
// Test middle cases
{
Entries: entries,
DesiredIdx: 1,
Task: task,
LogType: "stdout",
ExpectedFile: entries[1].Name,
ExpectedIdx: 1,
},
{
Entries: entries,
DesiredIdx: 1,
DesiredOffset: 10,
Task: task,
LogType: "stdout",
ExpectedFile: entries[1].Name,
ExpectedIdx: 1,
ExpectedOffset: 10,
},
{
Entries: entries,
DesiredIdx: 1,
DesiredOffset: 110,
Task: task,
LogType: "stdout",
ExpectedFile: entries[2].Name,
ExpectedIdx: 2,
ExpectedOffset: 10,
},
{
Entries: entries,
DesiredIdx: 1,
Task: task,
LogType: "stderr",
ExpectedFile: entries[5].Name,
ExpectedIdx: 1,
},
// Test end cases
{
Entries: entries,
DesiredIdx: math.MaxInt64,
Task: task,
LogType: "stdout",
ExpectedFile: entries[3].Name,
ExpectedIdx: 3,
},
{
Entries: entries,
DesiredIdx: math.MaxInt64,
DesiredOffset: math.MaxInt64,
Task: task,
LogType: "stdout",
ExpectedFile: entries[3].Name,
ExpectedIdx: 3,
ExpectedOffset: 100,
},
{
Entries: entries,
DesiredIdx: math.MaxInt64,
DesiredOffset: -10,
Task: task,
LogType: "stdout",
ExpectedFile: entries[3].Name,
ExpectedIdx: 3,
ExpectedOffset: 90,
},
{
Entries: entries,
DesiredIdx: math.MaxInt64,
Task: task,
LogType: "stderr",
ExpectedFile: entries[6].Name,
ExpectedIdx: 2,
},
}
for i, c := range cases {
entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType)
if err != nil {
if !c.Error {
t.Fatalf("case %d: Unexpected error: %v", i, err)
}
continue
}
if entry.Name != c.ExpectedFile {
t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile)
}
if idx != c.ExpectedIdx {
t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx)
}
if offset != c.ExpectedOffset {
t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset)
}
}
}
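
As a quick sanity check on the expectations above, the carry arithmetic for two of the cases can be reproduced by hand. This small standalone snippet is not part of the test file; it only assumes the 100-byte fixture sizes listed in entries.

package main

import "fmt"

func main() {
	const fileSize int64 = 100 // every fixture file above is 100 bytes

	// Case DesiredIdx: 1, DesiredOffset: 110. Reading 110 bytes forward from
	// the start of foo.stdout.1 consumes all 100 bytes of that file and
	// carries the remainder into the next index, foo.stdout.2.
	desiredOffset := int64(110)
	carried := desiredOffset - fileSize
	fmt.Printf("start in foo.stdout.2 at offset %d\n", carried) // offset 10

	// Case DesiredIdx: math.MaxInt64, DesiredOffset: -10. The index clamps to
	// the last stdout file, foo.stdout.3, and the negative offset is taken
	// from its end.
	fmt.Printf("start in foo.stdout.3 at offset %d\n", fileSize-10) // offset 90
}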

View File

@@ -159,7 +159,7 @@ func (l *LogsCommand) Run(args []string) int {
var r io.ReadCloser
var readErr error
if !tail {
- r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0, -1)
+ r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0)
if readErr != nil {
readErr = fmt.Errorf("Error reading file: %v", readErr)
}
@@ -178,7 +178,7 @@ func (l *LogsCommand) Run(args []string) int {
numLines = defaultTailLines
}
- r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset, numLines)
+ r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset)
// If numLines is set, wrap the reader
if numLines != -1 {
@@ -201,9 +201,9 @@ func (l *LogsCommand) Run(args []string) int {
}
// followFile outputs the contents of the file to stdout relative to the end of
- // the file. If numLines does not equal -1, then tail -n behavior is used.
+ // the file.
func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
- task, logType, origin string, offset, numLines int64) (io.ReadCloser, error) {
+ task, logType, origin string, offset int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil)
@@ -218,11 +218,6 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
frameReader := api.NewFrameReader(frames, cancel)
r = frameReader
- // If numLines is set, wrap the reader
- if numLines != -1 {
- r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
- }
go func() {
<-signalCh