From f0f4cbb848f760b93e8699300cfd36f16ebb1f1a Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 6 Jun 2023 10:43:43 -0400 Subject: [PATCH] node pools: list nodes in pool (#17413) --- acl/acl.go | 3 + command/agent/http.go | 26 ++ command/agent/http_test.go | 68 ++++ command/agent/node_endpoint.go | 22 +- command/agent/node_pool_endpoint.go | 38 ++ command/agent/node_pool_endpoint_test.go | 108 ++++++ nomad/node_pool_endpoint.go | 92 ++++- nomad/node_pool_endpoint_test.go | 325 +++++++++++++++++- nomad/state/schema.go | 8 + nomad/state/state_store.go | 14 + nomad/state/state_store_test.go | 63 ++++ nomad/structs/node_pool.go | 13 + website/content/api-docs/node-pools.mdx | 132 +++++++ .../docs/operations/metrics-reference.mdx | 3 +- 14 files changed, 895 insertions(+), 20 deletions(-) diff --git a/acl/acl.go b/acl/acl.go index 5e3d79c23..26b67eb85 100644 --- a/acl/acl.go +++ b/acl/acl.go @@ -710,6 +710,9 @@ func (a *ACL) AllowAgentWrite() bool { // AllowNodeRead checks if read operations are allowed for a node func (a *ACL) AllowNodeRead() bool { switch { + // a is nil if ACLs are disabled. + case a == nil: + return true case a.management: return true case a.node == PolicyWrite: diff --git a/command/agent/http.go b/command/agent/http.go index e27992ae0..2a36979d0 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -1051,6 +1051,32 @@ func parseNode(req *http.Request, nodeID *string) { } } +// parseNodeListStubFields parses query parameters related to node list stubs +// fields. +func parseNodeListStubFields(req *http.Request) (*structs.NodeStubFields, error) { + fields := &structs.NodeStubFields{} + + // Parse resources field selection. + resources, err := parseBool(req, "resources") + if err != nil { + return nil, err + } + if resources != nil { + fields.Resources = *resources + } + + // Parse OS field selection. + os, err := parseBool(req, "os") + if err != nil { + return nil, err + } + if os != nil { + fields.OS = *os + } + + return fields, nil +} + // parseWriteRequest is a convenience method for endpoints that need to parse a // write request. func (s *HTTPServer) parseWriteRequest(req *http.Request, w *structs.WriteRequest) { diff --git a/command/agent/http_test.go b/command/agent/http_test.go index bd541029d..7287d8d79 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -727,6 +727,74 @@ func TestParsePagination(t *testing.T) { } } +func TestParseNodeListStubFields(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + req string + expected *structs.NodeStubFields + expectedErr string + }{ + { + name: "parse resources", + req: "/v1/nodes?resources=true", + expected: &structs.NodeStubFields{ + Resources: true, + }, + }, + { + name: "parse os", + req: "/v1/nodes?os=true", + expected: &structs.NodeStubFields{ + OS: true, + }, + }, + { + name: "no resources but with os", + req: "/v1/nodes?resources=false&os=true", + expected: &structs.NodeStubFields{ + OS: true, + }, + }, + { + name: "invalid resources value", + req: "/v1/nodes?resources=invalid", + expectedErr: `Failed to parse value of "resources"`, + }, + { + name: "invalid os value", + req: "/v1/nodes?os=invalid", + expectedErr: `Failed to parse value of "os"`, + }, + { + name: "invalid key is ignored", + req: "/v1/nodes?key=invalid", + expected: &structs.NodeStubFields{}, + }, + { + name: "no field", + req: "/v1/nodes", + expected: &structs.NodeStubFields{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest("GET", tc.req, nil) + must.NoError(t, err) + + got, err := parseNodeListStubFields(req) + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + } else { + must.NoError(t, err) + must.Eq(t, tc.expected, got) + } + }) + } +} + // TestHTTP_VerifyHTTPSClient asserts that a client certificate signed by the // appropriate CA is required when VerifyHTTPSClient=true. func TestHTTP_VerifyHTTPSClient(t *testing.T) { diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index 38a40f8ba..17c92eb7d 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -4,6 +4,7 @@ package agent import ( + "fmt" "net/http" "strings" @@ -21,25 +22,12 @@ func (s *HTTPServer) NodesRequest(resp http.ResponseWriter, req *http.Request) ( return nil, nil } - args.Fields = &structs.NodeStubFields{} - // Parse resources field selection - resources, err := parseBool(req, "resources") + // Parse fields selection. + fields, err := parseNodeListStubFields(req) if err != nil { - return nil, err - } - if resources != nil { - args.Fields.Resources = *resources - } - - // Parse OS - os, err := parseBool(req, "os") - if err != nil { - return nil, err - } - - if os != nil { - args.Fields.OS = *os + return nil, CodedError(http.StatusBadRequest, fmt.Errorf("Failed to parse node list fields: %v", err).Error()) } + args.Fields = fields var out structs.NodeListResponse if err := s.agent.RPC("Node.List", &args, &out); err != nil { diff --git a/command/agent/node_pool_endpoint.go b/command/agent/node_pool_endpoint.go index 8f83bde75..754c79318 100644 --- a/command/agent/node_pool_endpoint.go +++ b/command/agent/node_pool_endpoint.go @@ -4,6 +4,7 @@ package agent import ( + "fmt" "net/http" "strings" @@ -24,6 +25,9 @@ func (s *HTTPServer) NodePoolsRequest(resp http.ResponseWriter, req *http.Reques func (s *HTTPServer) NodePoolSpecificRequest(resp http.ResponseWriter, req *http.Request) (any, error) { path := strings.TrimPrefix(req.URL.Path, "/v1/node/pool/") switch { + case strings.HasSuffix(path, "/nodes"): + poolName := strings.TrimSuffix(path, "/nodes") + return s.nodePoolNodesList(resp, req, poolName) default: return s.nodePoolCRUD(resp, req, path) } @@ -119,3 +123,37 @@ func (s *HTTPServer) nodePoolDelete(resp http.ResponseWriter, req *http.Request, setIndex(resp, out.Index) return nil, nil } + +func (s *HTTPServer) nodePoolNodesList(resp http.ResponseWriter, req *http.Request, poolName string) (interface{}, error) { + args := structs.NodePoolNodesRequest{ + Name: poolName, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + // Parse node fields selection. + fields, err := parseNodeListStubFields(req) + if err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Failed to parse node list fields: %v", err)) + } + args.Fields = fields + + if args.Prefix != "" { + // the prefix argument is ambiguous for this endpoint (does it refer to + // the node pool name or the node IDs like /v1/nodes?) so the RPC + // handler ignores it + return nil, CodedError(http.StatusBadRequest, "prefix argument not allowed") + } + + var out structs.NodePoolNodesResponse + if err := s.agent.RPC("NodePool.ListNodes", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Nodes == nil { + out.Nodes = make([]*structs.NodeListStub, 0) + } + return out.Nodes, nil +} diff --git a/command/agent/node_pool_endpoint_test.go b/command/agent/node_pool_endpoint_test.go index c8a69ea85..1efe17957 100644 --- a/command/agent/node_pool_endpoint_test.go +++ b/command/agent/node_pool_endpoint_test.go @@ -312,3 +312,111 @@ func TestHTTP_NodePool_Delete(t *testing.T) { must.Nil(t, got) }) } + +func TestHTTP_NodePool_NodesList(t *testing.T) { + ci.Parallel(t) + httpTest(t, + func(c *Config) { + // Disable client so it doesn't impact tests since we're registering + // our own test nodes. + c.Client.Enabled = false + }, + func(s *TestAgent) { + // Populate state with test data. + pool1 := mock.NodePool() + pool2 := mock.NodePool() + args := structs.NodePoolUpsertRequest{ + NodePools: []*structs.NodePool{pool1, pool2}, + } + var resp structs.GenericResponse + err := s.Agent.RPC("NodePool.UpsertNodePools", &args, &resp) + must.NoError(t, err) + + // Split test nodes between default, pool1, and pool2. + nodesByPool := make(map[string][]*structs.Node) + for i := 0; i < 10; i++ { + node := mock.Node() + switch i % 3 { + case 0: + // Leave node pool value empty so node goes to default. + case 1: + node.NodePool = pool1.Name + case 2: + node.NodePool = pool2.Name + } + nodeRegReq := structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + var nodeRegResp structs.NodeUpdateResponse + err := s.Agent.RPC("Node.Register", &nodeRegReq, &nodeRegResp) + must.NoError(t, err) + + nodesByPool[node.NodePool] = append(nodesByPool[node.NodePool], node) + } + + testCases := []struct { + name string + pool string + args string + expectedNodes []*structs.Node + expectedErr string + validateFn func(*testing.T, []*structs.NodeListStub) + }{ + { + name: "nodes in default", + pool: structs.NodePoolDefault, + expectedNodes: nodesByPool[structs.NodePoolDefault], + validateFn: func(t *testing.T, stubs []*structs.NodeListStub) { + must.Nil(t, stubs[0].NodeResources) + }, + }, + { + name: "nodes in pool1 with resources", + pool: pool1.Name, + args: "resources=true", + expectedNodes: nodesByPool[pool1.Name], + validateFn: func(t *testing.T, stubs []*structs.NodeListStub) { + must.NotNil(t, stubs[0].NodeResources) + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Make HTTP request. + path := fmt.Sprintf("/v1/node/pool/%s/nodes?%s", tc.pool, tc.args) + req, err := http.NewRequest("GET", path, nil) + must.NoError(t, err) + respW := httptest.NewRecorder() + + obj, err := s.Server.NodePoolSpecificRequest(respW, req) + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + return + } + must.NoError(t, err) + + // Verify request only has expected nodes. + stubs := obj.([]*structs.NodeListStub) + must.Len(t, len(tc.expectedNodes), stubs) + for _, node := range tc.expectedNodes { + must.SliceContainsFunc(t, stubs, node, func(s *structs.NodeListStub, n *structs.Node) bool { + return s.ID == n.ID + }) + } + + // Verify respose. + if tc.validateFn != nil { + tc.validateFn(t, stubs) + } + + // Verify response index. + gotIndex, err := strconv.ParseUint(respW.HeaderMap.Get("X-Nomad-Index"), 10, 64) + must.NoError(t, err) + must.NonZero(t, gotIndex) + }) + } + }) +} diff --git a/nomad/node_pool_endpoint.go b/nomad/node_pool_endpoint.go index 46abc0863..72b7fd8fa 100644 --- a/nomad/node_pool_endpoint.go +++ b/nomad/node_pool_endpoint.go @@ -262,7 +262,7 @@ func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.No if done, err := n.srv.forward("NodePool.ListJobs", args, args, reply); done { return err } - n.srv.MeasureRPCRate("node_pool", structs.RateMetricRead, args) + n.srv.MeasureRPCRate("node_pool", structs.RateMetricList, args) if authErr != nil { return structs.ErrPermissionDenied } @@ -383,3 +383,93 @@ func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.No }} return n.srv.blockingRPC(&opts) } + +// ListNodes is used to retrieve a list of nodes for a give node pool. It +// supports pagination and filtering. +func (n *NodePool) ListNodes(args *structs.NodePoolNodesRequest, reply *structs.NodePoolNodesResponse) error { + authErr := n.srv.Authenticate(n.ctx, args) + if done, err := n.srv.forward("NodePool.ListNodes", args, args, reply); done { + return err + } + n.srv.MeasureRPCRate("node_pool", structs.RateMetricList, args) + if authErr != nil { + return structs.ErrPermissionDenied + } + defer metrics.MeasureSince([]string{"nomad", "node_pool", "list_nodes"}, time.Now()) + + // Resolve ACL token and verify it has read capability for nodes and the + // node pool. + aclObj, err := n.srv.ResolveACL(args) + if err != nil { + return err + } + + allowed := aclObj.AllowNodeRead() && + aclObj.AllowNodePoolOperation(args.Name, acl.NodePoolCapabilityRead) + if !allowed { + return structs.ErrPermissionDenied + } + + // Setup blocking query. + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, store *state.StateStore) error { + // Verify node pool exists. + pool, err := store.NodePoolByName(ws, args.Name) + if err != nil { + return err + } + if pool == nil { + return nil + } + + // Fetch nodes in the pool. + var iter memdb.ResultIterator + if args.Name == structs.NodePoolAll { + iter, err = store.Nodes(ws) + } else { + iter, err = store.NodesByNodePool(ws, args.Name) + } + if err != nil { + return err + } + + // Setup paginator by node ID. + pageOpts := paginator.StructsTokenizerOptions{ + WithID: true, + } + tokenizer := paginator.NewStructsTokenizer(iter, pageOpts) + + var nodes []*structs.NodeListStub + pager, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions, + func(raw interface{}) error { + node := raw.(*structs.Node) + nodes = append(nodes, node.Stub(args.Fields)) + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf(http.StatusBadRequest, "failed to create result paginator: %v", err) + } + + nextToken, err := pager.Page() + if err != nil { + return structs.NewErrRPCCodedf(http.StatusBadRequest, "failed to read result page: %v", err) + } + + reply.QueryMeta.NextToken = nextToken + reply.Nodes = nodes + + // Use the last index that affected the nodes table. + index, err := store.Index("nodes") + if err != nil { + return err + } + reply.Index = helper.Max(1, index) + + // Set the query response. + n.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return n.srv.blockingRPC(&opts) +} diff --git a/nomad/node_pool_endpoint_test.go b/nomad/node_pool_endpoint_test.go index 4bd3b41f4..febc6a487 100644 --- a/nomad/node_pool_endpoint_test.go +++ b/nomad/node_pool_endpoint_test.go @@ -1482,5 +1482,328 @@ func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) { must.Sprint("unexpected NextToken")) }) } - +} + +func TestNodePoolEndpoint_ListNodes(t *testing.T) { + ci.Parallel(t) + + s, cleanupS := TestServer(t, nil) + defer cleanupS() + + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Populate state with test data. + pool1 := mock.NodePool() + pool2 := mock.NodePool() + err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool1, pool2}) + must.NoError(t, err) + + // Split test nodes between default, pool1, and pool2. + for i := 0; i < 9; i++ { + node := mock.Node() + switch i % 3 { + case 0: + node.ID = fmt.Sprintf("00000000-0000-0000-0000-0000000000%02d", i/3) + node.NodePool = structs.NodePoolDefault + case 1: + node.ID = fmt.Sprintf("11111111-0000-0000-0000-0000000000%02d", i/3) + node.NodePool = pool1.Name + case 2: + node.ID = fmt.Sprintf("22222222-0000-0000-0000-0000000000%02d", i/3) + node.NodePool = pool2.Name + } + switch i % 2 { + case 0: + node.Attributes["os.name"] = "Windows" + case 1: + node.Attributes["os.name"] = "Linux" + } + err := s.fsm.State().UpsertNode(structs.MsgTypeTestSetup, uint64(1000+1), node) + must.NoError(t, err) + } + + testCases := []struct { + name string + req *structs.NodePoolNodesRequest + expectedErr string + expected []string + expectedNextToken string + }{ + { + name: "nodes in default", + req: &structs.NodePoolNodesRequest{ + Name: structs.NodePoolDefault, + }, + expected: []string{ + "00000000-0000-0000-0000-000000000000", + "00000000-0000-0000-0000-000000000001", + "00000000-0000-0000-0000-000000000002", + }, + }, + { + name: "nodes in all", + req: &structs.NodePoolNodesRequest{ + Name: structs.NodePoolAll, + }, + expected: []string{ + "00000000-0000-0000-0000-000000000000", + "00000000-0000-0000-0000-000000000001", + "00000000-0000-0000-0000-000000000002", + "11111111-0000-0000-0000-000000000000", + "11111111-0000-0000-0000-000000000001", + "11111111-0000-0000-0000-000000000002", + "22222222-0000-0000-0000-000000000000", + "22222222-0000-0000-0000-000000000001", + "22222222-0000-0000-0000-000000000002", + }, + }, + { + name: "nodes in pool1 with OS", + req: &structs.NodePoolNodesRequest{ + Name: pool1.Name, + Fields: &structs.NodeStubFields{ + OS: true, + }, + }, + expected: []string{ + "11111111-0000-0000-0000-000000000000", + "11111111-0000-0000-0000-000000000001", + "11111111-0000-0000-0000-000000000002", + }, + }, + { + name: "nodes in pool2 filtered by OS", + req: &structs.NodePoolNodesRequest{ + Name: pool2.Name, + QueryOptions: structs.QueryOptions{ + Filter: `Attributes["os.name"] == "Windows"`, + }, + }, + expected: []string{ + "22222222-0000-0000-0000-000000000000", + "22222222-0000-0000-0000-000000000002", + }, + }, + { + name: "nodes in pool1 paginated with resources", + req: &structs.NodePoolNodesRequest{ + Name: pool1.Name, + Fields: &structs.NodeStubFields{ + Resources: true, + }, + QueryOptions: structs.QueryOptions{ + PerPage: 2, + }, + }, + expected: []string{ + "11111111-0000-0000-0000-000000000000", + "11111111-0000-0000-0000-000000000001", + }, + expectedNextToken: "11111111-0000-0000-0000-000000000002", + }, + { + name: "nodes in pool1 paginated with resources - page 2", + req: &structs.NodePoolNodesRequest{ + Name: pool1.Name, + Fields: &structs.NodeStubFields{ + Resources: true, + }, + QueryOptions: structs.QueryOptions{ + PerPage: 2, + NextToken: "11111111-0000-0000-0000-000000000002", + }, + }, + expected: []string{ + "11111111-0000-0000-0000-000000000002", + }, + expectedNextToken: "", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Always send the request to the global region. + tc.req.Region = "global" + + // Make node pool nodes request. + var resp structs.NodePoolNodesResponse + err := msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", tc.req, &resp) + + // Check response. + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + must.SliceEmpty(t, resp.Nodes) + } else { + must.NoError(t, err) + + got := make([]string, len(resp.Nodes)) + for i, stub := range resp.Nodes { + got[i] = stub.ID + } + must.Eq(t, tc.expected, got) + must.Eq(t, tc.expectedNextToken, resp.NextToken) + + if tc.req.Fields != nil { + if tc.req.Fields.Resources { + must.NotNil(t, resp.Nodes[0].NodeResources) + must.NotNil(t, resp.Nodes[0].ReservedResources) + } + if tc.req.Fields.OS { + must.NotEq(t, "", resp.Nodes[0].Attributes["os.name"]) + } + } + } + }) + } +} + +func TestNodePoolEndpoint_ListNodes_ACL(t *testing.T) { + ci.Parallel(t) + + s, root, cleanupS := TestACLServer(t, nil) + defer cleanupS() + + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Populate state. + pool := mock.NodePool() + pool.Name = "dev-1" + err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}) + must.NoError(t, err) + + node := mock.Node() + node.NodePool = pool.Name + err = s.fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1001, node) + must.NoError(t, err) + + // Create test ACL tokens. + validToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1002, "valid", + fmt.Sprintf("%s\n%s", mock.NodePoolPolicy("dev-*", "read", nil), mock.NodePolicy("read")), + ) + poolOnlyToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1004, "pool-only", + mock.NodePoolPolicy("dev-*", "read", nil), + ) + nodeOnlyToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1006, "node-only", + mock.NodePolicy("read"), + ) + noPolicyToken := mock.CreateToken(t, s.fsm.State(), 1008, nil) + + testCases := []struct { + name string + pool string + token string + expected []string + expectedErr string + }{ + { + name: "management token is allowed", + token: root.SecretID, + pool: pool.Name, + expected: []string{node.ID}, + }, + { + name: "valid token is allowed", + token: validToken.SecretID, + pool: pool.Name, + expected: []string{node.ID}, + }, + { + name: "pool only not enough", + token: poolOnlyToken.SecretID, + pool: pool.Name, + expectedErr: structs.ErrPermissionDenied.Error(), + }, + { + name: "node only not enough", + token: nodeOnlyToken.SecretID, + pool: pool.Name, + expectedErr: structs.ErrPermissionDenied.Error(), + }, + { + name: "no policy not allowed", + token: noPolicyToken.SecretID, + pool: pool.Name, + expectedErr: structs.ErrPermissionDenied.Error(), + }, + { + name: "token not allowed for pool", + token: validToken.SecretID, + pool: structs.NodePoolDefault, + expectedErr: structs.ErrPermissionDenied.Error(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Make node pool ndoes request. + req := &structs.NodePoolNodesRequest{ + Name: tc.pool, + QueryOptions: structs.QueryOptions{ + Region: "global", + AuthToken: tc.token, + }, + } + var resp structs.NodePoolNodesResponse + err := msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp) + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + must.SliceEmpty(t, resp.Nodes) + } else { + must.NoError(t, err) + + // Check response. + got := make([]string, len(resp.Nodes)) + for i, node := range resp.Nodes { + got[i] = node.ID + } + must.Eq(t, tc.expected, got) + } + }) + } +} + +func TestNodePoolEndpoint_ListNodes_BlockingQuery(t *testing.T) { + ci.Parallel(t) + + s, cleanupS := TestServer(t, nil) + defer cleanupS() + + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Populate state with some node pools. + pool := mock.NodePool() + err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}) + must.NoError(t, err) + + // Register node in pool. + // Insert triggers watchers. + node := mock.Node() + node.NodePool = pool.Name + time.AfterFunc(100*time.Millisecond, func() { + s.fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1001, node) + }) + + req := &structs.NodePoolNodesRequest{ + Name: pool.Name, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 1000, + }, + } + var resp structs.NodePoolNodesResponse + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp) + must.NoError(t, err) + must.Eq(t, 1001, resp.Index) + + // Delete triggers watchers. + time.AfterFunc(100*time.Millisecond, func() { + s.fsm.State().DeleteNode(structs.MsgTypeTestSetup, 1002, []string{node.ID}) + }) + + req.MinQueryIndex = 1001 + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp) + must.NoError(t, err) + must.Eq(t, 1002, resp.Index) } diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 50b1b7050..e1738bf8a 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -160,6 +160,14 @@ func nodeTableSchema() *memdb.TableSchema { Field: "SecretID", }, }, + "node_pool": { + Name: "node_pool", + AllowMissing: false, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "NodePool", + }, + }, }, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index df294a4c1..4fe907060 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1635,6 +1635,20 @@ func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*struct return nil, nil } +// NodesByNodePool returns an iterator over all nodes that are part of the +// given node pool. +func (s *StateStore) NodesByNodePool(ws memdb.WatchSet, pool string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get("nodes", "node_pool", pool) + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + return iter, nil +} + // Nodes returns an iterator over all the nodes func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 61ff10089..2296c1582 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2012,6 +2012,69 @@ func TestStateStore_NodesByIDPrefix(t *testing.T) { } } +func TestStateStore_NodesByNodePool(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + + pool := mock.NodePool() + err := state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}) + must.NoError(t, err) + + node1 := mock.Node() + node1.NodePool = structs.NodePoolDefault + err = state.UpsertNode(structs.MsgTypeTestSetup, 1001, node1) + must.NoError(t, err) + + node2 := mock.Node() + node2.NodePool = pool.Name + err = state.UpsertNode(structs.MsgTypeTestSetup, 1002, node2) + must.NoError(t, err) + + testCases := []struct { + name string + pool string + expected []string + }{ + { + name: "default", + pool: structs.NodePoolDefault, + expected: []string{ + node1.ID, + }, + }, + { + name: "pool", + pool: pool.Name, + expected: []string{ + node2.ID, + }, + }, + { + name: "empty pool", + pool: "", + expected: []string{}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create watcher to test that getters don't cause it to fire. + ws := memdb.NewWatchSet() + + iter, err := state.NodesByNodePool(ws, tc.pool) + must.NoError(t, err) + + got := []string{} + for raw := iter.Next(); raw != nil; raw = iter.Next() { + got = append(got, raw.(*structs.Node).ID) + } + + must.SliceContainsAll(t, tc.expected, got) + must.False(t, watchFired(ws)) + }) + } +} + func TestStateStore_UpsertJob_Job(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go index ae5819c1a..7616ae4f0 100644 --- a/nomad/structs/node_pool.go +++ b/nomad/structs/node_pool.go @@ -182,6 +182,19 @@ type NodePoolDeleteRequest struct { WriteRequest } +// NodePoolNodesRequest is used to list all nodes that are part of a node pool. +type NodePoolNodesRequest struct { + Name string + Fields *NodeStubFields + QueryOptions +} + +// NodePoolNodesResponse is used to return a list nodes in the node pool. +type NodePoolNodesResponse struct { + Nodes []*NodeListStub + QueryMeta +} + // NodePoolJobsRequest is used to make a request for the jobs in a specific node pool. type NodePoolJobsRequest struct { Name string diff --git a/website/content/api-docs/node-pools.mdx b/website/content/api-docs/node-pools.mdx index 3b00286dd..9250ba5d8 100644 --- a/website/content/api-docs/node-pools.mdx +++ b/website/content/api-docs/node-pools.mdx @@ -210,4 +210,136 @@ The table below shows this endpoint's support for $ nomad operator api -X DELETE /v1/node/pool/prod-eng ``` +## List Node Pool Nodes + +This endpoint list the nodes in a node pool. + +| Method | Path | Produces | +| ------ | -------------------------------- | ------------------ | +| `GET` | `/v1/node/pool/:node_pool/nodes` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/nomad/api-docs#blocking-queries) and +[required ACLs](/nomad/api-docs#acls). + +| Blocking Queries | ACL Required | +| ---------------- | ----------------------------------- | +| `YES` | `node:read`
`node_pool:read` | + +### Parameters + +- `:node_pool` `(string: )`- Specifies the node pool to list nodes. + +- `next_token` `(string: "")` - This endpoint supports paging. The `next_token` + parameter accepts a string which identifies the next expected node. This + value can be obtained from the `X-Nomad-NextToken` header from the previous + response. + +- `per_page` `(int: 0)` - Specifies a maximum number of nodes to return for + this request. If omitted, the response is not paginated. The value of the + `X-Nomad-NextToken` header of the last response can be used as the + `next_token` of the next request to fetch additional pages. + +- `filter` `(string: "")` - Specifies the [expression](/nomad/api-docs#filtering) + used to filter the results. Consider using pagination to reduce resource used + to serve the request. + +- `resources` `(bool: false)` - Specifies whether or not to include the + `NodeResources` and `ReservedResources` fields in the response. + +- `os` `(bool: false)` - Specifies whether or not to include special attributes + such as operating system name in the response. + +### Sample Request + +```shell-session +$ nomad operator api /v1/node/pool/prod-eng/nodes +``` + +```shell-session +$ nomad operator api /v1/node/pool/prod-eng/nodes?os=true +``` + +### Sample Response + +```json +[ + { + "Address": "10.138.0.5", + "Attributes": { + "os.name": "ubuntu" + }, + "CreateIndex": 6, + "Datacenter": "dc1", + "Drain": false, + "Drivers": { + "java": { + "Attributes": { + "driver.java.runtime": "OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-1~deb9u1-b12)", + "driver.java.vm": "OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode)", + "driver.java.version": "openjdk version \"1.8.0_162" + }, + "Detected": true, + "HealthDescription": "", + "Healthy": true, + "UpdateTime": "2018-04-11T23:33:48.781948669Z" + }, + "qemu": { + "Attributes": null, + "Detected": false, + "HealthDescription": "", + "Healthy": false, + "UpdateTime": "2018-04-11T23:33:48.7819898Z" + }, + "rkt": { + "Attributes": { + "driver.rkt.appc.version": "0.8.11", + "driver.rkt.volumes.enabled": "1", + "driver.rkt.version": "1.29.0" + }, + "Detected": true, + "HealthDescription": "Driver rkt is detected: true", + "Healthy": true, + "UpdateTime": "2018-04-11T23:34:48.81079772Z" + }, + "docker": { + "Attributes": { + "driver.docker.bridge_ip": "172.17.0.1", + "driver.docker.version": "18.03.0-ce", + "driver.docker.volumes.enabled": "1" + }, + "Detected": true, + "HealthDescription": "Driver is available and responsive", + "Healthy": true, + "UpdateTime": "2018-04-11T23:34:48.713720323Z" + }, + "exec": { + "Attributes": {}, + "Detected": true, + "HealthDescription": "Driver exec is detected: true", + "Healthy": true, + "UpdateTime": "2018-04-11T23:34:48.711026521Z" + }, + "raw_exec": { + "Attributes": {}, + "Detected": true, + "HealthDescription": "", + "Healthy": true, + "UpdateTime": "2018-04-11T23:33:48.710448534Z" + } + }, + "ID": "f7476465-4d6e-c0de-26d0-e383c49be941", + "LastDrain": null, + "ModifyIndex": 2526, + "Name": "nomad-4", + "NodeClass": "", + "NodePool": "prod-eng", + "SchedulingEligibility": "eligible", + "Status": "ready", + "StatusDescription": "", + "Version": "0.8.0-rc1" + } +] +``` + [api_scheduler_alog]: /nomad/api-docs/operator/scheduler#scheduleralgorithm diff --git a/website/content/docs/operations/metrics-reference.mdx b/website/content/docs/operations/metrics-reference.mdx index 84ce46bfb..539fd8977 100644 --- a/website/content/docs/operations/metrics-reference.mdx +++ b/website/content/docs/operations/metrics-reference.mdx @@ -392,7 +392,8 @@ those listed in [Key Metrics](#key-metrics) above. | `nomad.nomad.namespace.list_namespace` | Time elapsed for `Namespace.ListNamespaces` | Nanoseconds | Summary | host | | `nomad.nomad.namespace.upsert_namespaces` | Time elapsed for `Namespace.UpsertNamespaces` | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.list` | Time elapsed for `NodePool.List` RPC call | Nanoseconds | Summary | host | -| `nomad.nomad.node_pool.list_jobs` | Time elapsed for `NodePool.List` RPC call | Nanoseconds | Summary | host | +| `nomad.nomad.node_pool.list_jobs` | Time elapsed for `NodePool.ListJobs` RPC call | Nanoseconds | Summary | host | +| `nomad.nomad.node_pool.list_nodes` | Time elapsed for `NodePool.ListNodes` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.get_node_pool` | Time elapsed for `NodePool.GetNodePool` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.upsert_node_pools` | Time elapsed for `NodePool.UpsertNodePools` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.delete_node_pools` | Time elapsed for `NodePool.DeleteNodePools` RPC call | Nanoseconds | Summary | host |