Events/acl events (#9595)

* fix acl event creation

* allow way to access secretID without exposing it to stream

test that values are omitted

test event creation

test acl events

payloads are pointers

fix failing tests, do all security steps inside constructor

* increase time

* ignore empty tokens

* uncomment line

* changelog
This commit is contained in:
Drew Bailey
2020-12-11 10:40:50 -05:00
committed by GitHub
parent 893211f6f6
commit 3e793ea3c4
9 changed files with 287 additions and 94 deletions

View File

@@ -3,6 +3,9 @@
IMPROVEMENTS:
* consul/connect: interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)]
BUG FIXES:
* core: Fixed a bug where ACLToken and ACLPolicy changes were ignored by the event stream [[GH-9595](https://github.com/hashicorp/nomad/issues/9595)]
## 1.0.0 (December 8, 2020)
FEATURES:

View File

@@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
@@ -3276,3 +3277,148 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) {
t.Fatalf("bad: \n%#v\n%#v", out2, ns2)
}
}
func TestFSM_ACLEvents(t *testing.T) {
t.Parallel()
cases := []struct {
desc string
setupfn func(t *testing.T, fsm *nomadFSM)
raftReq func(t *testing.T) []byte
reqTopic structs.Topic
eventfn func(t *testing.T, e []structs.Event)
}{
{
desc: "ACLToken upserted",
raftReq: func(t *testing.T) []byte {
req := structs.ACLTokenUpsertRequest{
Tokens: []*structs.ACLToken{mock.ACLToken()},
}
buf, err := structs.Encode(structs.ACLTokenUpsertRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLToken,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLToken)
require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID)
require.Equal(t, e[0].Type, structs.TypeACLTokenUpserted)
},
},
{
desc: "ACLToken deleted",
setupfn: func(t *testing.T, fsm *nomadFSM) {
token := mock.ACLToken()
token.SecretID = "26be01d3-df3a-45e9-9f49-4487a3dc3496"
token.AccessorID = "b971acba-bbe5-4274-bdfa-8bb1f542a8c1"
require.NoError(t,
fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{token}))
},
raftReq: func(t *testing.T) []byte {
req := structs.ACLTokenDeleteRequest{
AccessorIDs: []string{"b971acba-bbe5-4274-bdfa-8bb1f542a8c1"},
}
buf, err := structs.Encode(structs.ACLTokenDeleteRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLToken,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLToken)
require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID)
require.Equal(t, e[0].Type, structs.TypeACLTokenDeleted)
},
},
{
desc: "ACLPolicy upserted",
raftReq: func(t *testing.T) []byte {
req := structs.ACLPolicyUpsertRequest{
Policies: []*structs.ACLPolicy{mock.ACLPolicy()},
}
buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLPolicy,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLPolicy)
require.Equal(t, e[0].Type, structs.TypeACLPolicyUpserted)
},
},
{
desc: "ACLPolicy deleted",
setupfn: func(t *testing.T, fsm *nomadFSM) {
policy := mock.ACLPolicy()
policy.Name = "some-policy"
require.NoError(t,
fsm.State().UpsertACLPolicies(
structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy}))
},
raftReq: func(t *testing.T) []byte {
req := structs.ACLPolicyDeleteRequest{
Names: []string{"some-policy"},
}
buf, err := structs.Encode(structs.ACLPolicyDeleteRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLPolicy,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLPolicy)
require.Equal(t, e[0].Type, structs.TypeACLPolicyDeleted)
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
fsm := testFSM(t)
// Setup any state necessary
if tc.setupfn != nil {
tc.setupfn(t, fsm)
}
// Apply the log
resp := fsm.Apply(makeLog(tc.raftReq(t)))
require.Nil(t, resp)
broker, err := fsm.State().EventBroker()
require.NoError(t, err)
subReq := &stream.SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.reqTopic: {"*"},
},
}
sub, err := broker.Subscribe(subReq)
require.NoError(t, err)
var events []structs.Event
testutil.WaitForResult(func() (bool, error) {
out, err := sub.NextNoBlock()
require.NoError(t, err)
if out == nil {
return false, fmt.Errorf("expected events got nil")
}
events = out
return true, nil
}, func(err error) {
require.Fail(t, err.Error())
})
tc.eventfn(t, events)
})
}
}

View File

@@ -56,12 +56,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLToken,
Key: before.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: before,
},
Topic: structs.TopicACLToken,
Key: before.AccessorID,
Payload: structs.NewACLTokenEvent(before),
}, true
case "acl_policy":
before, ok := change.Before.(*structs.ACLPolicy)
@@ -71,7 +70,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: before.Name,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: before,
},
}, true
@@ -102,12 +101,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLToken,
Key: after.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: after,
},
Topic: structs.TopicACLToken,
Key: after.AccessorID,
Payload: structs.NewACLTokenEvent(after),
}, true
case "acl_policy":
after, ok := change.After.(*structs.ACLPolicy)
@@ -117,7 +115,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: after.Name,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: after,
},
}, true

