RPC Listener

This commit is contained in:
Alex Dadgar
2018-01-11 13:23:57 -08:00
parent ddee97ca29
commit 784f68e901
2 changed files with 71 additions and 1 deletions

View File

@@ -15,7 +15,7 @@ type ClientStats struct {
// Stats is used to retrieve the Clients stats.
func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.ClientStatsResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client", "client_stats", "stats"}, time.Now())
defer metrics.MeasureSince([]string{"client", "client_stats", "stats"}, time.Now())
// Check node read permissions
if aclObj, err := s.c.ResolveToken(args.AuthToken); err != nil {

View File

@@ -2,12 +2,16 @@ package client
import (
"fmt"
"io"
"net"
"net/rpc"
"strings"
metrics "github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/yamux"
)
// rpcEndpoints holds the RPC endpoints
@@ -57,6 +61,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return mErr.ErrorOrNil()
}
// TODO This can't really be tested until Servers can dial back to the client.
// setupClientRpc is used to setup the Client's RPC endpoints
func (c *Client) setupClientRpc() {
// Initialize the RPC handlers
@@ -67,6 +72,71 @@ func (c *Client) setupClientRpc() {
// Register the endpoints with the RPC server
c.setupClientRpcServer(c.rpcServer)
go c.rpcConnListener()
}
// rpcConnListener is a long lived function that listens for new connections
// being made on the connection pool and starts an RPC listener for each
// connection.
func (c *Client) rpcConnListener() {
// Make a channel for new connections.
conns := make(chan *yamux.Session, 4)
c.connPool.SetConnListener(conns)
for {
select {
case <-c.shutdownCh:
return
case session, ok := <-conns:
if !ok {
continue
}
go c.listenConn(session)
}
}
}
// listenConn is used to listen for connections being made from the server on
// pre-existing connection. This should be called in a goroutine.
func (c *Client) listenConn(s *yamux.Session) {
for {
conn, err := s.Accept()
if err != nil {
if s.IsClosed() {
return
}
c.logger.Printf("[ERR] client.rpc: failed to accept RPC conn: %v", err)
continue
}
go c.handleConn(conn)
metrics.IncrCounter([]string{"client", "rpc", "accept_conn"}, 1)
}
}
// handleConn is used to handle an individual connection.
func (c *Client) handleConn(conn net.Conn) {
defer conn.Close()
rpcCodec := nomad.NewServerCodec(conn)
for {
select {
case <-c.shutdownCh:
return
default:
}
if err := c.rpcServer.ServeRequest(rpcCodec); err != nil {
if err != io.EOF && !strings.Contains(err.Error(), "closed") {
c.logger.Printf("[ERR] client.rpc: RPC error: %v (%v)", err, conn)
metrics.IncrCounter([]string{"client", "rpc", "request_error"}, 1)
}
return
}
metrics.IncrCounter([]string{"client", "rpc", "request"}, 1)
}
}
// setupClientRpcServer is used to populate a client RPC server with endpoints.