Files
nomad/command/agent/monitor/stream_helpers_test.go
tehut 21841d3067 Add historical journald and log export flags to operator debug command (#26410)
* Add -log-file-export and -log-lookback commands to add historical log to
debug capture
* use monitor.PrepFile() helper for other historical log tests
2025-08-04 13:55:25 -07:00

230 lines
4.3 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"context"
"io"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/ci"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
"github.com/shoenig/test/must"
)
func TestClientStreamReader_StreamFixed(t *testing.T) {
ci.Parallel(t)
streamBytes := func(streamCh chan []byte, wg *sync.WaitGroup, file *os.File) {
go func() {
defer close(streamCh)
defer wg.Done()
logChunk := make([]byte, len(writeLine))
for {
n, readErr := file.Read(logChunk)
if readErr != nil && readErr != io.EOF {
must.NoError(t, readErr)
}
streamCh <- logChunk[:n]
if readErr == io.EOF {
break
}
}
}()
}
cases := []struct {
name string
eofCancel bool
expectErr bool
errString string
}{
{
name: "happy_path",
eofCancel: true,
},
{
name: "Stream Framer not Running",
expectErr: true,
eofCancel: true,
errString: "StreamFramer not running",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
file := PrepFile(t)
goldenFileContents, err := os.ReadFile(file.Name())
must.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
streamMsg := make(chan []byte, len(goldenFileContents))
streamBytes(streamMsg, &wg, file)
wg.Wait()
frames := make(chan *sframer.StreamFrame, 32)
frameSize := 1024
errCh := make(chan error, 1)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize)
streamReader := NewStreamReader(streamMsg, framer, int64(frameSize))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1) //block until streamReader completes
go func() {
defer wg.Done()
defer streamReader.Destroy()
if !tc.expectErr {
streamReader.Run()
}
initialOffset := int64(0)
err := streamReader.StreamFixed(ctx, initialOffset, "", 0, errCh, tc.eofCancel)
if !tc.expectErr {
must.NoError(t, err)
} else {
must.NotNil(t, err)
must.EqError(t, err, tc.errString)
}
}()
wg.Wait()
// Parse and validate the contents of the frames channel
var streamErr error
var builder strings.Builder
var skipCount int
OUTER:
for skipCount < 2 {
select {
case frame, ok := <-frames:
if !ok {
select {
case streamErr = <-errCh:
must.NoError(t, streamErr) //we shouldn't hit an error here
default:
}
break OUTER
}
builder.Write(frame.Data)
case streamErr = <-errCh:
must.NoError(t, streamErr) //we shouldn't hit an error here
case <-ctx.Done():
break OUTER
default:
skipCount++
time.Sleep(1 * time.Millisecond) //makes the test a touch less flakey
}
}
if !tc.expectErr {
must.Eq(t, string(goldenFileContents), builder.String())
}
})
}
}
func TestScanServiceName(t *testing.T) {
cases := []struct {
testString string
expectErr bool
}{
{
testString: `nomad`,
},
{
testString: `nomad.socket`,
},
{
testString: `nomad-client.service`,
},
{
testString: `nomad.client.02.swap`,
},
{
testString: `nomadhelper@54.device`,
},
{
testString: `1.\@_-nomad@`,
expectErr: true,
},
{
testString: `1./@_-nomad@.automount`,
expectErr: true,
},
{
testString: `docker.path`,
expectErr: true,
},
{
testString: `nomad.path.gotcha`,
expectErr: true,
},
{
testString: `nomad/8.path`,
expectErr: true,
},
{
testString: `nomad%.path`,
expectErr: true,
},
{
testString: `nom4ad.path`,
expectErr: true,
},
{
testString: `nomad,.path`,
expectErr: true,
},
{
testString: `nomad.client`,
expectErr: true,
},
{
testString: `nomad!.path`,
expectErr: true,
},
{
testString: `nomad%http.timer`,
expectErr: true,
},
{
testString: `nomad,http.mount`,
expectErr: true,
},
{
testString: `nomad$http.service`,
expectErr: true,
},
{
testString: `nomad$.http.service`,
expectErr: true,
},
{
testString: `nomad$`,
expectErr: true,
},
}
for _, tc := range cases {
t.Run(tc.testString, func(t *testing.T) {
err := ScanServiceName(tc.testString)
if !tc.expectErr {
must.NoError(t, err)
} else {
must.Error(t, err)
}
})
}
}