From ad99a4501266cd6d3291d0d1798f0d20d3c66da9 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 3 Mar 2022 17:27:04 -0500 Subject: [PATCH] Fix CSI volume list with prefix and `*` namespace (#12184) When using a prefix value and the * wildcard for namespace, the endpoint would not take the prefix value into consideration due to the order in which the checks were executed but also the logic for retrieving volumes from the state store. This commit changes the order to check for a prefix first and wraps the result iterator of the state store query in a filter to apply the prefix. --- .changelog/12184.txt | 3 +++ nomad/csi_endpoint.go | 8 +++++--- nomad/csi_endpoint_test.go | 16 ++++++++++++++++ nomad/state/state_store.go | 31 ++++++++++++++++++++++++++++++- 4 files changed, 54 insertions(+), 4 deletions(-) create mode 100644 .changelog/12184.txt diff --git a/.changelog/12184.txt b/.changelog/12184.txt new file mode 100644 index 000000000..e885a9ad5 --- /dev/null +++ b/.changelog/12184.txt @@ -0,0 +1,3 @@ +```release-note:bug +api: Apply prefix filter when querying CSI volumes in all namespaces +``` diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 092fc16ff..66083452f 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -129,10 +129,12 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID) } else if args.PluginID != "" { iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID) - } else if ns == structs.AllNamespacesSentinel { - iter, err = snap.CSIVolumes(ws) - } else { + } else if prefix != "" { + iter, err = snap.CSIVolumesByIDPrefix(ws, ns, prefix) + } else if ns != structs.AllNamespacesSentinel { iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix) + } else { + iter, err = snap.CSIVolumes(ws) } if err != nil { diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index e4235ee21..4b8275064 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -736,6 +736,22 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(1001), resp.Index) require.Len(t, resp.Volumes, len(vols)) + + // Lookup volumes in all namespaces with prefix + get = &structs.CSIVolumeListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Prefix: id0[:4], + Namespace: "*", + }, + } + var resp2 structs.CSIVolumeListResponse + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.List", get, &resp2) + require.NoError(t, err) + require.Equal(t, uint64(1001), resp.Index) + require.Len(t, resp2.Volumes, 1) + require.Equal(t, vols[0].ID, resp2.Volumes[0].ID) + require.Equal(t, structs.DefaultNamespace, resp2.Volumes[0].Namespace) } func TestCSIVolumeEndpoint_Create(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b8215b4ba..4cc1902aa 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2265,8 +2265,13 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, } // CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to -// also denormalize the plugins. +// also denormalize the plugins. If using a prefix with the wildcard namespace, +// the results will not use the index prefix. func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) { + if namespace == structs.AllNamespacesSentinel { + return s.csiVolumeByIDPrefixAllNamespaces(ws, volumeID) + } + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID) @@ -2279,6 +2284,30 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID return iter, nil } +func (s *StateStore) csiVolumeByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire csi_volumes table + iter, err := txn.Get("csi_volumes", "id") + + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + // Filter the iterator by ID prefix + f := func(raw interface{}) bool { + v, ok := raw.(*structs.CSIVolume) + if !ok { + return false + } + return !strings.HasPrefix(v.ID, prefix) + } + wrap := memdb.NewFilterIterator(iter, f) + return wrap, nil +} + // CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should // snapshot if it wants to also denormalize the plugins. func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {