mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
event stream: add events for CSI volumes and plugins (#24724)
Adds new topics to the event stream for CSI volumes and CSI plugins. We'll emit event when either is created or deleted, and when CSI volumes are claimed.
This commit is contained in:
3
.changelog/24724.txt
Normal file
3
.changelog/24724.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:improvement
|
||||
csi: Added CSI volume and plugin events to the event stream
|
||||
```
|
||||
@@ -43,6 +43,9 @@ var MsgTypeEvents = map[structs.MessageType]string{
|
||||
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
|
||||
structs.HostVolumeRegisterRequestType: structs.TypeHostVolumeRegistered,
|
||||
structs.HostVolumeDeleteRequestType: structs.TypeHostVolumeDeleted,
|
||||
structs.CSIVolumeRegisterRequestType: structs.TypeCSIVolumeRegistered,
|
||||
structs.CSIVolumeDeregisterRequestType: structs.TypeCSIVolumeDeregistered,
|
||||
structs.CSIVolumeClaimRequestType: structs.TypeCSIVolumeClaim,
|
||||
}
|
||||
|
||||
func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
|
||||
@@ -190,7 +193,6 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicHostVolume,
|
||||
Key: before.ID,
|
||||
FilterKeys: []string{
|
||||
before.ID,
|
||||
before.Name,
|
||||
@@ -201,6 +203,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
Volume: before,
|
||||
},
|
||||
}, true
|
||||
case TableCSIVolumes:
|
||||
before, ok := change.Before.(*structs.CSIVolume)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicCSIVolume,
|
||||
Key: before.ID,
|
||||
FilterKeys: []string{
|
||||
before.ID,
|
||||
before.Name,
|
||||
before.PluginID,
|
||||
},
|
||||
Namespace: before.Namespace,
|
||||
Payload: &structs.CSIVolumeEvent{
|
||||
Volume: before,
|
||||
},
|
||||
}, true
|
||||
case TableCSIPlugins:
|
||||
// note: there is no CSIPlugin event type, because CSI plugins don't
|
||||
// have their own write RPCs; they are always created/removed via
|
||||
// node updates
|
||||
before, ok := change.Before.(*structs.CSIPlugin)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicCSIPlugin,
|
||||
Key: before.ID,
|
||||
FilterKeys: []string{before.ID},
|
||||
Payload: &structs.CSIPluginEvent{
|
||||
Plugin: before,
|
||||
},
|
||||
}, true
|
||||
}
|
||||
return structs.Event{}, false
|
||||
}
|
||||
@@ -396,6 +432,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
|
||||
Volume: after,
|
||||
},
|
||||
}, true
|
||||
case TableCSIVolumes:
|
||||
after, ok := change.After.(*structs.CSIVolume)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicCSIVolume,
|
||||
Key: after.ID,
|
||||
FilterKeys: []string{
|
||||
after.ID,
|
||||
after.Name,
|
||||
after.PluginID,
|
||||
},
|
||||
Namespace: after.Namespace,
|
||||
Payload: &structs.CSIVolumeEvent{
|
||||
Volume: after,
|
||||
},
|
||||
}, true
|
||||
case TableCSIPlugins:
|
||||
// note: there is no CSIPlugin event type, because CSI plugins don't
|
||||
// have their own write RPCs; they are always created/removed via
|
||||
// node updates
|
||||
after, ok := change.After.(*structs.CSIPlugin)
|
||||
if !ok {
|
||||
return structs.Event{}, false
|
||||
}
|
||||
return structs.Event{
|
||||
Topic: structs.TopicCSIPlugin,
|
||||
Key: after.ID,
|
||||
FilterKeys: []string{after.ID},
|
||||
Payload: &structs.CSIPluginEvent{
|
||||
Plugin: after,
|
||||
},
|
||||
}, true
|
||||
}
|
||||
|
||||
return structs.Event{}, false
|
||||
|
||||
@@ -1216,7 +1216,6 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEvents_HostVolumes(t *testing.T) {
|
||||
|
||||
ci.Parallel(t)
|
||||
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
||||
defer store.StopEventBroker()
|
||||
@@ -1258,6 +1257,84 @@ func TestEvents_HostVolumes(t *testing.T) {
|
||||
must.Eq(t, "HostVolumeDeleted", events[4].Type)
|
||||
}
|
||||
|
||||
func TestEvents_CSIVolumes(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
||||
defer store.StopEventBroker()
|
||||
|
||||
index, err := store.LatestIndex()
|
||||
must.NoError(t, err)
|
||||
|
||||
plugin := mock.CSIPlugin()
|
||||
vol := mock.CSIVolume(plugin)
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertCSIVolume(index, []*structs.CSIVolume{vol}))
|
||||
|
||||
alloc := mock.Alloc()
|
||||
index++
|
||||
store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc})
|
||||
|
||||
claim := &structs.CSIVolumeClaim{
|
||||
AllocationID: alloc.ID,
|
||||
NodeID: uuid.Generate(),
|
||||
Mode: structs.CSIVolumeClaimGC,
|
||||
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
|
||||
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
|
||||
State: structs.CSIVolumeClaimStateReadyToFree,
|
||||
}
|
||||
index++
|
||||
must.NoError(t, store.CSIVolumeClaim(index, time.Now().UnixNano(), vol.Namespace, vol.ID, claim))
|
||||
|
||||
index++
|
||||
must.NoError(t, store.CSIVolumeDeregister(index, vol.Namespace, []string{vol.ID}, false))
|
||||
|
||||
events := WaitForEvents(t, store, 0, 3, 1*time.Second)
|
||||
must.Len(t, 3, events)
|
||||
must.Eq(t, "CSIVolume", events[0].Topic)
|
||||
must.Eq(t, "CSIVolumeRegistered", events[0].Type)
|
||||
must.Eq(t, "CSIVolume", events[1].Topic)
|
||||
must.Eq(t, "CSIVolumeClaim", events[1].Type)
|
||||
must.Eq(t, "CSIVolume", events[2].Topic)
|
||||
must.Eq(t, "CSIVolumeDeregistered", events[2].Type)
|
||||
|
||||
}
|
||||
|
||||
func TestEvents_CSIPlugins(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
||||
defer store.StopEventBroker()
|
||||
|
||||
index, err := store.LatestIndex()
|
||||
must.NoError(t, err)
|
||||
|
||||
node := mock.Node()
|
||||
plugin := mock.CSIPlugin()
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node))
|
||||
|
||||
node = node.Copy()
|
||||
node.CSINodePlugins = map[string]*structs.CSIInfo{
|
||||
plugin.ID: {
|
||||
PluginID: plugin.ID,
|
||||
Healthy: true,
|
||||
UpdateTime: time.Now(),
|
||||
},
|
||||
}
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node))
|
||||
|
||||
events := WaitForEvents(t, store, 0, 3, 1*time.Second)
|
||||
must.Len(t, 3, events)
|
||||
must.Eq(t, "Node", events[0].Topic)
|
||||
must.Eq(t, "NodeRegistration", events[0].Type)
|
||||
must.Eq(t, "Node", events[1].Topic)
|
||||
must.Eq(t, "NodeRegistration", events[1].Type)
|
||||
must.Eq(t, "CSIPlugin", events[2].Topic)
|
||||
must.Eq(t, "NodeRegistration", events[2].Type)
|
||||
}
|
||||
|
||||
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
|
||||
t.Helper()
|
||||
|
||||
|
||||
@@ -27,6 +27,8 @@ const (
|
||||
TableAllocs = "allocs"
|
||||
TableJobSubmission = "job_submission"
|
||||
TableHostVolumes = "host_volumes"
|
||||
TableCSIVolumes = "csi_volumes"
|
||||
TableCSIPlugins = "csi_plugins"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -1150,7 +1152,7 @@ func clusterMetaTableSchema() *memdb.TableSchema {
|
||||
// CSIVolumes are identified by id globally, and searchable by driver
|
||||
func csiVolumeTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: "csi_volumes",
|
||||
Name: TableCSIVolumes,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": {
|
||||
Name: "id",
|
||||
@@ -1182,7 +1184,7 @@ func csiVolumeTableSchema() *memdb.TableSchema {
|
||||
// CSIPlugins are identified by id globally, and searchable by driver
|
||||
func csiPluginTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: "csi_plugins",
|
||||
Name: TableCSIPlugins,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": {
|
||||
Name: "id",
|
||||
|
||||
@@ -1398,7 +1398,7 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv
|
||||
func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error {
|
||||
|
||||
upsertFn := func(info *structs.CSIInfo) error {
|
||||
raw, err := txn.First("csi_plugins", "id", info.PluginID)
|
||||
raw, err := txn.First(TableCSIPlugins, "id", info.PluginID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err)
|
||||
}
|
||||
@@ -1429,7 +1429,7 @@ func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error {
|
||||
|
||||
plug.ModifyIndex = index
|
||||
|
||||
err = txn.Insert("csi_plugins", plug)
|
||||
err = txn.Insert(TableCSIPlugins, plug)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins insert error: %v", err)
|
||||
}
|
||||
@@ -1458,7 +1458,7 @@ func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error {
|
||||
|
||||
// remove the client node from any plugin that's not
|
||||
// running on it.
|
||||
iter, err := txn.Get("csi_plugins", "id")
|
||||
iter, err := txn.Get(TableCSIPlugins, "id")
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins lookup failed: %v", err)
|
||||
}
|
||||
@@ -1503,7 +1503,7 @@ func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
|
||||
if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -1525,7 +1525,7 @@ func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
|
||||
}
|
||||
|
||||
for id := range names {
|
||||
raw, err := txn.First("csi_plugins", "id", id)
|
||||
raw, err := txn.First(TableCSIPlugins, "id", id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins lookup error %s: %v", id, err)
|
||||
}
|
||||
@@ -1546,7 +1546,7 @@ func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
|
||||
if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -1556,13 +1556,13 @@ func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
|
||||
// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
|
||||
func updateOrGCPlugin(index uint64, txn Txn, plug *structs.CSIPlugin) error {
|
||||
if plug.IsEmpty() {
|
||||
err := txn.Delete("csi_plugins", plug)
|
||||
err := txn.Delete(TableCSIPlugins, plug)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins delete error: %v", err)
|
||||
}
|
||||
} else {
|
||||
plug.ModifyIndex = index
|
||||
err := txn.Insert("csi_plugins", plug)
|
||||
err := txn.Insert(TableCSIPlugins, plug)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins update error %s: %v", plug.ID, err)
|
||||
}
|
||||
@@ -1661,7 +1661,7 @@ func (s *StateStore) deleteJobFromPlugins(index uint64, txn Txn, job *structs.Jo
|
||||
}
|
||||
|
||||
if len(plugins) > 0 {
|
||||
if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
|
||||
if err = txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -2561,7 +2561,7 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string)
|
||||
|
||||
// UpsertCSIVolume inserts a volume in the state store.
|
||||
func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) error {
|
||||
txn := s.db.WriteTxn(index)
|
||||
txn := s.db.WriteTxnMsgT(structs.CSIVolumeRegisterRequestType, index)
|
||||
defer txn.Abort()
|
||||
|
||||
for _, v := range volumes {
|
||||
@@ -2571,7 +2571,7 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume)
|
||||
return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
|
||||
}
|
||||
|
||||
obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
|
||||
obj, err := txn.First(TableCSIVolumes, "id", v.Namespace, v.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("volume existence check error: %v", err)
|
||||
}
|
||||
@@ -2600,13 +2600,13 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume)
|
||||
v.WriteAllocs[allocID] = nil
|
||||
}
|
||||
|
||||
err = txn.Insert("csi_volumes", v)
|
||||
err = txn.Insert(TableCSIVolumes, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("volume insert: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
|
||||
if err := txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -2619,7 +2619,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error)
|
||||
txn := s.db.ReadTxn()
|
||||
defer txn.Abort()
|
||||
|
||||
iter, err := txn.Get("csi_volumes", "id")
|
||||
iter, err := txn.Get(TableCSIVolumes, "id")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("csi_volumes lookup failed: %v", err)
|
||||
}
|
||||
@@ -2635,7 +2635,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error)
|
||||
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
|
||||
watchCh, obj, err := txn.FirstWatch(TableCSIVolumes, "id", namespace, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err)
|
||||
}
|
||||
@@ -2656,7 +2656,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
|
||||
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
|
||||
iter, err := txn.Get(TableCSIVolumes, "plugin_id", pluginID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("volume lookup failed: %v", err)
|
||||
}
|
||||
@@ -2684,7 +2684,7 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID
|
||||
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID)
|
||||
iter, err := txn.Get(TableCSIVolumes, "id_prefix", namespace, volumeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2698,7 +2698,7 @@ func (s *StateStore) csiVolumeByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
// Walk the entire csi_volumes table
|
||||
iter, err := txn.Get("csi_volumes", "id")
|
||||
iter, err := txn.Get(TableCSIVolumes, "id")
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -2750,7 +2750,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string
|
||||
txn := s.db.ReadTxn()
|
||||
for id, namespace := range ids {
|
||||
if strings.HasPrefix(id, prefix) {
|
||||
watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
|
||||
watchCh, raw, err := txn.FirstWatch(TableCSIVolumes, "id", namespace, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
|
||||
}
|
||||
@@ -2771,7 +2771,7 @@ func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix
|
||||
|
||||
func (s *StateStore) csiVolumesByNamespaceImpl(txn *txn, ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
|
||||
|
||||
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix)
|
||||
iter, err := txn.Get(TableCSIVolumes, "id_prefix", namespace, prefix)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("volume lookup failed: %v", err)
|
||||
}
|
||||
@@ -2783,10 +2783,10 @@ func (s *StateStore) csiVolumesByNamespaceImpl(txn *txn, ws memdb.WatchSet, name
|
||||
|
||||
// CSIVolumeClaim updates the volume's claim count and allocation list
|
||||
func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id string, claim *structs.CSIVolumeClaim) error {
|
||||
txn := s.db.WriteTxn(index)
|
||||
txn := s.db.WriteTxnMsgT(structs.CSIVolumeClaimRequestType, index)
|
||||
defer txn.Abort()
|
||||
|
||||
row, err := txn.First("csi_volumes", "id", namespace, id)
|
||||
row, err := txn.First(TableCSIVolumes, "id", namespace, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("volume lookup failed: %s: %v", id, err)
|
||||
}
|
||||
@@ -2844,11 +2844,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id strin
|
||||
volume.WriteAllocs[allocID] = nil
|
||||
}
|
||||
|
||||
if err = txn.Insert("csi_volumes", volume); err != nil {
|
||||
if err = txn.Insert(TableCSIVolumes, volume); err != nil {
|
||||
return fmt.Errorf("volume update failed: %s: %v", id, err)
|
||||
}
|
||||
|
||||
if err = txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
|
||||
if err = txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -2857,11 +2857,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id strin
|
||||
|
||||
// CSIVolumeDeregister removes the volume from the server
|
||||
func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error {
|
||||
txn := s.db.WriteTxn(index)
|
||||
txn := s.db.WriteTxnMsgT(structs.CSIVolumeDeregisterRequestType, index)
|
||||
defer txn.Abort()
|
||||
|
||||
for _, id := range ids {
|
||||
existing, err := txn.First("csi_volumes", "id", namespace, id)
|
||||
existing, err := txn.First(TableCSIVolumes, "id", namespace, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("volume lookup failed: %s: %v", id, err)
|
||||
}
|
||||
@@ -2885,12 +2885,12 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s
|
||||
}
|
||||
}
|
||||
|
||||
if err = txn.Delete("csi_volumes", existing); err != nil {
|
||||
if err = txn.Delete(TableCSIVolumes, existing); err != nil {
|
||||
return fmt.Errorf("volume delete failed: %s: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
|
||||
if err := txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -3072,7 +3072,7 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error)
|
||||
txn := s.db.ReadTxn()
|
||||
defer txn.Abort()
|
||||
|
||||
iter, err := txn.Get("csi_plugins", "id")
|
||||
iter, err := txn.Get(TableCSIPlugins, "id")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("csi_plugins lookup failed: %v", err)
|
||||
}
|
||||
@@ -3086,7 +3086,7 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error)
|
||||
func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
iter, err := txn.Get("csi_plugins", "id_prefix", pluginID)
|
||||
iter, err := txn.Get(TableCSIPlugins, "id_prefix", pluginID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -3110,7 +3110,7 @@ func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPl
|
||||
// CSIPluginByIDTxn returns a named CSIPlugin
|
||||
func (s *StateStore) CSIPluginByIDTxn(txn Txn, ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
|
||||
|
||||
watchCh, obj, err := txn.FirstWatch("csi_plugins", "id", id)
|
||||
watchCh, obj, err := txn.FirstWatch(TableCSIPlugins, "id", id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err)
|
||||
}
|
||||
@@ -3167,7 +3167,7 @@ func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) erro
|
||||
txn := s.db.WriteTxn(index)
|
||||
defer txn.Abort()
|
||||
|
||||
existing, err := txn.First("csi_plugins", "id", plug.ID)
|
||||
existing, err := txn.First(TableCSIPlugins, "id", plug.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugin lookup error: %s %v", plug.ID, err)
|
||||
}
|
||||
@@ -3178,11 +3178,11 @@ func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) erro
|
||||
plug.CreateTime = existing.(*structs.CSIPlugin).CreateTime
|
||||
}
|
||||
|
||||
err = txn.Insert("csi_plugins", plug)
|
||||
err = txn.Insert(TableCSIPlugins, plug)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins insert error: %v", err)
|
||||
}
|
||||
if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
|
||||
if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
return txn.Commit()
|
||||
@@ -3242,7 +3242,7 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
|
||||
return structs.ErrCSIPluginInUse
|
||||
}
|
||||
|
||||
err = txn.Delete("csi_plugins", plug)
|
||||
err = txn.Delete(TableCSIPlugins, plug)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins delete error: %v", err)
|
||||
}
|
||||
@@ -5900,13 +5900,13 @@ func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, t
|
||||
}
|
||||
|
||||
for _, plugIn := range plugIns {
|
||||
err = txn.Insert("csi_plugins", plugIn)
|
||||
err = txn.Insert(TableCSIPlugins, plugIn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("csi_plugins insert error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
|
||||
if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -181,7 +181,7 @@ func (r *StateRestore) ScalingPolicyRestore(scalingPolicy *structs.ScalingPolicy
|
||||
|
||||
// CSIPluginRestore is used to restore a CSI plugin
|
||||
func (r *StateRestore) CSIPluginRestore(plugin *structs.CSIPlugin) error {
|
||||
if err := r.txn.Insert("csi_plugins", plugin); err != nil {
|
||||
if err := r.txn.Insert(TableCSIPlugins, plugin); err != nil {
|
||||
return fmt.Errorf("csi plugin insert failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
@@ -189,7 +189,7 @@ func (r *StateRestore) CSIPluginRestore(plugin *structs.CSIPlugin) error {
|
||||
|
||||
// CSIVolumeRestore is used to restore a CSI volume
|
||||
func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
|
||||
if err := r.txn.Insert("csi_volumes", volume); err != nil {
|
||||
if err := r.txn.Insert(TableCSIVolumes, volume); err != nil {
|
||||
return fmt.Errorf("csi volume insert failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -367,6 +367,14 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
|
||||
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityHostVolumeRead); !ok {
|
||||
return false
|
||||
}
|
||||
case structs.TopicCSIVolume:
|
||||
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityCSIReadVolume); !ok {
|
||||
return false
|
||||
}
|
||||
case structs.TopicCSIPlugin:
|
||||
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
|
||||
return false
|
||||
}
|
||||
case structs.TopicNode:
|
||||
if ok := aclObj.AllowNodeRead(); !ok {
|
||||
return false
|
||||
|
||||
@@ -32,6 +32,8 @@ const (
|
||||
TopicACLBindingRule Topic = "ACLBindingRule"
|
||||
TopicService Topic = "Service"
|
||||
TopicHostVolume Topic = "HostVolume"
|
||||
TopicCSIVolume Topic = "CSIVolume"
|
||||
TopicCSIPlugin Topic = "CSIPlugin"
|
||||
TopicAll Topic = "*"
|
||||
|
||||
TypeNodeRegistration = "NodeRegistration"
|
||||
@@ -66,6 +68,9 @@ const (
|
||||
TypeServiceDeregistration = "ServiceDeregistration"
|
||||
TypeHostVolumeRegistered = "HostVolumeRegistered"
|
||||
TypeHostVolumeDeleted = "HostVolumeDeleted"
|
||||
TypeCSIVolumeRegistered = "CSIVolumeRegistered"
|
||||
TypeCSIVolumeDeregistered = "CSIVolumeDeregistered"
|
||||
TypeCSIVolumeClaim = "CSIVolumeClaim"
|
||||
)
|
||||
|
||||
// Event represents a change in Nomads state.
|
||||
@@ -197,3 +202,15 @@ type ACLBindingRuleEvent struct {
|
||||
type HostVolumeEvent struct {
|
||||
Volume *HostVolume
|
||||
}
|
||||
|
||||
// CSIVolumeEvent holds a newly updated or deleted CSI volume to be
|
||||
// used as an event in the event stream
|
||||
type CSIVolumeEvent struct {
|
||||
Volume *CSIVolume
|
||||
}
|
||||
|
||||
// CSIPluginEvent holds a newly updated or deleted CSI plugin to be
|
||||
// used as an event in the event stream
|
||||
type CSIPluginEvent struct {
|
||||
Plugin *CSIPlugin
|
||||
}
|
||||
|
||||
@@ -141,9 +141,9 @@ func (w *Watcher) getVolumes(ctx context.Context, minIndex uint64) ([]*structs.C
|
||||
}
|
||||
|
||||
// getVolumesImpl retrieves all volumes from the passed state store.
|
||||
func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
|
||||
func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
|
||||
|
||||
iter, err := state.CSIVolumes(ws)
|
||||
iter, err := store.CSIVolumes(ws)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
@@ -159,7 +159,7 @@ func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (in
|
||||
}
|
||||
|
||||
// Use the last index that affected the volume table
|
||||
index, err := state.Index("csi_volumes")
|
||||
index, err := store.Index(state.TableCSIVolumes)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ the nature of this endpoint individual topics require specific policies.
|
||||
Note that if you do not include a `topic` parameter all topics will be included
|
||||
by default, requiring a management token.
|
||||
|
||||
|
||||
| Topic | ACL Required |
|
||||
|--------------|------------------------------|
|
||||
| `*` | `management` |
|
||||
@@ -35,6 +36,8 @@ by default, requiring a management token.
|
||||
| `ACLRole` | `management` |
|
||||
| `ACLToken` | `management` |
|
||||
| `Allocation` | `namespace:read-job` |
|
||||
| `CSIPlugin` | `namespace:read-job` |
|
||||
| `CSIVolume` | `namespace:csi-read-volume` |
|
||||
| `Deployment` | `namespace:read-job` |
|
||||
| `Evaluation` | `namespace:read-job` |
|
||||
| `HostVolume` | `namespace:host-volume-read` |
|
||||
@@ -72,6 +75,8 @@ by default, requiring a management token.
|
||||
| ACLRoles | ACLRole |
|
||||
| ACLToken | ACLToken |
|
||||
| Allocation | Allocation (no job information) |
|
||||
| CSIPlugin | CSIPlugin |
|
||||
| CSIVolume | CSIVolume |
|
||||
| Deployment | Deployment |
|
||||
| Evaluation | Evaluation |
|
||||
| HostVolume | HostVolume (dynamic host volumes only) |
|
||||
@@ -94,6 +99,8 @@ by default, requiring a management token.
|
||||
| AllocationCreated |
|
||||
| AllocationUpdateDesiredStatus |
|
||||
| AllocationUpdated |
|
||||
| CSIVolumeDeregistered |
|
||||
| CSIVolumeRegistered |
|
||||
| DeploymentAllocHealth |
|
||||
| DeploymentPromotion |
|
||||
| DeploymentStatusUpdate |
|
||||
|
||||
Reference in New Issue
Block a user