Add nomad monitor export command (#26178)

* Add MonitorExport command and handlers
* Implement autocomplete
* Require nomad in serviceName
* Fix race in StreamReader.Read
* Add and use framer.Flush() to coordinate function exit
* Add LogFile to client/Server config and read NomadLogPath in rpcHandler instead of HTTPServer
* Parameterize StreamFixed stream size
This commit is contained in:
tehut
2025-08-01 10:26:59 -07:00
committed by GitHub
parent 6f81222ec8
commit d709accaf5
31 changed files with 2354 additions and 180 deletions

3
.changelog/26178.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
cli: Added monitor export cli command to retrieve journald logs or the contents of the Nomad log file for a given Nomad agent
```

View File

@@ -302,8 +302,20 @@ func (a *Agent) Host(serverID, nodeID string, q *QueryOptions) (*HostDataRespons
// Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
frames, errCh := a.monitorHelper(stopCh, q, "/v1/agent/monitor")
return frames, errCh
}
// MonitorExport returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
func (a *Agent) MonitorExport(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
frames, errCh := a.monitorHelper(stopCh, q, "/v1/agent/monitor/export")
return frames, errCh
}
func (a *Agent) monitorHelper(stopCh <-chan struct{}, q *QueryOptions, path string) (chan *StreamFrame, chan error) {
errCh := make(chan error, 1)
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
r, err := a.client.newRequest("GET", path)
if err != nil {
errCh <- err
return nil, errCh

View File

@@ -389,12 +389,23 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {
case <-unblock:
return 0, nil
case err := <-f.errCh:
return 0, err
// check for race with f.frames before returning error
select {
case frame, ok := <-f.frames:
if !ok {
return 0, io.EOF
}
f.frame = frame
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
default:
return 0, err
}
case <-f.cancelCh:
return 0, io.EOF
}
}
// Copy the data out of the frame and update our offset
n = copy(p, f.frame.Data[f.frameOffset:])
f.frameOffset += n

View File

@@ -10,19 +10,16 @@ import (
"io"
"time"
log "github.com/hashicorp/go-hclog"
metrics "github.com/hashicorp/go-metrics/compat"
"github.com/hashicorp/go-msgpack/v2/codec"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/host"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
log "github.com/hashicorp/go-hclog"
metrics "github.com/hashicorp/go-metrics/compat"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)
type Agent struct {
@@ -32,6 +29,7 @@ type Agent struct {
func NewAgentEndpoint(c *Client) *Agent {
a := &Agent{c: c}
a.c.streamingRpcs.Register("Agent.Monitor", a.monitor)
a.c.streamingRpcs.Register("Agent.MonitorExport", a.monitorExport)
return a
}
@@ -84,7 +82,6 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr
func (a *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()
// Decode arguments
var args cstructs.MonitorRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
@@ -117,7 +114,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := monitor.New(512, a.c.logger, &log.LoggerOptions{
m := monitor.New(512, a.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
IncludeLocation: args.LogIncludeLocation,
@@ -143,8 +140,8 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
<-ctx.Done()
}()
logCh := monitor.Start()
defer monitor.Stop()
logCh := m.Start()
defer m.Stop()
initialOffset := int64(0)
// receive logs and build frames
@@ -164,49 +161,11 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
case <-ctx.Done():
break LOOP
}
}
}()
var streamErr error
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}
break OUTER
}
var resp cstructs.StreamErrWrapper
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
resp.Payload = buf.Bytes()
buf.Reset()
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
}
streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText)
streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, false)
if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder)
@@ -214,7 +173,7 @@ OUTER:
}
}
// Host collects data about the host evironment running the agent
// Host collects data about the host environment running the agent
func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataResponse) error {
aclObj, err := a.c.ResolveToken(args.AuthToken)
if err != nil {
@@ -233,3 +192,94 @@ func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataRespo
reply.HostData = data
return nil
}
func (a *Agent) monitorExport(conn io.ReadWriteCloser) {
defer conn.Close()
// Decode arguments
var args cstructs.MonitorExportRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
return
}
// Check acl
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, pointer.Of(int64(403)), encoder)
return
} else if !aclObj.AllowAgentRead() {
handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
return
}
nomadLogPath := a.c.GetConfig().LogFile
if args.OnDisk && nomadLogPath == "" {
handleStreamResultError(errors.New("No nomad log file defined"), pointer.Of(int64(400)), encoder)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opts := monitor.MonitorExportOpts{
Logger: a.c.logger,
LogsSince: args.LogsSince,
ServiceName: args.ServiceName,
NomadLogPath: nomadLogPath,
OnDisk: args.OnDisk,
Follow: args.Follow,
Context: ctx,
}
frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameSize := 1024
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
<-ctx.Done()
}()
m, err := monitor.NewExportMonitor(opts)
if err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
return
}
var eofCancelCh chan error
streamCh := m.Start()
initialOffset := int64(0)
eofCancel := !opts.Follow
// receive logs and build frames
streamReader := monitor.NewStreamReader(streamCh, framer, int64(frameSize))
go func() {
defer framer.Destroy()
if err := streamReader.StreamFixed(ctx, initialOffset, "", 0, eofCancelCh, eofCancel); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
}
}()
streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText)
streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, true)
if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder)
return
}
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"testing"
"time"
@@ -18,11 +19,13 @@ import (
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -446,3 +449,81 @@ func TestAgentHost_ACL(t *testing.T) {
})
}
}
func TestMonitor_MonitorExport(t *testing.T) {
ci.Parallel(t)
// Create test file
dir := t.TempDir()
f, err := os.CreateTemp(dir, "log")
must.NoError(t, err)
for range 1000 {
_, _ = f.WriteString(fmt.Sprintf("%v [INFO] it's log, it's log, it's big it's heavy it's wood", time.Now()))
}
f.Close()
testFilePath := f.Name()
testFileContents, err := os.ReadFile(testFilePath)
must.NoError(t, err)
// start server
s, root, cleanupS := nomad.TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
defer cleanupS()
c, cleanupC := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{s.GetConfig().RPCAddr.String()}
c.LogFile = testFilePath
})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
defer cleanupC()
testutil.WaitForLeader(t, s.RPC)
cases := []struct {
name string
expected string
serviceName string
token string
onDisk bool
expectErr bool
}{
{
name: "happy_path_golden_file",
onDisk: true,
expected: string(testFileContents),
token: root.SecretID,
},
{
name: "token_error",
onDisk: true,
expected: string(testFileContents),
token: tokenBad.SecretID,
expectErr: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := cstructs.MonitorExportRequest{
NodeID: "this is checked in the CLI",
OnDisk: tc.onDisk,
QueryOptions: structs.QueryOptions{
Region: "global",
AuthToken: tc.token,
},
}
builder, finalError := monitor.ExportMonitorClient_TestHelper(req, c, time.After(3*time.Second))
if tc.expectErr {
must.Error(t, finalError)
return
}
must.NoError(t, err)
must.NotNil(t, builder)
must.Eq(t, strings.TrimSpace(tc.expected), strings.TrimSpace(builder.String()))
})
}
}

View File

@@ -384,6 +384,9 @@ type Config struct {
// NodeMaxAllocs is an optional field that sets the maximum number of
// allocations a node can be assigned. Defaults to 0 and ignored if unset.
NodeMaxAllocs int
// LogFile is used by MonitorExport to stream a server's log file
LogFile string `hcl:"log_file"`
}
type APIListenerRegistrar interface {

View File

@@ -97,6 +97,9 @@ type StreamFramer struct {
// Captures whether the framer is running
running bool
// Confirms final flush sent
flushed bool
}
// NewStreamFramer creates a new stream framer that will output StreamFrames to
@@ -107,7 +110,6 @@ func NewStreamFramer(out chan<- *StreamFrame,
// Create the heartbeat and flush ticker
heartbeat := time.NewTicker(heartbeatRate)
flusher := time.NewTicker(batchWindow)
return &StreamFramer{
out: out,
frameSize: frameSize,
@@ -123,7 +125,6 @@ func NewStreamFramer(out chan<- *StreamFrame,
// Destroy is used to cleanup the StreamFramer and flush any pending frames
func (s *StreamFramer) Destroy() {
s.l.Lock()
wasShutdown := s.shutdown
s.shutdown = true
@@ -204,7 +205,6 @@ OUTER:
// Send() may have left a partial frame. Send it now.
if !s.f.IsCleared() {
s.f.Data = s.readData()
// Only send if there's actually data left
if len(s.f.Data) > 0 {
// Cannot select on shutdownCh as it's already closed
@@ -281,6 +281,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize || force {
// Clear since are flushing the frame and capturing the file event.
// Subsequent data frames will be flushed based on the data size alone
// since they share the same fileevent.
@@ -309,3 +310,22 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
return nil
}
func (s *StreamFramer) IsFlushed() bool {
return s.flushed
}
func (s *StreamFramer) Flush() bool {
s.l.Lock()
// Send() may have left a partial frame. Send it now.
s.f.Data = s.readData()
// Only send if there's actually data left
if len(s.f.Data) > 0 {
s.out <- s.f.Copy()
}
s.flushed = true
s.l.Unlock()
return s.IsFlushed()
}

View File

@@ -62,6 +62,37 @@ type MonitorRequest struct {
structs.QueryOptions
}
type MonitorExportRequest struct {
// NodeID is the node we want to track the logs of
NodeID string
// ServerID is the server we want to track the logs of
ServerID string
// ServiceName is the systemd service for which we want to retrieve logs
// Cannot be used with OnDisk
ServiceName string
// Follow indicates that the monitor should continue to deliver logs until
// an outside interrupt. Cannot be used with OnDisk
Follow bool
// LogsSince sets the lookback time for monitorExport logs in hours
LogsSince string
// OnDisk indicates that nomad should export logs written to the configured nomad log path
OnDisk bool
// NomadLogPath is set to the nomad log path by the HTTP agent if OnDisk
// is true
NomadLogPath string
// PlainText disables base64 encoding.
PlainText bool
structs.QueryOptions
}
// AllocFileInfo holds information about a file inside the AllocDir
type AllocFileInfo struct {
Name string

View File

@@ -655,7 +655,8 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
return nil, fmt.Errorf("number of schedulers should be between 0 and %d",
runtime.NumCPU())
}
// Copy LogFile config value
conf.LogFile = agentConfig.LogFile
return conf, nil
}
@@ -753,7 +754,6 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
if conf == nil {
conf = clientconfig.DefaultConfig()
}
conf.Servers = agentConfig.Client.Servers
conf.DevMode = agentConfig.DevMode
conf.EnableDebug = agentConfig.EnableDebug
@@ -1016,6 +1016,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
conf.Users = clientconfig.UsersConfigFromAgent(agentConfig.Client.Users)
conf.LogFile = agentConfig.LogFile
return conf, nil
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/hashicorp/nomad/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/host"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
@@ -211,6 +212,103 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
codedErr := s.streamMonitor(resp, req, args, nodeID, "Agent.Monitor")
return nil, codedErr
}
func (s *HTTPServer) AgentMonitorExport(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Process and validate arguments
onDisk := false
onDiskBool, err := parseBool(req, "on_disk")
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unknown value for on-disk: %v", err))
}
if onDiskBool != nil {
onDisk = *onDiskBool
}
follow := false
followBool, err := parseBool(req, "follow")
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unknown value for follow: %v", err))
}
if followBool != nil {
follow = *followBool
}
plainText := false
plainTextBool, err := parseBool(req, "plain")
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unknown value for plain: %v", err))
}
if plainTextBool != nil {
plainText = *plainTextBool
}
logsSince := "72h" //default value
logsSinceStr := req.URL.Query().Get("logs_since")
if logsSinceStr != "" {
_, err := time.ParseDuration(logsSinceStr)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unknown value for logs-since: %v", err))
}
logsSince = logsSinceStr
}
serviceName := req.URL.Query().Get("service_name")
nodeID := req.URL.Query().Get("node_id")
serverID := req.URL.Query().Get("server_id")
if nodeID != "" && serverID != "" {
return nil, CodedError(400, "Cannot target node and server simultaneously")
}
if onDisk && serviceName != "" {
return nil, CodedError(400, "Cannot target journald and nomad log file simultaneously")
}
if !onDisk && serviceName == "" {
return nil, CodedError(400, "Either -service-name or -on-disk must be set")
}
if onDisk && follow {
return nil, CodedError(400, "Cannot follow log file")
}
if serviceName != "" {
if err := monitor.ScanServiceName(serviceName); err != nil {
return nil, CodedError(422, err.Error())
}
}
// Build the request and parse the ACL token
args := cstructs.MonitorExportRequest{
NodeID: nodeID,
ServerID: serverID,
LogsSince: logsSince,
ServiceName: serviceName,
OnDisk: onDisk,
Follow: follow,
PlainText: plainText,
}
// Force the Content-Type to avoid Go's http.ResponseWriter from
// detecting an incorrect or unsafe one.
if plainText {
resp.Header().Set("Content-Type", "text/plain")
} else {
resp.Header().Set("Content-Type", "application/json")
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
codedErr := s.streamMonitor(resp, req, args, nodeID, "Agent.MonitorExport")
return nil, codedErr
}
func (s *HTTPServer) streamMonitor(resp http.ResponseWriter, req *http.Request,
args any, nodeID string, endpoint string) error {
// Make the RPC
var handler structs.StreamingRpcHandler
@@ -219,24 +317,25 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID)
if useLocalClient {
handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor")
handler, handlerErr = s.agent.Client().StreamingRpcHandler(endpoint)
} else if useClientRPC {
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor")
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(endpoint)
} else if useServerRPC {
handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor")
handler, handlerErr = s.agent.Server().StreamingRpcHandler(endpoint)
} else {
handlerErr = CodedError(400, "No local Node and node_id not provided")
handlerErr = CodedError(400, "No local Node")
}
// No node id monitor current server/client
} else if srv := s.agent.Server(); srv != nil {
handler, handlerErr = srv.StreamingRpcHandler("Agent.Monitor")
handler, handlerErr = srv.StreamingRpcHandler(endpoint)
} else {
handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor")
handler, handlerErr = s.agent.Client().StreamingRpcHandler(endpoint)
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
return CodedError(500, handlerErr.Error())
}
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
@@ -256,7 +355,6 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
// stream response
go func() {
defer cancel()
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
@@ -293,7 +391,8 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
}()
handler(handlerPipe)
cancel()
cancel() //this seems like it should be wrong to me but removing it didn't
// affect either truncation or short returns
codedErr := <-errCh
if codedErr != nil &&
@@ -302,7 +401,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
strings.Contains(codedErr.Error(), "EOF")) {
codedErr = nil
}
return nil, codedErr
return codedErr
}
func (s *HTTPServer) AgentForceLeaveRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View File

@@ -27,6 +27,7 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/mock"
@@ -446,6 +447,204 @@ func TestHTTP_AgentMonitor(t *testing.T) {
})
}
func TestHTTP_AgentMonitorExport(t *testing.T) {
ci.Parallel(t)
const expectedText = "log log log log log"
dir := t.TempDir()
testFile, err := os.CreateTemp(dir, "nomadtests")
must.NoError(t, err)
_, err = testFile.Write([]byte(expectedText))
must.NoError(t, err)
inlineFilePath := testFile.Name()
config := func(c *Config) {
c.LogFile = inlineFilePath
}
baseURL := "/v1/agent/monitor/export?"
cases := []struct {
name string
follow string
logsSince string
nodeID string
onDisk string
serviceName string
serverID string
config func(c *Config)
errCode int
errString string
expectErr bool
want string
}{
{
name: "happy_path",
follow: "false",
onDisk: "true",
logsSince: "9s",
config: config,
expectErr: false,
want: expectedText,
},
{
name: "invalid_onDisk",
follow: "false",
onDisk: "green",
config: config,
errCode: 400,
expectErr: true,
errString: "Unknown value for on-disk",
},
{
name: "invalid_follow",
follow: "green",
onDisk: "false",
config: config,
errCode: 400,
expectErr: true,
errString: "Unknown value for follow",
},
{
name: "invalid_service_name",
follow: "true",
onDisk: "false",
serviceName: "nomad%",
config: config,
errCode: 422,
expectErr: true,
errString: "does not meet systemd conventions",
},
{
name: "invalid_logsSince_duration",
follow: "false",
onDisk: "true",
serviceName: "nomad",
logsSince: "98seconds",
config: config,
errCode: 400,
expectErr: true,
errString: `unknown unit "seconds" in duration`,
want: expectedText,
},
{
name: "server_and_node",
follow: "false",
onDisk: "true",
nodeID: "doesn'tneedtobeuuid",
serverID: "doesntneedtobeuuid",
config: config,
errCode: 400,
errString: "Cannot target node and server simultaneously",
expectErr: true,
want: expectedText,
},
{
name: "onDisk_and_serviceName",
follow: "false",
onDisk: "true",
serviceName: "nomad",
nodeID: "doesn'tneedtobeuuid",
config: config,
errCode: 400,
errString: "Cannot target journald and nomad log file simultaneously",
expectErr: true,
want: expectedText,
},
{
name: "neither_onDisk_nor_serviceName",
follow: "false",
nodeID: "doesn'tneedtobeuuid",
config: config,
errCode: 400,
errString: "Either -service-name or -on-disk must be set",
expectErr: true,
want: expectedText,
},
{
name: "onDisk_and_follow",
follow: "true",
onDisk: "true",
nodeID: "doesn'tneedtobeuuid",
config: config,
errCode: 400,
errString: "Cannot follow log file",
expectErr: true,
want: expectedText,
},
{
name: "onDisk_and_no_log_file",
onDisk: "true",
config: nil,
errCode: 400,
errString: "No nomad log file defined",
expectErr: true,
want: expectedText,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
httpTest(t, tc.config, func(s *TestAgent) {
// Prepare urlstring
urlVal := url.Values{}
urlParamPrep := func(k string, v string, failCase string, values *url.Values) {
if v != failCase {
values.Add(k, v)
}
}
urlParamPrep("follow", tc.follow, "false", &urlVal)
urlParamPrep("logs_since", tc.logsSince, "", &urlVal)
urlParamPrep("on_disk", tc.onDisk, "", &urlVal)
urlParamPrep("node_id", tc.nodeID, "", &urlVal)
urlParamPrep("server_id", tc.serverID, "", &urlVal)
urlParamPrep("service_name", tc.serviceName, "", &urlVal)
urlString := baseURL + urlVal.Encode()
req, err := http.NewRequest(http.MethodGet, urlString, nil)
must.NoError(t, err)
resp := newClosableRecorder()
defer resp.Close()
var (
builder strings.Builder
frame sframer.StreamFrame
)
_, err = s.Server.AgentMonitorExport(resp, req)
if tc.expectErr {
t.Log(err.Error())
must.Eq(t, tc.errCode, err.(HTTPCodedError).Code())
must.StrContains(t, err.Error(), tc.errString)
return
}
must.NoError(t, err)
output, err := io.ReadAll(resp.Body)
must.NoError(t, err)
err = json.Unmarshal(output, &frame)
if err != nil && err != io.EOF {
must.NoError(t, err)
}
builder.WriteString(string(frame.Data))
must.Eq(t, tc.want, builder.String())
})
})
}
}
// Scenarios when Pprof requests should be available
// see https://github.com/hashicorp/nomad/issues/6496
// +---------------+------------------+--------+------------------+

View File

@@ -424,6 +424,9 @@ type ClientConfig struct {
// NodeMaxAllocs sets the maximum number of allocations per node
// Defaults to 0 and ignored if unset.
NodeMaxAllocs int `hcl:"node_max_allocs"`
// LogFile is used by MonitorExport to stream a client's log file
LogFile string `hcl:"log_file"`
}
func (c *ClientConfig) Copy() *ClientConfig {
@@ -756,6 +759,9 @@ type ServerConfig struct {
// expected to complete before the server is considered healthy. Without
// this, the server can hang indefinitely waiting for these.
StartTimeout string `hcl:"start_timeout"`
// LogFile is used by MonitorExport to stream a server's log file
LogFile string `hcl:"log_file"`
}
func (s *ServerConfig) Copy() *ServerConfig {

View File

@@ -472,6 +472,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
// "application/json" Content-Type depending on the ?plain= query
// parameter.
s.mux.HandleFunc("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.mux.HandleFunc("/v1/agent/monitor/export", s.wrap(s.AgentMonitorExport))
s.mux.HandleFunc("/v1/agent/pprof/", s.wrapNonJSON(s.AgentPprofRequest))

View File

@@ -0,0 +1,275 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"runtime"
"slices"
"strings"
"sync"
"time"
"github.com/hashicorp/go-hclog"
)
const defaultBufSize = 512
// ExportMonitor implements the Monitor interface for testing
type ExportMonitor struct {
sync.Mutex
logCh chan []byte
logger hclog.Logger
// doneCh coordinates breaking out of the export loop
doneCh chan struct{}
// ExportReader can read from the cli or the NomadFilePath
ExportReader *ExportReader
bufSize int
}
type MonitorExportOpts struct {
Logger hclog.Logger
// LogsSince sets the lookback time for monitorExport logs in hours
LogsSince string
// OnDisk indicates that nomad should export logs written to the configured nomad log path
OnDisk bool
// ServiceName is the systemd service for which we want to retrieve logs
// Cannot be used with OnDisk
ServiceName string
// NomadLogPath is set to the nomad log path by the HTTP agent if OnDisk
// is true
NomadLogPath string
// Follow indicates that the monitor should continue to deliver logs until
// an outside interrupt
Follow bool
// Context passed from client to close the cmd and exit the function
Context context.Context
// ExportMonitor's buffer size, defaults to 512 if unset by caller
BufSize int
}
type ExportReader struct {
io.Reader
Cmd *exec.Cmd
UseCli bool
Follow bool
}
// NewExportMonitor validates and prepares the appropriate reader before
// returning a new ExportMonitor or the appropriate error
func NewExportMonitor(opts MonitorExportOpts) (*ExportMonitor, error) {
var (
exportReader *ExportReader
bufSize int
)
if runtime.GOOS != "linux" &&
opts.ServiceName != "" {
return nil, errors.New("journald log monitoring only available on linux")
}
if opts.BufSize == 0 {
bufSize = defaultBufSize
} else {
bufSize = opts.BufSize
}
if opts.OnDisk && opts.ServiceName == "" {
e, prepErr := fileReader(opts)
if prepErr != nil {
return nil, prepErr
}
exportReader = e
}
if opts.ServiceName != "" && !opts.OnDisk {
e, prepErr := cliReader(opts)
if prepErr != nil {
return nil, prepErr
}
exportReader = e
}
sw := ExportMonitor{
logger: hclog.Default().Named("export"),
doneCh: make(chan struct{}, 1),
logCh: make(chan []byte, bufSize),
bufSize: bufSize,
ExportReader: exportReader,
}
return &sw, nil
}
// ScanServiceName checks that the length, prefix and suffix conform to
// systemd conventions and ensures the service name includes the word 'nomad'
func ScanServiceName(input string) error {
prefix := ""
// invalid if prefix and suffix together are > 255 char
if len(input) > 255 {
return errors.New("service name too long")
}
if isNomad := strings.Contains(input, "nomad"); !isNomad {
return errors.New(`service name must include 'nomad`)
}
// if there is a suffix, check against list of valid suffixes
// and set prefix to exclude suffix index, else set prefix
splitInput := strings.Split(input, ".")
if len(splitInput) < 2 {
prefix = input
} else {
suffix := splitInput[len(splitInput)-1]
validSuffix := []string{
"service",
"socket",
"device",
"mount",
"automount",
"swap",
"target",
"path",
"timer",
"slice",
"scope",
}
if valid := slices.Contains(validSuffix, suffix); !valid {
return errors.New("invalid suffix")
}
prefix = strings.Join(splitInput[:len(splitInput)-1], "")
}
safe, _ := regexp.MatchString(`^[\w\\._-]*(@[\w\\._-]+)?$`, prefix)
if !safe {
return fmt.Errorf("%s does not meet systemd conventions", prefix)
}
return nil
}
func cliReader(opts MonitorExportOpts) (*ExportReader, error) {
isCli := true
// Vet servicename again
if err := ScanServiceName(opts.ServiceName); err != nil {
return nil, err
}
cmdDuration := "72 hours"
if opts.LogsSince != "" {
parsedDur, err := time.ParseDuration(opts.LogsSince)
if err != nil {
return nil, err
}
cmdDuration = parsedDur.String()
}
// build command with vetted inputs
cmdArgs := []string{"-xu", opts.ServiceName, "--since", fmt.Sprintf("%s ago", cmdDuration)}
if opts.Follow {
cmdArgs = append(cmdArgs, "-f")
}
cmd := exec.CommandContext(opts.Context, "journalctl", cmdArgs...)
// set up reader
stdOut, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stdErr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
multiReader := io.MultiReader(stdOut, stdErr)
cmd.Start()
return &ExportReader{multiReader, cmd, isCli, opts.Follow}, nil
}
func fileReader(opts MonitorExportOpts) (*ExportReader, error) {
notCli := false
file, err := os.Open(opts.NomadLogPath)
if err != nil {
return nil, err
}
return &ExportReader{file, nil, notCli, opts.Follow}, nil
}
// Stop stops the monitoring process
func (d *ExportMonitor) Stop() {
select {
case _, ok := <-d.doneCh:
if !ok {
if d.ExportReader.UseCli {
d.ExportReader.Cmd.Wait()
}
close(d.logCh)
return
}
default:
}
close(d.logCh)
}
// Start reads data from the monitor's ExportReader into its logCh
func (d *ExportMonitor) Start() <-chan []byte {
// Read, copy, and send to channel until we hit EOF or error
go func() {
defer d.Stop()
logChunk := make([]byte, d.bufSize)
for {
n, readErr := d.ExportReader.Read(logChunk)
if readErr != nil && readErr != io.EOF {
d.logger.Error("unable to read logs into channel", readErr.Error())
return
}
d.Write(logChunk[:n])
if readErr == io.EOF {
break
}
}
close(d.doneCh)
}()
return d.logCh
}
// Write attempts to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *ExportMonitor) Write(p []byte) (n int) {
d.Lock()
defer d.Unlock()
// ensure logCh is still open
select {
case <-d.doneCh:
return
default:
}
bytes := make([]byte, len(p))
copy(bytes, p)
d.logCh <- bytes
return len(p)
}

