mirror of
https://github.com/kemko/nomad.git
synced 2026-01-09 11:55:42 +03:00
nomad: Add ACL Token snapshot/restore to FSM
This commit is contained in:
42
nomad/fsm.go
42
nomad/fsm.go
@@ -42,6 +42,7 @@ const (
|
||||
JobVersionSnapshot
|
||||
DeploymentSnapshot
|
||||
ACLPolicySnapshot
|
||||
ACLTokenSnapshot
|
||||
)
|
||||
|
||||
// nomadFSM implements a finite state machine that is used
|
||||
@@ -870,6 +871,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
||||
return err
|
||||
}
|
||||
|
||||
case ACLTokenSnapshot:
|
||||
token := new(structs.ACLToken)
|
||||
if err := dec.Decode(token); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.ACLTokenRestore(token); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized snapshot type: %v", msgType)
|
||||
}
|
||||
@@ -1080,6 +1090,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
if err := s.persistACLTokens(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1384,6 +1398,34 @@ func (s *nomadSnapshot) persistACLPolicies(sink raft.SnapshotSink,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
// Get all the policies
|
||||
ws := memdb.NewWatchSet()
|
||||
tokens, err := s.snap.ACLTokens(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
// Get the next item
|
||||
raw := tokens.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Prepare the request struct
|
||||
token := raw.(*structs.ACLToken)
|
||||
|
||||
// Write out a token registration
|
||||
sink.Write([]byte{byte(ACLTokenSnapshot)})
|
||||
if err := encoder.Encode(token); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release is a no-op, as we just need to GC the pointer
|
||||
// to the state store snapshot. There is nothing to explicitly
|
||||
// cleanup.
|
||||
|
||||
@@ -1931,6 +1931,25 @@ func TestFSM_SnapshotRestore_ACLPolicy(t *testing.T) {
|
||||
assert.Equal(t, p2, out2)
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_ACLTokens(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Add some state
|
||||
fsm := testFSM(t)
|
||||
state := fsm.State()
|
||||
tk1 := mock.ACLToken()
|
||||
tk2 := mock.ACLToken()
|
||||
state.UpsertACLTokens(1000, []*structs.ACLToken{tk1, tk2})
|
||||
|
||||
// Verify the contents
|
||||
fsm2 := testSnapshotRestore(t, fsm)
|
||||
state2 := fsm2.State()
|
||||
ws := memdb.NewWatchSet()
|
||||
out1, _ := state2.ACLTokenByPublicID(ws, tk1.AccessorID)
|
||||
out2, _ := state2.ACLTokenByPublicID(ws, tk2.AccessorID)
|
||||
assert.Equal(t, tk1, out1)
|
||||
assert.Equal(t, tk2, out2)
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Add some state
|
||||
|
||||
Reference in New Issue
Block a user