diff --git a/command/agent/agent.go b/command/agent/agent.go
index e1b4364b5..5f4d95be2 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -314,6 +314,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
 		}
 		conf.DeploymentGCThreshold = dur
 	}
+	if gcThreshold := agentConfig.Server.CSIPluginGCThreshold; gcThreshold != "" {
+		dur, err := time.ParseDuration(gcThreshold)
+		if err != nil {
+			return nil, err
+		}
+		conf.CSIPluginGCThreshold = dur
+	}
 	if heartbeatGrace := agentConfig.Server.HeartbeatGrace; heartbeatGrace != 0 {
 		conf.HeartbeatGrace = heartbeatGrace
 	}
diff --git a/command/agent/config.go b/command/agent/config.go
index 0b2567736..799d57eea 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -389,6 +389,11 @@ type ServerConfig struct {
 	// GCed but the threshold can be used to filter by age.
 	DeploymentGCThreshold string `hcl:"deployment_gc_threshold"`
 
+	// CSIPluginGCThreshold controls how "old" a CSI plugin must be to be
+	// collected by GC. Age is not the only requirement for a plugin to be
+	// GCed but the threshold can be used to filter by age.
+	CSIPluginGCThreshold string `hcl:"csi_plugin_gc_threshold"`
+
 	// HeartbeatGrace is the grace period beyond the TTL to account for network,
 	// processing delays and clock skew before marking a node as "down".
 	HeartbeatGrace time.Duration
@@ -1323,6 +1328,9 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
 	if b.DeploymentGCThreshold != "" {
 		result.DeploymentGCThreshold = b.DeploymentGCThreshold
 	}
+	if b.CSIPluginGCThreshold != "" {
+		result.CSIPluginGCThreshold = b.CSIPluginGCThreshold
+	}
 	if b.HeartbeatGrace != 0 {
 		result.HeartbeatGrace = b.HeartbeatGrace
 	}
diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go
index 157413ec0..e5523373f 100644
--- a/command/agent/config_parse_test.go
+++ b/command/agent/config_parse_test.go
@@ -104,6 +104,7 @@ var basicConfig = &Config{
 		JobGCInterval:         "3m",
 		JobGCThreshold:        "12h",
 		DeploymentGCThreshold: "12h",
+		CSIPluginGCThreshold:  "12h",
 		HeartbeatGrace:        30 * time.Second,
 		HeartbeatGraceHCL:     "30s",
 		MinHeartbeatTTL:       33 * time.Second,
diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl
index 78d8d63ef..69ed6cf00 100644
--- a/command/agent/testdata/basic.hcl
+++ b/command/agent/testdata/basic.hcl
@@ -115,6 +115,7 @@ server {
   job_gc_threshold          = "12h"
   eval_gc_threshold         = "12h"
   deployment_gc_threshold   = "12h"
+  csi_plugin_gc_threshold   = "12h"
   heartbeat_grace           = "30s"
   min_heartbeat_ttl         = "33s"
   max_heartbeats_per_second = 11.0
diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json
index b02e0db88..ab18da9c1 100644
--- a/command/agent/testdata/basic.json
+++ b/command/agent/testdata/basic.json
@@ -256,6 +256,7 @@
     {
       "authoritative_region": "foobar",
       "bootstrap_expect": 5,
+      "csi_plugin_gc_threshold": "12h",
       "data_dir": "/tmp/data",
       "deployment_gc_threshold": "12h",
       "enabled": true,
diff --git a/nomad/config.go b/nomad/config.go
index 8b4fc1c25..e7be14d2c 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -191,6 +191,13 @@ type Config struct {
 	// for GC. This gives users some time to view terminal deployments.
 	DeploymentGCThreshold time.Duration
 
+	// CSIPluginGCInterval is how often we dispatch a job to GC unused plugins.
+	CSIPluginGCInterval time.Duration
+
+	// CSIPluginGCThreshold is how "old" a plugin must be to be eligible
+	// for GC. This gives users some time to debug plugins.
+	CSIPluginGCThreshold time.Duration
+
 	// EvalNackTimeout controls how long we allow a sub-scheduler to
 	// work on an evaluation before we consider it failed and Nack it.
 	// This allows that evaluation to be handed to another sub-scheduler
@@ -377,6 +384,8 @@ func DefaultConfig() *Config {
 		NodeGCThreshold:               24 * time.Hour,
 		DeploymentGCInterval:          5 * time.Minute,
 		DeploymentGCThreshold:         1 * time.Hour,
+		CSIPluginGCInterval:           5 * time.Minute,
+		CSIPluginGCThreshold:          1 * time.Hour,
 		EvalNackTimeout:               60 * time.Second,
 		EvalDeliveryLimit:             3,
 		EvalNackInitialReenqueueDelay: 1 * time.Second,
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 08f65e5bf..e3f762f4c 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -54,6 +54,8 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
 		return c.deploymentGC(eval)
 	case structs.CoreJobCSIVolumeClaimGC:
 		return c.csiVolumeClaimGC(eval)
+	case structs.CoreJobCSIPluginGC:
+		return c.csiPluginGC(eval)
 	case structs.CoreJobForceGC:
 		return c.forceGC(eval)
 	default:
@@ -72,6 +74,9 @@
 	if err := c.deploymentGC(eval); err != nil {
 		return err
 	}
+	if err := c.csiPluginGC(eval); err != nil {
+		return err
+	}
 
 	// Node GC must occur after the others to ensure the allocations are
 	// cleared.
@@ -736,3 +741,51 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
 	err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
 	return err
 }
+
+// csiPluginGC is used to garbage collect unused plugins
+func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error {
+
+	ws := memdb.NewWatchSet()
+
+	iter, err := c.snap.CSIPlugins(ws)
+	if err != nil {
+		return err
+	}
+
+	// Get the time table to calculate GC cutoffs.
+	var oldThreshold uint64
+	if eval.JobID == structs.CoreJobForceGC {
+		// The GC was forced, so set the threshold to its maximum so
+		// everything will GC.
+		oldThreshold = math.MaxUint64
+		c.logger.Debug("forced plugin GC")
+	} else {
+		tt := c.srv.fsm.TimeTable()
+		cutoff := time.Now().UTC().Add(-1 * c.srv.config.CSIPluginGCThreshold)
+		oldThreshold = tt.NearestIndex(cutoff)
+	}
+
+	c.logger.Debug("CSI plugin GC scanning before cutoff index",
+		"index", oldThreshold, "csi_plugin_gc_threshold", c.srv.config.CSIPluginGCThreshold)
+
+	for i := iter.Next(); i != nil; i = iter.Next() {
+		plugin := i.(*structs.CSIPlugin)
+
+		// Ignore new plugins
+		if plugin.CreateIndex > oldThreshold {
+			continue
+		}
+
+		req := &structs.CSIPluginDeleteRequest{ID: plugin.ID}
+		req.Region = c.srv.Region()
+		err := c.srv.RPC("CSIPlugin.Delete", req, &structs.CSIPluginDeleteResponse{})
+		if err != nil {
+			if err.Error() == "plugin in use" {
+				continue
+			}
+			c.logger.Error("failed to GC plugin", "plugin_id", plugin.ID, "error", err)
+			return err
+		}
+	}
+	return nil
+}
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 70b500a82..bfebd51ba 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -8,6 +8,7 @@ import (
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
 	"github.com/stretchr/testify/assert"
@@ -2193,3 +2194,57 @@
 	alloc.ClientStatus = structs.AllocClientStatusComplete
 	require.True(allocGCEligible(alloc, nil, time.Now(), 1000))
 }
+
+func TestCoreScheduler_CSIPluginGC(t *testing.T) {
+	t.Parallel()
+
+	srv, cleanupSRV := TestServer(t, nil)
+	defer cleanupSRV()
+	testutil.WaitForLeader(t, srv.RPC)
+	require := require.New(t)
+
+	srv.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
+
+	deleteNodes := state.CreateTestCSIPlugin(srv.fsm.State(), "foo")
+	defer deleteNodes()
+	state := srv.fsm.State()
+
+	// Update the time tables to make this work
+	tt := srv.fsm.TimeTable()
+	index := uint64(2000)
+	tt.Witness(index, time.Now().UTC().Add(-1*srv.config.DeploymentGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	require.NoError(err)
+	core := NewCoreScheduler(srv, snap)
+
+	// Attempt the GC
+	index++
+	gc := srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
+	require.NoError(core.Process(gc))
+
+	// Should not be gone (plugin in use)
+	ws := memdb.NewWatchSet()
+	plug, err := state.CSIPluginByID(ws, "foo")
+	require.NotNil(plug)
+	require.NoError(err)
+
+	// Empty the plugin
+	plug.Controllers = map[string]*structs.CSIInfo{}
+	plug.Nodes = map[string]*structs.CSIInfo{}
+
+	index++
+	err = state.UpsertCSIPlugin(index, plug)
+	require.NoError(err)
+
+	// Retry
+	index++
+	gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
+	require.NoError(core.Process(gc))
+
+	// Should be gone
+	plug, err = state.CSIPluginByID(ws, "foo")
+	require.Nil(plug)
+	require.NoError(err)
+}
diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go
index b04294680..56e6254be 100644
--- a/nomad/csi_endpoint.go
+++ b/nomad/csi_endpoint.go
@@ -594,3 +594,34 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu
 	}}
 	return v.srv.blockingRPC(&opts)
 }
+
+// Delete deletes a plugin if it is unused
+func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.CSIPluginDeleteResponse) error {
+	if done, err := v.srv.forward("CSIPlugin.Delete", args, args, reply); done {
+		return err
+	}
+
+	// Check that it is a management token.
+	if aclObj, err := v.srv.ResolveToken(args.AuthToken); err != nil {
+		return err
+	} else if aclObj != nil && !aclObj.IsManagement() {
+		return structs.ErrPermissionDenied
+	}
+
+	metricsStart := time.Now()
+	defer metrics.MeasureSince([]string{"nomad", "plugin", "delete"}, metricsStart)
+
+	resp, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
+	if err != nil {
+		v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
+		return err
+	}
+
+	if respErr, ok := resp.(error); ok {
+		return respErr
+	}
+
+	reply.Index = index
+	v.srv.setQueryMeta(&reply.QueryMeta)
+	return nil
+}
diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index 1878d109b..fb903cdb3 100644
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -548,6 +548,87 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
 	require.Nil(t, resp2.Plugin)
 }
 
+func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) {
+	t.Parallel()
+	srv, shutdown := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0 // Prevent automatic dequeue
+	})
+	defer shutdown()
+	testutil.WaitForLeader(t, srv.RPC)
+
+	deleteNodes := state.CreateTestCSIPlugin(srv.fsm.State(), "foo")
+	defer deleteNodes()
+
+	state := srv.fsm.State()
+	state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
+	srv.config.ACLEnabled = true
+	codec := rpcClient(t, srv)
+
+	// Get the plugin back out
+	listJob := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
+	policy := mock.PluginPolicy("read") + listJob
+	getToken := mock.CreatePolicyAndToken(t, state, 1001, "plugin-read", policy)
+
+	reqGet := &structs.CSIPluginGetRequest{
+		ID: "foo",
+		QueryOptions: structs.QueryOptions{
+			Region:    "global",
+			AuthToken: getToken.SecretID,
+		},
+	}
+	respGet := &structs.CSIPluginGetResponse{}
+	err := msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
+	require.NoError(t, err)
+	require.NotNil(t, respGet.Plugin)
+
+	// Delete plugin
+	reqDel := &structs.CSIPluginDeleteRequest{
+		ID: "foo",
+		QueryOptions: structs.QueryOptions{
+			Region:    "global",
+			AuthToken: getToken.SecretID,
+		},
+	}
+	respDel := &structs.CSIPluginDeleteResponse{}
+
+	// Improper permissions
+	err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
+	require.EqualError(t, err, structs.ErrPermissionDenied.Error())
+
+	// Retry with management permissions
+	reqDel.AuthToken = srv.getLeaderAcl()
+	err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
+	require.EqualError(t, err, "plugin in use")
+
+	// Plugin was not deleted
+	err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
+	require.NoError(t, err)
+	require.NotNil(t, respGet.Plugin)
+
+	// Empty the plugin
+	plugin := respGet.Plugin.Copy()
+	plugin.Controllers = map[string]*structs.CSIInfo{}
+	plugin.Nodes = map[string]*structs.CSIInfo{}
+
+	index, _ := state.LatestIndex()
+	index++
+	err = state.UpsertCSIPlugin(index, plugin)
+	require.NoError(t, err)
+
+	// Retry now that it's empty
+	err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
+	require.NoError(t, err)
+
+	// Plugin is deleted
+	err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", reqGet, respGet)
+	require.NoError(t, err)
+	require.Nil(t, respGet.Plugin)
+
+	// Safe to call on already-deleted plugins
+	err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Delete", reqDel, respDel)
+	require.NoError(t, err)
+}
+
 func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
 	srv, shutdown := TestServer(t, func(c *Config) {})
 	defer shutdown()
diff --git a/nomad/fsm.go b/nomad/fsm.go
index b9f412393..39280a736 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -270,10 +270,12 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applyCSIVolumeDeregister(buf[1:], log.Index)
 	case structs.CSIVolumeClaimRequestType:
 		return n.applyCSIVolumeClaim(buf[1:], log.Index)
-	case structs.CSIVolumeClaimBatchRequestType:
-		return n.applyCSIVolumeBatchClaim(buf[1:], log.Index)
 	case structs.ScalingEventRegisterRequestType:
 		return n.applyUpsertScalingEvent(buf[1:], log.Index)
+	case structs.CSIVolumeClaimBatchRequestType:
+		return n.applyCSIVolumeBatchClaim(buf[1:], log.Index)
+	case structs.CSIPluginDeleteRequestType:
+		return n.applyCSIPluginDelete(buf[1:], log.Index)
 	}
 
 	// Check enterprise only message types.
@@ -1190,6 +1192,24 @@ func (n *nomadFSM) applyCSIVolumeClaim(buf []byte, index uint64) interface{} {
 	return nil
 }
 
+func (n *nomadFSM) applyCSIPluginDelete(buf []byte, index uint64) interface{} {
+	var req structs.CSIPluginDeleteRequest
+	if err := structs.Decode(buf, &req); err != nil {
+		panic(fmt.Errorf("failed to decode request: %v", err))
+	}
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_plugin_delete"}, time.Now())
+
+	if err := n.state.DeleteCSIPlugin(index, req.ID); err != nil {
+		// "plugin in use" is an error for the state store but not for typical
+		// callers, so reduce log noise by not logging that case here
+		if err.Error() != "plugin in use" {
+			n.logger.Error("DeleteCSIPlugin failed", "error", err)
+		}
+		return err
+	}
+	return nil
+}
+
 func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
 	// Create a new snapshot
 	snap, err := n.state.Snapshot()
diff --git a/nomad/leader.go b/nomad/leader.go
index 29550dc79..97a59eb1a 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -522,6 +522,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
 	defer jobGC.Stop()
 	deploymentGC := time.NewTicker(s.config.DeploymentGCInterval)
 	defer deploymentGC.Stop()
+	csiPluginGC := time.NewTicker(s.config.CSIPluginGCInterval)
+	defer csiPluginGC.Stop()
 
 	// getLatest grabs the latest index from the state store. It returns true if
 	// the index was retrieved successfully.
@@ -554,6 +556,11 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
 			if index, ok := getLatest(); ok {
 				s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobDeploymentGC, index))
 			}
+		case <-csiPluginGC.C:
+			if index, ok := getLatest(); ok {
+				s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIPluginGC, index))
+			}
+
 		case <-stopCh:
 			return
 		}
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 88d9b1bb8..fbb1e3e57 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -2312,6 +2312,65 @@ func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPl
 	return plug, nil
 }
 
+// UpsertCSIPlugin writes the plugin to the state store. Note: there
+// is currently no raft message for this, as it's intended to support
+// testing use cases.
+func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	existing, err := txn.First("csi_plugins", "id", plug.ID)
+	if err != nil {
+		return fmt.Errorf("csi_plugin lookup error: %s %v", plug.ID, err)
+	}
+
+	plug.ModifyIndex = index
+	if existing != nil {
+		plug.CreateIndex = existing.(*structs.CSIPlugin).CreateIndex
+	}
+
+	err = txn.Insert("csi_plugins", plug)
+	if err != nil {
+		return fmt.Errorf("csi_plugins insert error: %v", err)
+	}
+	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
+		return fmt.Errorf("index update failed: %v", err)
+	}
+	txn.Commit()
+	return nil
+}
+
+// DeleteCSIPlugin deletes the plugin if it's not in use.
+func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+	ws := memdb.NewWatchSet()
+
+	plug, err := s.CSIPluginByID(ws, id)
+	if err != nil {
+		return err
+	}
+
+	if plug == nil {
+		return nil
+	}
+
+	plug, err = s.CSIPluginDenormalize(ws, plug.Copy())
+	if err != nil {
+		return err
+	}
+	if !plug.IsEmpty() {
+		return fmt.Errorf("plugin in use")
+	}
+
+	err = txn.Delete("csi_plugins", plug)
+	if err != nil {
+		return fmt.Errorf("csi_plugins delete error: %v", err)
+	}
+	txn.Commit()
+	return nil
+}
+
 // UpsertPeriodicLaunch is used to register a launch or update it.
 func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
 	txn := s.db.Txn(true)
diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go
index a227e93c1..7cecb7cd9 100644
--- a/nomad/structs/csi.go
+++ b/nomad/structs/csi.go
@@ -868,3 +868,12 @@ type CSIPluginGetResponse struct {
 	Plugin *CSIPlugin
 	QueryMeta
 }
+
+type CSIPluginDeleteRequest struct {
+	ID string
+	QueryOptions
+}
+
+type CSIPluginDeleteResponse struct {
+	QueryMeta
+}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 5a2e47c5f..0880cbef0 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -94,6 +94,7 @@
 	CSIVolumeClaimRequestType
 	ScalingEventRegisterRequestType
 	CSIVolumeClaimBatchRequestType
+	CSIPluginDeleteRequestType
 )
 
 const (
@@ -8990,6 +8991,11 @@
 	// claiming them. If so, we unclaim the volume.
 	CoreJobCSIVolumeClaimGC = "csi-volume-claim-gc"
 
+	// CoreJobCSIPluginGC is used for the garbage collection of CSI plugins.
+	// We periodically scan plugins to see if they have no associated volumes
+	// or allocs running them. If so, we delete the plugin.
+	CoreJobCSIPluginGC = "csi-plugin-gc"
+
 	// CoreJobForceGC is used to force garbage collection of all GCable objects.
 	CoreJobForceGC = "force-gc"
 )
diff --git a/website/pages/docs/configuration/server.mdx b/website/pages/docs/configuration/server.mdx
index db7294a7b..e14a3fae2 100644
--- a/website/pages/docs/configuration/server.mdx
+++ b/website/pages/docs/configuration/server.mdx
@@ -90,6 +90,10 @@ server {
   deployment must be in the terminal state before it is eligible for garbage
   collection. This is specified using a label suffix like "30s" or "1h".
 
+- `csi_plugin_gc_threshold` `(string: "1h")` - Specifies the minimum age of a
+  CSI plugin before it is eligible for garbage collection if it is not in use.
+  This is specified using a label suffix like "30s" or "1h".
+
 - `default_scheduler_config` ([scheduler_configuration][update-scheduler-config]:
   nil) - Specifies the initial default scheduler config when bootstrapping
   cluster. The parameter is ignored once the cluster is bootstrapped or
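For operators, the documentation change above boils down to one new duration knob. As a minimal sketch of how it sits in a server stanza (the `"24h"` value is illustrative, not the `"1h"` default from `DefaultConfig`):

```hcl
server {
  enabled = true

  # A plugin must be at least this old before the csi-plugin-gc core job will
  # delete it, and even then only if it is unused: DeleteCSIPlugin refuses
  # with "plugin in use" unless the plugin has no remaining controller or
  # node fingerprints. Defaults to "1h".
  csi_plugin_gc_threshold = "24h"
}
```

A forced collection (for example via `nomad system gc`) bypasses the age filter entirely: when the eval's JobID is `CoreJobForceGC`, `csiPluginGC` sets the cutoff index to `math.MaxUint64`, so every plugin is considered old enough and only the in-use check protects it from deletion.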