diff --git a/api/csi.go b/api/csi.go index 5168614ef..a3f4452fd 100644 --- a/api/csi.go +++ b/api/csi.go @@ -10,12 +10,12 @@ type CSIVolumes struct { client *Client } -// CSIVolumes returns a handle on the allocs endpoints. +// CSIVolumes returns a handle on the CSIVolumes endpoint func (c *Client) CSIVolumes() *CSIVolumes { return &CSIVolumes{client: c} } -// List returns all CSI volumes, ignoring driver +// List returns all CSI volumes func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, error) { var resp []*CSIVolumeListStub qm, err := v.client.query("/v1/csi/volumes", &resp, q) @@ -26,12 +26,12 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er return resp, qm, nil } -// DriverList returns all CSI volumes for the specified driver -func (v *CSIVolumes) DriverList(driver string) ([]*CSIVolumeListStub, *QueryMeta, error) { - return v.List(&QueryOptions{Prefix: driver}) +// PluginList returns all CSI volumes for the specified plugin id +func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) { + return v.List(&QueryOptions{Prefix: pluginID}) } -// Info is used to retrieve a single allocation. +// Info is used to retrieve a single CSIVolume func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) { var resp CSIVolume qm, err := v.client.query("/v1/csi/volume/"+id, &resp, q) @@ -41,13 +41,12 @@ func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, e return &resp, qm, nil } -func (v *CSIVolumes) Register(vol *CSIVolume, w *WriteOptions) error { +func (v *CSIVolumes) Register(vol *CSIVolume, w *WriteOptions) (*WriteMeta, error) { req := CSIVolumeRegisterRequest{ Volumes: []*CSIVolume{vol}, } - var resp struct{} - _, err := v.client.write("/v1/csi/volume/"+vol.ID, req, &resp, w) - return err + meta, err := v.client.write("/v1/csi/volume/"+vol.ID, req, nil, w) + return meta, err } func (v *CSIVolumes) Deregister(id string, w *WriteOptions) error { @@ -81,7 +80,6 @@ const ( // CSIVolume is used for serialization, see also nomad/structs/csi.go type CSIVolume struct { ID string - Driver string Namespace string Topologies []*CSITopology AccessMode CSIVolumeAccessMode @@ -92,16 +90,17 @@ type CSIVolume struct { // Healthy is true iff all the denormalized plugin health fields are true, and the // volume has not been marked for garbage collection - Healthy bool - VolumeGC time.Time - ControllerName string - ControllerHealthy bool - NodeHealthy int - NodeExpected int - ResourceExhausted time.Time + Healthy bool + VolumeGC time.Time + PluginID string + ControllersHealthy int + ControllersExpected int + NodesHealthy int + NodesExpected int + ResourceExhausted time.Time - CreatedIndex uint64 - ModifiedIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 } type CSIVolumeIndexSort []*CSIVolumeListStub @@ -111,7 +110,7 @@ func (v CSIVolumeIndexSort) Len() int { } func (v CSIVolumeIndexSort) Less(i, j int) bool { - return v[i].CreatedIndex > v[j].CreatedIndex + return v[i].CreateIndex > v[j].CreateIndex } func (v CSIVolumeIndexSort) Swap(i, j int) { @@ -121,7 +120,6 @@ func (v CSIVolumeIndexSort) Swap(i, j int) { // CSIVolumeListStub omits allocations. 
See also nomad/structs/csi.go type CSIVolumeListStub struct { ID string - Driver string Namespace string Topologies []*CSITopology AccessMode CSIVolumeAccessMode @@ -129,16 +127,17 @@ type CSIVolumeListStub struct { // Healthy is true iff all the denormalized plugin health fields are true, and the // volume has not been marked for garbage collection - Healthy bool - VolumeGC time.Time - ControllerName string - ControllerHealthy bool - NodeHealthy int - NodeExpected int - ResourceExhausted time.Time + Healthy bool + VolumeGC time.Time + PluginID string + ControllersHealthy int + ControllersExpected int + NodesHealthy int + NodesExpected int + ResourceExhausted time.Time - CreatedIndex uint64 - ModifiedIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 } type CSIVolumeRegisterRequest struct { @@ -150,3 +149,75 @@ type CSIVolumeDeregisterRequest struct { VolumeIDs []string WriteRequest } + +// CSI Plugins are jobs with plugin specific data +type CSIPlugins struct { + client *Client +} + +type CSIPlugin struct { + ID string + Type CSIPluginType + Namespace string + Jobs map[string]map[string]*Job + + ControllersHealthy int + Controllers map[string]*CSIInfo + NodesHealthy int + Nodes map[string]*CSIInfo + + CreateIndex uint64 + ModifyIndex uint64 +} + +type CSIPluginListStub struct { + ID string + Type CSIPluginType + JobIDs map[string]map[string]struct{} + ControllersHealthy int + ControllersExpected int + NodesHealthy int + NodesExpected int + CreateIndex uint64 + ModifyIndex uint64 +} + +type CSIPluginIndexSort []*CSIPluginListStub + +func (v CSIPluginIndexSort) Len() int { + return len(v) +} + +func (v CSIPluginIndexSort) Less(i, j int) bool { + return v[i].CreateIndex > v[j].CreateIndex +} + +func (v CSIPluginIndexSort) Swap(i, j int) { + v[i], v[j] = v[j], v[i] +} + +// CSIPlugins returns a handle on the CSIPlugins endpoint +func (c *Client) CSIPlugins() *CSIPlugins { + return &CSIPlugins{client: c} +} + +// List returns all CSI plugins +func (v *CSIPlugins) List(q *QueryOptions) ([]*CSIPluginListStub, *QueryMeta, error) { + var resp []*CSIPluginListStub + qm, err := v.client.query("/v1/csi/plugins", &resp, q) + if err != nil { + return nil, nil, err + } + sort.Sort(CSIPluginIndexSort(resp)) + return resp, qm, nil +} + +// Info is used to retrieve a single CSI Plugin Job +func (v *CSIPlugins) Info(id string, q *QueryOptions) (*CSIPlugin, *QueryMeta, error) { + var resp *CSIPlugin + qm, err := v.client.query("/v1/csi/plugin/"+id, &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} diff --git a/api/csi_test.go b/api/csi_test.go index 4e13ed4f9..3557bd0ed 100644 --- a/api/csi_test.go +++ b/api/csi_test.go @@ -3,7 +3,7 @@ package api import ( "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestCSIVolumes_CRUD(t *testing.T) { @@ -14,9 +14,9 @@ func TestCSIVolumes_CRUD(t *testing.T) { // Successful empty result vols, qm, err := v.List(nil) - assert.NoError(t, err) - assert.NotEqual(t, 0, qm.LastIndex) - assert.Equal(t, 0, len(vols)) + require.NoError(t, err) + require.NotEqual(t, 0, qm.LastIndex) + require.Equal(t, 0, len(vols)) // Authorized QueryOpts. 
Use the root token to just bypass ACL details opts := &QueryOptions{ @@ -32,38 +32,89 @@ func TestCSIVolumes_CRUD(t *testing.T) { } // Register a volume - v.Register(&CSIVolume{ - ID: "DEADBEEF-63C7-407F-AE82-C99FBEF78FEB", - Driver: "minnie", + id := "DEADBEEF-31B5-8F78-7986-DD404FDA0CD1" + _, err = v.Register(&CSIVolume{ + ID: id, Namespace: "default", + PluginID: "adam", AccessMode: CSIVolumeAccessModeMultiNodeSingleWriter, AttachmentMode: CSIVolumeAttachmentModeFilesystem, Topologies: []*CSITopology{{Segments: map[string]string{"foo": "bar"}}}, }, wpts) + require.NoError(t, err) // Successful result with volumes vols, qm, err = v.List(opts) - assert.NoError(t, err) - assert.NotEqual(t, 0, qm.LastIndex) - assert.Equal(t, 1, len(vols)) + require.NoError(t, err) + require.NotEqual(t, 0, qm.LastIndex) + require.Equal(t, 1, len(vols)) // Successful info query - vol, qm, err := v.Info("DEADBEEF-63C7-407F-AE82-C99FBEF78FEB", opts) - assert.NoError(t, err) - assert.Equal(t, "minnie", vol.Driver) - assert.Equal(t, "bar", vol.Topologies[0].Segments["foo"]) + vol, qm, err := v.Info(id, opts) + require.NoError(t, err) + require.Equal(t, "bar", vol.Topologies[0].Segments["foo"]) // Deregister the volume - err = v.Deregister("DEADBEEF-63C7-407F-AE82-C99FBEF78FEB", wpts) - assert.NoError(t, err) + err = v.Deregister(id, wpts) + require.NoError(t, err) // Successful empty result vols, qm, err = v.List(nil) - assert.NoError(t, err) - assert.NotEqual(t, 0, qm.LastIndex) - assert.Equal(t, 0, len(vols)) + require.NoError(t, err) + require.NotEqual(t, 0, qm.LastIndex) + require.Equal(t, 0, len(vols)) // Failed info query - vol, qm, err = v.Info("DEADBEEF-63C7-407F-AE82-C99FBEF78FEB", opts) - assert.Error(t, err, "missing") + vol, qm, err = v.Info(id, opts) + require.Error(t, err, "missing") +} + +func TestCSIPlugins_viaJob(t *testing.T) { + t.Parallel() + c, s, root := makeACLClient(t, nil, nil) + defer s.Stop() + p := c.CSIPlugins() + + // Successful empty result + plugs, qm, err := p.List(nil) + require.NoError(t, err) + require.NotEqual(t, 0, qm.LastIndex) + require.Equal(t, 0, len(plugs)) + + // Authorized QueryOpts. 
Use the root token to just bypass ACL details + opts := &QueryOptions{ + Region: "global", + Namespace: "default", + AuthToken: root.SecretID, + } + + wpts := &WriteOptions{ + Region: "global", + Namespace: "default", + AuthToken: root.SecretID, + } + + // Register a plugin job + j := c.Jobs() + job := testJob() + job.Namespace = stringToPtr("default") + job.TaskGroups[0].Tasks[0].CSIPluginConfig = &TaskCSIPluginConfig{ + ID: "foo", + Type: "monolith", + MountDir: "/not-empty", + } + _, _, err = j.Register(job, wpts) + require.NoError(t, err) + + // Successful result with the plugin + plugs, qm, err = p.List(opts) + require.NoError(t, err) + require.NotEqual(t, 0, qm.LastIndex) + require.Equal(t, 1, len(plugs)) + + // Successful info query + plug, qm, err := p.Info("foo", opts) + require.NoError(t, err) + require.NotNil(t, plug.Jobs[*job.Namespace][*job.ID]) + require.Equal(t, *job.ID, *plug.Jobs[*job.Namespace][*job.ID].ID) } diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index 705157ec6..c6daad16f 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -29,10 +29,8 @@ func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Reque // CSIVolumeSpecificRequest dispatches GET and PUT func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Tokenize the suffix of the path to get the volume id reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/csi/volume/") - - // tokenize the suffix of the path to get the alloc id and find the action - // invoked on the alloc id tokens := strings.Split(reqSuffix, "/") if len(tokens) > 2 || len(tokens) < 1 { return nil, CodedError(404, resourceNotFoundErr) } @@ -66,7 +64,7 @@ func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http setMeta(resp, &out.QueryMeta) if out.Volume == nil { - return nil, CodedError(404, "alloc not found") + return nil, CodedError(404, "volume not found") } return out.Volume, nil @@ -116,3 +114,56 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h return nil, nil } + +// CSIPluginsRequest lists CSI plugins +func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.CSIPluginListRequest{} + + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.CSIPluginListResponse + if err := s.agent.RPC("CSIPlugin.List", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + return out.Plugins, nil +} + +// CSIPluginSpecificRequest returns a single CSI plugin, with its jobs and CSIInfo +func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + return nil, CodedError(405, ErrInvalidMethod) + } + + // Tokenize the suffix of the path to get the plugin id + reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/csi/plugin/") + tokens := strings.Split(reqSuffix, "/") + if len(tokens) > 2 || len(tokens) < 1 { + return nil, CodedError(404, resourceNotFoundErr) + } + id := tokens[0] + + args := structs.CSIPluginGetRequest{ID: id} + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.CSIPluginGetResponse + if err := s.agent.RPC("CSIPlugin.Get", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Plugin == nil {
return nil, CodedError(404, "plugin not found") + } + + return out.Plugin, nil +} diff --git a/command/agent/http.go b/command/agent/http.go index 57100eca1..dcd270213 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -255,6 +255,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/csi/volumes", s.wrap(s.CSIVolumesRequest)) s.mux.HandleFunc("/v1/csi/volume/", s.wrap(s.CSIVolumeSpecificRequest)) + s.mux.HandleFunc("/v1/csi/plugins", s.wrap(s.CSIPluginsRequest)) + s.mux.HandleFunc("/v1/csi/plugin/", s.wrap(s.CSIPluginSpecificRequest)) s.mux.HandleFunc("/v1/acl/policies", s.wrap(s.ACLPoliciesRequest)) s.mux.HandleFunc("/v1/acl/policy/", s.wrap(s.ACLPolicySpecificRequest)) diff --git a/contributing/checklist-rpc-endpoint.md b/contributing/checklist-rpc-endpoint.md index 30b5d36e6..29ed912dc 100644 --- a/contributing/checklist-rpc-endpoint.md +++ b/contributing/checklist-rpc-endpoint.md @@ -15,6 +15,7 @@ Prefer adding a new message to changing any existing RPC messages. * RPCs are resolved by matching the method name for bound structs [net/rpc](https://golang.org/pkg/net/rpc/) * Check ACLs for security, list endpoints filter by ACL + * Register new RPC struct in `nomad/server.go` * Wrapper for the HTTP request in `command/agent/foo_endpoint.go` * Backwards compatibility requires a new endpoint, an upgraded client or server may be forwarding this request to an old server, diff --git a/nomad/csi_volume_endpoint.go b/nomad/csi_endpoint.go similarity index 63% rename from nomad/csi_volume_endpoint.go rename to nomad/csi_endpoint.go index e10b9a55e..c1d83266d 100644 --- a/nomad/csi_volume_endpoint.go +++ b/nomad/csi_endpoint.go @@ -61,10 +61,16 @@ func (srv *Server) WriteACLObj(args *structs.WriteRequest) (*acl.ACL, error) { return srv.QueryACLObj(opts) } -// replyCSIVolumeIndex sets the reply with the last index that modified the table csi_volumes -func (srv *Server) replySetCSIVolumeIndex(state *state.StateStore, reply *structs.QueryMeta) error { - // Use the last index that affected the table - index, err := state.Index("csi_volumes") +const ( + csiVolumeTable = "csi_volumes" + csiPluginTable = "csi_plugins" +) + +// replySetIndex sets the reply with the last index that modified the table +func (srv *Server) replySetIndex(table string, reply *structs.QueryMeta) error { + s := srv.fsm.State() + + index, err := s.Index(table) if err != nil { return err } @@ -98,8 +104,8 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV var err error var iter memdb.ResultIterator - if args.Driver != "" { - iter, err = state.CSIVolumesByDriver(ws, args.Driver) + if args.PluginID != "" { + iter, err = state.CSIVolumesByPluginID(ws, args.PluginID) } else { iter, err = state.CSIVolumes(ws) } @@ -117,7 +123,12 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV if raw == nil { break } + vol := raw.(*structs.CSIVolume) + vol, err := state.CSIVolumeDenormalizePlugins(ws, vol) + if err != nil { + return err + } // Filter on the request namespace to avoid ACL checks by volume if ns != "" && vol.Namespace != args.RequestNamespace() { @@ -136,7 +147,7 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV } } reply.Volumes = vs - return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta) + return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta) }} return v.srv.blockingRPC(&opts) } @@ -168,12 +179,15 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply 
*structs.CSIVol return err } - if vol == nil { - return structs.ErrMissingCSIVolumeID + if vol != nil { + vol, err = state.CSIVolumeDenormalize(ws, vol) + } + if err != nil { + return err } reply.Volume = vol - return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta) + return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta) }} return v.srv.blockingRPC(&opts) } @@ -215,7 +229,7 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru return err } - return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta) + return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta) } // Deregister removes a set of volumes @@ -248,5 +262,103 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply * return err } - return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta) + return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta) +} + +// CSIPlugin wraps the structs.CSIPlugin with request data and server context +type CSIPlugin struct { + srv *Server + logger log.Logger +} + +// List replies with CSIPlugins, filtered by ACL access +func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIPluginListResponse) error { + if done, err := v.srv.forward("CSIPlugin.List", args, args, reply); done { + return err + } + + aclObj, err := v.srv.QueryACLObj(&args.QueryOptions) + if err != nil { + return err + } + + if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityCSIAccess) { + return structs.ErrPermissionDenied + } + + metricsStart := time.Now() + defer metrics.MeasureSince([]string{"nomad", "plugin", "list"}, metricsStart) + + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + // Query all plugins + iter, err := state.CSIPlugins(ws) + if err != nil { + return err + } + + // Collect results + var ps []*structs.CSIPluginListStub + for { + raw := iter.Next() + if raw == nil { + break + } + + plug := raw.(*structs.CSIPlugin) + + // FIXME we should filter the ACL access for the plugin's + // namespace, but plugins don't currently have namespaces + ps = append(ps, plug.Stub()) + } + + reply.Plugins = ps + return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta) + }} + return v.srv.blockingRPC(&opts) +} + +// Get fetches detailed information about a specific plugin +func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPluginGetResponse) error { + if done, err := v.srv.forward("CSIPlugin.Get", args, args, reply); done { + return err + } + + aclObj, err := v.srv.QueryACLObj(&args.QueryOptions) + if err != nil { + return err + } + + if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityCSIAccess) { + return structs.ErrPermissionDenied + } + + metricsStart := time.Now() + defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, metricsStart) + + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + plug, err := state.CSIPluginByID(ws, args.ID) + if err != nil { + return err + } + + if plug != nil { + plug, err = state.CSIPluginDenormalize(ws, plug) + } + if err != nil { + return err + } + + // FIXME we should re-check the ACL access for the plugin's + // namespace, but plugins don't currently have namespaces + + reply.Plugin = plug + return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta) + }} + return v.srv.blockingRPC(&opts) } diff --git 
a/nomad/csi_volume_endpoint_test.go b/nomad/csi_endpoint_test.go similarity index 69% rename from nomad/csi_volume_endpoint_test.go rename to nomad/csi_endpoint_test.go index a66cc5477..bb1cac005 100644 --- a/nomad/csi_volume_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -38,7 +38,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) { Namespace: ns, AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, - Driver: "minnie", + PluginID: "minnie", }} err := state.CSIVolumeRegister(0, vols) require.NoError(t, err) @@ -84,7 +84,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) { vols := []*structs.CSIVolume{{ ID: id0, Namespace: "notTheNamespace", - Driver: "minnie", + PluginID: "minnie", AccessMode: structs.CSIVolumeAccessModeMultiNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, Topologies: []*structs.CSITopology{{ @@ -124,7 +124,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) { require.Equal(t, vols[0].ID, resp2.Volume.ID) // Registration does not update - req1.Volumes[0].Driver = "adam" + req1.Volumes[0].PluginID = "adam" err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Register", req1, resp1) require.Error(t, err, "exists") @@ -143,7 +143,8 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) { // Volume is missing err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req2, resp2) - require.Error(t, err, "missing") + require.NoError(t, err) + require.Nil(t, resp2.Volume) } func TestCSIVolumeEndpoint_List(t *testing.T) { @@ -175,19 +176,19 @@ func TestCSIVolumeEndpoint_List(t *testing.T) { Namespace: ns, AccessMode: structs.CSIVolumeAccessModeMultiNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, - Driver: "minnie", + PluginID: "minnie", }, { ID: id1, Namespace: ns, AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, - Driver: "adam", + PluginID: "adam", }, { ID: id2, Namespace: ms, AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, - Driver: "paddy", + PluginID: "paddy", }} err := state.CSIVolumeRegister(0, vols) require.NoError(t, err) @@ -211,9 +212,9 @@ func TestCSIVolumeEndpoint_List(t *testing.T) { } require.Equal(t, 0, len(ids)) - // Query by Driver + // Query by PluginID req = &structs.CSIVolumeListRequest{ - Driver: "adam", + PluginID: "adam", QueryOptions: structs.QueryOptions{ Region: "global", Namespace: ns, @@ -225,9 +226,9 @@ func TestCSIVolumeEndpoint_List(t *testing.T) { require.Equal(t, 1, len(resp.Volumes)) require.Equal(t, vols[1].ID, resp.Volumes[0].ID) - // Query by Driver, ACL filters all results + // Query by PluginID, ACL filters all results req = &structs.CSIVolumeListRequest{ - Driver: "paddy", + PluginID: "paddy", QueryOptions: structs.QueryOptions{ Region: "global", Namespace: ms, @@ -238,3 +239,93 @@ func TestCSIVolumeEndpoint_List(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(resp.Volumes)) } + +func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) { + t.Parallel() + srv, shutdown := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer shutdown() + testutil.WaitForLeader(t, srv.RPC) + + ns := structs.DefaultNamespace + + job := mock.Job() + job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ + ID: "foo", + Type: structs.CSIPluginTypeMonolith, + MountDir: "non-empty", + } + + state := srv.fsm.State() + 
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken()) + srv.config.ACLEnabled = true + policy := mock.NamespacePolicy(ns, "", []string{ + acl.NamespaceCapabilityCSICreateVolume, + acl.NamespaceCapabilitySubmitJob, + }) + validToken := mock.CreatePolicyAndToken(t, state, 1001, acl.NamespaceCapabilityCSICreateVolume, policy) + + codec := rpcClient(t, srv) + + // Create the register request + req1 := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: ns, + AuthToken: validToken.SecretID, + }, + } + resp1 := &structs.JobRegisterResponse{} + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req1, resp1) + require.NoError(t, err) + require.NotEqual(t, 0, resp1.Index) + + // Get the plugin back out + policy = mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess}) + getToken := mock.CreatePolicyAndToken(t, state, 1001, "csi-access", policy) + + req2 := &structs.CSIPluginGetRequest{ + ID: "foo", + QueryOptions: structs.QueryOptions{ + Region: "global", + AuthToken: getToken.SecretID, + }, + } + resp2 := &structs.CSIPluginGetResponse{} + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) + require.NoError(t, err) + require.NotEqual(t, 0, resp2.Index) + + // List plugins + req3 := &structs.CSIPluginListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + AuthToken: getToken.SecretID, + }, + } + resp3 := &structs.CSIPluginListResponse{} + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.List", req3, resp3) + require.NoError(t, err) + require.Equal(t, 1, len(resp3.Plugins)) + + // Deregistration works + req4 := &structs.JobDeregisterRequest{ + JobID: job.ID, + Purge: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: ns, + AuthToken: validToken.SecretID, + }, + } + resp4 := &structs.JobDeregisterResponse{} + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req4, resp4) + require.NoError(t, err) + + // Plugin is missing + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) + require.NoError(t, err) + require.Nil(t, resp2.Plugin) +} diff --git a/nomad/server.go b/nomad/server.go index 9dda975bf..41ffe8cbe 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -250,6 +250,7 @@ type endpoints struct { Plan *Plan Alloc *Alloc CSIVolume *CSIVolume + CSIPlugin *CSIPlugin Deployment *Deployment Region *Region Search *Search @@ -1094,6 +1095,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Job = NewJobEndpoints(s) s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")} // Add but don't register s.staticEndpoints.CSIVolume = &CSIVolume{srv: s, logger: s.logger.Named("csi_volume")} + s.staticEndpoints.CSIPlugin = &CSIPlugin{srv: s, logger: s.logger.Named("csi_plugin")} s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")} s.staticEndpoints.Operator = &Operator{srv: s, logger: s.logger.Named("operator")} s.staticEndpoints.Periodic = &Periodic{srv: s, logger: s.logger.Named("periodic")} @@ -1123,6 +1125,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { server.Register(s.staticEndpoints.Eval) server.Register(s.staticEndpoints.Job) server.Register(s.staticEndpoints.CSIVolume) + server.Register(s.staticEndpoints.CSIPlugin) server.Register(s.staticEndpoints.Deployment) server.Register(s.staticEndpoints.Operator) server.Register(s.staticEndpoints.Periodic) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 
3ea7321b8..8a7db3e6e 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -48,6 +48,7 @@ func init() { schedulerConfigTableSchema, clusterMetaTableSchema, csiVolumeTableSchema, + csiPluginTableSchema, }...) } @@ -679,13 +680,11 @@ func clusterMetaTableSchema() *memdb.TableSchema { } } +// CSIVolumes are identified by id globally, and searchable by plugin id func csiVolumeTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: "csi_volumes", Indexes: map[string]*memdb.IndexSchema{ - // Primary index is used for volume upsert - // and simple direct lookup. ID is required to be - // unique. "id": { Name: "id", AllowMissing: false, @@ -694,12 +693,29 @@ func csiVolumeTableSchema() *memdb.TableSchema { Field: "ID", }, }, - "driver": { - Name: "driver", + "plugin_id": { + Name: "plugin_id", AllowMissing: false, Unique: false, Indexer: &memdb.StringFieldIndex{ - Field: "Driver", + Field: "PluginID", + }, + }, + }, + } +} + +// CSIPlugins are identified by id globally +func csiPluginTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "csi_plugins", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "ID", }, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 74e1085ee..d8860de94 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -677,6 +677,9 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } + if err := upsertNodeCSIPlugins(txn, node, index); err != nil { + return fmt.Errorf("csi plugin update failed: %v", err) + } txn.Commit() return nil @@ -704,6 +707,11 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { if err := txn.Delete("nodes", existing); err != nil { return fmt.Errorf("node delete failed: %s: %v", nodeID, err) } + + node := existing.(*structs.Node) + if err := deleteNodeCSIPlugins(txn, node, index); err != nil { + return fmt.Errorf("csi plugin delete failed: %v", err) + } } if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { @@ -931,6 +939,95 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv } } +// upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health.
It's called +// on upsertNodeEvents, so that event driven health changes are updated +func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error { + if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 { + return nil + } + + loop := func(info *structs.CSIInfo) error { + raw, err := txn.First("csi_plugins", "id", info.PluginID) + if err != nil { + return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err) + } + + var plug *structs.CSIPlugin + if raw != nil { + plug = raw.(*structs.CSIPlugin).Copy(index) + } else { + plug = structs.NewCSIPlugin(info.PluginID, index) + } + + plug.AddPlugin(node.ID, info, index) + + err = txn.Insert("csi_plugins", plug) + if err != nil { + return fmt.Errorf("csi_plugins insert error: %v", err) + } + + return nil + } + + for _, info := range node.CSIControllerPlugins { + err := loop(info) + if err != nil { + return err + } + } + + for _, info := range node.CSINodePlugins { + err := loop(info) + if err != nil { + return err + } + } + + return nil +} + +// deleteNodeCSIPlugins cleans up CSIInfo node health status, called in DeleteNode +func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error { + if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 { + return nil + } + + names := map[string]struct{}{} + for _, info := range node.CSIControllerPlugins { + names[info.PluginID] = struct{}{} + } + for _, info := range node.CSINodePlugins { + names[info.PluginID] = struct{}{} + } + + for id := range names { + raw, err := txn.First("csi_plugins", "id", id) + if err != nil { + return fmt.Errorf("csi_plugins lookup error %s: %v", id, err) + } + if raw == nil { + return fmt.Errorf("csi_plugins missing plugin %s", id) + } + + plug := raw.(*structs.CSIPlugin).Copy(index) + plug.DeleteNode(node.ID, index) + + if plug.IsEmpty() { + err := txn.Delete("csi_plugins", plug) + if err != nil { + return fmt.Errorf("csi_plugins delete error: %v", err) + } + } + + err = txn.Insert("csi_plugins", plug) + if err != nil { + return fmt.Errorf("csi_plugins update error %s: %v", id, err) + } + } + + return nil +} + // NodeByID is used to lookup a node by ID func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) { txn := s.db.Txn(false) @@ -1068,6 +1165,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b return fmt.Errorf("unable to upsert job into job_version table: %v", err) } + if err := s.upsertJobCSIPlugins(index, job, txn); err != nil { + return fmt.Errorf("unable to upsert csi_plugins table: %v", err) + } + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -1162,6 +1263,11 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn return err } + // Delete the csi_plugins + if err := s.deleteJobCSIPlugins(index, job, txn); err != nil { + return err + } + // Delete the job summary if _, err = txn.DeleteAll("job_summary", "id", namespace, jobID); err != nil { return fmt.Errorf("deleing job summary failed: %v", err) @@ -1511,21 +1617,26 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) return iter, nil } -// CSIVolumeRegister adds a volume to the server store, iff it's not new +// CSIVolumeRegister adds a volume to the server store, failing if it already exists func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error { txn := s.db.Txn(true) defer txn.Abort() for _, v := 
range volumes { // Check for volume existence - _, obj, err := txn.FirstWatch("csi_volumes", "id", v.ID) + obj, err := txn.First("csi_volumes", "id", v.ID) if err != nil { - return fmt.Errorf("volume existence check: %v", err) + return fmt.Errorf("volume existence check error: %v", err) } if obj != nil { return fmt.Errorf("volume exists: %s", v.ID) } + if v.CreateIndex == 0 { + v.CreateIndex = index + v.ModifyIndex = index + } + err = txn.Insert("csi_volumes", v) if err != nil { return fmt.Errorf("volume insert: %v", err) @@ -1546,19 +1657,22 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVo } ws.Add(watchCh) - if obj != nil { - v := obj.(*structs.CSIVolume) - return v, nil + if obj == nil { + return nil, nil } - return nil, nil + vol := obj.(*structs.CSIVolume) + // Health data is stale, so set this volume unhealthy until it's denormalized + vol.Healthy = false + + return vol, nil } -// CSIVolumes looks up the entire csi_volumes table -func (s *StateStore) CSIVolumesByDriver(ws memdb.WatchSet, driver string) (memdb.ResultIterator, error) { +// CSIVolumes looks up csi_volumes by pluginID +func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) - iter, err := txn.Get("csi_volumes", "driver", driver) + iter, err := txn.Get("csi_volumes", "plugin_id", pluginID) if err != nil { return nil, fmt.Errorf("volume lookup failed: %v", err) } @@ -1593,11 +1707,13 @@ func (s *StateStore) CSIVolumeClaim(index uint64, id string, alloc *structs.Allo return fmt.Errorf("volume not found: %s", id) } - volume, ok := row.(*structs.CSIVolume) + orig, ok := row.(*structs.CSIVolume) if !ok { return fmt.Errorf("volume row conversion error") } + volume := orig.Copy(index) + if !volume.Claim(claim, alloc) { return fmt.Errorf("volume max claim reached") } @@ -1634,6 +1750,222 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error { return nil } +// upsertJobCSIPlugins is called on UpsertJob and maintains the csi_plugin index of jobs +func (s *StateStore) upsertJobCSIPlugins(index uint64, job *structs.Job, txn *memdb.Txn) error { + ws := memdb.NewWatchSet() + plugs, err := s.csiPluginsByJob(ws, job, index) + if err != nil { + return fmt.Errorf("%v", err) + } + + // Append this job to all of them + for _, plug := range plugs { + if plug.CreateIndex != index { + plug = plug.Copy(index) + } + + plug.AddJob(job) + plug.ModifyIndex = index + err := txn.Insert("csi_plugins", plug) + if err != nil { + return err + } + } + + return nil +} + +// csiPluginsByJob finds or creates CSIPlugins identified by the configuration contained in job +func (s *StateStore) csiPluginsByJob(ws memdb.WatchSet, job *structs.Job, index uint64) (map[string]*structs.CSIPlugin, error) { + txn := s.db.Txn(false) + defer txn.Abort() + + plugs := map[string]*structs.CSIPlugin{} + + for _, tg := range job.TaskGroups { + for _, t := range tg.Tasks { + if t.CSIPluginConfig == nil { + continue + } + + plug, ok := plugs[t.CSIPluginConfig.ID] + if ok { + continue + } + + plug, err := s.CSIPluginByID(ws, t.CSIPluginConfig.ID) + if err != nil { + return nil, err + } + + if plug == nil { + plug = structs.NewCSIPlugin(t.CSIPluginConfig.ID, index) + plug.Type = t.CSIPluginConfig.Type + } + + plugs[t.CSIPluginConfig.ID] = plug + } + } + + return plugs, nil +} + +// deleteJobCSIPlugins is called on DeleteJob +func (s *StateStore) deleteJobCSIPlugins(index uint64, job *structs.Job, txn *memdb.Txn) error { + ws := memdb.NewWatchSet() + 
plugs, err := s.csiPluginsByJob(ws, job, index) + if err != nil { + return fmt.Errorf("%v", err) + } + + // Remove this job from each plugin. If the plugin has no jobs left, remove it + for _, plug := range plugs { + if plug.CreateIndex != index { + plug = plug.Copy(index) + } + + plug.DeleteJob(job) + if plug.IsEmpty() { + err = txn.Delete("csi_plugins", plug) + } else { + plug.ModifyIndex = index + err = txn.Insert("csi_plugins", plug) + } + if err != nil { + return fmt.Errorf("csi_plugins update: %v", err) + } + } + + return nil +} + +// CSIVolumeDenormalize takes a CSIVolume and denormalizes for the API +func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + if vol == nil { + return nil, nil + } + + vol, err := s.CSIVolumeDenormalizePlugins(ws, vol) + if err != nil { + return nil, err + } + + return s.csiVolumeDenormalizeAllocs(ws, vol) +} + +// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and plugins +func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + if vol == nil { + return nil, nil + } + + // Lookup CSIPlugin, the health records, and calculate volume health + txn := s.db.Txn(false) + defer txn.Abort() + + plug, err := s.CSIPluginByID(ws, vol.PluginID) + if err != nil { + return nil, fmt.Errorf("plugin lookup error: %s %v", vol.PluginID, err) + } + if plug == nil { + vol.ControllersHealthy = 0 + vol.NodesHealthy = 0 + vol.Healthy = false + return vol, nil + } + + vol.ControllersHealthy = plug.ControllersHealthy + vol.NodesHealthy = plug.NodesHealthy + // This number is incorrect! The expected number of node plugins is actually this + + // the number of blocked evaluations for the jobs controlling these plugins + vol.ControllersExpected = len(plug.Controllers) + vol.NodesExpected = len(plug.Nodes) + + vol.Healthy = vol.ControllersHealthy > 0 && vol.NodesHealthy > 0 + + return vol, nil +} + +// csiVolumeDenormalizeAllocs returns a CSIVolume with allocations +func (s *StateStore) csiVolumeDenormalizeAllocs(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + for id := range vol.ReadAllocs { + a, err := s.AllocByID(ws, id) + if err != nil { + return nil, err + } + vol.ReadAllocs[id] = a + } + + for id := range vol.WriteAllocs { + a, err := s.AllocByID(ws, id) + if err != nil { + return nil, err + } + vol.WriteAllocs[id] = a + } + + for id := range vol.PastAllocs { + a, err := s.AllocByID(ws, id) + if err != nil { + return nil, err + } + vol.PastAllocs[id] = a + } + + return vol, nil +} + +// CSIPlugins returns the unfiltered list of all plugin health status +func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + defer txn.Abort() + + iter, err := txn.Get("csi_plugins", "id") + if err != nil { + return nil, fmt.Errorf("csi_plugins lookup failed: %v", err) + } + + return iter, nil +} + +// CSIPluginByID returns the one named CSIPlugin +func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) { + txn := s.db.Txn(false) + defer txn.Abort() + + raw, err := txn.First("csi_plugins", "id", id) + if err != nil { + return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err) + } + + if raw == nil { + return nil, nil + } + + plug := raw.(*structs.CSIPlugin) + + return plug, nil +} + +// CSIPluginDenormalize returns a CSIPlugin with jobs +func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPlugin)
(*structs.CSIPlugin, error) { + if plug == nil { + return nil, nil + } + + for ns, js := range plug.Jobs { + for id := range js { + j, err := s.JobByID(ws, ns, id) + if err != nil { + return nil, err + } + plug.Jobs[ns][id] = j + } + } + + return plug, nil +} + // UpsertPeriodicLaunch is used to register a launch or update it. func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error { txn := s.db.Txn(true) @@ -2448,8 +2780,8 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin return out, nil } -// AllocsByJob returns all the allocations by job id -func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error) { +// AllocsByJob returns allocations by job id +func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, anyCreateIndex bool) ([]*structs.Allocation, error) { txn := s.db.Txn(false) // Get the job @@ -2481,7 +2813,7 @@ func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all // If the allocation belongs to a job with the same ID but a different // create index and we are not getting all the allocations whose Jobs // matches the same Job ID then we skip it - if !all && job != nil && alloc.Job.CreateIndex != job.CreateIndex { + if !anyCreateIndex && job != nil && alloc.Job.CreateIndex != job.CreateIndex { continue } out = append(out, raw.(*structs.Allocation)) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index b5666927e..df24da193 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2830,18 +2830,20 @@ func TestStateStore_RestoreJobSummary(t *testing.T) { func TestStateStore_CSIVolume(t *testing.T) { state := testStateStore(t) - v0 := structs.CreateCSIVolume("foo") - v0.ID = "DEADBEEF-70AD-4672-9178-802BCA500C87" + id0, id1 := uuid.Generate(), uuid.Generate() + + v0 := structs.NewCSIVolume("foo") + v0.ID = id0 v0.Namespace = "default" - v0.Driver = "minnie" + v0.PluginID = "minnie" v0.Healthy = true v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem - v1 := structs.CreateCSIVolume("foo") - v1.ID = "BAADF00D-70AD-4672-9178-802BCA500C87" + v1 := structs.NewCSIVolume("foo") + v1.ID = id1 v1.Namespace = "default" - v1.Driver = "adam" + v1.PluginID = "adam" v1.Healthy = true v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem @@ -2869,18 +2871,18 @@ func TestStateStore_CSIVolume(t *testing.T) { require.Equal(t, 2, len(vs)) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByDriver(ws, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, "minnie") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 1, len(vs)) err = state.CSIVolumeDeregister(1, []string{ - "BAADF00D-70AD-4672-9178-802BCA500C87", + id1, }) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByDriver(ws, "adam") + iter, err = state.CSIVolumesByPluginID(ws, "adam") require.NoError(t, err) vs = slurp(iter) require.Equal(t, 0, len(vs)) @@ -2898,26 +2900,147 @@ func TestStateStore_CSIVolume(t *testing.T) { w := structs.CSIVolumeClaimWrite u := structs.CSIVolumeClaimRelease - err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, r) + err = state.CSIVolumeClaim(2, id0, a0, r) require.NoError(t, err) - err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a1, w) + err = 
state.CSIVolumeClaim(2, id0, a1, w) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByDriver(ws, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, "minnie") require.NoError(t, err) vs = slurp(iter) require.False(t, vs[0].CanWrite()) - err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, u) + err = state.CSIVolumeClaim(2, id0, a0, u) require.NoError(t, err) ws = memdb.NewWatchSet() - iter, err = state.CSIVolumesByDriver(ws, "minnie") + iter, err = state.CSIVolumesByPluginID(ws, "minnie") require.NoError(t, err) vs = slurp(iter) require.True(t, vs[0].CanReadOnly()) } +// TestStateStore_CSIPluginJobs creates plugin jobs and tests that they create a CSIPlugin +func TestStateStore_CSIPluginJobs(t *testing.T) { + index := uint64(999) + state := testStateStore(t) + testStateStore_CSIPluginJobs(t, index, state) +} + +func testStateStore_CSIPluginJobs(t *testing.T, index uint64, state *StateStore) (uint64, *StateStore) { + j0 := mock.Job() + j0.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ + ID: "foo", + Type: structs.CSIPluginTypeController, + } + + j1 := mock.Job() + j1.Type = structs.JobTypeSystem + j1.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ + ID: "foo", + Type: structs.CSIPluginTypeNode, + } + + index++ + err := state.UpsertJob(index, j0) + require.NoError(t, err) + + index++ + err = state.UpsertJob(index, j1) + require.NoError(t, err) + + // Get the plugin back out by id + ws := memdb.NewWatchSet() + plug, err := state.CSIPluginByID(ws, "foo") + require.NoError(t, err) + + require.Equal(t, "foo", plug.ID) + + jids := map[string]struct{}{j0.ID: struct{}{}, j1.ID: struct{}{}} + for jid := range plug.Jobs[structs.DefaultNamespace] { + delete(jids, jid) + } + require.Equal(t, 0, len(jids)) + + return index, state +} + +// TestStateStore_CSIPluginNodes uses the state from jobs, and uses node fingerprinting to update health +func TestStateStore_CSIPluginNodes(t *testing.T) { + index := uint64(999) + state := testStateStore(t) + index, state = testStateStore_CSIPluginJobs(t, index, state) + testStateStore_CSIPluginNodes(t, index, state) +} + +func testStateStore_CSIPluginNodes(t *testing.T, index uint64, state *StateStore) (uint64, *StateStore) { + // Create Nodes fingerprinting the plugins + ns := []*structs.Node{mock.Node(), mock.Node(), mock.Node()} + + for _, n := range ns { + index++ + err := state.UpsertNode(index, n) + require.NoError(t, err) + } + + // Fingerprint a running controller plugin + n0 := ns[0].Copy() + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ + "foo": { + PluginID: "foo", + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + + index++ + err := state.UpsertNode(index, n0) + require.NoError(t, err) + + // Fingerprint two running node plugins + for _, n := range ns[1:] { + n = n.Copy() + n.CSINodePlugins = map[string]*structs.CSIInfo{ + "foo": { + PluginID: "foo", + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + } + + ws := memdb.NewWatchSet() + plug, err := state.CSIPluginByID(ws, "foo") + require.NoError(t, err) + + require.Equal(t, "foo", plug.ID) + require.Equal(t, 1, plug.ControllersHealthy) + 
require.Equal(t, 2, plug.NodesHealthy) + + return index, state +} + +// TestStateStore_CSIPluginBackwards gets the node state first, and the job state second +func TestStateStore_CSIPluginBackwards(t *testing.T) { + index := uint64(999) + state := testStateStore(t) + index, state = testStateStore_CSIPluginNodes(t, index, state) + testStateStore_CSIPluginJobs(t, index, state) +} + func TestStateStore_Indexes(t *testing.T) { t.Parallel() diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index d511e9e75..bcb0a8845 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -135,9 +135,9 @@ func ValidCSIVolumeWriteAccessMode(accessMode CSIVolumeAccessMode) bool { } } +// CSIVolume is the full representation of a CSI Volume type CSIVolume struct { ID string - Driver string Namespace string Topologies []*CSITopology AccessMode CSIVolumeAccessMode @@ -150,66 +150,76 @@ type CSIVolume struct { // Healthy is true if all the denormalized plugin health fields are true, and the // volume has not been marked for garbage collection - Healthy bool - VolumeGC time.Time - ControllerName string - ControllerHealthy bool - Controller []*Job - NodeHealthy int - NodeExpected int - ResourceExhausted time.Time + Healthy bool + VolumeGC time.Time + PluginID string + ControllersHealthy int + ControllersExpected int + NodesHealthy int + NodesExpected int + ResourceExhausted time.Time - CreatedIndex uint64 - ModifiedIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 } +// CSIVolListStub is partial representation of a CSI Volume for inclusion in lists type CSIVolListStub struct { - ID string - Driver string - Namespace string - Topologies []*CSITopology - AccessMode CSIVolumeAccessMode - AttachmentMode CSIVolumeAttachmentMode - CurrentReaders int - CurrentWriters int - Healthy bool - VolumeGC time.Time - ControllerName string - ControllerHealthy bool - NodeHealthy int - NodeExpected int - CreatedIndex uint64 - ModifiedIndex uint64 + ID string + Namespace string + Topologies []*CSITopology + AccessMode CSIVolumeAccessMode + AttachmentMode CSIVolumeAttachmentMode + CurrentReaders int + CurrentWriters int + Healthy bool + VolumeGC time.Time + PluginID string + ControllersHealthy int + ControllersExpected int + NodesHealthy int + NodesExpected int + CreateIndex uint64 + ModifyIndex uint64 } -func CreateCSIVolume(controllerName string) *CSIVolume { - return &CSIVolume{ - ControllerName: controllerName, - ReadAllocs: map[string]*Allocation{}, - WriteAllocs: map[string]*Allocation{}, - PastAllocs: map[string]*Allocation{}, - Topologies: []*CSITopology{}, +// NewCSIVolume creates the volume struct. 
No side-effects +func NewCSIVolume(pluginID string) *CSIVolume { + out := &CSIVolume{ + ID: pluginID, + } + + out.newStructs() + return out +} + +func (v *CSIVolume) newStructs() { + if v.Topologies == nil { + v.Topologies = []*CSITopology{} + } + + v.ReadAllocs = map[string]*Allocation{} + v.WriteAllocs = map[string]*Allocation{} + v.PastAllocs = map[string]*Allocation{} } func (v *CSIVolume) Stub() *CSIVolListStub { stub := CSIVolListStub{ - ID: v.ID, - Driver: v.Driver, - Namespace: v.Namespace, - Topologies: v.Topologies, - AccessMode: v.AccessMode, - AttachmentMode: v.AttachmentMode, - CurrentReaders: len(v.ReadAllocs), - CurrentWriters: len(v.WriteAllocs), - Healthy: v.Healthy, - VolumeGC: v.VolumeGC, - ControllerName: v.ControllerName, - ControllerHealthy: v.ControllerHealthy, - NodeHealthy: v.NodeHealthy, - NodeExpected: v.NodeExpected, - CreatedIndex: v.CreatedIndex, - ModifiedIndex: v.ModifiedIndex, + ID: v.ID, + Namespace: v.Namespace, + Topologies: v.Topologies, + AccessMode: v.AccessMode, + AttachmentMode: v.AttachmentMode, + CurrentReaders: len(v.ReadAllocs), + CurrentWriters: len(v.WriteAllocs), + Healthy: v.Healthy, + VolumeGC: v.VolumeGC, + PluginID: v.PluginID, + ControllersHealthy: v.ControllersHealthy, + NodesHealthy: v.NodesHealthy, + NodesExpected: v.NodesExpected, + CreateIndex: v.CreateIndex, + ModifyIndex: v.ModifyIndex, } return &stub @@ -238,6 +248,29 @@ func (v *CSIVolume) CanWrite() bool { } } +// Copy returns a copy of the volume, which shares only the Topologies slice +func (v *CSIVolume) Copy(index uint64) *CSIVolume { + copy := *v + out := &copy + out.newStructs() + out.ModifyIndex = index + + for k, v := range v.ReadAllocs { + out.ReadAllocs[k] = v + } + + for k, v := range v.WriteAllocs { + out.WriteAllocs[k] = v + } + + for k, v := range v.PastAllocs { + out.PastAllocs[k] = v + } + + return out +} + +// Claim updates the allocations and changes the volume state func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool { switch claim { case CSIVolumeClaimRead: @@ -250,30 +283,39 @@ func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool { return false } +// ClaimRead marks an allocation as using a volume read-only func (v *CSIVolume) ClaimRead(alloc *Allocation) bool { if !v.CanReadOnly() { return false } - v.ReadAllocs[alloc.ID] = alloc + // Allocations are copy on write, so we want to keep the id but don't need the + // pointer. We'll get it from the db in denormalize. + v.ReadAllocs[alloc.ID] = nil delete(v.WriteAllocs, alloc.ID) delete(v.PastAllocs, alloc.ID) return true } +// ClaimWrite marks an allocation as using a volume as a writer func (v *CSIVolume) ClaimWrite(alloc *Allocation) bool { if !v.CanWrite() { return false } - v.WriteAllocs[alloc.ID] = alloc + // Allocations are copy on write, so we want to keep the id but don't need the + // pointer. We'll get it from the db in denormalize. + v.WriteAllocs[alloc.ID] = nil delete(v.ReadAllocs, alloc.ID) delete(v.PastAllocs, alloc.ID) return true } +// ClaimRelease is called when the allocation has terminated and already stopped using the volume func (v *CSIVolume) ClaimRelease(alloc *Allocation) bool { delete(v.ReadAllocs, alloc.ID) delete(v.WriteAllocs, alloc.ID) - v.PastAllocs[alloc.ID] = alloc + // Allocations are copy on write, so we want to keep the id but don't need the + // pointer. We'll get it from the db in denormalize.
+ v.PastAllocs[alloc.ID] = nil return true } @@ -292,11 +334,10 @@ func (v *CSIVolume) Equal(o *CSIVolume) bool { // Omit the plugin health fields, their values are controlled by plugin jobs if v.ID == o.ID && - v.Driver == o.Driver && v.Namespace == o.Namespace && v.AccessMode == o.AccessMode && v.AttachmentMode == o.AttachmentMode && - v.ControllerName == o.ControllerName { + v.PluginID == o.PluginID { // Setwise equality of topologies var ok bool for _, t := range v.Topologies { @@ -323,8 +364,8 @@ func (v *CSIVolume) Validate() error { if v.ID == "" { errs = append(errs, "missing volume id") } - if v.Driver == "" { - errs = append(errs, "missing driver") + if v.PluginID == "" { + errs = append(errs, "missing plugin id") } if v.Namespace == "" { errs = append(errs, "missing namespace") @@ -388,7 +429,7 @@ type CSIVolumeClaimRequest struct { } type CSIVolumeListRequest struct { - Driver string + PluginID string QueryOptions } @@ -406,3 +447,189 @@ type CSIVolumeGetResponse struct { Volume *CSIVolume QueryMeta } + +// CSIPlugin bundles job and info context for the plugin for clients +type CSIPlugin struct { + ID string + Type CSIPluginType + + // Jobs is updated by UpsertJob, and keeps an index of jobs containing node or + // controller tasks for this plugin. It is addressed by [job.Namespace][job.ID] + Jobs map[string]map[string]*Job + + ControllersHealthy int + Controllers map[string]*CSIInfo + NodesHealthy int + Nodes map[string]*CSIInfo + + CreateIndex uint64 + ModifyIndex uint64 +} + +// NewCSIPlugin creates the plugin struct. No side-effects +func NewCSIPlugin(id string, index uint64) *CSIPlugin { + out := &CSIPlugin{ + ID: id, + CreateIndex: index, + ModifyIndex: index, + } + + out.newStructs() + return out +} + +func (p *CSIPlugin) newStructs() { + p.Jobs = map[string]map[string]*Job{} + p.Controllers = map[string]*CSIInfo{} + p.Nodes = map[string]*CSIInfo{} +} + +func (p *CSIPlugin) Copy(index uint64) *CSIPlugin { + copy := *p + out := &copy + out.newStructs() + out.ModifyIndex = index + + for ns, js := range p.Jobs { + out.Jobs[ns] = map[string]*Job{} + + for jid, j := range js { + out.Jobs[ns][jid] = j + } + } + + for k, v := range p.Controllers { + out.Controllers[k] = v + } + + for k, v := range p.Nodes { + out.Nodes[k] = v + } + + return out +} + +// AddJob adds a job entry to the plugin +func (p *CSIPlugin) AddJob(job *Job) { + if _, ok := p.Jobs[job.Namespace]; !ok { + p.Jobs[job.Namespace] = map[string]*Job{} + } + p.Jobs[job.Namespace][job.ID] = nil +} + +func (p *CSIPlugin) DeleteJob(job *Job) { + delete(p.Jobs[job.Namespace], job.ID) +} + +// AddPlugin adds a single plugin running on the node. Called from state.UpsertNode in a +// transaction +func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo, index uint64) { + if info.ControllerInfo != nil { + prev, ok := p.Controllers[nodeID] + if ok && prev.Healthy { + p.ControllersHealthy -= 1 + } + p.Controllers[nodeID] = info + if info.Healthy { + p.ControllersHealthy += 1 + } + } + + if info.NodeInfo != nil { + prev, ok := p.Nodes[nodeID] + if ok && prev.Healthy { + p.NodesHealthy -= 1 + } + p.Nodes[nodeID] = info + if info.Healthy { + p.NodesHealthy += 1 + } + } + + p.ModifyIndex = index +} + +// DeleteNode removes all plugins from the node.
Called from state.DeleteNode in a +// transaction +func (p *CSIPlugin) DeleteNode(nodeID string, index uint64) { + prev, ok := p.Controllers[nodeID] + if ok && prev.Healthy { + p.ControllersHealthy -= 1 + } + delete(p.Controllers, nodeID) + + prev, ok = p.Nodes[nodeID] + if ok && prev.Healthy { + p.NodesHealthy -= 1 + } + delete(p.Nodes, nodeID) + + p.ModifyIndex = index +} + +type CSIPluginListStub struct { + ID string + Type CSIPluginType + JobIDs map[string]map[string]struct{} + ControllersHealthy int + ControllersExpected int + NodesHealthy int + NodesExpected int + CreateIndex uint64 + ModifyIndex uint64 +} + +func (p *CSIPlugin) Stub() *CSIPluginListStub { + ids := map[string]map[string]struct{}{} + for ns, js := range p.Jobs { + ids[ns] = map[string]struct{}{} + for id := range js { + ids[ns][id] = struct{}{} + } + } + + return &CSIPluginListStub{ + ID: p.ID, + Type: p.Type, + JobIDs: ids, + ControllersHealthy: p.ControllersHealthy, + ControllersExpected: len(p.Controllers), + NodesHealthy: p.NodesHealthy, + NodesExpected: len(p.Nodes), + CreateIndex: p.CreateIndex, + ModifyIndex: p.ModifyIndex, + } +} + +func (p *CSIPlugin) IsEmpty() bool { + if !(len(p.Controllers) == 0 && len(p.Nodes) == 0) { + return false + } + + empty := true + for _, m := range p.Jobs { + if len(m) > 0 { + empty = false + } + } + return empty +} + +type CSIPluginListRequest struct { + QueryOptions +} + +type CSIPluginListResponse struct { + Plugins []*CSIPluginListStub + QueryMeta +} + +type CSIPluginGetRequest struct { + ID string + QueryOptions +} + +type CSIPluginGetResponse struct { + Plugin *CSIPlugin + QueryMeta +} diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 85fbaf58d..7119b7b72 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -7,7 +7,7 @@ import ( ) func TestCSIVolumeClaim(t *testing.T) { - vol := CreateCSIVolume("") + vol := NewCSIVolume("") vol.AccessMode = CSIVolumeAccessModeMultiNodeSingleWriter vol.Healthy = true diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index e6a63bfd7..2e3e1edd2 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -18,7 +18,6 @@ const ( errUnknownNomadVersion = "Unable to determine Nomad version" errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later" errMissingAllocID = "Missing allocation ID" - errMissingCSIVolumeID = "Missing Volume ID" // Prefix based errors that are used to check if the error is of a given // type. These errors should be created with the associated constructor. @@ -42,7 +41,6 @@ var ( ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ErrMissingAllocID = errors.New(errMissingAllocID) - ErrMissingCSIVolumeID = errors.New(errMissingCSIVolumeID) ) // IsErrNoLeader returns whether the error is due to there being no leader.