From 76fb3eb9a1ca5b5f1ffb07e6aba94afac7a4a6ef Mon Sep 17 00:00:00 2001 From: Nikita Eliseev Date: Wed, 2 Apr 2025 17:58:23 +0300 Subject: [PATCH] rpc: added configuration for yamux session (#25466) Fixes: https://github.com/hashicorp/nomad/issues/25380 --- .changelog/25466.txt | 3 + client/client.go | 2 +- client/config/config.go | 5 + client/testing.go | 3 +- command/agent/agent.go | 41 ++++++ command/agent/command.go | 4 + command/agent/config.go | 128 +++++++++++++++++++ command/agent/config_parse.go | 5 + command/agent/config_parse_test.go | 16 +++ command/agent/testdata/sample1/sample0.json | 8 ++ command/agent/testdata/sample1/sample1.json | 8 ++ command/agent/testdata/sample1/sample2.hcl | 9 ++ helper/pool/pool.go | 10 +- helper/pool/pool_test.go | 3 +- nomad/config.go | 5 + nomad/rpc.go | 9 +- nomad/rpc_test.go | 2 +- nomad/server.go | 2 +- website/content/docs/configuration/index.mdx | 7 + website/content/docs/configuration/rpc.mdx | 54 ++++++++ website/data/docs-nav-data.json | 4 + 21 files changed, 318 insertions(+), 10 deletions(-) create mode 100644 .changelog/25466.txt create mode 100644 website/content/docs/configuration/rpc.mdx diff --git a/.changelog/25466.txt b/.changelog/25466.txt new file mode 100644 index 000000000..44dcb7d6b --- /dev/null +++ b/.changelog/25466.txt @@ -0,0 +1,3 @@ +```release-note:improvement +rpc: Added ability to configure yamux session parameters +``` diff --git a/client/client.go b/client/client.go index f0d77bf77..391e84e24 100644 --- a/client/client.go +++ b/client/client.go @@ -375,7 +375,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie consulProxiesFunc: consulProxiesFunc, consulServices: consulServices, start: time.Now(), - connPool: pool.NewPool(logger, clientRPCCache, clientMaxStreams, tlsWrap), + connPool: pool.NewPool(logger, clientRPCCache, clientMaxStreams, tlsWrap, cfg.RPCSessionConfig), tlsWrap: tlsWrap, streamingRpcs: structs.NewStreamingRpcRegistry(), logger: logger, diff 
--git a/client/config/config.go b/client/config/config.go index c12f860e2..ae6af7e30 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -30,6 +30,7 @@ import ( structsc "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/version" + "github.com/hashicorp/yamux" ) var ( @@ -271,6 +272,9 @@ type Config struct { // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration + // RPCSessionConfig configures yamux multiplex + RPCSessionConfig *yamux.Config + // PluginLoader is used to load plugins. PluginLoader loader.PluginCatalog @@ -895,6 +899,7 @@ func DefaultConfig() *Config { DisableRemoteExec: false, TemplateConfig: DefaultTemplateConfig(), RPCHoldTimeout: 5 * time.Second, + RPCSessionConfig: yamux.DefaultConfig(), CNIPath: "/opt/cni/bin", CNIConfigDir: "/opt/cni/config", CNIInterfacePrefix: "eth", diff --git a/client/testing.go b/client/testing.go index 967564720..7db65d255 100644 --- a/client/testing.go +++ b/client/testing.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/nomad/helper/pluginutils/singleton" "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/yamux" testing "github.com/mitchellh/go-testing-interface" "github.com/shoenig/test/must" ) @@ -118,7 +119,7 @@ func TestRPCOnlyClient(t testing.T, cb func(c *config.Config), srvAddr net.Addr, } client.heartbeatStop = newHeartbeatStop( client.getAllocRunner, time.Second, client.logger, client.shutdownCh) - client.connPool = pool.NewPool(testlog.HCLogger(t), 10*time.Second, 10, nil) + client.connPool = pool.NewPool(testlog.HCLogger(t), 10*time.Second, 10, nil, yamux.DefaultConfig()) client.init() cancelFunc := func() { diff --git a/command/agent/agent.go b/command/agent/agent.go index c8606f574..fa8746d2d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -40,6 +40,7 @@ import ( 
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/raft" + "github.com/hashicorp/yamux" ) const ( @@ -528,6 +529,26 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { conf.DefaultSchedulerConfig = *agentConfig.Server.DefaultSchedulerConfig } + // handle rpc yamux configuration + conf.RPCSessionConfig = yamux.DefaultConfig() + if agentConfig.RPC != nil { + if agentConfig.RPC.AcceptBacklog > 0 { + conf.RPCSessionConfig.AcceptBacklog = agentConfig.RPC.AcceptBacklog + } + if agentConfig.RPC.KeepAliveInterval > 0 { + conf.RPCSessionConfig.KeepAliveInterval = agentConfig.RPC.KeepAliveInterval + } + if agentConfig.RPC.ConnectionWriteTimeout > 0 { + conf.RPCSessionConfig.ConnectionWriteTimeout = agentConfig.RPC.ConnectionWriteTimeout + } + if agentConfig.RPC.StreamCloseTimeout > 0 { + conf.RPCSessionConfig.StreamCloseTimeout = agentConfig.RPC.StreamCloseTimeout + } + if agentConfig.RPC.StreamOpenTimeout > 0 { + conf.RPCSessionConfig.StreamOpenTimeout = agentConfig.RPC.StreamOpenTimeout + } + } + // Set the TLS config conf.TLSConfig = agentConfig.TLSConfig @@ -757,6 +778,26 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { conf.NetworkInterface = agentConfig.Client.NetworkInterface } + // handle rpc yamux configuration + conf.RPCSessionConfig = yamux.DefaultConfig() + if agentConfig.RPC != nil { + if agentConfig.RPC.AcceptBacklog > 0 { + conf.RPCSessionConfig.AcceptBacklog = agentConfig.RPC.AcceptBacklog + } + if agentConfig.RPC.KeepAliveInterval > 0 { + conf.RPCSessionConfig.KeepAliveInterval = agentConfig.RPC.KeepAliveInterval + } + if agentConfig.RPC.ConnectionWriteTimeout > 0 { + conf.RPCSessionConfig.ConnectionWriteTimeout = agentConfig.RPC.ConnectionWriteTimeout + } + if agentConfig.RPC.StreamCloseTimeout > 0 { + conf.RPCSessionConfig.StreamCloseTimeout = agentConfig.RPC.StreamCloseTimeout + } + if agentConfig.RPC.StreamOpenTimeout > 0 { + 
conf.RPCSessionConfig.StreamOpenTimeout = agentConfig.RPC.StreamOpenTimeout + } + } + conf.PreferredAddressFamily = agentConfig.Client.PreferredAddressFamily conf.ChrootEnv = agentConfig.Client.ChrootEnv diff --git a/command/agent/command.go b/command/agent/command.go index 977803866..4d72029b0 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -480,6 +480,10 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool { ) return false } + if err := config.RPC.Validate(); err != nil { + c.Ui.Error(fmt.Sprintf("rpc block invalid: %v)", err)) + return false + } if !config.DevMode { // Ensure that we have the directories we need to run. diff --git a/command/agent/config.go b/command/agent/config.go index 1e2dbb3c5..1a5f86f1b 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -109,6 +109,9 @@ type Config struct { // Server has our server related settings Server *ServerConfig `hcl:"server"` + // RPC has yamux multiplex settings + RPC *RPCConfig `hcl:"rpc"` + // ACL has our acl related settings ACL *ACLConfig `hcl:"acl"` @@ -769,6 +772,112 @@ func (s *ServerConfig) Copy() *ServerConfig { return &ns } +// RPCConfig allows for tunable yamux multiplex configuration +type RPCConfig struct { + // AcceptBacklog is used to limit how many streams may be + // waiting an accept. + AcceptBacklog int `hcl:"accept_backlog,optional"` + + // KeepAliveInterval is how often to perform the keep alive + KeepAliveInterval time.Duration + KeepAliveIntervalHCL string `hcl:"keep_alive_interval,optional"` + + // ConnectionWriteTimeout is meant to be a "safety valve" timeout after + // we which will suspect a problem with the underlying connection and + // close it. This is only applied to writes, where's there's generally + // an expectation that things will move along quickly. 
+ ConnectionWriteTimeout time.Duration + ConnectionWriteTimeoutHCL string `hcl:"connection_write_timeout,optional"` + + // StreamOpenTimeout is the maximum amount of time that a stream will + // be allowed to remain in pending state while waiting for an ack from the peer. + // Once the timeout is reached the session will be gracefully closed. + // A zero value disables the StreamOpenTimeout allowing unbounded + // blocking on OpenStream calls. + StreamOpenTimeout time.Duration + StreamOpenTimeoutHCL string `hcl:"stream_open_timeout,optional"` + + // StreamCloseTimeout is the maximum time that a stream will allowed to + // be in a half-closed state when `Close` is called before forcibly + // closing the connection. Forcibly closed connections will empty the + // receive buffer, drop any future packets received for that stream, + // and send a RST to the remote side. + StreamCloseTimeout time.Duration + StreamCloseTimeoutHCL string `hcl:"stream_close_timeout,optional"` +} + +func (r *RPCConfig) Copy() *RPCConfig { + if r == nil { + return nil + } + + nr := *r + return &nr +} + +func (r *RPCConfig) Merge(rpc *RPCConfig) *RPCConfig { + if r == nil { + return rpc + } + + result := *r + + if rpc == nil { + return &result + } + + if rpc.AcceptBacklog > 0 { + result.AcceptBacklog = rpc.AcceptBacklog + } + if rpc.KeepAliveIntervalHCL != "" { + result.KeepAliveIntervalHCL = rpc.KeepAliveIntervalHCL + } + if rpc.KeepAliveInterval > 0 { + result.KeepAliveInterval = rpc.KeepAliveInterval + } + if rpc.ConnectionWriteTimeoutHCL != "" { + result.ConnectionWriteTimeoutHCL = rpc.ConnectionWriteTimeoutHCL + } + if rpc.ConnectionWriteTimeout > 0 { + result.ConnectionWriteTimeout = rpc.ConnectionWriteTimeout + } + if rpc.StreamOpenTimeoutHCL != "" { + result.StreamOpenTimeoutHCL = rpc.StreamOpenTimeoutHCL + } + if rpc.StreamOpenTimeout > 0 { + result.StreamOpenTimeout = rpc.StreamOpenTimeout + } + if rpc.StreamCloseTimeoutHCL != "" { + result.StreamCloseTimeoutHCL = 
rpc.StreamCloseTimeoutHCL + } + if rpc.StreamCloseTimeout > 0 { + result.StreamCloseTimeout = rpc.StreamCloseTimeout + } + return &result +} + +func (r *RPCConfig) Validate() error { + if r != nil { + if r.AcceptBacklog < 0 { + return errors.New("rcp.accept_backlog interval must be greater than zero") + } + if r.KeepAliveInterval < 0 { + return errors.New("rcp.keep_alive_interval must be greater than zero") + } + if r.ConnectionWriteTimeout < 0 { + return errors.New("rcp.connection_write_timeout must be greater than zero") + } + if r.StreamCloseTimeout < 0 { + return errors.New("rcp.stream_close_timeout must be greater than zero") + } + if r.StreamOpenTimeout < 0 { + return errors.New("rcp.stream_open_timeout must be greater than zero") + } + } + + return nil +} + // RaftBoltConfig is used in servers to configure parameters of the boltdb // used for raft consensus. type RaftBoltConfig struct { @@ -1401,6 +1510,17 @@ func DefaultConfig() *Config { Consuls: []*config.ConsulConfig{config.DefaultConsulConfig()}, Vaults: []*config.VaultConfig{config.DefaultVaultConfig()}, UI: config.DefaultUIConfig(), + RPC: &RPCConfig{ + AcceptBacklog: 256, + KeepAliveInterval: 30 * time.Second, + KeepAliveIntervalHCL: "30s", + ConnectionWriteTimeout: 10 * time.Second, + ConnectionWriteTimeoutHCL: "10s", + StreamOpenTimeout: 75 * time.Second, + StreamOpenTimeoutHCL: "75s", + StreamCloseTimeout: 5 * time.Minute, + StreamCloseTimeoutHCL: "5m", + }, Client: &ClientConfig{ Enabled: false, NodePool: structs.NodePoolDefault, @@ -1614,6 +1734,14 @@ func (c *Config) Merge(b *Config) *Config { result.Server = result.Server.Merge(b.Server) } + // Apply the rpc mux config + if result.RPC == nil && b.RPC != nil { + rpcMux := *b.RPC + result.RPC = &rpcMux + } else if b.RPC != nil { + result.RPC = result.RPC.Merge(b.RPC) + } + // Apply the acl config if result.ACL == nil && b.ACL != nil { server := *b.ACL diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 
7ee8d8e53..a6448c35e 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -57,6 +57,7 @@ func ParseConfigFile(path string) (*Config, error) { ServerJoin: &ServerJoin{}, }, ACL: &ACLConfig{}, + RPC: &RPCConfig{}, Audit: &config.AuditConfig{}, Consuls: []*config.ConsulConfig{}, Autopilot: &config.AutopilotConfig{}, @@ -181,6 +182,10 @@ func ParseConfigFile(path string) (*Config, error) { }, {"reporting.export_interval", &c.Reporting.ExportInterval, &c.Reporting.ExportIntervalHCL, nil}, + {"rpc.keep_alive_interval", &c.RPC.KeepAliveInterval, &c.RPC.KeepAliveIntervalHCL, nil}, + {"rpc.connection_write_timeout", &c.RPC.ConnectionWriteTimeout, &c.RPC.ConnectionWriteTimeoutHCL, nil}, + {"rpc.stream_open_timeout", &c.RPC.StreamOpenTimeout, &c.RPC.StreamOpenTimeoutHCL, nil}, + {"rpc.stream_close_timeout", &c.RPC.StreamCloseTimeout, &c.RPC.StreamCloseTimeoutHCL, nil}, } // Parse durations for Consul and Vault config blocks if provided. diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 903cf76f1..69d9056c6 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -43,6 +43,7 @@ var basicConfig = &Config{ RPC: "127.0.0.3", Serf: "127.0.0.4", }, + RPC: &RPCConfig{}, Client: &ClientConfig{ Enabled: true, StateDir: "/tmp/client-state", @@ -588,6 +589,9 @@ func (c *Config) addDefaults() { if c.ACL == nil { c.ACL = &ACLConfig{} } + if c.RPC == nil { + c.RPC = &RPCConfig{} + } if c.Audit == nil { c.Audit = &config.AuditConfig{} } @@ -705,6 +709,7 @@ var sample0 = &Config{ ACL: &ACLConfig{ Enabled: true, }, + RPC: &RPCConfig{}, Audit: &config.AuditConfig{ Enabled: pointer.Of(true), Sinks: []*config.AuditSink{ @@ -813,6 +818,17 @@ var sample1 = &Config{ ACL: &ACLConfig{ Enabled: true, }, + RPC: &RPCConfig{ + AcceptBacklog: 256, + KeepAliveInterval: 30 * time.Second, + KeepAliveIntervalHCL: "30s", + ConnectionWriteTimeout: 10 * time.Second, + ConnectionWriteTimeoutHCL: "10s", + 
StreamOpenTimeout: 75 * time.Second, + StreamOpenTimeoutHCL: "75s", + StreamCloseTimeout: 5 * time.Minute, + StreamCloseTimeoutHCL: "5m", + }, Audit: &config.AuditConfig{ Enabled: pointer.Of(true), Sinks: []*config.AuditSink{ diff --git a/command/agent/testdata/sample1/sample0.json b/command/agent/testdata/sample1/sample0.json index 1a23c7378..34cfc9329 100644 --- a/command/agent/testdata/sample1/sample0.json +++ b/command/agent/testdata/sample1/sample0.json @@ -19,6 +19,14 @@ "kms_key_id": "alias/kms-nomad-keyring" } }, + "rpc": { + "accept_backlog": 256, + "keep_alive_interval": "30s", + "connection_write_timeout": "10s", + "max_stream_window_size": 262144, + "stream_open_timeout": "75s", + "stream_close_timeout": "5m" + }, "leave_on_interrupt": true, "leave_on_terminate": true, "log_level": "INFO", diff --git a/command/agent/testdata/sample1/sample1.json b/command/agent/testdata/sample1/sample1.json index f147acfb6..3a06e26c9 100644 --- a/command/agent/testdata/sample1/sample1.json +++ b/command/agent/testdata/sample1/sample1.json @@ -18,6 +18,14 @@ "leave_on_terminate": true, "log_level": "INFO", "region": "global", + "rpc": { + "accept_backlog": 256, + "keep_alive_interval": "30s", + "connection_write_timeout": "10s", + "max_stream_window_size": 262144, + "stream_open_timeout": "75s", + "stream_close_timeout": "5m" + }, "server": { "bootstrap_expect": 3, "enabled": true diff --git a/command/agent/testdata/sample1/sample2.hcl b/command/agent/testdata/sample1/sample2.hcl index 3351dc3e9..05a16cc1d 100644 --- a/command/agent/testdata/sample1/sample2.hcl +++ b/command/agent/testdata/sample1/sample2.hcl @@ -17,6 +17,15 @@ "token" = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" } +rpc { + accept_backlog = 256 + keep_alive_interval = "30s" + connection_write_timeout = "10s" + max_stream_window_size = 262144 + stream_open_timeout = "75s" + stream_close_timeout = "5m" +} + vault = { enabled = true } diff --git a/helper/pool/pool.go b/helper/pool/pool.go index 
14a6aadfb..fdac1b053 100644 --- a/helper/pool/pool.go +++ b/helper/pool/pool.go @@ -197,6 +197,9 @@ type ConnPool struct { // TLS wrapper tlsWrap tlsutil.RegionWrapper + // yamuxCfg is used for setup rpc client + yamuxCfg *yamux.Config + // Used to indicate the pool is shutdown shutdown bool shutdownCh chan struct{} @@ -211,7 +214,9 @@ type ConnPool struct { // Set maxTime to 0 to disable reaping. maxStreams is used to control // the number of idle streams allowed. // If TLS settings are provided outgoing connections use TLS. -func NewPool(logger hclog.Logger, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.RegionWrapper) *ConnPool { +func NewPool( + logger hclog.Logger, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.RegionWrapper, yamuxCfg *yamux.Config, +) *ConnPool { pool := &ConnPool{ logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), maxTime: maxTime, @@ -219,6 +224,7 @@ func NewPool(logger hclog.Logger, maxTime time.Duration, maxStreams int, tlsWrap pool: make(map[string]*Conn), limiter: make(map[string]chan struct{}), tlsWrap: tlsWrap, + yamuxCfg: yamuxCfg, shutdownCh: make(chan struct{}), } if maxTime > 0 { @@ -389,7 +395,7 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr) (*Conn, error) { } // Setup the logger - conf := yamux.DefaultConfig() + conf := p.yamuxCfg conf.LogOutput = nil conf.Logger = p.logger diff --git a/helper/pool/pool_test.go b/helper/pool/pool_test.go index 529c19d0d..1abef259c 100644 --- a/helper/pool/pool_test.go +++ b/helper/pool/pool_test.go @@ -11,12 +11,13 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/yamux" "github.com/shoenig/test/must" ) func newTestPool(t *testing.T) *ConnPool { l := testlog.HCLogger(t) - p := NewPool(l, 1*time.Minute, 10, nil) + p := NewPool(l, 1*time.Minute, 10, nil, yamux.DefaultConfig()) return p } diff --git a/nomad/config.go b/nomad/config.go index 3a17623e8..61c683230 100644 --- 
a/nomad/config.go +++ b/nomad/config.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "github.com/hashicorp/yamux" ) const ( @@ -403,6 +404,9 @@ type Config struct { // connections from a single IP address. nil/0 means no limit. RPCMaxConnsPerClient int + // RPCSessionConfig configures the yamux session configuration for RPC + RPCSessionConfig *yamux.Config + // LicenseConfig stores information about the Enterprise license loaded for the server. LicenseConfig *LicenseConfig @@ -615,6 +619,7 @@ func DefaultConfig() *Config { VaultConfigs: map[string]*config.VaultConfig{ structs.VaultDefaultCluster: config.DefaultVaultConfig()}, RPCHoldTimeout: 5 * time.Second, + RPCSessionConfig: yamux.DefaultConfig(), StatsCollectionInterval: 1 * time.Minute, TLSConfig: &config.TLSConfig{}, ReplicationBackoff: 30 * time.Second, diff --git a/nomad/rpc.go b/nomad/rpc.go index 455c7162e..efa54ca64 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -106,6 +106,9 @@ type RPCContext struct { // NodeID marks the NodeID that initiated the connection. 
NodeID string + + // SessionConfig allowing to change default yamux configs value for advanced configuration + SessionConfig *yamux.Config } func (ctx *RPCContext) IsTLS() bool { @@ -218,7 +221,7 @@ func (r *rpcHandler) listen(ctx context.Context) { conn = connlimit.Wrap(conn, free) } - go r.handleConn(ctx, conn, &RPCContext{Conn: conn}) + go r.handleConn(ctx, conn, &RPCContext{Conn: conn, SessionConfig: r.srv.GetConfig().RPCSessionConfig}) metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1) } } @@ -406,7 +409,7 @@ func (r *rpcHandler) handleMultiplex(ctx context.Context, conn net.Conn, rpcCtx conn.Close() }() - conf := yamux.DefaultConfig() + conf := rpcCtx.SessionConfig conf.LogOutput = nil conf.Logger = r.gologger server, err := yamux.Server(conn, conf) @@ -515,7 +518,7 @@ func (r *rpcHandler) handleMultiplexV2(ctx context.Context, conn net.Conn, rpcCt conn.Close() }() - conf := yamux.DefaultConfig() + conf := rpcCtx.SessionConfig conf.LogOutput = nil conf.Logger = r.gologger server, err := yamux.Server(conn, conf) diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index caf3839c8..1cae3b1e3 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -530,7 +530,7 @@ func TestRPC_handleMultiplexV2(t *testing.T) { // Start the handler doneCh := make(chan struct{}) go func() { - s.handleConn(context.Background(), p2, &RPCContext{Conn: p2}) + s.handleConn(context.Background(), p2, &RPCContext{Conn: p2, SessionConfig: yamux.DefaultConfig()}) close(doneCh) }() diff --git a/nomad/server.go b/nomad/server.go index b74dd787f..70c1eda2a 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -345,7 +345,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc s := &Server{ config: config, consulCatalog: consulCatalog, - connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap), + connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap, config.RPCSessionConfig), logger: logger, tlsWrap: tlsWrap, 
rpcServer: rpc.NewServer(), diff --git a/website/content/docs/configuration/index.mdx b/website/content/docs/configuration/index.mdx index 9de3ecefd..fce0cb238 100644 --- a/website/content/docs/configuration/index.mdx +++ b/website/content/docs/configuration/index.mdx @@ -327,6 +327,12 @@ testing. no local state remains from the previous region before running the agent again. +- `rpc` `(`[`RPC`]`: nil)` - Specifies configuration which is specific to RPC. + We strongly recommend that you do not configure RPC values. Use the default + values, which have been production-proven on clusters of thousands of nodes. + You should only configure the `rpc` block if you have a specific reason to + believe it will improve your particular use case. + - `sentinel` `(`[`Sentinel`]`: nil)` - Specifies configuration for Sentinel policies. @@ -390,6 +396,7 @@ http_api_response_headers { ``` [`acl`]: /nomad/docs/configuration/acl 'Nomad Agent ACL Configuration' +[`rpc`]: /nomad/docs/configuration/rpc [`audit`]: /nomad/docs/configuration/audit 'Nomad Agent Audit Logging Configuration' [`client`]: /nomad/docs/configuration/client 'Nomad Agent client Configuration' [`consul`]: /nomad/docs/configuration/consul 'Nomad Agent consul Configuration' diff --git a/website/content/docs/configuration/rpc.mdx b/website/content/docs/configuration/rpc.mdx new file mode 100644 index 000000000..e14204e85 --- /dev/null +++ b/website/content/docs/configuration/rpc.mdx @@ -0,0 +1,54 @@ +--- +layout: docs +page_title: rpc Block in Agent Configuration +description: >- + Configure Nomad's RPC behavior in the `rpc` block of a Nomad agent configuration. Modify RPC session configuration, change stream accept backlog, keepalive interval, and open stream timeouts. +--- + +# rpc Block in Agent Configuration + + + +This page provides reference information for configuring Nomad's RPC behavior in +the `rpc` block of a Nomad agent configuration. 
Modify RPC session +configuration, stream accept backlog, keepalive interval, and open stream +timeouts. We strongly recommend leaving these values unconfigured to use the +default values, which have been production-proven on clusters of thousands of +nodes. You should only configure the `rpc` block if you have a specific reason to +believe it will improve your particular use case. + +```hcl +rpc { + accept_backlog = 256 + keep_alive_interval = "30s" + connection_write_timeout = "10s" + max_stream_window_size = 262144 + stream_open_timeout = "75s" + stream_close_timeout = "5m" +} +``` + +## `rpc` Parameters + +- `accept_backlog` `(int: 256)` - Limits how many RPC streams + (requests) can be waiting for acceptance by the RPC server. + +- `keep_alive_interval` `(string: 30s)` - How often to perform the keepalive. + +- `connection_write_timeout` `(string: 10s)` - Safety valve timeout after which + Nomad suspects a problem with the underlying connection and closes it. This + is only applied to writes, where there is generally an expectation that things + move along quickly. + +- `max_stream_window_size` `(int: 262144)` - Controls the maximum window size + allowed for a stream, in bytes. + +- `stream_open_timeout` `(string: 75s)` - Maximum amount of time that a stream + remains in pending state while waiting for an ACK from the peer. Once the + timeout is reached, Nomad gracefully closes the session. A zero value disables + the stream open timeout, allowing unbounded blocking on open stream calls. + +- `stream_close_timeout` `(string: 5m)` - Maximum time that a stream remains + in a half-closed state after `Close` is called before Nomad forcibly closes the + connection. Forcibly closed connections empty the receive buffer, drop any + future packets received for that stream, and send an RST to the remote side.
diff --git a/website/data/docs-nav-data.json b/website/data/docs-nav-data.json index 8055baec1..62b05fe64 100644 --- a/website/data/docs-nav-data.json +++ b/website/data/docs-nav-data.json @@ -325,6 +325,10 @@ "title": "reporting", "path": "configuration/reporting" }, + { + "title": "rpc", + "path": "configuration/rpc" + }, { "title": "search", "path": "configuration/search"