View File

@@ -4,13 +4,17 @@
package monitor
import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@@ -27,7 +31,6 @@ func TestMonitor_Start(t *testing.T) {
logCh := m.Start()
defer m.Stop()
go func() {
logger.Debug("test log")
time.Sleep(10 * time.Millisecond)
@@ -90,3 +93,103 @@ TEST:
}
}
}
func TestMonitor_Export(t *testing.T) {
ci.Parallel(t)
const (
expectedText = "log log log log log"
)
dir := t.TempDir()
f, err := os.CreateTemp(dir, "log")
must.NoError(t, err)
for range 1000 {
_, _ = f.WriteString(fmt.Sprintf("%v [INFO] it's log, it's log, it's big it's heavy it's wood", time.Now()))
}
f.Close()
goldenFilePath := f.Name()
goldenFileContents, err := os.ReadFile(goldenFilePath)
must.NoError(t, err)
testFile, err := os.CreateTemp("", "nomadtest")
must.NoError(t, err)
_, err = testFile.Write([]byte(expectedText))
must.NoError(t, err)
inlineFilePath := testFile.Name()
logger := log.NewInterceptLogger(&log.LoggerOptions{
Level: log.Error,
})
ctx, cancel := context.WithCancel(context.Background())
cases := []struct {
name string
opts MonitorExportOpts
expected string
expectClose bool
}{
{
name: "happy_path_logpath_long_file",
opts: MonitorExportOpts{
Context: ctx,
Logger: logger,
OnDisk: true,
NomadLogPath: goldenFilePath,
},
expected: string(goldenFileContents),
},
{
name: "happy_path_logpath_short_file",
opts: MonitorExportOpts{
Context: ctx,
Logger: logger,
OnDisk: true,
NomadLogPath: inlineFilePath,
},
expected: expectedText,
},
{
name: "close client context",
opts: MonitorExportOpts{
Context: ctx,
Logger: logger,
OnDisk: true,
NomadLogPath: inlineFilePath,
},
expected: expectedText,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
monitor, err := NewExportMonitor(tc.opts)
must.NoError(t, err)
logCh := monitor.Start()
if tc.expectClose {
cancel()
}
var builder strings.Builder
TEST:
for {
select {
case log, ok := <-logCh:
if !ok {
break TEST
}
builder.Write(log)
default:
continue
}
}
if !tc.expectClose {
must.Eq(t, strings.TrimSpace(tc.expected), strings.TrimSpace(builder.String()))
} else {
must.Eq(t, builder.String(), "")
}
})
}
}

