diff --git a/nomad/server.go b/nomad/server.go index d9f8679fc..a0b0bf16f 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1132,10 +1132,10 @@ func (s *Server) setupVaultClient() error { // setupRPC is used to setup the RPC listener func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error { // Populate the static RPC server - err := s.setupRpcServer(s.rpcServer, nil) - if err != nil { - return err - } + s.setupRpcServer(s.rpcServer, nil) + + // Setup streaming endpoints + s.setupStreamingEndpoints(s.rpcServer) listener, err := s.createRPCListener() if err != nil { @@ -1193,26 +1193,21 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error { return nil } -// setupRpcServer is used to populate an RPC server with endpoints. This gets -// called at startup but also once for every new RPC connection so that RPC -// handlers can have per-connection context. -func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error { - +// setupStreamingEndpoints is used to populate an RPC server with streaming +// endpoints. This only gets called at server startup. +func (s *Server) setupStreamingEndpoints(server *rpc.Server) { // The endpoints are client RPCs and don't include a connection // context. They also need to be registered as streaming endpoints in their // register() methods. clientAllocs := NewClientAllocationsEndpoint(s) clientAllocs.register() - _ = server.Register(clientAllocs) fsEndpoint := NewFileSystemEndpoint(s) fsEndpoint.register() - _ = server.Register(fsEndpoint) agentEndpoint := NewAgentEndpoint(s) agentEndpoint.register() - _ = server.Register(agentEndpoint) // Event is a streaming-only endpoint so we don't want to register it as a // normal RPC @@ -1221,14 +1216,26 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error { // Operator takes a RPC context but also has a streaming RPC that needs to // be registered - operatorEndpoint := NewOperatorEndpoint(s, ctx) + operatorEndpoint := NewOperatorEndpoint(s, nil) operatorEndpoint.register() - _ = server.Register(NewOperatorEndpoint(s, ctx)) +} +// setupRpcServer is used to populate an RPC server with endpoints. This gets +// called at startup but also once for every new RPC connection so that RPC +// handlers can have per-connection context. +func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { // These endpoints are client RPCs and don't include a connection context _ = server.Register(NewClientCSIEndpoint(s)) _ = server.Register(NewClientStatsEndpoint(s)) + // These endpoints have their streaming component registered in + // setupStreamingEndpoints, but their non-streaming RPCs are registered + // here. + _ = server.Register(NewClientAllocationsEndpoint(s)) + _ = server.Register(NewFileSystemEndpoint(s)) + _ = server.Register(NewAgentEndpoint(s)) + _ = server.Register(NewOperatorEndpoint(s, ctx)) + // All other endpoints include the connection context and don't need to be // registered as streaming endpoints @@ -1252,10 +1259,10 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error { _ = server.Register(NewSystemEndpoint(s, ctx)) _ = server.Register(NewVariablesEndpoint(s, ctx, s.encrypter)) + // Register non-streaming + ent := NewEnterpriseEndpoints(s, ctx) ent.Register(server) - - return nil } // setupRaft is used to setup and initialize Raft