mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
core: add expired token garbage collection periodic jobs. (#13805)
Two new periodic core jobs have been added which handle removing expired local and global tokens from state. The local core job is run on every leader; the global core job is only run on the leader within the authoritative region.
This commit is contained in:
@@ -51,6 +51,10 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
|
||||
return c.csiPluginGC(eval)
|
||||
case structs.CoreJobOneTimeTokenGC:
|
||||
return c.expiredOneTimeTokenGC(eval)
|
||||
case structs.CoreJobLocalTokenExpiredGC:
|
||||
return c.expiredACLTokenGC(eval, false)
|
||||
case structs.CoreJobGlobalTokenExpiredGC:
|
||||
return c.expiredACLTokenGC(eval, true)
|
||||
case structs.CoreJobForceGC:
|
||||
return c.forceGC(eval)
|
||||
default:
|
||||
@@ -78,6 +82,13 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
|
||||
if err := c.expiredOneTimeTokenGC(eval); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.expiredACLTokenGC(eval, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.expiredACLTokenGC(eval, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Node GC must occur after the others to ensure the allocations are
|
||||
// cleared.
|
||||
return c.nodeGC(eval)
|
||||
@@ -773,6 +784,100 @@ func (c *CoreScheduler) expiredOneTimeTokenGC(eval *structs.Evaluation) error {
|
||||
return c.srv.RPC("ACL.ExpireOneTimeTokens", req, &structs.GenericResponse{})
|
||||
}
|
||||
|
||||
// expiredACLTokenGC handles running the garbage collector for expired ACL
|
||||
// tokens. It can be used for both local and global tokens and includes
|
||||
// behaviour to account for periodic and user actioned garbage collection
|
||||
// invocations.
|
||||
func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) error {
|
||||
|
||||
// If ACLs are not enabled, we do not need to continue and should exit
|
||||
// early. This is not an error condition as callers can blindly call this
|
||||
// function without checking the configuration. If the caller wants this to
|
||||
// be an error, they should check this config value themselves.
|
||||
if !c.srv.config.ACLEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the function has been triggered for global tokens, but we are not the
|
||||
// authoritative region, we should exit. This is not an error condition as
|
||||
// callers can blindly call this function without checking the
|
||||
// configuration. If the caller wants this to be an error, they should
|
||||
// check this config value themselves.
|
||||
if global && c.srv.config.AuthoritativeRegion != c.srv.Region() {
|
||||
return nil
|
||||
}
|
||||
|
||||
expiryThresholdIdx := c.getThreshold(eval, "expired_acl_token",
|
||||
"acl_token_expiration_gc_threshold", c.srv.config.ACLTokenExpirationGCThreshold)
|
||||
|
||||
expiredIter, err := c.snap.ACLTokensByExpired(global)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
expiredAccessorIDs []string
|
||||
num int
|
||||
)
|
||||
|
||||
// The memdb iterator contains all tokens which include an expiration time,
|
||||
// however, as the caller, we do not know at which point in the array the
|
||||
// tokens are no longer expired. This time therefore forms the basis at
|
||||
// which we draw the line in the iteration loop and find the final expired
|
||||
// token that is eligible for deletion.
|
||||
now := time.Now().UTC()
|
||||
|
||||
for raw := expiredIter.Next(); raw != nil; raw = expiredIter.Next() {
|
||||
token := raw.(*structs.ACLToken)
|
||||
|
||||
// The iteration order of the indexes mean if we come across an
|
||||
// unexpired token, we can exit as we have found all currently expired
|
||||
// tokens.
|
||||
if !token.IsExpired(now) {
|
||||
break
|
||||
}
|
||||
|
||||
// Check if the token is recent enough to skip, otherwise we'll delete
|
||||
// it.
|
||||
if token.CreateIndex > expiryThresholdIdx {
|
||||
continue
|
||||
}
|
||||
|
||||
// Add the token accessor ID to the tracking array, thus marking it
|
||||
// ready for deletion.
|
||||
expiredAccessorIDs = append(expiredAccessorIDs, token.AccessorID)
|
||||
|
||||
// Increment the counter. If this is at or above our limit, we return
|
||||
// what we have so far.
|
||||
if num++; num >= structs.ACLMaxExpiredBatchSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// There is no need to call the RPC endpoint if we do not have any tokens
|
||||
// to delete.
|
||||
if len(expiredAccessorIDs) < 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Log a nice, friendly debug message which could be useful when debugging
|
||||
// garbage collection in environments with a high rate of token creation
|
||||
// and expiration.
|
||||
c.logger.Debug("expired ACL token GC found eligible tokens",
|
||||
"num", len(expiredAccessorIDs))
|
||||
|
||||
// Set up and make the RPC request which will return any error performing
|
||||
// the deletion.
|
||||
req := structs.ACLTokenDeleteRequest{
|
||||
AccessorIDs: expiredAccessorIDs,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.srv.Region(),
|
||||
AuthToken: eval.LeaderACL,
|
||||
},
|
||||
}
|
||||
return c.srv.RPC(structs.ACLDeleteTokensRPCMethod, req, &structs.GenericResponse{})
|
||||
}
|
||||
|
||||
// getThreshold returns the index threshold for determining whether an
|
||||
// object is old enough to GC
|
||||
func (c *CoreScheduler) getThreshold(eval *structs.Evaluation, objectName, configName string, configThreshold time.Duration) uint64 {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/helper/pointer"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
@@ -2513,3 +2514,165 @@ func TestCoreScheduler_FailLoop(t *testing.T) {
|
||||
out.TriggeredBy)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoreScheduler_ExpiredACLTokenGC(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
testServer, rootACLToken, testServerShutdown := TestACLServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0
|
||||
})
|
||||
defer testServerShutdown()
|
||||
testutil.WaitForLeader(t, testServer.RPC)
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
// Craft some specific local and global tokens. For each type, one is
|
||||
// expired, one is not.
|
||||
expiredGlobal := mock.ACLToken()
|
||||
expiredGlobal.Global = true
|
||||
expiredGlobal.ExpirationTime = pointer.Of(now.Add(-2 * time.Hour))
|
||||
|
||||
unexpiredGlobal := mock.ACLToken()
|
||||
unexpiredGlobal.Global = true
|
||||
unexpiredGlobal.ExpirationTime = pointer.Of(now.Add(2 * time.Hour))
|
||||
|
||||
expiredLocal := mock.ACLToken()
|
||||
expiredLocal.ExpirationTime = pointer.Of(now.Add(-2 * time.Hour))
|
||||
|
||||
unexpiredLocal := mock.ACLToken()
|
||||
unexpiredLocal.ExpirationTime = pointer.Of(now.Add(2 * time.Hour))
|
||||
|
||||
// Upsert these into state.
|
||||
err := testServer.State().UpsertACLTokens(structs.MsgTypeTestSetup, 10, []*structs.ACLToken{
|
||||
expiredGlobal, unexpiredGlobal, expiredLocal, unexpiredLocal,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Overwrite the timetable. The existing timetable has an entry due to the
|
||||
// ACL bootstrapping which makes witnessing a new index at a timestamp in
|
||||
// the past impossible.
|
||||
tt := NewTimeTable(timeTableGranularity, timeTableLimit)
|
||||
tt.Witness(20, time.Now().UTC().Add(-1*testServer.config.ACLTokenExpirationGCThreshold))
|
||||
testServer.fsm.timetable = tt
|
||||
|
||||
// Generate the core scheduler.
|
||||
snap, err := testServer.State().Snapshot()
|
||||
require.NoError(t, err)
|
||||
coreScheduler := NewCoreScheduler(testServer, snap)
|
||||
|
||||
// Trigger global and local periodic garbage collection runs.
|
||||
index, err := testServer.State().LatestIndex()
|
||||
require.NoError(t, err)
|
||||
index++
|
||||
|
||||
globalGCEval := testServer.coreJobEval(structs.CoreJobGlobalTokenExpiredGC, index)
|
||||
require.NoError(t, coreScheduler.Process(globalGCEval))
|
||||
|
||||
localGCEval := testServer.coreJobEval(structs.CoreJobLocalTokenExpiredGC, index)
|
||||
require.NoError(t, coreScheduler.Process(localGCEval))
|
||||
|
||||
// Ensure the ACL tokens stored within state are as expected.
|
||||
iter, err := testServer.State().ACLTokens(nil, state.SortDefault)
|
||||
require.NoError(t, err)
|
||||
|
||||
var tokens []*structs.ACLToken
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
tokens = append(tokens, raw.(*structs.ACLToken))
|
||||
}
|
||||
require.ElementsMatch(t, []*structs.ACLToken{rootACLToken, unexpiredGlobal, unexpiredLocal}, tokens)
|
||||
}
|
||||
|
||||
func TestCoreScheduler_ExpiredACLTokenGC_Force(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
testServer, rootACLToken, testServerShutdown := TestACLServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0
|
||||
})
|
||||
defer testServerShutdown()
|
||||
testutil.WaitForLeader(t, testServer.RPC)
|
||||
|
||||
// This time is the threshold for all expiry calls to be based on. All
|
||||
// tokens with expiry can use this as their base and use Add().
|
||||
expiryTimeThreshold := time.Now().UTC()
|
||||
|
||||
// Track expired and non-expired tokens for local and global tokens in
|
||||
// separate arrays, so we have a clear way to test state.
|
||||
var expiredGlobalTokens, nonExpiredGlobalTokens, expiredLocalTokens, nonExpiredLocalTokens []*structs.ACLToken
|
||||
|
||||
// Add the root ACL token to the appropriate array. This will be returned
|
||||
// from state so must be accounted for and tested.
|
||||
nonExpiredGlobalTokens = append(nonExpiredGlobalTokens, rootACLToken)
|
||||
|
||||
// Generate and upsert a number of mixed expired, non-expired global
|
||||
// tokens.
|
||||
for i := 0; i < 20; i++ {
|
||||
mockedToken := mock.ACLToken()
|
||||
mockedToken.Global = true
|
||||
if i%2 == 0 {
|
||||
expiredGlobalTokens = append(expiredGlobalTokens, mockedToken)
|
||||
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(-24 * time.Hour))
|
||||
} else {
|
||||
nonExpiredGlobalTokens = append(nonExpiredGlobalTokens, mockedToken)
|
||||
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(24 * time.Hour))
|
||||
}
|
||||
}
|
||||
|
||||
// Generate and upsert a number of mixed expired, non-expired local
|
||||
// tokens.
|
||||
for i := 0; i < 20; i++ {
|
||||
mockedToken := mock.ACLToken()
|
||||
mockedToken.Global = false
|
||||
if i%2 == 0 {
|
||||
expiredLocalTokens = append(expiredLocalTokens, mockedToken)
|
||||
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(-24 * time.Hour))
|
||||
} else {
|
||||
nonExpiredLocalTokens = append(nonExpiredLocalTokens, mockedToken)
|
||||
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(24 * time.Hour))
|
||||
}
|
||||
}
|
||||
|
||||
allTokens := append(expiredGlobalTokens, nonExpiredGlobalTokens...)
|
||||
allTokens = append(allTokens, expiredLocalTokens...)
|
||||
allTokens = append(allTokens, nonExpiredLocalTokens...)
|
||||
|
||||
// Upsert them all.
|
||||
err := testServer.State().UpsertACLTokens(structs.MsgTypeTestSetup, 10, allTokens)
|
||||
require.NoError(t, err)
|
||||
|
||||
// This function provides an easy way to get all tokens out of the
|
||||
// iterator.
|
||||
fromIteratorFunc := func(iter memdb.ResultIterator) []*structs.ACLToken {
|
||||
var tokens []*structs.ACLToken
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
tokens = append(tokens, raw.(*structs.ACLToken))
|
||||
}
|
||||
return tokens
|
||||
}
|
||||
|
||||
// Check all the tokens are correctly stored within state.
|
||||
iter, err := testServer.State().ACLTokens(nil, state.SortDefault)
|
||||
require.NoError(t, err)
|
||||
|
||||
tokens := fromIteratorFunc(iter)
|
||||
require.ElementsMatch(t, allTokens, tokens)
|
||||
|
||||
// Generate the core scheduler and trigger a forced garbage collection
|
||||
// which should delete all expired tokens.
|
||||
snap, err := testServer.State().Snapshot()
|
||||
require.NoError(t, err)
|
||||
coreScheduler := NewCoreScheduler(testServer, snap)
|
||||
|
||||
index, err := testServer.State().LatestIndex()
|
||||
require.NoError(t, err)
|
||||
index++
|
||||
|
||||
forceGCEval := testServer.coreJobEval(structs.CoreJobForceGC, index)
|
||||
require.NoError(t, coreScheduler.Process(forceGCEval))
|
||||
|
||||
// List all the remaining ACL tokens to be sure they are as expected.
|
||||
iter, err = testServer.State().ACLTokens(nil, state.SortDefault)
|
||||
require.NoError(t, err)
|
||||
|
||||
tokens = fromIteratorFunc(iter)
|
||||
require.ElementsMatch(t, append(nonExpiredGlobalTokens, nonExpiredLocalTokens...), tokens)
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -341,7 +342,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Scheduler periodic jobs
|
||||
// Schedule periodic jobs which include expired local ACL token garbage
|
||||
// collection.
|
||||
go s.schedulePeriodic(stopCh)
|
||||
|
||||
// Reap any failed evaluations
|
||||
@@ -373,12 +375,22 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start replication of ACLs and Policies if they are enabled,
|
||||
// and we are not the authoritative region.
|
||||
if s.config.ACLEnabled && s.config.Region != s.config.AuthoritativeRegion {
|
||||
go s.replicateACLPolicies(stopCh)
|
||||
go s.replicateACLTokens(stopCh)
|
||||
go s.replicateNamespaces(stopCh)
|
||||
// If ACLs are enabled, the leader needs to start a number of long-lived
|
||||
// routines. Exactly which routines, depends on whether this leader is
|
||||
// running within the authoritative region or not.
|
||||
if s.config.ACLEnabled {
|
||||
|
||||
// The authoritative region is responsible for garbage collecting
|
||||
// expired global tokens. Otherwise, non-authoritative regions need to
|
||||
// replicate policies, tokens, and namespaces.
|
||||
switch s.config.AuthoritativeRegion {
|
||||
case s.config.Region:
|
||||
go s.schedulePeriodicAuthoritative(stopCh)
|
||||
default:
|
||||
go s.replicateACLPolicies(stopCh)
|
||||
go s.replicateACLTokens(stopCh)
|
||||
go s.replicateNamespaces(stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
// Setup any enterprise systems required.
|
||||
@@ -762,43 +774,35 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
|
||||
oneTimeTokenGC := time.NewTicker(s.config.OneTimeTokenGCInterval)
|
||||
defer oneTimeTokenGC.Stop()
|
||||
|
||||
// getLatest grabs the latest index from the state store. It returns true if
|
||||
// the index was retrieved successfully.
|
||||
getLatest := func() (uint64, bool) {
|
||||
snapshotIndex, err := s.fsm.State().LatestIndex()
|
||||
if err != nil {
|
||||
s.logger.Error("failed to determine state store's index", "error", err)
|
||||
return 0, false
|
||||
}
|
||||
|
||||
return snapshotIndex, true
|
||||
}
|
||||
// Set up the expired ACL local token garbage collection timer.
|
||||
localTokenExpiredGC, localTokenExpiredGCStop := helper.NewSafeTimer(s.config.ACLTokenExpirationGCInterval)
|
||||
defer localTokenExpiredGCStop()
|
||||
|
||||
for {
|
||||
|
||||
select {
|
||||
case <-evalGC.C:
|
||||
if index, ok := getLatest(); ok {
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index))
|
||||
}
|
||||
case <-nodeGC.C:
|
||||
if index, ok := getLatest(); ok {
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index))
|
||||
}
|
||||
case <-jobGC.C:
|
||||
if index, ok := getLatest(); ok {
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index))
|
||||
}
|
||||
case <-deploymentGC.C:
|
||||
if index, ok := getLatest(); ok {
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobDeploymentGC, index))
|
||||
}
|
||||
case <-csiPluginGC.C:
|
||||
if index, ok := getLatest(); ok {
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIPluginGC, index))
|
||||
}
|
||||
case <-csiVolumeClaimGC.C:
|
||||
if index, ok := getLatest(); ok {
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumeClaimGC, index))
|
||||
}
|
||||
case <-oneTimeTokenGC.C:
|
||||
@@ -806,15 +810,55 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
|
||||
continue
|
||||
}
|
||||
|
||||
if index, ok := getLatest(); ok {
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobOneTimeTokenGC, index))
|
||||
}
|
||||
case <-localTokenExpiredGC.C:
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobLocalTokenExpiredGC, index))
|
||||
}
|
||||
localTokenExpiredGC.Reset(s.config.ACLTokenExpirationGCInterval)
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// schedulePeriodicAuthoritative is a long-lived routine intended for use on
|
||||
// the leader within the authoritative region only. It periodically queues work
|
||||
// onto the _core scheduler for ACL based activities such as removing expired
|
||||
// global ACL tokens.
|
||||
func (s *Server) schedulePeriodicAuthoritative(stopCh chan struct{}) {
|
||||
|
||||
// Set up the expired ACL global token garbage collection timer.
|
||||
globalTokenExpiredGC, globalTokenExpiredGCStop := helper.NewSafeTimer(s.config.ACLTokenExpirationGCInterval)
|
||||
defer globalTokenExpiredGCStop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-globalTokenExpiredGC.C:
|
||||
if index, ok := s.getLatestIndex(); ok {
|
||||
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobGlobalTokenExpiredGC, index))
|
||||
}
|
||||
globalTokenExpiredGC.Reset(s.config.ACLTokenExpirationGCInterval)
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getLatestIndex is a helper function which returns the latest index from the
|
||||
// state store. The boolean return indicates whether the call has been
|
||||
// successful or not.
|
||||
func (s *Server) getLatestIndex() (uint64, bool) {
|
||||
snapshotIndex, err := s.fsm.State().LatestIndex()
|
||||
if err != nil {
|
||||
s.logger.Error("failed to determine state store's index", "error", err)
|
||||
return 0, false
|
||||
}
|
||||
return snapshotIndex, true
|
||||
}
|
||||
|
||||
// coreJobEval returns an evaluation for a core job
|
||||
func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
|
||||
return &structs.Evaluation{
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -1665,6 +1666,27 @@ func waitForStableLeadership(t *testing.T, servers []*Server) *Server {
|
||||
return leader
|
||||
}
|
||||
|
||||
func TestServer_getLatestIndex(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
testServer, testServerCleanup := TestServer(t, nil)
|
||||
defer testServerCleanup()
|
||||
|
||||
// Test a new state store value.
|
||||
idx, success := testServer.getLatestIndex()
|
||||
require.True(t, success)
|
||||
must.Eq(t, 1, idx)
|
||||
|
||||
// Upsert something with a high index, and check again.
|
||||
err := testServer.State().UpsertACLPolicies(
|
||||
structs.MsgTypeTestSetup, 1013, []*structs.ACLPolicy{mock.ACLPolicy()})
|
||||
require.NoError(t, err)
|
||||
|
||||
idx, success = testServer.getLatestIndex()
|
||||
require.True(t, success)
|
||||
must.Eq(t, 1013, idx)
|
||||
}
|
||||
|
||||
func TestServer_handleEvalBrokerStateChange(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
|
||||
@@ -2,9 +2,8 @@ package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// ACLTokensByExpired returns an iterator over the ACL tokens which include an expiration time.
|
||||
@@ -14,39 +13,14 @@ import (
|
||||
// the global boolean argument. The iterator is unbounded, so callers are
|
||||
// responsible for filtering the results and limiting how many tokens they
|
||||
// attempt to delete in a single transaction.
|
||||
func (s *StateStore) ACLTokensByExpired(global bool, now time.Time, max int) ([]string, error) {
|
||||
func (s *StateStore) ACLTokensByExpired(global bool) (memdb.ResultIterator, error) {
|
||||
tnx := s.db.ReadTxn()
|
||||
|
||||
iter, err := tnx.Get("acl_token", expiresIndexName(global))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed acl token listing: %v", err)
|
||||
}
|
||||
|
||||
var (
|
||||
accessorIDs []string
|
||||
num int
|
||||
)
|
||||
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
token := raw.(*structs.ACLToken)
|
||||
|
||||
// The indexes mean if we come across an unexpired token, we can exit
|
||||
// as we have found all currently expired tokens.
|
||||
if !token.IsExpired(now) {
|
||||
return accessorIDs, nil
|
||||
}
|
||||
|
||||
accessorIDs = append(accessorIDs, token.AccessorID)
|
||||
|
||||
// Increment the counter. If this is at or above our limit, we return
|
||||
// what we have so far.
|
||||
num++
|
||||
if num >= max {
|
||||
return accessorIDs, nil
|
||||
}
|
||||
}
|
||||
|
||||
return accessorIDs, nil
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// expiresIndexName is a helper function to identify the correct ACL token
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/helper/pointer"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
@@ -15,6 +16,16 @@ func TestStateStore_ACLTokensByExpired(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
testState := testStateStore(t)
|
||||
|
||||
// This function provides an easy way to get all tokens out of the
|
||||
// iterator.
|
||||
fromIteratorFunc := func(iter memdb.ResultIterator) []*structs.ACLToken {
|
||||
var tokens []*structs.ACLToken
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
tokens = append(tokens, raw.(*structs.ACLToken))
|
||||
}
|
||||
return tokens
|
||||
}
|
||||
|
||||
// This time is the threshold for all expiry calls to be based on. All
|
||||
// tokens with expiry can use this as their base and use Add().
|
||||
expiryTimeThreshold := time.Date(2022, time.April, 27, 14, 50, 0, 0, time.UTC)
|
||||
@@ -31,13 +42,15 @@ func TestStateStore_ACLTokensByExpired(t *testing.T) {
|
||||
neverExpireLocalToken, neverExpireGlobalToken})
|
||||
require.NoError(t, err)
|
||||
|
||||
ids, err := testState.ACLTokensByExpired(true, expiryTimeThreshold, 10)
|
||||
iter, err := testState.ACLTokensByExpired(true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ids, 0)
|
||||
tokens := fromIteratorFunc(iter)
|
||||
require.Len(t, tokens, 0)
|
||||
|
||||
ids, err = testState.ACLTokensByExpired(false, expiryTimeThreshold, 10)
|
||||
iter, err = testState.ACLTokensByExpired(false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ids, 0)
|
||||
tokens = fromIteratorFunc(iter)
|
||||
require.Len(t, tokens, 0)
|
||||
|
||||
// Generate, upsert, and test an expired local token. This token expired
|
||||
// long ago and therefore before all others coming in the tests. It should
|
||||
@@ -48,10 +61,11 @@ func TestStateStore_ACLTokensByExpired(t *testing.T) {
|
||||
err = testState.UpsertACLTokens(structs.MsgTypeTestSetup, 20, []*structs.ACLToken{expiredLocalToken})
|
||||
require.NoError(t, err)
|
||||
|
||||
ids, err = testState.ACLTokensByExpired(false, expiryTimeThreshold, 10)
|
||||
iter, err = testState.ACLTokensByExpired(false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ids, 1)
|
||||
require.Equal(t, expiredLocalToken.AccessorID, ids[0])
|
||||
tokens = fromIteratorFunc(iter)
|
||||
require.Len(t, tokens, 1)
|
||||
require.Equal(t, expiredLocalToken.AccessorID, tokens[0].AccessorID)
|
||||
|
||||
// Generate, upsert, and test an expired global token. This token expired
|
||||
// long ago and therefore before all others coming in the tests. It should
|
||||
@@ -63,54 +77,50 @@ func TestStateStore_ACLTokensByExpired(t *testing.T) {
|
||||
err = testState.UpsertACLTokens(structs.MsgTypeTestSetup, 30, []*structs.ACLToken{expiredGlobalToken})
|
||||
require.NoError(t, err)
|
||||
|
||||
ids, err = testState.ACLTokensByExpired(true, expiryTimeThreshold, 10)
|
||||
iter, err = testState.ACLTokensByExpired(true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ids, 1)
|
||||
require.Equal(t, expiredGlobalToken.AccessorID, ids[0])
|
||||
tokens = fromIteratorFunc(iter)
|
||||
require.Len(t, tokens, 1)
|
||||
require.Equal(t, expiredGlobalToken.AccessorID, tokens[0].AccessorID)
|
||||
|
||||
// This test function allows us to run the same test for local and global
|
||||
// tokens.
|
||||
testFn := func(oldID string, global bool) {
|
||||
testFn := func(oldToken *structs.ACLToken, global bool) {
|
||||
|
||||
// Track all the expected expired accessor IDs including the long
|
||||
// Track all the expected expired ACL tokens, including the long
|
||||
// expired token.
|
||||
var expiredLocalAccessorIDs []string
|
||||
expiredLocalAccessorIDs = append(expiredLocalAccessorIDs, oldID)
|
||||
var expiredTokens []*structs.ACLToken
|
||||
expiredTokens = append(expiredTokens, oldToken)
|
||||
|
||||
// Generate and upsert a number of mixed expired, non-expired local tokens.
|
||||
mixedLocalTokens := make([]*structs.ACLToken, 20)
|
||||
// Generate and upsert a number of mixed expired, non-expired tokens.
|
||||
mixedTokens := make([]*structs.ACLToken, 20)
|
||||
for i := 0; i < 20; i++ {
|
||||
mockedToken := mock.ACLToken()
|
||||
mockedToken.Global = global
|
||||
if i%2 == 0 {
|
||||
expiredLocalAccessorIDs = append(expiredLocalAccessorIDs, mockedToken.AccessorID)
|
||||
expiredTokens = append(expiredTokens, mockedToken)
|
||||
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(-24 * time.Hour))
|
||||
} else {
|
||||
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(24 * time.Hour))
|
||||
}
|
||||
mixedLocalTokens[i] = mockedToken
|
||||
mixedTokens[i] = mockedToken
|
||||
}
|
||||
|
||||
err = testState.UpsertACLTokens(structs.MsgTypeTestSetup, 40, mixedLocalTokens)
|
||||
err = testState.UpsertACLTokens(structs.MsgTypeTestSetup, 40, mixedTokens)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Use a max value higher than the number we have to check the full listing
|
||||
// works as expected. Ensure our oldest expired token is first in the list.
|
||||
ids, err = testState.ACLTokensByExpired(global, expiryTimeThreshold, 100)
|
||||
// Check the full listing works as expected as the first 11 elements
|
||||
// should all be our expired tokens. Ensure our oldest expired token is
|
||||
// first in the list.
|
||||
iter, err = testState.ACLTokensByExpired(global)
|
||||
require.NoError(t, err)
|
||||
require.ElementsMatch(t, ids, expiredLocalAccessorIDs)
|
||||
require.Equal(t, ids[0], oldID)
|
||||
|
||||
// Use a lower max value than the number of known expired tokens to ensure
|
||||
// this is working.
|
||||
ids, err = testState.ACLTokensByExpired(global, expiryTimeThreshold, 3)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ids, 3)
|
||||
require.Equal(t, ids[0], oldID)
|
||||
tokens = fromIteratorFunc(iter)
|
||||
require.ElementsMatch(t, expiredTokens, tokens[:11])
|
||||
require.Equal(t, tokens[0], oldToken)
|
||||
}
|
||||
|
||||
testFn(expiredLocalToken.AccessorID, false)
|
||||
testFn(expiredGlobalToken.AccessorID, true)
|
||||
testFn(expiredLocalToken, false)
|
||||
testFn(expiredGlobalToken, true)
|
||||
}
|
||||
|
||||
func Test_expiresIndexName(t *testing.T) {
|
||||
|
||||
@@ -24,6 +24,22 @@ const (
|
||||
// Args: ACLTokenUpsertRequest
|
||||
// Reply: ACLTokenUpsertResponse
|
||||
ACLUpsertTokensRPCMethod = "ACL.UpsertTokens"
|
||||
|
||||
// ACLDeleteTokensRPCMethod is the RPC method for batch deleting ACL
|
||||
// tokens.
|
||||
//
|
||||
// Args: ACLTokenDeleteRequest
|
||||
// Reply: GenericResponse
|
||||
ACLDeleteTokensRPCMethod = "ACL.DeleteTokens"
|
||||
)
|
||||
|
||||
// ACLMaxExpiredBatchSize caps how many expired ACL tokens are garbage
// collected by a single trigger. The cap helps limit the replication pressure
// caused by expired token deletion; when a large number of expired tokens are
// pending garbage collection, this value is a potential limiting factor.
const ACLMaxExpiredBatchSize = 4096
|
||||
|
||||
// Canonicalize performs basic canonicalization on the ACL token object. It is
|
||||
|
||||
@@ -10741,6 +10741,16 @@ const (
|
||||
// tokens. We periodically scan for expired tokens and delete them.
|
||||
CoreJobOneTimeTokenGC = "one-time-token-gc"
|
||||
|
||||
// CoreJobLocalTokenExpiredGC is used for the garbage collection of
|
||||
// expired local ACL tokens. We periodically scan for expired tokens and
|
||||
// delete them.
|
||||
CoreJobLocalTokenExpiredGC = "local-token-expired-gc"
|
||||
|
||||
// CoreJobGlobalTokenExpiredGC is used for the garbage collection of
|
||||
// expired global ACL tokens. We periodically scan for expired tokens and
|
||||
// delete them.
|
||||
CoreJobGlobalTokenExpiredGC = "global-token-expired-gc"
|
||||
|
||||
// CoreJobForceGC is used to force garbage collection of all GCable objects.
|
||||
CoreJobForceGC = "force-gc"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user