View File

@@ -0,0 +1,250 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"bytes"
"context"
"io"
"strings"
"sync"
"syscall"
"github.com/hashicorp/go-msgpack/v2/codec"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// StreamReader is used to process fixed length streams for consumers
// that rely on terminating the stream after hitting an EOF. The lock
// protects the buffer during reads
type StreamReader struct {
sync.Mutex
framer *sframer.StreamFramer
ch <-chan []byte
buf []byte
frameSize int64
}
// NewStreamReader takes a <-chan[]byte and *sframer.StreamFramer and returns
// a ready to use StreamReader that will allocate its buffer on first read
func NewStreamReader(ch <-chan []byte, framer *sframer.StreamFramer, frameSize int64) *StreamReader {
return &StreamReader{
ch: ch,
framer: framer,
frameSize: frameSize,
}
}
// Read reads stream data into the StreamReader's buffer and copies that
// data into p
func (r *StreamReader) Read(p []byte) (n int, err error) {
select {
case data, ok := <-r.ch:
if !ok && len(data) == 0 {
return 0, io.EOF
}
r.Lock()
r.buf = data
default:
return 0, nil
}
n = copy(p, r.buf)
r.buf = r.buf[n:]
r.Unlock()
return n, nil
}
// StreamFixed streams any fixed length data stream. If limit is greater than
// zero, the stream will end once that many bytes have been read. If eofCancelCh
// is triggered while at EOF, read one more frame and cancel the stream on the
// next EOF. If the connection is broken an EPIPE error is returned.
func (r *StreamReader) StreamFixed(ctx context.Context, offset int64, path string, limit int64,
eofCancelCh chan error, cancelAfterFirstEof bool) error {
defer r.framer.Flush()
parseFramerErr := func(err error) error {
if err == nil {
return nil
}
errMsg := err.Error()
if strings.Contains(errMsg, io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}
// The connection was closed by our peer
if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
if strings.Contains(errMsg, "forcibly closed") {
return syscall.EPIPE
}
return err
}
// streamFrameSize is the maximum number of bytes to send in a single frame
streamFrameSize := r.frameSize
bufSize := streamFrameSize
if limit > 0 && limit < streamFrameSize {
bufSize = limit
}
streamBuffer := make([]byte, bufSize)
var lastEvent string
// Only watch file when there is a need for it
cancelReceived := cancelAfterFirstEof
OUTER:
for {
// Read up to the max frame size
n, readErr := r.Read(streamBuffer)
// Update the offset
offset += int64(n)
// Return non-EOF errors
if readErr != nil && readErr != io.EOF {
return readErr
}
// Send the frame
if n != 0 || lastEvent != "" {
if err := r.framer.Send(path, lastEvent, streamBuffer[:n], offset); err != nil {
return parseFramerErr(err)
}
}
// Clear the last event
if lastEvent != "" {
lastEvent = ""
}
// Just keep reading since we aren't at the end of the file so we can
// avoid setting up a file event watcher.
if readErr == nil {
continue
}
// At this point we can stop without waiting for more changes,
// because we have EOF and either we're not following at all,
// or we received an event from the eofCancelCh channel
// and last read was executed
if cancelReceived {
return nil
}
for {
select {
case <-r.framer.ExitCh():
return nil
case <-ctx.Done():
return nil
case _, ok := <-eofCancelCh:
if !ok {
return nil
}
cancelReceived = true
continue OUTER
}
}
}
}
// Destroy wraps the underlying framer's Destroy() call
func (r *StreamReader) Destroy() {
r.framer.Destroy()
}
// Run wraps the underlying framer's Run() call
func (r *StreamReader) Run() {
r.framer.Run()
}
// StreamEncoder consolidates logic used by monitor RPC handlers to encode and
// return stream data
type StreamEncoder struct {
buf *bytes.Buffer
conn io.ReadWriteCloser
encoder *codec.Encoder
frameCodec *codec.Encoder
plainText bool
}
// NewStreamEncoder takes buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder
// frameCodec *codec.Encoder,and plainText bool and returns a NewStreamEncoder
func NewStreamEncoder(buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder,
frameCodec *codec.Encoder, plainText bool) StreamEncoder {
return StreamEncoder{
buf: buf,
conn: conn,
encoder: encoder,
frameCodec: frameCodec,
plainText: plainText,
}
}
// EncodeStream reads and encodes data from a chan *sframer.Streamframe until the
// channel is closed. If eofCancel is true,EncodeStream continues to read from the closed
// channel until the underlying framer reports it has flushed it's final frame
func (s *StreamEncoder) EncodeStream(frames chan *sframer.StreamFrame,
errCh chan error, ctx context.Context, framer *sframer.StreamFramer,
eofCancel bool) (err error) {
var streamErr error
localFlush := false
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
return streamErr
// There was a pending error!
default:
// No error, continue on and let exitCh control breaking
}
// Confirm framer.Flush and localFlush if we're expecting EOF
if eofCancel {
_, ok := <-framer.ExitCh()
if !ok {
if framer.IsFlushed() && !localFlush {
localFlush = true
continue
} else if framer.IsFlushed() && localFlush {
break OUTER
}
}
} else {
break OUTER
}
}
var resp cstructs.StreamErrWrapper
if s.plainText {
resp.Payload = frame.Data
} else {
if err := s.frameCodec.Encode(frame); err != nil && err != io.EOF {
return err
}
resp.Payload = s.buf.Bytes()
s.buf.Reset()
}
if err := s.encoder.Encode(resp); err != nil {
return err
}
s.encoder.Reset(s.conn)
case <-ctx.Done():
break OUTER
}
}
return nil
}

