event stream: adds ability to authenticate using Workload Identity (#24849)

This commit is contained in:
Michael Smithhisler
2025-01-23 11:49:54 -05:00
committed by GitHub
parent d621211108
commit 5befea62b7
7 changed files with 320 additions and 1258 deletions

3
.changelog/24849.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
event stream: adds ability to authenticate using workload identities
```

View File

@@ -10,6 +10,7 @@ import (
"github.com/hashicorp/go-msgpack/v2/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
@@ -61,6 +62,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
Topics: args.Topics,
Index: uint64(args.Index),
Namespace: args.Namespace,
Authenticate: func() error {
if err := e.srv.Authenticate(nil, &args); err != nil {
return err
}
resolvedACL, err := e.srv.ResolveACL(&args)
if err != nil {
return err
}
return validateACL(args.Namespace, args.Topics, resolvedACL)
},
}
// Get the servers broker and subscribe
@@ -74,21 +85,21 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
var subscription *stream.Subscription
var subErr error
// Track whether the ACL token being used has an expiry time.
var expiryTime *time.Time
// Check required ACL permissions for requested Topics
if e.srv.config.ACLEnabled {
subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq)
} else {
subscription, subErr = publisher.Subscribe(subReq)
}
subscription, subErr = publisher.Subscribe(subReq)
if subErr != nil {
handleJsonResultError(subErr, pointer.Of(int64(500)), encoder)
return
}
defer subscription.Unsubscribe()
// because we have authenticated, the identity will be set, so extract expiration time
var exp time.Time
if c := args.GetIdentity().GetClaims(); c != nil {
exp = c.Expiry.Time()
} else if t := args.GetIdentity().GetACLToken(); t != nil && t.ExpirationTime != nil {
exp = *t.ExpirationTime
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// goroutine to detect remote side closing
@@ -111,9 +122,9 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}
// Ensure the token being used is not expired before we any events
// Ensure the token being used is not expired before we send any events
// to subscribers.
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
if !exp.IsZero() && exp.Before(time.Now().UTC()) {
select {
case errCh <- structs.ErrTokenExpired:
case <-ctx.Done():
@@ -213,3 +224,47 @@ func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
Error: structs.NewRpcError(err, code),
})
}
func validateACL(namespace string, topics map[structs.Topic][]string, aclObj *acl.ACL) error {
for topic := range topics {
switch topic {
case structs.TopicDeployment,
structs.TopicEvaluation,
structs.TopicAllocation,
structs.TopicJob,
structs.TopicService:
if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob); !ok {
return structs.ErrPermissionDenied
}
case structs.TopicHostVolume:
if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityHostVolumeRead); !ok {
return structs.ErrPermissionDenied
}
case structs.TopicCSIVolume:
if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityCSIReadVolume); !ok {
return structs.ErrPermissionDenied
}
case structs.TopicCSIPlugin:
if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob); !ok {
return structs.ErrPermissionDenied
}
case structs.TopicNode:
if ok := aclObj.AllowNodeRead(); !ok {
return structs.ErrPermissionDenied
}
case structs.TopicNodePool:
// Require management token for node pools since we can't filter
// out node pools the token doesn't have access to.
if ok := aclObj.IsManagement(); !ok {
return structs.ErrPermissionDenied
}
default:
if ok := aclObj.IsManagement(); !ok {
return structs.ErrPermissionDenied
}
}
}
return nil
}

View File

@@ -312,200 +312,176 @@ OUTER:
}
}
func TestEventStream_ACL(t *testing.T) {
func TestEventStream_validateACL(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
// start server
s, _, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyNsGood := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s.State(), 1006, "valid", policyNsGood)
policyNsNode := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
policyNsNode += "\n" + mock.NodePolicy("read")
tokenNsNode := mock.CreatePolicyAndToken(t, s.State(), 1007, "validnNsNode", policyNsNode)
cases := []struct {
Name string
Token string
Topics map[structs.Topic][]string
Namespace string
ExpectedErr string
PublishFn func(p *stream.EventBroker)
Policy string
Management bool
ExpectedErr error
}{
{
Name: "no token",
Token: "",
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "bad token",
Token: tokenBad.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "job namespace token - correct ns",
Token: tokenNsFoo.SecretID,
Name: "read-job topics - correct ns",
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
structs.TopicEvaluation: {"*"},
structs.TopicAllocation: {"*"},
structs.TopicDeployment: {"*"},
structs.TopicService: {"*"},
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
Management: false,
ExpectedErr: nil,
},
{
Name: "job namespace token - incorrect ns",
Token: tokenNsFoo.SecretID,
Name: "read-job topic - incorrect ns",
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"}, // good
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "bar", // bad
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
Management: false,
ExpectedErr: structs.ErrPermissionDenied,
},
{
Name: "job namespace token - request management topic",
Token: tokenNsFoo.SecretID,
Name: "read all topics - correct policy",
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"}, // bad
},
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
Policy: "",
Namespace: "*",
Management: true,
ExpectedErr: nil,
},
{
Name: "job namespace token - request invalid node topic",
Token: tokenNsFoo.SecretID,
Name: "read all topics - incorrect policy",
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"}, // bad
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "foo",
Management: false,
ExpectedErr: structs.ErrPermissionDenied,
},
{
Name: "read node - valid policy",
Topics: map[structs.Topic][]string{
structs.TopicNode: {"*"}, // bad
},
Policy: mock.NodePolicy(acl.PolicyRead),
Namespace: "foo",
Management: false,
ExpectedErr: nil,
},
{
Name: "read node - invalid policy",
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // bad
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
Management: false,
ExpectedErr: structs.ErrPermissionDenied,
},
{
Name: "job+node namespace token, valid",
Token: tokenNsNode.SecretID,
Name: "read node pool - correct policy",
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // good
structs.TopicNodePool: {"*"}, // bad
},
Policy: "",
Namespace: "",
Management: true,
ExpectedErr: nil,
},
{
Name: "read node pool - incorrect policy",
Topics: map[structs.Topic][]string{
structs.TopicNodePool: {"*"}, // bad
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Node", Payload: mock.Node()}}})
Management: false,
ExpectedErr: structs.ErrPermissionDenied,
},
{
Name: "read host volumes - correct policy and ns",
Topics: map[structs.Topic][]string{
structs.TopicHostVolume: {"*"},
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityHostVolumeRead}),
Namespace: "foo",
Management: false,
ExpectedErr: nil,
},
{
Name: "read host volumes - incorrect policy or ns",
Topics: map[structs.Topic][]string{
structs.TopicHostVolume: {"*"},
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "foo",
Management: false,
ExpectedErr: structs.ErrPermissionDenied,
},
{
Name: "read csi volumes - correct policy and ns",
Topics: map[structs.Topic][]string{
structs.TopicCSIVolume: {"*"},
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityCSIReadVolume}),
Namespace: "foo",
Management: false,
ExpectedErr: nil,
},
{
Name: "read csi volumes - incorrect policy or ns",
Topics: map[structs.Topic][]string{
structs.TopicCSIVolume: {"*"},
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "foo",
Management: false,
ExpectedErr: structs.ErrPermissionDenied,
},
{
Name: "read csi plugin - correct policy and ns",
Topics: map[structs.Topic][]string{
structs.TopicCSIPlugin: {"*"},
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "foo",
Management: false,
ExpectedErr: nil,
},
{
Name: "read csi plugin - incorrect policy or ns",
Topics: map[structs.Topic][]string{
structs.TopicCSIPlugin: {"*"},
},
Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}),
Namespace: "bar",
Management: false,
ExpectedErr: structs.ErrPermissionDenied,
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
var ns string
if tc.Namespace != "" {
ns = tc.Namespace
}
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: tc.Topics,
QueryOptions: structs.QueryOptions{
Region: s.Region(),
Namespace: ns,
AuthToken: tc.Token,
},
}
handler, err := s.StreamingRpcHandler("Event.Stream")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
publisher, err := s.State().EventBroker()
p, err := acl.Parse(tc.Policy)
require.NoError(err)
// publish some events
node := mock.Node()
testACL, err := acl.NewACL(tc.Management, []*acl.Policy{p})
require.NoError(err)
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
if tc.PublishFn != nil {
tc.PublishFn(publisher)
}
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for events")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// force error by closing all subscriptions
publisher.CloseAll()
if msg.Error == nil {
continue
}
if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
break OUTER
} else {
t.Fatalf("unexpected error %v", msg.Error)
}
}
}
err = validateACL(tc.Namespace, tc.Topics, testACL)
require.Equal(tc.ExpectedErr, err)
})
}
}

View File

@@ -122,15 +122,6 @@ type StateStore struct {
stopEventBroker func()
}
type streamACLDelegate struct {
s *StateStore
}
func (a *streamACLDelegate) TokenProvider() stream.ACLTokenProvider {
resolver, _ := a.s.Snapshot()
return resolver
}
// NewStateStore is used to create a new state store
func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
if err := config.Validate(); err != nil {
@@ -154,7 +145,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
if config.EnablePublisher {
// Create new event publisher using provided config
broker, err := stream.NewEventBroker(ctx, &streamACLDelegate{s}, stream.EventBrokerCfg{
broker, err := stream.NewEventBroker(ctx, stream.EventBrokerCfg{
EventBufferSize: config.EventBufferSize,
Logger: config.Logger,
})

View File

@@ -5,15 +5,11 @@ package stream
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/go-hclog"
@@ -43,9 +39,6 @@ type EventBroker struct {
// the Commit call in the FSM hot path.
publishCh chan *structs.Events
aclDelegate ACLDelegate
aclCache *structs.ACLCache[*acl.ACL]
aclCh chan structs.Event
logger hclog.Logger
@@ -55,7 +48,7 @@ type EventBroker struct {
// A goroutine is run in the background to publish events to an event buffer.
// Cancelling the context will shutdown the goroutine to free resources, and stop
// all publishing.
func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBrokerCfg) (*EventBroker, error) {
func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) (*EventBroker, error) {
if cfg.Logger == nil {
cfg.Logger = hclog.NewNullLogger()
}
@@ -67,12 +60,10 @@ func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBroke
buffer := newEventBuffer(cfg.EventBufferSize)
e := &EventBroker{
logger: cfg.Logger.Named("event_broker"),
eventBuf: buffer,
publishCh: make(chan *structs.Events, 64),
aclCh: make(chan structs.Event, 10),
aclDelegate: aclDelegate,
aclCache: structs.NewACLCache[*acl.ACL](aclCacheSize),
logger: cfg.Logger.Named("event_broker"),
eventBuf: buffer,
publishCh: make(chan *structs.Events, 64),
aclCh: make(chan structs.Event, 10),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
@@ -98,7 +89,7 @@ func (e *EventBroker) Publish(events *structs.Events) {
// Notify the broker to check running subscriptions against potentially
// updated ACL Token or Policy
for _, event := range events.Events {
if event.Topic == structs.TopicACLToken || event.Topic == structs.TopicACLPolicy {
if event.Topic == structs.TopicACLToken || event.Topic == structs.TopicACLPolicy || event.Topic == structs.TopicACLRole {
e.aclCh <- event
}
}
@@ -106,27 +97,6 @@ func (e *EventBroker) Publish(events *structs.Events) {
e.publishCh <- events
}
// SubscribeWithACLCheck validates the SubscribeRequest's token and requested
// topics to ensure that the tokens privileges are sufficient. It will also
// return the token expiry time, if any. It is the callers responsibility to
// check this before publishing events to the caller.
func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error) {
aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token)
if err != nil {
return nil, nil, structs.ErrPermissionDenied
}
if allowed := aclAllowsSubscription(aclObj, req); !allowed {
return nil, nil, structs.ErrPermissionDenied
}
sub, err := e.Subscribe(req)
if err != nil {
return nil, nil, err
}
return sub, expiryTime, nil
}
// Subscribe returns a new Subscription for a given request. A Subscription
// will receive an initial empty currentItem value which points to the first item
// in the buffer. This allows the new subscription to call Next() without first checking
@@ -162,6 +132,14 @@ func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) {
start.link.next.Store(head)
close(start.link.nextCh)
if req.Authenticate == nil {
req.Authenticate = func() error {
return nil
}
} else if err := req.Authenticate(); err != nil {
return nil, err
}
sub := newSubscription(req, start, e.subscriptions.unsubscribeFn(req))
e.subscriptions.add(req, sub)
@@ -201,26 +179,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
continue
}
// If broker cannot fetch state there is nothing more to do
if e.aclDelegate == nil {
continue
}
aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID)
if err != nil {
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
e.logger.Info("ACL token is expired, closing subscriptions")
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req)
return sub.req.Authenticate() != nil
})
case *structs.ACLPolicyEvent, *structs.ACLRoleStreamEvent:
@@ -242,159 +202,18 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
e.mu.Lock()
defer e.mu.Unlock()
// If broker cannot fetch state there is nothing more to do
if e.aclDelegate == nil {
return
}
aclSnapshot := e.aclDelegate.TokenProvider()
for tokenSecretID := range e.subscriptions.byToken {
// if tokenSecretID is empty ACLs were disabled at time of subscribing
if tokenSecretID == "" {
continue
}
aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
if err != nil {
e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
e.logger.Info("ACL token is expired, closing subscriptions")
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req)
return sub.req.Authenticate() != nil
})
}
}
func aclObjFromSnapshotForTokenSecretID(
aclSnapshot ACLTokenProvider, aclCache *structs.ACLCache[*acl.ACL], tokenSecretID string) (
*acl.ACL, *time.Time, error) {
aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
if err != nil {
return nil, nil, err
}
if aclToken == nil {
return nil, nil, structs.ErrTokenNotFound
}
if aclToken.IsExpired(time.Now().UTC()) {
return nil, nil, structs.ErrTokenExpired
}
// Check if this is a management token
if aclToken.Type == structs.ACLManagementToken {
return acl.ManagementACL, aclToken.ExpirationTime, nil
}
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles))
for _, policyName := range aclToken.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
if err != nil {
return nil, nil, errors.New("error finding acl policy")
}
if policy == nil {
// Ignore policies that don't exist, since they don't grant any
// more privilege.
continue
}
aclPolicies = append(aclPolicies, policy)
}
// Iterate all the token role links, so we can unpack these and identify
// the ACL policies.
for _, roleLink := range aclToken.Roles {
role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID)
if err != nil {
return nil, nil, err
}
if role == nil {
continue
}
for _, policyLink := range role.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name)
if err != nil {
return nil, nil, errors.New("error finding acl policy")
}
if policy == nil {
// Ignore policies that don't exist, since they don't grant any
// more privilege.
continue
}
aclPolicies = append(aclPolicies, policy)
}
}
aclObj, err := structs.CompileACLObject(aclCache, aclPolicies)
if err != nil {
return nil, nil, err
}
return aclObj, aclToken.ExpirationTime, nil
}
type ACLTokenProvider interface {
ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error)
ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error)
GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error)
}
type ACLDelegate interface {
TokenProvider() ACLTokenProvider
}
func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
for topic := range subReq.Topics {
switch topic {
case structs.TopicDeployment,
structs.TopicEvaluation,
structs.TopicAllocation,
structs.TopicJob,
structs.TopicService:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return false
}
case structs.TopicHostVolume:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityHostVolumeRead); !ok {
return false
}
case structs.TopicCSIVolume:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityCSIReadVolume); !ok {
return false
}
case structs.TopicCSIPlugin:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return false
}
case structs.TopicNode:
if ok := aclObj.AllowNodeRead(); !ok {
return false
}
case structs.TopicNodePool:
// Require management token for node pools since we can't filter
// out node pools the token doesn't have access to.
if ok := aclObj.IsManagement(); !ok {
return false
}
default:
if ok := aclObj.IsManagement(); !ok {
return false
}
}
}
return true
}
func (s *Subscription) forceClose() {
if atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) {
close(s.forceClosed)

File diff suppressed because it is too large Load Diff

View File

@@ -59,6 +59,11 @@ type SubscribeRequest struct {
// the closest index in the buffer will be returned if there is not
// an exact match
StartExactlyAtIndex bool
// Authenticate is a callback that authenticates the token
// associated with the SubscribeRequest has not expired and
// has the correct permissions
Authenticate func() error
}
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {