Files
nomad/command/agent/monitor/test_helpers.go
tehut 21841d3067 Add historical journald and log export flags to operator debug command (#26410)
* Add -log-file-export and -log-lookback commands to add historical log to
debug capture
* use monitor.PrepFile() helper for other historical log tests
2025-08-04 13:55:25 -07:00

125 lines
2.7 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"testing"
"time"
"github.com/hashicorp/go-msgpack/v2/codec"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)
// StreamingClient is satisfied by any type that can look up a streaming RPC
// handler by its method name (for example "Agent.MonitorExport"). It lets the
// helpers in this file drive whichever implementation a test passes in.
type StreamingClient interface {
StreamingRpcHandler(string) (structs.StreamingRpcHandler, error)
}
// writeLine is the single newline-terminated log line that PrepFile writes
// repeatedly into its fixture file. The timestamp is evaluated once, when the
// package is initialized, so every copy of the line is identical.
var writeLine = fmt.Appendf(nil, "[INFO] log log log made of wood you are heavy but so good, %v\n", time.Now())
// PrepFile creates a temporary file inside t.TempDir(), fills it with
// loopCount copies of writeLine, and returns a fresh handle on that file
// opened for reading from the start.
//
// Any failure while creating, writing, closing, or reopening the file fails
// the test immediately rather than handing back a truncated fixture. The
// returned handle is closed automatically when the test finishes; callers may
// still close it earlier themselves.
func PrepFile(t *testing.T) *os.File {
	t.Helper()

	const loopCount = 100

	// Create test file to read from
	dir := t.TempDir()
	f, err := os.CreateTemp(dir, "log")
	must.NoError(t, err)

	for range loopCount {
		_, err = f.Write(writeLine)
		must.NoError(t, err)
	}
	// Close the write handle so the data is flushed before it is reopened.
	must.NoError(t, f.Close())

	// Create test file reader for stream set up
	fileReader, err := os.Open(f.Name())
	must.NoError(t, err)

	// Cleanups run last-in-first-out, so this close happens before the
	// TempDir removal registered above. The error is deliberately ignored:
	// the caller may already have closed the handle.
	t.Cleanup(func() { _ = fileReader.Close() })

	return fileReader
}
// ExportMonitorClient_TestHelper consolidates streaming test setup for use in
// client and server RPChandler tests.
//
// It looks up the "Agent.MonitorExport" streaming handler on c, connects to it
// over an in-memory pipe, sends req, and concatenates the Data of every JSON
// encoded stream frame it receives until the stream ends. The stream is
// considered finished when a message with an empty payload arrives or when the
// handler closes its side of the pipe (io.EOF).
//
// Returns:
//   - (nil, err) if the handler lookup fails, the request cannot be encoded,
//     the response stream cannot be decoded, or userTimeout fires first.
//   - (builder, err) otherwise, where err is the first error reported inside
//     the stream (a message's Error field or a frame that failed to
//     unmarshal), or nil if there was none.
//
// A nil userTimeout disables the timeout.
func ExportMonitorClient_TestHelper(req cstructs.MonitorExportRequest, c StreamingClient,
	userTimeout <-chan time.Time) (*strings.Builder, error) {

	handler, err := c.StreamingRpcHandler("Agent.MonitorExport")
	if err != nil {
		return nil, err
	}

	// create pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// done is closed when this function returns so the decoder goroutine can
	// never be left blocked forever on a channel send nobody will receive.
	done := make(chan struct{})
	defer close(done)

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	go handler(p2)

	// Start decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			// Check the decode error before forwarding anything: a failed
			// decode must not be mistaken for an empty end-of-stream message.
			if err := decoder.Decode(&msg); err != nil {
				select {
				case errCh <- err:
				case <-done:
				}
				return
			}
			select {
			case streamMsg <- &msg:
			case <-done:
				return
			}
		}
	}()

	// send request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	if err := encoder.Encode(req); err != nil {
		return nil, err
	}

	var (
		builder     strings.Builder
		returnedErr error
	)

OUTER:
	for {
		select {
		// A nil userTimeout channel never becomes ready, which disables
		// this case.
		case <-userTimeout:
			return nil, errors.New("expected to be unreachable")

		case err := <-errCh:
			if !errors.Is(err, io.EOF) {
				return nil, err
			}
			// The handler closed the connection: the stream is complete.
			break OUTER

		case message := <-streamMsg:
			// Keep the first error seen; later successful frames must not
			// reset it.
			if message.Error != nil && returnedErr == nil {
				returnedErr = message.Error
			}
			if len(message.Payload) == 0 {
				break OUTER
			}

			var frame sframer.StreamFrame
			if err := json.Unmarshal(message.Payload, &frame); err != nil && returnedErr == nil {
				returnedErr = err
			}
			builder.Write(frame.Data)
		}
	}

	return &builder, returnedErr
}