mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 01:15:43 +03:00
add metric for count of RPC requests (#15515)
Implement a metric for RPC requests with labels on the identity, so that administrators can monitor the source of requests within the cluster. This changeset demonstrates the change with the new `ACL.WhoAmI` RPC, and we'll wire up the remaining RPCs once we've threaded the new pre-forwarding authentication through the all. Note that metrics are measured after we forward but before we return any authentication error. This ensures that we only emit metrics on the server that actually serves the request. We'll perform rate limiting at the same place. Includes telemetry configuration to omit identity labels.
This commit is contained in:
@@ -472,6 +472,7 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
|
||||
// Setup telemetry related config
|
||||
conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
|
||||
conf.DisableDispatchedJobSummaryMetrics = agentConfig.Telemetry.DisableDispatchedJobSummaryMetrics
|
||||
conf.DisableRPCRateMetricsLabels = agentConfig.Telemetry.DisableRPCRateMetricsLabels
|
||||
|
||||
if d, err := time.ParseDuration(agentConfig.Limits.RPCHandshakeTimeout); err != nil {
|
||||
return nil, fmt.Errorf("error parsing rpc_handshake_timeout: %v", err)
|
||||
|
||||
@@ -884,6 +884,12 @@ type Telemetry struct {
|
||||
// a small memory overhead.
|
||||
DisableDispatchedJobSummaryMetrics bool `hcl:"disable_dispatched_job_summary_metrics"`
|
||||
|
||||
// DisableRPCRateMetricsLabels drops the label for the identity of the
|
||||
// requester when publishing metrics on RPC rate on the server. This may be
|
||||
// useful to control metrics collection costs in environments where request
|
||||
// rate is well-controlled but cardinality of requesters is high.
|
||||
DisableRPCRateMetricsLabels bool `hcl:"disable_rpc_rate_metrics_labels"`
|
||||
|
||||
// Circonus: see https://github.com/circonus-labs/circonus-gometrics
|
||||
// for more details on the various configuration options.
|
||||
// Valid configuration combinations:
|
||||
@@ -2237,6 +2243,9 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
|
||||
if b.DisableDispatchedJobSummaryMetrics {
|
||||
result.DisableDispatchedJobSummaryMetrics = b.DisableDispatchedJobSummaryMetrics
|
||||
}
|
||||
if b.DisableRPCRateMetricsLabels {
|
||||
result.DisableRPCRateMetricsLabels = b.DisableRPCRateMetricsLabels
|
||||
}
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
@@ -286,6 +286,7 @@ func TestConfig_Merge(t *testing.T) {
|
||||
CirconusBrokerSelectTag: "dc:dc2",
|
||||
PrefixFilter: []string{"prefix1", "prefix2"},
|
||||
DisableDispatchedJobSummaryMetrics: true,
|
||||
DisableRPCRateMetricsLabels: true,
|
||||
FilterDefault: pointer.Of(false),
|
||||
},
|
||||
Client: &ClientConfig{
|
||||
@@ -1350,6 +1351,7 @@ func TestTelemetry_Parse(t *testing.T) {
|
||||
prefix_filter = ["+nomad.raft"]
|
||||
filter_default = false
|
||||
disable_dispatched_job_summary_metrics = true
|
||||
disable_rpc_rate_metrics_labels = true
|
||||
}`), 0600)
|
||||
require.NoError(err)
|
||||
|
||||
@@ -1360,6 +1362,7 @@ func TestTelemetry_Parse(t *testing.T) {
|
||||
require.False(*config.Telemetry.FilterDefault)
|
||||
require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter)
|
||||
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
|
||||
require.True(config.Telemetry.DisableRPCRateMetricsLabels)
|
||||
}
|
||||
|
||||
func TestEventBroker_Parse(t *testing.T) {
|
||||
|
||||
@@ -2011,15 +2011,15 @@ func (a *ACL) GetAuthMethods(
|
||||
// TODO: At some point we might want to give this an equivalent HTTP endpoint
|
||||
// once other Workload Identity work is solidified
|
||||
func (a *ACL) WhoAmI(args *structs.GenericRequest, reply *structs.ACLWhoAmIResponse) error {
|
||||
|
||||
authErr := a.srv.Authenticate(a.ctx, args)
|
||||
if done, err := a.srv.forward("ACL.WhoAmI", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
a.srv.MeasureRPCRate("acl", structs.RateMetricRead, args)
|
||||
if authErr != nil {
|
||||
return authErr
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.WhoAmI", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "acl", "whoami"}, time.Now())
|
||||
|
||||
reply.Identity = args.GetIdentity()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -161,6 +162,7 @@ func TestAuthenticate_mTLS(t *testing.T) {
|
||||
expectTLSName string
|
||||
expectIP string
|
||||
expectErr string
|
||||
expectIDKey string
|
||||
sendFromPeer *Server
|
||||
}{
|
||||
{
|
||||
@@ -168,6 +170,7 @@ func TestAuthenticate_mTLS(t *testing.T) {
|
||||
tlsCfg: clientTLSCfg, // TODO: this is a mixed use cert
|
||||
testToken: rootToken,
|
||||
expectAccessor: rootAccessor,
|
||||
expectIDKey: fmt.Sprintf("token:%s", rootAccessor),
|
||||
},
|
||||
{
|
||||
name: "from peer to leader without token", // ex. Eval.Dequeue
|
||||
@@ -176,6 +179,7 @@ func TestAuthenticate_mTLS(t *testing.T) {
|
||||
expectAccessor: "anonymous",
|
||||
expectIP: follower.GetConfig().RPCAddr.IP.String(),
|
||||
sendFromPeer: follower,
|
||||
expectIDKey: "token:anonymous",
|
||||
},
|
||||
{
|
||||
// note: this test is somewhat bogus because under test all the
|
||||
@@ -185,6 +189,7 @@ func TestAuthenticate_mTLS(t *testing.T) {
|
||||
expectAccessor: "anonymous",
|
||||
expectTLSName: "regionFoo.nomad",
|
||||
expectIP: "127.0.0.1",
|
||||
expectIDKey: "token:anonymous",
|
||||
},
|
||||
{
|
||||
name: "invalid token",
|
||||
@@ -192,13 +197,7 @@ func TestAuthenticate_mTLS(t *testing.T) {
|
||||
testToken: uuid.Generate(),
|
||||
expectTLSName: "regionFoo.nomad",
|
||||
expectIP: follower.GetConfig().RPCAddr.IP.String(),
|
||||
},
|
||||
{
|
||||
name: "expired token",
|
||||
tlsCfg: clientTLSCfg,
|
||||
testToken: uuid.Generate(),
|
||||
expectTLSName: "regionFoo.nomad",
|
||||
expectIP: follower.GetConfig().RPCAddr.IP.String(),
|
||||
expectIDKey: "regionFoo.nomad:127.0.0.1",
|
||||
},
|
||||
{
|
||||
name: "from peer to leader with leader ACL", // ex. core job GC
|
||||
@@ -208,36 +207,40 @@ func TestAuthenticate_mTLS(t *testing.T) {
|
||||
expectAccessor: "leader",
|
||||
expectIP: follower.GetConfig().RPCAddr.IP.String(),
|
||||
sendFromPeer: follower,
|
||||
expectIDKey: "token:leader",
|
||||
},
|
||||
{
|
||||
name: "from client", // ex. Node.GetAllocs
|
||||
tlsCfg: clientTLSCfg,
|
||||
testToken: node.SecretID,
|
||||
expectClientID: node.ID,
|
||||
expectIDKey: fmt.Sprintf("client:%s", node.ID),
|
||||
},
|
||||
{
|
||||
name: "from failed workload", // ex. Variables.List
|
||||
tlsCfg: clientTLSCfg,
|
||||
testToken: claims1Token,
|
||||
expectErr: "allocation is terminal",
|
||||
expectErr: "rpc error: allocation is terminal",
|
||||
},
|
||||
{
|
||||
name: "from running workload", // ex. Variables.List
|
||||
tlsCfg: clientTLSCfg,
|
||||
testToken: claims2Token,
|
||||
expectAllocID: alloc2.ID,
|
||||
expectIDKey: fmt.Sprintf("alloc:%s", alloc2.ID),
|
||||
},
|
||||
{
|
||||
name: "valid user token",
|
||||
tlsCfg: clientTLSCfg,
|
||||
testToken: token1.SecretID,
|
||||
expectAccessor: token1.AccessorID,
|
||||
expectIDKey: fmt.Sprintf("token:%s", token1.AccessorID),
|
||||
},
|
||||
{
|
||||
name: "expired user token",
|
||||
tlsCfg: clientTLSCfg,
|
||||
testToken: token2.SecretID,
|
||||
expectErr: "ACL token expired",
|
||||
expectErr: "rpc error: ACL token expired",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -270,6 +273,11 @@ func TestAuthenticate_mTLS(t *testing.T) {
|
||||
must.NotNil(t, resp)
|
||||
must.NotNil(t, resp.Identity)
|
||||
|
||||
if tc.expectIDKey != "" {
|
||||
must.Eq(t, tc.expectIDKey, resp.Identity.String(),
|
||||
must.Sprintf("expected identity key for metrics to match"))
|
||||
}
|
||||
|
||||
if tc.expectAccessor != "" {
|
||||
must.NotNil(t, resp.Identity.ACLToken, must.Sprint("expected ACL token"))
|
||||
test.Eq(t, tc.expectAccessor, resp.Identity.ACLToken.AccessorID,
|
||||
|
||||
@@ -342,6 +342,12 @@ type Config struct {
|
||||
// publishing Job summary metrics
|
||||
DisableDispatchedJobSummaryMetrics bool
|
||||
|
||||
// DisableRPCRateMetricsLabels drops the label for the identity of the
|
||||
// requester when publishing metrics on RPC rate on the server. This may be
|
||||
// useful to control metrics collection costs in environments where request
|
||||
// rate is well-controlled but cardinality of requesters is high.
|
||||
DisableRPCRateMetricsLabels bool
|
||||
|
||||
// AutopilotConfig is used to apply the initial autopilot config when
|
||||
// bootstrapping.
|
||||
AutopilotConfig *structs.AutopilotConfig
|
||||
|
||||
23
nomad/rpc_rate_metrics.go
Normal file
23
nomad/rpc_rate_metrics.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"github.com/armon/go-metrics"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// MeasureRPCRate increments the appropriate rate metric for this endpoint,
|
||||
// with a label from the identity
|
||||
func (srv *Server) MeasureRPCRate(endpoint, op string, args structs.RequestWithIdentity) {
|
||||
identity := args.GetIdentity()
|
||||
|
||||
if !srv.config.ACLEnabled || identity == nil || srv.config.DisableRPCRateMetricsLabels {
|
||||
// If ACLs aren't enabled, we never have a sensible identity.
|
||||
// Or the administrator may have disabled the identity labels.
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", endpoint, op}, 1)
|
||||
} else {
|
||||
metrics.IncrCounterWithLabels(
|
||||
[]string{"nomad", "rpc", endpoint, op}, 1,
|
||||
[]metrics.Label{{Name: "identity", Value: identity.String()}})
|
||||
}
|
||||
}
|
||||
@@ -190,6 +190,11 @@ const (
|
||||
// DefaultBlockingRPCQueryTime is the amount of time we block waiting for a change
|
||||
// if no time is specified. Previously we would wait the MaxBlockingRPCQueryTime.
|
||||
DefaultBlockingRPCQueryTime = 300 * time.Second
|
||||
|
||||
// RateMetric constants are used as labels in RPC rate metrics
|
||||
RateMetricRead = "read"
|
||||
RateMetricList = "list"
|
||||
RateMetricWrite = "write"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -477,7 +482,6 @@ type AuthenticatedIdentity struct {
|
||||
ACLToken *ACLToken
|
||||
Claims *IdentityClaims
|
||||
ClientID string
|
||||
ServerID string
|
||||
TLSName string
|
||||
RemoteIP net.IP
|
||||
}
|
||||
@@ -502,6 +506,22 @@ type RequestWithIdentity interface {
|
||||
GetIdentity() *AuthenticatedIdentity
|
||||
}
|
||||
|
||||
func (ai *AuthenticatedIdentity) String() string {
|
||||
if ai == nil {
|
||||
return "unauthenticated"
|
||||
}
|
||||
if ai.ACLToken != nil {
|
||||
return fmt.Sprintf("token:%s", ai.ACLToken.AccessorID)
|
||||
}
|
||||
if ai.Claims != nil {
|
||||
return fmt.Sprintf("alloc:%s", ai.Claims.AllocationID)
|
||||
}
|
||||
if ai.ClientID != "" {
|
||||
return fmt.Sprintf("client:%s", ai.ClientID)
|
||||
}
|
||||
return fmt.Sprintf("%s:%s", ai.TLSName, ai.RemoteIP.String())
|
||||
}
|
||||
|
||||
// QueryMeta allows a query response to include potentially
|
||||
// useful metadata about a query
|
||||
type QueryMeta struct {
|
||||
|
||||
Reference in New Issue
Block a user