From bb0140803e2b354f4f0852da22c2d7cc99e5bac3 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 5 Jun 2023 15:36:52 -0400 Subject: [PATCH] node pools: implement RPC to list jobs in a given node pool (#17396) Implements the `NodePool.ListJobs` RPC, with pagination and filtering based on the existing `Job.List` RPC. --- nomad/node_pool_endpoint.go | 131 ++++++ nomad/node_pool_endpoint_test.go | 389 ++++++++++++++++++ nomad/state/schema.go | 8 + nomad/state/state_store.go | 14 + nomad/structs/node_pool.go | 13 + .../docs/operations/metrics-reference.mdx | 1 + 6 files changed, 556 insertions(+) diff --git a/nomad/node_pool_endpoint.go b/nomad/node_pool_endpoint.go index 708117b4f..46abc0863 100644 --- a/nomad/node_pool_endpoint.go +++ b/nomad/node_pool_endpoint.go @@ -4,6 +4,8 @@ package nomad import ( + "errors" + "fmt" "net/http" "time" @@ -252,3 +254,132 @@ func (n *NodePool) DeleteNodePools(args *structs.NodePoolDeleteRequest, reply *s reply.Index = index return nil } + +// ListJobs is used to retrieve a list of jobs for a given node pool. It supports +// pagination and filtering. +func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.NodePoolJobsResponse) error { + authErr := n.srv.Authenticate(n.ctx, args) + if done, err := n.srv.forward("NodePool.ListJobs", args, args, reply); done { + return err + } + n.srv.MeasureRPCRate("node_pool", structs.RateMetricRead, args) + if authErr != nil { + return structs.ErrPermissionDenied + } + defer metrics.MeasureSince([]string{"nomad", "node_pool", "list_jobs"}, time.Now()) + + // Resolve ACL token and verify it has read capability for the pool. + aclObj, err := n.srv.ResolveACL(args) + if err != nil { + return err + } + if !aclObj.AllowNodePoolOperation(args.Name, acl.NodePoolCapabilityRead) { + return structs.ErrPermissionDenied + } + allowNsFunc := aclObj.AllowNsOpFunc(acl.NamespaceCapabilityListJobs) + namespace := args.RequestNamespace() + + // Setup the blocking query. This largely mirrors the Jobs.List RPC but with + // an additional paginator filter for the node pool. + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, store *state.StateStore) error { + // ensure the node pool exists + pool, err := store.NodePoolByName(ws, args.Name) + if err != nil { + return err + } + if pool == nil { + return nil + } + + var iter memdb.ResultIterator + + // Get the namespaces the user is allowed to access. + allowableNamespaces, err := allowedNSes(aclObj, store, allowNsFunc) + if errors.Is(err, structs.ErrPermissionDenied) { + // return empty jobs if token isn't authorized for any + // namespace, matching other endpoints + reply.Jobs = make([]*structs.JobListStub, 0) + } else if err != nil { + return err + } else { + + filters := []paginator.Filter{ + paginator.NamespaceFilter{ + AllowableNamespaces: allowableNamespaces, + }, + } + + if namespace == structs.AllNamespacesSentinel { + iter, err = store.JobsByPool(ws, args.Name) + } else { + iter, err = store.JobsByNamespace(ws, namespace) + filters = append(filters, + paginator.GenericFilter{ + Allow: func(raw interface{}) (bool, error) { + job := raw.(*structs.Job) + if job == nil || job.NodePool != args.Name { + return false, nil + } + return true, nil + }, + }) + } + if err != nil { + return err + } + + tokenizer := paginator.NewStructsTokenizer( + iter, + paginator.StructsTokenizerOptions{ + WithNamespace: true, + WithID: true, + }, + ) + + var jobs []*structs.JobListStub + + paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions, + func(raw interface{}) error { + job := raw.(*structs.Job) + summary, err := store.JobSummaryByID(ws, job.Namespace, job.ID) + if err != nil || summary == nil { + return fmt.Errorf("unable to look up summary for job: %v", job.ID) + } + jobs = append(jobs, job.Stub(summary, args.Fields)) + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to create result paginator: %v", err) + } + + nextToken, err := paginator.Page() + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to read result page: %v", err) + } + + reply.QueryMeta.NextToken = nextToken + reply.Jobs = jobs + } + + // Use the last index that affected the jobs table or summary + jindex, err := store.Index("jobs") + if err != nil { + return err + } + sindex, err := store.Index("job_summary") + if err != nil { + return err + } + reply.Index = helper.Max(jindex, sindex) + + // Set the query response + n.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return n.srv.blockingRPC(&opts) +} diff --git a/nomad/node_pool_endpoint_test.go b/nomad/node_pool_endpoint_test.go index c15519e50..4bd3b41f4 100644 --- a/nomad/node_pool_endpoint_test.go +++ b/nomad/node_pool_endpoint_test.go @@ -10,8 +10,10 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-set" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -1095,3 +1097,390 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) { }) } } + +func TestNodePoolEndpoint_ListJobs_ACLs(t *testing.T) { + ci.Parallel(t) + + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + store := s1.fsm.State() + index := uint64(1000) + + var err error + + // Populate state with some node pools. + poolDev := &structs.NodePool{ + Name: "dev-1", + Description: "test node pool for dev-1", + } + poolProd := &structs.NodePool{ + Name: "prod-1", + Description: "test node pool for prod-1", + } + err = store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{ + poolDev, + poolProd, + }) + must.NoError(t, err) + + // for refering to the jobs in assertions + jobIDs := map[string]string{} + + // register jobs in all pools and all namespaces + for _, ns := range []string{"engineering", "system", "default"} { + index++ + must.NoError(t, store.UpsertNamespaces(index, []*structs.Namespace{{Name: ns}})) + + for _, pool := range []string{"dev-1", "prod-1", "default"} { + job := mock.MinJob() + job.Namespace = ns + job.NodePool = pool + jobIDs[ns+"+"+pool] = job.ID + index++ + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + } + } + + req := &structs.NodePoolJobsRequest{ + Name: "dev-1", + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.AllNamespacesSentinel}, + } + + // Expect failure for request without a token + var resp structs.NodePoolJobsResponse + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.EqError(t, err, structs.ErrPermissionDenied.Error()) + + // Management token can read any namespace / any pool + // var mgmtResp structs.NodePoolJobsResponse + req.AuthToken = root.SecretID + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.NoError(t, err) + must.Len(t, 3, resp.Jobs) + must.SliceContainsAll(t, + helper.ConvertSlice(resp.Jobs, func(j *structs.JobListStub) string { return j.ID }), + []string{jobIDs["engineering+dev-1"], jobIDs["system+dev-1"], jobIDs["default+dev-1"]}) + + // Policy that allows access to any pool but one namespace + index++ + devToken := mock.CreatePolicyAndToken(t, store, index, "dev-node-pools", + fmt.Sprintf("%s\n%s\n%s\n", + mock.NodePoolPolicy("dev-*", "read", nil), + mock.NodePoolPolicy("default", "read", nil), + mock.NamespacePolicy("engineering", "read", nil)), + ) + req.AuthToken = devToken.SecretID + + // with wildcard namespace + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.NoError(t, err) + must.Len(t, 1, resp.Jobs) + must.Eq(t, jobIDs["engineering+dev-1"], resp.Jobs[0].ID) + + // with specific allowed namespaces + req.Namespace = "engineering" + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.NoError(t, err) + must.Len(t, 1, resp.Jobs) + must.Eq(t, jobIDs["engineering+dev-1"], resp.Jobs[0].ID) + + // with disallowed namespace + req.Namespace = "system" + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.NoError(t, err) + must.Len(t, 0, resp.Jobs) + + // with disallowed pool but allowed namespace + req.Namespace = "engineering" + req.Name = "prod-1" + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.EqError(t, err, structs.ErrPermissionDenied.Error()) +} + +func TestNodePoolEndpoint_ListJobs_Blocking(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + store := s1.fsm.State() + index := uint64(1000) + + var err error + + // Populate state with a node pool and a job in the default pool + poolDev := &structs.NodePool{ + Name: "dev-1", + Description: "test node pool for dev-1", + } + err = store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{poolDev}) + must.NoError(t, err) + + job := mock.MinJob() + index++ + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + + req := &structs.NodePoolJobsRequest{ + Name: "default", + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.AllNamespacesSentinel}, + } + + // List the job and get the index + var resp structs.NodePoolJobsResponse + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.Len(t, 1, resp.Jobs) + must.Eq(t, index, resp.Index) + must.Eq(t, "default", resp.Jobs[0].NodePool) + + // Moving a job into a pool we're watching should trigger a watch + index++ + time.AfterFunc(100*time.Millisecond, func() { + job = job.Copy() + job.NodePool = "dev-1" + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + }) + + req.Name = "dev-1" + req.MinQueryIndex = index + req.MaxQueryTime = 500 * time.Millisecond + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + must.Len(t, 1, resp.Jobs) + must.Eq(t, index, resp.Index) + + // Moving a job out of a pool we're watching should trigger a watch + index++ + time.AfterFunc(100*time.Millisecond, func() { + job = job.Copy() + job.NodePool = "default" + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + }) + + req.Name = "dev-1" + req.MinQueryIndex = index + req.MaxQueryTime = 500 * time.Millisecond + err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + + must.Len(t, 0, resp.Jobs) + must.Eq(t, index, resp.Index) +} + +func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) { + ci.Parallel(t) + + s1, _, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + store := s1.fsm.State() + index := uint64(1000) + + var err error + + // Populate state with some node pools. + poolDev := &structs.NodePool{ + Name: "dev-1", + Description: "test node pool for dev-1", + } + poolProd := &structs.NodePool{ + Name: "prod-1", + Description: "test node pool for prod-1", + } + err = store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{ + poolDev, + poolProd, + }) + must.NoError(t, err) + + index++ + must.NoError(t, store.UpsertNamespaces(index, + []*structs.Namespace{{Name: "non-default"}, {Name: "other"}})) + + // create a set of jobs. these are in the order that the state store will + // return them from the iterator (sorted by key) for ease of writing tests + mocks := []struct { + name string + pool string + namespace string + status string + }{ + {name: "job-00", pool: "dev-1", namespace: "default", status: structs.JobStatusPending}, + {name: "job-01", pool: "dev-1", namespace: "default", status: structs.JobStatusPending}, + {name: "job-02", pool: "default", namespace: "default", status: structs.JobStatusPending}, + {name: "job-03", pool: "dev-1", namespace: "non-default", status: structs.JobStatusPending}, + {name: "job-04", pool: "dev-1", namespace: "default", status: structs.JobStatusRunning}, + {name: "job-05", pool: "dev-1", namespace: "default", status: structs.JobStatusRunning}, + {name: "job-06", pool: "dev-1", namespace: "other", status: structs.JobStatusPending}, + // job-07 is missing for missing index assertion + {name: "job-08", pool: "prod-1", namespace: "default", status: structs.JobStatusRunning}, + {name: "job-09", pool: "prod-1", namespace: "non-default", status: structs.JobStatusPending}, + {name: "job-10", pool: "dev-1", namespace: "default", status: structs.JobStatusPending}, + } + for _, m := range mocks { + job := mock.MinJob() + job.ID = m.name + job.Name = m.name + job.NodePool = m.pool + job.Status = m.status + job.Namespace = m.namespace + index++ + job.CreateIndex = index + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + } + + // Policy that allows access to 2 pools and any namespace + index++ + devToken := mock.CreatePolicyAndToken(t, store, index, "dev-node-pools", + fmt.Sprintf("%s\n%s\n%s\n", + mock.NodePoolPolicy("dev-*", "read", nil), + mock.NodePoolPolicy("default", "read", nil), + mock.NamespacePolicy("*", "read", nil)), + ) + + cases := []struct { + name string + pool string + namespace string + filter string + nextToken string + pageSize int32 + expectedNextToken string + expectedIDs []string + expectedError string + }{ + { + name: "test00 all dev pool default NS", + pool: "dev-1", + expectedIDs: []string{"job-00", "job-01", "job-04", "job-05", "job-10"}, + }, + { + name: "test01 size-2 page-1 dev pool default NS", + pool: "dev-1", + pageSize: 2, + expectedNextToken: "default.job-04", + expectedIDs: []string{"job-00", "job-01"}, + }, + { + name: "test02 size-2 page-1 dev pool wildcard NS", + pool: "dev-1", + namespace: "*", + pageSize: 2, + expectedNextToken: "default.job-04", + expectedIDs: []string{"job-00", "job-01"}, + }, + { + name: "test03 size-2 page-2 dev pool default NS", + pool: "dev-1", + pageSize: 2, + nextToken: "default.job-04", + expectedNextToken: "default.job-10", + expectedIDs: []string{"job-04", "job-05"}, + }, + { + name: "test04 size-2 page-2 wildcard NS", + pool: "dev-1", + namespace: "*", + pageSize: 2, + nextToken: "default.job-04", + expectedNextToken: "default.job-10", + expectedIDs: []string{"job-04", "job-05"}, + }, + { + name: "test05 no valid results with filters", + pool: "dev-1", + pageSize: 2, + nextToken: "", + filter: `Name matches "not-job"`, + expectedIDs: []string{}, + }, + { + name: "test06 go-bexpr filter across namespaces", + pool: "dev-1", + namespace: "*", + filter: `Name matches "job-0[12345]"`, + expectedIDs: []string{"job-01", "job-04", "job-05", "job-03"}, + }, + { + name: "test07 go-bexpr filter with pagination", + pool: "dev-1", + namespace: "*", + filter: `Name matches "job-0[12345]"`, + pageSize: 3, + expectedNextToken: "non-default.job-03", + expectedIDs: []string{"job-01", "job-04", "job-05"}, + }, + { + name: "test08 go-bexpr filter in namespace", + pool: "dev-1", + namespace: "non-default", + filter: `Status == "pending"`, + expectedIDs: []string{"job-03"}, + }, + { + name: "test09 go-bexpr invalid expression", + pool: "dev-1", + filter: `NotValid`, + expectedError: "failed to read filter expression", + }, + { + name: "test10 go-bexpr invalid field", + pool: "dev-1", + filter: `InvalidField == "value"`, + expectedError: "error finding value in datum", + }, + { + name: "test11 missing index", + pool: "dev-1", + pageSize: 1, + nextToken: "default.job-07", + expectedIDs: []string{"job-10"}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + req := &structs.NodePoolJobsRequest{ + Name: tc.pool, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: tc.namespace, + Filter: tc.filter, + PerPage: tc.pageSize, + NextToken: tc.nextToken, + }, + } + req.AuthToken = devToken.SecretID + + var resp structs.NodePoolJobsResponse + err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) + if tc.expectedError == "" { + must.NoError(t, err) + } else { + must.Error(t, err) + must.ErrorContains(t, err, tc.expectedError) + return + } + + got := set.FromFunc(resp.Jobs, + func(j *structs.JobListStub) string { return j.ID }) + must.True(t, got.ContainsSlice(tc.expectedIDs), + must.Sprintf("unexpected page of jobs: %v", got)) + + must.Eq(t, tc.expectedNextToken, resp.QueryMeta.NextToken, + must.Sprint("unexpected NextToken")) + }) + } + +} diff --git a/nomad/state/schema.go b/nomad/state/schema.go index fd7b4c819..50b1b7050 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -237,6 +237,14 @@ func jobTableSchema() *memdb.TableSchema { Conditional: jobIsPeriodic, }, }, + "pool": { + Name: "pool", + AllowMissing: false, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "NodePool", + }, + }, }, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c774e2f9a..df294a4c1 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2287,6 +2287,20 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, return iter, nil } +// JobsByPool returns an iterator over all jobs in a given node pool. +func (s *StateStore) JobsByPool(ws memdb.WatchSet, pool string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get("jobs", "pool", pool) + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + // JobSummaryByID returns a job summary object which matches a specific id. func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) { txn := s.db.ReadTxn() diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go index 50b81f7a5..ae5819c1a 100644 --- a/nomad/structs/node_pool.go +++ b/nomad/structs/node_pool.go @@ -181,3 +181,16 @@ type NodePoolDeleteRequest struct { Names []string WriteRequest } + +// NodePoolJobsRequest is used to make a request for the jobs in a specific node pool. +type NodePoolJobsRequest struct { + Name string + Fields *JobStubFields + QueryOptions +} + +// NodePoolJobsResponse returns a list of jobs in a specific node pool. +type NodePoolJobsResponse struct { + Jobs []*JobListStub + QueryMeta +} diff --git a/website/content/docs/operations/metrics-reference.mdx b/website/content/docs/operations/metrics-reference.mdx index 48df00c51..84ce46bfb 100644 --- a/website/content/docs/operations/metrics-reference.mdx +++ b/website/content/docs/operations/metrics-reference.mdx @@ -392,6 +392,7 @@ those listed in [Key Metrics](#key-metrics) above. | `nomad.nomad.namespace.list_namespace` | Time elapsed for `Namespace.ListNamespaces` | Nanoseconds | Summary | host | | `nomad.nomad.namespace.upsert_namespaces` | Time elapsed for `Namespace.UpsertNamespaces` | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.list` | Time elapsed for `NodePool.List` RPC call | Nanoseconds | Summary | host | +| `nomad.nomad.node_pool.list_jobs` | Time elapsed for `NodePool.List` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.get_node_pool` | Time elapsed for `NodePool.GetNodePool` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.upsert_node_pools` | Time elapsed for `NodePool.UpsertNodePools` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.node_pool.delete_node_pools` | Time elapsed for `NodePool.DeleteNodePools` RPC call | Nanoseconds | Summary | host |