Files
nomad/command/agent/monitor/stream_helpers.go
tehut d709accaf5 Add nomad monitor export command (#26178)
* Add MonitorExport command and handlers
* Implement autocomplete
* Require nomad in serviceName
* Fix race in StreamReader.Read
* Add and use framer.Flush() to coordinate function exit
* Add LogFile to client/Server config and read NomadLogPath in rpcHandler instead of HTTPServer
* Parameterize StreamFixed stream size
2025-08-01 10:26:59 -07:00

251 lines
6.1 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"bytes"
"context"
"io"
"strings"
"sync"
"syscall"
"github.com/hashicorp/go-msgpack/v2/codec"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// StreamReader is used to process fixed length streams for consumers
// that rely on terminating the stream after hitting an EOF. The lock
// protects the buffer during reads
type StreamReader struct {
sync.Mutex
framer *sframer.StreamFramer
ch <-chan []byte
buf []byte
frameSize int64
}
// NewStreamReader takes a <-chan[]byte and *sframer.StreamFramer and returns
// a ready to use StreamReader that will allocate its buffer on first read
func NewStreamReader(ch <-chan []byte, framer *sframer.StreamFramer, frameSize int64) *StreamReader {
return &StreamReader{
ch: ch,
framer: framer,
frameSize: frameSize,
}
}
// Read reads stream data into the StreamReader's buffer and copies that
// data into p
func (r *StreamReader) Read(p []byte) (n int, err error) {
select {
case data, ok := <-r.ch:
if !ok && len(data) == 0 {
return 0, io.EOF
}
r.Lock()
r.buf = data
default:
return 0, nil
}
n = copy(p, r.buf)
r.buf = r.buf[n:]
r.Unlock()
return n, nil
}
// StreamFixed streams any fixed length data stream. If limit is greater than
// zero, the stream will end once that many bytes have been read. If eofCancelCh
// is triggered while at EOF, read one more frame and cancel the stream on the
// next EOF. If the connection is broken an EPIPE error is returned.
func (r *StreamReader) StreamFixed(ctx context.Context, offset int64, path string, limit int64,
eofCancelCh chan error, cancelAfterFirstEof bool) error {
defer r.framer.Flush()
parseFramerErr := func(err error) error {
if err == nil {
return nil
}
errMsg := err.Error()
if strings.Contains(errMsg, io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}
// The connection was closed by our peer
if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
if strings.Contains(errMsg, "forcibly closed") {
return syscall.EPIPE
}
return err
}
// streamFrameSize is the maximum number of bytes to send in a single frame
streamFrameSize := r.frameSize
bufSize := streamFrameSize
if limit > 0 && limit < streamFrameSize {
bufSize = limit
}
streamBuffer := make([]byte, bufSize)
var lastEvent string
// Only watch file when there is a need for it
cancelReceived := cancelAfterFirstEof
OUTER:
for {
// Read up to the max frame size
n, readErr := r.Read(streamBuffer)
// Update the offset
offset += int64(n)
// Return non-EOF errors
if readErr != nil && readErr != io.EOF {
return readErr
}
// Send the frame
if n != 0 || lastEvent != "" {
if err := r.framer.Send(path, lastEvent, streamBuffer[:n], offset); err != nil {
return parseFramerErr(err)
}
}
// Clear the last event
if lastEvent != "" {
lastEvent = ""
}
// Just keep reading since we aren't at the end of the file so we can
// avoid setting up a file event watcher.
if readErr == nil {
continue
}
// At this point we can stop without waiting for more changes,
// because we have EOF and either we're not following at all,
// or we received an event from the eofCancelCh channel
// and last read was executed
if cancelReceived {
return nil
}
for {
select {
case <-r.framer.ExitCh():
return nil
case <-ctx.Done():
return nil
case _, ok := <-eofCancelCh:
if !ok {
return nil
}
cancelReceived = true
continue OUTER
}
}
}
}
// Destroy wraps the underlying framer's Destroy() call
func (r *StreamReader) Destroy() {
r.framer.Destroy()
}
// Run wraps the underlying framer's Run() call
func (r *StreamReader) Run() {
r.framer.Run()
}
// StreamEncoder consolidates logic used by monitor RPC handlers to encode and
// return stream data
type StreamEncoder struct {
buf *bytes.Buffer
conn io.ReadWriteCloser
encoder *codec.Encoder
frameCodec *codec.Encoder
plainText bool
}
// NewStreamEncoder takes buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder
// frameCodec *codec.Encoder,and plainText bool and returns a NewStreamEncoder
func NewStreamEncoder(buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder,
frameCodec *codec.Encoder, plainText bool) StreamEncoder {
return StreamEncoder{
buf: buf,
conn: conn,
encoder: encoder,
frameCodec: frameCodec,
plainText: plainText,
}
}
// EncodeStream reads and encodes data from a chan *sframer.Streamframe until the
// channel is closed. If eofCancel is true,EncodeStream continues to read from the closed
// channel until the underlying framer reports it has flushed it's final frame
func (s *StreamEncoder) EncodeStream(frames chan *sframer.StreamFrame,
errCh chan error, ctx context.Context, framer *sframer.StreamFramer,
eofCancel bool) (err error) {
var streamErr error
localFlush := false
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
return streamErr
// There was a pending error!
default:
// No error, continue on and let exitCh control breaking
}
// Confirm framer.Flush and localFlush if we're expecting EOF
if eofCancel {
_, ok := <-framer.ExitCh()
if !ok {
if framer.IsFlushed() && !localFlush {
localFlush = true
continue
} else if framer.IsFlushed() && localFlush {
break OUTER
}
}
} else {
break OUTER
}
}
var resp cstructs.StreamErrWrapper
if s.plainText {
resp.Payload = frame.Data
} else {
if err := s.frameCodec.Encode(frame); err != nil && err != io.EOF {
return err
}
resp.Payload = s.buf.Bytes()
s.buf.Reset()
}
if err := s.encoder.Encode(resp); err != nil {
return err
}
s.encoder.Reset(s.conn)
case <-ctx.Done():
break OUTER
}
}
return nil
}