diff --git a/.semgrep/rpc_metrics.yml b/.semgrep/rpc_metrics.yml new file mode 100644 index 000000000..d78022054 --- /dev/null +++ b/.semgrep/rpc_metrics.yml @@ -0,0 +1,18 @@ +rules: + # Check for server RPC endpoints without metrics + - id: "rpc-missing-metrics" + patterns: + - pattern: | + authErr := $A.$B.Authenticate($A.ctx, args) + - pattern-not-inside: | + authErr := $A.$B.Authenticate($A.ctx, args) + ... + $T.srv.MeasureRPCRate(...) + ... + message: "RPC method appears to be missing metrics" + languages: + - "go" + severity: "WARNING" + paths: + include: + - "nomad/*_endpoint.go" diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 1aea718d6..07b339395 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -197,15 +197,20 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest, func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, reply *structs.AllocsGetResponse) error { + authErr := a.srv.Authenticate(a.ctx, args) + // Ensure the connection was initiated by a client if TLS is used. err := validateTLSCertificateLevel(a.srv, a.ctx, tlsCertificateLevelClient) if err != nil { return err } - if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done { return err } + a.srv.MeasureRPCRate("alloc", structs.RateMetricList, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now()) allocs := make([]*structs.Allocation, len(args.AllocIDs)) diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 3449a3f57..a6820c778 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -590,15 +590,20 @@ func (d *Deployment) Allocations(args *structs.DeploymentSpecificRequest, reply func (d *Deployment) Reap(args *structs.DeploymentDeleteRequest, reply *structs.GenericResponse) error { + authErr := d.srv.Authenticate(d.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(d.srv, d.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := d.srv.forward("Deployment.Reap", args, args, reply); done { return err } + d.srv.MeasureRPCRate("deployment", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "deployment", "reap"}, time.Now()) // Update via Raft diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 15cc135b3..b57caf7aa 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -124,7 +124,6 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, if err != nil { return err } - if done, err := e.srv.forward("Eval.Dequeue", args, args, reply); done { return err } @@ -132,7 +131,6 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, if authErr != nil { return structs.ErrPermissionDenied } - defer metrics.MeasureSince([]string{"nomad", "eval", "dequeue"}, time.Now()) // Ensure there is at least one scheduler @@ -233,15 +231,20 @@ func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint func (e *Eval) Ack(args *structs.EvalAckRequest, reply *structs.GenericResponse) error { + authErr := e.srv.Authenticate(e.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := e.srv.forward("Eval.Ack", args, args, reply); done { return err } + e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "eval", "ack"}, time.Now()) // Ack the EvalID @@ -263,15 +266,20 @@ func (e *Eval) Ack(args *structs.EvalAckRequest, func (e *Eval) Nack(args *structs.EvalAckRequest, reply *structs.GenericResponse) error { + authErr := e.srv.Authenticate(e.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := e.srv.forward("Eval.Nack", args, args, reply); done { return err } + e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "eval", "nack"}, time.Now()) // Nack the EvalID @@ -285,15 +293,20 @@ func (e *Eval) Nack(args *structs.EvalAckRequest, func (e *Eval) Update(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error { + authErr := e.srv.Authenticate(e.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := e.srv.forward("Eval.Update", args, args, reply); done { return err } + e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "eval", "update"}, time.Now()) // Ensure there is only a single update with token @@ -322,15 +335,20 @@ func (e *Eval) Update(args *structs.EvalUpdateRequest, func (e *Eval) Create(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error { + authErr := e.srv.Authenticate(e.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := e.srv.forward("Eval.Create", args, args, reply); done { return err } + e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "eval", "create"}, time.Now()) // Ensure there is only a single update with token @@ -373,15 +391,21 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest, // Reblock is used to reinsert an existing blocked evaluation into the blocked // evaluation tracker. func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error { + + authErr := e.srv.Authenticate(e.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := e.srv.forward("Eval.Reblock", args, args, reply); done { return err } + e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "eval", "reblock"}, time.Now()) // Ensure there is only a single update with token @@ -422,15 +446,20 @@ func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericRe func (e *Eval) Reap(args *structs.EvalReapRequest, reply *structs.GenericResponse) error { + authErr := e.srv.Authenticate(e.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := e.srv.forward("Eval.Reap", args, args, reply); done { return err } + e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "eval", "reap"}, time.Now()) // Update via Raft diff --git a/nomad/keyring_endpoint.go b/nomad/keyring_endpoint.go index 569d93e38..ff30302c3 100644 --- a/nomad/keyring_endpoint.go +++ b/nomad/keyring_endpoint.go @@ -241,16 +241,21 @@ func (k *Keyring) validateUpdate(args *structs.KeyringUpdateRootKeyRequest) erro // Get retrieves an existing key from the keyring, including both the // key material and metadata. It is used only for replication. func (k *Keyring) Get(args *structs.KeyringGetRootKeyRequest, reply *structs.KeyringGetRootKeyResponse) error { + + authErr := k.srv.Authenticate(k.ctx, args) + // ensure that only another server can make this request err := validateTLSCertificateLevel(k.srv, k.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := k.srv.forward("Keyring.Get", args, args, reply); done { return err } - + k.srv.MeasureRPCRate("keyring", structs.RateMetricRead, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "keyring", "get"}, time.Now()) if args.KeyID == "" { diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index c6b3b7e21..38990cf4e 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1090,6 +1090,8 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, // per allocation. func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply *structs.NodeClientAllocsResponse) error { + + authErr := n.srv.Authenticate(n.ctx, args) isForwarded := args.IsForwarded() if done, err := n.srv.forward("Node.GetClientAllocs", args, args, reply); done { // We have a valid node connection since there is no error from the @@ -1102,6 +1104,10 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, return err } + n.srv.MeasureRPCRate("node", structs.RateMetricList, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "client", "get_client_allocs"}, time.Now()) // Verify the arguments @@ -1235,6 +1241,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, // - The node status is down or disconnected. Clients must call the // UpdateStatus method to update its status in the server. func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error { + authErr := n.srv.Authenticate(n.ctx, args) // Ensure the connection was initiated by another client if TLS is used. @@ -1242,7 +1249,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene if err != nil { return err } - if done, err := n.srv.forward("Node.UpdateAlloc", args, args, reply); done { return err } @@ -1704,6 +1710,9 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, // DeriveVaultToken is used by the clients to request wrapped Vault tokens for // tasks func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *structs.DeriveVaultTokenResponse) error { + + authErr := n.srv.Authenticate(n.ctx, args) + setError := func(e error, recoverable bool) { if e != nil { if re, ok := e.(*structs.RecoverableError); ok { @@ -1719,6 +1728,10 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *st setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader) return nil } + n.srv.MeasureRPCRate("node", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "client", "derive_vault_token"}, time.Now()) // Verify the arguments @@ -1926,6 +1939,9 @@ type connectTask struct { } func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.DeriveSITokenResponse) error { + + authErr := n.srv.Authenticate(n.ctx, args) + setError := func(e error, recoverable bool) { if e != nil { if re, ok := e.(*structs.RecoverableError); ok { @@ -1941,6 +1957,10 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs. setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader) return nil } + n.srv.MeasureRPCRate("node", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "client", "derive_si_token"}, time.Now()) // Verify the arguments @@ -2159,15 +2179,21 @@ func taskUsesConnect(task *structs.Task) bool { } func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { + + authErr := n.srv.Authenticate(n.ctx, args) + // Ensure the connection was initiated by another client if TLS is used. err := validateTLSCertificateLevel(n.srv, n.ctx, tlsCertificateLevelClient) if err != nil { return err } - if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { return err } + n.srv.MeasureRPCRate("node", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now()) if len(args.NodeEvents) == 0 { diff --git a/nomad/plan_endpoint.go b/nomad/plan_endpoint.go index 585049d4b..2903709eb 100644 --- a/nomad/plan_endpoint.go +++ b/nomad/plan_endpoint.go @@ -23,15 +23,21 @@ func NewPlanEndpoint(srv *Server, ctx *RPCContext) *Plan { // Submit is used to submit a plan to the leader func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) error { + + authErr := p.srv.Authenticate(p.ctx, args) + // Ensure the connection was initiated by another server if TLS is used. err := validateTLSCertificateLevel(p.srv, p.ctx, tlsCertificateLevelServer) if err != nil { return err } - if done, err := p.srv.forward("Plan.Submit", args, args, reply); done { return err } + p.srv.MeasureRPCRate("plan", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now()) if args.Plan == nil { diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go index f1774ea77..e6adfd3d8 100644 --- a/nomad/service_registration_endpoint.go +++ b/nomad/service_registration_endpoint.go @@ -37,14 +37,19 @@ func (s *ServiceRegistration) Upsert( args *structs.ServiceRegistrationUpsertRequest, reply *structs.ServiceRegistrationUpsertResponse) error { + authErr := s.srv.Authenticate(s.ctx, args) + // Ensure the connection was initiated by a client if TLS is used. if err := validateTLSCertificateLevel(s.srv, s.ctx, tlsCertificateLevelClient); err != nil { return err } - if done, err := s.srv.forward(structs.ServiceRegistrationUpsertRPCMethod, args, args, reply); done { return err } + s.srv.MeasureRPCRate("service_registration", structs.RateMetricWrite, args) + if authErr != nil { + return structs.ErrPermissionDenied + } defer metrics.MeasureSince([]string{"nomad", "service_registration", "upsert"}, time.Now()) // Nomad service registrations can only be used once all servers, in the