View File

@@ -0,0 +1,250 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"context"
"io"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/ci"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
"github.com/shoenig/test/must"
)
var writeLine = []byte("[INFO] log log log made of wood you are heavy but so good\n")
func prepFile(t *testing.T) *os.File {
const loopCount = 10
// Create test file to read from
dir := t.TempDir()
f, err := os.CreateTemp(dir, "log")
must.NoError(t, err)
for range loopCount {
_, _ = f.Write(writeLine)
}
f.Close()
// Create test file reader for stream set up
goldenFilePath := f.Name()
fileReader, err := os.Open(goldenFilePath)
must.NoError(t, err)
return fileReader
}
func TestClientStreamReader_StreamFixed(t *testing.T) {
ci.Parallel(t)
streamBytes := func(streamCh chan []byte, wg *sync.WaitGroup, file *os.File) {
go func() {
defer close(streamCh)
defer wg.Done()
logChunk := make([]byte, len(writeLine))
for {
n, readErr := file.Read(logChunk)
if readErr != nil && readErr != io.EOF {
must.NoError(t, readErr)
}
streamCh <- logChunk[:n]
if readErr == io.EOF {
break
}
}
}()
}
cases := []struct {
name string
eofCancel bool
expectErr bool
errString string
}{
{
name: "happy_path",
eofCancel: true,
},
{
name: "Stream Framer not Running",
expectErr: true,
eofCancel: true,
errString: "StreamFramer not running",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
file := prepFile(t)
goldenFileContents, err := os.ReadFile(file.Name())
must.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
streamMsg := make(chan []byte, len(goldenFileContents))
streamBytes(streamMsg, &wg, file)
wg.Wait()
frames := make(chan *sframer.StreamFrame, 32)
frameSize := 1024
errCh := make(chan error, 1)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize)
streamReader := NewStreamReader(streamMsg, framer, int64(frameSize))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1) //block until streamReader completes
go func() {
defer wg.Done()
defer streamReader.Destroy()
if !tc.expectErr {
streamReader.Run()
}
initialOffset := int64(0)
err := streamReader.StreamFixed(ctx, initialOffset, "", 0, errCh, tc.eofCancel)
if !tc.expectErr {
must.NoError(t, err)
} else {
must.NotNil(t, err)
must.EqError(t, err, tc.errString)
}
}()
wg.Wait()
// Parse and validate the contents of the frames channel
var streamErr error
var builder strings.Builder
var skipCount int
OUTER:
for skipCount < 2 {
select {
case frame, ok := <-frames:
if !ok {
select {
case streamErr = <-errCh:
must.NoError(t, streamErr) //we shouldn't hit an error here
default:
}
break OUTER
}
builder.Write(frame.Data)
case streamErr = <-errCh:
must.NoError(t, streamErr) //we shouldn't hit an error here
case <-ctx.Done():
break OUTER
default:
skipCount++
time.Sleep(1 * time.Millisecond) //makes the test a touch less flakey
}
}
if !tc.expectErr {
must.Eq(t, string(goldenFileContents), builder.String())
}
})
}
}
func TestScanServiceName(t *testing.T) {
cases := []struct {
testString string
expectErr bool
}{
{
testString: `nomad`,
},
{
testString: `nomad.socket`,
},
{
testString: `nomad-client.service`,
},
{
testString: `nomad.client.02.swap`,
},
{
testString: `nomadhelper@54.device`,
},
{
testString: `1.\@_-nomad@`,
expectErr: true,
},
{
testString: `1./@_-nomad@.automount`,
expectErr: true,
},
{
testString: `docker.path`,
expectErr: true,
},
{
testString: `nomad.path.gotcha`,
expectErr: true,
},
{
testString: `nomad/8.path`,
expectErr: true,
},
{
testString: `nomad%.path`,
expectErr: true,
},
{
testString: `nom4ad.path`,
expectErr: true,
},
{
testString: `nomad,.path`,
expectErr: true,
},
{
testString: `nomad.client`,
expectErr: true,
},
{
testString: `nomad!.path`,
expectErr: true,
},
{
testString: `nomad%http.timer`,
expectErr: true,
},
{
testString: `nomad,http.mount`,
expectErr: true,
},
{
testString: `nomad$http.service`,
expectErr: true,
},
{
testString: `nomad$.http.service`,
expectErr: true,
},
{
testString: `nomad$`,
expectErr: true,
},
}
for _, tc := range cases {
t.Run(tc.testString, func(t *testing.T) {
err := ScanServiceName(tc.testString)
if !tc.expectErr {
must.NoError(t, err)
} else {
must.Error(t, err)
}
})
}
}

