diff --git a/client/client_stats_endpoint.go b/client/client_stats_endpoint.go index 3506e3c04..d543eab28 100644 --- a/client/client_stats_endpoint.go +++ b/client/client_stats_endpoint.go @@ -15,7 +15,7 @@ type ClientStats struct { // Stats is used to retrieve the Clients stats. func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.ClientStatsResponse) error { - defer metrics.MeasureSince([]string{"nomad", "client", "client_stats", "stats"}, time.Now()) + defer metrics.MeasureSince([]string{"client", "client_stats", "stats"}, time.Now()) // Check node read permissions if aclObj, err := s.c.ResolveToken(args.AuthToken); err != nil { diff --git a/client/rpc.go b/client/rpc.go index 1a658dde5..70813c428 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -2,12 +2,16 @@ package client import ( "fmt" + "io" "net" "net/rpc" "strings" + metrics "github.com/armon/go-metrics" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/codec" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/yamux" ) // rpcEndpoints holds the RPC endpoints @@ -57,6 +61,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { return mErr.ErrorOrNil() } +// TODO This can't really be tested until Servers can dial back to the client. // setupClientRpc is used to setup the Client's RPC endpoints func (c *Client) setupClientRpc() { // Initialize the RPC handlers @@ -67,6 +72,71 @@ func (c *Client) setupClientRpc() { // Register the endpoints with the RPC server c.setupClientRpcServer(c.rpcServer) + + go c.rpcConnListener() +} + +// rpcConnListener is a long lived function that listens for new connections +// being made on the connection pool and starts an RPC listener for each +// connection. +func (c *Client) rpcConnListener() { + // Make a channel for new connections. + conns := make(chan *yamux.Session, 4) + c.connPool.SetConnListener(conns) + + for { + select { + case <-c.shutdownCh: + return + case session, ok := <-conns: + if !ok { + continue + } + + go c.listenConn(session) + } + } +} + +// listenConn is used to listen for connections being made from the server on +// pre-existing connection. This should be called in a goroutine. +func (c *Client) listenConn(s *yamux.Session) { + for { + conn, err := s.Accept() + if err != nil { + if s.IsClosed() { + return + } + + c.logger.Printf("[ERR] client.rpc: failed to accept RPC conn: %v", err) + continue + } + + go c.handleConn(conn) + metrics.IncrCounter([]string{"client", "rpc", "accept_conn"}, 1) + } +} + +// handleConn is used to handle an individual connection. +func (c *Client) handleConn(conn net.Conn) { + defer conn.Close() + rpcCodec := nomad.NewServerCodec(conn) + for { + select { + case <-c.shutdownCh: + return + default: + } + + if err := c.rpcServer.ServeRequest(rpcCodec); err != nil { + if err != io.EOF && !strings.Contains(err.Error(), "closed") { + c.logger.Printf("[ERR] client.rpc: RPC error: %v (%v)", err, conn) + metrics.IncrCounter([]string{"client", "rpc", "request_error"}, 1) + } + return + } + metrics.IncrCounter([]string{"client", "rpc", "request"}, 1) + } } // setupClientRpcServer is used to populate a client RPC server with endpoints.