rpc: added configuration for yamux session (#25466)

Fixes: https://github.com/hashicorp/nomad/issues/25380
This commit is contained in:
Nikita Eliseev
2025-04-02 17:58:23 +03:00
committed by GitHub
parent 1a1ccec8b2
commit 76fb3eb9a1
21 changed files with 318 additions and 10 deletions

View File

@@ -40,6 +40,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/raft"
"github.com/hashicorp/yamux"
)
const (
@@ -528,6 +529,26 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
conf.DefaultSchedulerConfig = *agentConfig.Server.DefaultSchedulerConfig
}
// handle rpc yamux configuration
conf.RPCSessionConfig = yamux.DefaultConfig()
if agentConfig.RPC != nil {
if agentConfig.RPC.AcceptBacklog > 0 {
conf.RPCSessionConfig.AcceptBacklog = agentConfig.RPC.AcceptBacklog
}
if agentConfig.RPC.KeepAliveInterval > 0 {
conf.RPCSessionConfig.KeepAliveInterval = agentConfig.RPC.KeepAliveInterval
}
if agentConfig.RPC.ConnectionWriteTimeout > 0 {
conf.RPCSessionConfig.ConnectionWriteTimeout = agentConfig.RPC.ConnectionWriteTimeout
}
if agentConfig.RPC.StreamCloseTimeout > 0 {
conf.RPCSessionConfig.StreamCloseTimeout = agentConfig.RPC.StreamCloseTimeout
}
if agentConfig.RPC.StreamOpenTimeout > 0 {
conf.RPCSessionConfig.StreamOpenTimeout = agentConfig.RPC.StreamOpenTimeout
}
}
// Set the TLS config
conf.TLSConfig = agentConfig.TLSConfig
@@ -757,6 +778,26 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
conf.NetworkInterface = agentConfig.Client.NetworkInterface
}
// handle rpc yamux configuration
conf.RPCSessionConfig = yamux.DefaultConfig()
if agentConfig.RPC != nil {
if agentConfig.RPC.AcceptBacklog > 0 {
conf.RPCSessionConfig.AcceptBacklog = agentConfig.RPC.AcceptBacklog
}
if agentConfig.RPC.KeepAliveInterval > 0 {
conf.RPCSessionConfig.KeepAliveInterval = agentConfig.RPC.KeepAliveInterval
}
if agentConfig.RPC.ConnectionWriteTimeout > 0 {
conf.RPCSessionConfig.ConnectionWriteTimeout = agentConfig.RPC.ConnectionWriteTimeout
}
if agentConfig.RPC.StreamCloseTimeout > 0 {
conf.RPCSessionConfig.StreamCloseTimeout = agentConfig.RPC.StreamCloseTimeout
}
if agentConfig.RPC.StreamOpenTimeout > 0 {
conf.RPCSessionConfig.StreamOpenTimeout = agentConfig.RPC.StreamOpenTimeout
}
}
conf.PreferredAddressFamily = agentConfig.Client.PreferredAddressFamily
conf.ChrootEnv = agentConfig.Client.ChrootEnv

View File

