mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
event stream: add events for dynamic host volumes (#24721)
Add a new topic to the event stream for host volumes. We'll emit events when a dynamic host volume is registered or deregistered, and whenever a node fingerprints with a changed volume. Ref: https://hashicorp.atlassian.net/browse/NET-11549
This commit is contained in:
@@ -83,7 +83,7 @@ func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, tim
|
||||
}
|
||||
maxAttempts--
|
||||
if maxAttempts == 0 {
|
||||
require.Failf(t, "reached max attempts waiting for desired event count", "count %d", len(got))
|
||||
require.Failf(t, "reached max attempts waiting for desired event count", "count %d got: %+v", len(got), got)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
@@ -41,6 +41,8 @@ var MsgTypeEvents = map[structs.MessageType]string{
|
||||
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
|
||||
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
|
||||
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
|
||||
structs.HostVolumeRegisterRequestType: structs.TypeHostVolumeRegistered,
|
||||
structs.HostVolumeDeleteRequestType: structs.TypeHostVolumeDeleted,
|
||||
}
|
||||
|
||||
func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
|
||||
@@ -181,6 +183,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
Service: before,
|
||||
},
|
||||
}, true
|
||||
case TableHostVolumes:
|
||||
before, ok := change.Before.(*structs.HostVolume)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicHostVolume,
|
||||
Key: before.ID,
|
||||
FilterKeys: []string{
|
||||
before.ID,
|
||||
before.Name,
|
||||
before.PluginID,
|
||||
},
|
||||
Namespace: before.Namespace,
|
||||
Payload: &structs.HostVolumeEvent{
|
||||
Volume: before,
|
||||
},
|
||||
}, true
|
||||
}
|
||||
return structs.Event{}, false
|
||||
}
|
||||
@@ -358,6 +378,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
Service: after,
|
||||
},
|
||||
}, true
|
||||
case TableHostVolumes:
|
||||
after, ok := change.After.(*structs.HostVolume)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicHostVolume,
|
||||
Key: after.ID,
|
||||
FilterKeys: []string{
|
||||
after.ID,
|
||||
after.Name,
|
||||
after.PluginID,
|
||||
},
|
||||
Namespace: after.Namespace,
|
||||
Payload: &structs.HostVolumeEvent{
|
||||
Volume: after,
|
||||
},
|
||||
}, true
|
||||
}
|
||||
|
||||
return structs.Event{}, false
|
||||
|
||||
@@ -1215,6 +1215,49 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
|
||||
must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
|
||||
}
|
||||
|
||||
func TestEvents_HostVolumes(t *testing.T) {
|
||||
|
||||
ci.Parallel(t)
|
||||
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
||||
defer store.StopEventBroker()
|
||||
|
||||
index, err := store.LatestIndex()
|
||||
must.NoError(t, err)
|
||||
|
||||
node := mock.Node()
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node, NodeUpsertWithNodePool))
|
||||
|
||||
vol := mock.HostVolume()
|
||||
vol.NodeID = node.ID
|
||||
index++
|
||||
must.NoError(t, store.UpsertHostVolume(index, vol))
|
||||
|
||||
node = node.Copy()
|
||||
node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{vol.Name: {
|
||||
Name: vol.Name,
|
||||
Path: "/var/nomad/alloc_mounts" + uuid.Generate(),
|
||||
}}
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node, NodeUpsertWithNodePool))
|
||||
|
||||
index++
|
||||
must.NoError(t, store.DeleteHostVolume(index, vol.Namespace, vol.ID))
|
||||
|
||||
events := WaitForEvents(t, store, 0, 5, 1*time.Second)
|
||||
must.Len(t, 5, events)
|
||||
must.Eq(t, "Node", events[0].Topic)
|
||||
must.Eq(t, "NodeRegistration", events[0].Type)
|
||||
must.Eq(t, "HostVolume", events[1].Topic)
|
||||
must.Eq(t, "HostVolumeRegistered", events[1].Type)
|
||||
must.Eq(t, "Node", events[2].Topic)
|
||||
must.Eq(t, "NodeRegistration", events[2].Type)
|
||||
must.Eq(t, "HostVolume", events[3].Topic)
|
||||
must.Eq(t, "NodeRegistration", events[3].Type)
|
||||
must.Eq(t, "HostVolume", events[4].Topic)
|
||||
must.Eq(t, "HostVolumeDeleted", events[4].Type)
|
||||
}
|
||||
|
||||
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
|
||||
t.Helper()
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs
|
||||
|
||||
// UpsertHostVolume upserts a host volume
|
||||
func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) error {
|
||||
txn := s.db.WriteTxn(index)
|
||||
txn := s.db.WriteTxnMsgT(structs.HostVolumeRegisterRequestType, index)
|
||||
defer txn.Abort()
|
||||
|
||||
if exists, err := s.namespaceExists(txn, vol.Namespace); err != nil {
|
||||
@@ -117,7 +117,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err
|
||||
|
||||
// DeleteHostVolume deletes a host volume
|
||||
func (s *StateStore) DeleteHostVolume(index uint64, ns string, id string) error {
|
||||
txn := s.db.WriteTxn(index)
|
||||
txn := s.db.WriteTxnMsgT(structs.HostVolumeDeleteRequestType, index)
|
||||
defer txn.Abort()
|
||||
|
||||
obj, err := txn.First(TableHostVolumes, indexID, ns, id)
|
||||
|
||||
@@ -363,6 +363,10 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
|
||||
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
|
||||
return false
|
||||
}
|
||||
case structs.TopicHostVolume:
|
||||
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityHostVolumeRead); !ok {
|
||||
return false
|
||||
}
|
||||
case structs.TopicNode:
|
||||
if ok := aclObj.AllowNodeRead(); !ok {
|
||||
return false
|
||||
|
||||
@@ -31,6 +31,7 @@ const (
|
||||
TopicACLAuthMethod Topic = "ACLAuthMethod"
|
||||
TopicACLBindingRule Topic = "ACLBindingRule"
|
||||
TopicService Topic = "Service"
|
||||
TopicHostVolume Topic = "HostVolume"
|
||||
TopicAll Topic = "*"
|
||||
|
||||
TypeNodeRegistration = "NodeRegistration"
|
||||
@@ -63,6 +64,8 @@ const (
|
||||
TypeACLBindingRuleDeleted = "ACLBindingRuleDeleted"
|
||||
TypeServiceRegistration = "ServiceRegistration"
|
||||
TypeServiceDeregistration = "ServiceDeregistration"
|
||||
TypeHostVolumeRegistered = "HostVolumeRegistered"
|
||||
TypeHostVolumeDeleted = "HostVolumeDeleted"
|
||||
)
|
||||
|
||||
// Event represents a change in Nomads state.
|
||||
@@ -188,3 +191,9 @@ type ACLAuthMethodEvent struct {
|
||||
type ACLBindingRuleEvent struct {
|
||||
ACLBindingRule *ACLBindingRule
|
||||
}
|
||||
|
||||
// HostVolumeEvent holds a newly updated or deleted dynamic host volume to be
|
||||
// used as an event in the event stream
|
||||
type HostVolumeEvent struct {
|
||||
Volume *HostVolume
|
||||
}
|
||||
|
||||
@@ -28,19 +28,20 @@ the nature of this endpoint individual topics require specific policies.
|
||||
Note that if you do not include a `topic` parameter all topics will be included
|
||||
by default, requiring a management token.
|
||||
|
||||
| Topic | ACL Required |
|
||||
| ------------ | -------------------- |
|
||||
| `*` | `management` |
|
||||
| `ACLToken` | `management` |
|
||||
| `ACLPolicy` | `management` |
|
||||
| `ACLRole` | `management` |
|
||||
| `Job` | `namespace:read-job` |
|
||||
| `Allocation` | `namespace:read-job` |
|
||||
| `Deployment` | `namespace:read-job` |
|
||||
| `Evaluation` | `namespace:read-job` |
|
||||
| `Node` | `node:read` |
|
||||
| `NodePool` | `management` |
|
||||
| `Service` | `namespace:read-job` |
|
||||
| Topic | ACL Required |
|
||||
|--------------|------------------------------|
|
||||
| `*` | `management` |
|
||||
| `ACLPolicy` | `management` |
|
||||
| `ACLRole` | `management` |
|
||||
| `ACLToken` | `management` |
|
||||
| `Allocation` | `namespace:read-job` |
|
||||
| `Deployment` | `namespace:read-job` |
|
||||
| `Evaluation` | `namespace:read-job` |
|
||||
| `HostVolume` | `namespace:host-volume-read` |
|
||||
| `Job` | `namespace:read-job` |
|
||||
| `NodePool` | `management` |
|
||||
| `Node` | `node:read` |
|
||||
| `Service` | `namespace:read-job` |
|
||||
|
||||
### Parameters
|
||||
|
||||
@@ -65,50 +66,54 @@ by default, requiring a management token.
|
||||
|
||||
### Event Topics
|
||||
|
||||
| Topic | Output |
|
||||
| ---------- | ------------------------------- |
|
||||
| ACLToken | ACLToken |
|
||||
| ACLPolicy | ACLPolicy |
|
||||
| ACLRoles | ACLRole |
|
||||
| Allocation | Allocation (no job information) |
|
||||
| Job | Job |
|
||||
| Evaluation | Evaluation |
|
||||
| Deployment | Deployment |
|
||||
| Node | Node |
|
||||
| NodeDrain | Node |
|
||||
| NodePool | NodePool |
|
||||
| Service | Service Registrations |
|
||||
| Topic | Output |
|
||||
|------------|----------------------------------------|
|
||||
| ACLPolicy | ACLPolicy |
|
||||
| ACLRoles | ACLRole |
|
||||
| ACLToken | ACLToken |
|
||||
| Allocation | Allocation (no job information) |
|
||||
| Deployment | Deployment |
|
||||
| Evaluation | Evaluation |
|
||||
| HostVolume | HostVolume (dynamic host volumes only) |
|
||||
| Job | Job |
|
||||
| Node | Node |
|
||||
| NodeDrain | Node |
|
||||
| NodePool | NodePool |
|
||||
| Service | Service Registrations |
|
||||
|
||||
### Event Types
|
||||
|
||||
| Type |
|
||||
| ----------------------------- |
|
||||
| ACLTokenUpserted |
|
||||
| ACLTokenDeleted |
|
||||
| ACLPolicyUpserted |
|
||||
|-------------------------------|
|
||||
| ACLPolicyDeleted |
|
||||
| ACLRoleUpserted |
|
||||
| ACLPolicyUpserted |
|
||||
| ACLRoleDeleted |
|
||||
| ACLRoleUpserted |
|
||||
| ACLTokenDeleted |
|
||||
| ACLTokenUpserted |
|
||||
| AllocationCreated |
|
||||
| AllocationUpdated |
|
||||
| AllocationUpdateDesiredStatus |
|
||||
| DeploymentStatusUpdate |
|
||||
| DeploymentPromotion |
|
||||
| AllocationUpdated |
|
||||
| DeploymentAllocHealth |
|
||||
| DeploymentPromotion |
|
||||
| DeploymentStatusUpdate |
|
||||
| EvaluationUpdated |
|
||||
| JobRegistered |
|
||||
| JobDeregistered |
|
||||
| HostVolumeDeleted |
|
||||
| HostVolumeRegistered |
|
||||
| JobBatchDeregistered |
|
||||
| NodeRegistration |
|
||||
| JobDeregistered |
|
||||
| JobRegistered |
|
||||
| NodeDeregistration |
|
||||
| NodeEligibility |
|
||||
| NodeDrain |
|
||||
| NodeEligibility |
|
||||
| NodeEvent |
|
||||
| NodePoolUpserted |
|
||||
| NodePoolDeleted |
|
||||
| NodePoolUpserted |
|
||||
| NodeRegistration |
|
||||
| PlanResult |
|
||||
| ServiceRegistration |
|
||||
| ServiceDeregistration |
|
||||
| ServiceRegistration |
|
||||
|
||||
|
||||
### Sample Request
|
||||
|
||||
|
||||
Reference in New Issue
Block a user