diff --git a/.changelog/26178.txt b/.changelog/26178.txt new file mode 100644 index 000000000..a3674d07c --- /dev/null +++ b/.changelog/26178.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: Added monitor export cli command to retrieve journald logs or the contents of the Nomad log file for a given Nomad agent +``` diff --git a/api/agent.go b/api/agent.go index 295d159a0..ee8ff65fd 100644 --- a/api/agent.go +++ b/api/agent.go @@ -302,8 +302,20 @@ func (a *Agent) Host(serverID, nodeID string, q *QueryOptions) (*HostDataRespons // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop log streaming func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { + frames, errCh := a.monitorHelper(stopCh, q, "/v1/agent/monitor") + return frames, errCh +} + +// MonitorExport returns a channel which will receive streaming logs from the agent +// Providing a non-nil stopCh can be used to close the connection and stop log streaming +func (a *Agent) MonitorExport(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { + frames, errCh := a.monitorHelper(stopCh, q, "/v1/agent/monitor/export") + return frames, errCh +} + +func (a *Agent) monitorHelper(stopCh <-chan struct{}, q *QueryOptions, path string) (chan *StreamFrame, chan error) { errCh := make(chan error, 1) - r, err := a.client.newRequest("GET", "/v1/agent/monitor") + r, err := a.client.newRequest("GET", path) if err != nil { errCh <- err return nil, errCh diff --git a/api/fs.go b/api/fs.go index f6b831c30..8e65f60ab 100644 --- a/api/fs.go +++ b/api/fs.go @@ -389,12 +389,23 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { case <-unblock: return 0, nil case err := <-f.errCh: - return 0, err + // check for race with f.frames before returning error + select { + case frame, ok := <-f.frames: + if !ok { + return 0, io.EOF + } + f.frame = frame + + // Store the total offset into the file + f.byteOffset = int(f.frame.Offset) + default: + return 0, err + } case <-f.cancelCh: return 0, io.EOF } } - // Copy the data out of the frame and update our offset n = copy(p, f.frame.Data[f.frameOffset:]) f.frameOffset += n diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index a47dcde05..cb43ac3ae 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -10,19 +10,16 @@ import ( "io" "time" + log "github.com/hashicorp/go-hclog" + metrics "github.com/hashicorp/go-metrics/compat" "github.com/hashicorp/go-msgpack/v2/codec" - + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/structs" - - log "github.com/hashicorp/go-hclog" - metrics "github.com/hashicorp/go-metrics/compat" - - sframer "github.com/hashicorp/nomad/client/lib/streamframer" - cstructs "github.com/hashicorp/nomad/client/structs" ) type Agent struct { @@ -32,6 +29,7 @@ type Agent struct { func NewAgentEndpoint(c *Client) *Agent { a := &Agent{c: c} a.c.streamingRpcs.Register("Agent.Monitor", a.monitor) + a.c.streamingRpcs.Register("Agent.MonitorExport", a.monitorExport) return a } @@ -84,7 +82,6 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr func (a *Agent) monitor(conn io.ReadWriteCloser) { defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now()) defer conn.Close() - // Decode arguments var args cstructs.MonitorRequest decoder := codec.NewDecoder(conn, structs.MsgpackHandle) @@ -117,7 +114,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - monitor := monitor.New(512, a.c.logger, &log.LoggerOptions{ + m := monitor.New(512, a.c.logger, &log.LoggerOptions{ JSONFormat: args.LogJSON, Level: logLevel, IncludeLocation: args.LogIncludeLocation, @@ -143,8 +140,8 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { <-ctx.Done() }() - logCh := monitor.Start() - defer monitor.Stop() + logCh := m.Start() + defer m.Stop() initialOffset := int64(0) // receive logs and build frames @@ -164,49 +161,11 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { case <-ctx.Done(): break LOOP } + } }() - - var streamErr error -OUTER: - for { - select { - case frame, ok := <-frames: - if !ok { - // frame may have been closed when an error - // occurred. Check once more for an error. - select { - case streamErr = <-errCh: - // There was a pending error! - default: - // No error, continue on - } - - break OUTER - } - - var resp cstructs.StreamErrWrapper - if args.PlainText { - resp.Payload = frame.Data - } else { - if err := frameCodec.Encode(frame); err != nil { - streamErr = err - break OUTER - } - - resp.Payload = buf.Bytes() - buf.Reset() - } - - if err := encoder.Encode(resp); err != nil { - streamErr = err - break OUTER - } - encoder.Reset(conn) - case <-ctx.Done(): - break OUTER - } - } + streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText) + streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, false) if streamErr != nil { handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) @@ -214,7 +173,7 @@ OUTER: } } -// Host collects data about the host evironment running the agent +// Host collects data about the host environment running the agent func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataResponse) error { aclObj, err := a.c.ResolveToken(args.AuthToken) if err != nil { @@ -233,3 +192,94 @@ func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataRespo reply.HostData = data return nil } + +func (a *Agent) monitorExport(conn io.ReadWriteCloser) { + defer conn.Close() + + // Decode arguments + var args cstructs.MonitorExportRequest + + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + if err := decoder.Decode(&args); err != nil { + handleStreamResultError(err, pointer.Of(int64(500)), encoder) + return + } + + // Check acl + if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { + handleStreamResultError(err, pointer.Of(int64(403)), encoder) + return + } else if !aclObj.AllowAgentRead() { + handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder) + return + } + + nomadLogPath := a.c.GetConfig().LogFile + if args.OnDisk && nomadLogPath == "" { + handleStreamResultError(errors.New("No nomad log file defined"), pointer.Of(int64(400)), encoder) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + opts := monitor.MonitorExportOpts{ + Logger: a.c.logger, + LogsSince: args.LogsSince, + ServiceName: args.ServiceName, + NomadLogPath: nomadLogPath, + OnDisk: args.OnDisk, + Follow: args.Follow, + Context: ctx, + } + + frames := make(chan *sframer.StreamFrame, streamFramesBuffer) + errCh := make(chan error) + var buf bytes.Buffer + frameSize := 1024 + frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + + framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize) + framer.Run() + defer framer.Destroy() + + // goroutine to detect remote side closing + go func() { + if _, err := conn.Read(nil); err != nil { + // One end of the pipe explicitly closed, exit + cancel() + return + } + <-ctx.Done() + }() + + m, err := monitor.NewExportMonitor(opts) + if err != nil { + handleStreamResultError(err, pointer.Of(int64(500)), encoder) + return + } + var eofCancelCh chan error + + streamCh := m.Start() + initialOffset := int64(0) + eofCancel := !opts.Follow + + // receive logs and build frames + streamReader := monitor.NewStreamReader(streamCh, framer, int64(frameSize)) + go func() { + defer framer.Destroy() + if err := streamReader.StreamFixed(ctx, initialOffset, "", 0, eofCancelCh, eofCancel); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + } + }() + streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText) + streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, true) + + if streamErr != nil { + handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) + return + } +} diff --git a/client/agent_endpoint_test.go b/client/agent_endpoint_test.go index 7c02691e5..3eb845bd1 100644 --- a/client/agent_endpoint_test.go +++ b/client/agent_endpoint_test.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net" + "os" "strings" "testing" "time" @@ -18,11 +19,13 @@ import ( "github.com/hashicorp/nomad/client/config" sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -446,3 +449,81 @@ func TestAgentHost_ACL(t *testing.T) { }) } } + +func TestMonitor_MonitorExport(t *testing.T) { + ci.Parallel(t) + + // Create test file + dir := t.TempDir() + f, err := os.CreateTemp(dir, "log") + must.NoError(t, err) + for range 1000 { + _, _ = f.WriteString(fmt.Sprintf("%v [INFO] it's log, it's log, it's big it's heavy it's wood", time.Now())) + } + f.Close() + testFilePath := f.Name() + testFileContents, err := os.ReadFile(testFilePath) + must.NoError(t, err) + + // start server + s, root, cleanupS := nomad.TestACLServer(t, nil) + defer cleanupS() + testutil.WaitForLeader(t, s.RPC) + defer cleanupS() + + c, cleanupC := TestClient(t, func(c *config.Config) { + c.ACLEnabled = true + c.Servers = []string{s.GetConfig().RPCAddr.String()} + c.LogFile = testFilePath + }) + + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) + defer cleanupC() + + testutil.WaitForLeader(t, s.RPC) + + cases := []struct { + name string + expected string + serviceName string + token string + onDisk bool + expectErr bool + }{ + { + name: "happy_path_golden_file", + onDisk: true, + expected: string(testFileContents), + token: root.SecretID, + }, + { + name: "token_error", + onDisk: true, + expected: string(testFileContents), + token: tokenBad.SecretID, + expectErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + req := cstructs.MonitorExportRequest{ + NodeID: "this is checked in the CLI", + OnDisk: tc.onDisk, + QueryOptions: structs.QueryOptions{ + Region: "global", + AuthToken: tc.token, + }, + } + + builder, finalError := monitor.ExportMonitorClient_TestHelper(req, c, time.After(3*time.Second)) + if tc.expectErr { + must.Error(t, finalError) + return + } + must.NoError(t, err) + must.NotNil(t, builder) + must.Eq(t, strings.TrimSpace(tc.expected), strings.TrimSpace(builder.String())) + }) + + } +} diff --git a/client/config/config.go b/client/config/config.go index ebefd532a..12e295903 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -384,6 +384,9 @@ type Config struct { // NodeMaxAllocs is an optional field that sets the maximum number of // allocations a node can be assigned. Defaults to 0 and ignored if unset. NodeMaxAllocs int + + // LogFile is used by MonitorExport to stream a server's log file + LogFile string `hcl:"log_file"` } type APIListenerRegistrar interface { diff --git a/client/lib/streamframer/framer.go b/client/lib/streamframer/framer.go index 43a63398a..c01733105 100644 --- a/client/lib/streamframer/framer.go +++ b/client/lib/streamframer/framer.go @@ -97,6 +97,9 @@ type StreamFramer struct { // Captures whether the framer is running running bool + + // Confirms final flush sent + flushed bool } // NewStreamFramer creates a new stream framer that will output StreamFrames to @@ -107,7 +110,6 @@ func NewStreamFramer(out chan<- *StreamFrame, // Create the heartbeat and flush ticker heartbeat := time.NewTicker(heartbeatRate) flusher := time.NewTicker(batchWindow) - return &StreamFramer{ out: out, frameSize: frameSize, @@ -123,7 +125,6 @@ func NewStreamFramer(out chan<- *StreamFrame, // Destroy is used to cleanup the StreamFramer and flush any pending frames func (s *StreamFramer) Destroy() { s.l.Lock() - wasShutdown := s.shutdown s.shutdown = true @@ -204,7 +205,6 @@ OUTER: // Send() may have left a partial frame. Send it now. if !s.f.IsCleared() { s.f.Data = s.readData() - // Only send if there's actually data left if len(s.f.Data) > 0 { // Cannot select on shutdownCh as it's already closed @@ -281,6 +281,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // Flush till we are under the max frame size for s.data.Len() >= s.frameSize || force { + // Clear since are flushing the frame and capturing the file event. // Subsequent data frames will be flushed based on the data size alone // since they share the same fileevent. @@ -309,3 +310,22 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e return nil } + +func (s *StreamFramer) IsFlushed() bool { + return s.flushed +} + +func (s *StreamFramer) Flush() bool { + s.l.Lock() + // Send() may have left a partial frame. Send it now. + s.f.Data = s.readData() + + // Only send if there's actually data left + if len(s.f.Data) > 0 { + s.out <- s.f.Copy() + } + s.flushed = true + + s.l.Unlock() + return s.IsFlushed() +} diff --git a/client/structs/structs.go b/client/structs/structs.go index 4b2fd8fe1..95e7bff9c 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -62,6 +62,37 @@ type MonitorRequest struct { structs.QueryOptions } +type MonitorExportRequest struct { + // NodeID is the node we want to track the logs of + NodeID string + + // ServerID is the server we want to track the logs of + ServerID string + + // ServiceName is the systemd service for which we want to retrieve logs + // Cannot be used with OnDisk + ServiceName string + + // Follow indicates that the monitor should continue to deliver logs until + // an outside interrupt. Cannot be used with OnDisk + Follow bool + + // LogsSince sets the lookback time for monitorExport logs in hours + LogsSince string + + // OnDisk indicates that nomad should export logs written to the configured nomad log path + OnDisk bool + + // NomadLogPath is set to the nomad log path by the HTTP agent if OnDisk + // is true + NomadLogPath string + + // PlainText disables base64 encoding. + PlainText bool + + structs.QueryOptions +} + // AllocFileInfo holds information about a file inside the AllocDir type AllocFileInfo struct { Name string diff --git a/command/agent/agent.go b/command/agent/agent.go index 5760b3747..54e467fa3 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -655,7 +655,8 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { return nil, fmt.Errorf("number of schedulers should be between 0 and %d", runtime.NumCPU()) } - + // Copy LogFile config value + conf.LogFile = agentConfig.LogFile return conf, nil } @@ -753,7 +754,6 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { if conf == nil { conf = clientconfig.DefaultConfig() } - conf.Servers = agentConfig.Client.Servers conf.DevMode = agentConfig.DevMode conf.EnableDebug = agentConfig.EnableDebug @@ -1016,6 +1016,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { conf.Users = clientconfig.UsersConfigFromAgent(agentConfig.Client.Users) + conf.LogFile = agentConfig.LogFile return conf, nil } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 8c50f70a0..0c987dbdb 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/nomad/api" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/host" + "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" @@ -211,6 +212,103 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( } s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + codedErr := s.streamMonitor(resp, req, args, nodeID, "Agent.Monitor") + + return nil, codedErr +} + +func (s *HTTPServer) AgentMonitorExport(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Process and validate arguments + onDisk := false + onDiskBool, err := parseBool(req, "on_disk") + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unknown value for on-disk: %v", err)) + } + if onDiskBool != nil { + onDisk = *onDiskBool + } + + follow := false + followBool, err := parseBool(req, "follow") + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unknown value for follow: %v", err)) + } + if followBool != nil { + follow = *followBool + } + + plainText := false + plainTextBool, err := parseBool(req, "plain") + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unknown value for plain: %v", err)) + } + if plainTextBool != nil { + plainText = *plainTextBool + } + + logsSince := "72h" //default value + logsSinceStr := req.URL.Query().Get("logs_since") + if logsSinceStr != "" { + _, err := time.ParseDuration(logsSinceStr) + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unknown value for logs-since: %v", err)) + } + logsSince = logsSinceStr + } + + serviceName := req.URL.Query().Get("service_name") + + nodeID := req.URL.Query().Get("node_id") + serverID := req.URL.Query().Get("server_id") + + if nodeID != "" && serverID != "" { + return nil, CodedError(400, "Cannot target node and server simultaneously") + } + + if onDisk && serviceName != "" { + return nil, CodedError(400, "Cannot target journald and nomad log file simultaneously") + } + + if !onDisk && serviceName == "" { + return nil, CodedError(400, "Either -service-name or -on-disk must be set") + } + if onDisk && follow { + return nil, CodedError(400, "Cannot follow log file") + } + + if serviceName != "" { + if err := monitor.ScanServiceName(serviceName); err != nil { + return nil, CodedError(422, err.Error()) + } + } + + // Build the request and parse the ACL token + args := cstructs.MonitorExportRequest{ + NodeID: nodeID, + ServerID: serverID, + LogsSince: logsSince, + ServiceName: serviceName, + OnDisk: onDisk, + Follow: follow, + PlainText: plainText, + } + + // Force the Content-Type to avoid Go's http.ResponseWriter from + // detecting an incorrect or unsafe one. + if plainText { + resp.Header().Set("Content-Type", "text/plain") + } else { + resp.Header().Set("Content-Type", "application/json") + } + + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + codedErr := s.streamMonitor(resp, req, args, nodeID, "Agent.MonitorExport") + + return nil, codedErr +} + +func (s *HTTPServer) streamMonitor(resp http.ResponseWriter, req *http.Request, + args any, nodeID string, endpoint string) error { // Make the RPC var handler structs.StreamingRpcHandler @@ -219,24 +317,25 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( // Determine the handler to use useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID) if useLocalClient { - handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor") + handler, handlerErr = s.agent.Client().StreamingRpcHandler(endpoint) } else if useClientRPC { - handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor") + handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(endpoint) } else if useServerRPC { - handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor") + handler, handlerErr = s.agent.Server().StreamingRpcHandler(endpoint) } else { - handlerErr = CodedError(400, "No local Node and node_id not provided") + handlerErr = CodedError(400, "No local Node") } // No node id monitor current server/client } else if srv := s.agent.Server(); srv != nil { - handler, handlerErr = srv.StreamingRpcHandler("Agent.Monitor") + handler, handlerErr = srv.StreamingRpcHandler(endpoint) } else { - handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor") + handler, handlerErr = s.agent.Client().StreamingRpcHandler(endpoint) } if handlerErr != nil { - return nil, CodedError(500, handlerErr.Error()) + return CodedError(500, handlerErr.Error()) } + httpPipe, handlerPipe := net.Pipe() decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) @@ -256,7 +355,6 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( // stream response go func() { defer cancel() - // Send the request if err := encoder.Encode(args); err != nil { errCh <- CodedError(500, err.Error()) @@ -293,7 +391,8 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( }() handler(handlerPipe) - cancel() + cancel() //this seems like it should be wrong to me but removing it didn't + // affect either truncation or short returns codedErr := <-errCh if codedErr != nil && @@ -302,7 +401,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( strings.Contains(codedErr.Error(), "EOF")) { codedErr = nil } - return nil, codedErr + return codedErr } func (s *HTTPServer) AgentForceLeaveRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 7033814b9..49e2d06ca 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -27,6 +27,7 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/nomad/mock" @@ -446,6 +447,204 @@ func TestHTTP_AgentMonitor(t *testing.T) { }) } +func TestHTTP_AgentMonitorExport(t *testing.T) { + ci.Parallel(t) + const expectedText = "log log log log log" + dir := t.TempDir() + testFile, err := os.CreateTemp(dir, "nomadtests") + must.NoError(t, err) + + _, err = testFile.Write([]byte(expectedText)) + must.NoError(t, err) + inlineFilePath := testFile.Name() + + config := func(c *Config) { + c.LogFile = inlineFilePath + } + + baseURL := "/v1/agent/monitor/export?" + cases := []struct { + name string + follow string + logsSince string + nodeID string + onDisk string + serviceName string + serverID string + + config func(c *Config) + errCode int + errString string + expectErr bool + want string + }{ + { + name: "happy_path", + follow: "false", + onDisk: "true", + logsSince: "9s", + + config: config, + expectErr: false, + want: expectedText, + }, + { + name: "invalid_onDisk", + follow: "false", + onDisk: "green", + + config: config, + errCode: 400, + expectErr: true, + errString: "Unknown value for on-disk", + }, + { + name: "invalid_follow", + follow: "green", + onDisk: "false", + + config: config, + errCode: 400, + expectErr: true, + errString: "Unknown value for follow", + }, + { + name: "invalid_service_name", + follow: "true", + onDisk: "false", + serviceName: "nomad%", + + config: config, + errCode: 422, + expectErr: true, + errString: "does not meet systemd conventions", + }, + { + name: "invalid_logsSince_duration", + follow: "false", + onDisk: "true", + serviceName: "nomad", + logsSince: "98seconds", + + config: config, + errCode: 400, + expectErr: true, + errString: `unknown unit "seconds" in duration`, + want: expectedText, + }, + { + name: "server_and_node", + follow: "false", + onDisk: "true", + nodeID: "doesn'tneedtobeuuid", + serverID: "doesntneedtobeuuid", + + config: config, + errCode: 400, + errString: "Cannot target node and server simultaneously", + expectErr: true, + want: expectedText, + }, + { + name: "onDisk_and_serviceName", + follow: "false", + onDisk: "true", + serviceName: "nomad", + nodeID: "doesn'tneedtobeuuid", + + config: config, + errCode: 400, + errString: "Cannot target journald and nomad log file simultaneously", + expectErr: true, + want: expectedText, + }, + { + name: "neither_onDisk_nor_serviceName", + follow: "false", + nodeID: "doesn'tneedtobeuuid", + + config: config, + errCode: 400, + errString: "Either -service-name or -on-disk must be set", + expectErr: true, + want: expectedText, + }, + { + name: "onDisk_and_follow", + follow: "true", + onDisk: "true", + nodeID: "doesn'tneedtobeuuid", + + config: config, + errCode: 400, + errString: "Cannot follow log file", + expectErr: true, + want: expectedText, + }, + { + name: "onDisk_and_no_log_file", + onDisk: "true", + + config: nil, + errCode: 400, + errString: "No nomad log file defined", + expectErr: true, + want: expectedText, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + httpTest(t, tc.config, func(s *TestAgent) { + // Prepare urlstring + urlVal := url.Values{} + urlParamPrep := func(k string, v string, failCase string, values *url.Values) { + if v != failCase { + values.Add(k, v) + } + } + + urlParamPrep("follow", tc.follow, "false", &urlVal) + urlParamPrep("logs_since", tc.logsSince, "", &urlVal) + urlParamPrep("on_disk", tc.onDisk, "", &urlVal) + urlParamPrep("node_id", tc.nodeID, "", &urlVal) + urlParamPrep("server_id", tc.serverID, "", &urlVal) + urlParamPrep("service_name", tc.serviceName, "", &urlVal) + urlString := baseURL + urlVal.Encode() + + req, err := http.NewRequest(http.MethodGet, urlString, nil) + must.NoError(t, err) + + resp := newClosableRecorder() + defer resp.Close() + var ( + builder strings.Builder + frame sframer.StreamFrame + ) + + _, err = s.Server.AgentMonitorExport(resp, req) + if tc.expectErr { + t.Log(err.Error()) + must.Eq(t, tc.errCode, err.(HTTPCodedError).Code()) + must.StrContains(t, err.Error(), tc.errString) + return + } + + must.NoError(t, err) + output, err := io.ReadAll(resp.Body) + must.NoError(t, err) + + err = json.Unmarshal(output, &frame) + if err != nil && err != io.EOF { + must.NoError(t, err) + } + + builder.WriteString(string(frame.Data)) + must.Eq(t, tc.want, builder.String()) + }) + }) + } +} + // Scenarios when Pprof requests should be available // see https://github.com/hashicorp/nomad/issues/6496 // +---------------+------------------+--------+------------------+ diff --git a/command/agent/config.go b/command/agent/config.go index e6311a6af..4cce6d309 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -424,6 +424,9 @@ type ClientConfig struct { // NodeMaxAllocs sets the maximum number of allocations per node // Defaults to 0 and ignored if unset. NodeMaxAllocs int `hcl:"node_max_allocs"` + + // LogFile is used by MonitorExport to stream a client's log file + LogFile string `hcl:"log_file"` } func (c *ClientConfig) Copy() *ClientConfig { @@ -756,6 +759,9 @@ type ServerConfig struct { // expected to complete before the server is considered healthy. Without // this, the server can hang indefinitely waiting for these. StartTimeout string `hcl:"start_timeout"` + + // LogFile is used by MonitorExport to stream a server's log file + LogFile string `hcl:"log_file"` } func (s *ServerConfig) Copy() *ServerConfig { diff --git a/command/agent/http.go b/command/agent/http.go index 52c552677..2cabf64ff 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -472,6 +472,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { // "application/json" Content-Type depending on the ?plain= query // parameter. s.mux.HandleFunc("/v1/agent/monitor", s.wrap(s.AgentMonitor)) + s.mux.HandleFunc("/v1/agent/monitor/export", s.wrap(s.AgentMonitorExport)) s.mux.HandleFunc("/v1/agent/pprof/", s.wrapNonJSON(s.AgentPprofRequest)) diff --git a/command/agent/monitor/export_monitor.go b/command/agent/monitor/export_monitor.go new file mode 100644 index 000000000..b86485ec4 --- /dev/null +++ b/command/agent/monitor/export_monitor.go @@ -0,0 +1,275 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package monitor + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "regexp" + "runtime" + "slices" + "strings" + "sync" + "time" + + "github.com/hashicorp/go-hclog" +) + +const defaultBufSize = 512 + +// ExportMonitor implements the Monitor interface for testing +type ExportMonitor struct { + sync.Mutex + + logCh chan []byte + logger hclog.Logger + + // doneCh coordinates breaking out of the export loop + doneCh chan struct{} + + // ExportReader can read from the cli or the NomadFilePath + ExportReader *ExportReader + + bufSize int +} + +type MonitorExportOpts struct { + Logger hclog.Logger + + // LogsSince sets the lookback time for monitorExport logs in hours + LogsSince string + + // OnDisk indicates that nomad should export logs written to the configured nomad log path + OnDisk bool + + // ServiceName is the systemd service for which we want to retrieve logs + // Cannot be used with OnDisk + ServiceName string + + // NomadLogPath is set to the nomad log path by the HTTP agent if OnDisk + // is true + NomadLogPath string + + // Follow indicates that the monitor should continue to deliver logs until + // an outside interrupt + Follow bool + + // Context passed from client to close the cmd and exit the function + Context context.Context + + // ExportMonitor's buffer size, defaults to 512 if unset by caller + BufSize int +} + +type ExportReader struct { + io.Reader + Cmd *exec.Cmd + UseCli bool + Follow bool +} + +// NewExportMonitor validates and prepares the appropriate reader before +// returning a new ExportMonitor or the appropriate error +func NewExportMonitor(opts MonitorExportOpts) (*ExportMonitor, error) { + var ( + exportReader *ExportReader + bufSize int + ) + + if runtime.GOOS != "linux" && + opts.ServiceName != "" { + return nil, errors.New("journald log monitoring only available on linux") + } + + if opts.BufSize == 0 { + bufSize = defaultBufSize + } else { + bufSize = opts.BufSize + } + + if opts.OnDisk && opts.ServiceName == "" { + e, prepErr := fileReader(opts) + if prepErr != nil { + return nil, prepErr + } + exportReader = e + } + + if opts.ServiceName != "" && !opts.OnDisk { + e, prepErr := cliReader(opts) + if prepErr != nil { + return nil, prepErr + } + exportReader = e + } + + sw := ExportMonitor{ + logger: hclog.Default().Named("export"), + doneCh: make(chan struct{}, 1), + logCh: make(chan []byte, bufSize), + bufSize: bufSize, + ExportReader: exportReader, + } + + return &sw, nil +} + +// ScanServiceName checks that the length, prefix and suffix conform to +// systemd conventions and ensures the service name includes the word 'nomad' +func ScanServiceName(input string) error { + prefix := "" + // invalid if prefix and suffix together are > 255 char + if len(input) > 255 { + return errors.New("service name too long") + } + + if isNomad := strings.Contains(input, "nomad"); !isNomad { + return errors.New(`service name must include 'nomad`) + } + + // if there is a suffix, check against list of valid suffixes + // and set prefix to exclude suffix index, else set prefix + splitInput := strings.Split(input, ".") + if len(splitInput) < 2 { + prefix = input + } else { + suffix := splitInput[len(splitInput)-1] + validSuffix := []string{ + "service", + "socket", + "device", + "mount", + "automount", + "swap", + "target", + "path", + "timer", + "slice", + "scope", + } + if valid := slices.Contains(validSuffix, suffix); !valid { + return errors.New("invalid suffix") + } + prefix = strings.Join(splitInput[:len(splitInput)-1], "") + } + + safe, _ := regexp.MatchString(`^[\w\\._-]*(@[\w\\._-]+)?$`, prefix) + if !safe { + return fmt.Errorf("%s does not meet systemd conventions", prefix) + } + return nil +} + +func cliReader(opts MonitorExportOpts) (*ExportReader, error) { + isCli := true + // Vet servicename again + if err := ScanServiceName(opts.ServiceName); err != nil { + return nil, err + } + cmdDuration := "72 hours" + if opts.LogsSince != "" { + parsedDur, err := time.ParseDuration(opts.LogsSince) + if err != nil { + return nil, err + } + cmdDuration = parsedDur.String() + } + // build command with vetted inputs + cmdArgs := []string{"-xu", opts.ServiceName, "--since", fmt.Sprintf("%s ago", cmdDuration)} + + if opts.Follow { + cmdArgs = append(cmdArgs, "-f") + } + cmd := exec.CommandContext(opts.Context, "journalctl", cmdArgs...) + + // set up reader + stdOut, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + stdErr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + multiReader := io.MultiReader(stdOut, stdErr) + cmd.Start() + + return &ExportReader{multiReader, cmd, isCli, opts.Follow}, nil +} + +func fileReader(opts MonitorExportOpts) (*ExportReader, error) { + notCli := false + file, err := os.Open(opts.NomadLogPath) + if err != nil { + return nil, err + } + return &ExportReader{file, nil, notCli, opts.Follow}, nil + +} + +// Stop stops the monitoring process +func (d *ExportMonitor) Stop() { + select { + case _, ok := <-d.doneCh: + if !ok { + if d.ExportReader.UseCli { + d.ExportReader.Cmd.Wait() + } + close(d.logCh) + return + } + default: + } + close(d.logCh) +} + +// Start reads data from the monitor's ExportReader into its logCh +func (d *ExportMonitor) Start() <-chan []byte { + // Read, copy, and send to channel until we hit EOF or error + go func() { + defer d.Stop() + logChunk := make([]byte, d.bufSize) + + for { + n, readErr := d.ExportReader.Read(logChunk) + if readErr != nil && readErr != io.EOF { + d.logger.Error("unable to read logs into channel", readErr.Error()) + return + } + + d.Write(logChunk[:n]) + + if readErr == io.EOF { + break + } + } + close(d.doneCh) + }() + return d.logCh +} + +// Write attempts to send latest log to logCh +// it drops the log if channel is unavailable to receive +func (d *ExportMonitor) Write(p []byte) (n int) { + d.Lock() + defer d.Unlock() + + // ensure logCh is still open + select { + case <-d.doneCh: + return + default: + } + + bytes := make([]byte, len(p)) + copy(bytes, p) + + d.logCh <- bytes + + return len(p) +} diff --git a/command/agent/monitor/monitor_test.go b/command/agent/monitor/monitor_test.go index 306ae8859..32a108411 100644 --- a/command/agent/monitor/monitor_test.go +++ b/command/agent/monitor/monitor_test.go @@ -4,13 +4,17 @@ package monitor import ( + "context" "fmt" + "os" "strings" + "testing" "time" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -27,7 +31,6 @@ func TestMonitor_Start(t *testing.T) { logCh := m.Start() defer m.Stop() - go func() { logger.Debug("test log") time.Sleep(10 * time.Millisecond) @@ -90,3 +93,103 @@ TEST: } } } + +func TestMonitor_Export(t *testing.T) { + ci.Parallel(t) + const ( + expectedText = "log log log log log" + ) + + dir := t.TempDir() + f, err := os.CreateTemp(dir, "log") + must.NoError(t, err) + for range 1000 { + _, _ = f.WriteString(fmt.Sprintf("%v [INFO] it's log, it's log, it's big it's heavy it's wood", time.Now())) + } + f.Close() + goldenFilePath := f.Name() + goldenFileContents, err := os.ReadFile(goldenFilePath) + must.NoError(t, err) + + testFile, err := os.CreateTemp("", "nomadtest") + must.NoError(t, err) + + _, err = testFile.Write([]byte(expectedText)) + must.NoError(t, err) + inlineFilePath := testFile.Name() + + logger := log.NewInterceptLogger(&log.LoggerOptions{ + Level: log.Error, + }) + ctx, cancel := context.WithCancel(context.Background()) + cases := []struct { + name string + opts MonitorExportOpts + expected string + expectClose bool + }{ + { + name: "happy_path_logpath_long_file", + opts: MonitorExportOpts{ + Context: ctx, + Logger: logger, + OnDisk: true, + NomadLogPath: goldenFilePath, + }, + expected: string(goldenFileContents), + }, + { + name: "happy_path_logpath_short_file", + opts: MonitorExportOpts{ + Context: ctx, + Logger: logger, + OnDisk: true, + NomadLogPath: inlineFilePath, + }, + expected: expectedText, + }, + { + name: "close client context", + opts: MonitorExportOpts{ + Context: ctx, + Logger: logger, + OnDisk: true, + NomadLogPath: inlineFilePath, + }, + expected: expectedText, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + monitor, err := NewExportMonitor(tc.opts) + must.NoError(t, err) + logCh := monitor.Start() + if tc.expectClose { + cancel() + } + + var builder strings.Builder + + TEST: + for { + select { + case log, ok := <-logCh: + if !ok { + break TEST + } + builder.Write(log) + default: + continue + } + + } + + if !tc.expectClose { + must.Eq(t, strings.TrimSpace(tc.expected), strings.TrimSpace(builder.String())) + } else { + must.Eq(t, builder.String(), "") + } + }) + } +} diff --git a/command/agent/monitor/stream_helpers.go b/command/agent/monitor/stream_helpers.go new file mode 100644 index 000000000..b82045f01 --- /dev/null +++ b/command/agent/monitor/stream_helpers.go @@ -0,0 +1,250 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package monitor + +import ( + "bytes" + "context" + "io" + "strings" + "sync" + "syscall" + + "github.com/hashicorp/go-msgpack/v2/codec" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + cstructs "github.com/hashicorp/nomad/client/structs" +) + +// StreamReader is used to process fixed length streams for consumers +// that rely on terminating the stream after hitting an EOF. The lock +// protects the buffer during reads +type StreamReader struct { + sync.Mutex + framer *sframer.StreamFramer + ch <-chan []byte + buf []byte + frameSize int64 +} + +// NewStreamReader takes a <-chan[]byte and *sframer.StreamFramer and returns +// a ready to use StreamReader that will allocate its buffer on first read +func NewStreamReader(ch <-chan []byte, framer *sframer.StreamFramer, frameSize int64) *StreamReader { + return &StreamReader{ + ch: ch, + framer: framer, + frameSize: frameSize, + } +} + +// Read reads stream data into the StreamReader's buffer and copies that +// data into p +func (r *StreamReader) Read(p []byte) (n int, err error) { + select { + case data, ok := <-r.ch: + if !ok && len(data) == 0 { + return 0, io.EOF + } + r.Lock() + r.buf = data + default: + return 0, nil + } + + n = copy(p, r.buf) + r.buf = r.buf[n:] + r.Unlock() + return n, nil +} + +// StreamFixed streams any fixed length data stream. If limit is greater than +// zero, the stream will end once that many bytes have been read. If eofCancelCh +// is triggered while at EOF, read one more frame and cancel the stream on the +// next EOF. If the connection is broken an EPIPE error is returned. +func (r *StreamReader) StreamFixed(ctx context.Context, offset int64, path string, limit int64, + eofCancelCh chan error, cancelAfterFirstEof bool) error { + defer r.framer.Flush() + parseFramerErr := func(err error) error { + if err == nil { + return nil + } + errMsg := err.Error() + + if strings.Contains(errMsg, io.ErrClosedPipe.Error()) { + // The pipe check is for tests + return syscall.EPIPE + } + + // The connection was closed by our peer + if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) { + return syscall.EPIPE + } + + if strings.Contains(errMsg, "forcibly closed") { + return syscall.EPIPE + } + + return err + } + // streamFrameSize is the maximum number of bytes to send in a single frame + streamFrameSize := r.frameSize + + bufSize := streamFrameSize + if limit > 0 && limit < streamFrameSize { + bufSize = limit + } + streamBuffer := make([]byte, bufSize) + + var lastEvent string + + // Only watch file when there is a need for it + cancelReceived := cancelAfterFirstEof + +OUTER: + for { + // Read up to the max frame size + n, readErr := r.Read(streamBuffer) + + // Update the offset + offset += int64(n) + + // Return non-EOF errors + if readErr != nil && readErr != io.EOF { + return readErr + } + + // Send the frame + if n != 0 || lastEvent != "" { + if err := r.framer.Send(path, lastEvent, streamBuffer[:n], offset); err != nil { + return parseFramerErr(err) + } + } + + // Clear the last event + if lastEvent != "" { + lastEvent = "" + } + + // Just keep reading since we aren't at the end of the file so we can + // avoid setting up a file event watcher. + if readErr == nil { + continue + } + // At this point we can stop without waiting for more changes, + // because we have EOF and either we're not following at all, + // or we received an event from the eofCancelCh channel + // and last read was executed + if cancelReceived { + return nil + } + + for { + select { + case <-r.framer.ExitCh(): + return nil + case <-ctx.Done(): + return nil + case _, ok := <-eofCancelCh: + if !ok { + return nil + } + cancelReceived = true + continue OUTER + } + } + } +} + +// Destroy wraps the underlying framer's Destroy() call +func (r *StreamReader) Destroy() { + r.framer.Destroy() +} + +// Run wraps the underlying framer's Run() call +func (r *StreamReader) Run() { + r.framer.Run() +} + +// StreamEncoder consolidates logic used by monitor RPC handlers to encode and +// return stream data +type StreamEncoder struct { + buf *bytes.Buffer + conn io.ReadWriteCloser + encoder *codec.Encoder + frameCodec *codec.Encoder + plainText bool +} + +// NewStreamEncoder takes buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder +// frameCodec *codec.Encoder,and plainText bool and returns a NewStreamEncoder +func NewStreamEncoder(buf *bytes.Buffer, conn io.ReadWriteCloser, encoder *codec.Encoder, + frameCodec *codec.Encoder, plainText bool) StreamEncoder { + return StreamEncoder{ + buf: buf, + conn: conn, + encoder: encoder, + frameCodec: frameCodec, + plainText: plainText, + } +} + +// EncodeStream reads and encodes data from a chan *sframer.Streamframe until the +// channel is closed. If eofCancel is true,EncodeStream continues to read from the closed +// channel until the underlying framer reports it has flushed it's final frame +func (s *StreamEncoder) EncodeStream(frames chan *sframer.StreamFrame, + errCh chan error, ctx context.Context, framer *sframer.StreamFramer, + eofCancel bool) (err error) { + var streamErr error + localFlush := false +OUTER: + for { + select { + case frame, ok := <-frames: + if !ok { + // frame may have been closed when an error + // occurred. Check once more for an error. + select { + case streamErr = <-errCh: + return streamErr + // There was a pending error! + default: + // No error, continue on and let exitCh control breaking + } + // Confirm framer.Flush and localFlush if we're expecting EOF + if eofCancel { + _, ok := <-framer.ExitCh() + if !ok { + if framer.IsFlushed() && !localFlush { + localFlush = true + continue + } else if framer.IsFlushed() && localFlush { + break OUTER + } + } + } else { + break OUTER + } + } + + var resp cstructs.StreamErrWrapper + if s.plainText { + resp.Payload = frame.Data + } else { + if err := s.frameCodec.Encode(frame); err != nil && err != io.EOF { + return err + } + + resp.Payload = s.buf.Bytes() + s.buf.Reset() + } + if err := s.encoder.Encode(resp); err != nil { + return err + } + s.encoder.Reset(s.conn) + case <-ctx.Done(): + break OUTER + } + + } + return nil +} diff --git a/command/agent/monitor/stream_helpers_test.go b/command/agent/monitor/stream_helpers_test.go new file mode 100644 index 000000000..52bb38c9c --- /dev/null +++ b/command/agent/monitor/stream_helpers_test.go @@ -0,0 +1,250 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package monitor + +import ( + "context" + "io" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + "github.com/shoenig/test/must" +) + +var writeLine = []byte("[INFO] log log log made of wood you are heavy but so good\n") + +func prepFile(t *testing.T) *os.File { + const loopCount = 10 + // Create test file to read from + dir := t.TempDir() + f, err := os.CreateTemp(dir, "log") + must.NoError(t, err) + + for range loopCount { + _, _ = f.Write(writeLine) + } + f.Close() + + // Create test file reader for stream set up + goldenFilePath := f.Name() + fileReader, err := os.Open(goldenFilePath) + must.NoError(t, err) + return fileReader +} + +func TestClientStreamReader_StreamFixed(t *testing.T) { + ci.Parallel(t) + + streamBytes := func(streamCh chan []byte, wg *sync.WaitGroup, file *os.File) { + go func() { + defer close(streamCh) + defer wg.Done() + logChunk := make([]byte, len(writeLine)) + for { + n, readErr := file.Read(logChunk) + if readErr != nil && readErr != io.EOF { + must.NoError(t, readErr) + } + + streamCh <- logChunk[:n] + if readErr == io.EOF { + break + } + } + }() + } + + cases := []struct { + name string + + eofCancel bool + expectErr bool + errString string + }{ + { + name: "happy_path", + eofCancel: true, + }, + { + name: "Stream Framer not Running", + expectErr: true, + eofCancel: true, + errString: "StreamFramer not running", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + file := prepFile(t) + goldenFileContents, err := os.ReadFile(file.Name()) + must.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + streamMsg := make(chan []byte, len(goldenFileContents)) + streamBytes(streamMsg, &wg, file) + wg.Wait() + + frames := make(chan *sframer.StreamFrame, 32) + frameSize := 1024 + errCh := make(chan error, 1) + framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize) + streamReader := NewStreamReader(streamMsg, framer, int64(frameSize)) + ctx, cancel := context.WithCancel(context.Background()) + + defer cancel() + wg.Add(1) //block until streamReader completes + + go func() { + defer wg.Done() + defer streamReader.Destroy() + if !tc.expectErr { + streamReader.Run() + } + initialOffset := int64(0) + err := streamReader.StreamFixed(ctx, initialOffset, "", 0, errCh, tc.eofCancel) + if !tc.expectErr { + must.NoError(t, err) + } else { + must.NotNil(t, err) + must.EqError(t, err, tc.errString) + } + + }() + wg.Wait() + // Parse and validate the contents of the frames channel + var streamErr error + var builder strings.Builder + var skipCount int + + OUTER: + for skipCount < 2 { + select { + case frame, ok := <-frames: + if !ok { + select { + case streamErr = <-errCh: + must.NoError(t, streamErr) //we shouldn't hit an error here + default: + + } + break OUTER + } + builder.Write(frame.Data) + case streamErr = <-errCh: + must.NoError(t, streamErr) //we shouldn't hit an error here + case <-ctx.Done(): + break OUTER + default: + skipCount++ + time.Sleep(1 * time.Millisecond) //makes the test a touch less flakey + } + } + if !tc.expectErr { + must.Eq(t, string(goldenFileContents), builder.String()) + } + + }) + + } +} + +func TestScanServiceName(t *testing.T) { + cases := []struct { + testString string + expectErr bool + }{ + { + testString: `nomad`, + }, + { + testString: `nomad.socket`, + }, + { + testString: `nomad-client.service`, + }, + { + testString: `nomad.client.02.swap`, + }, + { + testString: `nomadhelper@54.device`, + }, + { + testString: `1.\@_-nomad@`, + expectErr: true, + }, + { + testString: `1./@_-nomad@.automount`, + expectErr: true, + }, + { + testString: `docker.path`, + expectErr: true, + }, + { + testString: `nomad.path.gotcha`, + expectErr: true, + }, + { + testString: `nomad/8.path`, + expectErr: true, + }, + { + testString: `nomad%.path`, + expectErr: true, + }, + { + testString: `nom4ad.path`, + expectErr: true, + }, + { + testString: `nomad,.path`, + expectErr: true, + }, + { + testString: `nomad.client`, + expectErr: true, + }, + { + testString: `nomad!.path`, + expectErr: true, + }, + { + testString: `nomad%http.timer`, + expectErr: true, + }, + { + testString: `nomad,http.mount`, + expectErr: true, + }, + { + testString: `nomad$http.service`, + expectErr: true, + }, + { + testString: `nomad$.http.service`, + expectErr: true, + }, + { + testString: `nomad$`, + expectErr: true, + }, + } + + for _, tc := range cases { + t.Run(tc.testString, func(t *testing.T) { + err := ScanServiceName(tc.testString) + if !tc.expectErr { + must.NoError(t, err) + } else { + must.Error(t, err) + } + + }) + } +} diff --git a/command/agent/monitor/test_helpers.go b/command/agent/monitor/test_helpers.go new file mode 100644 index 000000000..ab5b80ddd --- /dev/null +++ b/command/agent/monitor/test_helpers.go @@ -0,0 +1,99 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package monitor + +import ( + "encoding/json" + "errors" + "io" + "net" + "strings" + "time" + + "github.com/hashicorp/go-msgpack/v2/codec" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" +) + +// StreamingClient is an interface that implements the StreamingRpcHandler function +type StreamingClient interface { + StreamingRpcHandler(string) (structs.StreamingRpcHandler, error) +} + +// ExportMonitorClient_TestHelper consolidates streaming test setup for use in +// client and server RPChandler tests +func ExportMonitorClient_TestHelper(req cstructs.MonitorExportRequest, c StreamingClient, + userTimeout <-chan time.Time) (*strings.Builder, error) { + var ( + builder strings.Builder + returnedErr error + timeout <-chan time.Time + ) + handler, err := c.StreamingRpcHandler("Agent.MonitorExport") + if err != nil { + return nil, err + } + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + err := decoder.Decode(&msg) + streamMsg <- &msg + if err != nil { + errCh <- err + return + } + + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + if err := encoder.Encode(req); err != nil { + return nil, err + } + if userTimeout != nil { + timeout = userTimeout + } + +OUTER: + for { + select { + case <-timeout: + return nil, errors.New("expected to be unreachable") + case err := <-errCh: + if err != nil && err != io.EOF { + return nil, err + } + case message := <-streamMsg: + var frame sframer.StreamFrame + + if message.Error != nil { + returnedErr = message.Error + } + + if len(message.Payload) != 0 { + err = json.Unmarshal(message.Payload, &frame) + returnedErr = err + builder.Write(frame.Data) + } else { + break OUTER + } + } + } + return &builder, returnedErr +} diff --git a/command/agent_monitor.go b/command/agent_monitor.go index 2b177e237..953c16eec 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -7,11 +7,8 @@ import ( "fmt" "io" "os" - "os/signal" "strconv" "strings" - "syscall" - "time" "github.com/hashicorp/cli" "github.com/hashicorp/nomad/api" @@ -127,31 +124,12 @@ func (c *MonitorCommand) Run(args []string) int { eventDoneCh := make(chan struct{}) frames, errCh := client.Agent().Monitor(eventDoneCh, query) - select { - case err := <-errCh: + r, err := streamFrames(frames, errCh, -1, eventDoneCh) + if err != nil { c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err)) c.Ui.Error(commandErrorText(c)) return 1 - default: } - - // Create a reader - var r io.ReadCloser - frameReader := api.NewFrameReader(frames, errCh, eventDoneCh) - frameReader.SetUnblockTime(500 * time.Millisecond) - r = frameReader - - defer r.Close() - - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - - go func() { - <-signalCh - // End the streaming - r.Close() - }() - _, err = io.Copy(os.Stdout, r) if err != nil { c.Ui.Error(fmt.Sprintf("error monitoring logs: %s", err)) diff --git a/command/agent_monitor_export.go b/command/agent_monitor_export.go new file mode 100644 index 000000000..b803f3101 --- /dev/null +++ b/command/agent_monitor_export.go @@ -0,0 +1,209 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package command + +import ( + "fmt" + "io" + "os" + "strconv" + "strings" + "time" + + "github.com/hashicorp/cli" + "github.com/hashicorp/nomad/api" + "github.com/posener/complete" +) + +type MonitorExportCommand struct { + Meta + + // Below this point is where CLI flag options are stored. + nodeID string + serverID string + onDisk bool + logsSince time.Duration + serviceName string + follow bool +} + +func (c *MonitorExportCommand) Help() string { + helpText := ` +Usage: nomad monitor export [options] + +Use the 'nomad monitor export' command to export an agent's historic data +from journald or its Nomad log file. If exporting journald logs, you must +pass '-service-name' with the name of the nomad service. +The '-logs-since' and '-follow' options are only valid for journald queries. +You may pass a duration string to the '-logs-since' option to override the +default 72h duration. Nomad will accept the following time units in the +'-logs-since duration string:"ns", "us" (or "µs"), "ms", "s", "m", "h". +The '-follow=true' option causes the agent to continue to stream logs until +interrupted or until the remote agent quits. Nomad only supports journald +queries on Linux. + +If you do not use Linux or you do not run Nomad as a systemd unit, pass the +'-on-disk=true' option to export the entirety of a given agent's nomad log file. + +When ACLs are enabled, this command requires a token with the 'agent:read' +capability. + + +General Options: + + ` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + ` + +Monitor Specific Options: + + -node-id + Sets the specific node to monitor. Accepts only a single node-id and cannot + be used with server-id. + + -server-id + Sets the specific server to monitor. Accepts only a single server-id and + cannot be used with node-id. + + -service-name + Sets the name of the nomad service, must match systemd conventions and + include the word 'nomad'. You may provide the full systemd file name + or omit the suffix. If your service name includes a '.', you must include + a valid suffix (e.g. nomad.client.service). + + -logs-since + Sets the journald log period, invalid if on-disk=true. Defaults to 72h. + Valid unit strings are "ns", "us" (or "µs"), "ms", "s", "m", "h". + + -follow + If set, the export command will continue streaming until interrupted. Ignored + if on-disk=true. + + -on-disk + If set, the export command will retrieve the Nomad log file defined in the + target agent's log_file configuration. +` + return strings.TrimSpace(helpText) +} + +func (c *MonitorExportCommand) Synopsis() string { + return "Stream logs from a Nomad agent" +} + +func (c *MonitorExportCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-node-id": NodePredictor(c.Client), + "-server-id": ServerPredictor(c.Client), + "-service-name": complete.PredictSet("nomad"), + "-logs-since": complete.PredictNothing, + "-follow": complete.PredictNothing, + "-on-disk": complete.PredictNothing, + }) +} + +func (c *MonitorExportCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictNothing +} + +func (c *MonitorExportCommand) Name() string { return "monitor export" } + +func (c *MonitorExportCommand) Run(args []string) int { + c.Ui = &cli.PrefixedUi{ + OutputPrefix: " ", + InfoPrefix: " ", + ErrorPrefix: "==> ", + Ui: c.Ui, + } + defaultDur := time.Hour * 72 + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.StringVar(&c.nodeID, "node-id", "", "") + flags.StringVar(&c.serverID, "server-id", "", "") + flags.DurationVar(&c.logsSince, "logs-since", defaultDur, + `sets the journald log period. Defaults to 72h, valid unit strings are + "ns", "us" (or "µs"), "ms", "s", "m", or "h".`) + flags.StringVar(&c.serviceName, "service-name", "", + "the name of the systemdervice unit to collect logs for, cannot be used with on-disk=true") + flags.BoolVar(&c.onDisk, "on-disk", false, + "directs the cli to stream the configured nomad log file, cannot be used with -service-name") + flags.BoolVar(&c.follow, "follow", false, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + args = flags.Args() + if l := len(args); l != 0 { + c.Ui.Error("This command takes no arguments") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + if c.serviceName != "" && c.onDisk { + c.Ui.Error("Cannot target journalctl and nomad log file simultaneously") + c.Ui.Error(commandErrorText(c)) + } + + if c.serviceName != "" { + if isNomad := strings.Contains(c.serviceName, "nomad"); !isNomad { + c.Ui.Error(fmt.Sprintf("Invalid value: -service-name=%s does not include 'nomad'", c.serviceName)) + c.Ui.Error(commandErrorText(c)) + } + } + + if c.serviceName == "" && !c.onDisk { + c.Ui.Error("One of -service-name or -on-disk must be set") + } + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + c.Ui.Error(commandErrorText(c)) + return 1 + } + + // Query the node info and lookup prefix + if c.nodeID != "" { + c.nodeID, err = lookupNodeID(client.Nodes(), c.nodeID) + if err != nil { + c.Ui.Error(err.Error()) + return 1 + } + } + + params := map[string]string{ + "follow": strconv.FormatBool(c.follow), + "logs_since": c.logsSince.String(), + "node_id": c.nodeID, + "on_disk": strconv.FormatBool(c.onDisk), + "server_id": c.serverID, + "service_name": c.serviceName, + } + + query := &api.QueryOptions{ + Params: params, + } + + eventDoneCh := make(chan struct{}) + frames, errCh := client.Agent().MonitorExport(eventDoneCh, query) + r, err := streamFrames(frames, errCh, -1, eventDoneCh) + + if err != nil { + c.Ui.Error(fmt.Sprintf("Error starting monitor: \n%s", err)) + c.Ui.Error(commandErrorText(c)) + return 1 + } + + n, err := io.Copy(os.Stdout, r) + if err != nil && err != io.EOF { + c.Ui.Error(fmt.Sprintf("Error monitoring logs: %s", err.Error())) + return 1 + } + + if n == 0 && err == nil { + emptyMessage := "Returned no data or errors, check your log_file configuration or service name" + c.Ui.Error(fmt.Sprintf("Error starting monitor: \n%s", emptyMessage)) + return 1 + } + return 0 +} diff --git a/command/agent_monitor_export_test.go b/command/agent_monitor_export_test.go new file mode 100644 index 000000000..fe7b8bbb8 --- /dev/null +++ b/command/agent_monitor_export_test.go @@ -0,0 +1,93 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package command + +import ( + "os" + "path/filepath" + "testing" + + "github.com/hashicorp/cli" + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/command/agent" + "github.com/shoenig/test/must" +) + +func TestMonitorExportCommand_Implements(t *testing.T) { + ci.Parallel(t) + var _ cli.Command = &MonitorExportCommand{} +} + +func TestMonitorExportCommand_Fails(t *testing.T) { + const expectedText = "log log log log log" + + tempDir := t.TempDir() + testFile := filepath.Join(tempDir, "test.log") + must.NoError(t, os.WriteFile(testFile, []byte(expectedText), 0777)) + config := func(c *agent.Config) { + c.LogFile = testFile + } + + srv, _, url := testServer(t, false, config) + defer srv.Shutdown() + cases := []struct { + name string + cmdArgs []string + defaultErr bool + errString string + }{ + { + name: "misuse", + cmdArgs: []string{"some", "bad", "args"}, + defaultErr: true, + }, + { + name: "no address", + cmdArgs: []string{"-address=nope"}, + errString: "unsupported protocol scheme", + }, + { + name: "invalid follow boolean", + cmdArgs: []string{"-address=" + url, "-follow=maybe"}, + errString: `invalid boolean value "maybe" for -follow`, + }, + { + name: "invalid on-disk boolean", + cmdArgs: []string{"-address=" + url, "-on-disk=maybe"}, + errString: `invalid boolean value "maybe" for -on-disk`, + }, + { + name: "setting both on-disk and service-name", + cmdArgs: []string{"-address=" + url, "-on-disk=true", "-service-name=nomad"}, + errString: "journald and nomad log file simultaneously", + }, + { + name: "setting neither on-disk nor service-name", + cmdArgs: []string{"-address=" + url}, + errString: "One of -service-name or -on-disk must be set", + }, + { + name: "requires nomad in service name", + cmdArgs: []string{"-address=" + url, "-service-name=docker.path"}, + errString: "does not include 'nomad'", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := &MonitorExportCommand{Meta: Meta{Ui: ui}} + + code := cmd.Run(tc.cmdArgs) + must.One(t, code) + + out := ui.ErrorWriter.String() + if tc.defaultErr { + must.StrContains(t, out, commandErrorText(cmd)) + } else { + must.StrContains(t, out, tc.errString) + } + }) + } +} diff --git a/command/alloc_fs.go b/command/alloc_fs.go index 8d01bd2e8..9386bea9a 100644 --- a/command/alloc_fs.go +++ b/command/alloc_fs.go @@ -8,9 +8,7 @@ import ( "io" "math/rand" "os" - "os/signal" "strings" - "syscall" "time" humanize "github.com/dustin/go-humanize" @@ -355,38 +353,19 @@ func (f *AllocFSCommand) Run(args []string) int { return 0 } -// followFile outputs the contents of the file to stdout relative to the end of -// the file. If numLines does not equal -1, then tail -n behavior is used. +// followFile calls the streamFrames helper to output the contents of the +// file to stdout relative to the end of the file. If numLines does not equal +// -1, then tail -n behavior is used. func (f *AllocFSCommand) followFile(client *api.Client, alloc *api.Allocation, path, origin string, offset, numLines int64) (io.ReadCloser, error) { cancel := make(chan struct{}) + frames, errCh := client.AllocFS().Stream(alloc, path, origin, offset, cancel, nil) - select { - case err := <-errCh: + r, err := streamFrames(frames, errCh, numLines, cancel) + if err != nil { return nil, err - default: } - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - - // Create a reader - var r io.ReadCloser - frameReader := api.NewFrameReader(frames, errCh, cancel) - frameReader.SetUnblockTime(500 * time.Millisecond) - r = frameReader - - // If numLines is set, wrap the reader - if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) - } - - go func() { - <-signalCh - - // End the streaming - r.Close() - }() return r, nil } diff --git a/command/commands.go b/command/commands.go index 0766ea43f..d17b5c7ee 100644 --- a/command/commands.go +++ b/command/commands.go @@ -579,6 +579,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "monitor export": func() (cli.Command, error) { + return &MonitorExportCommand{ + Meta: meta, + }, nil + }, "namespace": func() (cli.Command, error) { return &NamespaceCommand{ Meta: meta, diff --git a/command/helpers.go b/command/helpers.go index e871d0cff..9f7e2af47 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -11,9 +11,11 @@ import ( "io" "maps" "os" + "os/signal" "path/filepath" "strconv" "strings" + "syscall" "time" "github.com/hashicorp/cli" @@ -782,3 +784,35 @@ func getByPrefix[T any]( return nil, objs, nil } } + +func streamFrames(frames <-chan *api.StreamFrame, errCh <-chan error, + numLines int64, cancel chan struct{}) (io.ReadCloser, error) { + + select { + case err := <-errCh: + return nil, err + default: + } + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + // Create a reader + var r io.ReadCloser + frameReader := api.NewFrameReader(frames, errCh, cancel) + frameReader.SetUnblockTime(500 * time.Millisecond) + r = frameReader + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) + } else { + } + go func() { + <-signalCh + + // End the streaming + r.Close() + }() + + return r, nil +} diff --git a/command/helpers_test.go b/command/helpers_test.go index 52cce0780..6f03ffcfd 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -722,3 +722,126 @@ func TestHelperGetByPrefix(t *testing.T) { } } + +// TestHelperStreamFrames tests the streamFrames command helper used +// by the agent_monitor and fs_alloc endpoints to populate a reader +// with data from the streamFrame channel passed to the function +func TestHelperStreamFrames(t *testing.T) { + const loopCount = 50 + + // Create test file + dir := t.TempDir() + f, err := os.CreateTemp(dir, "log") + must.NoError(t, err) + writeLine := []byte("[INFO]log log log made of wood you are heavy but so good\n") + writeLength := len(writeLine) + + for range loopCount { + _, _ = f.Write(writeLine) + } + f.Close() + + // Create test file reader for streaming + goldenFilePath := f.Name() + goldenFileContents, err := os.ReadFile(goldenFilePath) + must.NoError(t, err) + + fileReader, err := os.Open(goldenFilePath) + must.NoError(t, err) + + // Helper func to populate stream chan in test case + streamFunc := func() (chan *api.StreamFrame, chan error, chan struct{}) { + framesCh := make(chan *api.StreamFrame, 30) + errCh := make(chan error) + cancelCh := make(chan struct{}) + + offset := 0 + + r := io.LimitReader(fileReader, 64) + for { + bytesHolder := make([]byte, 64) + n, err := r.Read(bytesHolder) + if err != nil && err != io.EOF { + must.NoError(t, err) + } + + if n == 0 && err == io.EOF { + break + } + offset += n + framesCh <- &api.StreamFrame{ + Offset: int64(offset), + Data: goldenFileContents, + File: goldenFilePath, + } + + if n != 0 && err == io.EOF { + //break after sending if we hit EOF with bytes in buffer + break + } + } + + close(framesCh) + return framesCh, errCh, cancelCh + } + testErr := errors.New("isErr") + cases := []struct { + name string + numLines int + expectErr bool + err error + }{ + { + name: "happy_no_limit", + numLines: -1, + }, + { + name: "happy_limit", + numLines: 25, + }, + { + name: "error", + numLines: -1, + expectErr: true, + err: testErr, + }, + } + + for _, tc := range cases { + + t.Run(tc.name, func(t *testing.T) { + + framesCh, errCh, cancelCh := streamFunc() + + if tc.expectErr { + go func() { + time.Sleep(time.Nanosecond * 1) + errCh <- tc.err + }() + } + + r, err := streamFrames(framesCh, errCh, int64(tc.numLines), cancelCh) + if !tc.expectErr { + must.NoError(t, err) + } + + result, err := io.ReadAll(r) + if !tc.expectErr { + must.NoError(t, err) + } + if tc.numLines == -1 { + //expectedLength := writeLength * loopCount + must.Eq(t, + goldenFileContents, + result) + } else { + expectedLength := (writeLength * tc.numLines) + must.Eq(t, + expectedLength, + len(result)) + } + + r.Close() + }) + } +} diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 1d63acfa2..1f545e48a 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -13,7 +13,7 @@ import ( "time" log "github.com/hashicorp/go-hclog" - + "github.com/hashicorp/go-msgpack/v2/codec" sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/host" @@ -21,8 +21,6 @@ import ( "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/structs" - - "github.com/hashicorp/go-msgpack/v2/codec" ) type Agent struct { @@ -35,6 +33,7 @@ func NewAgentEndpoint(srv *Server) *Agent { func (a *Agent) register() { a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor) + a.srv.streamingRpcs.Register("Agent.MonitorExport", a.monitorExport) } func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPprofResponse) error { @@ -64,7 +63,7 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr return fmt.Errorf("missing target RPC") } - if region != a.srv.config.Region { + if region != a.srv.Region() { // Mark that we are forwarding args.SetForwarded() return a.srv.forwardRegion(region, "Agent.Profile", args, reply) @@ -87,7 +86,7 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr } // This server is the target, so now we can check for AllowAgentDebug - if !aclObj.AllowAgentDebug(a.srv.config.EnableDebug) { + if !aclObj.AllowAgentDebug(a.srv.GetConfig().EnableDebug) { return structs.ErrPermissionDenied } @@ -168,17 +167,17 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { // Targeting a node, forward request to node if args.NodeID != "" { - a.forwardMonitorClient(conn, args, encoder, decoder) + a.forwardMonitorClient(conn, args, encoder, decoder, args.NodeID, "Agent.Monitor") // forwarded request has ended, return return } region := args.RequestRegion() if region == "" { - handleStreamResultError(fmt.Errorf("missing target RPC"), pointer.Of(int64(400)), encoder) + handleStreamResultError(fmt.Errorf("missing target region"), pointer.Of(int64(400)), encoder) return } - if region != a.srv.config.Region { + if region != a.srv.Region() { // Mark that we are forwarding args.SetForwarded() } @@ -191,7 +190,9 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { return } if serverToFwd != nil { - a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder) + // Empty ServerID to prevent forwarding loop + args.ServerID = "" + a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder, "Agent.Monitor") return } } @@ -200,7 +201,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - monitor := monitor.New(512, a.srv.logger, &log.LoggerOptions{ + m := monitor.New(512, a.srv.logger, &log.LoggerOptions{ Level: logLevel, JSONFormat: args.LogJSON, IncludeLocation: args.LogIncludeLocation, @@ -225,8 +226,8 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { <-ctx.Done() }() - logCh := monitor.Start() - defer monitor.Stop() + logCh := m.Start() + defer m.Stop() initialOffset := int64(0) // receive logs and build frames @@ -248,48 +249,135 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { } } }() + streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText) + streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, false) + if streamErr != nil { + handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) + return + } +} - var streamErr error -OUTER: - for { - select { - case frame, ok := <-frames: - if !ok { - // frame may have been closed when an error - // occurred. Check once more for an error. - select { - case streamErr = <-errCh: - // There was a pending error! - default: - // No error, continue on - } +func (a *Agent) monitorExport(conn io.ReadWriteCloser) { + defer conn.Close() + // Decode args + var args cstructs.MonitorExportRequest + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) - break OUTER - } + if err := decoder.Decode(&args); err != nil { + handleStreamResultError(err, pointer.Of(int64(500)), encoder) + return + } + authErr := a.srv.Authenticate(nil, &args) + a.srv.MeasureRPCRate("agent", structs.RateMetricRead, &args) + if authErr != nil { + handleStreamResultError(structs.ErrPermissionDenied, nil, encoder) + return + } - var resp cstructs.StreamErrWrapper - if args.PlainText { - resp.Payload = frame.Data - } else { - if err := frameCodec.Encode(frame); err != nil { - streamErr = err - break OUTER - } + // Check agent read permissions + if aclObj, err := a.srv.ResolveACL(&args); err != nil { + handleStreamResultError(err, nil, encoder) + return + } else if !aclObj.AllowAgentRead() { + handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder) + return + } - resp.Payload = buf.Bytes() - buf.Reset() - } + // Targeting a node, forward request to node + if args.NodeID != "" { + a.forwardMonitorClient(conn, args, encoder, decoder, args.NodeID, "Agent.MonitorExport") + // forwarded request has ended, return + return + } - if err := encoder.Encode(resp); err != nil { - streamErr = err - break OUTER - } - encoder.Reset(conn) - case <-ctx.Done(): - break OUTER + region := args.RequestRegion() + if region == "" { + handleStreamResultError(fmt.Errorf("missing target region"), pointer.Of(int64(400)), encoder) + return + } + if region != a.srv.Region() { + // Mark that we are forwarding + args.SetForwarded() + } + + // Try to forward request to remote region/server + if args.ServerID != "" { + serverToFwd, err := a.forwardFor(args.ServerID, region) + if err != nil { + handleStreamResultError(err, pointer.Of(int64(400)), encoder) + return + } + if serverToFwd != nil { + //empty args.ServerID to prevent forwarding loop + args.ServerID = "" + a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder, "Agent.MonitorExport") + return } } + nomadLogPath := a.srv.GetConfig().LogFile + if args.OnDisk && nomadLogPath == "" { + handleStreamResultError(errors.New("No nomad log file defined"), pointer.Of(int64(400)), encoder) + } + // NodeID was empty, ServerID was equal to this server, monitor this server + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + opts := monitor.MonitorExportOpts{ + Logger: a.srv.logger, + LogsSince: args.LogsSince, + ServiceName: args.ServiceName, + NomadLogPath: nomadLogPath, + OnDisk: args.OnDisk, + Follow: args.Follow, + Context: ctx, + } + + frames := make(chan *sframer.StreamFrame, 32) + errCh := make(chan error) + var buf bytes.Buffer + frameSize := 1024 + frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + + framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize) + framer.Run() + defer framer.Destroy() + + // goroutine to detect remote side closing + go func() { + if _, err := conn.Read(nil); err != nil { + // One end of the pipe explicitly closed, exit + cancel() + return + } + <-ctx.Done() + }() + m, err := monitor.NewExportMonitor(opts) + if err != nil { + handleStreamResultError(err, pointer.Of(int64(500)), encoder) + return + } + + var eofCancelCh chan error + + streamCh := m.Start() + initialOffset := int64(0) + eofCancel := !opts.Follow + + streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText) + // receive logs and build frames + streamReader := monitor.NewStreamReader(streamCh, framer, int64(frameSize)) + go func() { + defer framer.Destroy() + if err := streamReader.StreamFixed(ctx, initialOffset, "", 0, eofCancelCh, eofCancel); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + } + }() + + streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, true) if streamErr != nil { handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) return @@ -334,11 +422,10 @@ func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) { return target, nil } -func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { +func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args any, encoder *codec.Encoder, decoder *codec.Decoder, nodeID string, endpoint string) { // Get the Connection to the client either by fowarding to another server // or creating direct stream - - state, srv, err := a.findClientConn(args.NodeID) + state, srv, err := a.findClientConn(nodeID) if err != nil { handleStreamResultError(err, pointer.Of(int64(500)), encoder) return @@ -347,7 +434,7 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni var clientConn net.Conn if state == nil { - conn, err := a.srv.streamingRpc(srv, "Agent.Monitor") + conn, err := a.srv.streamingRpc(srv, endpoint) if err != nil { handleStreamResultError(err, nil, encoder) return @@ -355,7 +442,7 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni clientConn = conn } else { - stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor") + stream, err := NodeStreamingRpc(state.Session, endpoint) if err != nil { handleStreamResultError(err, nil, encoder) return @@ -374,10 +461,7 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni structs.Bridge(conn, clientConn) } -func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { - // empty ServerID to prevent forwarding loop - args.ServerID = "" - +func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args any, encoder *codec.Encoder, decoder *codec.Decoder, endpoint string) { serverConn, err := a.srv.streamingRpc(server, "Agent.Monitor") if err != nil { handleStreamResultError(err, pointer.Of(int64(500)), encoder) @@ -439,7 +523,7 @@ func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataRespo return fmt.Errorf("missing target RPC") } - if region != a.srv.config.Region { + if region != a.srv.Region() { // Mark that we are forwarding args.SetForwarded() return a.srv.forwardRegion(region, "Agent.Host", args, reply) diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 3dcaa9ef7..9896c9f74 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net" + "os" "strings" "testing" "time" @@ -21,6 +22,7 @@ import ( "github.com/hashicorp/nomad/client/config" sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -1023,3 +1025,86 @@ func TestAgentHost_ACLDebugRequired(t *testing.T) { err := s.RPC("Agent.Host", &req, &resp) must.EqError(t, err, structs.ErrPermissionDenied.Error()) } + +func TestMonitor_MonitorExport(t *testing.T) { + ci.Parallel(t) + const ( + shortText = "log log log log log" + ) + // Create test file + dir := t.TempDir() + f, err := os.CreateTemp(dir, "log") + must.NoError(t, err) + for range 1000 { + _, _ = f.WriteString(fmt.Sprintf("%v [INFO] it's log, it's log, it's big it's heavy it's wood", time.Now())) + } + f.Close() + longFilePath := f.Name() + longFileContents, err := os.ReadFile(longFilePath) + must.NoError(t, err) + + // start server + s, root, cleanupS := TestACLServer(t, func(c *Config) { + c.LogFile = longFilePath + }) + defer cleanupS() + defer os.Remove(longFilePath) + testutil.WaitForLeader(t, s.RPC) + + cases := []struct { + name string + expected string + nomadLogPath string + serviceName string + token *structs.ACLToken + onDisk bool + expectErr bool + }{ + { + name: "happy_path_long_file", + onDisk: true, + expected: string(longFileContents), + token: root, + }, + { + name: "token_error", + onDisk: true, + expected: string(longFileContents), + token: &structs.ACLToken{}, + expectErr: true, + }, + { + name: "invalid_service_name", + serviceName: "nomad$", + expected: string(longFileContents), + token: &structs.ACLToken{}, + expectErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + // No NodeID set to force server use + req := cstructs.MonitorExportRequest{ + LogsSince: "72", + NomadLogPath: tc.nomadLogPath, + OnDisk: tc.onDisk, + + ServiceName: tc.serviceName, + QueryOptions: structs.QueryOptions{ + Region: "global", + AuthToken: tc.token.SecretID, + }, + } + + builder, finalError := monitor.ExportMonitorClient_TestHelper(req, s, time.After(3*time.Second)) + if tc.expectErr { + must.Error(t, finalError) + return + } + must.NoError(t, err) + must.NotNil(t, builder) + must.Eq(t, strings.TrimSpace(tc.expected), strings.TrimSpace(builder.String())) + }) + } +} diff --git a/nomad/config.go b/nomad/config.go index c1e1c969e..a0b2d6887 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -448,6 +448,9 @@ type Config struct { // considered healthy. Without this, the server can hang indefinitely // waiting for these. StartTimeout time.Duration + + // LogFile is used by MonitorExport to stream a server's log file + LogFile string `hcl:"log_file"` } func (c *Config) Copy() *Config { diff --git a/website/content/commands/monitor/export.mdx b/website/content/commands/monitor/export.mdx new file mode 100644 index 000000000..42feb818e --- /dev/null +++ b/website/content/commands/monitor/export.mdx @@ -0,0 +1,79 @@ +--- +layout: docs +page_title: 'nomad monitor export command reference' +description: | + The `nomad monitor export` command returns logs written to disk or journald by a nomad agent. +--- + +# `nomad monitor export` command reference + +The `nomad monitor export` command returns logs written to disk or journald by a nomad agent. + +## Usage + +```plaintext +nomad monitor export [options] +``` + +Use the `nomad monitor export` command to export an agent's historic data +from journald or its Nomad log file. If exporting journald logs, you must +pass `-service-name` with the name of the systemd unit to query. +The `-logs-since` and `-follow` options are only valid for journald queries. +You may pass a duration string to the `-logs-since` option to override the +default 72h duration. Nomad will accept the following time units in the +`-logs-since` duration string: "ns", "us" (or "µs"), "ms", "s", "m", "h". +The `-follow=true` option causes the agent to continue to stream logs until +interrupted or until the remote agent quits. Nomad only supports journald +queries on Linux. + +If you do not use Linux or you do not run Nomad as a systemd unit, pass the +`-on-disk=true` option to export the entirety of a given agent's nomad log file. + +When ACLs are enabled, this command requires a token with the `agent:read` +capability. + +## Options + +- `-node-id`: Specifies the client node-id to stream logs from. If no + node-id is given, the Nomad server from the `-address` flag is used. + +- `-server-id`: Specifies the Nomad server id to stream logs from. Accepts + server names from `nomad server members` and also a special `leader` option + which will target the current leader. + +- `-service-name`: Specifies the the name of the systemd unit for export. + Do not use with `-on-disk`. Must include 'nomad' and conform to systemd + naming conventions. You may provide the full systemd file name + or omit the suffix. If your service name includes a '.', you must include + a valid suffix (e.g. nomad.client.service). + +- `-logs-since`: Duration used to determine how far back to return logs from + journald. Ignored if used with `-on-disk` and defaults to `72h` if not set. + +- `-follow`: Boolean that, if true, continues streaming journald logs until + interrupted. Do not use with `-on-disk` + +- `-on-disk`: Boolean that, if true, returns the contents of the Nomad log file + defined in the agent config. + +## Examples + +This example returns journald log entries with a specific node ID and service name. + +```shell-session +$ nomad monitor export -node-id=$(nomad node status --quiet) -service-name="nomad" +Jun 04 20:09:29 nomad-client01 systemd[1]: Starting Nomad... +Subject: A start job for unit nomad_client.service has begun execution +``` + +This example returns the contents of the nomad log file for a specific server. + +```shell-session +$ nomad monitor export -server-id=a57b2adb-1a30-2dda-8df0-25abb0881952 -on-disk=true +2025-06-20T12:22:08.528-0500 [DEBUG] http: request complete: method=GET path=/v1/agent/health?type=server duration=1.445739ms +2025-06-20T12:22:09.892-0500 [DEBUG] nomad: memberlist: Stream connection from=127.0.0.1:53628 +``` + +## General options + +@include 'general_options_no_namespace.mdx' diff --git a/website/content/commands/monitor.mdx b/website/content/commands/monitor/index.mdx similarity index 100% rename from website/content/commands/monitor.mdx rename to website/content/commands/monitor/index.mdx diff --git a/website/data/commands-nav-data.json b/website/data/commands-nav-data.json index 172f0503d..8d03b1365 100644 --- a/website/data/commands-nav-data.json +++ b/website/data/commands-nav-data.json @@ -412,7 +412,15 @@ }, { "title": "monitor", - "path": "monitor" + "routes": [ + { + "title": "Overview", + "path": "monitor" + }, + { "title": "export", + "path": "monitor/export" + } + ] }, { "title": "namespace",