mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
Merge pull request #8001 from hashicorp/f-jobs-list-across-nses
endpoint to expose all jobs across all namespaces
This commit is contained in:
@@ -142,6 +142,13 @@ func (j *Jobs) PrefixList(prefix string) ([]*JobListStub, *QueryMeta, error) {
|
||||
return j.List(&QueryOptions{Prefix: prefix})
|
||||
}
|
||||
|
||||
// ListAll is used to list all of the existing jobs in all namespaces.
|
||||
func (j *Jobs) ListAll() ([]*JobListStub, *QueryMeta, error) {
|
||||
return j.List(&QueryOptions{
|
||||
Params: map[string]string{"all_namespaces": "true"},
|
||||
})
|
||||
}
|
||||
|
||||
// Info is used to retrieve information about a particular
|
||||
// job given its unique ID.
|
||||
func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
|
||||
@@ -867,6 +874,7 @@ type JobListStub struct {
|
||||
ID string
|
||||
ParentID string
|
||||
Name string
|
||||
Namespace string `json:",omitempty"`
|
||||
Datacenters []string
|
||||
Type string
|
||||
Priority int
|
||||
|
||||
@@ -31,6 +31,7 @@ func (s *HTTPServer) jobListRequest(resp http.ResponseWriter, req *http.Request)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
args.AllNamespaces, _ = strconv.ParseBool(req.URL.Query().Get("all_namespaces"))
|
||||
var out structs.JobListResponse
|
||||
if err := s.agent.RPC("Job.List", &args, &out); err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -127,6 +127,46 @@ func TestHTTP_PrefixJobsList(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_JobsList_AllNamespaces_OSS(t *testing.T) {
|
||||
t.Parallel()
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
for i := 0; i < 3; i++ {
|
||||
// Create the job
|
||||
job := mock.Job()
|
||||
args := structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
},
|
||||
}
|
||||
var resp structs.JobRegisterResponse
|
||||
err := s.Agent.RPC("Job.Register", &args, &resp)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Make the HTTP request
|
||||
req, err := http.NewRequest("GET", "/v1/jobs?all_namespaces=true", nil)
|
||||
require.NoError(t, err)
|
||||
respW := httptest.NewRecorder()
|
||||
|
||||
// Make the request
|
||||
obj, err := s.Server.JobsRequest(respW, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check for the index
|
||||
require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-Index"), "missing index")
|
||||
require.Equal(t, "true", respW.HeaderMap.Get("X-Nomad-KnownLeader"), "missing known leader")
|
||||
require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-LastContact"), "missing last contact")
|
||||
|
||||
// Check the job
|
||||
j := obj.([]*structs.JobListStub)
|
||||
require.Len(t, j, 3)
|
||||
|
||||
require.Equal(t, "default", j[0].Namespace)
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_JobsRegister(t *testing.T) {
|
||||
t.Parallel()
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
|
||||
@@ -1122,14 +1122,46 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest,
|
||||
return j.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to.
|
||||
// Returns `nil` set if the token has access to all namespaces
|
||||
// and ErrPermissionDenied if the token has no capabilities on any namespace.
|
||||
func (j *Job) allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) {
|
||||
if aclObj == nil || aclObj.IsManagement() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// namespaces
|
||||
nses, err := state.NamespaceNames()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := make(map[string]bool, len(nses))
|
||||
|
||||
for _, ns := range nses {
|
||||
if aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) {
|
||||
r[ns] = true
|
||||
}
|
||||
}
|
||||
|
||||
if len(r) == 0 {
|
||||
return nil, structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// List is used to list the jobs registered in the system
|
||||
func (j *Job) List(args *structs.JobListRequest,
|
||||
reply *structs.JobListResponse) error {
|
||||
func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error {
|
||||
if done, err := j.srv.forward("Job.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now())
|
||||
|
||||
if args.AllNamespaces {
|
||||
return j.listAllNamespaces(args, reply)
|
||||
}
|
||||
|
||||
// Check for list-job permissions
|
||||
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
return err
|
||||
@@ -1187,6 +1219,82 @@ func (j *Job) List(args *structs.JobListRequest,
|
||||
return j.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// listAllNamespaces lists all jobs across all namespaces
|
||||
func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.JobListResponse) error {
|
||||
// Check for list-job permissions
|
||||
aclObj, err := j.srv.ResolveToken(args.AuthToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prefix := args.QueryOptions.Prefix
|
||||
|
||||
// Setup the blocking query
|
||||
opts := blockingOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
||||
// check if user has permission to all namespaces
|
||||
allowedNSes, err := j.allowedNSes(aclObj, state)
|
||||
if err == structs.ErrPermissionDenied {
|
||||
// return empty jobs if token isn't authorized for any
|
||||
// namespace, matching other endpoints
|
||||
reply.Jobs = []*structs.JobListStub{}
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Capture all the jobs
|
||||
iter, err := state.Jobs(ws)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var jobs []*structs.JobListStub
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
job := raw.(*structs.Job)
|
||||
if allowedNSes != nil && !allowedNSes[job.Namespace] {
|
||||
// not permitted to this name namespace
|
||||
continue
|
||||
}
|
||||
if prefix != "" && !strings.HasPrefix(job.ID, prefix) {
|
||||
continue
|
||||
}
|
||||
summary, err := state.JobSummaryByID(ws, job.Namespace, job.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
|
||||
}
|
||||
|
||||
stub := job.Stub(summary)
|
||||
stub.Namespace = job.Namespace
|
||||
jobs = append(jobs, stub)
|
||||
}
|
||||
reply.Jobs = jobs
|
||||
|
||||
// Use the last index that affected the jobs table or summary
|
||||
jindex, err := state.Index("jobs")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sindex, err := state.Index("job_summary")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply.Index = helper.Uint64Max(jindex, sindex)
|
||||
|
||||
// Set the query response
|
||||
j.srv.setQueryMeta(&reply.QueryMeta)
|
||||
return nil
|
||||
}}
|
||||
return j.srv.blockingRPC(&opts)
|
||||
|
||||
}
|
||||
|
||||
// Allocations is used to list the allocations for a job
|
||||
func (j *Job) Allocations(args *structs.JobSpecificRequest,
|
||||
reply *structs.JobAllocationsResponse) error {
|
||||
|
||||
@@ -3782,6 +3782,74 @@ func TestJobEndpoint_ListJobs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestJobEndpoint_ListJobs_AllNamespaces_OSS asserts that server
|
||||
// returns all jobs across namespace.
|
||||
//
|
||||
func TestJobEndpoint_ListJobs_AllNamespaces_OSS(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, cleanupS1 := TestServer(t, nil)
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
job := mock.Job()
|
||||
state := s1.fsm.State()
|
||||
err := state.UpsertJob(1000, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Lookup the jobs
|
||||
get := &structs.JobListRequest{
|
||||
AllNamespaces: true,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
var resp2 structs.JobListResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(1000), resp2.Index)
|
||||
require.Len(t, resp2.Jobs, 1)
|
||||
require.Equal(t, job.ID, resp2.Jobs[0].ID)
|
||||
require.Equal(t, structs.DefaultNamespace, resp2.Jobs[0].Namespace)
|
||||
|
||||
// Lookup the jobs by prefix
|
||||
get = &structs.JobListRequest{
|
||||
AllNamespaces: true,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
Prefix: resp2.Jobs[0].ID[:4],
|
||||
},
|
||||
}
|
||||
var resp3 structs.JobListResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp3)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(1000), resp3.Index)
|
||||
require.Len(t, resp3.Jobs, 1)
|
||||
require.Equal(t, job.ID, resp3.Jobs[0].ID)
|
||||
require.Equal(t, structs.DefaultNamespace, resp2.Jobs[0].Namespace)
|
||||
|
||||
// Lookup the jobs by prefix
|
||||
get = &structs.JobListRequest{
|
||||
AllNamespaces: true,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
Prefix: "z" + resp2.Jobs[0].ID[:4],
|
||||
},
|
||||
}
|
||||
var resp4 structs.JobListResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp4)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(1000), resp4.Index)
|
||||
require.Empty(t, resp4.Jobs)
|
||||
}
|
||||
|
||||
func TestJobEndpoint_ListJobs_WithACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
@@ -17,3 +17,7 @@ func (s *StateStore) namespaceExists(txn *memdb.Txn, namespace string) (bool, er
|
||||
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *memdb.Txn) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StateStore) NamespaceNames() ([]string, error) {
|
||||
return []string{structs.DefaultNamespace}, nil
|
||||
}
|
||||
|
||||
@@ -609,6 +609,7 @@ type JobSpecificRequest struct {
|
||||
|
||||
// JobListRequest is used to parameterize a list request
|
||||
type JobListRequest struct {
|
||||
AllNamespaces bool
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
@@ -4108,6 +4109,7 @@ type JobListStub struct {
|
||||
ID string
|
||||
ParentID string
|
||||
Name string
|
||||
Namespace string `json:",omitempty"`
|
||||
Datacenters []string
|
||||
Type string
|
||||
Priority int
|
||||
|
||||
@@ -30,6 +30,10 @@ The table below shows this endpoint's support for
|
||||
- `prefix` `(string: "")` - Specifies a string to filter jobs on based on
|
||||
an index prefix. This is specified as a query string parameter.
|
||||
|
||||
- `all_namespaces` `(bool: false)` - Specifies whether to return the all
|
||||
known jobs across all namespaces the token has `namespace:list-jobs` ACL
|
||||
capability on.
|
||||
|
||||
### Sample Request
|
||||
|
||||
```shell-session
|
||||
@@ -54,6 +58,7 @@ $ curl https://localhost:4646/v1/jobs?prefix=team
|
||||
"StatusDescription": "",
|
||||
"JobSummary": {
|
||||
"JobID": "example",
|
||||
"Namespace": "default",
|
||||
"Summary": {
|
||||
"cache": {
|
||||
"Queued": 1,
|
||||
|
||||
Reference in New Issue
Block a user