mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
add helper commands for debugging state
This commit is contained in:
@@ -517,6 +517,21 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
|
||||
Meta: meta,
|
||||
}, nil
|
||||
},
|
||||
"operator raft _info": func() (cli.Command, error) {
|
||||
return &OperatorRaftInfoCommand{
|
||||
Meta: meta,
|
||||
}, nil
|
||||
},
|
||||
"operator raft _logs": func() (cli.Command, error) {
|
||||
return &OperatorRaftLogsCommand{
|
||||
Meta: meta,
|
||||
}, nil
|
||||
},
|
||||
"operator raft _state": func() (cli.Command, error) {
|
||||
return &OperatorRaftStateCommand{
|
||||
Meta: meta,
|
||||
}, nil
|
||||
},
|
||||
|
||||
"operator snapshot": func() (cli.Command, error) {
|
||||
return &OperatorSnapshotCommand{
|
||||
|
||||
66
command/operator_raft_info.go
Normal file
66
command/operator_raft_info.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/nomad/helper/raftutil"
|
||||
"github.com/posener/complete"
|
||||
)
|
||||
|
||||
type OperatorRaftInfoCommand struct {
|
||||
Meta
|
||||
}
|
||||
|
||||
func (c *OperatorRaftInfoCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: nomad operator raft _info <path to nomad data dir>
|
||||
|
||||
Displays info about the raft logs in the data directory
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *OperatorRaftInfoCommand) AutocompleteFlags() complete.Flags {
|
||||
return complete.Flags{}
|
||||
}
|
||||
|
||||
func (c *OperatorRaftInfoCommand) AutocompleteArgs() complete.Predictor {
|
||||
return complete.PredictNothing
|
||||
}
|
||||
|
||||
func (c *OperatorRaftInfoCommand) Synopsis() string {
|
||||
return "Display info of the raft log"
|
||||
}
|
||||
|
||||
func (c *OperatorRaftInfoCommand) Name() string { return "operator raft _info" }
|
||||
|
||||
func (c *OperatorRaftInfoCommand) Run(args []string) int {
|
||||
if len(args) != 1 {
|
||||
c.Ui.Error("This command takes one argument: <path>")
|
||||
c.Ui.Error(commandErrorText(c))
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
p := args[0]
|
||||
if fi, err := os.Stat(p); err == nil && fi.IsDir() {
|
||||
p = filepath.Join(args[0], "server", "raft", "raft.db")
|
||||
}
|
||||
|
||||
store, firstIdx, lastIdx, err := raftutil.RaftStateInfo(p)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("failed to open raft logs: %v", err))
|
||||
return 1
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
c.Ui.Output(fmt.Sprintf("path: %v", p))
|
||||
c.Ui.Output(fmt.Sprintf("length: %v", lastIdx-firstIdx+1))
|
||||
c.Ui.Output(fmt.Sprintf("first index: %v", firstIdx))
|
||||
c.Ui.Output(fmt.Sprintf("last index: %v", lastIdx))
|
||||
|
||||
return 0
|
||||
}
|
||||
72
command/operator_raft_logs.go
Normal file
72
command/operator_raft_logs.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/nomad/helper/raftutil"
|
||||
"github.com/posener/complete"
|
||||
)
|
||||
|
||||
type OperatorRaftLogsCommand struct {
|
||||
Meta
|
||||
}
|
||||
|
||||
func (c *OperatorRaftLogsCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: nomad operator raft _logs <path to nomad data dir>
|
||||
|
||||
Display the log entries persisted in data dir in json form
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *OperatorRaftLogsCommand) AutocompleteFlags() complete.Flags {
|
||||
return complete.Flags{}
|
||||
}
|
||||
|
||||
func (c *OperatorRaftLogsCommand) AutocompleteArgs() complete.Predictor {
|
||||
return complete.PredictNothing
|
||||
}
|
||||
|
||||
func (c *OperatorRaftLogsCommand) Synopsis() string {
|
||||
return "Display raft log content"
|
||||
}
|
||||
|
||||
func (c *OperatorRaftLogsCommand) Name() string { return "operator raft _info" }
|
||||
|
||||
func (c *OperatorRaftLogsCommand) Run(args []string) int {
|
||||
if len(args) != 1 {
|
||||
c.Ui.Error("This command takes one argument: <path>")
|
||||
c.Ui.Error(commandErrorText(c))
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
p := args[0]
|
||||
if fi, err := os.Stat(p); err == nil && fi.IsDir() {
|
||||
p = filepath.Join(args[0], "server", "raft", "raft.db")
|
||||
}
|
||||
|
||||
logs, warnings, err := raftutil.LogEntries(p)
|
||||
if err != nil {
|
||||
c.Ui.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
for _, warning := range warnings {
|
||||
c.Ui.Error(warning.Error())
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
if err := enc.Encode(logs); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
94
command/operator_raft_state.go
Normal file
94
command/operator_raft_state.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/nomad/helper/raftutil"
|
||||
"github.com/posener/complete"
|
||||
)
|
||||
|
||||
type OperatorRaftStateCommand struct {
|
||||
Meta
|
||||
}
|
||||
|
||||
func (c *OperatorRaftStateCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: nomad operator raft _state <path to nomad data dir>
|
||||
|
||||
Display the log entries persisted in data dir in json form
|
||||
|
||||
Options:
|
||||
|
||||
--last-index=<last_index>
|
||||
Set the last log index to be applied, to drop spurious log entries not
|
||||
properly committed. If passed last_index is zero or negative, it's perceived
|
||||
as an offset from the last index seen in raft.
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *OperatorRaftStateCommand) AutocompleteFlags() complete.Flags {
|
||||
return complete.Flags{}
|
||||
}
|
||||
|
||||
func (c *OperatorRaftStateCommand) AutocompleteArgs() complete.Predictor {
|
||||
return complete.PredictNothing
|
||||
}
|
||||
|
||||
func (c *OperatorRaftStateCommand) Synopsis() string {
|
||||
return "Display raft log content"
|
||||
}
|
||||
|
||||
func (c *OperatorRaftStateCommand) Name() string { return "operator raft _info" }
|
||||
|
||||
func (c *OperatorRaftStateCommand) Run(args []string) int {
|
||||
var fLastIdx int64
|
||||
|
||||
flags := c.Meta.FlagSet(c.Name(), 0)
|
||||
flags.Usage = func() { fmt.Println(c.Help()) }
|
||||
flags.Int64Var(&fLastIdx, "last-index", 0, "")
|
||||
|
||||
if err := flags.Parse(args); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
|
||||
return 1
|
||||
}
|
||||
args = flags.Args()
|
||||
|
||||
if len(args) != 1 {
|
||||
c.Ui.Error("This command takes one argument: <path>")
|
||||
c.Ui.Error(commandErrorText(c))
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
var p string
|
||||
if _, err := os.Stat(filepath.Join(args[0], "server", "raft", "raft.db")); err == nil {
|
||||
p = filepath.Join(args[0], "server", "raft")
|
||||
} else if _, err := os.Stat(filepath.Join(args[0], "raft", "raft.db")); err == nil {
|
||||
p = filepath.Join(args[0], "raft")
|
||||
} else if _, err := os.Stat(filepath.Join(args[0], "raft.db")); err == nil {
|
||||
p = args[0]
|
||||
} else {
|
||||
c.Ui.Error("path needs to be a data dir path with raft.db file")
|
||||
return 1
|
||||
}
|
||||
|
||||
state, err := raftutil.FSMState(p, fLastIdx)
|
||||
if err != nil {
|
||||
c.Ui.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
if err := enc.Encode(state); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("failed to encode output: %v", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
26
command/raft_tools/state.go
Normal file
26
command/raft_tools/state.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package rafttools
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||
)
|
||||
|
||||
func RaftState(p string) (store *raftboltdb.BoltStore, firstIdx uint64, lastIdx uint64, err error) {
|
||||
s, err := raftboltdb.NewBoltStore(p)
|
||||
if err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("failed to open raft logs: %v", err)
|
||||
}
|
||||
|
||||
firstIdx, err = s.FirstIndex()
|
||||
if err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("failed to fetch first index: %v", err)
|
||||
}
|
||||
|
||||
lastIdx, err = s.LastIndex()
|
||||
if err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("failed to fetch last index: %v", err)
|
||||
}
|
||||
|
||||
return s, firstIdx, lastIdx, nil
|
||||
}
|
||||
159
helper/raftutil/fsm.go
Normal file
159
helper/raftutil/fsm.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package raftutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// FSMState returns a dump of the FSM state as found in data-dir, as of lastIndx value
|
||||
func FSMState(p string, plastIdx int64) (interface{}, error) {
|
||||
store, firstIdx, lastIdx, err := RaftStateInfo(filepath.Join(p, "raft.db"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open raft logs: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
snaps, err := raft.NewFileSnapshotStore(p, 1000, os.Stderr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open snapshot dir: %v", err)
|
||||
}
|
||||
|
||||
logger := hclog.L()
|
||||
|
||||
// use dummy non-enabled FSM dependencies
|
||||
periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
|
||||
blockedEvals := nomad.NewBlockedEvals(nil, logger)
|
||||
evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fsmConfig := &nomad.FSMConfig{
|
||||
EvalBroker: evalBroker,
|
||||
Periodic: periodicDispatch,
|
||||
Blocked: blockedEvals,
|
||||
Logger: logger,
|
||||
Region: "default",
|
||||
}
|
||||
|
||||
fsm, err := nomad.NewFSM(fsmConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// restore from snapshot first
|
||||
sFirstIdx, err := restoreFromSnapshot(fsm, snaps, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if sFirstIdx+1 < firstIdx {
|
||||
return nil, fmt.Errorf("missing logs after snapshot [%v,%v]", sFirstIdx+1, firstIdx-1)
|
||||
} else if sFirstIdx > 0 {
|
||||
firstIdx = sFirstIdx + 1
|
||||
}
|
||||
|
||||
lastIdx = lastIndex(lastIdx, plastIdx)
|
||||
|
||||
for i := firstIdx; i <= lastIdx; i++ {
|
||||
var e raft.Log
|
||||
err := store.GetLog(i, &e)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read log entry at index %d: %v", i, err)
|
||||
}
|
||||
|
||||
if e.Type == raft.LogCommand {
|
||||
fsm.Apply(&e)
|
||||
}
|
||||
}
|
||||
|
||||
state := fsm.State()
|
||||
result := map[string][]interface{}{
|
||||
"ACLPolicies": toArray(state.ACLPolicies(nil)),
|
||||
"ACLTokens": toArray(state.ACLTokens(nil)),
|
||||
"Allocs": toArray(state.Allocs(nil)),
|
||||
"CSIPlugins": toArray(state.CSIPlugins(nil)),
|
||||
"CSIVolumes": toArray(state.CSIVolumes(nil)),
|
||||
"Deployments": toArray(state.Deployments(nil)),
|
||||
"Evals": toArray(state.Evals(nil)),
|
||||
"Indexes": toArray(state.Indexes()),
|
||||
"JobSummaries": toArray(state.JobSummaries(nil)),
|
||||
"JobVersions": toArray(state.JobVersions(nil)),
|
||||
"Jobs": toArray(state.Jobs(nil)),
|
||||
"Nodes": toArray(state.Nodes(nil)),
|
||||
"PeriodicLaunches": toArray(state.PeriodicLaunches(nil)),
|
||||
"SITokenAccessors": toArray(state.SITokenAccessors(nil)),
|
||||
"ScalingEvents": toArray(state.ScalingEvents(nil)),
|
||||
"ScalingPolicies": toArray(state.ScalingPolicies(nil)),
|
||||
"VaultAccessors": toArray(state.VaultAccessors(nil)),
|
||||
}
|
||||
|
||||
insertEnterpriseState(result, state)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func restoreFromSnapshot(fsm raft.FSM, snaps raft.SnapshotStore, logger hclog.Logger) (uint64, error) {
|
||||
snapshots, err := snaps.List()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
logger.Debug("found snapshots", "count", len(snapshots))
|
||||
|
||||
for _, snapshot := range snapshots {
|
||||
_, source, err := snaps.Open(snapshot.ID)
|
||||
if err != nil {
|
||||
logger.Warn("failed to open a snapshot", "snapshot_id", snapshot.ID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = fsm.Restore(source)
|
||||
source.Close()
|
||||
if err != nil {
|
||||
logger.Warn("failed to restore a snapshot", "snapshot_id", snapshot.ID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
return snapshot.Index, nil
|
||||
}
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func lastIndex(raftLastIdx uint64, cliLastIdx int64) uint64 {
|
||||
switch {
|
||||
case cliLastIdx < 0:
|
||||
if raftLastIdx > uint64(-cliLastIdx) {
|
||||
return raftLastIdx - uint64(-cliLastIdx)
|
||||
} else {
|
||||
return 0
|
||||
}
|
||||
case cliLastIdx == 0:
|
||||
return raftLastIdx
|
||||
case uint64(cliLastIdx) < raftLastIdx:
|
||||
return uint64(cliLastIdx)
|
||||
default:
|
||||
return raftLastIdx
|
||||
}
|
||||
}
|
||||
|
||||
func toArray(iter memdb.ResultIterator, err error) []interface{} {
|
||||
if err != nil {
|
||||
return []interface{}{err}
|
||||
}
|
||||
|
||||
r := []interface{}{}
|
||||
|
||||
item := iter.Next()
|
||||
for item != nil {
|
||||
r = append(r, item)
|
||||
item = iter.Next()
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
8
helper/raftutil/fsm_oss.go
Normal file
8
helper/raftutil/fsm_oss.go
Normal file
@@ -0,0 +1,8 @@
|
||||
// +build !ent
|
||||
|
||||
package raftutil
|
||||
|
||||
import "github.com/hashicorp/nomad/nomad/state"
|
||||
|
||||
func insertEnterpriseState(m map[string][]interface{}, state *state.StateStore) {
|
||||
}
|
||||
3
helper/raftutil/generate.go
Normal file
3
helper/raftutil/generate.go
Normal file
@@ -0,0 +1,3 @@
|
||||
package raftutil
|
||||
|
||||
//go:generate ./generate_msgtypes.sh
|
||||
25
helper/raftutil/generate_msgtypes.sh
Executable file
25
helper/raftutil/generate_msgtypes.sh
Executable file
@@ -0,0 +1,25 @@
|
||||
#!/bin/sh
|
||||
|
||||
set -e
|
||||
|
||||
generate_file() {
|
||||
cat <<EOF
|
||||
// Code generated by go generate; DO NOT EDIT.
|
||||
package raftutil
|
||||
|
||||
import "github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
var msgTypeNames = map[structs.MessageType]string{
|
||||
EOF
|
||||
|
||||
cat ../../nomad/structs/structs.go \
|
||||
| grep -A500 'MessageType = iota' \
|
||||
| grep -v -e '//' \
|
||||
| awk '/^\)$/ { exit; } /.*/ { printf " structs.%s: \"%s\",\n", $1, $1}'
|
||||
|
||||
echo '}'
|
||||
}
|
||||
|
||||
generate_file > msgtypes.go
|
||||
|
||||
gofmt -w msgtypes.go
|
||||
92
helper/raftutil/msgpack.go
Normal file
92
helper/raftutil/msgpack.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package raftutil
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// fixTime converts any suspected time.Time binary string representation to time.Time
|
||||
func fixTime(v interface{}) {
|
||||
switch v2 := v.(type) {
|
||||
case map[string]interface{}:
|
||||
for ek, ev := range v2 {
|
||||
if s, ok := ev.(string); ok {
|
||||
t, err := maybeDecodeTime(s)
|
||||
if err == nil && isReasonableTime(t) {
|
||||
v2[ek] = *t
|
||||
}
|
||||
} else {
|
||||
fixTime(ev)
|
||||
}
|
||||
}
|
||||
case []interface{}:
|
||||
for _, e := range v2 {
|
||||
fixTime(e)
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// maybeDecodeTime returns a time.Time representation if the string represents a msgpack
|
||||
// representation of a date.
|
||||
func maybeDecodeTime(v string) (*time.Time, error) {
|
||||
if isASCII(v) {
|
||||
return nil, fmt.Errorf("simple ascii string")
|
||||
}
|
||||
|
||||
tt := &time.Time{}
|
||||
var err error
|
||||
|
||||
err = tt.UnmarshalBinary([]byte(v))
|
||||
if err == nil {
|
||||
return tt, nil
|
||||
}
|
||||
|
||||
switch len(v) {
|
||||
case 4, 8, 12:
|
||||
default:
|
||||
return nil, fmt.Errorf("bad length: %d", len(v))
|
||||
}
|
||||
|
||||
var nb bytes.Buffer
|
||||
err = codec.NewEncoder(&nb, structs.MsgpackHandle).Encode(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = codec.NewDecoder(&nb, structs.MsgpackHandle).Decode(tt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tt, nil
|
||||
}
|
||||
|
||||
// isASCII returns true if all string characters are ASCII characters
|
||||
func isASCII(s string) bool {
|
||||
for i := 0; i < len(s); i++ {
|
||||
if s[i] > unicode.MaxASCII {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// isReasonableTime returns true if the time is within some N years of current time
|
||||
//
|
||||
// It's can be used to rule out bad date interpretation (e.g. dates million years away).
|
||||
func isReasonableTime(t *time.Time) bool {
|
||||
if t.IsZero() {
|
||||
return true
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
return t.Before(now.AddDate(20, 0, 0)) && t.After(now.AddDate(-20, 0, 0))
|
||||
}
|
||||
76
helper/raftutil/msgpack_test.go
Normal file
76
helper/raftutil/msgpack_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package raftutil
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMaybeDecodeTimeIgnoresASCII(t *testing.T) {
|
||||
cases := []string{
|
||||
"127.0.0.1/32",
|
||||
"host",
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c, func(t *testing.T) {
|
||||
tt, err := maybeDecodeTime(c)
|
||||
fmt.Println(tt)
|
||||
require.Nil(t, tt)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodesTime(t *testing.T) {
|
||||
type Value struct {
|
||||
CreateTime time.Time
|
||||
Mode string
|
||||
}
|
||||
now := time.Now().Truncate(time.Second)
|
||||
v := Value{
|
||||
CreateTime: now,
|
||||
Mode: "host",
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(v)
|
||||
require.NoError(t, err)
|
||||
|
||||
var r map[string]interface{}
|
||||
err = codec.NewDecoder(&buf, structs.MsgpackHandle).Decode(&r)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, "host", r["Mode"])
|
||||
require.IsType(t, "", r["CreateTime"])
|
||||
|
||||
fixTime(r)
|
||||
|
||||
expected := map[string]interface{}{
|
||||
"CreateTime": now,
|
||||
"Mode": "host",
|
||||
}
|
||||
require.Equal(t, expected, r)
|
||||
}
|
||||
|
||||
func TestMyDate(t *testing.T) {
|
||||
handler := &codec.MsgpackHandle{}
|
||||
handler.TimeNotBuiltin = true
|
||||
|
||||
d := time.Date(2025, 7, 10, 8, 1, 56, 0, time.UTC)
|
||||
|
||||
var buf bytes.Buffer
|
||||
err := codec.NewEncoder(&buf, handler).Encode(d)
|
||||
require.NoError(t, err)
|
||||
|
||||
var s string
|
||||
err = codec.NewDecoder(&buf, handler).Decode(&s)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Printf("Original: %q\nround trips: %q\n", d, s)
|
||||
}
|
||||
48
helper/raftutil/msgtypes.go
Normal file
48
helper/raftutil/msgtypes.go
Normal file
@@ -0,0 +1,48 @@
|
||||
// Code generated by go generate; DO NOT EDIT.
|
||||
package raftutil
|
||||
|
||||
import "github.com/hashicorp/nomad/nomad/structs"
|
||||
|
||||
var msgTypeNames = map[structs.MessageType]string{
|
||||
structs.NodeRegisterRequestType: "NodeRegisterRequestType",
|
||||
structs.NodeDeregisterRequestType: "NodeDeregisterRequestType",
|
||||
structs.NodeUpdateStatusRequestType: "NodeUpdateStatusRequestType",
|
||||
structs.NodeUpdateDrainRequestType: "NodeUpdateDrainRequestType",
|
||||
structs.JobRegisterRequestType: "JobRegisterRequestType",
|
||||
structs.JobDeregisterRequestType: "JobDeregisterRequestType",
|
||||
structs.EvalUpdateRequestType: "EvalUpdateRequestType",
|
||||
structs.EvalDeleteRequestType: "EvalDeleteRequestType",
|
||||
structs.AllocUpdateRequestType: "AllocUpdateRequestType",
|
||||
structs.AllocClientUpdateRequestType: "AllocClientUpdateRequestType",
|
||||
structs.ReconcileJobSummariesRequestType: "ReconcileJobSummariesRequestType",
|
||||
structs.VaultAccessorRegisterRequestType: "VaultAccessorRegisterRequestType",
|
||||
structs.VaultAccessorDeregisterRequestType: "VaultAccessorDeregisterRequestType",
|
||||
structs.ApplyPlanResultsRequestType: "ApplyPlanResultsRequestType",
|
||||
structs.DeploymentStatusUpdateRequestType: "DeploymentStatusUpdateRequestType",
|
||||
structs.DeploymentPromoteRequestType: "DeploymentPromoteRequestType",
|
||||
structs.DeploymentAllocHealthRequestType: "DeploymentAllocHealthRequestType",
|
||||
structs.DeploymentDeleteRequestType: "DeploymentDeleteRequestType",
|
||||
structs.JobStabilityRequestType: "JobStabilityRequestType",
|
||||
structs.ACLPolicyUpsertRequestType: "ACLPolicyUpsertRequestType",
|
||||
structs.ACLPolicyDeleteRequestType: "ACLPolicyDeleteRequestType",
|
||||
structs.ACLTokenUpsertRequestType: "ACLTokenUpsertRequestType",
|
||||
structs.ACLTokenDeleteRequestType: "ACLTokenDeleteRequestType",
|
||||
structs.ACLTokenBootstrapRequestType: "ACLTokenBootstrapRequestType",
|
||||
structs.AutopilotRequestType: "AutopilotRequestType",
|
||||
structs.UpsertNodeEventsType: "UpsertNodeEventsType",
|
||||
structs.JobBatchDeregisterRequestType: "JobBatchDeregisterRequestType",
|
||||
structs.AllocUpdateDesiredTransitionRequestType: "AllocUpdateDesiredTransitionRequestType",
|
||||
structs.NodeUpdateEligibilityRequestType: "NodeUpdateEligibilityRequestType",
|
||||
structs.BatchNodeUpdateDrainRequestType: "BatchNodeUpdateDrainRequestType",
|
||||
structs.SchedulerConfigRequestType: "SchedulerConfigRequestType",
|
||||
structs.NodeBatchDeregisterRequestType: "NodeBatchDeregisterRequestType",
|
||||
structs.ClusterMetadataRequestType: "ClusterMetadataRequestType",
|
||||
structs.ServiceIdentityAccessorRegisterRequestType: "ServiceIdentityAccessorRegisterRequestType",
|
||||
structs.ServiceIdentityAccessorDeregisterRequestType: "ServiceIdentityAccessorDeregisterRequestType",
|
||||
structs.CSIVolumeRegisterRequestType: "CSIVolumeRegisterRequestType",
|
||||
structs.CSIVolumeDeregisterRequestType: "CSIVolumeDeregisterRequestType",
|
||||
structs.CSIVolumeClaimRequestType: "CSIVolumeClaimRequestType",
|
||||
structs.ScalingEventRegisterRequestType: "ScalingEventRegisterRequestType",
|
||||
structs.CSIVolumeClaimBatchRequestType: "CSIVolumeClaimBatchRequestType",
|
||||
structs.CSIPluginDeleteRequestType: "CSIPluginDeleteRequestType",
|
||||
}
|
||||
162
helper/raftutil/state.go
Normal file
162
helper/raftutil/state.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package raftutil
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/raft"
|
||||
raftboltdb "github.com/hashicorp/raft-boltdb"
|
||||
)
|
||||
|
||||
// RaftStateInfo returns info about the nomad state, as found in the passed data-dir directory
|
||||
func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, lastIdx uint64, err error) {
|
||||
s, err := raftboltdb.NewBoltStore(p)
|
||||
if err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("failed to open raft logs: %v", err)
|
||||
}
|
||||
|
||||
firstIdx, err = s.FirstIndex()
|
||||
if err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("failed to fetch first index: %v", err)
|
||||
}
|
||||
|
||||
lastIdx, err = s.LastIndex()
|
||||
if err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("failed to fetch last index: %v", err)
|
||||
}
|
||||
|
||||
return s, firstIdx, lastIdx, nil
|
||||
}
|
||||
|
||||
// LogEntries returns the log entries as found in raft log in the passed data-dir directory
|
||||
func LogEntries(p string) (logs []interface{}, warnings []error, err error) {
|
||||
store, firstIdx, lastIdx, err := RaftStateInfo(p)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to open raft logs: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
result := make([]interface{}, 0, lastIdx-firstIdx+1)
|
||||
for i := firstIdx; i <= lastIdx; i++ {
|
||||
var e raft.Log
|
||||
err := store.GetLog(i, &e)
|
||||
if err != nil {
|
||||
warnings = append(warnings, fmt.Errorf("failed to read log entry at index %d: %v", i, err))
|
||||
continue
|
||||
}
|
||||
|
||||
m, err := decode(&e)
|
||||
if err != nil {
|
||||
warnings = append(warnings, fmt.Errorf("failed to decode log entry at index %d: %v", i, err))
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, m)
|
||||
}
|
||||
|
||||
return result, warnings, nil
|
||||
}
|
||||
|
||||
type logMessage struct {
|
||||
LogType string
|
||||
Term uint64
|
||||
Index uint64
|
||||
|
||||
CommandType string `json:",omitempty"`
|
||||
IgnoreUnknownTypeFlag bool `json:",omitempty"`
|
||||
Body interface{} `json:",omitempty"`
|
||||
}
|
||||
|
||||
func decode(e *raft.Log) (*logMessage, error) {
|
||||
m := &logMessage{
|
||||
LogType: logTypes[e.Type],
|
||||
Term: e.Term,
|
||||
Index: e.Index,
|
||||
}
|
||||
|
||||
if m.LogType == "" {
|
||||
m.LogType = fmt.Sprintf("%d", e.Type)
|
||||
}
|
||||
|
||||
var data []byte
|
||||
if e.Type == raft.LogCommand {
|
||||
if len(e.Data) == 0 {
|
||||
return nil, fmt.Errorf("command did not include data")
|
||||
}
|
||||
|
||||
msgType := structs.MessageType(e.Data[0])
|
||||
|
||||
m.CommandType = commandName(msgType & ^structs.IgnoreUnknownTypeFlag)
|
||||
m.IgnoreUnknownTypeFlag = (msgType & structs.IgnoreUnknownTypeFlag) != 0
|
||||
|
||||
data = e.Data[1:]
|
||||
} else {
|
||||
data = e.Data
|
||||
}
|
||||
|
||||
if len(data) != 0 {
|
||||
decoder := codec.NewDecoder(bytes.NewReader(data), structs.MsgpackHandle)
|
||||
|
||||
var v interface{}
|
||||
var err error
|
||||
if m.CommandType == commandName(structs.JobBatchDeregisterRequestType) {
|
||||
var vr structs.JobBatchDeregisterRequest
|
||||
err = decoder.Decode(&vr)
|
||||
v = jsonifyJobBatchDeregisterRequest(&vr)
|
||||
} else {
|
||||
var vr interface{}
|
||||
err = decoder.Decode(&vr)
|
||||
v = vr
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to decode log entry at index %d: failed to decode body of %v.%v %v\n", e.Index, e.Type, m.CommandType, err)
|
||||
v = "FAILED TO DECODE DATA"
|
||||
}
|
||||
fixTime(v)
|
||||
m.Body = v
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// jsonifyJobBatchDeregisterRequest special case JsonBatchDeregisterRequest object
|
||||
// as the actual type is not json friendly.
|
||||
func jsonifyJobBatchDeregisterRequest(v *structs.JobBatchDeregisterRequest) interface{} {
|
||||
var data struct {
|
||||
Jobs map[string]*structs.JobDeregisterOptions
|
||||
Evals []*structs.Evaluation
|
||||
structs.WriteRequest
|
||||
}
|
||||
data.Evals = v.Evals
|
||||
data.WriteRequest = v.WriteRequest
|
||||
|
||||
data.Jobs = make(map[string]*structs.JobDeregisterOptions, len(v.Jobs))
|
||||
if len(v.Jobs) != 0 {
|
||||
for k, v := range v.Jobs {
|
||||
data.Jobs[k.Namespace+"."+k.ID] = v
|
||||
}
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
var logTypes = map[raft.LogType]string{
|
||||
raft.LogCommand: "LogCommand",
|
||||
raft.LogNoop: "LogNoop",
|
||||
raft.LogAddPeerDeprecated: "LogAddPeerDeprecated",
|
||||
raft.LogRemovePeerDeprecated: "LogRemovePeerDeprecated",
|
||||
raft.LogBarrier: "LogBarrier",
|
||||
raft.LogConfiguration: "LogConfiguration",
|
||||
}
|
||||
|
||||
func commandName(mt structs.MessageType) string {
|
||||
n := msgTypeNames[mt]
|
||||
if n != "" {
|
||||
return n
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%v", mt)
|
||||
}
|
||||
Reference in New Issue
Block a user