View File

@@ -0,0 +1,99 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"encoding/json"
"errors"
"io"
"net"
"strings"
"time"
"github.com/hashicorp/go-msgpack/v2/codec"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
// StreamingClient is an interface that implements the StreamingRpcHandler function
type StreamingClient interface {
StreamingRpcHandler(string) (structs.StreamingRpcHandler, error)
}
// ExportMonitorClient_TestHelper consolidates streaming test setup for use in
// client and server RPChandler tests
func ExportMonitorClient_TestHelper(req cstructs.MonitorExportRequest, c StreamingClient,
userTimeout <-chan time.Time) (*strings.Builder, error) {
var (
builder strings.Builder
returnedErr error
timeout <-chan time.Time
)
handler, err := c.StreamingRpcHandler("Agent.MonitorExport")
if err != nil {
return nil, err
}
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
err := decoder.Decode(&msg)
streamMsg <- &msg
if err != nil {
errCh <- err
return
}
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
if err := encoder.Encode(req); err != nil {
return nil, err
}
if userTimeout != nil {
timeout = userTimeout
}
OUTER:
for {
select {
case <-timeout:
return nil, errors.New("expected to be unreachable")
case err := <-errCh:
if err != nil && err != io.EOF {
return nil, err
}
case message := <-streamMsg:
var frame sframer.StreamFrame
if message.Error != nil {
returnedErr = message.Error
}
if len(message.Payload) != 0 {
err = json.Unmarshal(message.Payload, &frame)
returnedErr = err
builder.Write(frame.Data)
} else {
break OUTER
}
}
}
return &builder, returnedErr
}

View File

