endpoint to expose all jobs across all namespaces

Allow a `/v1/jobs?all_namespaces=true` to list all jobs across all
namespaces.  The returned list is to contain a `Namespace` field
indicating the job namespace.

If ACL is enabled, the request token needs to be a management token or
have `namespace:list-jobs` capability on all existing namespaces.
This commit is contained in:
Mahmood Ali
2020-05-18 13:47:13 -04:00
parent 9400ac674e
commit 9813a55d44
8 changed files with 220 additions and 2 deletions

View File

@@ -142,6 +142,13 @@ func (j *Jobs) PrefixList(prefix string) ([]*JobListStub, *QueryMeta, error) {
return j.List(&QueryOptions{Prefix: prefix})
}
// ListAll is used to list all of the existing jobs in all namespaces.
func (j *Jobs) ListAll() ([]*JobListStub, *QueryMeta, error) {
return j.List(&QueryOptions{
Params: map[string]string{"all_namespaces": "true"},
})
}
// Info is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {

View File

@@ -31,6 +31,7 @@ func (s *HTTPServer) jobListRequest(resp http.ResponseWriter, req *http.Request)
return nil, nil
}
args.AllNamespaces, _ = strconv.ParseBool(req.URL.Query().Get("all_namespaces"))
var out structs.JobListResponse
if err := s.agent.RPC("Job.List", &args, &out); err != nil {
return nil, err

View File

@@ -127,6 +127,46 @@ func TestHTTP_PrefixJobsList(t *testing.T) {
})
}
func TestHTTP_JobsList_AllNamespaces_OSS(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
for i := 0; i < 3; i++ {
// Create the job
job := mock.Job()
args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.JobRegisterResponse
err := s.Agent.RPC("Job.Register", &args, &resp)
require.NoError(t, err)
}
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/jobs?all_namespaces=true", nil)
require.NoError(t, err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobsRequest(respW, req)
require.NoError(t, err)
// Check for the index
require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-Index"), "missing index")
require.Equal(t, "true", respW.HeaderMap.Get("X-Nomad-KnownLeader"), "missing known leader")
require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-LastContact"), "missing last contact")
// Check the job
j := obj.([]*structs.JobListStub)
require.Len(t, j, 3)
require.Equal(t, "default", j[0].Namespace)
})
}
func TestHTTP_JobsRegister(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {

View File

@@ -1139,14 +1139,38 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest,
return j.srv.blockingRPC(&opts)
}
func (j *Job) validateListAllJobsPermission(aclObj *acl.ACL, state *state.StateStore) error {
if aclObj == nil || aclObj.IsManagement() {
return nil
}
// namespaces
nses, err := state.NamespaceNames()
if err != nil {
return err
}
for _, ns := range nses {
if !aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) {
return structs.ErrPermissionDenied
}
}
return nil
}
// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest,
reply *structs.JobListResponse) error {
func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now())
if args.AllNamespaces {
return j.listAllNamespaces(args, reply)
}
// Check for list-job permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
return err
@@ -1204,6 +1228,72 @@ func (j *Job) List(args *structs.JobListRequest,
return j.srv.blockingRPC(&opts)
}
// listAllNamespaces lists all jobs across all namespaces
func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.JobListResponse) error {
// Check for list-job permissions
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
prefix := args.QueryOptions.Prefix
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// check if user has permission to all namespaces
if err := j.validateListAllJobsPermission(aclObj, state); err != nil {
return err
}
// Capture all the jobs
iter, err := state.Jobs(ws)
if err != nil {
return err
}
var jobs []*structs.JobListStub
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
if prefix != "" && !strings.HasPrefix(job.ID, prefix) {
continue
}
summary, err := state.JobSummaryByID(ws, job.Namespace, job.ID)
if err != nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
}
stub := job.Stub(summary)
stub.Namespace = job.Namespace
jobs = append(jobs, stub)
}
reply.Jobs = jobs
// Use the last index that affected the jobs table or summary
jindex, err := state.Index("jobs")
if err != nil {
return err
}
sindex, err := state.Index("job_summary")
if err != nil {
return err
}
reply.Index = helper.Uint64Max(jindex, sindex)
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// Allocations is used to list the allocations for a job
func (j *Job) Allocations(args *structs.JobSpecificRequest,
reply *structs.JobAllocationsResponse) error {

View File

@@ -3782,6 +3782,74 @@ func TestJobEndpoint_ListJobs(t *testing.T) {
}
}
// TestJobEndpoint_ListJobs_AllNamespaces_OSS asserts that server
// returns all jobs across namespace.
//
func TestJobEndpoint_ListJobs_AllNamespaces_OSS(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
state := s1.fsm.State()
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the jobs
get := &structs.JobListRequest{
AllNamespaces: true,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: job.Namespace,
},
}
var resp2 structs.JobListResponse
err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp2)
require.NoError(t, err)
require.Equal(t, uint64(1000), resp2.Index)
require.Len(t, resp2.Jobs, 1)
require.Equal(t, job.ID, resp2.Jobs[0].ID)
require.Equal(t, structs.DefaultNamespace, resp2.Jobs[0].Namespace)
// Lookup the jobs by prefix
get = &structs.JobListRequest{
AllNamespaces: true,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: job.Namespace,
Prefix: resp2.Jobs[0].ID[:4],
},
}
var resp3 structs.JobListResponse
err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp3)
require.NoError(t, err)
require.Equal(t, uint64(1000), resp3.Index)
require.Len(t, resp3.Jobs, 1)
require.Equal(t, job.ID, resp3.Jobs[0].ID)
require.Equal(t, structs.DefaultNamespace, resp2.Jobs[0].Namespace)
// Lookup the jobs by prefix
get = &structs.JobListRequest{
AllNamespaces: true,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: job.Namespace,
Prefix: "z" + resp2.Jobs[0].ID[:4],
},
}
var resp4 structs.JobListResponse
err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp4)
require.NoError(t, err)
require.Equal(t, uint64(1000), resp4.Index)
require.Empty(t, resp4.Jobs)
}
func TestJobEndpoint_ListJobs_WithACL(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@@ -17,3 +17,7 @@ func (s *StateStore) namespaceExists(txn *memdb.Txn, namespace string) (bool, er
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *memdb.Txn) error {
return nil
}
func (s *StateStore) NamespaceNames() ([]string, error) {
return []string{structs.DefaultNamespace}, nil
}

View File

@@ -609,6 +609,7 @@ type JobSpecificRequest struct {
// JobListRequest is used to parameterize a list request
type JobListRequest struct {
AllNamespaces bool
QueryOptions
}
@@ -4105,6 +4106,7 @@ type JobListStub struct {
ID string
ParentID string
Name string
Namespace string `json:",omitempty"`
Datacenters []string
Type string
Priority int

View File

@@ -30,6 +30,12 @@ The table below shows this endpoint's support for
- `prefix` `(string: "")` - Specifies a string to filter jobs on based on
an index prefix. This is specified as a query string parameter.
- `all_namespaces` `(bool: false)` - Specifies whether to return the all
known jobs across all namespaces. When set to `true`, the list stubs will
contain a `Namespace` field indicating the job namespace.
The option requires `namespace:list-jobs` ACL capability on all existing
namespaces.
### Sample Request
```shell-sessioncurl \