Update nomad operator snapshot inspect with more detail (#20062)

Co-authored-by: Michael Schurter <michael.schurter@gmail.com>
Co-authored-by: James Rasell <jrasell@hashicorp.com>
This commit is contained in:
Gerard Nguyen
2024-06-06 15:57:10 +10:00
committed by GitHub
parent 34f34440ac
commit c3c2240304
5 changed files with 278 additions and 12 deletions

3
.changelog/18372.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
cli: `operator snapshot inspect` now includes details of the data stored in the snapshot
```

View File

@@ -5,10 +5,18 @@ package command
import (
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/hashicorp/go-msgpack/v2/codec"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/raft"
"github.com/posener/complete"
)
@@ -16,14 +24,55 @@ type OperatorSnapshotInspectCommand struct {
Meta
}
// typeStats holds the aggregated figures for one snapshot type as
// collected while inspecting a snapshot.
type typeStats struct {
// Name is the string form of the snapshot type.
Name string
// Sum is the accumulated number of bytes read for entries of this type.
Sum int
// Count is the number of entries of this type that were decoded.
Count int
}
// SnapshotInspectFormat is the structure emitted by the inspect command
// when JSON output is requested.
type SnapshotInspectFormat struct {
// Meta is the Raft metadata of the inspected snapshot.
Meta *raft.SnapshotMeta
// Stats lists the per-type statistics produced by generateStats.
Stats []typeStats
}
// SnapshotInfo is used for passing snapshot stat
// information between functions
type SnapshotInfo struct {
// Stats holds the per-type statistics, keyed by snapshot type.
Stats map[nomad.SnapshotType]typeStats
// TotalSize is the number of bytes that had been read from the snapshot
// when the most recent entry finished decoding.
TotalSize int
// TotalCount is the number of entries decoded across all types.
TotalCount int
}
// countingReader wraps an io.Reader and keeps a running total of the bytes
// read through it, so callers can work out how many bytes each decoded
// snapshot entry consumed.
type countingReader struct {
	// wrappedReader is the underlying source of data.
	wrappedReader io.Reader
	// read is the total number of bytes returned by Read so far.
	read int
}

// Read implements io.Reader by delegating to the wrapped reader and adding
// the number of bytes it returned to the running total.
func (r *countingReader) Read(p []byte) (n int, err error) {
	n, err = r.wrappedReader.Read(p)
	// A reader may return n > 0 together with a non-nil error (commonly
	// io.EOF on the final chunk). Those bytes were still delivered to the
	// caller, so they must be counted regardless of err.
	r.read += n
	return n, err
}
// Help returns the long-form usage text for the command with surrounding
// whitespace trimmed.
func (c *OperatorSnapshotInspectCommand) Help() string {
	const usage = `
Usage: nomad operator snapshot inspect [options] <file>
Displays information about a snapshot file on disk.
The output will include all snapshot types and their
respective sizes, sorted in descending order.
To inspect the file "backup.snap":
$ nomad operator snapshot inspect backup.snap
Snapshot Inspect Options:
-json
Output the snapshot inspect in its JSON format.
`
	return strings.TrimSpace(usage)
}
@@ -33,7 +82,7 @@ func (c *OperatorSnapshotInspectCommand) AutocompleteFlags() complete.Flags {
}
// AutocompleteArgs returns the predictor used to complete the command's
// positional argument.
func (c *OperatorSnapshotInspectCommand) AutocompleteArgs() complete.Predictor {
// Only this first return executes; presumably it predicts file names
// matching "*.snap".
return complete.PredictFiles("*.snap")
// NOTE(review): unreachable, the return above always wins. This looks like
// an old/new pair left side by side; confirm which predictor is intended
// and delete the other.
return complete.PredictNothing
}
func (c *OperatorSnapshotInspectCommand) Synopsis() string {
@@ -43,7 +92,18 @@ func (c *OperatorSnapshotInspectCommand) Synopsis() string {
// Name returns the name of this command; Run uses it to label the flag set.
func (c *OperatorSnapshotInspectCommand) Name() string {
	const name = "operator snapshot inspect"
	return name
}
func (c *OperatorSnapshotInspectCommand) Run(args []string) int {
var json bool
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&json, "json", false, "")
if err := flags.Parse(args); err != nil {
return 1
}
// Check that we got exactly one argument: the snapshot filename.
args = flags.Args()
if len(args) != 1 {
c.Ui.Error("This command takes one argument: <filename>")
c.Ui.Error(commandErrorText(c))
@@ -58,20 +118,140 @@ func (c *OperatorSnapshotInspectCommand) Run(args []string) int {
}
defer f.Close()
meta, err := snapshot.Verify(f)
meta, info, err := inspect(f)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error verifying snapshot: %s", err))
c.Ui.Error(fmt.Sprintf("Error inspecting snapshot: %s", err))
return 1
}
stats := generateStats(info)
// format as JSON if requested
if json {
data := SnapshotInspectFormat{
Meta: meta,
Stats: stats,
}
out, err := Format(json, "", data)
if err != nil {
c.Ui.Error(err.Error())
return 1
}
output := []string{
c.Ui.Output(out)
return 0
}
// print human-readable output
c.Ui.Output(formatKV([]string{
fmt.Sprintf("Created|%s", extractTimeFromName(meta.ID)),
fmt.Sprintf("ID|%s", meta.ID),
fmt.Sprintf("Size|%d", meta.Size),
fmt.Sprintf("Size|%s", humanize.IBytes(uint64(meta.Size))),
fmt.Sprintf("Index|%d", meta.Index),
fmt.Sprintf("Term|%d", meta.Term),
fmt.Sprintf("Version|%d", meta.Version),
}
}))
c.Ui.Output("")
c.Ui.Output(formatList(output))
output := []string{"Type|Count|Size"}
for _, stat := range stats {
output = append(output, fmt.Sprintf("%s|%d|%s", stat.Name, stat.Count, humanize.IBytes(uint64(stat.Sum))))
}
output = append(output,
" | | ",
fmt.Sprintf("Total|%v|%s", info.TotalCount, humanize.IBytes(uint64(info.TotalSize))),
)
c.Ui.Output(formatListWithSpaces(output))
return 0
}
// inspect streams the snapshot in file through snapshot.CopySnapshot into an
// in-memory pipe and decodes the result with nomad.ReadSnapshot, tallying how
// many entries of each snapshot type were seen and how many bytes they took.
//
// It returns the Raft metadata reported by CopySnapshot together with the
// collected statistics. If either the copy or the decode fails, both results
// are nil and the error is returned.
func inspect(file io.Reader) (*raft.SnapshotMeta, *SnapshotInfo, error) {
	info := &SnapshotInfo{
		Stats:     make(map[nomad.SnapshotType]typeStats),
		TotalSize: 0,
	}

	// w is closed by CopySnapshot
	r, w := io.Pipe()
	cr := &countingReader{wrappedReader: r}

	// Both channels are buffered so the copy goroutine can always deliver
	// its result and exit, even when this function has already returned
	// because decoding failed.
	errCh := make(chan error, 1)
	metaCh := make(chan *raft.SnapshotMeta, 1)

	go func() {
		meta, err := snapshot.CopySnapshot(file, w)
		if err != nil {
			errCh <- fmt.Errorf("failed to read snapshot: %w", err)
			return
		}
		metaCh <- meta
	}()

	handler := func(header *nomad.SnapshotHeader, snapType nomad.SnapshotType, dec *codec.Decoder) error {
		name := snapType.String()
		stat := info.Stats[snapType]
		if stat.Name == "" {
			stat.Name = name
		}

		// The decoded value itself is discarded; decoding only serves to
		// advance the reader past this entry so its size can be measured.
		var val interface{}
		if err := dec.Decode(&val); err != nil {
			return fmt.Errorf("failed to decode snapshot %q: %w", snapType, err)
		}

		// Bytes consumed since the previous entry finished are attributed
		// to this entry.
		size := cr.read - info.TotalSize
		stat.Sum += size
		stat.Count++
		info.TotalSize = cr.read
		info.TotalCount++
		info.Stats[snapType] = stat

		return nil
	}

	if err := nomad.ReadSnapshot(cr, handler); err != nil {
		// Nothing will read from the pipe any more. Closing the read half
		// makes any pending or future write in the copy goroutine fail
		// instead of blocking forever.
		_ = r.CloseWithError(err)
		return nil, nil, err
	}

	select {
	case err := <-errCh:
		return nil, nil, err
	case meta := <-metaCh:
		return meta, info, nil
	}
}
// generateStats flattens the per-type statistics in info into a slice
// ordered by total size, largest first. Entries with the same size are
// ordered alphabetically by name.
func generateStats(info *SnapshotInfo) []typeStats {
	sorted := make([]typeStats, 0, len(info.Stats))
	for snapType := range info.Stats {
		sorted = append(sorted, info.Stats[snapType])
	}

	sort.Slice(sorted, func(a, b int) bool {
		left, right := sorted[a], sorted[b]
		if left.Sum != right.Sum {
			// bigger entries come first
			return left.Sum > right.Sum
		}
		// equal sizes fall back to name order
		return left.Name < right.Name
	})

	return sorted
}
// extractTimeFromName pulls the creation time out of a Raft snapshot name,
// which has the form <term>-<index>-<time-milliseconds>. It returns the
// formatted time, or an empty string when the name does not have exactly
// three dash-separated fields or the last field is not an integer.
func extractTimeFromName(snapshotName string) string {
	// Exactly two separators means exactly three fields.
	if strings.Count(snapshotName, "-") != 2 {
		return ""
	}

	millisField := snapshotName[strings.LastIndex(snapshotName, "-")+1:]
	millis, err := strconv.ParseInt(millisField, 10, 64)
	if err != nil {
		return ""
	}

	return formatTime(time.UnixMilli(millis))
}

View File

@@ -64,7 +64,7 @@ func TestOperatorSnapshotInspect_HandlesFailure(t *testing.T) {
code := cmd.Run([]string{filepath.Join(tmpDir, "invalid.snap")})
must.Positive(t, code)
must.StrContains(t, ui.ErrorWriter.String(), "Error verifying snapshot")
must.StrContains(t, ui.ErrorWriter.String(), "Error inspecting snapshot")
})
}

View File

@@ -71,6 +71,40 @@ const (
NamespaceSnapshot SnapshotType = 64
)
// snapshotTypeStrings maps snapshot types to the human-readable names
// returned by SnapshotType.String. Types missing from this table fall
// through to the other lookups performed by String.
var snapshotTypeStrings = map[SnapshotType]string{
NodeSnapshot: "Node",
JobSnapshot: "Job",
IndexSnapshot: "Index",
EvalSnapshot: "Eval",
AllocSnapshot: "Alloc",
TimeTableSnapshot: "TimeTable",
PeriodicLaunchSnapshot: "PeriodicLaunch",
JobSummarySnapshot: "JobSummary",
VaultAccessorSnapshot: "VaultAccessor",
JobVersionSnapshot: "JobVersion",
DeploymentSnapshot: "Deployment",
ACLPolicySnapshot: "ACLPolicy",
ACLTokenSnapshot: "ACLToken",
SchedulerConfigSnapshot: "SchedulerConfig",
ClusterMetadataSnapshot: "ClusterMetadata",
ServiceIdentityTokenAccessorSnapshot: "ServiceIdentityTokenAccessor",
ScalingPolicySnapshot: "ScalingPolicy",
CSIPluginSnapshot: "CSIPlugin",
CSIVolumeSnapshot: "CSIVolume",
ScalingEventsSnapshot: "ScalingEvents",
EventSinkSnapshot: "EventSink",
ServiceRegistrationSnapshot: "ServiceRegistration",
VariablesSnapshot: "Variables",
VariablesQuotaSnapshot: "VariablesQuota",
RootKeyMetaSnapshot: "RootKeyMeta",
ACLRoleSnapshot: "ACLRole",
ACLAuthMethodSnapshot: "ACLAuthMethod",
ACLBindingRuleSnapshot: "ACLBindingRule",
NodePoolSnapshot: "NodePool",
JobSubmissionSnapshot: "JobSubmission",
NamespaceSnapshot: "Namespace",
}
// LogApplier is the definition of a function that can apply a Raft log.
// It is called with buf and index (presumably the encoded log entry and its
// Raft index; confirm against callers) and returns an arbitrary value.
type LogApplier func(buf []byte, index uint64) interface{}
@@ -120,8 +154,8 @@ type nomadSnapshot struct {
timetable *TimeTable
}
// snapshotHeader is the first entry in our snapshot
type snapshotHeader struct {
// SnapshotHeader is the first entry in our snapshot
type SnapshotHeader struct {
}
// FSMConfig is used to configure the FSM
@@ -1502,7 +1536,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error {
dec := codec.NewDecoder(old, structs.MsgpackHandle)
// Read in the header
var header snapshotHeader
var header SnapshotHeader
if err := dec.Decode(&header); err != nil {
return err
}
@@ -2299,7 +2333,7 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
// Write the header
header := snapshotHeader{}
header := SnapshotHeader{}
if err := encoder.Encode(&header); err != nil {
sink.Cancel()
return err
@@ -3174,3 +3208,47 @@ func (s *nomadSnapshot) persistJobSubmissions(sink raft.SnapshotSink, encoder *c
// to the state store snapshot. There is nothing to explicitly
// cleanup.
func (s *nomadSnapshot) Release() {}
// ReadSnapshot decodes the snapshot header from r and then, for every entry
// that follows, reads the one-byte message type and passes it to handler
// together with the header and the shared decoder.
//
// ReadSnapshot does not decode entry payloads itself, so handler must consume
// the entry from dec; otherwise the next type byte is read from the wrong
// position. Reading stops with a nil error when r is exhausted at an entry
// boundary. Any other read error, and any error returned by handler, is
// returned as-is.
func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, snapType SnapshotType, dec *codec.Decoder) error) error {
	// Create a decoder
	dec := codec.NewDecoder(r, structs.MsgpackHandle)

	// Read in the header
	var header SnapshotHeader
	if err := dec.Decode(&header); err != nil {
		return err
	}

	msgType := make([]byte, 1)
	for {
		// Read the message type. io.ReadFull is used instead of a bare Read
		// because a reader may legally return (0, nil), which would reuse
		// the previous type byte, or (1, io.EOF), which would drop the final
		// entry. ReadFull reports io.EOF only when no byte was read.
		if _, err := io.ReadFull(r, msgType); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		// Decode
		snapType := SnapshotType(msgType[0])
		if err := handler(&header, snapType, dec); err != nil {
			return err
		}
	}
}
// String returns the human-readable name of the snapshot type. It checks the
// snapshotTypeStrings table first, then enterpriseSnapshotType, and falls
// back to "Unknown(<n>)" with the numeric value when neither knows the type.
func (s SnapshotType) String() string {
	if name, found := snapshotTypeStrings[s]; found {
		return name
	}
	if name, found := enterpriseSnapshotType(s); found {
		return name
	}
	return fmt.Sprintf("Unknown(%d)", s)
}

View File

@@ -11,3 +11,8 @@ package nomad
func (n *nomadFSM) allocQuota(_ string) (string, error) {
return "", nil
}
// enterpriseSnapshotType is a no-op for community edition. It ignores s and
// always returns an empty name with false, which tells SnapshotType.String
// that the type was not recognised here.
func enterpriseSnapshotType(s SnapshotType) (string, bool) {
return "", false
}