@@ -7,11 +7,8 @@ import (
"fmt"
"io"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/hashicorp/cli"
"github.com/hashicorp/nomad/api"
@@ -127,31 +124,12 @@ func (c *MonitorCommand) Run(args []string) int {
eventDoneCh := make(chan struct{})
frames, errCh := client.Agent().Monitor(eventDoneCh, query)
select {
case err := <-errCh:
r, err := streamFrames(frames, errCh, -1, eventDoneCh)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err))
c.Ui.Error(commandErrorText(c))
return 1
default:
}
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, errCh, eventDoneCh)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader
defer r.Close()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-signalCh
// End the streaming
r.Close()
}()
_, err = io.Copy(os.Stdout, r)
if err != nil {
c.Ui.Error(fmt.Sprintf("error monitoring logs: %s", err))

View File

@@ -0,0 +1,209 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package command
import (
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
"github.com/hashicorp/cli"
"github.com/hashicorp/nomad/api"
"github.com/posener/complete"
)
type MonitorExportCommand struct {
Meta
// Below this point is where CLI flag options are stored.
nodeID string
serverID string
onDisk bool
logsSince time.Duration
serviceName string
follow bool
}
func (c *MonitorExportCommand) Help() string {
helpText := `
Usage: nomad monitor export [options]
Use the 'nomad monitor export' command to export an agent's historic data
from journald or its Nomad log file. If exporting journald logs, you must
pass '-service-name' with the name of the nomad service.
The '-logs-since' and '-follow' options are only valid for journald queries.
You may pass a duration string to the '-logs-since' option to override the
default 72h duration. Nomad will accept the following time units in the
'-logs-since duration string:"ns", "us" (or "µs"), "ms", "s", "m", "h".
The '-follow=true' option causes the agent to continue to stream logs until
interrupted or until the remote agent quits. Nomad only supports journald
queries on Linux.
If you do not use Linux or you do not run Nomad as a systemd unit, pass the
'-on-disk=true' option to export the entirety of a given agent's nomad log file.
When ACLs are enabled, this command requires a token with the 'agent:read'
capability.
General Options:
` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `
Monitor Specific Options:
-node-id <node-id>
Sets the specific node to monitor. Accepts only a single node-id and cannot
be used with server-id.
-server-id <server-id>
Sets the specific server to monitor. Accepts only a single server-id and
cannot be used with node-id.
-service-name <service-name>
Sets the name of the nomad service, must match systemd conventions and
include the word 'nomad'. You may provide the full systemd file name
or omit the suffix. If your service name includes a '.', you must include
a valid suffix (e.g. nomad.client.service).
-logs-since <duration string>
Sets the journald log period, invalid if on-disk=true. Defaults to 72h.
Valid unit strings are "ns", "us" (or "µs"), "ms", "s", "m", "h".
-follow <bool>
If set, the export command will continue streaming until interrupted. Ignored
if on-disk=true.
-on-disk <bool>
If set, the export command will retrieve the Nomad log file defined in the
target agent's log_file configuration.
`
return strings.TrimSpace(helpText)
}
func (c *MonitorExportCommand) Synopsis() string {
return "Stream logs from a Nomad agent"
}
func (c *MonitorExportCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-node-id": NodePredictor(c.Client),
"-server-id": ServerPredictor(c.Client),
"-service-name": complete.PredictSet("nomad"),
"-logs-since": complete.PredictNothing,
"-follow": complete.PredictNothing,
"-on-disk": complete.PredictNothing,
})
}
func (c *MonitorExportCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
func (c *MonitorExportCommand) Name() string { return "monitor export" }
func (c *MonitorExportCommand) Run(args []string) int {
c.Ui = &cli.PrefixedUi{
OutputPrefix: " ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: c.Ui,
}
defaultDur := time.Hour * 72
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.StringVar(&c.nodeID, "node-id", "", "")
flags.StringVar(&c.serverID, "server-id", "", "")
flags.DurationVar(&c.logsSince, "logs-since", defaultDur,
`sets the journald log period. Defaults to 72h, valid unit strings are
"ns", "us" (or "µs"), "ms", "s", "m", or "h".`)
flags.StringVar(&c.serviceName, "service-name", "",
"the name of the systemdervice unit to collect logs for, cannot be used with on-disk=true")
flags.BoolVar(&c.onDisk, "on-disk", false,
"directs the cli to stream the configured nomad log file, cannot be used with -service-name")
flags.BoolVar(&c.follow, "follow", false, "")
if err := flags.Parse(args); err != nil {
return 1
}
args = flags.Args()
if l := len(args); l != 0 {
c.Ui.Error("This command takes no arguments")
c.Ui.Error(commandErrorText(c))
return 1
}
if c.serviceName != "" && c.onDisk {
c.Ui.Error("Cannot target journalctl and nomad log file simultaneously")
c.Ui.Error(commandErrorText(c))
}
if c.serviceName != "" {
if isNomad := strings.Contains(c.serviceName, "nomad"); !isNomad {
c.Ui.Error(fmt.Sprintf("Invalid value: -service-name=%s does not include 'nomad'", c.serviceName))
c.Ui.Error(commandErrorText(c))
}
}
if c.serviceName == "" && !c.onDisk {
c.Ui.Error("One of -service-name or -on-disk must be set")
}
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
c.Ui.Error(commandErrorText(c))
return 1
}
// Query the node info and lookup prefix
if c.nodeID != "" {
c.nodeID, err = lookupNodeID(client.Nodes(), c.nodeID)
if err != nil {
c.Ui.Error(err.Error())
return 1
}
}
params := map[string]string{
"follow": strconv.FormatBool(c.follow),
"logs_since": c.logsSince.String(),
"node_id": c.nodeID,
"on_disk": strconv.FormatBool(c.onDisk),
"server_id": c.serverID,
"service_name": c.serviceName,
}
query := &api.QueryOptions{
Params: params,
}
eventDoneCh := make(chan struct{})
frames, errCh := client.Agent().MonitorExport(eventDoneCh, query)
r, err := streamFrames(frames, errCh, -1, eventDoneCh)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error starting monitor: \n%s", err))
c.Ui.Error(commandErrorText(c))
return 1
}
n, err := io.Copy(os.Stdout, r)
if err != nil && err != io.EOF {
c.Ui.Error(fmt.Sprintf("Error monitoring logs: %s", err.Error()))
return 1
}
if n == 0 && err == nil {
emptyMessage := "Returned no data or errors, check your log_file configuration or service name"
c.Ui.Error(fmt.Sprintf("Error starting monitor: \n%s", emptyMessage))
return 1
}
return 0
}

View File

@@ -0,0 +1,93 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package command
import (
"os"
"path/filepath"
"testing"
"github.com/hashicorp/cli"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent"
"github.com/shoenig/test/must"
)
func TestMonitorExportCommand_Implements(t *testing.T) {
ci.Parallel(t)
var _ cli.Command = &MonitorExportCommand{}
}
func TestMonitorExportCommand_Fails(t *testing.T) {
const expectedText = "log log log log log"
tempDir := t.TempDir()
testFile := filepath.Join(tempDir, "test.log")
must.NoError(t, os.WriteFile(testFile, []byte(expectedText), 0777))
config := func(c *agent.Config) {
c.LogFile = testFile
}
srv, _, url := testServer(t, false, config)
defer srv.Shutdown()
cases := []struct {
name string
cmdArgs []string
defaultErr bool
errString string
}{
{
name: "misuse",
cmdArgs: []string{"some", "bad", "args"},
defaultErr: true,
},
{
name: "no address",
cmdArgs: []string{"-address=nope"},
errString: "unsupported protocol scheme",
},
{
name: "invalid follow boolean",
cmdArgs: []string{"-address=" + url, "-follow=maybe"},
errString: `invalid boolean value "maybe" for -follow`,
},
{
name: "invalid on-disk boolean",
cmdArgs: []string{"-address=" + url, "-on-disk=maybe"},
errString: `invalid boolean value "maybe" for -on-disk`,
},
{
name: "setting both on-disk and service-name",
cmdArgs: []string{"-address=" + url, "-on-disk=true", "-service-name=nomad"},
errString: "journald and nomad log file simultaneously",
},
{
name: "setting neither on-disk nor service-name",
cmdArgs: []string{"-address=" + url},
errString: "One of -service-name or -on-disk must be set",
},
{
name: "requires nomad in service name",
cmdArgs: []string{"-address=" + url, "-service-name=docker.path"},
errString: "does not include 'nomad'",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ui := cli.NewMockUi()
cmd := &MonitorExportCommand{Meta: Meta{Ui: ui}}
code := cmd.Run(tc.cmdArgs)
must.One(t, code)
out := ui.ErrorWriter.String()
if tc.defaultErr {
must.StrContains(t, out, commandErrorText(cmd))
} else {
must.StrContains(t, out, tc.errString)
}
})
}
}

View File

@@ -8,9 +8,7 @@ import (
"io"
"math/rand"
"os"
"os/signal"
"strings"
"syscall"
"time"
humanize "github.com/dustin/go-humanize"
@@ -355,38 +353,19 @@ func (f *AllocFSCommand) Run(args []string) int {
return 0
}
// followFile outputs the contents of the file to stdout relative to the end of
// the file. If numLines does not equal -1, then tail -n behavior is used.
// followFile calls the streamFrames helper to output the contents of the
// file to stdout relative to the end of the file. If numLines does not equal
// -1, then tail -n behavior is used.
func (f *AllocFSCommand) followFile(client *api.Client, alloc *api.Allocation,
path, origin string, offset, numLines int64) (io.ReadCloser, error) {
cancel := make(chan struct{})
frames, errCh := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil)
select {
case err := <-errCh:
r, err := streamFrames(frames, errCh, numLines, cancel)
if err != nil {
return nil, err
default:
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, errCh, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second)
}
go func() {
<-signalCh
// End the streaming
r.Close()
}()
return r, nil
}

