From bcd5bbdad76cf47387c0f88846d48a2a9b77de03 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 24 Jan 2023 11:54:20 -0500 Subject: [PATCH] add metric for count of RPC requests (#15515) Implement a metric for RPC requests with labels on the identity, so that administrators can monitor the source of requests within the cluster. This changeset demonstrates the change with the new `ACL.WhoAmI` RPC, and we'll wire up the remaining RPCs once we've threaded the new pre-forwarding authentication through them all. Note that metrics are measured after we forward but before we return any authentication error. This ensures that we only emit metrics on the server that actually serves the request. We'll perform rate limiting at the same place. Includes telemetry configuration to omit identity labels. --- command/agent/agent.go | 1 + command/agent/config.go | 9 +++++++++ command/agent/config_test.go | 3 +++ nomad/acl_endpoint.go | 8 ++++---- nomad/acl_test.go | 26 +++++++++++++++++--------- nomad/config.go | 6 ++++++ nomad/rpc_rate_metrics.go | 23 +++++++++++++++++++++++ nomad/structs/structs.go | 22 +++++++++++++++++++++- 8 files changed, 84 insertions(+), 14 deletions(-) create mode 100644 nomad/rpc_rate_metrics.go diff --git a/command/agent/agent.go b/command/agent/agent.go index b955485f1..4141c28e2 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -472,6 +472,7 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { // Setup telemetry related config conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval conf.DisableDispatchedJobSummaryMetrics = agentConfig.Telemetry.DisableDispatchedJobSummaryMetrics + conf.DisableRPCRateMetricsLabels = agentConfig.Telemetry.DisableRPCRateMetricsLabels if d, err := time.ParseDuration(agentConfig.Limits.RPCHandshakeTimeout); err != nil { return nil, fmt.Errorf("error parsing rpc_handshake_timeout: %v", err) diff --git a/command/agent/config.go b/command/agent/config.go index
08db9e6a9..7e06a2879 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -884,6 +884,12 @@ type Telemetry struct { // a small memory overhead. DisableDispatchedJobSummaryMetrics bool `hcl:"disable_dispatched_job_summary_metrics"` + // DisableRPCRateMetricsLabels drops the label for the identity of the + // requester when publishing metrics on RPC rate on the server. This may be + // useful to control metrics collection costs in environments where request + // rate is well-controlled but cardinality of requesters is high. + DisableRPCRateMetricsLabels bool `hcl:"disable_rpc_rate_metrics_labels"` + // Circonus: see https://github.com/circonus-labs/circonus-gometrics // for more details on the various configuration options. // Valid configuration combinations: @@ -2237,6 +2243,9 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry { if b.DisableDispatchedJobSummaryMetrics { result.DisableDispatchedJobSummaryMetrics = b.DisableDispatchedJobSummaryMetrics } + if b.DisableRPCRateMetricsLabels { + result.DisableRPCRateMetricsLabels = b.DisableRPCRateMetricsLabels + } return &result } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 56692baee..bba3e5ea3 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -286,6 +286,7 @@ func TestConfig_Merge(t *testing.T) { CirconusBrokerSelectTag: "dc:dc2", PrefixFilter: []string{"prefix1", "prefix2"}, DisableDispatchedJobSummaryMetrics: true, + DisableRPCRateMetricsLabels: true, FilterDefault: pointer.Of(false), }, Client: &ClientConfig{ @@ -1350,6 +1351,7 @@ func TestTelemetry_Parse(t *testing.T) { prefix_filter = ["+nomad.raft"] filter_default = false disable_dispatched_job_summary_metrics = true + disable_rpc_rate_metrics_labels = true }`), 0600) require.NoError(err) @@ -1360,6 +1362,7 @@ func TestTelemetry_Parse(t *testing.T) { require.False(*config.Telemetry.FilterDefault) require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter) 
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics) + require.True(config.Telemetry.DisableRPCRateMetricsLabels) } func TestEventBroker_Parse(t *testing.T) { diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index 9307b4b5b..456f667da 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -2011,15 +2011,15 @@ func (a *ACL) GetAuthMethods( // TODO: At some point we might want to give this an equivalent HTTP endpoint // once other Workload Identity work is solidified func (a *ACL) WhoAmI(args *structs.GenericRequest, reply *structs.ACLWhoAmIResponse) error { - authErr := a.srv.Authenticate(a.ctx, args) + if done, err := a.srv.forward("ACL.WhoAmI", args, args, reply); done { + return err + } + a.srv.MeasureRPCRate("acl", structs.RateMetricRead, args) if authErr != nil { return authErr } - if done, err := a.srv.forward("ACL.WhoAmI", args, args, reply); done { - return err - } defer metrics.MeasureSince([]string{"nomad", "acl", "whoami"}, time.Now()) reply.Identity = args.GetIdentity() diff --git a/nomad/acl_test.go b/nomad/acl_test.go index 994b270f9..ed8d366f1 100644 --- a/nomad/acl_test.go +++ b/nomad/acl_test.go @@ -1,6 +1,7 @@ package nomad import ( + "fmt" "path" "testing" "time" @@ -161,6 +162,7 @@ func TestAuthenticate_mTLS(t *testing.T) { expectTLSName string expectIP string expectErr string + expectIDKey string sendFromPeer *Server }{ { @@ -168,6 +170,7 @@ func TestAuthenticate_mTLS(t *testing.T) { tlsCfg: clientTLSCfg, // TODO: this is a mixed use cert testToken: rootToken, expectAccessor: rootAccessor, + expectIDKey: fmt.Sprintf("token:%s", rootAccessor), }, { name: "from peer to leader without token", // ex. 
Eval.Dequeue @@ -176,6 +179,7 @@ func TestAuthenticate_mTLS(t *testing.T) { expectAccessor: "anonymous", expectIP: follower.GetConfig().RPCAddr.IP.String(), sendFromPeer: follower, + expectIDKey: "token:anonymous", }, { // note: this test is somewhat bogus because under test all the @@ -185,6 +189,7 @@ func TestAuthenticate_mTLS(t *testing.T) { expectAccessor: "anonymous", expectTLSName: "regionFoo.nomad", expectIP: "127.0.0.1", + expectIDKey: "token:anonymous", }, { name: "invalid token", @@ -192,13 +197,7 @@ func TestAuthenticate_mTLS(t *testing.T) { testToken: uuid.Generate(), expectTLSName: "regionFoo.nomad", expectIP: follower.GetConfig().RPCAddr.IP.String(), - }, - { - name: "expired token", - tlsCfg: clientTLSCfg, - testToken: uuid.Generate(), - expectTLSName: "regionFoo.nomad", - expectIP: follower.GetConfig().RPCAddr.IP.String(), + expectIDKey: "regionFoo.nomad:127.0.0.1", }, { name: "from peer to leader with leader ACL", // ex. core job GC @@ -208,36 +207,40 @@ func TestAuthenticate_mTLS(t *testing.T) { expectAccessor: "leader", expectIP: follower.GetConfig().RPCAddr.IP.String(), sendFromPeer: follower, + expectIDKey: "token:leader", }, { name: "from client", // ex. Node.GetAllocs tlsCfg: clientTLSCfg, testToken: node.SecretID, expectClientID: node.ID, + expectIDKey: fmt.Sprintf("client:%s", node.ID), }, { name: "from failed workload", // ex. Variables.List tlsCfg: clientTLSCfg, testToken: claims1Token, - expectErr: "allocation is terminal", + expectErr: "rpc error: allocation is terminal", }, { name: "from running workload", // ex. 
Variables.List tlsCfg: clientTLSCfg, testToken: claims2Token, expectAllocID: alloc2.ID, + expectIDKey: fmt.Sprintf("alloc:%s", alloc2.ID), }, { name: "valid user token", tlsCfg: clientTLSCfg, testToken: token1.SecretID, expectAccessor: token1.AccessorID, + expectIDKey: fmt.Sprintf("token:%s", token1.AccessorID), }, { name: "expired user token", tlsCfg: clientTLSCfg, testToken: token2.SecretID, - expectErr: "ACL token expired", + expectErr: "rpc error: ACL token expired", }, } @@ -270,6 +273,11 @@ func TestAuthenticate_mTLS(t *testing.T) { must.NotNil(t, resp) must.NotNil(t, resp.Identity) + if tc.expectIDKey != "" { + must.Eq(t, tc.expectIDKey, resp.Identity.String(), + must.Sprintf("expected identity key for metrics to match")) + } + if tc.expectAccessor != "" { must.NotNil(t, resp.Identity.ACLToken, must.Sprint("expected ACL token")) test.Eq(t, tc.expectAccessor, resp.Identity.ACLToken.AccessorID, diff --git a/nomad/config.go b/nomad/config.go index 2bcb8714b..7df5335e0 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -342,6 +342,12 @@ type Config struct { // publishing Job summary metrics DisableDispatchedJobSummaryMetrics bool + // DisableRPCRateMetricsLabels drops the label for the identity of the + // requester when publishing metrics on RPC rate on the server. This may be + // useful to control metrics collection costs in environments where request + // rate is well-controlled but cardinality of requesters is high. + DisableRPCRateMetricsLabels bool + // AutopilotConfig is used to apply the initial autopilot config when // bootstrapping. 
AutopilotConfig *structs.AutopilotConfig diff --git a/nomad/rpc_rate_metrics.go b/nomad/rpc_rate_metrics.go new file mode 100644 index 000000000..ce9aaf621 --- /dev/null +++ b/nomad/rpc_rate_metrics.go @@ -0,0 +1,23 @@ +package nomad + +import ( + "github.com/armon/go-metrics" + + "github.com/hashicorp/nomad/nomad/structs" +) + +// MeasureRPCRate increments the appropriate rate metric for this endpoint, +// with a label from the identity +func (srv *Server) MeasureRPCRate(endpoint, op string, args structs.RequestWithIdentity) { + identity := args.GetIdentity() + + if !srv.config.ACLEnabled || identity == nil || srv.config.DisableRPCRateMetricsLabels { + // If ACLs aren't enabled, we never have a sensible identity. + // Or the administrator may have disabled the identity labels. + metrics.IncrCounter([]string{"nomad", "rpc", endpoint, op}, 1) + } else { + metrics.IncrCounterWithLabels( + []string{"nomad", "rpc", endpoint, op}, 1, + []metrics.Label{{Name: "identity", Value: identity.String()}}) + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c0aa2659b..0f4dcc409 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -190,6 +190,11 @@ const ( // DefaultBlockingRPCQueryTime is the amount of time we block waiting for a change // if no time is specified. Previously we would wait the MaxBlockingRPCQueryTime. 
DefaultBlockingRPCQueryTime = 300 * time.Second + + // RateMetric constants are used as labels in RPC rate metrics + RateMetricRead = "read" + RateMetricList = "list" + RateMetricWrite = "write" ) var ( @@ -477,7 +482,6 @@ type AuthenticatedIdentity struct { ACLToken *ACLToken Claims *IdentityClaims ClientID string - ServerID string TLSName string RemoteIP net.IP } @@ -502,6 +506,22 @@ type RequestWithIdentity interface { GetIdentity() *AuthenticatedIdentity } +func (ai *AuthenticatedIdentity) String() string { + if ai == nil { + return "unauthenticated" + } + if ai.ACLToken != nil { + return fmt.Sprintf("token:%s", ai.ACLToken.AccessorID) + } + if ai.Claims != nil { + return fmt.Sprintf("alloc:%s", ai.Claims.AllocationID) + } + if ai.ClientID != "" { + return fmt.Sprintf("client:%s", ai.ClientID) + } + return fmt.Sprintf("%s:%s", ai.TLSName, ai.RemoteIP.String()) +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct {