From b5fc8883a4366993a2ad28e415922453bbdb326c Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 17 Jun 2020 09:11:08 -0400 Subject: [PATCH] rpc: allow querying allocs across namespaces This implements the backend handling for querying across namespaces for allocation list endpoints. --- nomad/alloc_endpoint.go | 16 ++++++++++++++-- nomad/job_endpoint.go | 4 ++-- nomad/state/state_store.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index fb55c614c..0909c3907 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -30,7 +30,8 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now()) // Check namespace read-job permissions - if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + aclObj, err := a.srv.ResolveToken(args.AuthToken) + if err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { return structs.ErrPermissionDenied @@ -44,7 +45,18 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes // Capture all the allocations var err error var iter memdb.ResultIterator - if prefix := args.QueryOptions.Prefix; prefix != "" { + + prefix := args.QueryOptions.Prefix + if args.RequestNamespace() == structs.AllNamespacesSentinel { + allowedNSes, err := allowedNSes(aclObj, state) + if err != nil { + return err + } + iter, err = state.AllocsByIDPrefixInNSes(ws, allowedNSes, prefix) + if err != nil { + return err + } + } else if prefix != "" { iter, err = state.AllocsByIDPrefix(ws, args.RequestNamespace(), prefix) } else { iter, err = state.AllocsByNamespace(ws, args.RequestNamespace()) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 3566eb767..6cd89b540 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1183,7 +1183,7 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest, // allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to. // Returns `nil` set if the token has access to all namespaces // and ErrPermissionDenied if the token has no capabilities on any namespace. -func (j *Job) allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) { +func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) { if aclObj == nil || aclObj.IsManagement() { return nil, nil } @@ -1292,7 +1292,7 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // check if user has permission to all namespaces - allowedNSes, err := j.allowedNSes(aclObj, state) + allowedNSes, err := allowedNSes(aclObj, state) if err == structs.ErrPermissionDenied { // return empty jobs if token isn't authorized for any // namespace, matching other endpoints diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 757d84f3b..9827fdffd 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3146,6 +3146,38 @@ func allocNamespaceFilter(namespace string) func(interface{}) bool { } } +// AllocsByIDPrefix is used to lookup allocs by prefix +func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[string]bool, prefix string) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + var iter memdb.ResultIterator + var err error + if prefix != "" { + iter, err = txn.Get("allocs", "id_prefix", prefix) + } else { + iter, err = txn.Get("allocs", "id") + + } + if err != nil { + return nil, fmt.Errorf("alloc lookup failed: %v", err) + } + + ws.Add(iter.WatchCh()) + + // Wrap the iterator in a filter + nsesFilter := func(raw interface{}) bool { + alloc, ok := raw.(*structs.Allocation) + if !ok { + return true + } + + return namespaces[alloc.Namespace] + } + + wrap := memdb.NewFilterIterator(iter, nsesFilter) + return wrap, nil +} + // AllocsByNode returns all the allocations by node func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { txn := s.db.Txn(false)