Merge pull request #1444 from hashicorp/f-log-streaming

Nomad logs command to stream task logs
This commit is contained in:
Alex Dadgar
2016-07-25 11:19:53 -07:00
committed by GitHub
15 changed files with 1606 additions and 118 deletions

api/fs.go

@@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
)
@@ -204,6 +205,7 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
// * path: path to file to stream.
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
@@ -275,19 +277,101 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, nil, nil
}
// Logs streams the content of a task's logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
// * follow: Whether the logs should be followed.
// * task: the task's name to stream logs for.
// * logType: Either "stdout" or "stderr"
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * offset: The offset to start streaming data at.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}
if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID),
}
v := url.Values{}
v.Set("follow", strconv.FormatBool(follow))
v.Set("task", task)
v.Set("type", logType)
v.Set("origin", origin)
v.Set("offset", strconv.FormatInt(offset, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
Cancel: cancel,
}
c := http.Client{}
resp, err := c.Do(req)
if err != nil {
return nil, nil, err
}
// Create the output channel
frames := make(chan *StreamFrame, 10)
go func() {
// Close the body
defer resp.Body.Close()
// Create a decoder
dec := json.NewDecoder(resp.Body)
for {
// Check if we have been cancelled
select {
case <-cancel:
return
default:
}
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
return
}
// Discard heartbeat frames
if frame.IsHeartbeat() {
continue
}
frames <- &frame
}
}()
return frames, nil, nil
}
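For consumers of this API, a minimal sketch of wiring Logs into a FrameReader, mirroring what command/logs.go below does. The allocation ID and task name are placeholders, and error handling is simplified:
package main
import (
	"io"
	"log"
	"os"
	"time"
	"github.com/hashicorp/nomad/api"
)
func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// Look up the allocation to stream from ("..." is a placeholder ID).
	alloc, _, err := client.Allocations().Info("...", nil)
	if err != nil {
		log.Fatal(err)
	}
	// Follow the task's stdout from the start of the logs.
	cancel := make(chan struct{})
	frames, _, err := client.AllocFS().Logs(alloc, true, "mytask", "stdout", api.OriginStart, 0, cancel, nil)
	if err != nil {
		log.Fatal(err)
	}
	// Convert the frame stream into an io.ReadCloser and copy it out.
	r := api.NewFrameReader(frames, cancel)
	r.SetUnblockTime(500 * time.Millisecond) // let Read return periodically with no data
	defer r.Close()
	io.Copy(os.Stdout, r)
}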
// FrameReader is used to convert a stream of frames into a read closer.
type FrameReader struct {
frames <-chan *StreamFrame
cancelCh chan struct{}
closedLock sync.Mutex
closed bool
unblockTime time.Duration
frame *StreamFrame
frameOffset int
// To handle printing the file events
fileEventOffset int
fileEvent []byte
byteOffset int
}
@@ -300,6 +384,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe
}
}
// SetUnblockTime sets the time to unblock and return zero bytes read. If the
// duration is unset or is zero or less, the read will block until data is read.
func (f *FrameReader) SetUnblockTime(d time.Duration) {
f.unblockTime = d
}
// Offset returns the offset into the stream.
func (f *FrameReader) Offset() int {
return f.byteOffset
@@ -308,32 +398,33 @@ func (f *FrameReader) Offset() int {
// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
// when there are no more frames.
func (f *FrameReader) Read(p []byte) (n int, err error) {
f.closedLock.Lock()
closed := f.closed
f.closedLock.Unlock()
if closed {
return 0, io.EOF
}
if f.frame == nil {
var unblock <-chan time.Time
if f.unblockTime.Nanoseconds() > 0 {
unblock = time.After(f.unblockTime)
}
select {
case frame, ok := <-f.frames:
if !ok {
return 0, io.EOF
}
f.frame = frame
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
case <-unblock:
return 0, nil
case <-f.cancelCh:
return 0, io.EOF
}
}
if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {
f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent))
f.fileEventOffset = 0
}
// If there is a file event we inject it into the read stream
if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset {
n = copy(p, f.fileEvent[f.fileEventOffset:])
f.fileEventOffset += n
return n, nil
}
if len(f.fileEvent) == f.fileEventOffset {
f.fileEvent = nil
f.fileEventOffset = 0
}
// Copy the data out of the frame and update our offset
@@ -351,6 +442,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {
// Close cancels the stream of frames
func (f *FrameReader) Close() error {
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.closed {
return nil
}

api/fs_test.go

@@ -4,6 +4,7 @@ import (
"io"
"reflect"
"testing"
"time"
)
func TestFS_FrameReader(t *testing.T) {
@@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) {
t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected))
}
}
func TestFS_FrameReader_Unblock(t *testing.T) {
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
cancelCh := make(chan struct{})
r := NewFrameReader(framesCh, cancelCh)
r.SetUnblockTime(10 * time.Millisecond)
// Read a little
p := make([]byte, 12)
n, err := r.Read(p)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
if n != 0 {
t.Fatalf("should have unblocked")
}
// Unset the unblock
r.SetUnblockTime(0)
resultCh := make(chan struct{})
go func() {
r.Read(p)
close(resultCh)
}()
select {
case <-resultCh:
t.Fatalf("shouldn't have unblocked")
case <-time.After(300 * time.Millisecond):
}
}

client/allocdir/alloc_dir.go

@@ -61,7 +61,7 @@ type AllocDirFS interface {
List(path string) ([]*AllocFileInfo, error)
Stat(path string) (*AllocFileInfo, error)
ReadAt(path string, offset int64) (io.ReadCloser, error)
BlockUntilExists(path string, t *tomb.Tomb) error
BlockUntilExists(path string, t *tomb.Tomb) chan error
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
}
@@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
// BlockUntilExists blocks until the passed file, relative to the allocation
// directory, exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error {
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error {
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
return watcher.BlockUntilExists(t)
returnCh := make(chan error, 1)
go func() {
returnCh <- watcher.BlockUntilExists(t)
close(returnCh)
}()
return returnCh
}
// ChangeEvents watches for changes to the passed path relative to the

command/agent/fs_endpoint.go

@@ -4,10 +4,16 @@ import (
"bytes"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"
"gopkg.in/tomb.v1"
@@ -21,6 +27,8 @@ import (
var (
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
fileNameNotPresentErr = fmt.Errorf("must provide a file name")
taskNotPresentErr = fmt.Errorf("must provide task name")
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
invalidOrigin = fmt.Errorf("origin must be start or end")
)
@@ -31,14 +39,28 @@ const (
// streamHeartbeatRate is the rate at which a heartbeat will occur to detect
// a closed connection without sending any additional data
streamHeartbeatRate = 10 * time.Second
streamHeartbeatRate = 1 * time.Second
// streamBatchWindow is the window in which file content is batched before
// being flushed if the frame size has not been hit.
streamBatchWindow = 200 * time.Millisecond
// nextLogCheckRate is the rate at which we check for a log entry greater
// than what we are watching for. This is to handle the case in which logs
// rotate faster than we can detect and we have to rely on a normal
// directory listing.
nextLogCheckRate = 100 * time.Millisecond
// deleteEvent and truncateEvent are the file events that can be sent in a
// StreamFrame
deleteEvent = "file deleted"
truncateEvent = "file truncated"
// OriginStart and OriginEnd are the available parameters for the origin
// argument when streaming a file. They respectively offset from the start
// and end of a file.
OriginStart = "start"
OriginEnd = "end"
)
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@@ -58,6 +80,8 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int
return s.FileCatRequest(resp, req)
case strings.HasPrefix(path, "stream/"):
return s.Stream(resp, req)
case strings.HasPrefix(path, "logs/"):
return s.Logs(resp, req)
default:
return nil, CodedError(404, ErrInvalidMethod)
}
@@ -223,7 +247,7 @@ type StreamFramer struct {
// Captures whether the framer is running and any error that occurred to
// cause it to stop.
running bool
err error
Err error
}
// NewStreamFramer creates a new stream framer that will output StreamFrames to
@@ -252,18 +276,16 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
// Destroy is used to cleanup the StreamFramer and flush any pending frames
func (s *StreamFramer) Destroy() {
s.l.Lock()
wasRunning := s.running
s.running = false
s.f = nil
close(s.shutdownCh)
s.heartbeat.Stop()
s.flusher.Stop()
s.l.Unlock()
// Ensure things were flushed
if wasRunning {
if s.running {
<-s.exitCh
}
s.out.Close()
}
// Run starts a long lived goroutine that handles sending data as well as
@@ -290,11 +312,12 @@ func (s *StreamFramer) run() {
// Store any error and mark it as not running
var err error
defer func() {
s.l.Lock()
s.err = err
s.out.Close()
close(s.exitCh)
s.l.Lock()
close(s.outbound)
s.Err = err
s.running = false
s.l.Unlock()
}()
@@ -303,6 +326,8 @@ func (s *StreamFramer) run() {
go func() {
for {
select {
case <-s.exitCh:
return
case <-s.shutdownCh:
return
case <-s.flusher.C:
@@ -315,13 +340,18 @@ func (s *StreamFramer) run() {
// Read the data for the frame, and send it
s.f.Data = s.readData()
s.outbound <- s.f
s.f = nil
select {
case s.outbound <- s.f:
s.f = nil
case <-s.exitCh:
}
s.l.Unlock()
case <-s.heartbeat.C:
// Send a heartbeat frame
s.outbound <- &StreamFrame{}
select {
case s.outbound <- &StreamFrame{}:
default:
}
}
}
}()
@@ -332,7 +362,7 @@ OUTER:
case <-s.shutdownCh:
break OUTER
case o := <-s.outbound:
// Send the frame and then clear the current working frame
// Send the frame
if err = s.enc.Encode(o); err != nil {
return
}
@@ -340,21 +370,25 @@ OUTER:
}
// Flush any existing frames
s.l.Lock()
defer s.l.Unlock()
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
return
FLUSH:
for {
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
return
}
default:
break FLUSH
}
default:
}
s.l.Lock()
if s.f != nil {
s.f.Data = s.readData()
s.enc.Encode(s.f)
}
s.l.Unlock()
}
// readData is a helper which reads the buffered data returning up to the frame
@@ -369,7 +403,10 @@ func (s *StreamFramer) readData() []byte {
if size == 0 {
return nil
}
return s.data.Next(size)
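// Note: bytes.Buffer.Next returns a slice that is only valid until the
// next buffer modification, so copy the bytes out before the frame is
// handed to another goroutine.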
d := s.data.Next(size)
b := make([]byte, size)
copy(b, d)
return b
}
// Send creates and sends a StreamFrame based on the passed parameters. An error
@@ -382,8 +419,8 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
// If we are not running, return the error that caused us to not run or
// indicated that it was never started.
if !s.running {
if s.err != nil {
return s.err
if s.Err != nil {
return s.Err
}
return fmt.Errorf("StreamFramer not running")
}
@@ -391,13 +428,14 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
// Check if not mergeable
if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
s.outbound <- &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: s.readData(),
f := *s.f
f.Data = s.readData()
select {
case <-s.exitCh:
return nil
case s.outbound <- &f:
s.f = nil
}
s.f = nil
}
// Store the new data as the current frame.
@@ -414,21 +452,30 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
// Handle the delete case in which there is no data
if s.data.Len() == 0 && s.f.FileEvent != "" {
s.outbound <- &StreamFrame{
select {
case <-s.exitCh:
return nil
case s.outbound <- &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
}:
}
}
// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize {
// Create a new frame to send it
s.outbound <- &StreamFrame{
d := s.readData()
select {
case <-s.exitCh:
return nil
case s.outbound <- &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: s.readData(),
Data: d,
}:
}
}
@@ -499,10 +546,26 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
return nil, s.stream(offset, path, fs, output)
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
err = s.stream(offset, path, fs, framer, nil)
if err != nil && err != syscall.EPIPE {
return nil, err
}
return nil, nil
}
func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error {
// stream is the internal method to stream the content of a file. eofCancelCh is
// used to cancel the stream if triggered while at EOF. If the connection is
// broken an EPIPE error is returned
func (s *HTTPServer) stream(offset int64, path string,
fs allocdir.AllocDirFS, framer *StreamFramer,
eofCancelCh chan error) error {
// Get the reader
f, err := fs.ReadAt(path, offset)
if err != nil {
@@ -517,11 +580,6 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o
t.Done()
}()
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Create a variable to allow setting the last event
var lastEvent string
@@ -547,6 +605,22 @@ OUTER:
// Send the frame
if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
// Check if the connection has been closed
if err == io.ErrClosedPipe {
// The pipe check is for tests
return syscall.EPIPE
}
operr, ok := err.(*net.OpError)
if ok {
// The connection was closed by our peer
e := operr.Err.Error()
if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
}
return err
}
}
@@ -595,9 +669,340 @@ OUTER:
continue OUTER
case <-framer.ExitCh():
return nil
case err, ok := <-eofCancelCh:
if !ok {
return nil
}
return err
}
}
}
return nil
}
// Logs streams the content of a log blocking on EOF. The parameters are:
// * task: task name to stream logs for.
// * type: stdout/stderr to stream.
// * follow: A boolean of whether to follow the logs.
// * offset: The offset to start streaming data at, defaults to zero.
// * origin: Either "start" or "end" and defines from where the offset is
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
var follow bool
var err error
q := req.URL.Query()
if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" {
return nil, allocIDNotPresentErr
}
if task = q.Get("task"); task == "" {
return nil, taskNotPresentErr
}
if follow, err = strconv.ParseBool(q.Get("follow")); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}
logType = q.Get("type")
switch logType {
case "stdout", "stderr":
default:
return nil, logTypeNotPresentErr
}
var offset int64
offsetString := q.Get("offset")
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
}
}
origin := q.Get("origin")
switch origin {
case "start", "end":
case "":
origin = "start"
default:
return nil, invalidOrigin
}
fs, err := s.agent.client.GetAllocFS(allocID)
if err != nil {
return nil, err
}
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
return nil, s.logs(follow, offset, origin, task, logType, fs, output)
}
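For reference, a follow request against this endpoint, as built by the API client's Logs method above, looks like the following (allocation ID illustrative):
GET /v1/client/fs/logs/<alloc-id>?follow=true&task=mytask&type=stdout&origin=start&offset=0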
func (s *HTTPServer) logs(follow bool, offset int64,
origin, task, logType string,
fs allocdir.AllocDirFS, output io.WriteCloser) error {
// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Path to the logs
logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)
// nextIdx is the next index to read logs from
var nextIdx int64
switch origin {
case "start":
nextIdx = 0
case "end":
nextIdx = math.MaxInt64
offset *= -1
default:
return invalidOrigin
}
// Create a tomb to cancel watch events
t := tomb.Tomb{}
defer func() {
t.Kill(nil)
t.Done()
}()
for {
// Logic for picking next file is:
// 1) List log files
// 2) Pick log file closest to desired index
// 3) Open log file at correct offset
// 3a) No error, read contents
// 3b) If file doesn't exist, goto 1 as it may have been rotated out
entries, err := fs.List(logPath)
if err != nil {
return fmt.Errorf("failed to list entries: %v", err)
}
// If we are not following logs, determine the max index for the logs we are
// interested in so we can stop there.
maxIndex := int64(math.MaxInt64)
if !follow {
_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
if err != nil {
return err
}
maxIndex = idx
}
logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
if err != nil {
return err
}
var eofCancelCh chan error
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
exitAfter = true
} else {
eofCancelCh = blockUntilNextLog(fs, &t, logPath, task, logType, idx+1)
}
p := filepath.Join(logPath, logEntry.Name)
err = s.stream(openOffset, p, fs, framer, eofCancelCh)
if err != nil {
// Check if there was an error where the file does not exist. That means
// it got rotated out from under us.
if os.IsNotExist(err) {
continue
}
// Check if the connection was closed
if err == syscall.EPIPE {
return nil
}
return fmt.Errorf("failed to stream %q: %v", p, err)
}
if exitAfter {
return nil
}
// Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
nextIdx = idx + 1
}
return nil
}
// blockUntilNextLog returns a channel that will have data sent when the next
// log index or anything greater is created.
func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logType string, nextIndex int64) chan error {
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex))
next := make(chan error, 1)
go func() {
eofCancelCh := fs.BlockUntilExists(nextPath, t)
scanCh := time.Tick(nextLogCheckRate)
for {
select {
case err := <-eofCancelCh:
next <- err
close(next)
return
case <-scanCh:
entries, err := fs.List(logPath)
if err != nil {
next <- fmt.Errorf("failed to list entries: %v", err)
close(next)
return
}
indexes, err := logIndexes(entries, task, logType)
if err != nil {
next <- err
close(next)
return
}
// Scan and see if there are any entries larger than what we are
// waiting for.
for _, entry := range indexes {
if entry.idx >= nextIndex {
next <- nil
close(next)
return
}
}
}
}
}()
return next
}
// indexTuple and indexTupleArray are used to find the correct log entry to
// start streaming logs from
type indexTuple struct {
idx int64
entry *allocdir.AllocFileInfo
}
type indexTupleArray []indexTuple
func (a indexTupleArray) Len() int { return len(a) }
func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx }
func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// logIndexes takes a set of entries and returns an indexTupleArray of
// the desired log file entries. If the indexes could not be determined, an
// error is returned.
func logIndexes(entries []*allocdir.AllocFileInfo, task, logType string) (indexTupleArray, error) {
var indexes []indexTuple
prefix := fmt.Sprintf("%s.%s.", task, logType)
for _, entry := range entries {
if entry.IsDir {
continue
}
// If nothing was trimmed, then it is not a match
idxStr := strings.TrimPrefix(entry.Name, prefix)
if idxStr == entry.Name {
continue
}
// Convert to an int
idx, err := strconv.Atoi(idxStr)
if err != nil {
return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
}
indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry})
}
return indexTupleArray(indexes), nil
}
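As an illustration of the naming convention logIndexes relies on (entries hypothetical):
entries := []*allocdir.AllocFileInfo{
	{Name: "foo.stdout.0", Size: 100},
	{Name: "foo.stdout.1", Size: 100},
	{Name: "foo.stderr.0", Size: 100}, // skipped: prefix "foo.stdout." does not match
}
idxs, _ := logIndexes(entries, "foo", "stdout")
// idxs holds tuples for indexes 0 and 1; sorted by idx they give the
// rotation order that findClosest below walks.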
// findClosest takes a list of entries, the desired log index and desired log
// offset (which can be negative, treated as offset from end), task name and log
// type and returns the log entry, the log index, the offset to read from and a
// potential error.
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64,
task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) {
// Build the matching indexes
indexes, err := logIndexes(entries, task, logType)
if err != nil {
return nil, 0, 0, err
}
if len(indexes) == 0 {
return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
}
// Binary search the indexes to get the desiredIdx
sort.Sort(indexTupleArray(indexes))
i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx })
l := len(indexes)
if i == l {
// Use the last index if the number is bigger than all of them.
i = l - 1
}
// Get to the correct offset
offset := desiredOffset
idx := int64(i)
for {
s := indexes[idx].entry.Size
// Base case
if offset == 0 {
break
} else if offset < 0 {
// Going backwards
if newOffset := s + offset; newOffset >= 0 {
// Current file works
offset = newOffset
break
} else if idx == 0 {
// Already at the beginning
offset = 0
break
} else {
// Try the file before
offset = newOffset
idx -= 1
continue
}
} else {
// Going forward
if offset <= s {
// Current file works
break
} else if idx == int64(l-1) {
// Already at the end
offset = s
break
} else {
// Try the next file
offset = offset - s
idx += 1
continue
}
}
}
return indexes[idx].entry, indexes[idx].idx, offset, nil
}
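To make the offset walk concrete, two cases from the test table below: with four foo.stdout.* files of 100 bytes each, a desired index of 1 and a desired offset of 110 overshoots the 100-byte file at index 1, so the walk subtracts 100 and streaming starts in foo.stdout.2 at offset 10. Conversely, a desired index of math.MaxInt64 with a desired offset of -10 clamps to the last file and yields offset 90, i.e. its final 10 bytes.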

command/agent/fs_endpoint_test.go

@@ -1,18 +1,24 @@
package agent
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/testutil"
"github.com/pmezard/go-difflib/difflib"
"github.com/ugorji/go/codec"
)
@@ -302,6 +308,106 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
}
}
// This test checks that frames are received in order
func TestStreamFramer_Order(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 10)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, jsonHandle)
files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}
expected := bytes.NewBuffer(make([]byte, 0, 100000))
for _, _ = range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
receivedBuf.Write(frame.Data)
if reflect.DeepEqual(expected, receivedBuf) {
resultCh <- struct{}{}
return
}
}
}()
// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}
if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * bWindow):
got := receivedBuf.String()
want := expected.String()
diff := difflib.ContextDiff{
A: difflib.SplitLines(strings.Replace(got, ",", "\n", -1)),
B: difflib.SplitLines(strings.Replace(want, ",", "\n", -1)),
FromFile: "Got",
ToFile: "Want",
Context: 3,
Eol: "\n",
}
result, _ := difflib.GetContextDiffString(diff)
t.Fatalf(strings.Replace(result, "\t", " ", -1))
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(2 * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
func TestHTTP_Stream_MissingParams(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
@@ -347,6 +453,10 @@ func tempAllocDir(t *testing.T) *allocdir.AllocDir {
t.Fatalf("TempDir() failed: %v", err)
}
if err := os.Chmod(dir, 0777); err != nil {
t.Fatalf("failed to chmod dir: %v", err)
}
return allocdir.NewAllocDir(dir)
}
@@ -364,7 +474,11 @@ func TestHTTP_Stream_NoFile(t *testing.T) {
ad := tempAllocDir(t)
defer os.RemoveAll(ad.AllocDir)
if err := s.Server.stream(0, "foo", ad, nopWriteCloser{ioutil.Discard}); err == nil {
framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
if err := s.Server.stream(0, "foo", ad, framer, nil); err == nil {
t.Fatalf("expected an error when streaming unknown file")
}
})
@@ -419,9 +533,13 @@ func TestHTTP_Stream_Modify(t *testing.T) {
t.Fatalf("write failed: %v", err)
}
framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Start streaming
go func() {
if err := s.Server.stream(0, streamFile, ad, w); err != nil {
if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
@@ -496,9 +614,13 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
t.Fatalf("write failed: %v", err)
}
framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Start streaming
go func() {
if err := s.Server.stream(0, streamFile, ad, w); err != nil {
if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
@@ -595,9 +717,12 @@ func TestHTTP_Stream_Delete(t *testing.T) {
t.Fatalf("write failed: %v", err)
}
framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
// Start streaming
go func() {
if err := s.Server.stream(0, streamFile, ad, wrappedW); err != nil {
if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
@@ -615,6 +740,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
t.Fatalf("did not receive delete")
}
framer.Destroy()
testutil.WaitForResult(func() (bool, error) {
return wrappedW.Closed, nil
}, func(err error) {
@@ -623,3 +749,392 @@ func TestHTTP_Stream_Delete(t *testing.T) {
})
}
func TestHTTP_Logs_NoFollow(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Get a temp alloc dir and create the log dir
ad := tempAllocDir(t)
defer os.RemoveAll(ad.AllocDir)
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
if err := os.MkdirAll(logDir, 0777); err != nil {
t.Fatalf("Failed to make log dir: %v", err)
}
// Create a series of log files in the temp dir
task := "foo"
logType := "stdout"
expected := []byte("012")
for i := 0; i < 3; i++ {
logFile := fmt.Sprintf("%s.%s.%d", task, logType, i)
logFilePath := filepath.Join(logDir, logFile)
err := ioutil.WriteFile(logFilePath, expected[i:i+1], 777)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
}
// Create a decoder
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
var received []byte
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
if err == io.EOF {
t.Logf("EOF")
return
}
t.Fatalf("failed to decode: %v", err)
}
if frame.IsHeartbeat() {
continue
}
received = append(received, frame.Data...)
if reflect.DeepEqual(received, expected) {
close(resultCh)
return
}
}
}()
// Start streaming logs
go func() {
if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
select {
case <-resultCh:
case <-time.After(4 * streamBatchWindow):
t.Fatalf("did not receive data: got %q", string(received))
}
testutil.WaitForResult(func() (bool, error) {
return wrappedW.Closed, nil
}, func(err error) {
t.Fatalf("connection not closed")
})
})
}
func TestHTTP_Logs_Follow(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Get a temp alloc dir and create the log dir
ad := tempAllocDir(t)
defer os.RemoveAll(ad.AllocDir)
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
if err := os.MkdirAll(logDir, 0777); err != nil {
t.Fatalf("Failed to make log dir: %v", err)
}
// Create a series of log files in the temp dir
task := "foo"
logType := "stdout"
expected := []byte("012345")
initialWrites := 3
writeToFile := func(index int, data []byte) {
logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
logFilePath := filepath.Join(logDir, logFile)
err := ioutil.WriteFile(logFilePath, data, 777)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
}
for i := 0; i < initialWrites; i++ {
writeToFile(i, expected[i:i+1])
}
// Create a decoder
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
defer r.Close()
defer w.Close()
dec := codec.NewDecoder(r, jsonHandle)
var received []byte
// Start the reader
firstResultCh := make(chan struct{})
fullResultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
if err == io.EOF {
t.Logf("EOF")
return
}
t.Fatalf("failed to decode: %v", err)
}
if frame.IsHeartbeat() {
continue
}
received = append(received, frame.Data...)
if reflect.DeepEqual(received, expected[:initialWrites]) {
close(firstResultCh)
} else if reflect.DeepEqual(received, expected) {
close(fullResultCh)
return
}
}
}()
// Start streaming logs
go func() {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
select {
case <-firstResultCh:
case <-time.After(4 * streamBatchWindow):
t.Fatalf("did not receive data: got %q", string(received))
}
// We got the first chunk of data, write out the rest to the next file
// at an index much ahead to check that it is following and detecting
// skips
skipTo := initialWrites + 10
writeToFile(skipTo, expected[initialWrites:])
select {
case <-fullResultCh:
case <-time.After(4 * streamBatchWindow):
t.Fatalf("did not receive data: got %q", string(received))
}
// Close the reader
r.Close()
testutil.WaitForResult(func() (bool, error) {
return wrappedW.Closed, nil
}, func(err error) {
t.Fatalf("connection not closed")
})
})
}
func TestLogs_findClosest(t *testing.T) {
task := "foo"
entries := []*allocdir.AllocFileInfo{
{
Name: "foo.stdout.0",
Size: 100,
},
{
Name: "foo.stdout.1",
Size: 100,
},
{
Name: "foo.stdout.2",
Size: 100,
},
{
Name: "foo.stdout.3",
Size: 100,
},
{
Name: "foo.stderr.0",
Size: 100,
},
{
Name: "foo.stderr.1",
Size: 100,
},
{
Name: "foo.stderr.2",
Size: 100,
},
}
cases := []struct {
Entries []*allocdir.AllocFileInfo
DesiredIdx int64
DesiredOffset int64
Task string
LogType string
ExpectedFile string
ExpectedIdx int64
ExpectedOffset int64
Error bool
}{
// Test error cases
{
Entries: nil,
DesiredIdx: 0,
Task: task,
LogType: "stdout",
Error: true,
},
{
Entries: entries[0:3],
DesiredIdx: 0,
Task: task,
LogType: "stderr",
Error: true,
},
// Test beginning cases
{
Entries: entries,
DesiredIdx: 0,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
},
{
// Desired offset should be ignored at edges
Entries: entries,
DesiredIdx: 0,
DesiredOffset: -100,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
ExpectedOffset: 0,
},
{
// Desired offset should be ignored at edges
Entries: entries,
DesiredIdx: 1,
DesiredOffset: -1000,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
ExpectedOffset: 0,
},
{
Entries: entries,
DesiredIdx: 0,
Task: task,
LogType: "stderr",
ExpectedFile: entries[4].Name,
ExpectedIdx: 0,
},
{
Entries: entries,
DesiredIdx: 0,
Task: task,
LogType: "stdout",
ExpectedFile: entries[0].Name,
ExpectedIdx: 0,
},
// Test middle cases
{
Entries: entries,
DesiredIdx: 1,
Task: task,
LogType: "stdout",
ExpectedFile: entries[1].Name,
ExpectedIdx: 1,
},
{
Entries: entries,
DesiredIdx: 1,
DesiredOffset: 10,
Task: task,
LogType: "stdout",
ExpectedFile: entries[1].Name,
ExpectedIdx: 1,
ExpectedOffset: 10,
},
{
Entries: entries,
DesiredIdx: 1,
DesiredOffset: 110,
Task: task,
LogType: "stdout",
ExpectedFile: entries[2].Name,
ExpectedIdx: 2,
ExpectedOffset: 10,
},
{
Entries: entries,
DesiredIdx: 1,
Task: task,
LogType: "stderr",
ExpectedFile: entries[5].Name,
ExpectedIdx: 1,
},
// Test end cases
{
Entries: entries,
DesiredIdx: math.MaxInt64,
Task: task,
LogType: "stdout",
ExpectedFile: entries[3].Name,
ExpectedIdx: 3,
},
{
Entries: entries,
DesiredIdx: math.MaxInt64,
DesiredOffset: math.MaxInt64,
Task: task,
LogType: "stdout",
ExpectedFile: entries[3].Name,
ExpectedIdx: 3,
ExpectedOffset: 100,
},
{
Entries: entries,
DesiredIdx: math.MaxInt64,
DesiredOffset: -10,
Task: task,
LogType: "stdout",
ExpectedFile: entries[3].Name,
ExpectedIdx: 3,
ExpectedOffset: 90,
},
{
Entries: entries,
DesiredIdx: math.MaxInt64,
Task: task,
LogType: "stderr",
ExpectedFile: entries[6].Name,
ExpectedIdx: 2,
},
}
for i, c := range cases {
entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType)
if err != nil {
if !c.Error {
t.Fatalf("case %d: Unexpected error: %v", i, err)
}
continue
}
if entry.Name != c.ExpectedFile {
t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile)
}
if idx != c.ExpectedIdx {
t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx)
}
if offset != c.ExpectedOffset {
t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset)
}
}
}

command/agent/http.go

@@ -60,7 +60,7 @@ func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServ
agent: agent,
mux: mux,
listener: ln,
logger: log.New(logOutput, "", log.LstdFlags),
logger: agent.logger,
addr: ln.Addr().String(),
}
srv.registerHandlers(config.EnableDebug)

command/fs.go

@@ -50,7 +50,7 @@ FS Specific Options:
Show full information.
-job <job-id>
Use a random allocation from a specified job-id.
Use a random allocation from the specified job ID.
-stat
Show file stat information instead of displaying the file, or listing the directory.
@@ -60,12 +60,12 @@ FS Specific Options:
rather to wait for additional output.
-tail
Show the file's contents with offsets relative to the end of the file. If no
offset is given, -n is defaulted to 10.
-n
Sets the tail location in best-efforted number of lines relative to the end
of the file.
-c
Sets the tail location in number of bytes relative to the end of the file.
@@ -81,7 +81,7 @@ func (f *FSCommand) Run(args []string) int {
var verbose, machine, job, stat, tail, follow bool
var numLines, numBytes int64
flags := f.Meta.FlagSet("fs-list", FlagSetClient)
flags := f.Meta.FlagSet("fs", FlagSetClient)
flags.Usage = func() { f.Ui.Output(f.Help()) }
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&machine, "H", false, "")
@@ -177,14 +177,6 @@ func (f *FSCommand) Run(args []string) int {
return 1
}
if alloc.DesiredStatus == "failed" {
allocID := limit(alloc.ID, length)
msg := fmt.Sprintf(`The allocation %q failed to be placed. To see the cause, run:
nomad alloc-status %s`, allocID, allocID)
f.Ui.Error(msg)
return 0
}
// Get file stat info
file, _, err := client.AllocFS().Stat(alloc, path, nil)
if err != nil {
@@ -290,7 +282,7 @@ nomad alloc-status %s`, allocID, allocID)
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0)
}
}
@@ -329,7 +321,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines))
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0)
}
go func() {
@@ -337,9 +329,6 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
// End the streaming
r.Close()
// Output the last offset
f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset()))
}()
return r, nil

command/helpers.go

@@ -105,17 +105,26 @@ type LineLimitReader struct {
lines int
searchLimit int
timeLimit time.Duration
lastRead time.Time
buffer *bytes.Buffer
bufFiled bool
foundLines bool
}
// NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find
// searching backwards in the first searchLimit bytes.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader {
// searching backwards in the first searchLimit bytes. timeLimit can optionally
// be specified by passing a non-zero duration. When set, the search for the
// last n lines is aborted if no data has been read in the duration. This
// can be used to flush what has been read so far if no additional data arrives. When
// used, the underlying reader must not block forever and must periodically
// unblock even when no data has been read.
func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int, timeLimit time.Duration) *LineLimitReader {
return &LineLimitReader{
ReadCloser: r,
searchLimit: searchLimit,
timeLimit: timeLimit,
lines: lines,
buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)),
}
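A sketch of tailing with the new time limit, mirroring the call in command/logs.go; here r stands for any io.ReadCloser that periodically returns zero-byte reads rather than blocking forever:
// Tail roughly the last 10 lines, flushing whatever was found after a
// second with no new data.
lr := NewLineLimitReader(r, 10, int(10*bytesToLines), 1*time.Second)
defer lr.Close()
io.Copy(os.Stdout, lr)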
@@ -124,14 +133,52 @@ func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReade
func (l *LineLimitReader) Read(p []byte) (n int, err error) {
// Fill up the buffer so we can find the correct number of lines.
if !l.bufFiled {
_, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit)))
if err != nil {
return 0, err
b := make([]byte, len(p))
n, err := l.ReadCloser.Read(b)
if n > 0 {
if _, err := l.buffer.Write(b[:n]); err != nil {
return 0, err
}
}
l.bufFiled = true
if err != nil {
if err != io.EOF {
return 0, err
}
l.bufFiled = true
goto READ
}
if l.buffer.Len() >= l.searchLimit {
l.bufFiled = true
goto READ
}
if l.timeLimit.Nanoseconds() > 0 {
if l.lastRead.IsZero() {
l.lastRead = time.Now()
return 0, nil
}
now := time.Now()
if n == 0 {
// We hit the limit
if l.lastRead.Add(l.timeLimit).Before(now) {
l.bufFiled = true
goto READ
} else {
return 0, nil
}
} else {
l.lastRead = now
}
}
return 0, nil
}
READ:
if l.bufFiled && l.buffer.Len() != 0 {
b := l.buffer.Bytes()

command/helpers_test.go

@@ -1,9 +1,12 @@
package command
import (
"io"
"io/ioutil"
"reflect"
"strings"
"testing"
"time"
"github.com/mitchellh/cli"
)
@@ -48,7 +51,7 @@ func TestHelpers_NodeID(t *testing.T) {
}
}
func TestHelpers_LineLimitReader(t *testing.T) {
func TestHelpers_LineLimitReader_NoTimeLimit(t *testing.T) {
helloString := `hello
world
this
@@ -114,7 +117,7 @@ test`,
for i, c := range cases {
in := ioutil.NopCloser(strings.NewReader(c.Input))
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit)
limit := NewLineLimitReader(in, c.Lines, c.SearchLimit, 0)
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("case %d failed: %v", i, err)
@@ -126,3 +129,59 @@ test`,
}
}
}
type testReadCloser struct {
data chan []byte
}
func (t *testReadCloser) Read(p []byte) (n int, err error) {
select {
case b, ok := <-t.data:
if !ok {
return 0, io.EOF
}
return copy(p, b), nil
case <-time.After(10 * time.Millisecond):
return 0, nil
}
}
func (t *testReadCloser) Close() error {
close(t.data)
return nil
}
func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) {
// Create the test reader
in := &testReadCloser{data: make(chan []byte)}
// Set up the reader such that it won't hit the line/buffer limit and could
// only terminate if it hits the time limit
limit := NewLineLimitReader(in, 1000, 1000, 100*time.Millisecond)
expected := []byte("hello world")
resultCh := make(chan struct{})
go func() {
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("ReadAll failed: %v", err)
}
if reflect.DeepEqual(outBytes, expected) {
close(resultCh)
return
}
}()
// Send the data
in.data <- expected
in.Close()
select {
case <-resultCh:
case <-time.After(1 * time.Second):
t.Fatalf("did not exit by time limit")
}
}

command/logs.go (new file)

@@ -0,0 +1,268 @@
package command
import (
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/hashicorp/nomad/api"
)
type LogsCommand struct {
Meta
}
func (l *LogsCommand) Help() string {
helpText := `
Usage: nomad logs [options] <alloc-id> <task>
Streams the stdout/stderr of the given allocation and task.
General Options:
` + generalOptionsUsage() + `
Logs Specific Options:
-verbose
Show full information.
-job <job-id>
Use a random allocation from the specified job ID.
-f
Causes the output to not stop when the end of the logs is reached, but
rather to wait for additional output.
-tail
Show the file's contents with offsets relative to the end of the file. If no
offset is given, -n is defaulted to 10.
-n
Sets the tail location in best-efforted number of lines relative to the end
of the file.
-c
Sets the tail location in number of bytes relative to the end of the file.
`
return strings.TrimSpace(helpText)
}
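For example (allocation ID and task name illustrative), following a task's stderr with a 25-line tail:
nomad logs -f -tail -n 25 -stderr 26470238 mytask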
func (l *LogsCommand) Synopsis() string {
return "Streams the logs of a task."
}
func (l *LogsCommand) Run(args []string) int {
var verbose, job, tail, stderr, follow bool
var numLines, numBytes int64
flags := l.Meta.FlagSet("logs", FlagSetClient)
flags.Usage = func() { l.Ui.Output(l.Help()) }
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&job, "job", false, "")
flags.BoolVar(&tail, "tail", false, "")
flags.BoolVar(&follow, "f", false, "")
flags.BoolVar(&stderr, "stderr", false, "")
flags.Int64Var(&numLines, "n", -1, "")
flags.Int64Var(&numBytes, "c", -1, "")
if err := flags.Parse(args); err != nil {
return 1
}
args = flags.Args()
if numArgs := len(args); numArgs < 1 {
if job {
l.Ui.Error("Job ID required. See help:\n")
} else {
l.Ui.Error("Allocation ID required. See help:\n")
}
l.Ui.Error(l.Help())
return 1
} else if numArgs > 2 {
l.Ui.Error(l.Help())
return 1
}
client, err := l.Meta.Client()
if err != nil {
l.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
return 1
}
// If -job is specified, use random allocation, otherwise use provided allocation
allocID := args[0]
if job {
allocID, err = getRandomJobAlloc(client, args[0])
if err != nil {
l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err))
return 1
}
}
// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}
// Query the allocation info
if len(allocID) == 1 {
l.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters."))
return 1
}
if len(allocID)%2 == 1 {
// Identifiers must be of even length, so we strip off the last byte
// to provide a consistent user experience.
allocID = allocID[:len(allocID)-1]
}
allocs, _, err := client.Allocations().PrefixList(allocID)
if err != nil {
l.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err))
return 1
}
if len(allocs) == 0 {
l.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID))
return 1
}
if len(allocs) > 1 {
// Format the allocs
out := make([]string, len(allocs)+1)
out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status"
for i, alloc := range allocs {
out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s",
limit(alloc.ID, length),
limit(alloc.EvalID, length),
alloc.JobID,
alloc.TaskGroup,
alloc.DesiredStatus,
alloc.ClientStatus,
)
}
l.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out)))
return 0
}
// Prefix lookup matched a single allocation
alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
if err != nil {
l.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
return 1
}
var task string
if len(args) >= 2 {
task = args[1]
if task == "" {
l.Ui.Error("Task name required")
return 1
}
} else {
// Try to determine the tasks name from the allocation
var tasks []*api.Task
for _, tg := range alloc.Job.TaskGroups {
if tg.Name == alloc.TaskGroup {
if len(tg.Tasks) == 1 {
task = tg.Tasks[0].Name
break
}
tasks = tg.Tasks
break
}
}
if task == "" {
l.Ui.Error(fmt.Sprintf("Allocation %q is running the following tasks:", limit(alloc.ID, length)))
for _, t := range tasks {
l.Ui.Error(fmt.Sprintf(" * %s", t.Name))
}
l.Ui.Error("\nPlease specify the task.")
return 1
}
}
logType := "stdout"
if stderr {
logType = "stderr"
}
// We have a file, output it.
var r io.ReadCloser
var readErr error
if !tail {
r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginStart, 0)
if readErr != nil {
readErr = fmt.Errorf("Error reading file: %v", readErr)
}
} else {
// Parse the offset
var offset int64 = defaultTailLines * bytesToLines
if nLines, nBytes := numLines != -1, numBytes != -1; nLines && nBytes {
l.Ui.Error("Both -n and -c set")
return 1
} else if nLines {
offset = numLines * bytesToLines
} else if nBytes {
offset = numBytes
} else {
numLines = defaultTailLines
}
r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginEnd, offset)
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second)
}
if readErr != nil {
readErr = fmt.Errorf("Error tailing file: %v", readErr)
}
}
defer r.Close()
if readErr != nil {
l.Ui.Error(readErr.Error())
return 1
}
io.Copy(os.Stdout, r)
return 0
}
// followFile outputs the contents of the file to stdout relative to the end of
// the file.
func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation,
follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, _, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil)
if err != nil {
return nil, err
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader
go func() {
<-signalCh
// End the streaming
r.Close()
}()
return r, nil
}
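Note the interplay here: SetUnblockTime makes FrameReader.Read return periodically with zero bytes even when no data arrives, which is exactly the behavior NewLineLimitReader's time limit requires of its underlying reader (see command/helpers.go above).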

command/logs_test.go (new file)

@@ -0,0 +1,65 @@
package command
import (
"strings"
"testing"
"github.com/mitchellh/cli"
)
func TestLogsCommand_Implements(t *testing.T) {
var _ cli.Command = &LogsCommand{}
}
func TestLogsCommand_Fails(t *testing.T) {
srv, _, url := testServer(t, nil)
defer srv.Stop()
ui := new(cli.MockUi)
cmd := &LogsCommand{Meta: Meta{Ui: ui}}
// Fails on misuse
if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 {
t.Fatalf("expected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, cmd.Help()) {
t.Fatalf("expected help output, got: %s", out)
}
ui.ErrorWriter.Reset()
// Fails on connection failure
if code := cmd.Run([]string{"-address=nope", "foobar"}); code != 1 {
t.Fatalf("expected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error querying allocation") {
t.Fatalf("expected failed query error, got: %s", out)
}
ui.ErrorWriter.Reset()
// Fails on missing alloc
if code := cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}); code != 1 {
t.Fatalf("expected exit 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "No allocation(s) with prefix or id") {
t.Fatalf("expected not found error, got: %s", out)
}
ui.ErrorWriter.Reset()
// Fail on identifier with too few characters
if code := cmd.Run([]string{"-address=" + url, "2"}); code != 1 {
t.Fatalf("expected exit 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "must contain at least two characters.") {
t.Fatalf("expected too few characters error, got: %s", out)
}
ui.ErrorWriter.Reset()
// Identifiers with uneven length should produce a query result
if code := cmd.Run([]string{"-address=" + url, "123"}); code != 1 {
t.Fatalf("expected exit 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "No allocation(s) with prefix or id") {
t.Fatalf("expected not found error, got: %s", out)
}
}

command/plan.go

@@ -63,8 +63,8 @@ General Options:
Plan Options:
-diff
Determines whether the diff between the remote job and planned job is shown.
Defaults to true.
-verbose
Increase diff verbosity.

command/run.go

@@ -62,17 +62,17 @@ General Options:
Run Options:
-check-index
If set, the job is only registered or updated if the passed
job modify index matches the server side version. If a check-index value of
zero is passed, the job is only registered if it does not yet exist. If a
non-zero value is passed, it ensures that the job is being updated from a
known state. The use of this flag is most common in conjunction with the plan
command.
-detach
Return immediately instead of entering monitor mode. After job submission,
the evaluation ID will be printed to the screen, which can be used to
examine the evaluation using the eval-status command.
-verbose
Display full information.

command/stop.go

@@ -27,9 +27,9 @@ Stop Options:
-detach
Return immediately instead of entering monitor mode. After the
deregister command is submitted, a new evaluation ID is printed to the
screen, which can be used to examine the evaluation using the eval-status
command.
-yes
Automatic yes to prompts.

commands.go

@@ -79,6 +79,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"logs": func() (cli.Command, error) {
return &command.LogsCommand{
Meta: meta,
}, nil
},
"node-drain": func() (cli.Command, error) {
return &command.NodeDrainCommand{
Meta: meta,