mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
* Add -log-file-export and -log-lookback commands to add historical log to debug capture * use monitor.PrepFile() helper for other historical log tests
125 lines
2.7 KiB
Go
125 lines
2.7 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package monitor
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-msgpack/v2/codec"
|
|
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/shoenig/test/must"
|
|
)
|
|
|
|
// StreamingClient is an interface that implements the StreamingRpcHandler function
|
|
type StreamingClient interface {
|
|
StreamingRpcHandler(string) (structs.StreamingRpcHandler, error)
|
|
}
|
|
|
|
// writeLine is the single log line that PrepFile repeats to build its fixture
// file. The timestamp is captured once, when the package is initialized, so
// every copy of the line is identical within a test binary run.
var writeLine = fmt.Appendf(
	nil,
	"[INFO] log log log made of wood you are heavy but so good, %v\n",
	time.Now(),
)
|
|
|
|
func PrepFile(t *testing.T) *os.File {
|
|
const loopCount = 100
|
|
// Create test file to read from
|
|
dir := t.TempDir()
|
|
f, err := os.CreateTemp(dir, "log")
|
|
must.NoError(t, err)
|
|
|
|
for range loopCount {
|
|
_, _ = f.Write(writeLine)
|
|
}
|
|
f.Close()
|
|
|
|
// Create test file reader for stream set up
|
|
goldenFilePath := f.Name()
|
|
fileReader, err := os.Open(goldenFilePath)
|
|
must.NoError(t, err)
|
|
return fileReader
|
|
}
|
|
|
|
// ExportMonitorClient_TestHelper consolidates streaming test setup for use in
|
|
// client and server RPChandler tests
|
|
func ExportMonitorClient_TestHelper(req cstructs.MonitorExportRequest, c StreamingClient,
|
|
userTimeout <-chan time.Time) (*strings.Builder, error) {
|
|
var (
|
|
builder strings.Builder
|
|
returnedErr error
|
|
timeout <-chan time.Time
|
|
)
|
|
handler, err := c.StreamingRpcHandler("Agent.MonitorExport")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// create pipe
|
|
p1, p2 := net.Pipe()
|
|
defer p1.Close()
|
|
defer p2.Close()
|
|
|
|
errCh := make(chan error)
|
|
streamMsg := make(chan *cstructs.StreamErrWrapper)
|
|
|
|
go handler(p2)
|
|
|
|
// Start decoder
|
|
go func() {
|
|
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
|
|
for {
|
|
var msg cstructs.StreamErrWrapper
|
|
err := decoder.Decode(&msg)
|
|
streamMsg <- &msg
|
|
if err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
}
|
|
}()
|
|
|
|
// send request
|
|
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
|
|
if err := encoder.Encode(req); err != nil {
|
|
return nil, err
|
|
}
|
|
if userTimeout != nil {
|
|
timeout = userTimeout
|
|
}
|
|
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
return nil, errors.New("expected to be unreachable")
|
|
case err := <-errCh:
|
|
if err != nil && err != io.EOF {
|
|
return nil, err
|
|
}
|
|
case message := <-streamMsg:
|
|
var frame sframer.StreamFrame
|
|
|
|
if message.Error != nil {
|
|
returnedErr = message.Error
|
|
}
|
|
|
|
if len(message.Payload) != 0 {
|
|
err = json.Unmarshal(message.Payload, &frame)
|
|
returnedErr = err
|
|
builder.Write(frame.Data)
|
|
} else {
|
|
break OUTER
|
|
}
|
|
}
|
|
}
|
|
return &builder, returnedErr
|
|
}
|