View File

@@ -579,6 +579,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"monitor export": func() (cli.Command, error) {
return &MonitorExportCommand{
Meta: meta,
}, nil
},
"namespace": func() (cli.Command, error) {
return &NamespaceCommand{
Meta: meta,

View File

@@ -11,9 +11,11 @@ import (
"io"
"maps"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
"github.com/hashicorp/cli"
@@ -782,3 +784,35 @@ func getByPrefix[T any](
return nil, objs, nil
}
}
func streamFrames(frames <-chan *api.StreamFrame, errCh <-chan error,
numLines int64, cancel chan struct{}) (io.ReadCloser, error) {
select {
case err := <-errCh:
return nil, err
default:
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, errCh, cancel)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader
// If numLines is set, wrap the reader
if numLines != -1 {
r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second)
} else {
}
go func() {
<-signalCh
// End the streaming
r.Close()
}()
return r, nil
}

View File

@@ -722,3 +722,126 @@ func TestHelperGetByPrefix(t *testing.T) {
}
}
// TestHelperStreamFrames tests the streamFrames command helper used
// by the agent_monitor and fs_alloc endpoints to populate a reader
// with data from the streamFrame channel passed to the function
func TestHelperStreamFrames(t *testing.T) {
const loopCount = 50
// Create test file
dir := t.TempDir()
f, err := os.CreateTemp(dir, "log")
must.NoError(t, err)
writeLine := []byte("[INFO]log log log made of wood you are heavy but so good\n")
writeLength := len(writeLine)
for range loopCount {
_, _ = f.Write(writeLine)
}
f.Close()
// Create test file reader for streaming
goldenFilePath := f.Name()
goldenFileContents, err := os.ReadFile(goldenFilePath)
must.NoError(t, err)
fileReader, err := os.Open(goldenFilePath)
must.NoError(t, err)
// Helper func to populate stream chan in test case
streamFunc := func() (chan *api.StreamFrame, chan error, chan struct{}) {
framesCh := make(chan *api.StreamFrame, 30)
errCh := make(chan error)
cancelCh := make(chan struct{})
offset := 0
r := io.LimitReader(fileReader, 64)
for {
bytesHolder := make([]byte, 64)
n, err := r.Read(bytesHolder)
if err != nil && err != io.EOF {
must.NoError(t, err)
}
if n == 0 && err == io.EOF {
break
}
offset += n
framesCh <- &api.StreamFrame{
Offset: int64(offset),
Data: goldenFileContents,
File: goldenFilePath,
}
if n != 0 && err == io.EOF {
//break after sending if we hit EOF with bytes in buffer
break
}
}
close(framesCh)
return framesCh, errCh, cancelCh
}
testErr := errors.New("isErr")
cases := []struct {
name string
numLines int
expectErr bool
err error
}{
{
name: "happy_no_limit",
numLines: -1,
},
{
name: "happy_limit",
numLines: 25,
},
{
name: "error",
numLines: -1,
expectErr: true,
err: testErr,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
framesCh, errCh, cancelCh := streamFunc()
if tc.expectErr {
go func() {
time.Sleep(time.Nanosecond * 1)
errCh <- tc.err
}()
}
r, err := streamFrames(framesCh, errCh, int64(tc.numLines), cancelCh)
if !tc.expectErr {
must.NoError(t, err)
}
result, err := io.ReadAll(r)
if !tc.expectErr {
must.NoError(t, err)
}
if tc.numLines == -1 {
//expectedLength := writeLength * loopCount
must.Eq(t,
goldenFileContents,
result)
} else {
expectedLength := (writeLength * tc.numLines)
must.Eq(t,
expectedLength,
len(result))
}
r.Close()
})
}
}

View File

