mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
metrics: Add RPC rate metrics to endpoints that validate TLS names (#15900)
This commit is contained in:
18
.semgrep/rpc_metrics.yml
Normal file
18
.semgrep/rpc_metrics.yml
Normal file
@@ -0,0 +1,18 @@
|
||||
rules:
|
||||
# Check for server RPC endpoints without metrics
|
||||
- id: "rpc-missing-metrics"
|
||||
patterns:
|
||||
- pattern: |
|
||||
authErr := $A.$B.Authenticate($A.ctx, args)
|
||||
- pattern-not-inside: |
|
||||
authErr := $A.$B.Authenticate($A.ctx, args)
|
||||
...
|
||||
$T.srv.MeasureRPCRate(...)
|
||||
...
|
||||
message: "RPC method appears to be missing metrics"
|
||||
languages:
|
||||
- "go"
|
||||
severity: "WARNING"
|
||||
paths:
|
||||
include:
|
||||
- "nomad/*_endpoint.go"
|
||||
@@ -197,15 +197,20 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
|
||||
func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
|
||||
reply *structs.AllocsGetResponse) error {
|
||||
|
||||
authErr := a.srv.Authenticate(a.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by a client if TLS is used.
|
||||
err := validateTLSCertificateLevel(a.srv, a.ctx, tlsCertificateLevelClient)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
a.srv.MeasureRPCRate("alloc", structs.RateMetricList, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())
|
||||
|
||||
allocs := make([]*structs.Allocation, len(args.AllocIDs))
|
||||
|
||||
@@ -590,15 +590,20 @@ func (d *Deployment) Allocations(args *structs.DeploymentSpecificRequest, reply
|
||||
func (d *Deployment) Reap(args *structs.DeploymentDeleteRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := d.srv.Authenticate(d.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(d.srv, d.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := d.srv.forward("Deployment.Reap", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
d.srv.MeasureRPCRate("deployment", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "deployment", "reap"}, time.Now())
|
||||
|
||||
// Update via Raft
|
||||
|
||||
@@ -124,7 +124,6 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := e.srv.forward("Eval.Dequeue", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
@@ -132,7 +131,6 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "dequeue"}, time.Now())
|
||||
|
||||
// Ensure there is at least one scheduler
|
||||
@@ -233,15 +231,20 @@ func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint
|
||||
func (e *Eval) Ack(args *structs.EvalAckRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := e.srv.Authenticate(e.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := e.srv.forward("Eval.Ack", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "ack"}, time.Now())
|
||||
|
||||
// Ack the EvalID
|
||||
@@ -263,15 +266,20 @@ func (e *Eval) Ack(args *structs.EvalAckRequest,
|
||||
func (e *Eval) Nack(args *structs.EvalAckRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := e.srv.Authenticate(e.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := e.srv.forward("Eval.Nack", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "nack"}, time.Now())
|
||||
|
||||
// Nack the EvalID
|
||||
@@ -285,15 +293,20 @@ func (e *Eval) Nack(args *structs.EvalAckRequest,
|
||||
func (e *Eval) Update(args *structs.EvalUpdateRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := e.srv.Authenticate(e.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := e.srv.forward("Eval.Update", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "update"}, time.Now())
|
||||
|
||||
// Ensure there is only a single update with token
|
||||
@@ -322,15 +335,20 @@ func (e *Eval) Update(args *structs.EvalUpdateRequest,
|
||||
func (e *Eval) Create(args *structs.EvalUpdateRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := e.srv.Authenticate(e.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := e.srv.forward("Eval.Create", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "create"}, time.Now())
|
||||
|
||||
// Ensure there is only a single update with token
|
||||
@@ -373,15 +391,21 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest,
|
||||
// Reblock is used to reinsert an existing blocked evaluation into the blocked
|
||||
// evaluation tracker.
|
||||
func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := e.srv.Authenticate(e.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := e.srv.forward("Eval.Reblock", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "reblock"}, time.Now())
|
||||
|
||||
// Ensure there is only a single update with token
|
||||
@@ -422,15 +446,20 @@ func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericRe
|
||||
func (e *Eval) Reap(args *structs.EvalReapRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := e.srv.Authenticate(e.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(e.srv, e.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := e.srv.forward("Eval.Reap", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
e.srv.MeasureRPCRate("eval", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "reap"}, time.Now())
|
||||
|
||||
// Update via Raft
|
||||
|
||||
@@ -241,16 +241,21 @@ func (k *Keyring) validateUpdate(args *structs.KeyringUpdateRootKeyRequest) erro
|
||||
// Get retrieves an existing key from the keyring, including both the
|
||||
// key material and metadata. It is used only for replication.
|
||||
func (k *Keyring) Get(args *structs.KeyringGetRootKeyRequest, reply *structs.KeyringGetRootKeyResponse) error {
|
||||
|
||||
authErr := k.srv.Authenticate(k.ctx, args)
|
||||
|
||||
// ensure that only another server can make this request
|
||||
err := validateTLSCertificateLevel(k.srv, k.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := k.srv.forward("Keyring.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
k.srv.MeasureRPCRate("keyring", structs.RateMetricRead, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "keyring", "get"}, time.Now())
|
||||
|
||||
if args.KeyID == "" {
|
||||
|
||||
@@ -1090,6 +1090,8 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
|
||||
// per allocation.
|
||||
func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
|
||||
reply *structs.NodeClientAllocsResponse) error {
|
||||
|
||||
authErr := n.srv.Authenticate(n.ctx, args)
|
||||
isForwarded := args.IsForwarded()
|
||||
if done, err := n.srv.forward("Node.GetClientAllocs", args, args, reply); done {
|
||||
// We have a valid node connection since there is no error from the
|
||||
@@ -1102,6 +1104,10 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
|
||||
|
||||
return err
|
||||
}
|
||||
n.srv.MeasureRPCRate("node", structs.RateMetricList, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "get_client_allocs"}, time.Now())
|
||||
|
||||
// Verify the arguments
|
||||
@@ -1235,6 +1241,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
|
||||
// - The node status is down or disconnected. Clients must call the
|
||||
// UpdateStatus method to update its status in the server.
|
||||
func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error {
|
||||
|
||||
authErr := n.srv.Authenticate(n.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another client if TLS is used.
|
||||
@@ -1242,7 +1249,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := n.srv.forward("Node.UpdateAlloc", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
@@ -1704,6 +1710,9 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string,
|
||||
// DeriveVaultToken is used by the clients to request wrapped Vault tokens for
|
||||
// tasks
|
||||
func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *structs.DeriveVaultTokenResponse) error {
|
||||
|
||||
authErr := n.srv.Authenticate(n.ctx, args)
|
||||
|
||||
setError := func(e error, recoverable bool) {
|
||||
if e != nil {
|
||||
if re, ok := e.(*structs.RecoverableError); ok {
|
||||
@@ -1719,6 +1728,10 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *st
|
||||
setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader)
|
||||
return nil
|
||||
}
|
||||
n.srv.MeasureRPCRate("node", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "derive_vault_token"}, time.Now())
|
||||
|
||||
// Verify the arguments
|
||||
@@ -1926,6 +1939,9 @@ type connectTask struct {
|
||||
}
|
||||
|
||||
func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.DeriveSITokenResponse) error {
|
||||
|
||||
authErr := n.srv.Authenticate(n.ctx, args)
|
||||
|
||||
setError := func(e error, recoverable bool) {
|
||||
if e != nil {
|
||||
if re, ok := e.(*structs.RecoverableError); ok {
|
||||
@@ -1941,6 +1957,10 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.
|
||||
setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader)
|
||||
return nil
|
||||
}
|
||||
n.srv.MeasureRPCRate("node", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "derive_si_token"}, time.Now())
|
||||
|
||||
// Verify the arguments
|
||||
@@ -2159,15 +2179,21 @@ func taskUsesConnect(task *structs.Task) bool {
|
||||
}
|
||||
|
||||
func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
|
||||
|
||||
authErr := n.srv.Authenticate(n.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another client if TLS is used.
|
||||
err := validateTLSCertificateLevel(n.srv, n.ctx, tlsCertificateLevelClient)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
n.srv.MeasureRPCRate("node", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now())
|
||||
|
||||
if len(args.NodeEvents) == 0 {
|
||||
|
||||
@@ -23,15 +23,21 @@ func NewPlanEndpoint(srv *Server, ctx *RPCContext) *Plan {
|
||||
|
||||
// Submit is used to submit a plan to the leader
|
||||
func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) error {
|
||||
|
||||
authErr := p.srv.Authenticate(p.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by another server if TLS is used.
|
||||
err := validateTLSCertificateLevel(p.srv, p.ctx, tlsCertificateLevelServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := p.srv.forward("Plan.Submit", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
p.srv.MeasureRPCRate("plan", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now())
|
||||
|
||||
if args.Plan == nil {
|
||||
|
||||
@@ -37,14 +37,19 @@ func (s *ServiceRegistration) Upsert(
|
||||
args *structs.ServiceRegistrationUpsertRequest,
|
||||
reply *structs.ServiceRegistrationUpsertResponse) error {
|
||||
|
||||
authErr := s.srv.Authenticate(s.ctx, args)
|
||||
|
||||
// Ensure the connection was initiated by a client if TLS is used.
|
||||
if err := validateTLSCertificateLevel(s.srv, s.ctx, tlsCertificateLevelClient); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if done, err := s.srv.forward(structs.ServiceRegistrationUpsertRPCMethod, args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
s.srv.MeasureRPCRate("service_registration", structs.RateMetricWrite, args)
|
||||
if authErr != nil {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "service_registration", "upsert"}, time.Now())
|
||||
|
||||
// Nomad service registrations can only be used once all servers, in the
|
||||
|
||||
Reference in New Issue
Block a user