mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
* Add MonitorExport command and handlers * Implement autocomplete * Require nomad in serviceName * Fix race in StreamReader.Read * Add and use framer.Flush() to coordinate function exit * Add LogFile to client/Server config and read NomadLogPath in rpcHandler instead of HTTPServer * Parameterize StreamFixed stream size
251 lines
6.1 KiB
Go
251 lines
6.1 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package monitor
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
|
|
"github.com/hashicorp/go-msgpack/v2/codec"
|
|
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
)
|
|
|
|
// StreamReader is used to process fixed length streams for consumers
|
|
// that rely on terminating the stream after hitting an EOF. The lock
|
|
// protects the buffer during reads
|
|
type StreamReader struct {
|
|
sync.Mutex
|
|
framer *sframer.StreamFramer
|
|
ch <-chan []byte
|
|
buf []byte
|
|
frameSize int64
|
|
}
|
|
|
|
// NewStreamReader takes a <-chan[]byte and *sframer.StreamFramer and returns
|
|
// a ready to use StreamReader that will allocate its buffer on first read
|
|
func NewStreamReader(ch <-chan []byte, framer *sframer.StreamFramer, frameSize int64) *StreamReader {
|
|
return &StreamReader{
|
|
ch: ch,
|
|
framer: framer,
|
|
frameSize: frameSize,
|
|
}
|
|
}
|
|
|
|
// Read reads stream data into the StreamReader's buffer and copies that
|
|
// data into p
|
|
func (r *StreamReader) Read(p []byte) (n int, err error) {
|
|
select {
|
|
case data, ok := <-r.ch:
|
|
if !ok && len(data) == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
r.Lock()
|
|
r.buf = data
|
|
default:
|
|
return 0, nil
|
|
}
|
|
|
|
n = copy(p, r.buf)
|
|
r.buf = r.buf[n:]
|
|
r.Unlock()
|
|
return n, nil
|
|
}
|
|
|
|
// StreamFixed streams any fixed length data stream. If limit is greater than
|
|
// zero, the stream will end once that many bytes have been read. If eofCancelCh
|
|
// is triggered while at EOF, read one more frame and cancel the stream on the
|
|
// next EOF. If the connection is broken an EPIPE error is returned.
|
|
func (r *StreamReader) StreamFixed(ctx context.Context, offset int64, path string, limit int64,
|
|
eofCancelCh chan error, cancelAfterFirstEof bool) error {
|
|
defer r.framer.Flush()
|
|
parseFramerErr := func(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
errMsg := err.Error()
|
|
|
|
if strings.Contains(errMsg, io.ErrClosedPipe.Error()) {
|
|
// The pipe check is for tests
|
|
return syscall.EPIPE
|
|
}
|
|
|
|
// The connection was closed by our peer
|
|
if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) {
|
|
return syscall.EPIPE
|
|
}
|
|
|
|
if strings.Contains(errMsg, "forcibly closed") {
|
|
return syscall.EPIPE
|
|
}
|
|
|
|
return err
|
|
}
|
|
// streamFrameSize is the maximum number of bytes to send in a single frame
|
|
streamFrameSize := r.frameSize
|
|
|
|
bufSize := streamFrameSize
|
|
if limit > 0 && limit < streamFrameSize {
|
|
bufSize = limit
|
|
}
|
|
streamBuffer := make([]byte, bufSize)
|
|
|
|
var lastEvent string
|
|
|
|
// Only watch file when there is a need for it
|
|
cancelReceived := cancelAfterFirstEof
|
|
|
|
OUTER:
|
|
for {
|
|
// Read up to the max frame size
|
|
n, readErr := r.Read(streamBuffer)
|
|
|
|
// Update the offset
|
|
offset += int64(n)
|
|
|
|
// Return non-EOF errors
|
|
if readErr != nil && readErr != io.EOF {
|
|
return readErr
|
|
}
|
|
|
|
// Send the frame
|
|
if n != 0 || lastEvent != "" {
|
|
if err := r.framer.Send(path, lastEvent, streamBuffer[:n], offset); err != nil {
|
|
return parseFramerErr(err)
|
|
}
|
|
}
|
|
|
|
// Clear the last event
|
|
if lastEvent != "" {
|
|
lastEvent = ""
|
|
}
|
|
|
|
// Just keep reading since we aren't at the end of the file so we can
|
|
// avoid setting up a file event watcher.
|
|
if readErr == nil {
|
|
continue
|
|
}
|
|
// At this point we can stop without waiting for more changes,
|
|
// because we have EOF and either we're not following at all,
|
|
// or we received an event from the eofCancelCh channel
|
|
// and last read was executed
|
|
if cancelReceived {
|
|
return nil
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-r.framer.ExitCh():
|
|
return nil
|
|
case <-ctx.Done():
|
|
return nil
|
|
case _, ok := <-eofCancelCh:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
cancelReceived = true
|
|
continue OUTER
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Destroy wraps the underlying framer's Destroy() call
|
|
func (r *StreamReader) Destroy() {
|
|
r.framer.Destroy()
|
|
}
|
|
|
|
// Run wraps the underlying framer's Run() call
|
|
func (r *StreamReader) Run() {
|
|
r.framer.Run()
|
|
}
|
|
|
|
// StreamEncoder consolidates logic used by monitor RPC handlers to encode and
|
|
// return stream data
|
|
type StreamEncoder struct {
|
|
buf *bytes.Buffer
|
|
conn io.ReadWriteCloser
|
|
encoder *codec.Encoder
|
|
frameCodec *codec.Encoder
|
|
plainText bool
|
|
}
|
|
|
|
// NewStreamEncoder takes buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder
|
|
// frameCodec *codec.Encoder,and plainText bool and returns a NewStreamEncoder
|
|
func NewStreamEncoder(buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder,
|
|
frameCodec *codec.Encoder, plainText bool) StreamEncoder {
|
|
return StreamEncoder{
|
|
buf: buf,
|
|
conn: conn,
|
|
encoder: encoder,
|
|
frameCodec: frameCodec,
|
|
plainText: plainText,
|
|
}
|
|
}
|
|
|
|
// EncodeStream reads and encodes data from a chan *sframer.Streamframe until the
|
|
// channel is closed. If eofCancel is true,EncodeStream continues to read from the closed
|
|
// channel until the underlying framer reports it has flushed it's final frame
|
|
func (s *StreamEncoder) EncodeStream(frames chan *sframer.StreamFrame,
|
|
errCh chan error, ctx context.Context, framer *sframer.StreamFramer,
|
|
eofCancel bool) (err error) {
|
|
var streamErr error
|
|
localFlush := false
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case frame, ok := <-frames:
|
|
if !ok {
|
|
// frame may have been closed when an error
|
|
// occurred. Check once more for an error.
|
|
select {
|
|
case streamErr = <-errCh:
|
|
return streamErr
|
|
// There was a pending error!
|
|
default:
|
|
// No error, continue on and let exitCh control breaking
|
|
}
|
|
// Confirm framer.Flush and localFlush if we're expecting EOF
|
|
if eofCancel {
|
|
_, ok := <-framer.ExitCh()
|
|
if !ok {
|
|
if framer.IsFlushed() && !localFlush {
|
|
localFlush = true
|
|
continue
|
|
} else if framer.IsFlushed() && localFlush {
|
|
break OUTER
|
|
}
|
|
}
|
|
} else {
|
|
break OUTER
|
|
}
|
|
}
|
|
|
|
var resp cstructs.StreamErrWrapper
|
|
if s.plainText {
|
|
resp.Payload = frame.Data
|
|
} else {
|
|
if err := s.frameCodec.Encode(frame); err != nil && err != io.EOF {
|
|
return err
|
|
}
|
|
|
|
resp.Payload = s.buf.Bytes()
|
|
s.buf.Reset()
|
|
}
|
|
if err := s.encoder.Encode(resp); err != nil {
|
|
return err
|
|
}
|
|
s.encoder.Reset(s.conn)
|
|
case <-ctx.Done():
|
|
break OUTER
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|