From c3c2240304e53b40384346808627be8a189c44bd Mon Sep 17 00:00:00 2001 From: Gerard Nguyen Date: Thu, 6 Jun 2024 15:57:10 +1000 Subject: [PATCH] Update nomad operator snapshot inspect with more detail (#20062) Co-authored-by: Michael Schurter Co-authored-by: James Rasell --- .changelog/18372.txt | 3 + command/operator_snapshot_inspect.go | 194 +++++++++++++++++++++- command/operator_snapshot_inspect_test.go | 2 +- nomad/fsm.go | 86 +++++++++- nomad/fsm_ce.go | 5 + 5 files changed, 278 insertions(+), 12 deletions(-) create mode 100644 .changelog/18372.txt diff --git a/.changelog/18372.txt b/.changelog/18372.txt new file mode 100644 index 000000000..a0013852e --- /dev/null +++ b/.changelog/18372.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: `operator snapshot inspect` now includes details of data in snapshot +``` \ No newline at end of file diff --git a/command/operator_snapshot_inspect.go b/command/operator_snapshot_inspect.go index dee3d2d2b..6d47dfc01 100644 --- a/command/operator_snapshot_inspect.go +++ b/command/operator_snapshot_inspect.go @@ -5,10 +5,18 @@ package command import ( "fmt" + "io" "os" + "sort" + "strconv" "strings" + "time" + humanize "github.com/dustin/go-humanize" + "github.com/hashicorp/go-msgpack/v2/codec" "github.com/hashicorp/nomad/helper/snapshot" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/raft" "github.com/posener/complete" ) @@ -16,14 +24,55 @@ type OperatorSnapshotInspectCommand struct { Meta } +type typeStats struct { + Name string + Sum int + Count int +} + +type SnapshotInspectFormat struct { + Meta *raft.SnapshotMeta + Stats []typeStats +} + +// SnapshotInfo is used for passing snapshot stat +// information between functions +type SnapshotInfo struct { + Stats map[nomad.SnapshotType]typeStats + TotalSize int + TotalCount int +} + +// countingReader helps keep track of the bytes we have read +// when reading snapshots +type countingReader struct { + wrappedReader io.Reader + read int +} + +func (r *countingReader) Read(p []byte) (n int, err error) { + n, err = r.wrappedReader.Read(p) + if err == nil { + r.read += n + } + return n, err +} + func (c *OperatorSnapshotInspectCommand) Help() string { helpText := ` Usage: nomad operator snapshot inspect [options] Displays information about a snapshot file on disk. + The output will include all snapshot types and their + respective sizes, sorted in descending order. To inspect the file "backup.snap": $ nomad operator snapshot inspect backup.snap + +Snapshot Inspect Options: + + -json + Output the snapshot inspect in its JSON format. ` return strings.TrimSpace(helpText) } @@ -33,7 +82,7 @@ func (c *OperatorSnapshotInspectCommand) AutocompleteFlags() complete.Flags { } func (c *OperatorSnapshotInspectCommand) AutocompleteArgs() complete.Predictor { - return complete.PredictFiles("*.snap") + return complete.PredictNothing } func (c *OperatorSnapshotInspectCommand) Synopsis() string { @@ -43,7 +92,18 @@ func (c *OperatorSnapshotInspectCommand) Synopsis() string { func (c *OperatorSnapshotInspectCommand) Name() string { return "operator snapshot inspect" } func (c *OperatorSnapshotInspectCommand) Run(args []string) int { + var json bool + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&json, "json", false, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + // Check that we either got no filename or exactly one. + args = flags.Args() if len(args) != 1 { c.Ui.Error("This command takes one argument: ") c.Ui.Error(commandErrorText(c)) @@ -58,20 +118,140 @@ func (c *OperatorSnapshotInspectCommand) Run(args []string) int { } defer f.Close() - meta, err := snapshot.Verify(f) + meta, info, err := inspect(f) if err != nil { - c.Ui.Error(fmt.Sprintf("Error verifying snapshot: %s", err)) + c.Ui.Error(fmt.Sprintf("Error inspecting snapshot: %s", err)) return 1 } + stats := generateStats(info) - output := []string{ + // format as JSON if requested + if json { + data := SnapshotInspectFormat{ + Meta: meta, + Stats: stats, + } + out, err := Format(json, "", data) + if err != nil { + c.Ui.Error(err.Error()) + return 1 + } + + c.Ui.Output(out) + return 0 + } + + // print human-readable output + c.Ui.Output(formatKV([]string{ + fmt.Sprintf("Created|%s", extractTimeFromName(meta.ID)), fmt.Sprintf("ID|%s", meta.ID), - fmt.Sprintf("Size|%d", meta.Size), + fmt.Sprintf("Size|%s", humanize.IBytes(uint64(meta.Size))), fmt.Sprintf("Index|%d", meta.Index), fmt.Sprintf("Term|%d", meta.Term), fmt.Sprintf("Version|%d", meta.Version), - } + })) + c.Ui.Output("") - c.Ui.Output(formatList(output)) + output := []string{"Type|Count|Size"} + + for _, stat := range stats { + output = append(output, fmt.Sprintf("%s|%d|%s", stat.Name, stat.Count, humanize.IBytes(uint64(stat.Sum)))) + } + output = append(output, + " | | ", + fmt.Sprintf("Total|%v|%s", info.TotalCount, humanize.IBytes(uint64(info.TotalSize))), + ) + + c.Ui.Output(formatListWithSpaces(output)) return 0 } + +func inspect(file io.Reader) (*raft.SnapshotMeta, *SnapshotInfo, error) { + info := &SnapshotInfo{ + Stats: make(map[nomad.SnapshotType]typeStats), + TotalSize: 0, + } + + // w is closed by CopySnapshot + r, w := io.Pipe() + cr := &countingReader{wrappedReader: r} + errCh := make(chan error) + metaCh := make(chan *raft.SnapshotMeta) + + go func() { + meta, err := snapshot.CopySnapshot(file, w) + if err != nil { + errCh <- fmt.Errorf("failed to read snapshot: %w", err) + } else { + metaCh <- meta + } + }() + + handler := func(header *nomad.SnapshotHeader, snapType nomad.SnapshotType, dec *codec.Decoder) error { + name := snapType.String() + stat := info.Stats[snapType] + + if stat.Name == "" { + stat.Name = name + } + + var val interface{} + err := dec.Decode(&val) + if err != nil { + return fmt.Errorf("failed to decode snapshot %q: %v", snapType, err) + } + + size := cr.read - info.TotalSize + stat.Sum += size + stat.Count++ + info.TotalSize = cr.read + info.TotalCount++ + info.Stats[snapType] = stat + + return nil + } + + err := nomad.ReadSnapshot(cr, handler) + if err != nil { + return nil, nil, err + } + + select { + case err := <-errCh: + return nil, nil, err + case meta := <-metaCh: + return meta, info, nil + } +} + +func generateStats(info *SnapshotInfo) []typeStats { + ss := make([]typeStats, 0, len(info.Stats)) + for _, stat := range info.Stats { + ss = append(ss, stat) + } + + // sort by Sum + sort.Slice(ss, func(i, j int) bool { + // sort alphabetically if size is equal + if ss[i].Sum == ss[j].Sum { + return ss[i].Name < ss[j].Name + } + return ss[i].Sum > ss[j].Sum + }) + + return ss +} + +// Raft snapshot name is in format of -- +// we will extract the creation time +func extractTimeFromName(snapshotName string) string { + parts := strings.Split(snapshotName, "-") + if len(parts) != 3 { + return "" + } + msec, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return "" + } + return formatTime(time.UnixMilli(msec)) +} diff --git a/command/operator_snapshot_inspect_test.go b/command/operator_snapshot_inspect_test.go index c6dbb8622..203f441db 100644 --- a/command/operator_snapshot_inspect_test.go +++ b/command/operator_snapshot_inspect_test.go @@ -64,7 +64,7 @@ func TestOperatorSnapshotInspect_HandlesFailure(t *testing.T) { code := cmd.Run([]string{filepath.Join(tmpDir, "invalid.snap")}) must.Positive(t, code) - must.StrContains(t, ui.ErrorWriter.String(), "Error verifying snapshot") + must.StrContains(t, ui.ErrorWriter.String(), "Error inspecting snapshot") }) } diff --git a/nomad/fsm.go b/nomad/fsm.go index e3d757164..3e7d99d7c 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -71,6 +71,40 @@ const ( NamespaceSnapshot SnapshotType = 64 ) +var snapshotTypeStrings = map[SnapshotType]string{ + NodeSnapshot: "Node", + JobSnapshot: "Job", + IndexSnapshot: "Index", + EvalSnapshot: "Eval", + AllocSnapshot: "Alloc", + TimeTableSnapshot: "TimeTable", + PeriodicLaunchSnapshot: "PeriodicLaunch", + JobSummarySnapshot: "JobSummary", + VaultAccessorSnapshot: "VaultAccessor", + JobVersionSnapshot: "JobVersion", + DeploymentSnapshot: "Deployment", + ACLPolicySnapshot: "ACLPolicy", + ACLTokenSnapshot: "ACLToken", + SchedulerConfigSnapshot: "SchedulerConfig", + ClusterMetadataSnapshot: "ClusterMetadata", + ServiceIdentityTokenAccessorSnapshot: "ServiceIdentityTokenAccessor", + ScalingPolicySnapshot: "ScalingPolicy", + CSIPluginSnapshot: "CSIPlugin", + CSIVolumeSnapshot: "CSIVolume", + ScalingEventsSnapshot: "ScalingEvents", + EventSinkSnapshot: "EventSink", + ServiceRegistrationSnapshot: "ServiceRegistration", + VariablesSnapshot: "Variables", + VariablesQuotaSnapshot: "VariablesQuota", + RootKeyMetaSnapshot: "RootKeyMeta", + ACLRoleSnapshot: "ACLRole", + ACLAuthMethodSnapshot: "ACLAuthMethod", + ACLBindingRuleSnapshot: "ACLBindingRule", + NodePoolSnapshot: "NodePool", + JobSubmissionSnapshot: "JobSubmission", + NamespaceSnapshot: "Namespace", +} + // LogApplier is the definition of a function that can apply a Raft log type LogApplier func(buf []byte, index uint64) interface{} @@ -120,8 +154,8 @@ type nomadSnapshot struct { timetable *TimeTable } -// snapshotHeader is the first entry in our snapshot -type snapshotHeader struct { +// SnapshotHeader is the first entry in our snapshot +type SnapshotHeader struct { } // FSMConfig is used to configure the FSM @@ -1502,7 +1536,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { dec := codec.NewDecoder(old, structs.MsgpackHandle) // Read in the header - var header snapshotHeader + var header SnapshotHeader if err := dec.Decode(&header); err != nil { return err } @@ -2299,7 +2333,7 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { encoder := codec.NewEncoder(sink, structs.MsgpackHandle) // Write the header - header := snapshotHeader{} + header := SnapshotHeader{} if err := encoder.Encode(&header); err != nil { sink.Cancel() return err @@ -3174,3 +3208,47 @@ func (s *nomadSnapshot) persistJobSubmissions(sink raft.SnapshotSink, encoder *c // to the state store snapshot. There is nothing to explicitly // cleanup. func (s *nomadSnapshot) Release() {} + +// ReadSnapshot decodes each message type and utilizes the handler function to +// process each message type individually +func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, snapType SnapshotType, dec *codec.Decoder) error) error { + // Create a decoder + dec := codec.NewDecoder(r, structs.MsgpackHandle) + + // Read in the header + var header SnapshotHeader + if err := dec.Decode(&header); err != nil { + return err + } + + // Populate the new state + msgType := make([]byte, 1) + for { + // Read the message type + _, err := r.Read(msgType) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + + // Decode + snapType := SnapshotType(msgType[0]) + + if err := handler(&header, snapType, dec); err != nil { + return err + } + } +} + +func (s SnapshotType) String() string { + v, ok := snapshotTypeStrings[s] + if ok { + return v + } + v, ok = enterpriseSnapshotType(s) + if ok { + return v + } + return fmt.Sprintf("Unknown(%d)", s) +} diff --git a/nomad/fsm_ce.go b/nomad/fsm_ce.go index 961e389c1..a69429c8d 100644 --- a/nomad/fsm_ce.go +++ b/nomad/fsm_ce.go @@ -11,3 +11,8 @@ package nomad func (n *nomadFSM) allocQuota(_ string) (string, error) { return "", nil } + +// enterpriseSnapshotType is a no-op for community edition. +func enterpriseSnapshotType(s SnapshotType) (string, bool) { + return "", false +}