Merge pull request #8036 from hashicorp/f-background-vault-revoke-on-restore
Speed up leadership establishment
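Before this change, a freshly elected leader revoked any orphaned Vault and Consul Service Identity token accessors synchronously during establishLeadership, blocking leadership establishment on external API round trips. This change switches both cleanups to a mark-for-background-revocation path handled by the existing revocation daemons, and caps how many tokens each daemon pass attempts. A minimal sketch of the resulting interface shape, using a hypothetical Revoker/Accessor pair to stand in for the Vault and Consul clients (their real signatures appear in the diffs below):

	// Revoker is a hypothetical, trimmed-down view of the Vault/Consul ACL clients.
	type Revoker interface {
		// RevokeTokens attempts revocation now, synchronously, falling back
		// to the background daemon on failure.
		RevokeTokens(ctx context.Context, accessors []*Accessor, committed bool) error

		// MarkForRevocation skips the synchronous attempt and only queues the
		// accessors for the background daemon; it does no network I/O.
		MarkForRevocation(accessors []*Accessor) error
	}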
--- a/nomad/consul.go
+++ b/nomad/consul.go
@@ -95,6 +95,9 @@ type ConsulACLsAPI interface {
 	// RevokeTokens instructs Consul to revoke the given token accessors.
 	RevokeTokens(context.Context, []*structs.SITokenAccessor, bool) bool
 
+	// MarkForRevocation marks the tokens for background revocation
+	MarkForRevocation([]*structs.SITokenAccessor)
+
 	// Stop is used to stop background token revocations. Intended to be used
 	// on Nomad Server shutdown.
 	Stop()
@@ -143,6 +146,10 @@ type consulACLsAPI struct {
 }
 
 func NewConsulACLsAPI(aclClient consul.ACLsAPI, logger hclog.Logger, purgeFunc PurgeSITokenAccessorFunc) *consulACLsAPI {
+	if purgeFunc == nil {
+		purgeFunc = func([]*structs.SITokenAccessor) error { return nil }
+	}
+
 	c := &consulACLsAPI{
 		aclClient: aclClient,
 		limiter:   rate.NewLimiter(siTokenRequestRateLimit, int(siTokenRequestRateLimit)),
@@ -285,6 +292,10 @@ func (c *consulACLsAPI) RevokeTokens(ctx context.Context, accessors []*structs.SITokenAccessor, committed bool) bool {
 	return false
 }
 
+func (c *consulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
+	c.storeForRevocation(accessors)
+}
+
 func (c *consulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor) {
 	c.bgRevokeLock.Lock()
 	defer c.bgRevokeLock.Unlock()
@@ -369,6 +380,10 @@ func (c *consulACLsAPI) bgRetryRevokeDaemon() {
 	}
 }
 
+// maxConsulRevocationBatchSize is the maximum tokens a bgRetryRevoke should revoke
+// at any given time.
+const maxConsulRevocationBatchSize = 1000
+
 func (c *consulACLsAPI) bgRetryRevoke() {
 	c.bgRevokeLock.Lock()
 	defer c.bgRevokeLock.Unlock()
@@ -380,7 +395,11 @@ func (c *consulACLsAPI) bgRetryRevoke() {
 
 	// unlike vault tokens, SI tokens do not have a TTL, and so we must try to
 	// remove all SI token accessors, every time, until they're gone
-	toPurge := make([]*structs.SITokenAccessor, len(c.bgRetryRevocation), len(c.bgRetryRevocation))
+	toRevoke := len(c.bgRetryRevocation)
+	if toRevoke > maxConsulRevocationBatchSize {
+		toRevoke = maxConsulRevocationBatchSize
+	}
+	toPurge := make([]*structs.SITokenAccessor, toRevoke)
 	copy(toPurge, c.bgRetryRevocation)
 
 	if err := c.parallelRevoke(context.Background(), toPurge); err != nil {
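The batching idiom in bgRetryRevoke above leans on the semantics of Go's copy builtin, which transfers only min(len(dst), len(src)) elements: allocating toPurge at the capped length is what bounds the batch, and accessors beyond the cap remain in bgRetryRevocation for the next pass. A standalone sketch of the pattern, with plain strings standing in for *structs.SITokenAccessor:

	package main

	import "fmt"

	// capBatch returns at most max entries from pending.
	func capBatch(pending []string, max int) []string {
		n := len(pending)
		if n > max {
			n = max
		}
		batch := make([]string, n)
		// copy stops at len(batch), so the batch never exceeds max
		copy(batch, pending)
		return batch
	}

	func main() {
		fmt.Println(capBatch([]string{"a", "b", "c", "d"}, 2)) // [a b]
	}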
--- a/nomad/consul_test.go
+++ b/nomad/consul_test.go
@@ -65,6 +65,14 @@ func (mps *mockPurgingServer) purgeFunc(accessors []*structs.SITokenAccessor) error {
 }
 
+func (m *mockConsulACLsAPI) RevokeTokens(_ context.Context, accessors []*structs.SITokenAccessor, committed bool) bool {
+	return m.storeForRevocation(accessors, committed)
+}
+
+func (m *mockConsulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
+	m.storeForRevocation(accessors, true)
+}
+
 func (m *mockConsulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor, committed bool) bool {
 	m.lock.Lock()
 	defer m.lock.Unlock()
@@ -168,6 +176,31 @@ func TestConsulACLsAPI_RevokeTokens(t *testing.T) {
 	})
 }
 
+func TestConsulACLsAPI_MarkForRevocation(t *testing.T) {
+	t.Parallel()
+
+	logger := testlog.HCLogger(t)
+	aclAPI := consul.NewMockACLsAPI(logger)
+
+	c := NewConsulACLsAPI(aclAPI, logger, nil)
+
+	generated, err := c.CreateToken(context.Background(), ServiceIdentityRequest{
+		ClusterID: uuid.Generate(),
+		AllocID:   uuid.Generate(),
+		TaskName:  "task1-sidecar-proxy",
+		TaskKind:  structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
+	})
+	require.NoError(t, err)
+
+	// set the mock error after calling CreateToken for setting up
+	aclAPI.SetError(nil)
+
+	accessors := []*structs.SITokenAccessor{{AccessorID: generated.AccessorID}}
+	c.MarkForRevocation(accessors)
+	require.Len(t, c.bgRetryRevocation, 1)
+	require.Contains(t, c.bgRetryRevocation, accessors[0])
+}
+
 func TestConsulACLsAPI_bgRetryRevoke(t *testing.T) {
 	t.Parallel()
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -254,19 +254,17 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 
 	// Activate the vault client
 	s.vault.SetActive(true)
-	// Cleanup orphaned Vault token accessors
-	if err := s.revokeVaultAccessorsOnRestore(); err != nil {
-		return err
-	}
-
-	// Cleanup orphaned Service Identity token accessors
-	if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
-		return err
-	}
 
 	// Enable the periodic dispatcher, since we are now the leader.
 	s.periodicDispatcher.SetEnabled(true)
 
+	// Activate RPC now that local FSM caught up with Raft (as evident by Barrier call success)
+	// and all leader related components (e.g. broker queue) are enabled.
+	// Auxiliary processes (e.g. background, bookkeeping, and cleanup tasks) can start after
+	s.setConsistentReadReady()
+
+	// Further clean ups and follow up that don't block RPC consistency
+
 	// Restore the periodic dispatcher state
 	if err := s.restorePeriodicDispatcher(); err != nil {
 		return err
@@ -316,7 +314,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 		return err
 	}
 
-	s.setConsistentReadReady()
+	// Cleanup orphaned Vault token accessors
+	if err := s.revokeVaultAccessorsOnRestore(); err != nil {
+		return err
+	}
+
+	// Cleanup orphaned Service Identity token accessors
+	if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
+		return err
+	}
 
 	return nil
 }
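Net effect of the two hunks above: s.setConsistentReadReady() moves up to run right after the leader-only components are enabled, while the Vault and Service Identity accessor cleanup moves to the tail end of establishLeadership, after the server has started answering consistent reads. Combined with the MarkForRevocation changes below, a new leader no longer waits on Vault or Consul round trips before serving RPCs.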
@@ -393,7 +399,9 @@ func (s *Server) revokeVaultAccessorsOnRestore() error {
 	}
 
 	if len(revoke) != 0 {
-		if err := s.vault.RevokeTokens(context.Background(), revoke, true); err != nil {
+		s.logger.Info("revoking vault accessors after becoming leader", "accessors", len(revoke))
+
+		if err := s.vault.MarkForRevocation(revoke); err != nil {
 			return fmt.Errorf("failed to revoke tokens: %v", err)
 		}
 	}
@@ -439,8 +447,8 @@ func (s *Server) revokeSITokenAccessorsOnRestore() error {
 	}
 
 	if len(toRevoke) > 0 {
-		ctx := context.Background()
-		s.consulACLs.RevokeTokens(ctx, toRevoke, true)
+		s.logger.Info("revoking consul accessors after becoming leader", "accessors", len(toRevoke))
+		s.consulACLs.MarkForRevocation(toRevoke)
 	}
 
 	return nil
--- a/nomad/vault.go
+++ b/nomad/vault.go
@@ -121,6 +121,9 @@ type VaultClient interface {
 	// RevokeTokens takes a set of token accessors and revokes the tokens
 	RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error
 
+	// MarkForRevocation marks the tokens for revocation in the background
+	MarkForRevocation(accessors []*structs.VaultAccessor) error
+
 	// Stop is used to stop token renewal
 	Stop()
@@ -249,6 +252,9 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVaultAccessorFn) (*vaultClient, error) {
 	if logger == nil {
 		return nil, fmt.Errorf("must pass valid logger")
 	}
+	if purgeFn == nil {
+		purgeFn = func(accessors []*structs.VaultAccessor) error { return nil }
+	}
 
 	v := &vaultClient{
 		config: c,
@@ -1128,6 +1134,19 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error {
 	return nil
 }
 
+func (v *vaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
+	if !v.Enabled() {
+		return nil
+	}
+
+	if !v.Active() {
+		return fmt.Errorf("Vault client not active")
+	}
+
+	v.storeForRevocation(accessors)
+	return nil
+}
+
 // storeForRevocation stores the passed set of accessors for revocation. It
 // captures their effective TTL by storing their create TTL plus the current
 // time.
@@ -1207,6 +1226,16 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.VaultAccessor) error {
 	return g.Wait()
 }
 
+// maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke
+// at any given time.
+//
+// Limiting the revocation batch size is beneficial for a few reasons:
+// * A single revocation failure of any entry in the batch results in retrying
+//   the whole batch; the larger the batch, the higher the likelihood of such a failure
+// * Smaller batch sizes are more cooperative: they provide hooks for
+//   reconsidering token TTLs and leadership step-downs.
+const maxVaultRevokeBatchSize = 1000
+
 // revokeDaemon should be called in a goroutine and is used to periodically
 // revoke Vault accessors that failed the original revocation
 func (v *vaultClient) revokeDaemon() {
@@ -1231,13 +1260,22 @@ func (v *vaultClient) revokeDaemon() {
 			}
 
 			// Build the list of accessors that need to be revoked while pruning any TTL'd checks
-			revoking := make([]*structs.VaultAccessor, 0, len(v.revoking))
+			toRevoke := len(v.revoking)
+			if toRevoke > maxVaultRevokeBatchSize {
+				toRevoke = maxVaultRevokeBatchSize
+			}
+			revoking := make([]*structs.VaultAccessor, 0, toRevoke)
 			ttlExpired := []*structs.VaultAccessor{}
 			for va, ttl := range v.revoking {
 				if now.After(ttl) {
 					delete(v.revoking, va)
 					ttlExpired = append(ttlExpired, va)
 				} else {
 					revoking = append(revoking, va)
 				}
+
+				if len(revoking) >= toRevoke {
+					break
+				}
 			}
 
 			if err := v.parallelRevoke(context.Background(), revoking); err != nil {
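A note on the loop above: the added break caps each pass at toRevoke entries, and because Go deliberately varies map iteration order, successive daemon ticks start from different accessors, so no entry is starved indefinitely. The shape of the pattern, as a sketch with string keys standing in for *structs.VaultAccessor:

	// drainUpTo collects at most max keys from pending per call.
	func drainUpTo(pending map[string]struct{}, max int) []string {
		batch := make([]string, 0, max)
		for k := range pending { // iteration order varies from pass to pass
			batch = append(batch, k)
			if len(batch) >= max {
				break
			}
		}
		return batch
	}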
@@ -1249,6 +1287,10 @@ func (v *vaultClient) revokeDaemon() {
 			// Unlock before a potentially expensive operation
 			v.revLock.Unlock()
 
+			// purge all explicitly revoked as well as ttl expired tokens
+			// and only remove them locally on purge success
+			revoking = append(revoking, ttlExpired...)
+
 			// Call the passed in token revocation function
 			if err := v.purgeFn(revoking); err != nil {
 				// Can continue since revocation is idempotent
--- a/nomad/vault_test.go
+++ b/nomad/vault_test.go
@@ -1365,6 +1365,33 @@ func TestVaultClient_CreateToken_Prestart(t *testing.T) {
 	}
 }
 
+func TestVaultClient_MarkForRevocation(t *testing.T) {
+	vconfig := &config.VaultConfig{
+		Enabled: helper.BoolToPtr(true),
+		Token:   uuid.Generate(),
+		Addr:    "http://127.0.0.1:0",
+	}
+	logger := testlog.HCLogger(t)
+	client, err := NewVaultClient(vconfig, logger, nil)
+	require.NoError(t, err)
+
+	client.SetActive(true)
+	defer client.Stop()
+
+	// Create some VaultAccessors
+	vas := []*structs.VaultAccessor{
+		mock.VaultAccessor(),
+		mock.VaultAccessor(),
+	}
+
+	err = client.MarkForRevocation(vas)
+	require.NoError(t, err)
+
+	// Wasn't committed
+	require.Len(t, client.revoking, 2)
+	require.Equal(t, 2, client.stats().TrackedForRevoke)
+
+}
 func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
 	t.Parallel()
 	vconfig := &config.VaultConfig{
--- a/nomad/vault_testing.go
+++ b/nomad/vault_testing.go
@@ -135,6 +135,11 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error {
 	return nil
 }
 
+func (v *TestVaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
+	v.RevokedTokens = append(v.RevokedTokens, accessors...)
+	return nil
+}
+
 func (v *TestVaultClient) Stop()                                      {}
 func (v *TestVaultClient) SetActive(enabled bool)                     {}
 func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
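Note that the TestVaultClient double treats MarkForRevocation as an immediate revocation, appending straight to RevokedTokens; tests that assert on revoked tokens therefore keep passing whichever path the server takes.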