mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
some changes for more idiomatic code
This commit is contained in:
44
nomad/rpc.go
44
nomad/rpc.go
@@ -49,9 +49,8 @@ const (
|
||||
|
||||
type rpcHandler struct {
|
||||
*Server
|
||||
logger log.Logger
|
||||
gologger *golog.Logger
|
||||
acceptLoopDelay time.Duration
|
||||
logger log.Logger
|
||||
gologger *golog.Logger
|
||||
}
|
||||
|
||||
func newRpcHandler(s *Server) *rpcHandler {
|
||||
@@ -85,6 +84,8 @@ type RPCContext struct {
|
||||
// listen is used to listen for incoming RPC connections
|
||||
func (r *rpcHandler) listen(ctx context.Context) {
|
||||
defer close(r.listenerCh)
|
||||
|
||||
var acceptLoopDelay time.Duration
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -99,41 +100,46 @@ func (r *rpcHandler) listen(ctx context.Context) {
|
||||
if r.shutdown {
|
||||
return
|
||||
}
|
||||
r.handleAcceptErr(err, ctx)
|
||||
r.handleAcceptErr(ctx, err, &acceptLoopDelay)
|
||||
continue
|
||||
}
|
||||
// No error, reset loop delay
|
||||
r.acceptLoopDelay = 0
|
||||
acceptLoopDelay = 0
|
||||
|
||||
go r.handleConn(ctx, conn, &RPCContext{Conn: conn})
|
||||
metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// Sleep to avoid spamming the log, with a maximum delay according to whether or not the error is temporary
|
||||
func (r *rpcHandler) handleAcceptErr(err error, ctx context.Context) {
|
||||
const baseAcceptLoopDelay = 5 * time.Millisecond
|
||||
const maxAcceptLoopDelay = 5 * time.Second
|
||||
const maxAcceptLoopDelayTemporaryError = 1 * time.Second
|
||||
// handleAcceptErr sleeps to avoid spamming the log,
|
||||
// with a maximum delay according to whether or not the error is temporary
|
||||
func (r *rpcHandler) handleAcceptErr(ctx context.Context, err error, loopDelay *time.Duration) {
|
||||
const baseDelay = 5 * time.Millisecond
|
||||
const maxDelayPerm = 5 * time.Second
|
||||
const maxDelayTemp = 1 * time.Second
|
||||
|
||||
if r.acceptLoopDelay == 0 {
|
||||
r.acceptLoopDelay = baseAcceptLoopDelay
|
||||
if *loopDelay == 0 {
|
||||
*loopDelay = baseDelay
|
||||
} else {
|
||||
r.acceptLoopDelay *= 2
|
||||
*loopDelay *= 2
|
||||
}
|
||||
|
||||
temporaryError := false
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
temporaryError = true
|
||||
}
|
||||
if temporaryError && r.acceptLoopDelay > maxAcceptLoopDelayTemporaryError {
|
||||
r.acceptLoopDelay = maxAcceptLoopDelayTemporaryError
|
||||
} else if r.acceptLoopDelay > maxAcceptLoopDelay {
|
||||
r.acceptLoopDelay = maxAcceptLoopDelay
|
||||
|
||||
if temporaryError && *loopDelay > maxDelayTemp {
|
||||
*loopDelay = maxDelayTemp
|
||||
} else if *loopDelay > maxDelayPerm {
|
||||
*loopDelay = maxDelayPerm
|
||||
}
|
||||
r.logger.Error("failed to accept RPC conn", "error", err, "delay", r.acceptLoopDelay)
|
||||
|
||||
r.logger.Error("failed to accept RPC conn", "error", err, "delay", *loopDelay)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.After(r.acceptLoopDelay):
|
||||
case <-time.After(*loopDelay):
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user