View File

@@ -41,6 +41,59 @@ func TestEventFromChange_SingleEventPerTable(t *testing.T) {
require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered)
}
func TestEventFromChange_ACLTokenSecretID(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
token := mock.ACLToken()
require.NotEmpty(t, token.SecretID)
// Create
changes := Changes{
Index: 100,
MsgType: structs.NodeRegisterRequestType,
Changes: memdb.Changes{
{
Table: "acl_token",
Before: nil,
After: token,
},
},
}
out := eventsFromChanges(s.db.ReadTxn(), changes)
require.Len(t, out.Events, 1)
// Ensure original value not altered
require.NotEmpty(t, token.SecretID)
aclTokenEvent, ok := out.Events[0].Payload.(*structs.ACLTokenEvent)
require.True(t, ok)
require.Empty(t, aclTokenEvent.ACLToken.SecretID)
require.Equal(t, token.SecretID, aclTokenEvent.SecretID())
// Delete
changes = Changes{
Index: 100,
MsgType: structs.NodeDeregisterRequestType,
Changes: memdb.Changes{
{
Table: "acl_token",
Before: token,
After: nil,
},
},
}
out2 := eventsFromChanges(s.db.ReadTxn(), changes)
require.Len(t, out2.Events, 1)
tokenEvent2, ok := out2.Events[0].Payload.(*structs.ACLTokenEvent)
require.True(t, ok)
require.Empty(t, tokenEvent2.ACLToken.SecretID)
}
// TestEventFromChange_NodeSecretID ensures that a node's secret ID is not
// included in a node event
func TestEventFromChange_NodeSecretID(t *testing.T) {

View File

@@ -5028,7 +5028,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J
// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
for _, policy := range policies {
@@ -5128,7 +5128,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error
// UpsertACLTokens is used to create or update a set of ACL tokens
func (s *StateStore) UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
for _, token := range tokens {

View File

@@ -29,8 +29,7 @@ type EventBrokerCfg struct {
type EventBroker struct {
// mu protects subscriptions
mu sync.Mutex
mu sync.Mutex
subscriptions *subscriptions
// eventBuf stores a configurable amount of events in memory
@@ -189,8 +188,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return
case update := <-e.aclCh:
switch payload := update.Payload.(type) {
case structs.ACLTokenEvent:
tokenSecretID := payload.ACLToken.SecretID
case *structs.ACLTokenEvent:
tokenSecretID := payload.SecretID()
// Token was deleted
if update.Type == structs.TypeACLTokenDeleted {
@@ -214,7 +213,7 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return !aclAllowsSubscription(aclObj, sub.req)
})
case structs.ACLPolicyEvent:
case *structs.ACLPolicyEvent:
// Re-evaluate each subscriptions permissions since a policy
// change may or may not affect the subscription
e.checkSubscriptionsAgainstPolicyChange()
@@ -240,9 +239,14 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {
aclSnapshot := e.aclDelegate.TokenProvider()
for tokenSecretID := range e.subscriptions.byToken {
// if tokenSecretID is empty ACLs were disabled at time of subscribing
if tokenSecretID == "" {
continue
}
aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
if err != nil || aclObj == nil {
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}

View File

@@ -116,7 +116,7 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state))
}
func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) {
func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
@@ -133,13 +133,9 @@ func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) {
defer sub1.Unsubscribe()
aclEvent := structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenDeleted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: "foo",
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenDeleted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}),
}
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}})
@@ -209,13 +205,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -233,13 +225,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -257,13 +245,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -281,13 +265,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -305,13 +285,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -329,13 +305,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -353,13 +325,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -377,13 +345,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -400,13 +364,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.ACLTokenEvent{
ACLToken: &structs.ACLToken{
SecretID: secretID,
},
},
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
@@ -426,7 +386,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
@@ -450,7 +410,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
@@ -474,7 +434,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyDeleted,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},

View File

@@ -120,6 +120,23 @@ type NodeStreamEvent struct {
type ACLTokenEvent struct {
ACLToken *ACLToken
secretID string
}
// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates
// a copy of the passed in ACLToken and empties out the copied tokens SecretID
func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent {
c := token.Copy()
c.SecretID = ""
return &ACLTokenEvent{
ACLToken: c,
secretID: token.SecretID,
}
}
func (a *ACLTokenEvent) SecretID() string {
return a.secretID
}
type ACLPolicyEvent struct {

View File

@@ -10813,6 +10813,18 @@ type ACLToken struct {
ModifyIndex uint64
}
func (a *ACLToken) Copy() *ACLToken {
c := new(ACLToken)
*c = *a
c.Policies = make([]string, len(a.Policies))
copy(c.Policies, a.Policies)
c.Hash = make([]byte, len(a.Hash))
copy(c.Hash, a.Hash)
return c
}
var (
// AnonymousACLToken is used no SecretID is provided, and the
// request is made anonymously.