diff --git a/.changelog/15013.txt b/.changelog/15013.txt new file mode 100644 index 000000000..2007a55a1 --- /dev/null +++ b/.changelog/15013.txt @@ -0,0 +1,3 @@ +```release-note:security +event stream: Fixed a bug where ACL token expiration was not checked when emitting events +``` diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 2628b11a1..dcfaf49a2 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -59,9 +59,13 @@ func (e *Event) stream(conn io.ReadWriteCloser) { // start subscription to publisher var subscription *stream.Subscription var subErr error + + // Track whether the ACL token being used has an expiry time. + var expiryTime *time.Time + // Check required ACL permissions for requested Topics if e.srv.config.ACLEnabled { - subscription, subErr = publisher.SubscribeWithACLCheck(subReq) + subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq) } else { subscription, subErr = publisher.Subscribe(subReq) } @@ -93,6 +97,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) { return } + // Ensure the token being used is not expired before we send any events + // to subscribers. 
+ if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { + select { + case errCh <- structs.ErrTokenExpired: + case <-ctx.Done(): + } + return + } + // Continue if there are no events if len(events.Events) == 0 { continue diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 31dddfa7e..b0231a485 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -15,11 +15,13 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/mapstructure" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -625,3 +627,117 @@ OUTER: } } } + +// TestEventStream_ACLTokenExpiry ensures a subscription does not receive events +// and is closed once the token has expired. +func TestEventStream_ACLTokenExpiry(t *testing.T) { + ci.Parallel(t) + + // Start our test server and wait until we have a leader. + testServer, _, testServerCleanup := TestACLServer(t, nil) + defer testServerCleanup() + testutil.WaitForLeader(t, testServer.RPC) + + // Create and upsert an ACL token which has a short expiry set. 
+ aclTokenWithExpiry := mock.ACLManagementToken() + aclTokenWithExpiry.ExpirationTime = pointer.Of(time.Now().Add(2 * time.Second)) + + must.NoError(t, testServer.fsm.State().UpsertACLTokens( + structs.MsgTypeTestSetup, 10, []*structs.ACLToken{aclTokenWithExpiry})) + + req := structs.EventStreamRequest{ + Topics: map[structs.Topic][]string{"Job": {"*"}}, + QueryOptions: structs.QueryOptions{ + Region: testServer.Region(), + Namespace: structs.DefaultNamespace, + AuthToken: aclTokenWithExpiry.SecretID, + }, + } + + handler, err := testServer.StreamingRpcHandler("Event.Stream") + must.NoError(t, err) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *structs.EventStreamWrapper) + + go handler(p2) + + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg structs.EventStreamWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %w", err) + } + + streamMsg <- &msg + } + }() + + publisher, err := testServer.State().EventBroker() + must.NoError(t, err) + + jobEvent := structs.JobEvent{ + Job: mock.Job(), + } + + // send req + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.Nil(t, encoder.Encode(req)) + + // publish some events + publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}}) + publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}}) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second)) + defer cancel() + + errChStream := make(chan error, 1) + go func() { + for { + select { + case <-ctx.Done(): + errChStream <- ctx.Err() + return + case err := <-errCh: + errChStream <- err + return + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + errChStream <- 
msg.Error + return + } + } + }() + + // Generate a timeout for the test and for the expiry. The expiry timeout + // is used to trigger an update which will close the subscription as the + // event stream only reacts to changes in state. + testTimeout := time.After(4 * time.Second) + expiryTimeout := time.After(time.Until(*aclTokenWithExpiry.ExpirationTime)) + + for { + select { + case <-testTimeout: + t.Fatal("timeout waiting for event stream to close") + case err := <-errCh: + t.Fatal(err) + case <-expiryTimeout: + publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}}) + case err := <-errChStream: + // Success + must.StrContains(t, err.Error(), "ACL token expired") + return + } + } +} diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 179ffbce6..8ecf33ebd 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -109,19 +109,25 @@ func (e *EventBroker) Publish(events *structs.Events) { e.publishCh <- events } -// SubscribeWithACLCheck validates the SubscribeRequest's token and requested Topics -// to ensure that the tokens privileges are sufficient enough. -func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, error) { - aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token) +// SubscribeWithACLCheck validates the SubscribeRequest's token and requested +// topics to ensure that the token's privileges are sufficient. It will also +// return the token expiry time, if any. It is the caller's responsibility to +// check this before sending events to the subscriber. 
+func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error) { + aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token) if err != nil { - return nil, structs.ErrPermissionDenied + return nil, nil, structs.ErrPermissionDenied } if allowed := aclAllowsSubscription(aclObj, req); !allowed { - return nil, structs.ErrPermissionDenied + return nil, nil, structs.ErrPermissionDenied } - return e.Subscribe(req) + sub, err := e.Subscribe(req) + if err != nil { + return nil, nil, err + } + return sub, expiryTime, nil } // Subscribe returns a new Subscription for a given request. A Subscription @@ -203,13 +209,19 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { continue } - aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID) + aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID) if err != nil || aclObj == nil { e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err) e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) continue } + if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { + e.logger.Info("ACL token is expired, closing subscriptions") + e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) + continue + } + e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { return !aclAllowsSubscription(aclObj, sub.req) }) @@ -245,13 +257,19 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() { continue } - aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) + aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) if err != nil || aclObj == nil { e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err) 
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) continue } + if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { + e.logger.Info("ACL token is expired, closing subscriptions") + e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) + continue + } + e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { return !aclAllowsSubscription(aclObj, sub.req) }) @@ -259,23 +277,24 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() { } func aclObjFromSnapshotForTokenSecretID( - aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) { + aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) ( + *acl.ACL, *time.Time, error) { aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID) if err != nil { - return nil, err + return nil, nil, err } if aclToken == nil { - return nil, structs.ErrTokenNotFound + return nil, nil, structs.ErrTokenNotFound } if aclToken.IsExpired(time.Now().UTC()) { - return nil, structs.ErrTokenExpired + return nil, nil, structs.ErrTokenExpired } // Check if this is a management token if aclToken.Type == structs.ACLManagementToken { - return acl.ManagementACL, nil + return acl.ManagementACL, aclToken.ExpirationTime, nil } aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles)) @@ -283,7 +302,7 @@ func aclObjFromSnapshotForTokenSecretID( for _, policyName := range aclToken.Policies { policy, err := aclSnapshot.ACLPolicyByName(nil, policyName) if err != nil || policy == nil { - return nil, errors.New("error finding acl policy") + return nil, nil, errors.New("error finding acl policy") } aclPolicies = append(aclPolicies, policy) } @@ -294,7 +313,7 @@ func aclObjFromSnapshotForTokenSecretID( role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID) if err != nil { - return nil, err + return nil, nil, err } if role == nil { continue @@ -303,13 +322,17 @@ func 
aclObjFromSnapshotForTokenSecretID( for _, policyLink := range role.Policies { policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name) if err != nil || policy == nil { - return nil, errors.New("error finding acl policy") + return nil, nil, errors.New("error finding acl policy") } aclPolicies = append(aclPolicies, policy) } } - return structs.CompileACLObject(aclCache, aclPolicies) + aclObj, err := structs.CompileACLObject(aclCache, aclPolicies) + if err != nil { + return nil, nil, err + } + return aclObj, aclToken.ExpirationTime, nil } type ACLTokenProvider interface { diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index 5ccf24cea..dfeeb6177 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -514,13 +514,14 @@ func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) { ns = structs.DefaultNamespace } - sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ Topics: map[structs.Topic][]string{ tc.event.Topic: {"*"}, }, Namespace: ns, Token: secretID, }) + require.Nil(t, expiryTime) if tc.initialSubErr { require.Error(t, err) @@ -811,11 +812,12 @@ func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) { ns = tc.event.Namespace } - sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}}, Namespace: ns, Token: tokenSecretID, }) + require.Nil(t, expiryTime) if tc.initialSubErr { require.Error(t, err) @@ -931,12 +933,13 @@ func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) { Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}), } - sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ Topics: 
map[structs.Topic][]string{structs.TopicAll: {"*"}}, Token: tc.inputToken.SecretID, }) require.NoError(t, err) require.NotNil(t, sub) + require.NotNil(t, expiryTime) // Publish an event and check that there is a new item in the // subscription queue.