@@ -13,7 +13,7 @@ import (
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/v2/codec"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/host"
@@ -21,8 +21,6 @@ import (
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/go-msgpack/v2/codec"
)
type Agent struct {
@@ -35,6 +33,7 @@ func NewAgentEndpoint(srv *Server) *Agent {
func (a *Agent) register() {
a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor)
a.srv.streamingRpcs.Register("Agent.MonitorExport", a.monitorExport)
}
func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPprofResponse) error {
@@ -64,7 +63,7 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr
return fmt.Errorf("missing target RPC")
}
if region != a.srv.config.Region {
if region != a.srv.Region() {
// Mark that we are forwarding
args.SetForwarded()
return a.srv.forwardRegion(region, "Agent.Profile", args, reply)
@@ -87,7 +86,7 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr
}
// This server is the target, so now we can check for AllowAgentDebug
if !aclObj.AllowAgentDebug(a.srv.config.EnableDebug) {
if !aclObj.AllowAgentDebug(a.srv.GetConfig().EnableDebug) {
return structs.ErrPermissionDenied
}
@@ -168,17 +167,17 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
// Targeting a node, forward request to node
if args.NodeID != "" {
a.forwardMonitorClient(conn, args, encoder, decoder)
a.forwardMonitorClient(conn, args, encoder, decoder, args.NodeID, "Agent.Monitor")
// forwarded request has ended, return
return
}
region := args.RequestRegion()
if region == "" {
handleStreamResultError(fmt.Errorf("missing target RPC"), pointer.Of(int64(400)), encoder)
handleStreamResultError(fmt.Errorf("missing target region"), pointer.Of(int64(400)), encoder)
return
}
if region != a.srv.config.Region {
if region != a.srv.Region() {
// Mark that we are forwarding
args.SetForwarded()
}
@@ -191,7 +190,9 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
return
}
if serverToFwd != nil {
a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder)
// Empty ServerID to prevent forwarding loop
args.ServerID = ""
a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder, "Agent.Monitor")
return
}
}
@@ -200,7 +201,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := monitor.New(512, a.srv.logger, &log.LoggerOptions{
m := monitor.New(512, a.srv.logger, &log.LoggerOptions{
Level: logLevel,
JSONFormat: args.LogJSON,
IncludeLocation: args.LogIncludeLocation,
@@ -225,8 +226,8 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
<-ctx.Done()
}()
logCh := monitor.Start()
defer monitor.Stop()
logCh := m.Start()
defer m.Stop()
initialOffset := int64(0)
// receive logs and build frames
@@ -248,48 +249,135 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
}
}
}()
streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText)
streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, false)
if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder)
return
}
}
var streamErr error
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}
func (a *Agent) monitorExport(conn io.ReadWriteCloser) {
defer conn.Close()
// Decode args
var args cstructs.MonitorExportRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
break OUTER
}
if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
return
}
authErr := a.srv.Authenticate(nil, &args)
a.srv.MeasureRPCRate("agent", structs.RateMetricRead, &args)
if authErr != nil {
handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
return
}
var resp cstructs.StreamErrWrapper
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
// Check agent read permissions
if aclObj, err := a.srv.ResolveACL(&args); err != nil {
handleStreamResultError(err, nil, encoder)
return
} else if !aclObj.AllowAgentRead() {
handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
return
}
resp.Payload = buf.Bytes()
buf.Reset()
}
// Targeting a node, forward request to node
if args.NodeID != "" {
a.forwardMonitorClient(conn, args, encoder, decoder, args.NodeID, "Agent.MonitorExport")
// forwarded request has ended, return
return
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
region := args.RequestRegion()
if region == "" {
handleStreamResultError(fmt.Errorf("missing target region"), pointer.Of(int64(400)), encoder)
return
}
if region != a.srv.Region() {
// Mark that we are forwarding
args.SetForwarded()
}
// Try to forward request to remote region/server
if args.ServerID != "" {
serverToFwd, err := a.forwardFor(args.ServerID, region)
if err != nil {
handleStreamResultError(err, pointer.Of(int64(400)), encoder)
return
}
if serverToFwd != nil {
//empty args.ServerID to prevent forwarding loop
args.ServerID = ""
a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder, "Agent.MonitorExport")
return
}
}
nomadLogPath := a.srv.GetConfig().LogFile
if args.OnDisk && nomadLogPath == "" {
handleStreamResultError(errors.New("No nomad log file defined"), pointer.Of(int64(400)), encoder)
}
// NodeID was empty, ServerID was equal to this server, monitor this server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opts := monitor.MonitorExportOpts{
Logger: a.srv.logger,
LogsSince: args.LogsSince,
ServiceName: args.ServiceName,
NomadLogPath: nomadLogPath,
OnDisk: args.OnDisk,
Follow: args.Follow,
Context: ctx,
}
frames := make(chan *sframer.StreamFrame, 32)
errCh := make(chan error)
var buf bytes.Buffer
frameSize := 1024
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
<-ctx.Done()
}()
m, err := monitor.NewExportMonitor(opts)
if err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
return
}
var eofCancelCh chan error
streamCh := m.Start()
initialOffset := int64(0)
eofCancel := !opts.Follow
streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText)
// receive logs and build frames
streamReader := monitor.NewStreamReader(streamCh, framer, int64(frameSize))
go func() {
defer framer.Destroy()
if err := streamReader.StreamFixed(ctx, initialOffset, "", 0, eofCancelCh, eofCancel); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
}
}()
streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, true)
if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder)
return
@@ -334,11 +422,10 @@ func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
return target, nil
}
func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args any, encoder *codec.Encoder, decoder *codec.Decoder, nodeID string, endpoint string) {
// Get the Connection to the client either by fowarding to another server
// or creating direct stream
state, srv, err := a.findClientConn(args.NodeID)
state, srv, err := a.findClientConn(nodeID)
if err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
return
@@ -347,7 +434,7 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
var clientConn net.Conn
if state == nil {
conn, err := a.srv.streamingRpc(srv, "Agent.Monitor")
conn, err := a.srv.streamingRpc(srv, endpoint)
if err != nil {
handleStreamResultError(err, nil, encoder)
return
@@ -355,7 +442,7 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
clientConn = conn
} else {
stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor")
stream, err := NodeStreamingRpc(state.Session, endpoint)
if err != nil {
handleStreamResultError(err, nil, encoder)
return
@@ -374,10 +461,7 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
structs.Bridge(conn, clientConn)
}
func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
// empty ServerID to prevent forwarding loop
args.ServerID = ""
func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args any, encoder *codec.Encoder, decoder *codec.Decoder, endpoint string) {
serverConn, err := a.srv.streamingRpc(server, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
@@ -439,7 +523,7 @@ func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataRespo
return fmt.Errorf("missing target RPC")
}
if region != a.srv.config.Region {
if region != a.srv.Region() {
// Mark that we are forwarding
args.SetForwarded()
return a.srv.forwardRegion(region, "Agent.Host", args, reply)

View File

@@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"testing"
"time"
@@ -21,6 +22,7 @@ import (
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@@ -1023,3 +1025,86 @@ func TestAgentHost_ACLDebugRequired(t *testing.T) {
err := s.RPC("Agent.Host", &req, &resp)
must.EqError(t, err, structs.ErrPermissionDenied.Error())
}
func TestMonitor_MonitorExport(t *testing.T) {
ci.Parallel(t)
const (
shortText = "log log log log log"
)
// Create test file
dir := t.TempDir()
f, err := os.CreateTemp(dir, "log")
must.NoError(t, err)
for range 1000 {
_, _ = f.WriteString(fmt.Sprintf("%v [INFO] it's log, it's log, it's big it's heavy it's wood", time.Now()))
}
f.Close()
longFilePath := f.Name()
longFileContents, err := os.ReadFile(longFilePath)
must.NoError(t, err)
// start server
s, root, cleanupS := TestACLServer(t, func(c *Config) {
c.LogFile = longFilePath
})
defer cleanupS()
defer os.Remove(longFilePath)
testutil.WaitForLeader(t, s.RPC)
cases := []struct {
name string
expected string
nomadLogPath string
serviceName string
token *structs.ACLToken
onDisk bool
expectErr bool
}{
{
name: "happy_path_long_file",
onDisk: true,
expected: string(longFileContents),
token: root,
},
{
name: "token_error",
onDisk: true,
expected: string(longFileContents),
token: &structs.ACLToken{},
expectErr: true,
},
{
name: "invalid_service_name",
serviceName: "nomad$",
expected: string(longFileContents),
token: &structs.ACLToken{},
expectErr: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// No NodeID set to force server use
req := cstructs.MonitorExportRequest{
LogsSince: "72",
NomadLogPath: tc.nomadLogPath,
OnDisk: tc.onDisk,
ServiceName: tc.serviceName,
QueryOptions: structs.QueryOptions{
Region: "global",
AuthToken: tc.token.SecretID,
},
}
builder, finalError := monitor.ExportMonitorClient_TestHelper(req, s, time.After(3*time.Second))
if tc.expectErr {
must.Error(t, finalError)
return
}
must.NoError(t, err)
must.NotNil(t, builder)
must.Eq(t, strings.TrimSpace(tc.expected), strings.TrimSpace(builder.String()))
})
}
}

View File

@@ -448,6 +448,9 @@ type Config struct {
// considered healthy. Without this, the server can hang indefinitely
// waiting for these.
StartTimeout time.Duration
// LogFile is used by MonitorExport to stream a server's log file
LogFile string `hcl:"log_file"`
}
func (c *Config) Copy() *Config {

View File

@@ -0,0 +1,79 @@
---
layout: docs
page_title: 'nomad monitor export command reference'
description: |
The `nomad monitor export` command returns logs written to disk or journald by a nomad agent.
---
# `nomad monitor export` command reference
The `nomad monitor export` command returns logs written to disk or journald by a nomad agent.
## Usage
```plaintext
nomad monitor export [options]
```
Use the `nomad monitor export` command to export an agent's historic data
from journald or its Nomad log file. If exporting journald logs, you must
pass `-service-name` with the name of the systemd unit to query.
The `-logs-since` and `-follow` options are only valid for journald queries.
You may pass a duration string to the `-logs-since` option to override the
default 72h duration. Nomad will accept the following time units in the
`-logs-since` duration string: "ns", "us" (or "µs"), "ms", "s", "m", "h".
The `-follow=true` option causes the agent to continue to stream logs until
interrupted or until the remote agent quits. Nomad only supports journald
queries on Linux.
If you do not use Linux or you do not run Nomad as a systemd unit, pass the
`-on-disk=true` option to export the entirety of a given agent's nomad log file.
When ACLs are enabled, this command requires a token with the `agent:read`
capability.
## Options
- `-node-id`: Specifies the client node-id to stream logs from. If no
node-id is given, the Nomad server from the `-address` flag is used.
- `-server-id`: Specifies the Nomad server id to stream logs from. Accepts
server names from `nomad server members` and also a special `leader` option
which will target the current leader.
- `-service-name`: Specifies the the name of the systemd unit for export.
Do not use with `-on-disk`. Must include 'nomad' and conform to systemd
naming conventions. You may provide the full systemd file name
or omit the suffix. If your service name includes a '.', you must include
a valid suffix (e.g. nomad.client.service).
- `-logs-since`: Duration used to determine how far back to return logs from
journald. Ignored if used with `-on-disk` and defaults to `72h` if not set.
- `-follow`: Boolean that, if true, continues streaming journald logs until
interrupted. Do not use with `-on-disk`
- `-on-disk`: Boolean that, if true, returns the contents of the Nomad log file
defined in the agent config.
## Examples
This example returns journald log entries with a specific node ID and service name.
```shell-session
$ nomad monitor export -node-id=$(nomad node status --quiet) -service-name="nomad"
Jun 04 20:09:29 nomad-client01 systemd[1]: Starting Nomad...
Subject: A start job for unit nomad_client.service has begun execution
```
This example returns the contents of the nomad log file for a specific server.
```shell-session
$ nomad monitor export -server-id=a57b2adb-1a30-2dda-8df0-25abb0881952 -on-disk=true
2025-06-20T12:22:08.528-0500 [DEBUG] http: request complete: method=GET path=/v1/agent/health?type=server duration=1.445739ms
2025-06-20T12:22:09.892-0500 [DEBUG] nomad: memberlist: Stream connection from=127.0.0.1:53628
```
## General options
@include 'general_options_no_namespace.mdx'

View File

@@ -412,7 +412,15 @@
},
{
"title": "monitor",
"path": "monitor"
"routes": [
{
"title": "Overview",
"path": "monitor"
},
{ "title": "export",
"path": "monitor/export"
}
]
},
{
"title": "namespace",