mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
nomad: adding RPC forwarding methods
This commit is contained in:
78
nomad/rpc.go
78
nomad/rpc.go
@@ -4,6 +4,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -34,6 +35,18 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
// maxQueryTime is used to bound the limit of a blocking query
|
||||
maxQueryTime = 300 * time.Second
|
||||
|
||||
// defaultQueryTime is the amount of time we block waiting for a change
|
||||
// if no time is specified. Previously we would wait the maxQueryTime.
|
||||
defaultQueryTime = 300 * time.Second
|
||||
|
||||
// jitterFraction is a the limit to the amount of jitter we apply
|
||||
// to a user specified MaxQueryTime. We divide the specified time by
|
||||
// the fraction. So 16 == 6.25% limit of jitter
|
||||
jitterFraction = 16
|
||||
|
||||
// Warn if the Raft command is larger than this.
|
||||
// If it's over 1MB something is probably being abusive.
|
||||
raftWarnSize = 1024 * 1024
|
||||
@@ -152,6 +165,71 @@ func (s *Server) handleNomadConn(conn net.Conn) {
|
||||
}
|
||||
}
|
||||
|
||||
// forward is used to forward to a remote region or to forward to the local leader
|
||||
// Returns a bool of if forwarding was performed, as well as any error
|
||||
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
|
||||
// Handle region forwarding
|
||||
region := info.RequestRegion()
|
||||
if region != s.config.Region {
|
||||
err := s.forwardRegion(method, region, args, reply)
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Check if we can allow a stale read
|
||||
if info.IsRead() && info.AllowStaleRead() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Handle leader forwarding
|
||||
if !s.IsLeader() {
|
||||
err := s.forwardLeader(method, args, reply)
|
||||
return true, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
|
||||
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
|
||||
// Get the leader
|
||||
leader := s.raft.Leader()
|
||||
if leader == "" {
|
||||
return structs.ErrNoLeader
|
||||
}
|
||||
|
||||
// Lookup the server
|
||||
s.peerLock.RLock()
|
||||
server := s.localPeers[leader]
|
||||
s.peerLock.RUnlock()
|
||||
|
||||
// Handle a missing server
|
||||
if server == nil {
|
||||
return structs.ErrNoLeader
|
||||
}
|
||||
return s.connPool.RPC(s.config.Region, server.Addr, server.Version, method, args, reply)
|
||||
}
|
||||
|
||||
// forwardRegion is used to forward an RPC call to a remote region, or fail if no servers
|
||||
func (s *Server) forwardRegion(method, region string, args interface{}, reply interface{}) error {
|
||||
// Bail if we can't find any servers
|
||||
s.peerLock.RLock()
|
||||
servers := s.peers[region]
|
||||
if len(servers) == 0 {
|
||||
s.peerLock.RUnlock()
|
||||
s.logger.Printf("[WARN] nomad.rpc: RPC request for region '%s', no path found",
|
||||
region)
|
||||
return structs.ErrNoRegionPath
|
||||
}
|
||||
|
||||
// Select a random addr
|
||||
offset := rand.Int31() % int32(len(servers))
|
||||
server := servers[offset]
|
||||
s.peerLock.RUnlock()
|
||||
|
||||
// Forward to remote Nomad
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "cross-region", region}, 1)
|
||||
return s.connPool.RPC(region, server.Addr, server.Version, method, args, reply)
|
||||
}
|
||||
|
||||
// raftApply is used to encode a message, run it through raft, and return
|
||||
// the FSM response along with any errors
|
||||
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
|
||||
Reference in New Issue
Block a user