diff --git a/.changelog/24849.txt b/.changelog/24849.txt new file mode 100644 index 000000000..848a18c50 --- /dev/null +++ b/.changelog/24849.txt @@ -0,0 +1,3 @@ +```release-note:improvement +event stream: adds ability to authenticate using workload identities +``` diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index e05df130f..8c0e818c3 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/go-msgpack/v2/codec" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" @@ -61,6 +62,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) { Topics: args.Topics, Index: uint64(args.Index), Namespace: args.Namespace, + Authenticate: func() error { + if err := e.srv.Authenticate(nil, &args); err != nil { + return err + } + resolvedACL, err := e.srv.ResolveACL(&args) + if err != nil { + return err + } + return validateACL(args.Namespace, args.Topics, resolvedACL) + }, } // Get the servers broker and subscribe @@ -74,21 +85,21 @@ func (e *Event) stream(conn io.ReadWriteCloser) { var subscription *stream.Subscription var subErr error - // Track whether the ACL token being used has an expiry time. - var expiryTime *time.Time - - // Check required ACL permissions for requested Topics - if e.srv.config.ACLEnabled { - subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq) - } else { - subscription, subErr = publisher.Subscribe(subReq) - } + subscription, subErr = publisher.Subscribe(subReq) if subErr != nil { handleJsonResultError(subErr, pointer.Of(int64(500)), encoder) return } defer subscription.Unsubscribe() + // because we have authenticated, the identity will be set, so extract expiration time + var exp time.Time + if c := args.GetIdentity().GetClaims(); c != nil { + exp = c.Expiry.Time() + } else if t := args.GetIdentity().GetACLToken(); t != nil && t.ExpirationTime != nil { + exp = *t.ExpirationTime + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() // goroutine to detect remote side closing @@ -111,9 +122,9 @@ func (e *Event) stream(conn io.ReadWriteCloser) { return } - // Ensure the token being used is not expired before we any events + // Ensure the token being used is not expired before we send any events // to subscribers. - if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { + if !exp.IsZero() && exp.Before(time.Now().UTC()) { select { case errCh <- structs.ErrTokenExpired: case <-ctx.Done(): @@ -213,3 +224,47 @@ func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) { Error: structs.NewRpcError(err, code), }) } + +func validateACL(namespace string, topics map[structs.Topic][]string, aclObj *acl.ACL) error { + for topic := range topics { + switch topic { + case structs.TopicDeployment, + structs.TopicEvaluation, + structs.TopicAllocation, + structs.TopicJob, + structs.TopicService: + if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob); !ok { + return structs.ErrPermissionDenied + } + case structs.TopicHostVolume: + if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityHostVolumeRead); !ok { + return structs.ErrPermissionDenied + } + case structs.TopicCSIVolume: + if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityCSIReadVolume); !ok { + return structs.ErrPermissionDenied + } + case structs.TopicCSIPlugin: + if ok := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob); !ok { + return structs.ErrPermissionDenied + } + case structs.TopicNode: + if ok := aclObj.AllowNodeRead(); !ok { + return structs.ErrPermissionDenied + } + case structs.TopicNodePool: + // Require management token for node pools since we can't filter + // out node pools the token doesn't have access to. + if ok := aclObj.IsManagement(); !ok { + return structs.ErrPermissionDenied + } + default: + if ok := aclObj.IsManagement(); !ok { + return structs.ErrPermissionDenied + } + } + } + + return nil + +} diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 4e11bb997..f419cec25 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -312,200 +312,176 @@ OUTER: } } -func TestEventStream_ACL(t *testing.T) { +func TestEventStream_validateACL(t *testing.T) { ci.Parallel(t) require := require.New(t) - // start server - s, _, cleanupS := TestACLServer(t, nil) - defer cleanupS() - testutil.WaitForLeader(t, s.RPC) - - policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) - tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) - - policyNsGood := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}) - tokenNsFoo := mock.CreatePolicyAndToken(t, s.State(), 1006, "valid", policyNsGood) - - policyNsNode := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}) - policyNsNode += "\n" + mock.NodePolicy("read") - tokenNsNode := mock.CreatePolicyAndToken(t, s.State(), 1007, "validnNsNode", policyNsNode) - cases := []struct { Name string - Token string Topics map[structs.Topic][]string Namespace string - ExpectedErr string - PublishFn func(p *stream.EventBroker) + Policy string + Management bool + ExpectedErr error }{ { - Name: "no token", - Token: "", - Topics: map[structs.Topic][]string{ - structs.TopicAll: {"*"}, - }, - ExpectedErr: structs.ErrPermissionDenied.Error(), - }, - { - Name: "bad token", - Token: tokenBad.SecretID, - Topics: map[structs.Topic][]string{ - structs.TopicAll: {"*"}, - }, - ExpectedErr: structs.ErrPermissionDenied.Error(), - }, - { - Name: "job namespace token - correct ns", - Token: tokenNsFoo.SecretID, + Name: "read-job topics - correct ns", Topics: map[structs.Topic][]string{ structs.TopicJob: {"*"}, structs.TopicEvaluation: {"*"}, structs.TopicAllocation: {"*"}, structs.TopicDeployment: {"*"}, + structs.TopicService: {"*"}, }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), Namespace: "foo", - ExpectedErr: "subscription closed by server", - PublishFn: func(p *stream.EventBroker) { - p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) - }, + Management: false, + ExpectedErr: nil, }, { - Name: "job namespace token - incorrect ns", - Token: tokenNsFoo.SecretID, + Name: "read-job topic - incorrect ns", Topics: map[structs.Topic][]string{ structs.TopicJob: {"*"}, // good }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), Namespace: "bar", // bad - ExpectedErr: structs.ErrPermissionDenied.Error(), - PublishFn: func(p *stream.EventBroker) { - p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) - }, + Management: false, + ExpectedErr: structs.ErrPermissionDenied, }, { - Name: "job namespace token - request management topic", - Token: tokenNsFoo.SecretID, + Name: "read all topics - correct policy", Topics: map[structs.Topic][]string{ structs.TopicAll: {"*"}, // bad }, - Namespace: "foo", - ExpectedErr: structs.ErrPermissionDenied.Error(), - PublishFn: func(p *stream.EventBroker) { - p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) - }, + Policy: "", + Namespace: "*", + Management: true, + ExpectedErr: nil, }, { - Name: "job namespace token - request invalid node topic", - Token: tokenNsFoo.SecretID, + Name: "read all topics - incorrect policy", + Topics: map[structs.Topic][]string{ + structs.TopicAll: {"*"}, // bad + }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), + Namespace: "foo", + Management: false, + ExpectedErr: structs.ErrPermissionDenied, + }, + { + Name: "read node - valid policy", + Topics: map[structs.Topic][]string{ + structs.TopicNode: {"*"}, // bad + }, + Policy: mock.NodePolicy(acl.PolicyRead), + Namespace: "foo", + Management: false, + ExpectedErr: nil, + }, + { + Name: "read node - invalid policy", Topics: map[structs.Topic][]string{ structs.TopicEvaluation: {"*"}, // good structs.TopicNode: {"*"}, // bad }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), Namespace: "foo", - ExpectedErr: structs.ErrPermissionDenied.Error(), - PublishFn: func(p *stream.EventBroker) { - p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) - }, + Management: false, + ExpectedErr: structs.ErrPermissionDenied, }, { - Name: "job+node namespace token, valid", - Token: tokenNsNode.SecretID, + Name: "read node pool - correct policy", Topics: map[structs.Topic][]string{ - structs.TopicEvaluation: {"*"}, // good - structs.TopicNode: {"*"}, // good + structs.TopicNodePool: {"*"}, // bad }, + Policy: "", + Namespace: "", + Management: true, + ExpectedErr: nil, + }, + { + Name: "read node pool - incorrect policy", + Topics: map[structs.Topic][]string{ + structs.TopicNodePool: {"*"}, // bad + }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), Namespace: "foo", - ExpectedErr: "subscription closed by server", - PublishFn: func(p *stream.EventBroker) { - p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Node", Payload: mock.Node()}}}) + Management: false, + ExpectedErr: structs.ErrPermissionDenied, + }, + { + Name: "read host volumes - correct policy and ns", + Topics: map[structs.Topic][]string{ + structs.TopicHostVolume: {"*"}, }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityHostVolumeRead}), + Namespace: "foo", + Management: false, + ExpectedErr: nil, + }, + { + Name: "read host volumes - incorrect policy or ns", + Topics: map[structs.Topic][]string{ + structs.TopicHostVolume: {"*"}, + }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), + Namespace: "foo", + Management: false, + ExpectedErr: structs.ErrPermissionDenied, + }, + { + Name: "read csi volumes - correct policy and ns", + Topics: map[structs.Topic][]string{ + structs.TopicCSIVolume: {"*"}, + }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityCSIReadVolume}), + Namespace: "foo", + Management: false, + ExpectedErr: nil, + }, + { + Name: "read csi volumes - incorrect policy or ns", + Topics: map[structs.Topic][]string{ + structs.TopicCSIVolume: {"*"}, + }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), + Namespace: "foo", + Management: false, + ExpectedErr: structs.ErrPermissionDenied, + }, + { + Name: "read csi plugin - correct policy and ns", + Topics: map[structs.Topic][]string{ + structs.TopicCSIPlugin: {"*"}, + }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), + Namespace: "foo", + Management: false, + ExpectedErr: nil, + }, + { + Name: "read csi plugin - incorrect policy or ns", + Topics: map[structs.Topic][]string{ + structs.TopicCSIPlugin: {"*"}, + }, + Policy: mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}), + Namespace: "bar", + Management: false, + ExpectedErr: structs.ErrPermissionDenied, }, } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - var ns string - if tc.Namespace != "" { - ns = tc.Namespace - } - // Create request for all topics and keys - req := structs.EventStreamRequest{ - Topics: tc.Topics, - QueryOptions: structs.QueryOptions{ - Region: s.Region(), - Namespace: ns, - AuthToken: tc.Token, - }, - } - handler, err := s.StreamingRpcHandler("Event.Stream") - require.Nil(err) - - // create pipe - p1, p2 := net.Pipe() - defer p1.Close() - defer p2.Close() - - errCh := make(chan error) - streamMsg := make(chan *structs.EventStreamWrapper) - - go handler(p2) - - // Start decoder - go func() { - decoder := codec.NewDecoder(p1, structs.MsgpackHandle) - for { - var msg structs.EventStreamWrapper - if err := decoder.Decode(&msg); err != nil { - if err == io.EOF || strings.Contains(err.Error(), "closed") { - return - } - errCh <- fmt.Errorf("error decoding: %w", err) - } - - streamMsg <- &msg - } - }() - - // send request - encoder := codec.NewEncoder(p1, structs.MsgpackHandle) - require.Nil(encoder.Encode(req)) - - publisher, err := s.State().EventBroker() + p, err := acl.Parse(tc.Policy) require.NoError(err) - // publish some events - node := mock.Node() + testACL, err := acl.NewACL(tc.Management, []*acl.Policy{p}) + require.NoError(err) - publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}}) - publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}}) - - if tc.PublishFn != nil { - tc.PublishFn(publisher) - } - - timeout := time.After(5 * time.Second) - OUTER: - for { - select { - case <-timeout: - t.Fatal("timeout waiting for events") - case err := <-errCh: - t.Fatal(err) - case msg := <-streamMsg: - // force error by closing all subscriptions - publisher.CloseAll() - if msg.Error == nil { - continue - } - - if strings.Contains(msg.Error.Error(), tc.ExpectedErr) { - break OUTER - } else { - t.Fatalf("unexpected error %v", msg.Error) - } - } - } + err = validateACL(tc.Namespace, tc.Topics, testACL) + require.Equal(tc.ExpectedErr, err) }) } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 38c821e61..f5b690291 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -122,15 +122,6 @@ type StateStore struct { stopEventBroker func() } -type streamACLDelegate struct { - s *StateStore -} - -func (a *streamACLDelegate) TokenProvider() stream.ACLTokenProvider { - resolver, _ := a.s.Snapshot() - return resolver -} - // NewStateStore is used to create a new state store func NewStateStore(config *StateStoreConfig) (*StateStore, error) { if err := config.Validate(); err != nil { @@ -154,7 +145,7 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { if config.EnablePublisher { // Create new event publisher using provided config - broker, err := stream.NewEventBroker(ctx, &streamACLDelegate{s}, stream.EventBrokerCfg{ + broker, err := stream.NewEventBroker(ctx, stream.EventBrokerCfg{ EventBufferSize: config.EventBufferSize, Logger: config.Logger, }) diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 9445cec31..bf9e22c03 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -5,15 +5,11 @@ package stream import ( "context" - "errors" "fmt" "sync" "sync/atomic" - "time" "github.com/armon/go-metrics" - "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-hclog" @@ -43,9 +39,6 @@ type EventBroker struct { // the Commit call in the FSM hot path. publishCh chan *structs.Events - aclDelegate ACLDelegate - aclCache *structs.ACLCache[*acl.ACL] - aclCh chan structs.Event logger hclog.Logger @@ -55,7 +48,7 @@ type EventBroker struct { // A goroutine is run in the background to publish events to an event buffer. // Cancelling the context will shutdown the goroutine to free resources, and stop // all publishing. -func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBrokerCfg) (*EventBroker, error) { +func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) (*EventBroker, error) { if cfg.Logger == nil { cfg.Logger = hclog.NewNullLogger() } @@ -67,12 +60,10 @@ func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBroke buffer := newEventBuffer(cfg.EventBufferSize) e := &EventBroker{ - logger: cfg.Logger.Named("event_broker"), - eventBuf: buffer, - publishCh: make(chan *structs.Events, 64), - aclCh: make(chan structs.Event, 10), - aclDelegate: aclDelegate, - aclCache: structs.NewACLCache[*acl.ACL](aclCacheSize), + logger: cfg.Logger.Named("event_broker"), + eventBuf: buffer, + publishCh: make(chan *structs.Events, 64), + aclCh: make(chan structs.Event, 10), subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, @@ -98,7 +89,7 @@ func (e *EventBroker) Publish(events *structs.Events) { // Notify the broker to check running subscriptions against potentially // updated ACL Token or Policy for _, event := range events.Events { - if event.Topic == structs.TopicACLToken || event.Topic == structs.TopicACLPolicy { + if event.Topic == structs.TopicACLToken || event.Topic == structs.TopicACLPolicy || event.Topic == structs.TopicACLRole { e.aclCh <- event } } @@ -106,27 +97,6 @@ func (e *EventBroker) Publish(events *structs.Events) { e.publishCh <- events } -// SubscribeWithACLCheck validates the SubscribeRequest's token and requested -// topics to ensure that the tokens privileges are sufficient. It will also -// return the token expiry time, if any. It is the callers responsibility to -// check this before publishing events to the caller. -func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error) { - aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token) - if err != nil { - return nil, nil, structs.ErrPermissionDenied - } - - if allowed := aclAllowsSubscription(aclObj, req); !allowed { - return nil, nil, structs.ErrPermissionDenied - } - - sub, err := e.Subscribe(req) - if err != nil { - return nil, nil, err - } - return sub, expiryTime, nil -} - // Subscribe returns a new Subscription for a given request. A Subscription // will receive an initial empty currentItem value which points to the first item // in the buffer. This allows the new subscription to call Next() without first checking @@ -162,6 +132,14 @@ func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) { start.link.next.Store(head) close(start.link.nextCh) + if req.Authenticate == nil { + req.Authenticate = func() error { + return nil + } + } else if err := req.Authenticate(); err != nil { + return nil, err + } + sub := newSubscription(req, start, e.subscriptions.unsubscribeFn(req)) e.subscriptions.add(req, sub) @@ -201,26 +179,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { continue } - // If broker cannot fetch state there is nothing more to do - if e.aclDelegate == nil { - continue - } - - aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID) - if err != nil { - e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err) - e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) - continue - } - - if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { - e.logger.Info("ACL token is expired, closing subscriptions") - e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) - continue - } - e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { - return !aclAllowsSubscription(aclObj, sub.req) + return sub.req.Authenticate() != nil }) case *structs.ACLPolicyEvent, *structs.ACLRoleStreamEvent: @@ -242,159 +202,18 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() { e.mu.Lock() defer e.mu.Unlock() - // If broker cannot fetch state there is nothing more to do - if e.aclDelegate == nil { - return - } - - aclSnapshot := e.aclDelegate.TokenProvider() for tokenSecretID := range e.subscriptions.byToken { // if tokenSecretID is empty ACLs were disabled at time of subscribing if tokenSecretID == "" { continue } - aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) - if err != nil { - e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err) - e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) - continue - } - - if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { - e.logger.Info("ACL token is expired, closing subscriptions") - e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) - continue - } - e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { - return !aclAllowsSubscription(aclObj, sub.req) + return sub.req.Authenticate() != nil }) } } -func aclObjFromSnapshotForTokenSecretID( - aclSnapshot ACLTokenProvider, aclCache *structs.ACLCache[*acl.ACL], tokenSecretID string) ( - *acl.ACL, *time.Time, error) { - - aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID) - if err != nil { - return nil, nil, err - } - - if aclToken == nil { - return nil, nil, structs.ErrTokenNotFound - } - if aclToken.IsExpired(time.Now().UTC()) { - return nil, nil, structs.ErrTokenExpired - } - - // Check if this is a management token - if aclToken.Type == structs.ACLManagementToken { - return acl.ManagementACL, aclToken.ExpirationTime, nil - } - - aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles)) - - for _, policyName := range aclToken.Policies { - policy, err := aclSnapshot.ACLPolicyByName(nil, policyName) - if err != nil { - return nil, nil, errors.New("error finding acl policy") - } - if policy == nil { - // Ignore policies that don't exist, since they don't grant any - // more privilege. - continue - } - aclPolicies = append(aclPolicies, policy) - } - - // Iterate all the token role links, so we can unpack these and identify - // the ACL policies. - for _, roleLink := range aclToken.Roles { - - role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID) - if err != nil { - return nil, nil, err - } - if role == nil { - continue - } - - for _, policyLink := range role.Policies { - policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name) - if err != nil { - return nil, nil, errors.New("error finding acl policy") - } - if policy == nil { - // Ignore policies that don't exist, since they don't grant any - // more privilege. - continue - } - aclPolicies = append(aclPolicies, policy) - } - } - - aclObj, err := structs.CompileACLObject(aclCache, aclPolicies) - if err != nil { - return nil, nil, err - } - return aclObj, aclToken.ExpirationTime, nil -} - -type ACLTokenProvider interface { - ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) - ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) - GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error) -} - -type ACLDelegate interface { - TokenProvider() ACLTokenProvider -} - -func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool { - for topic := range subReq.Topics { - switch topic { - case structs.TopicDeployment, - structs.TopicEvaluation, - structs.TopicAllocation, - structs.TopicJob, - structs.TopicService: - if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok { - return false - } - case structs.TopicHostVolume: - if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityHostVolumeRead); !ok { - return false - } - case structs.TopicCSIVolume: - if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityCSIReadVolume); !ok { - return false - } - case structs.TopicCSIPlugin: - if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok { - return false - } - case structs.TopicNode: - if ok := aclObj.AllowNodeRead(); !ok { - return false - } - case structs.TopicNodePool: - // Require management token for node pools since we can't filter - // out node pools the token doesn't have access to. - if ok := aclObj.IsManagement(); !ok { - return false - } - default: - if ok := aclObj.IsManagement(); !ok { - return false - } - } - } - - return true -} - func (s *Subscription) forceClose() { if atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) { close(s.forceClosed) diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index 618cb4456..d2a458446 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -9,14 +9,8 @@ import ( "testing" "time" - "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/helper/pointer" - "github.com/hashicorp/nomad/helper/uuid" - "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -32,7 +26,7 @@ func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{EventBufferSize: 100}) + publisher, err := NewEventBroker(ctx, EventBrokerCfg{EventBufferSize: 100}) require.NoError(t, err) sub, err := publisher.Subscribe(subscription) @@ -80,7 +74,7 @@ func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{}) + publisher, err := NewEventBroker(ctx, EventBrokerCfg{}) require.NoError(t, err) sub1, err := publisher.Subscribe(&SubscribeRequest{}) @@ -103,7 +97,7 @@ func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) { // TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription // hanlding behavior when ACLs are disabled (request Token is empty). // Subscriptions are mapped by their request token. when that token is empty, -// the subscriptions should still be handled indeppendtly of each other when +// the subscriptions should still be handled independently of each other when // unssubscribing. func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { ci.Parallel(t) @@ -111,7 +105,7 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{}) + publisher, err := NewEventBroker(ctx, EventBrokerCfg{}) require.NoError(t, err) // first subscription, empty token @@ -129,353 +123,46 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state)) } -func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) { +func TestEventBroker_handleACLUpdates(t *testing.T) { ci.Parallel(t) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + secretID := "1234" - publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{}) - require.NoError(t, err) - - sub1, err := publisher.Subscribe(&SubscribeRequest{ - Topics: map[structs.Topic][]string{ - "*": {"*"}, - }, - Token: "foo", - }) - require.NoError(t, err) - defer sub1.Unsubscribe() - - aclEvent := structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenDeleted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}), - } - - publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}}) - for { - _, err := sub1.Next(ctx) - if err == ErrSubscriptionClosed { - break - } - } - - out, err := sub1.Next(ctx) - require.Error(t, err) - require.Equal(t, ErrSubscriptionClosed, err) - require.Equal(t, structs.Events{}, out) -} - -type fakeACLDelegate struct { - tokenProvider ACLTokenProvider -} - -func (d *fakeACLDelegate) TokenProvider() ACLTokenProvider { - return d.tokenProvider -} - -type fakeACLTokenProvider struct { - policy *structs.ACLPolicy - policyErr error - token *structs.ACLToken - tokenErr error - role *structs.ACLRole - roleErr error -} - -func (p *fakeACLTokenProvider) ACLTokenBySecretID(_ memdb.WatchSet, _ string) (*structs.ACLToken, error) { - return p.token, p.tokenErr -} - -func (p *fakeACLTokenProvider) ACLPolicyByName(_ memdb.WatchSet, _ string) (*structs.ACLPolicy, error) { - return p.policy, p.policyErr -} - -func (p *fakeACLTokenProvider) GetACLRoleByID(_ memdb.WatchSet, _ string) (*structs.ACLRole, error) { - return p.role, p.roleErr -} - -func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) { - ci.Parallel(t) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - secretID := "some-secret-id" - cases := []struct { - policyBeforeRules string - policyAfterRules string - topics map[structs.Topic][]string - desc string - event structs.Event - policyEvent structs.Event - shouldUnsubscribe bool - initialSubErr bool + testCases := []struct { + name string + event structs.Event + shouldPassAuth bool }{ { - desc: "subscribed to deployments and removed access", - policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}), - shouldUnsubscribe: true, + name: "token deleted", event: structs.Event{ - Topic: structs.TopicDeployment, - Type: structs.TypeDeploymentUpdate, - Payload: structs.DeploymentEvent{ - Deployment: &structs.Deployment{ - ID: "some-id", - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenDeleted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, - policyEvent: structs.Event{ + shouldPassAuth: false, // shouldn't matter in token delete event + }, + { + name: "token updated - auth passes", + event: structs.Event{ Topic: structs.TopicACLToken, Type: structs.TypeACLTokenUpserted, Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, + shouldPassAuth: false, }, { - desc: "subscribed to evals and removed access", - policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}), - shouldUnsubscribe: true, + name: "token updated - auth fails", event: structs.Event{ - Topic: structs.TopicEvaluation, - Type: structs.TypeEvalUpdated, - Payload: structs.EvaluationEvent{ - Evaluation: &structs.Evaluation{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ Topic: structs.TopicACLToken, Type: structs.TypeACLTokenUpserted, Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, + shouldPassAuth: true, }, { - desc: "subscribed to allocs and removed access", - policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}), - shouldUnsubscribe: true, + name: "policy deleted", event: structs.Event{ - Topic: structs.TopicAllocation, - Type: structs.TypeAllocationUpdated, - Payload: structs.AllocationEvent{ - Allocation: &structs.Allocation{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "subscribed to nodes and removed access", - policyBeforeRules: mock.NodePolicy(acl.PolicyRead), - policyAfterRules: mock.NodePolicy(acl.PolicyDeny), - shouldUnsubscribe: true, - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{ - Node: &structs.Node{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "subscribed to evals in all namespaces and removed access", - policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}), - policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - shouldUnsubscribe: true, - event: structs.Event{ - Topic: structs.TopicEvaluation, - Type: structs.TypeEvalUpdated, - Namespace: "foo", - Payload: structs.EvaluationEvent{ - Evaluation: &structs.Evaluation{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "subscribed to deployments and no access change", - policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicDeployment, - Type: structs.TypeDeploymentUpdate, - Payload: structs.DeploymentEvent{ - Deployment: &structs.Deployment{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "subscribed to evals and no access change", - policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicEvaluation, - Type: structs.TypeEvalUpdated, - Payload: structs.EvaluationEvent{ - Evaluation: &structs.Evaluation{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "subscribed to allocs and no access change", - policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicAllocation, - Type: structs.TypeAllocationUpdated, - Payload: structs.AllocationEvent{ - Allocation: &structs.Allocation{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "subscribed to nodes and no access change", - policyBeforeRules: mock.NodePolicy(acl.PolicyRead), - policyAfterRules: mock.NodePolicy(acl.PolicyRead), - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{ - Node: &structs.Node{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "initial token insufficient privileges", - initialSubErr: true, - policyBeforeRules: mock.NodePolicy(acl.PolicyDeny), - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{ - Node: &structs.Node{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), - }, - }, - { - desc: "subscribed to nodes and policy change no change", - policyBeforeRules: mock.NodePolicy(acl.PolicyRead), - policyAfterRules: mock.NodePolicy(acl.PolicyWrite), - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{ - Node: &structs.Node{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLPolicy, - Type: structs.TypeACLPolicyUpserted, - Payload: &structs.ACLPolicyEvent{ - ACLPolicy: &structs.ACLPolicy{ - Name: "some-policy", - }, - }, - }, - }, - { - desc: "subscribed to nodes and policy change no access", - policyBeforeRules: mock.NodePolicy(acl.PolicyRead), - policyAfterRules: mock.NodePolicy(acl.PolicyDeny), - shouldUnsubscribe: true, - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{ - Node: &structs.Node{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLPolicy, - Type: structs.TypeACLPolicyUpserted, - Payload: &structs.ACLPolicyEvent{ - ACLPolicy: &structs.ACLPolicy{ - Name: "some-policy", - }, - }, - }, - }, - { - desc: "subscribed to nodes policy deleted", - policyBeforeRules: mock.NodePolicy(acl.PolicyRead), - policyAfterRules: "", - shouldUnsubscribe: true, - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{ - Node: &structs.Node{ - ID: "some-id", - }, - }, - }, - policyEvent: structs.Event{ Topic: structs.TopicACLPolicy, Type: structs.TypeACLPolicyDeleted, Payload: &structs.ACLPolicyEvent{ @@ -484,592 +171,118 @@ func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) { }, }, }, - }, - } - - for _, tc := range cases { - t.Run(tc.desc, func(t *testing.T) { - - policy := &structs.ACLPolicy{ - Name: "some-policy", - Rules: tc.policyBeforeRules, - } - policy.SetHash() - - tokenProvider := &fakeACLTokenProvider{ - policy: policy, - token: &structs.ACLToken{ - SecretID: secretID, - Policies: []string{policy.Name}, - }, - } - - aclDelegate := &fakeACLDelegate{ - tokenProvider: tokenProvider, - } - - publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) - require.NoError(t, err) - - var ns string - if tc.event.Namespace != "" { - ns = tc.event.Namespace - } else { - ns = structs.DefaultNamespace - } - - sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ - Topics: map[structs.Topic][]string{ - tc.event.Topic: {"*"}, - }, - Namespace: ns, - Token: secretID, - }) - require.Nil(t, expiryTime) - - if tc.initialSubErr { - require.Error(t, err) - require.Nil(t, sub) - return - } else { - require.NoError(t, err) - } - publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}}) - - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - _, err = sub.Next(ctx) - require.NoError(t, err) - - // Update the mock provider to use the after rules - policyAfter := &structs.ACLPolicy{ - Name: "some-new-policy", - Rules: tc.policyAfterRules, - ModifyIndex: 101, // The ModifyIndex is used to caclulate the acl cache key - } - policyAfter.SetHash() - - tokenProvider.policy = policyAfter - - // Publish ACL event triggering subscription re-evaluation - publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}}) - // Publish another event - publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}}) - - // If we are expecting to unsubscribe consume the subscription - // until the expected error occurs. - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - if tc.shouldUnsubscribe { - for { - _, err = sub.Next(ctx) - if err != nil { - if err == context.DeadlineExceeded { - require.Fail(t, err.Error()) - } - if err == ErrSubscriptionClosed { - break - } - } - } - } else { - _, err = sub.Next(ctx) - require.NoError(t, err) - } - - publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}}) - - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - _, err = sub.Next(ctx) - if tc.shouldUnsubscribe { - require.Equal(t, ErrSubscriptionClosed, err) - } else { - require.NoError(t, err) - } - }) - } -} - -func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) { - ci.Parallel(t) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - // Generate a UUID to use in all tests for the token secret ID and the role - // ID. - tokenSecretID := uuid.Generate() - roleID := uuid.Generate() - - cases := []struct { - name string - aclPolicy *structs.ACLPolicy - roleBeforePolicyLinks []*structs.ACLRolePolicyLink - roleAfterPolicyLinks []*structs.ACLRolePolicyLink - topics map[structs.Topic][]string - event structs.Event - policyEvent structs.Event - shouldUnsubscribe bool - initialSubErr bool - }{ - { - name: "deployments access policy link removed", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ - acl.NamespaceCapabilityReadJob}, - ), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, - shouldUnsubscribe: true, - event: structs.Event{ - Topic: structs.TopicDeployment, - Type: structs.TypeDeploymentUpdate, - Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, + shouldPassAuth: false, }, { - name: "evaluations access policy link removed", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ - acl.NamespaceCapabilityReadJob}, - ), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, - shouldUnsubscribe: true, + name: "policy updated - auth passes", event: structs.Event{ - Topic: structs.TopicEvaluation, - Type: structs.TypeEvalUpdated, - Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, - }, - { - name: "allocations access policy link removed", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ - acl.NamespaceCapabilityReadJob}, - ), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, - shouldUnsubscribe: true, - event: structs.Event{ - Topic: structs.TopicAllocation, - Type: structs.TypeAllocationUpdated, - Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, - }, - { - name: "nodes access policy link removed", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NodePolicy(acl.PolicyRead), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, - shouldUnsubscribe: true, - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, - }, - { - name: "deployment access no change", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ - acl.NamespaceCapabilityReadJob}, - ), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicDeployment, - Type: structs.TypeDeploymentUpdate, - Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, - }, - { - name: "evaluations access no change", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ - acl.NamespaceCapabilityReadJob}, - ), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicEvaluation, - Type: structs.TypeEvalUpdated, - Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, - }, - { - name: "allocations access no change", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ - acl.NamespaceCapabilityReadJob}, - ), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicAllocation, - Type: structs.TypeAllocationUpdated, - Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, - }, - { - name: "nodes access no change", - aclPolicy: &structs.ACLPolicy{ - Name: "test-event-broker-acl-policy", - Rules: mock.NodePolicy(acl.PolicyRead), - }, - roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, - shouldUnsubscribe: false, - event: structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, - }, - policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), - }, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - - // Build our fake token provider containing the relevant state - // objects and add this to our new delegate. Keeping the token - // provider setup separate means we can easily update its state. - tokenProvider := &fakeACLTokenProvider{ - policy: tc.aclPolicy, - token: &structs.ACLToken{ - SecretID: tokenSecretID, - Roles: []*structs.ACLTokenRoleLink{{ID: roleID}}, - }, - role: &structs.ACLRole{ - ID: uuid.Short(), - Policies: []*structs.ACLRolePolicyLink{ - {Name: tc.aclPolicy.Name}, + Topic: structs.TopicACLPolicy, + Type: structs.TypeACLPolicyUpserted, + Payload: &structs.ACLPolicyEvent{ + ACLPolicy: &structs.ACLPolicy{ + Name: "some-policy", }, }, - } - aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} - - publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) - require.NoError(t, err) - - ns := structs.DefaultNamespace - if tc.event.Namespace != "" { - ns = tc.event.Namespace - } - - sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ - Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}}, - Namespace: ns, - Token: tokenSecretID, - }) - require.Nil(t, expiryTime) - - if tc.initialSubErr { - require.Error(t, err) - require.Nil(t, sub) - return - } - - require.NoError(t, err) - publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}}) - - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - _, err = sub.Next(ctx) - require.NoError(t, err) - - // Overwrite the ACL role policy links with the updated version - // which is expected to cause a change in the subscription. - tokenProvider.role.Policies = tc.roleAfterPolicyLinks - - // Publish ACL event triggering subscription re-evaluation - publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}}) - publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}}) - - // If we are expecting to unsubscribe consume the subscription - // until the expected error occurs. - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - if tc.shouldUnsubscribe { - for { - _, err = sub.Next(ctx) - if err != nil { - if err == context.DeadlineExceeded { - require.Fail(t, err.Error()) - } - if err == ErrSubscriptionClosed { - break - } - } - } - } else { - _, err = sub.Next(ctx) - require.NoError(t, err) - } - - publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}}) - - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - _, err = sub.Next(ctx) - if tc.shouldUnsubscribe { - require.Equal(t, ErrSubscriptionClosed, err) - } else { - require.NoError(t, err) - } - }) - } -} - -func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) { - ci.Parallel(t) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - cases := []struct { - name string - inputToken *structs.ACLToken - shouldExpire bool - }{ - { - name: "token does not expire", - inputToken: &structs.ACLToken{ - AccessorID: uuid.Generate(), - SecretID: uuid.Generate(), - ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()), - Type: structs.ACLManagementToken, }, - shouldExpire: false, + shouldPassAuth: true, }, { - name: "token does expire", - inputToken: &structs.ACLToken{ - AccessorID: uuid.Generate(), - SecretID: uuid.Generate(), - ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()), - Type: structs.ACLManagementToken, - }, - shouldExpire: true, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - - // Build our fake token provider containing the relevant state - // objects and add this to our new delegate. Keeping the token - // provider setup separate means we can easily update its state. - tokenProvider := &fakeACLTokenProvider{token: tc.inputToken} - aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} - - publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) - require.NoError(t, err) - - fakeNodeEvent := structs.Event{ - Topic: structs.TopicNode, - Type: structs.TypeNodeRegistration, - Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, - } - - fakeTokenEvent := structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}), - } - - sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ - Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}}, - Token: tc.inputToken.SecretID, - }) - require.NoError(t, err) - require.NotNil(t, sub) - require.NotNil(t, expiryTime) - - // Publish an event and check that there is a new item in the - // subscription queue. - publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{fakeNodeEvent}}) - - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - _, err = sub.Next(ctx) - require.NoError(t, err) - - // If the test states the token should expire, set the expiration - // time to a previous time. - if tc.shouldExpire { - tokenProvider.token.ExpirationTime = pointer.Of( - time.Date(1987, time.April, 13, 8, 3, 0, 0, time.UTC), - ) - } - - // Publish some events to trigger re-evaluation of the subscription. - publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{fakeTokenEvent}}) - publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{fakeNodeEvent}}) - - // If we are expecting to unsubscribe consume the subscription - // until the expected error occurs. - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) - defer cancel() - - if tc.shouldExpire { - for { - if _, err = sub.Next(ctx); err != nil { - if err == context.DeadlineExceeded { - require.Fail(t, err.Error()) - } - if err == ErrSubscriptionClosed { - break - } - } - } - } else { - _, err = sub.Next(ctx) - require.NoError(t, err) - } - }) - } -} - -func TestEventBroker_NodePool_ACL(t *testing.T) { - ci.Parallel(t) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - testCases := []struct { - name string - token *structs.ACLToken - policy *structs.ACLPolicy - expectedErr string - }{ - { - name: "management token", - token: &structs.ACLToken{ - AccessorID: uuid.Generate(), - SecretID: uuid.Generate(), - Type: structs.ACLManagementToken, + name: "policy updated - auth fails", + event: structs.Event{ + Topic: structs.TopicACLPolicy, + Type: structs.TypeACLTokenUpserted, + Payload: &structs.ACLPolicyEvent{ + ACLPolicy: &structs.ACLPolicy{ + Name: "some-policy", + }, + }, }, + shouldPassAuth: false, }, { - name: "client token", - token: &structs.ACLToken{ - AccessorID: uuid.Generate(), - SecretID: uuid.Generate(), - Type: structs.ACLClientToken, + name: "role delete", + event: structs.Event{ + Topic: structs.TopicACLRole, + Type: structs.TypeACLRoleDeleted, + Payload: &structs.ACLRoleStreamEvent{ + ACLRole: &structs.ACLRole{ + ID: "1234", + }, + }, }, - expectedErr: structs.ErrPermissionDenied.Error(), + shouldPassAuth: false, }, { - name: "node pool read", - token: &structs.ACLToken{ - AccessorID: uuid.Generate(), - SecretID: uuid.Generate(), - Type: structs.ACLClientToken, - Policies: []string{"node-pool-read"}, + name: "role updated - auth passes", + event: structs.Event{ + Topic: structs.TopicACLRole, + Type: structs.TypeACLRoleUpserted, + Payload: &structs.ACLRoleStreamEvent{ + ACLRole: &structs.ACLRole{ + ID: "1234", + }, + }, }, - policy: &structs.ACLPolicy{ - Name: "node-pool-read", - Rules: `node_pool "*" { policy = "read" }`, - }, - expectedErr: structs.ErrPermissionDenied.Error(), + shouldPassAuth: true, }, { - name: "node pool write", - token: &structs.ACLToken{ - AccessorID: uuid.Generate(), - SecretID: uuid.Generate(), - Type: structs.ACLClientToken, - Policies: []string{"node-pool-write"}, + name: "role updated - auth fails", + event: structs.Event{ + Topic: structs.TopicACLRole, + Type: structs.TypeACLRoleUpserted, + Payload: &structs.ACLRoleStreamEvent{ + ACLRole: &structs.ACLRole{ + ID: "1234", + }, + }, }, - policy: &structs.ACLPolicy{ - Name: "node-pool-write", - Rules: `node_pool "*" { policy = "write" }`, - }, - expectedErr: structs.ErrPermissionDenied.Error(), + shouldPassAuth: false, }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - tokenProvider := &fakeACLTokenProvider{token: tc.token, policy: tc.policy} - aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} + ctx := context.Background() - publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) - must.NoError(t, err) + publisher, err := NewEventBroker(ctx, EventBrokerCfg{}) + require.NoError(t, err) - _, _, err = publisher.SubscribeWithACLCheck(&SubscribeRequest{ - Topics: map[structs.Topic][]string{structs.TopicNodePool: {"*"}}, - Token: tc.token.SecretID, - }) + testSubReq := &SubscribeRequest{ + Topics: map[structs.Topic][]string{ + "*": {"*"}, + }, + Token: secretID, + Authenticate: func() error { + return nil + }, + } - if tc.expectedErr != "" { - must.ErrorContains(t, err, tc.expectedErr) - } else { - must.NoError(t, err) + sub, err := publisher.Subscribe(testSubReq) + require.NoError(t, err) + + if !tc.shouldPassAuth { + testSubReq.Authenticate = func() error { + return structs.ErrPermissionDenied } - }) - } + } + // publish the ACL event + publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}}) + + _, err = sub.Next(ctx) + + ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + + // try to read another event + _, err = sub.Next(ctx) + if !tc.shouldPassAuth { + require.ErrorIs(t, err, ErrSubscriptionClosed) + } else { + require.ErrorIs(t, err, context.DeadlineExceeded) + } + + sub.Unsubscribe() + cancel() + } } func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index c1a8119c3..474829988 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -59,6 +59,11 @@ type SubscribeRequest struct { // the closest index in the buffer will be returned if there is not // an exact match StartExactlyAtIndex bool + + // Authenticate is a callback that authenticates the token + // associated with the SubscribeRequest has not expired and + // has the correct permissions + Authenticate func() error } func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {