From 9813a55d44029efbb794c830756e20d670be3732 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 18 May 2020 13:47:13 -0400 Subject: [PATCH] endpoint to expose all jobs across all namespaces Allow a `/v1/jobs?all_namespaces=true` to list all jobs across all namespaces. The returned list is to contain a `Namespace` field indicating the job namespace. If ACL is enabled, the request token needs to be a management token or have `namespace:list-jobs` capability on all existing namespaces. --- api/jobs.go | 7 +++ command/agent/job_endpoint.go | 1 + command/agent/job_endpoint_test.go | 40 +++++++++++++ nomad/job_endpoint.go | 94 +++++++++++++++++++++++++++++- nomad/job_endpoint_test.go | 68 +++++++++++++++++++++ nomad/state/state_store_oss.go | 4 ++ nomad/structs/structs.go | 2 + website/pages/api-docs/jobs.mdx | 6 ++ 8 files changed, 220 insertions(+), 2 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 5759476f9..416c45064 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -142,6 +142,13 @@ func (j *Jobs) PrefixList(prefix string) ([]*JobListStub, *QueryMeta, error) { return j.List(&QueryOptions{Prefix: prefix}) } +// ListAll is used to list all of the existing jobs in all namespaces. +func (j *Jobs) ListAll() ([]*JobListStub, *QueryMeta, error) { + return j.List(&QueryOptions{ + Params: map[string]string{"all_namespaces": "true"}, + }) +} + // Info is used to retrieve information about a particular // job given its unique ID. func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 5b3d36ee8..d650ffef6 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -31,6 +31,7 @@ func (s *HTTPServer) jobListRequest(resp http.ResponseWriter, req *http.Request) return nil, nil } + args.AllNamespaces, _ = strconv.ParseBool(req.URL.Query().Get("all_namespaces")) var out structs.JobListResponse if err := s.agent.RPC("Job.List", &args, &out); err != nil { return nil, err diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index f740aa062..b0d4befa1 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -127,6 +127,46 @@ func TestHTTP_PrefixJobsList(t *testing.T) { }) } +func TestHTTP_JobsList_AllNamespaces_OSS(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + for i := 0; i < 3; i++ { + // Create the job + job := mock.Job() + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var resp structs.JobRegisterResponse + err := s.Agent.RPC("Job.Register", &args, &resp) + require.NoError(t, err) + } + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/jobs?all_namespaces=true", nil) + require.NoError(t, err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobsRequest(respW, req) + require.NoError(t, err) + + // Check for the index + require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-Index"), "missing index") + require.Equal(t, "true", respW.HeaderMap.Get("X-Nomad-KnownLeader"), "missing known leader") + require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-LastContact"), "missing last contact") + + // Check the job + j := obj.([]*structs.JobListStub) + require.Len(t, j, 3) + + require.Equal(t, "default", j[0].Namespace) + }) +} + func TestHTTP_JobsRegister(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 536a051ee..4f6aa3c38 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1139,14 +1139,38 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest, return j.srv.blockingRPC(&opts) } +func (j *Job) validateListAllJobsPermission(aclObj *acl.ACL, state *state.StateStore) error { + if aclObj == nil || aclObj.IsManagement() { + return nil + } + + // namespaces + nses, err := state.NamespaceNames() + if err != nil { + return err + } + + for _, ns := range nses { + if !aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) { + return structs.ErrPermissionDenied + + } + } + + return nil +} + // List is used to list the jobs registered in the system -func (j *Job) List(args *structs.JobListRequest, - reply *structs.JobListResponse) error { +func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { if done, err := j.srv.forward("Job.List", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now()) + if args.AllNamespaces { + return j.listAllNamespaces(args, reply) + } + // Check for list-job permissions if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { return err @@ -1204,6 +1228,72 @@ func (j *Job) List(args *structs.JobListRequest, return j.srv.blockingRPC(&opts) } +// listAllNamespaces lists all jobs across all namespaces +func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.JobListResponse) error { + // Check for list-job permissions + aclObj, err := j.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + prefix := args.QueryOptions.Prefix + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + // check if user has permission to all namespaces + if err := j.validateListAllJobsPermission(aclObj, state); err != nil { + return err + } + + // Capture all the jobs + iter, err := state.Jobs(ws) + + if err != nil { + return err + } + + var jobs []*structs.JobListStub + for { + raw := iter.Next() + if raw == nil { + break + } + job := raw.(*structs.Job) + if prefix != "" && !strings.HasPrefix(job.ID, prefix) { + continue + } + summary, err := state.JobSummaryByID(ws, job.Namespace, job.ID) + if err != nil { + return fmt.Errorf("unable to look up summary for job: %v", job.ID) + } + + stub := job.Stub(summary) + stub.Namespace = job.Namespace + jobs = append(jobs, stub) + } + reply.Jobs = jobs + + // Use the last index that affected the jobs table or summary + jindex, err := state.Index("jobs") + if err != nil { + return err + } + sindex, err := state.Index("job_summary") + if err != nil { + return err + } + reply.Index = helper.Uint64Max(jindex, sindex) + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) + +} + // Allocations is used to list the allocations for a job func (j *Job) Allocations(args *structs.JobSpecificRequest, reply *structs.JobAllocationsResponse) error { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 2f3af3fd9..f91e83c5c 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -3782,6 +3782,74 @@ func TestJobEndpoint_ListJobs(t *testing.T) { } } +// TestJobEndpoint_ListJobs_AllNamespaces_OSS asserts that server +// returns all jobs across namespace. +// +func TestJobEndpoint_ListJobs_AllNamespaces_OSS(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + state := s1.fsm.State() + err := state.UpsertJob(1000, job) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the jobs + get := &structs.JobListRequest{ + AllNamespaces: true, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobListResponse + err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp2) + require.NoError(t, err) + require.Equal(t, uint64(1000), resp2.Index) + require.Len(t, resp2.Jobs, 1) + require.Equal(t, job.ID, resp2.Jobs[0].ID) + require.Equal(t, structs.DefaultNamespace, resp2.Jobs[0].Namespace) + + // Lookup the jobs by prefix + get = &structs.JobListRequest{ + AllNamespaces: true, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: job.Namespace, + Prefix: resp2.Jobs[0].ID[:4], + }, + } + var resp3 structs.JobListResponse + err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp3) + require.NoError(t, err) + require.Equal(t, uint64(1000), resp3.Index) + require.Len(t, resp3.Jobs, 1) + require.Equal(t, job.ID, resp3.Jobs[0].ID) + require.Equal(t, structs.DefaultNamespace, resp2.Jobs[0].Namespace) + + // Lookup the jobs by prefix + get = &structs.JobListRequest{ + AllNamespaces: true, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: job.Namespace, + Prefix: "z" + resp2.Jobs[0].ID[:4], + }, + } + var resp4 structs.JobListResponse + err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp4) + require.NoError(t, err) + require.Equal(t, uint64(1000), resp4.Index) + require.Empty(t, resp4.Jobs) +} + func TestJobEndpoint_ListJobs_WithACL(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store_oss.go b/nomad/state/state_store_oss.go index 77cb7181d..cf6841e0d 100644 --- a/nomad/state/state_store_oss.go +++ b/nomad/state/state_store_oss.go @@ -17,3 +17,7 @@ func (s *StateStore) namespaceExists(txn *memdb.Txn, namespace string) (bool, er func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *memdb.Txn) error { return nil } + +func (s *StateStore) NamespaceNames() ([]string, error) { + return []string{structs.DefaultNamespace}, nil +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f56fd8519..429ae7b28 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -609,6 +609,7 @@ type JobSpecificRequest struct { // JobListRequest is used to parameterize a list request type JobListRequest struct { + AllNamespaces bool QueryOptions } @@ -4105,6 +4106,7 @@ type JobListStub struct { ID string ParentID string Name string + Namespace string `json:",omitempty"` Datacenters []string Type string Priority int diff --git a/website/pages/api-docs/jobs.mdx b/website/pages/api-docs/jobs.mdx index f1dd114bd..53f560204 100644 --- a/website/pages/api-docs/jobs.mdx +++ b/website/pages/api-docs/jobs.mdx @@ -30,6 +30,12 @@ The table below shows this endpoint's support for - `prefix` `(string: "")` - Specifies a string to filter jobs on based on an index prefix. This is specified as a query string parameter. +- `all_namespaces` `(bool: false)` - Specifies whether to return the all + known jobs across all namespaces. When set to `true`, the list stubs will + contain a `Namespace` field indicating the job namespace. + The option requires `namespace:list-jobs` ACL capability on all existing + namespaces. + ### Sample Request ```shell-sessioncurl \