diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index 6b9ec8bf4..7d0288ad9 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -9,10 +9,11 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-set" + policy "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" @@ -35,7 +36,12 @@ const ( // ACL endpoint is used for manipulating ACL tokens and policies type ACL struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewACLEndpoint(srv *Server, ctx *RPCContext) *ACL { + return &ACL{srv: srv, ctx: ctx, logger: srv.logger.Named("acl")} } // UpsertPolicies is used to create or update a set of policies diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 79745bb29..44a6a93af 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -5,10 +5,10 @@ import ( "net/http" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/pointer" @@ -21,10 +21,12 @@ import ( // Alloc endpoint is used for manipulating allocations type Alloc struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewAllocEndpoint(srv *Server, ctx *RPCContext) *Alloc { + return &Alloc{srv: srv, ctx: ctx, logger: srv.logger.Named("alloc")} } // List is used to list the allocations in the system diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 8a2f29e01..59402ab6b 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -26,6 +26,10 @@ type Agent struct { srv *Server } +func NewAgentEndpoint(srv *Server) *Agent { + return &Agent{srv: srv} +} + func (a *Agent) register() { a.srv.streamingRpcs.Register("Agent.Monitor", a.monitor) } diff --git a/nomad/client_alloc_endpoint.go b/nomad/client_alloc_endpoint.go index 69d7296ca..dc4e8700b 100644 --- a/nomad/client_alloc_endpoint.go +++ b/nomad/client_alloc_endpoint.go @@ -7,9 +7,10 @@ import ( "net" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/pointer" @@ -20,7 +21,11 @@ import ( // Allocation endpoint. type ClientAllocations struct { srv *Server - logger log.Logger + logger hclog.Logger +} + +func NewClientAllocationsEndpoint(srv *Server) *ClientAllocations { + return &ClientAllocations{srv: srv, logger: srv.logger.Named("client_allocs")} } func (a *ClientAllocations) register() { diff --git a/nomad/client_csi_endpoint.go b/nomad/client_csi_endpoint.go index 37876305b..315ded4ff 100644 --- a/nomad/client_csi_endpoint.go +++ b/nomad/client_csi_endpoint.go @@ -19,6 +19,10 @@ type ClientCSI struct { logger log.Logger } +func NewClientCSIEndpoint(srv *Server) *ClientCSI { + return &ClientCSI{srv: srv, logger: srv.logger.Named("client_csi")} +} + func (a *ClientCSI) ControllerAttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error { defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now()) diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go index 9e73fd60e..946b2748d 100644 --- a/nomad/client_fs_endpoint.go +++ b/nomad/client_fs_endpoint.go @@ -25,6 +25,10 @@ type FileSystem struct { logger log.Logger } +func NewFileSystemEndpoint(srv *Server) *FileSystem { + return &FileSystem{srv: srv, logger: srv.logger.Named("client_fs")} +} + func (f *FileSystem) register() { f.srv.streamingRpcs.Register("FileSystem.Logs", f.logs) f.srv.streamingRpcs.Register("FileSystem.Stream", f.stream) diff --git a/nomad/client_stats_endpoint.go b/nomad/client_stats_endpoint.go index 91540b241..ac1976f90 100644 --- a/nomad/client_stats_endpoint.go +++ b/nomad/client_stats_endpoint.go @@ -18,6 +18,10 @@ type ClientStats struct { logger log.Logger } +func NewClientStatsEndpoint(srv *Server) *ClientStats { + return &ClientStats{srv: srv, logger: srv.logger.Named("client_stats")} +} + func (s *ClientStats) Stats(args *nstructs.NodeSpecificRequest, reply *structs.ClientStatsResponse) error { // We only allow stale reads since the only potentially stale information is // the Node registration and the cost is fairly high for adding another hope diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 36b2c9bf7..edaaf1caf 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -6,10 +6,11 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/state" @@ -20,7 +21,12 @@ import ( // CSIVolume wraps the structs.CSIVolume with request data and server context type CSIVolume struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewCSIVolumeEndpoint(srv *Server, ctx *RPCContext) *CSIVolume { + return &CSIVolume{srv: srv, ctx: ctx, logger: srv.logger.Named("csi_volume")} } // QueryACLObj looks up the ACL token in the request and returns the acl.ACL object @@ -1428,7 +1434,12 @@ func (v *CSIVolume) ListSnapshots(args *structs.CSISnapshotListRequest, reply *s // CSIPlugin wraps the structs.CSIPlugin with request data and server context type CSIPlugin struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewCSIPluginEndpoint(srv *Server, ctx *RPCContext) *CSIPlugin { + return &CSIPlugin{srv: srv, ctx: ctx, logger: srv.logger.Named("csi_plugin")} } // List replies with CSIPlugins, filtered by ACL access diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 84632f5a5..08c4c5fc2 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -5,9 +5,10 @@ import ( "net/http" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state/paginator" @@ -17,10 +18,12 @@ import ( // Deployment endpoint is used for manipulating deployments type Deployment struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewDeploymentEndpoint(srv *Server, ctx *RPCContext) *Deployment { + return &Deployment{srv: srv, ctx: ctx, logger: srv.logger.Named("deployment")} } // GetDeployment is used to request information about a specific deployment diff --git a/nomad/endpoints_oss.go b/nomad/endpoints_oss.go index 7c15a04ba..6d6592656 100644 --- a/nomad/endpoints_oss.go +++ b/nomad/endpoints_oss.go @@ -10,7 +10,7 @@ type EnterpriseEndpoints struct{} // NewEnterpriseEndpoints returns a stub of the enterprise endpoints since there // are none in oss -func NewEnterpriseEndpoints(s *Server) *EnterpriseEndpoints { +func NewEnterpriseEndpoints(s *Server, ctx *RPCContext) *EnterpriseEndpoints { return &EnterpriseEndpoints{} } diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index cd30d2550..9c1ca4e83 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -6,12 +6,12 @@ import ( "net/http" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/hashicorp/go-bexpr" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" - version "github.com/hashicorp/go-version" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" @@ -30,10 +30,12 @@ var minVersionEvalDeleteByFilter = version.Must(version.NewVersion("1.4.3")) // Eval endpoint is used for eval interactions type Eval struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewEvalEndpoint(srv *Server, ctx *RPCContext) *Eval { + return &Eval{srv: srv, ctx: ctx, logger: srv.logger.Named("eval")} } // GetEval is used to request information about a specific evaluation diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index dcfaf49a2..4bb75c095 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" @@ -16,6 +17,10 @@ type Event struct { srv *Server } +func NewEventEndpoint(srv *Server) *Event { + return &Event{srv: srv} +} + func (e *Event) register() { e.srv.streamingRpcs.Register("Event.Stream", e.stream) } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index c5a9e7cc3..dec4b9ca2 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pointer" @@ -50,6 +51,7 @@ var ( // Job endpoint is used for job interactions type Job struct { srv *Server + ctx *RPCContext logger hclog.Logger // builtin admission controllers @@ -58,9 +60,10 @@ type Job struct { } // NewJobEndpoints creates a new job endpoint with builtin admission controllers -func NewJobEndpoints(s *Server) *Job { +func NewJobEndpoints(s *Server, ctx *RPCContext) *Job { return &Job{ srv: s, + ctx: ctx, logger: s.logger.Named("job"), mutators: []jobMutator{ jobCanonicalizer{}, diff --git a/nomad/job_endpoint_hook_connect_test.go b/nomad/job_endpoint_hook_connect_test.go index 4e5bd7fc9..f2dd4642e 100644 --- a/nomad/job_endpoint_hook_connect_test.go +++ b/nomad/job_endpoint_hook_connect_test.go @@ -341,7 +341,7 @@ func TestJobEndpointConnect_ConnectInterpolation(t *testing.T) { ci.Parallel(t) server := &Server{logger: testlog.HCLogger(t)} - jobEndpoint := NewJobEndpoints(server) + jobEndpoint := NewJobEndpoints(server, nil) j := mock.ConnectJob() j.TaskGroups[0].Services[0].Name = "${JOB}-api" diff --git a/nomad/keyring_endpoint.go b/nomad/keyring_endpoint.go index 9b7e27ad9..114fb41db 100644 --- a/nomad/keyring_endpoint.go +++ b/nomad/keyring_endpoint.go @@ -4,9 +4,9 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -15,10 +15,15 @@ import ( // Keyring endpoint serves RPCs for root key management type Keyring struct { - srv *Server - logger hclog.Logger + srv *Server + ctx *RPCContext + logger hclog.Logger + encrypter *Encrypter - ctx *RPCContext // context for connection, to check TLS role +} + +func NewKeyringEndpoint(srv *Server, ctx *RPCContext, enc *Encrypter) *Keyring { + return &Keyring{srv: srv, ctx: ctx, logger: srv.logger.Named("keyring"), encrypter: enc} } func (k *Keyring) Rotate(args *structs.KeyringRotateRootKeyRequest, reply *structs.KeyringRotateRootKeyResponse) error { diff --git a/nomad/namespace_endpoint.go b/nomad/namespace_endpoint.go index 351405d3c..0a6a77800 100644 --- a/nomad/namespace_endpoint.go +++ b/nomad/namespace_endpoint.go @@ -4,9 +4,10 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -14,6 +15,11 @@ import ( // Namespace endpoint is used for manipulating namespaces type Namespace struct { srv *Server + ctx *RPCContext +} + +func NewNamespaceEndpoint(srv *Server, ctx *RPCContext) *Namespace { + return &Namespace{srv: srv, ctx: ctx} } // UpsertNamespaces is used to upsert a set of namespaces diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index a00215955..0151e6271 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -14,14 +14,15 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" + vapi "github.com/hashicorp/vault/api" + "golang.org/x/sync/errgroup" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" - vapi "github.com/hashicorp/vault/api" - "golang.org/x/sync/errgroup" ) const ( @@ -77,6 +78,16 @@ type Node struct { updatesLock sync.Mutex } +func NewNodeEndpoint(srv *Server, ctx *RPCContext) *Node { + return &Node{ + srv: srv, + ctx: ctx, + logger: srv.logger.Named("client"), + updates: []*structs.Allocation{}, + evals: []*structs.Evaluation{}, + } +} + // Register is used to upsert a client that is available for scheduling func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { isForwarded := args.IsForwarded() diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index 060e93034..cf348f274 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -7,7 +7,7 @@ import ( "net" "time" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -20,7 +20,12 @@ import ( // Operator endpoint is used to perform low-level operator tasks for Nomad. type Operator struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewOperatorEndpoint(srv *Server, ctx *RPCContext) *Operator { + return &Operator{srv: srv, ctx: ctx, logger: srv.logger.Named("operator")} } func (op *Operator) register() { diff --git a/nomad/periodic_endpoint.go b/nomad/periodic_endpoint.go index b8e4807cf..e4d9d8a36 100644 --- a/nomad/periodic_endpoint.go +++ b/nomad/periodic_endpoint.go @@ -4,9 +4,9 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/structs" @@ -15,7 +15,12 @@ import ( // Periodic endpoint is used for periodic job interactions type Periodic struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewPeriodicEndpoint(srv *Server, ctx *RPCContext) *Periodic { + return &Periodic{srv: srv, ctx: ctx, logger: srv.logger.Named("periodic")} } // Force is used to force a new instance of a periodic job diff --git a/nomad/plan_endpoint.go b/nomad/plan_endpoint.go index 4979270e4..585049d4b 100644 --- a/nomad/plan_endpoint.go +++ b/nomad/plan_endpoint.go @@ -4,8 +4,8 @@ import ( "fmt" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -13,10 +13,12 @@ import ( // Plan endpoint is used for plan interactions type Plan struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} - // ctx provides context regarding the underlying connection - ctx *RPCContext +func NewPlanEndpoint(srv *Server, ctx *RPCContext) *Plan { + return &Plan{srv: srv, ctx: ctx, logger: srv.logger.Named("plan")} } // Submit is used to submit a plan to the leader diff --git a/nomad/regions_endpoint.go b/nomad/regions_endpoint.go index 84afad2e7..bd6db9723 100644 --- a/nomad/regions_endpoint.go +++ b/nomad/regions_endpoint.go @@ -1,7 +1,7 @@ package nomad import ( - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -9,7 +9,12 @@ import ( // Region is used to query and list the known regions type Region struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewRegionEndpoint(srv *Server, ctx *RPCContext) *Region { + return &Region{srv: srv, ctx: ctx, logger: srv.logger.Named("region")} } // List is used to list all of the known regions. No leader forwarding is diff --git a/nomad/scaling_endpoint.go b/nomad/scaling_endpoint.go index a93ddd5a1..95b021cdf 100644 --- a/nomad/scaling_endpoint.go +++ b/nomad/scaling_endpoint.go @@ -4,9 +4,9 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" @@ -17,7 +17,12 @@ import ( // Scaling endpoint is used for listing and retrieving scaling policies type Scaling struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewScalingEndpoint(srv *Server, ctx *RPCContext) *Scaling { + return &Scaling{srv: srv, ctx: ctx, logger: srv.logger.Named("scaling")} } // ListPolicies is used to list the policies diff --git a/nomad/search_endpoint.go b/nomad/search_endpoint.go index 9358488d9..a7c45e6f7 100644 --- a/nomad/search_endpoint.go +++ b/nomad/search_endpoint.go @@ -43,9 +43,14 @@ var ( // Search endpoint is used to look up matches for a given prefix and context type Search struct { srv *Server + ctx *RPCContext logger hclog.Logger } +func NewSearchEndpoint(srv *Server, ctx *RPCContext) *Search { + return &Search{srv: srv, ctx: ctx, logger: srv.logger.Named("search")} +} + // getPrefixMatches extracts matches for an iterator, and returns a list of ids for // these matches. func (s *Search) getPrefixMatches(iter memdb.ResultIterator, prefix string) ([]string, bool) { diff --git a/nomad/server.go b/nomad/server.go index a00781d3a..cea051601 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1200,92 +1200,90 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error { // setupRpcServer is used to populate an RPC server with endpoints func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) error { - // Add the static endpoints to the RPC server. + // Add the static endpoints to the RPC server. These are the RPC handlers + // that get used when component on the server is making an internal RPC + // call, so we only need them to be initialized once and they have no RPC + // context. if s.staticEndpoints.Status == nil { - // Initialize the list just once - s.staticEndpoints.ACL = &ACL{srv: s, logger: s.logger.Named("acl")} - s.staticEndpoints.Job = NewJobEndpoints(s) - s.staticEndpoints.CSIVolume = &CSIVolume{srv: s, logger: s.logger.Named("csi_volume")} - s.staticEndpoints.CSIPlugin = &CSIPlugin{srv: s, logger: s.logger.Named("csi_plugin")} - s.staticEndpoints.Operator = &Operator{srv: s, logger: s.logger.Named("operator")} - s.staticEndpoints.Operator.register() + // note: Alloc, Plan have only dynamic endpoints + s.staticEndpoints.ACL = NewACLEndpoint(s, nil) + s.staticEndpoints.CSIVolume = NewCSIVolumeEndpoint(s, nil) + s.staticEndpoints.CSIPlugin = NewCSIPluginEndpoint(s, nil) + s.staticEndpoints.Deployment = NewDeploymentEndpoint(s, nil) + s.staticEndpoints.Job = NewJobEndpoints(s, nil) + s.staticEndpoints.Keyring = NewKeyringEndpoint(s, nil, s.encrypter) + s.staticEndpoints.Namespace = NewNamespaceEndpoint(s, nil) + s.staticEndpoints.Node = NewNodeEndpoint(s, nil) + s.staticEndpoints.Operator = NewOperatorEndpoint(s, nil) + s.staticEndpoints.Operator.register() // register the streaming RPCs + s.staticEndpoints.Periodic = NewPeriodicEndpoint(s, nil) + s.staticEndpoints.Region = NewRegionEndpoint(s, nil) + s.staticEndpoints.Scaling = NewScalingEndpoint(s, nil) + s.staticEndpoints.Search = NewSearchEndpoint(s, nil) + s.staticEndpoints.ServiceRegistration = NewServiceRegistrationEndpoint(s, nil) + s.staticEndpoints.Status = NewStatusEndpoint(s, nil) + s.staticEndpoints.System = NewSystemEndpoint(s, nil) + s.staticEndpoints.Variables = NewVariablesEndpoint(s, nil, s.encrypter) - s.staticEndpoints.Periodic = &Periodic{srv: s, logger: s.logger.Named("periodic")} - s.staticEndpoints.Region = &Region{srv: s, logger: s.logger.Named("region")} - s.staticEndpoints.Scaling = &Scaling{srv: s, logger: s.logger.Named("scaling")} - s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")} - s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")} - s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")} - s.staticEndpoints.Namespace = &Namespace{srv: s} - s.staticEndpoints.Variables = &Variables{srv: s, logger: s.logger.Named("variables"), encrypter: s.encrypter} - s.staticEndpoints.Keyring = &Keyring{srv: s, logger: s.logger.Named("keyring"), encrypter: s.encrypter} + s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s, nil) - s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s) - - // These endpoints are dynamic because they need access to the - // RPCContext, but they also need to be called directly in some cases, - // so store them into staticEndpoints for later access, but don't - // register them as static. - s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")} - s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")} - s.staticEndpoints.ServiceRegistration = &ServiceRegistration{srv: s} + // These endpoints don't have a dynamic counterpart, so they'll need to + // be re-registered per connection as well (see below) // Client endpoints - s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")} - s.staticEndpoints.ClientAllocations = &ClientAllocations{srv: s, logger: s.logger.Named("client_allocs")} - s.staticEndpoints.ClientAllocations.register() - s.staticEndpoints.ClientCSI = &ClientCSI{srv: s, logger: s.logger.Named("client_csi")} + s.staticEndpoints.ClientStats = NewClientStatsEndpoint(s) + s.staticEndpoints.ClientAllocations = NewClientAllocationsEndpoint(s) + s.staticEndpoints.ClientAllocations.register() // register the streaming RPCs + s.staticEndpoints.ClientCSI = NewClientCSIEndpoint(s) // Streaming endpoints - s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")} + s.staticEndpoints.FileSystem = NewFileSystemEndpoint(s) s.staticEndpoints.FileSystem.register() - s.staticEndpoints.Agent = &Agent{srv: s} + s.staticEndpoints.Agent = NewAgentEndpoint(s) s.staticEndpoints.Agent.register() - s.staticEndpoints.Event = &Event{srv: s} + s.staticEndpoints.Event = NewEventEndpoint(s) s.staticEndpoints.Event.register() - } - // Register the static handlers - server.Register(s.staticEndpoints.ACL) - server.Register(s.staticEndpoints.Job) - server.Register(s.staticEndpoints.CSIVolume) - server.Register(s.staticEndpoints.CSIPlugin) - server.Register(s.staticEndpoints.Operator) - server.Register(s.staticEndpoints.Periodic) - server.Register(s.staticEndpoints.Region) - server.Register(s.staticEndpoints.Scaling) - server.Register(s.staticEndpoints.Status) - server.Register(s.staticEndpoints.System) - server.Register(s.staticEndpoints.Search) - s.staticEndpoints.Enterprise.Register(server) + // If an endpoint has any non-streaming RPCs doesn't have an RPC context, + // we'll register the static handler here instead of creating a new dynamic + // endpoint on each connection. + server.Register(s.staticEndpoints.ClientStats) server.Register(s.staticEndpoints.ClientAllocations) server.Register(s.staticEndpoints.ClientCSI) server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.Agent) - server.Register(s.staticEndpoints.Namespace) - server.Register(s.staticEndpoints.Variables) - // Create new dynamic endpoints and add them to the RPC server. - alloc := &Alloc{srv: s, ctx: ctx, logger: s.logger.Named("alloc")} - deployment := &Deployment{srv: s, ctx: ctx, logger: s.logger.Named("deployment")} - eval := &Eval{srv: s, ctx: ctx, logger: s.logger.Named("eval")} - node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} - plan := &Plan{srv: s, ctx: ctx, logger: s.logger.Named("plan")} - serviceReg := &ServiceRegistration{srv: s, ctx: ctx} - keyringReg := &Keyring{srv: s, ctx: ctx, logger: s.logger.Named("keyring"), encrypter: s.encrypter} + // Dynamic endpoints are endpoints that include the connection context and + // are created on each connection. Register all the dynamic endpoints with + // the RPC server. + + _ = server.Register(NewACLEndpoint(s, ctx)) + _ = server.Register(NewAllocEndpoint(s, ctx)) + _ = server.Register(NewCSIVolumeEndpoint(s, ctx)) + _ = server.Register(NewCSIPluginEndpoint(s, ctx)) + _ = server.Register(NewDeploymentEndpoint(s, ctx)) + _ = server.Register(NewEvalEndpoint(s, ctx)) + _ = server.Register(NewJobEndpoints(s, ctx)) + _ = server.Register(NewKeyringEndpoint(s, ctx, s.encrypter)) + _ = server.Register(NewNamespaceEndpoint(s, ctx)) + _ = server.Register(NewNodeEndpoint(s, ctx)) + _ = server.Register(NewOperatorEndpoint(s, ctx)) + _ = server.Register(NewPeriodicEndpoint(s, ctx)) + _ = server.Register(NewPlanEndpoint(s, ctx)) + _ = server.Register(NewRegionEndpoint(s, ctx)) + _ = server.Register(NewScalingEndpoint(s, ctx)) + _ = server.Register(NewSearchEndpoint(s, ctx)) + _ = server.Register(NewServiceRegistrationEndpoint(s, ctx)) + _ = server.Register(NewStatusEndpoint(s, ctx)) + _ = server.Register(NewSystemEndpoint(s, ctx)) + _ = server.Register(NewVariablesEndpoint(s, ctx, s.encrypter)) + + _ = server.Register(NewEnterpriseEndpoints(s, ctx)) - // Register the dynamic endpoints - server.Register(alloc) - server.Register(deployment) - server.Register(eval) - server.Register(node) - server.Register(plan) - _ = server.Register(serviceReg) - _ = server.Register(keyringReg) return nil } diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go index 9aacbad60..61d92890b 100644 --- a/nomad/service_registration_endpoint.go +++ b/nomad/service_registration_endpoint.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" @@ -24,12 +25,13 @@ import ( // "/v1/service{s}" HTTP API. type ServiceRegistration struct { srv *Server - - // ctx provides context regarding the underlying connection, so we can - // perform TLS certificate validation on internal only endpoints. ctx *RPCContext } +func NewServiceRegistrationEndpoint(srv *Server, ctx *RPCContext) *ServiceRegistration { + return &ServiceRegistration{srv: srv, ctx: ctx} +} + // Upsert creates or updates service registrations held within Nomad. This RPC // is only callable by Nomad nodes. func (s *ServiceRegistration) Upsert( diff --git a/nomad/status_endpoint.go b/nomad/status_endpoint.go index 88fa754a2..138e27719 100644 --- a/nomad/status_endpoint.go +++ b/nomad/status_endpoint.go @@ -5,7 +5,7 @@ import ( "fmt" "strconv" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -13,7 +13,12 @@ import ( // Status endpoint is used to check on server status type Status struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewStatusEndpoint(srv *Server, ctx *RPCContext) *Status { + return &Status{srv: srv, ctx: ctx, logger: srv.logger.Named("status")} } // Ping is used to just check for connectivity diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go index 6d9aa0b32..87deeac67 100644 --- a/nomad/system_endpoint.go +++ b/nomad/system_endpoint.go @@ -3,7 +3,7 @@ package nomad import ( "fmt" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -11,7 +11,12 @@ import ( // System endpoint is used to call invoke system tasks. type System struct { srv *Server - logger log.Logger + ctx *RPCContext + logger hclog.Logger +} + +func NewSystemEndpoint(srv *Server, ctx *RPCContext) *System { + return &System{srv: srv, ctx: ctx, logger: srv.logger.Named("system")} } // GarbageCollect is used to trigger the system to immediately garbage collect nodes, evals diff --git a/nomad/variables_endpoint.go b/nomad/variables_endpoint.go index 79f83b8af..856853a86 100644 --- a/nomad/variables_endpoint.go +++ b/nomad/variables_endpoint.go @@ -7,9 +7,9 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" @@ -22,11 +22,17 @@ import ( // callable via the Variables RPCs and externally via the "/v1/var{s}" // HTTP API. type Variables struct { - srv *Server - logger hclog.Logger + srv *Server + ctx *RPCContext + logger hclog.Logger + encrypter *Encrypter } +func NewVariablesEndpoint(srv *Server, ctx *RPCContext, enc *Encrypter) *Variables { + return &Variables{srv: srv, ctx: ctx, logger: srv.logger.Named("variables"), encrypter: enc} +} + // Apply is used to apply a SV update request to the data store. func (sv *Variables) Apply(args *structs.VariablesApplyRequest, reply *structs.VariablesApplyResponse) error { if done, err := sv.srv.forward(structs.VariablesApplyRPCMethod, args, args, reply); done {