From bd18a452abf5369309a876f2edf3ab7df983af23 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 16 Dec 2021 13:38:58 -0500 Subject: [PATCH] cli: stream raft logs to operator raft logs subcommand (#11684) The `nomad operator raft logs` command uses a raft helper that reads in the logs from raft and serializes them to JSON. The previous implementation returned the slice of all logs and then serializes the entire object. Update the helper to stream the log entries and then serialize them as newline-delimited JSON. --- command/operator_raft_logs.go | 53 ++++++++++++++++++++++++++++------- helper/raftutil/state.go | 51 ++++++++++++++++++++------------- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/command/operator_raft_logs.go b/command/operator_raft_logs.go index fe3233ee6..006827aa6 100644 --- a/command/operator_raft_logs.go +++ b/command/operator_raft_logs.go @@ -28,6 +28,11 @@ Usage: nomad operator raft logs This is a low-level debugging tool and not subject to Nomad's usual backward compatibility guarantees. +Raft Logs Options: + + -pretty + By default this command outputs newline delimited JSON. If the -pretty flag + is passed, each entry will be pretty-printed. ` return strings.TrimSpace(helpText) } @@ -47,10 +52,20 @@ func (c *OperatorRaftLogsCommand) Synopsis() string { func (c *OperatorRaftLogsCommand) Name() string { return "operator raft logs" } func (c *OperatorRaftLogsCommand) Run(args []string) int { - if len(args) != 1 { + + var pretty bool + flagSet := c.Meta.FlagSet(c.Name(), FlagSetClient) + flagSet.Usage = func() { c.Ui.Output(c.Help()) } + flagSet.BoolVar(&pretty, "pretty", false, "") + + if err := flagSet.Parse(args); err != nil { + return 1 + } + + args = flagSet.Args() + if l := len(args); l != 1 { c.Ui.Error("This command takes one argument: ") c.Ui.Error(commandErrorText(c)) - return 1 } @@ -60,22 +75,40 @@ func (c *OperatorRaftLogsCommand) Run(args []string) int { return 1 } - logs, warnings, err := raftutil.LogEntries(raftPath) + enc := json.NewEncoder(os.Stdout) + if pretty { + enc.SetIndent("", " ") + } + + logChan, warningsChan, err := raftutil.LogEntries(raftPath) if err != nil { c.Ui.Error(err.Error()) return 1 } + // so that the warnings don't end up mixed into the JSON stream, + // collect them and print them once we're done + warnings := []error{} + +DONE: + for { + select { + case log := <-logChan: + if log == nil { + break DONE // no more logs, but break to print warnings + } + if err := enc.Encode(log); err != nil { + c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err)) + return 1 + } + case warning := <-warningsChan: + warnings = append(warnings, warning) + } + } + for _, warning := range warnings { c.Ui.Error(warning.Error()) } - enc := json.NewEncoder(os.Stdout) - enc.SetIndent("", " ") - if err := enc.Encode(logs); err != nil { - c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err)) - return 1 - } - return 0 } diff --git a/helper/raftutil/state.go b/helper/raftutil/state.go index 1206fbdba..f902a32ef 100644 --- a/helper/raftutil/state.go +++ b/helper/raftutil/state.go @@ -32,33 +32,44 @@ func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, last return s, firstIdx, lastIdx, nil } -// LogEntries returns the log entries as found in raft log in the passed data-dir directory -func LogEntries(p string) (logs []interface{}, warnings []error, err error) { +// LogEntries reads the raft logs found in the data directory found at +// the path `p`, and returns a channel of logs, and a channel of +// warnings. If opening the raft state returns an error, both channels +// will be nil. +func LogEntries(p string) (<-chan interface{}, <-chan error, error) { store, firstIdx, lastIdx, err := RaftStateInfo(p) if err != nil { return nil, nil, fmt.Errorf("failed to open raft logs: %v", err) } - defer store.Close() - result := make([]interface{}, 0, lastIdx-firstIdx+1) - for i := firstIdx; i <= lastIdx; i++ { - var e raft.Log - err := store.GetLog(i, &e) - if err != nil { - warnings = append(warnings, fmt.Errorf("failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v", i, firstIdx, lastIdx, err)) - continue + entries := make(chan interface{}) + warnings := make(chan error) + + go func() { + defer store.Close() + defer close(entries) + for i := firstIdx; i <= lastIdx; i++ { + var e raft.Log + err := store.GetLog(i, &e) + if err != nil { + warnings <- fmt.Errorf( + "failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v", + i, firstIdx, lastIdx, err) + continue + } + + entry, err := decode(&e) + if err != nil { + warnings <- fmt.Errorf( + "failed to decode log entry at index %d: %v", i, err) + continue + } + + entries <- entry } + }() - m, err := decode(&e) - if err != nil { - warnings = append(warnings, fmt.Errorf("failed to decode log entry at index %d: %v", i, err)) - continue - } - - result = append(result, m) - } - - return result, warnings, nil + return entries, warnings, nil } type logMessage struct {