From 281a0fb38e4b8b4e6777519a08ca74e2096dd0f7 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Wed, 13 Apr 2022 07:41:44 +0200 Subject: [PATCH] service discovery: add pagination and filtering support to info requests (#12552) * services: add pagination and filter support to info RPC. * cli: add filter flag to service info command. * docs: add pagination and filter details to services info API. * paginator: minor updates to comment and func signature. --- command/service_info.go | 14 ++- nomad/service_registration_endpoint.go | 36 ++++++- nomad/service_registration_endpoint_test.go | 101 ++++++++++++++++++++ nomad/state/paginator/paginator.go | 4 +- nomad/state/paginator/tokenizer.go | 2 +- nomad/structs/service_registration.go | 18 ++++ nomad/structs/service_registration_test.go | 56 +++++++++++ website/content/api-docs/services.mdx | 17 ++++ 8 files changed, 240 insertions(+), 8 deletions(-) diff --git a/command/service_info.go b/command/service_info.go index 95f413c36..0297712ad 100644 --- a/command/service_info.go +++ b/command/service_info.go @@ -37,6 +37,9 @@ Service Info Options: -verbose Display full information. + -filter + Specifies an expression used to filter query results. + -json Output the service in JSON format. @@ -55,6 +58,7 @@ func (s *ServiceInfoCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(s.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ "-json": complete.PredictNothing, + "-filter": complete.PredictAnything, "-t": complete.PredictAnything, "-verbose": complete.PredictNothing, }) @@ -67,7 +71,7 @@ func (s *ServiceInfoCommand) Name() string { return "service info" } func (s *ServiceInfoCommand) Run(args []string) int { var ( json, verbose bool - tmpl string + tmpl, filter string ) flags := s.Meta.FlagSet(s.Name(), FlagSetClient) @@ -75,6 +79,7 @@ func (s *ServiceInfoCommand) Run(args []string) int { flags.BoolVar(&json, "json", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.StringVar(&tmpl, "t", "", "") + flags.StringVar(&filter, "filter", "", "") if err := flags.Parse(args); err != nil { return 1 } @@ -92,7 +97,12 @@ func (s *ServiceInfoCommand) Run(args []string) int { return 1 } - serviceInfo, _, err := client.Services().Get(args[0], nil) + // Set up the options to capture any filter passed. + opts := api.QueryOptions{ + Filter: filter, + } + + serviceInfo, _, err := client.Services().Get(args[0], &opts) if err != nil { s.Ui.Error(fmt.Sprintf("Error listing service registrations: %s", err)) return 1 diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go index 3f3ff8763..3684fc460 100644 --- a/nomad/service_registration_endpoint.go +++ b/nomad/service_registration_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "net/http" "time" "github.com/armon/go-metrics" @@ -8,6 +9,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" ) @@ -389,15 +391,41 @@ func (s *ServiceRegistration) GetService( return err } + // Generate the tokenizer to use for pagination using namespace and + // ID to ensure complete uniqueness. + tokenizer := paginator.NewStructsTokenizer(iter, + paginator.StructsTokenizerOptions{ + WithNamespace: true, + WithID: true, + }, + ) + // Set up our output after we have checked the error. var services []*structs.ServiceRegistration - // Iterate the iterator, appending all service registrations - // returned to the reply. - for raw := iter.Next(); raw != nil; raw = iter.Next() { - services = append(services, raw.(*structs.ServiceRegistration)) + // Build the paginator. This includes the function that is + // responsible for appending a registration to the services array. + paginatorImpl, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions, + func(raw interface{}) error { + services = append(services, raw.(*structs.ServiceRegistration)) + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to create result paginator: %v", err) } + + // Calling page populates our output services array as well as + // returns the next token. + nextToken, err := paginatorImpl.Page() + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to read result page: %v", err) + } + + // Populate the reply. reply.Services = services + reply.NextToken = nextToken // Use the index table to populate the query meta as we have no way // of tracking the max index on deletes. diff --git a/nomad/service_registration_endpoint_test.go b/nomad/service_registration_endpoint_test.go index 3c8df91cb..164f4d8a1 100644 --- a/nomad/service_registration_endpoint_test.go +++ b/nomad/service_registration_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "fmt" "testing" "github.com/hashicorp/go-memdb" @@ -1073,6 +1074,106 @@ func TestServiceRegistration_GetService(t *testing.T) { }, name: "ACLs enabled using node secret", }, + { + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Generate mock services then upsert them individually using different indexes. + services := mock.ServiceRegistrations() + require.NoError(t, s.fsm.State().UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 10, services)) + + // Generate a second set of mocks. Set the datacenter to the + // opposite or the mock, (dc1,dc2) which will be used to test + // filtering and alter the ID. + nextServices := mock.ServiceRegistrations() + nextServices[0].ID += "_next" + nextServices[0].Datacenter = "dc2" + nextServices[1].ID += "_next" + nextServices[1].Datacenter = "dc1" + require.NoError(t, s.fsm.State().UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 20, nextServices)) + + // Create and test a request where we filter for service + // registrations in the default namespace, running within + // datacenter "dc2" only. + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: services[0].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: DefaultRegion, + Filter: `Datacenter == "dc2"`, + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + require.ElementsMatch(t, []*structs.ServiceRegistration{nextServices[0]}, serviceRegResp.Services) + + // Create a test function which can be used for each namespace + // to ensure cross-namespace functionality of pagination. + namespaceTestFn := func( + req *structs.ServiceRegistrationByNameRequest, + resp *structs.ServiceRegistrationByNameResponse) { + + // We have two service registrations, therefore loop twice in + // order to check the return array and pagination details. + for i := 0; i < 2; i++ { + + // The message makes debugging test failures easier as we + // are inside a loop. + msg := fmt.Sprintf("iteration %v of 2", i) + + err2 := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationGetServiceRPCMethod, req, resp) + require.NoError(t, err2, msg) + require.Len(t, resp.Services, 1, msg) + + // Anything but the first iteration should result in an + // empty token as we only have two entries. + switch i { + case 1: + require.Empty(t, resp.NextToken) + default: + require.NotEmpty(t, resp.NextToken) + req.NextToken = resp.NextToken + } + } + } + + // Test the default namespace pagnination. + serviceRegReq2 := structs.ServiceRegistrationByNameRequest{ + ServiceName: services[0].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: DefaultRegion, + PerPage: 1, + }, + } + var serviceRegResp2 structs.ServiceRegistrationByNameResponse + namespaceTestFn(&serviceRegReq2, &serviceRegResp2) + + // Test the platform namespace pagnination. + serviceRegReq3 := structs.ServiceRegistrationByNameRequest{ + ServiceName: services[1].ServiceName, + QueryOptions: structs.QueryOptions{ + Namespace: services[1].Namespace, + Region: DefaultRegion, + PerPage: 1, + }, + } + var serviceRegResp3 structs.ServiceRegistrationByNameResponse + namespaceTestFn(&serviceRegReq3, &serviceRegResp3) + + }, + name: "filtering and pagination", + }, } for _, tc := range testCases { diff --git a/nomad/state/paginator/paginator.go b/nomad/state/paginator/paginator.go index f4aa3c2fe..5b4f31d2e 100644 --- a/nomad/state/paginator/paginator.go +++ b/nomad/state/paginator/paginator.go @@ -34,7 +34,9 @@ type Paginator struct { appendFunc func(interface{}) error } -// NewPaginator returns a new Paginator. +// NewPaginator returns a new Paginator. Any error creating the paginator is +// due to bad user filter input, RPC functions should therefore return a 400 +// error code along with an appropriate message. func NewPaginator(iter Iterator, tokenizer Tokenizer, filters []Filter, opts structs.QueryOptions, appendFunc func(interface{}) error) (*Paginator, error) { diff --git a/nomad/state/paginator/tokenizer.go b/nomad/state/paginator/tokenizer.go index 7f44d89aa..343d7e17f 100644 --- a/nomad/state/paginator/tokenizer.go +++ b/nomad/state/paginator/tokenizer.go @@ -68,7 +68,7 @@ type StructsTokenizer struct { } // NewStructsTokenizer returns a new StructsTokenizer. -func NewStructsTokenizer(it Iterator, opts StructsTokenizerOptions) StructsTokenizer { +func NewStructsTokenizer(_ Iterator, opts StructsTokenizerOptions) StructsTokenizer { return StructsTokenizer{ opts: opts, } diff --git a/nomad/structs/service_registration.go b/nomad/structs/service_registration.go index 88c03c4d8..31554fe5d 100644 --- a/nomad/structs/service_registration.go +++ b/nomad/structs/service_registration.go @@ -153,6 +153,24 @@ func (s *ServiceRegistration) Validate() error { return nil } +// GetID is a helper for getting the ID when the object may be nil and is +// required for pagination. +func (s *ServiceRegistration) GetID() string { + if s == nil { + return "" + } + return s.ID +} + +// GetNamespace is a helper for getting the namespace when the object may be +// nil and is required for pagination. +func (s *ServiceRegistration) GetNamespace() string { + if s == nil { + return "" + } + return s.Namespace +} + // ServiceRegistrationUpsertRequest is the request object used to upsert one or // more service registrations. type ServiceRegistrationUpsertRequest struct { diff --git a/nomad/structs/service_registration_test.go b/nomad/structs/service_registration_test.go index d8fe06b27..da9c0773b 100644 --- a/nomad/structs/service_registration_test.go +++ b/nomad/structs/service_registration_test.go @@ -382,6 +382,62 @@ func TestServiceRegistration_Equal(t *testing.T) { } } +func TestServiceRegistration_GetID(t *testing.T) { + testCases := []struct { + inputServiceRegistration *ServiceRegistration + expectedOutput string + name string + }{ + { + inputServiceRegistration: nil, + expectedOutput: "", + name: "nil input", + }, + { + inputServiceRegistration: &ServiceRegistration{ + ID: "_nomad-task-2873cf75-42e5-7c45-ca1c-415f3e18be3d-group-cache-example-cache-db", + }, + expectedOutput: "_nomad-task-2873cf75-42e5-7c45-ca1c-415f3e18be3d-group-cache-example-cache-db", + name: "generic input 1", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := tc.inputServiceRegistration.GetID() + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +} + +func TestServiceRegistration_GetNamespace(t *testing.T) { + testCases := []struct { + inputServiceRegistration *ServiceRegistration + expectedOutput string + name string + }{ + { + inputServiceRegistration: nil, + expectedOutput: "", + name: "nil input", + }, + { + inputServiceRegistration: &ServiceRegistration{ + Namespace: "platform", + }, + expectedOutput: "platform", + name: "generic input 1", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := tc.inputServiceRegistration.GetNamespace() + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +} + func TestServiceRegistrationListRequest_StaleReadSupport(t *testing.T) { req := &ServiceRegistrationListRequest{} require.True(t, req.IsRead()) diff --git a/website/content/api-docs/services.mdx b/website/content/api-docs/services.mdx index ca883557b..465a2ce9a 100644 --- a/website/content/api-docs/services.mdx +++ b/website/content/api-docs/services.mdx @@ -77,6 +77,23 @@ The table below shows this endpoint's support for - `:service_name` `(string: )` - Specifies the service name. This is specified as part of the path. +- `namespace` `(string: "default")` - Specifies the target namespace. This + parameter is used before any `filter` expression is applied. + +- `next_token` `(string: "")` - This endpoint supports paging. The `next_token` + parameter accepts a string which identifies the next expected service. This + value can be obtained from the `X-Nomad-NextToken` header from the previous + response. + +- `per_page` `(int: 0)` - Specifies a maximum number of services to return for + this request. If omitted, the response is not paginated. The value of the + `X-Nomad-NextToken` header of the last response can be used as the `next_token` + of the next request to fetch additional pages. + +- `filter` `(string: "")` - Specifies the [expression](/api-docs#filtering) + used to filter the results. Consider using pagination or a query parameter to + reduce resource used to serve the request. + ### Sample Request ```shell-session