mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
cli: stream raft logs to operator raft logs subcommand (#11684)
The `nomad operator raft logs` command uses a raft helper that reads in the logs from raft and serializes them to JSON. The previous implementation returned the slice of all logs and then serializes the entire object. Update the helper to stream the log entries and then serialize them as newline-delimited JSON.
This commit is contained in:
@@ -28,6 +28,11 @@ Usage: nomad operator raft logs <path to nomad data dir>
|
||||
This is a low-level debugging tool and not subject to Nomad's usual backward
|
||||
compatibility guarantees.
|
||||
|
||||
Raft Logs Options:
|
||||
|
||||
-pretty
|
||||
By default this command outputs newline delimited JSON. If the -pretty flag
|
||||
is passed, each entry will be pretty-printed.
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
@@ -47,10 +52,20 @@ func (c *OperatorRaftLogsCommand) Synopsis() string {
|
||||
func (c *OperatorRaftLogsCommand) Name() string { return "operator raft logs" }
|
||||
|
||||
func (c *OperatorRaftLogsCommand) Run(args []string) int {
|
||||
if len(args) != 1 {
|
||||
|
||||
var pretty bool
|
||||
flagSet := c.Meta.FlagSet(c.Name(), FlagSetClient)
|
||||
flagSet.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
flagSet.BoolVar(&pretty, "pretty", false, "")
|
||||
|
||||
if err := flagSet.Parse(args); err != nil {
|
||||
return 1
|
||||
}
|
||||
|
||||
args = flagSet.Args()
|
||||
if l := len(args); l != 1 {
|
||||
c.Ui.Error("This command takes one argument: <path>")
|
||||
c.Ui.Error(commandErrorText(c))
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
@@ -60,22 +75,40 @@ func (c *OperatorRaftLogsCommand) Run(args []string) int {
|
||||
return 1
|
||||
}
|
||||
|
||||
logs, warnings, err := raftutil.LogEntries(raftPath)
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
if pretty {
|
||||
enc.SetIndent("", " ")
|
||||
}
|
||||
|
||||
logChan, warningsChan, err := raftutil.LogEntries(raftPath)
|
||||
if err != nil {
|
||||
c.Ui.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// so that the warnings don't end up mixed into the JSON stream,
|
||||
// collect them and print them once we're done
|
||||
warnings := []error{}
|
||||
|
||||
DONE:
|
||||
for {
|
||||
select {
|
||||
case log := <-logChan:
|
||||
if log == nil {
|
||||
break DONE // no more logs, but break to print warnings
|
||||
}
|
||||
if err := enc.Encode(log); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err))
|
||||
return 1
|
||||
}
|
||||
case warning := <-warningsChan:
|
||||
warnings = append(warnings, warning)
|
||||
}
|
||||
}
|
||||
|
||||
for _, warning := range warnings {
|
||||
c.Ui.Error(warning.Error())
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
if err := enc.Encode(logs); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -32,33 +32,44 @@ func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, last
|
||||
return s, firstIdx, lastIdx, nil
|
||||
}
|
||||
|
||||
// LogEntries returns the log entries as found in raft log in the passed data-dir directory
|
||||
func LogEntries(p string) (logs []interface{}, warnings []error, err error) {
|
||||
// LogEntries reads the raft logs found in the data directory found at
|
||||
// the path `p`, and returns a channel of logs, and a channel of
|
||||
// warnings. If opening the raft state returns an error, both channels
|
||||
// will be nil.
|
||||
func LogEntries(p string) (<-chan interface{}, <-chan error, error) {
|
||||
store, firstIdx, lastIdx, err := RaftStateInfo(p)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to open raft logs: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
result := make([]interface{}, 0, lastIdx-firstIdx+1)
|
||||
for i := firstIdx; i <= lastIdx; i++ {
|
||||
var e raft.Log
|
||||
err := store.GetLog(i, &e)
|
||||
if err != nil {
|
||||
warnings = append(warnings, fmt.Errorf("failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v", i, firstIdx, lastIdx, err))
|
||||
continue
|
||||
entries := make(chan interface{})
|
||||
warnings := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer store.Close()
|
||||
defer close(entries)
|
||||
for i := firstIdx; i <= lastIdx; i++ {
|
||||
var e raft.Log
|
||||
err := store.GetLog(i, &e)
|
||||
if err != nil {
|
||||
warnings <- fmt.Errorf(
|
||||
"failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v",
|
||||
i, firstIdx, lastIdx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
entry, err := decode(&e)
|
||||
if err != nil {
|
||||
warnings <- fmt.Errorf(
|
||||
"failed to decode log entry at index %d: %v", i, err)
|
||||
continue
|
||||
}
|
||||
|
||||
entries <- entry
|
||||
}
|
||||
}()
|
||||
|
||||
m, err := decode(&e)
|
||||
if err != nil {
|
||||
warnings = append(warnings, fmt.Errorf("failed to decode log entry at index %d: %v", i, err))
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, m)
|
||||
}
|
||||
|
||||
return result, warnings, nil
|
||||
return entries, warnings, nil
|
||||
}
|
||||
|
||||
type logMessage struct {
|
||||
|
||||
Reference in New Issue
Block a user