@@ -480,6 +480,10 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
)
return false
}
// Reject an rpc block with negative tuning values. Validate tolerates a nil
// receiver, so a missing rpc block is not an error here.
if err := config.RPC.Validate(); err != nil {
	c.Ui.Error(fmt.Sprintf("rpc block invalid: %v", err))
	return false
}
if !config.DevMode {
// Ensure that we have the directories we need to run.

View File

@@ -109,6 +109,9 @@ type Config struct {
// Server has our server related settings
Server *ServerConfig `hcl:"server"`
// RPC has yamux multiplex settings
RPC *RPCConfig `hcl:"rpc"`
// ACL has our acl related settings
ACL *ACLConfig `hcl:"acl"`
@@ -769,6 +772,112 @@ func (s *ServerConfig) Copy() *ServerConfig {
return &ns
}
// RPCConfig allows for tunable yamux multiplex configuration.
//
// Only AcceptBacklog and the *HCL string fields carry hcl tags; the
// time.Duration fields are untagged and are presumably filled in from their
// *HCL companions while the configuration file is parsed (verify against the
// parser). Throughout this type a zero value means "not set": Merge never lets
// a zero override an existing value, and Validate accepts zero.
type RPCConfig struct {
	// AcceptBacklog is used to limit how many streams may be
	// waiting for an accept.
	AcceptBacklog int `hcl:"accept_backlog,optional"`

	// KeepAliveInterval is how often to perform the keep alive.
	KeepAliveInterval    time.Duration
	KeepAliveIntervalHCL string `hcl:"keep_alive_interval,optional"`

	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
	// which we will suspect a problem with the underlying connection and
	// close it. This is only applied to writes, where there's generally
	// an expectation that things will move along quickly.
	ConnectionWriteTimeout    time.Duration
	ConnectionWriteTimeoutHCL string `hcl:"connection_write_timeout,optional"`

	// StreamOpenTimeout is the maximum amount of time that a stream will
	// be allowed to remain in pending state while waiting for an ack from the peer.
	// Once the timeout is reached the session will be gracefully closed.
	// Note that Merge only copies positive values from the overriding
	// config, so a zero in an override leaves the base value in place rather
	// than disabling the timeout.
	StreamOpenTimeout    time.Duration
	StreamOpenTimeoutHCL string `hcl:"stream_open_timeout,optional"`

	// StreamCloseTimeout is the maximum time that a stream will be allowed
	// to be in a half-closed state when `Close` is called before forcibly
	// closing the connection. Forcibly closed connections will empty the
	// receive buffer, drop any future packets received for that stream,
	// and send a RST to the remote side.
	StreamCloseTimeout    time.Duration
	StreamCloseTimeoutHCL string `hcl:"stream_close_timeout,optional"`
}

// Copy returns a shallow copy of the configuration, or nil when the receiver
// is nil. The struct holds only value-typed fields, so the copy shares no
// state with the original.
func (r *RPCConfig) Copy() *RPCConfig {
	if r == nil {
		return nil
	}
	nr := *r
	return &nr
}

// Merge returns a new RPCConfig built from the receiver with every set field
// of rpc layered on top. A field of rpc counts as set when it is a positive
// number/duration or a non-empty string. Neither the receiver nor rpc is
// modified, and the result never aliases either input.
func (r *RPCConfig) Merge(rpc *RPCConfig) *RPCConfig {
	if r == nil {
		// Hand back a copy so that later mutation of the result cannot
		// leak into the caller's rpc value (Copy is nil-safe).
		return rpc.Copy()
	}

	result := *r
	if rpc == nil {
		return &result
	}

	if rpc.AcceptBacklog > 0 {
		result.AcceptBacklog = rpc.AcceptBacklog
	}

	if rpc.KeepAliveIntervalHCL != "" {
		result.KeepAliveIntervalHCL = rpc.KeepAliveIntervalHCL
	}
	if rpc.KeepAliveInterval > 0 {
		result.KeepAliveInterval = rpc.KeepAliveInterval
	}

	if rpc.ConnectionWriteTimeoutHCL != "" {
		result.ConnectionWriteTimeoutHCL = rpc.ConnectionWriteTimeoutHCL
	}
	if rpc.ConnectionWriteTimeout > 0 {
		result.ConnectionWriteTimeout = rpc.ConnectionWriteTimeout
	}

	if rpc.StreamOpenTimeoutHCL != "" {
		result.StreamOpenTimeoutHCL = rpc.StreamOpenTimeoutHCL
	}
	if rpc.StreamOpenTimeout > 0 {
		result.StreamOpenTimeout = rpc.StreamOpenTimeout
	}

	if rpc.StreamCloseTimeoutHCL != "" {
		result.StreamCloseTimeoutHCL = rpc.StreamCloseTimeoutHCL
	}
	if rpc.StreamCloseTimeout > 0 {
		result.StreamCloseTimeout = rpc.StreamCloseTimeout
	}

	return &result
}

// Validate reports the first negative setting found, checking the fields in
// the order accept_backlog, keep_alive_interval, connection_write_timeout,
// stream_close_timeout, stream_open_timeout. Zero is accepted because it
// denotes an unset value. A nil receiver is valid and yields nil.
func (r *RPCConfig) Validate() error {
	if r == nil {
		return nil
	}

	switch {
	case r.AcceptBacklog < 0:
		return errors.New("rpc.accept_backlog must not be negative")
	case r.KeepAliveInterval < 0:
		return errors.New("rpc.keep_alive_interval must not be negative")
	case r.ConnectionWriteTimeout < 0:
		return errors.New("rpc.connection_write_timeout must not be negative")
	case r.StreamCloseTimeout < 0:
		return errors.New("rpc.stream_close_timeout must not be negative")
	case r.StreamOpenTimeout < 0:
		return errors.New("rpc.stream_open_timeout must not be negative")
	}
	return nil
}
// RaftBoltConfig is used in servers to configure parameters of the boltdb
// used for raft consensus.
type RaftBoltConfig struct {
@@ -1401,6 +1510,17 @@ func DefaultConfig() *Config {
Consuls: []*config.ConsulConfig{config.DefaultConsulConfig()},
Vaults: []*config.VaultConfig{config.DefaultVaultConfig()},
UI: config.DefaultUIConfig(),
RPC: &RPCConfig{
AcceptBacklog: 256,
KeepAliveInterval: 30 * time.Second,
KeepAliveIntervalHCL: "30s",
ConnectionWriteTimeout: 10 * time.Second,
ConnectionWriteTimeoutHCL: "10s",
StreamOpenTimeout: 75 * time.Second,
StreamOpenTimeoutHCL: "75s",
StreamCloseTimeout: 5 * time.Minute,
StreamCloseTimeoutHCL: "5m",
},
Client: &ClientConfig{
Enabled: false,
NodePool: structs.NodePoolDefault,
@@ -1614,6 +1734,14 @@ func (c *Config) Merge(b *Config) *Config {
result.Server = result.Server.Merge(b.Server)
}
// Apply the rpc mux config
if result.RPC == nil && b.RPC != nil {
rpcMux := *b.RPC
result.RPC = &rpcMux
} else if b.RPC != nil {
result.RPC = result.RPC.Merge(b.RPC)
}
// Apply the acl config
if result.ACL == nil && b.ACL != nil {
server := *b.ACL

View File

@@ -57,6 +57,7 @@ func ParseConfigFile(path string) (*Config, error) {
ServerJoin: &ServerJoin{},
},
ACL: &ACLConfig{},
RPC: &RPCConfig{},
Audit: &config.AuditConfig{},
Consuls: []*config.ConsulConfig{},
Autopilot: &config.AutopilotConfig{},
@@ -181,6 +182,10 @@ func ParseConfigFile(path string) (*Config, error) {
},
{"reporting.export_interval",
&c.Reporting.ExportInterval, &c.Reporting.ExportIntervalHCL, nil},
{"rpc.keep_alive_interval", &c.RPC.KeepAliveInterval, &c.RPC.KeepAliveIntervalHCL, nil},
{"rpc.connection_write_timeout", &c.RPC.ConnectionWriteTimeout, &c.RPC.ConnectionWriteTimeoutHCL, nil},
{"rpc.stream_open_timeout", &c.RPC.StreamOpenTimeout, &c.RPC.StreamOpenTimeoutHCL, nil},
{"rpc.stream_close_timeout", &c.RPC.StreamCloseTimeout, &c.RPC.StreamCloseTimeoutHCL, nil},
}
// Parse durations for Consul and Vault config blocks if provided.

View File

@@ -43,6 +43,7 @@ var basicConfig = &Config{
RPC: "127.0.0.3",
Serf: "127.0.0.4",
},
RPC: &RPCConfig{},
Client: &ClientConfig{
Enabled: true,
StateDir: "/tmp/client-state",
@@ -588,6 +589,9 @@ func (c *Config) addDefaults() {
if c.ACL == nil {
c.ACL = &ACLConfig{}
}
if c.RPC == nil {
c.RPC = &RPCConfig{}
}
if c.Audit == nil {
c.Audit = &config.AuditConfig{}
}
@@ -705,6 +709,7 @@ var sample0 = &Config{
ACL: &ACLConfig{
Enabled: true,
},
RPC: &RPCConfig{},
Audit: &config.AuditConfig{
Enabled: pointer.Of(true),
Sinks: []*config.AuditSink{
@@ -813,6 +818,17 @@ var sample1 = &Config{
ACL: &ACLConfig{
Enabled: true,
},
RPC: &RPCConfig{
AcceptBacklog: 256,
KeepAliveInterval: 30 * time.Second,
KeepAliveIntervalHCL: "30s",
ConnectionWriteTimeout: 10 * time.Second,
ConnectionWriteTimeoutHCL: "10s",
StreamOpenTimeout: 75 * time.Second,
StreamOpenTimeoutHCL: "75s",
StreamCloseTimeout: 5 * time.Minute,
StreamCloseTimeoutHCL: "5m",
},
Audit: &config.AuditConfig{
Enabled: pointer.Of(true),
Sinks: []*config.AuditSink{

View File

@@ -19,6 +19,14 @@
"kms_key_id": "alias/kms-nomad-keyring"
}
},
"rpc": {
"accept_backlog": 256,
"keep_alive_interval": "30s",
"connection_write_timeout": "10s",
"max_stream_window_size": 262144,
"stream_open_timeout": "75s",
"stream_close_timeout": "5m"
},
"leave_on_interrupt": true,
"leave_on_terminate": true,
"log_level": "INFO",

View File

@@ -18,6 +18,14 @@
"leave_on_terminate": true,
"log_level": "INFO",
"region": "global",
"rpc": {
"accept_backlog": 256,
"keep_alive_interval": "30s",
"connection_write_timeout": "10s",
"max_stream_window_size": 262144,
"stream_open_timeout": "75s",
"stream_close_timeout": "5m"
},
"server": {
"bootstrap_expect": 3,
"enabled": true

View File

@@ -17,6 +17,15 @@
"token" = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
}
rpc {
accept_backlog = 256
keep_alive_interval = "30s"
connection_write_timeout = "10s"
max_stream_window_size = 262144
stream_open_timeout = "75s"
stream_close_timeout = "5m"
}
vault = {
enabled = true
}