service discovery: add pagination and filtering support to info requests (#12552)

* services: add pagination and filter support to info RPC.
* cli: add filter flag to service info command.
* docs: add pagination and filter details to services info API.
* paginator: minor updates to comment and func signature.
This commit is contained in:
James Rasell
2022-04-13 07:41:44 +02:00
committed by GitHub
parent 36c89f61bb
commit 281a0fb38e
8 changed files with 240 additions and 8 deletions

View File

@@ -37,6 +37,9 @@ Service Info Options:
-verbose
Display full information.
-filter
Specifies an expression used to filter query results.
-json
Output the service in JSON format.
@@ -55,6 +58,7 @@ func (s *ServiceInfoCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(s.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-json": complete.PredictNothing,
"-filter": complete.PredictAnything,
"-t": complete.PredictAnything,
"-verbose": complete.PredictNothing,
})
@@ -67,7 +71,7 @@ func (s *ServiceInfoCommand) Name() string { return "service info" }
func (s *ServiceInfoCommand) Run(args []string) int {
var (
json, verbose bool
tmpl string
tmpl, filter string
)
flags := s.Meta.FlagSet(s.Name(), FlagSetClient)
@@ -75,6 +79,7 @@ func (s *ServiceInfoCommand) Run(args []string) int {
flags.BoolVar(&json, "json", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.StringVar(&tmpl, "t", "", "")
flags.StringVar(&filter, "filter", "", "")
if err := flags.Parse(args); err != nil {
return 1
}
@@ -92,7 +97,12 @@ func (s *ServiceInfoCommand) Run(args []string) int {
return 1
}
serviceInfo, _, err := client.Services().Get(args[0], nil)
// Set up the options to capture any filter passed.
opts := api.QueryOptions{
Filter: filter,
}
serviceInfo, _, err := client.Services().Get(args[0], &opts)
if err != nil {
s.Ui.Error(fmt.Sprintf("Error listing service registrations: %s", err))
return 1

View File

@@ -1,6 +1,7 @@
package nomad
import (
"net/http"
"time"
"github.com/armon/go-metrics"
@@ -8,6 +9,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -389,15 +391,41 @@ func (s *ServiceRegistration) GetService(
return err
}
// Generate the tokenizer to use for pagination using namespace and
// ID to ensure complete uniqueness.
tokenizer := paginator.NewStructsTokenizer(iter,
paginator.StructsTokenizerOptions{
WithNamespace: true,
WithID: true,
},
)
// Set up our output after we have checked the error.
var services []*structs.ServiceRegistration
// Iterate the iterator, appending all service registrations
// returned to the reply.
for raw := iter.Next(); raw != nil; raw = iter.Next() {
services = append(services, raw.(*structs.ServiceRegistration))
// Build the paginator. This includes the function that is
// responsible for appending a registration to the services array.
paginatorImpl, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions,
func(raw interface{}) error {
services = append(services, raw.(*structs.ServiceRegistration))
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to create result paginator: %v", err)
}
// Calling page populates our output services array as well as
// returns the next token.
nextToken, err := paginatorImpl.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to read result page: %v", err)
}
// Populate the reply.
reply.Services = services
reply.NextToken = nextToken
// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.

View File

@@ -1,6 +1,7 @@
package nomad
import (
"fmt"
"testing"
"github.com/hashicorp/go-memdb"
@@ -1073,6 +1074,106 @@ func TestServiceRegistration_GetService(t *testing.T) {
},
name: "ACLs enabled using node secret",
},
{
serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, nil)
return server, nil, cleanup
},
testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Generate mock services then upsert them individually using different indexes.
services := mock.ServiceRegistrations()
require.NoError(t, s.fsm.State().UpsertServiceRegistrations(
structs.MsgTypeTestSetup, 10, services))
// Generate a second set of mocks. Set the datacenter to the
// opposite or the mock, (dc1,dc2) which will be used to test
// filtering and alter the ID.
nextServices := mock.ServiceRegistrations()
nextServices[0].ID += "_next"
nextServices[0].Datacenter = "dc2"
nextServices[1].ID += "_next"
nextServices[1].Datacenter = "dc1"
require.NoError(t, s.fsm.State().UpsertServiceRegistrations(
structs.MsgTypeTestSetup, 20, nextServices))
// Create and test a request where we filter for service
// registrations in the default namespace, running within
// datacenter "dc2" only.
serviceRegReq := &structs.ServiceRegistrationByNameRequest{
ServiceName: services[0].ServiceName,
QueryOptions: structs.QueryOptions{
Namespace: structs.DefaultNamespace,
Region: DefaultRegion,
Filter: `Datacenter == "dc2"`,
},
}
var serviceRegResp structs.ServiceRegistrationByNameResponse
err := msgpackrpc.CallWithCodec(
codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp)
require.NoError(t, err)
require.ElementsMatch(t, []*structs.ServiceRegistration{nextServices[0]}, serviceRegResp.Services)
// Create a test function which can be used for each namespace
// to ensure cross-namespace functionality of pagination.
namespaceTestFn := func(
req *structs.ServiceRegistrationByNameRequest,
resp *structs.ServiceRegistrationByNameResponse) {
// We have two service registrations, therefore loop twice in
// order to check the return array and pagination details.
for i := 0; i < 2; i++ {
// The message makes debugging test failures easier as we
// are inside a loop.
msg := fmt.Sprintf("iteration %v of 2", i)
err2 := msgpackrpc.CallWithCodec(
codec, structs.ServiceRegistrationGetServiceRPCMethod, req, resp)
require.NoError(t, err2, msg)
require.Len(t, resp.Services, 1, msg)
// Anything but the first iteration should result in an
// empty token as we only have two entries.
switch i {
case 1:
require.Empty(t, resp.NextToken)
default:
require.NotEmpty(t, resp.NextToken)
req.NextToken = resp.NextToken
}
}
}
// Test the default namespace pagnination.
serviceRegReq2 := structs.ServiceRegistrationByNameRequest{
ServiceName: services[0].ServiceName,
QueryOptions: structs.QueryOptions{
Namespace: structs.DefaultNamespace,
Region: DefaultRegion,
PerPage: 1,
},
}
var serviceRegResp2 structs.ServiceRegistrationByNameResponse
namespaceTestFn(&serviceRegReq2, &serviceRegResp2)
// Test the platform namespace pagnination.
serviceRegReq3 := structs.ServiceRegistrationByNameRequest{
ServiceName: services[1].ServiceName,
QueryOptions: structs.QueryOptions{
Namespace: services[1].Namespace,
Region: DefaultRegion,
PerPage: 1,
},
}
var serviceRegResp3 structs.ServiceRegistrationByNameResponse
namespaceTestFn(&serviceRegReq3, &serviceRegResp3)
},
name: "filtering and pagination",
},
}
for _, tc := range testCases {

View File

@@ -34,7 +34,9 @@ type Paginator struct {
appendFunc func(interface{}) error
}
// NewPaginator returns a new Paginator.
// NewPaginator returns a new Paginator. Any error creating the paginator is
// due to bad user filter input, RPC functions should therefore return a 400
// error code along with an appropriate message.
func NewPaginator(iter Iterator, tokenizer Tokenizer, filters []Filter,
opts structs.QueryOptions, appendFunc func(interface{}) error) (*Paginator, error) {

View File

@@ -68,7 +68,7 @@ type StructsTokenizer struct {
}
// NewStructsTokenizer returns a new StructsTokenizer.
func NewStructsTokenizer(it Iterator, opts StructsTokenizerOptions) StructsTokenizer {
func NewStructsTokenizer(_ Iterator, opts StructsTokenizerOptions) StructsTokenizer {
return StructsTokenizer{
opts: opts,
}

View File

@@ -153,6 +153,24 @@ func (s *ServiceRegistration) Validate() error {
return nil
}
// GetID is a helper for getting the ID when the object may be nil and is
// required for pagination.
func (s *ServiceRegistration) GetID() string {
if s == nil {
return ""
}
return s.ID
}
// GetNamespace is a helper for getting the namespace when the object may be
// nil and is required for pagination.
func (s *ServiceRegistration) GetNamespace() string {
if s == nil {
return ""
}
return s.Namespace
}
// ServiceRegistrationUpsertRequest is the request object used to upsert one or
// more service registrations.
type ServiceRegistrationUpsertRequest struct {

View File

@@ -382,6 +382,62 @@ func TestServiceRegistration_Equal(t *testing.T) {
}
}
func TestServiceRegistration_GetID(t *testing.T) {
testCases := []struct {
inputServiceRegistration *ServiceRegistration
expectedOutput string
name string
}{
{
inputServiceRegistration: nil,
expectedOutput: "",
name: "nil input",
},
{
inputServiceRegistration: &ServiceRegistration{
ID: "_nomad-task-2873cf75-42e5-7c45-ca1c-415f3e18be3d-group-cache-example-cache-db",
},
expectedOutput: "_nomad-task-2873cf75-42e5-7c45-ca1c-415f3e18be3d-group-cache-example-cache-db",
name: "generic input 1",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualOutput := tc.inputServiceRegistration.GetID()
require.Equal(t, tc.expectedOutput, actualOutput)
})
}
}
func TestServiceRegistration_GetNamespace(t *testing.T) {
testCases := []struct {
inputServiceRegistration *ServiceRegistration
expectedOutput string
name string
}{
{
inputServiceRegistration: nil,
expectedOutput: "",
name: "nil input",
},
{
inputServiceRegistration: &ServiceRegistration{
Namespace: "platform",
},
expectedOutput: "platform",
name: "generic input 1",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualOutput := tc.inputServiceRegistration.GetNamespace()
require.Equal(t, tc.expectedOutput, actualOutput)
})
}
}
func TestServiceRegistrationListRequest_StaleReadSupport(t *testing.T) {
req := &ServiceRegistrationListRequest{}
require.True(t, req.IsRead())

View File

@@ -77,6 +77,23 @@ The table below shows this endpoint's support for
- `:service_name` `(string: <required>)` - Specifies the service name. This is
specified as part of the path.
- `namespace` `(string: "default")` - Specifies the target namespace. This
parameter is used before any `filter` expression is applied.
- `next_token` `(string: "")` - This endpoint supports paging. The `next_token`
parameter accepts a string which identifies the next expected service. This
value can be obtained from the `X-Nomad-NextToken` header from the previous
response.
- `per_page` `(int: 0)` - Specifies a maximum number of services to return for
this request. If omitted, the response is not paginated. The value of the
`X-Nomad-NextToken` header of the last response can be used as the `next_token`
of the next request to fetch additional pages.
- `filter` `(string: "")` - Specifies the [expression](/api-docs#filtering)
used to filter the results. Consider using pagination or a query parameter to
reduce resource used to serve the request.
### Sample Request
```shell-session