csi: server-side plugin state tracking and api (#6966)

* structs: CSIPlugin indexes jobs acting as plugins and node updates

* schema: csi_plugins table for CSIPlugin

* nomad: csi_endpoint use vol.Denormalize, plugin requests

* nomad: csi_volume_endpoint: rename to csi_endpoint

* agent: add CSI plugin endpoints

* state_store_test: use generated ids to avoid t.Parallel conflicts

* contributing: add note about registering new RPC structs

* command: agent http register plugin lists

* api: CSI plugin queries, ControllerHealthy -> ControllersHealthy

* state_store: copy on write for volumes and plugins

* structs: copy on write for volumes and plugins

* state_store: CSIVolumeByID returns an unhealthy volume, denormalize

* nomad: csi_endpoint use CSIVolumeDenormalizePlugins

* structs: remove struct errors for missing objects

* nomad: csi_endpoint return nil for missing objects, not errors

* api: return meta from Register to avoid EOF error

* state_store: CSIVolumeDenormalize keep allocs in their own maps

* state_store: CSIVolumeDeregister error on missing volume

* state_store: CSIVolumeRegister set indexes

* nomad: csi_endpoint use CSIVolumeDenormalizePlugins tests
This commit is contained in:
Lang Martin
2020-01-28 10:28:34 -05:00
committed by Tim Gross
parent 1250d56333
commit 15ffae2798
14 changed files with 1252 additions and 174 deletions

View File

@@ -10,12 +10,12 @@ type CSIVolumes struct {
client *Client
}
// CSIVolumes returns a handle on the allocs endpoints.
// CSIVolumes returns a handle on the CSIVolumes endpoint
func (c *Client) CSIVolumes() *CSIVolumes {
return &CSIVolumes{client: c}
}
// List returns all CSI volumes, ignoring driver
// List returns all CSI volumes
func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, error) {
var resp []*CSIVolumeListStub
qm, err := v.client.query("/v1/csi/volumes", &resp, q)
@@ -26,12 +26,12 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er
return resp, qm, nil
}
// DriverList returns all CSI volumes for the specified driver
func (v *CSIVolumes) DriverList(driver string) ([]*CSIVolumeListStub, *QueryMeta, error) {
return v.List(&QueryOptions{Prefix: driver})
// PluginList returns all CSI volumes for the specified plugin id
func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) {
return v.List(&QueryOptions{Prefix: pluginID})
}
// Info is used to retrieve a single allocation.
// Info is used to retrieve a single CSIVolume
func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) {
var resp CSIVolume
qm, err := v.client.query("/v1/csi/volume/"+id, &resp, q)
@@ -41,13 +41,12 @@ func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, e
return &resp, qm, nil
}
func (v *CSIVolumes) Register(vol *CSIVolume, w *WriteOptions) error {
func (v *CSIVolumes) Register(vol *CSIVolume, w *WriteOptions) (*WriteMeta, error) {
req := CSIVolumeRegisterRequest{
Volumes: []*CSIVolume{vol},
}
var resp struct{}
_, err := v.client.write("/v1/csi/volume/"+vol.ID, req, &resp, w)
return err
meta, err := v.client.write("/v1/csi/volume/"+vol.ID, req, nil, w)
return meta, err
}
func (v *CSIVolumes) Deregister(id string, w *WriteOptions) error {
@@ -81,7 +80,6 @@ const (
// CSIVolume is used for serialization, see also nomad/structs/csi.go
type CSIVolume struct {
ID string
Driver string
Namespace string
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
@@ -92,16 +90,17 @@ type CSIVolume struct {
// Healthy is true iff all the denormalized plugin health fields are true, and the
// volume has not been marked for garbage collection
Healthy bool
VolumeGC time.Time
ControllerName string
ControllerHealthy bool
NodeHealthy int
NodeExpected int
ResourceExhausted time.Time
Healthy bool
VolumeGC time.Time
PluginID string
ControllersHealthy int
ControllersExpected int
NodesHealthy int
NodesExpected int
ResourceExhausted time.Time
CreatedIndex uint64
ModifiedIndex uint64
CreateIndex uint64
ModifyIndex uint64
}
type CSIVolumeIndexSort []*CSIVolumeListStub
@@ -111,7 +110,7 @@ func (v CSIVolumeIndexSort) Len() int {
}
func (v CSIVolumeIndexSort) Less(i, j int) bool {
return v[i].CreatedIndex > v[j].CreatedIndex
return v[i].CreateIndex > v[j].CreateIndex
}
func (v CSIVolumeIndexSort) Swap(i, j int) {
@@ -121,7 +120,6 @@ func (v CSIVolumeIndexSort) Swap(i, j int) {
// CSIVolumeListStub omits allocations. See also nomad/structs/csi.go
type CSIVolumeListStub struct {
ID string
Driver string
Namespace string
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
@@ -129,16 +127,17 @@ type CSIVolumeListStub struct {
// Healthy is true iff all the denormalized plugin health fields are true, and the
// volume has not been marked for garbage collection
Healthy bool
VolumeGC time.Time
ControllerName string
ControllerHealthy bool
NodeHealthy int
NodeExpected int
ResourceExhausted time.Time
Healthy bool
VolumeGC time.Time
PluginID string
ControllersHealthy int
ControllersExpected int
NodesHealthy int
NodesExpected int
ResourceExhausted time.Time
CreatedIndex uint64
ModifiedIndex uint64
CreateIndex uint64
ModifyIndex uint64
}
type CSIVolumeRegisterRequest struct {
@@ -150,3 +149,75 @@ type CSIVolumeDeregisterRequest struct {
VolumeIDs []string
WriteRequest
}
// CSI Plugins are jobs with plugin specific data
// CSIPlugins is a client handle for querying the CSI plugin endpoints.
type CSIPlugins struct {
client *Client
}
// CSIPlugin is the API representation of a CSI plugin, aggregating the
// jobs that act as the plugin and the per-node fingerprint info.
type CSIPlugin struct {
ID string
Type CSIPluginType
Namespace string
// Jobs maps namespace -> job ID -> job for the jobs exposing this plugin
Jobs map[string]map[string]*Job
ControllersHealthy int
// Controllers maps node IDs to controller plugin fingerprint info
Controllers map[string]*CSIInfo
NodesHealthy int
// Nodes maps node IDs to node plugin fingerprint info
Nodes map[string]*CSIInfo
CreateIndex uint64
ModifyIndex uint64
}
// CSIPluginListStub is the summary form of a CSIPlugin used by list
// endpoints; it carries job IDs rather than full job objects.
type CSIPluginListStub struct {
ID string
Type CSIPluginType
// JobIDs maps namespace -> set of job IDs
JobIDs map[string]map[string]struct{}
ControllersHealthy int
ControllersExpected int
NodesHealthy int
NodesExpected int
CreateIndex uint64
ModifyIndex uint64
}
// CSIPluginIndexSort orders CSIPluginListStub entries by creation index,
// newest first.
type CSIPluginIndexSort []*CSIPluginListStub

// Len returns the number of stubs in the collection.
func (s CSIPluginIndexSort) Len() int { return len(s) }

// Less reports whether the stub at i was created after the stub at j,
// producing a descending sort on CreateIndex.
func (s CSIPluginIndexSort) Less(i, j int) bool {
	return s[i].CreateIndex > s[j].CreateIndex
}

// Swap exchanges the stubs at i and j.
func (s CSIPluginIndexSort) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// CSIPlugins returns a handle on the CSIPlugins endpoint
// used to list and inspect the CSI plugins known to the cluster.
func (c *Client) CSIPlugins() *CSIPlugins {
return &CSIPlugins{client: c}
}
// List returns the list stubs for all CSI plugins, sorted so the most
// recently created plugins appear first.
func (p *CSIPlugins) List(q *QueryOptions) ([]*CSIPluginListStub, *QueryMeta, error) {
	var plugins []*CSIPluginListStub
	meta, err := p.client.query("/v1/csi/plugins", &plugins, q)
	if err != nil {
		return nil, nil, err
	}
	// Present newest plugins first
	sort.Sort(CSIPluginIndexSort(plugins))
	return plugins, meta, nil
}
// Info retrieves the detailed record for a single CSI plugin by id,
// including the jobs that run it.
func (p *CSIPlugins) Info(id string, q *QueryOptions) (*CSIPlugin, *QueryMeta, error) {
	// Decode into a pointer so the caller receives exactly what the
	// endpoint returned.
	var plug *CSIPlugin
	meta, err := p.client.query("/v1/csi/plugin/"+id, &plug, q)
	if err != nil {
		return nil, nil, err
	}
	return plug, meta, nil
}

View File

@@ -3,7 +3,7 @@ package api
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCSIVolumes_CRUD(t *testing.T) {
@@ -14,9 +14,9 @@ func TestCSIVolumes_CRUD(t *testing.T) {
// Successful empty result
vols, qm, err := v.List(nil)
assert.NoError(t, err)
assert.NotEqual(t, 0, qm.LastIndex)
assert.Equal(t, 0, len(vols))
require.NoError(t, err)
require.NotEqual(t, 0, qm.LastIndex)
require.Equal(t, 0, len(vols))
// Authorized QueryOpts. Use the root token to just bypass ACL details
opts := &QueryOptions{
@@ -32,38 +32,89 @@ func TestCSIVolumes_CRUD(t *testing.T) {
}
// Register a volume
v.Register(&CSIVolume{
ID: "DEADBEEF-63C7-407F-AE82-C99FBEF78FEB",
Driver: "minnie",
id := "DEADBEEF-31B5-8F78-7986-DD404FDA0CD1"
_, err = v.Register(&CSIVolume{
ID: id,
Namespace: "default",
PluginID: "adam",
AccessMode: CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: CSIVolumeAttachmentModeFilesystem,
Topologies: []*CSITopology{{Segments: map[string]string{"foo": "bar"}}},
}, wpts)
require.NoError(t, err)
// Successful result with volumes
vols, qm, err = v.List(opts)
assert.NoError(t, err)
assert.NotEqual(t, 0, qm.LastIndex)
assert.Equal(t, 1, len(vols))
require.NoError(t, err)
require.NotEqual(t, 0, qm.LastIndex)
require.Equal(t, 1, len(vols))
// Successful info query
vol, qm, err := v.Info("DEADBEEF-63C7-407F-AE82-C99FBEF78FEB", opts)
assert.NoError(t, err)
assert.Equal(t, "minnie", vol.Driver)
assert.Equal(t, "bar", vol.Topologies[0].Segments["foo"])
vol, qm, err := v.Info(id, opts)
require.NoError(t, err)
require.Equal(t, "bar", vol.Topologies[0].Segments["foo"])
// Deregister the volume
err = v.Deregister("DEADBEEF-63C7-407F-AE82-C99FBEF78FEB", wpts)
assert.NoError(t, err)
err = v.Deregister(id, wpts)
require.NoError(t, err)
// Successful empty result
vols, qm, err = v.List(nil)
assert.NoError(t, err)
assert.NotEqual(t, 0, qm.LastIndex)
assert.Equal(t, 0, len(vols))
require.NoError(t, err)
require.NotEqual(t, 0, qm.LastIndex)
require.Equal(t, 0, len(vols))
// Failed info query
vol, qm, err = v.Info("DEADBEEF-63C7-407F-AE82-C99FBEF78FEB", opts)
assert.Error(t, err, "missing")
vol, qm, err = v.Info(id, opts)
require.Error(t, err, "missing")
}
// TestCSIPlugins_viaJob registers a job carrying a CSIPluginConfig and
// verifies the plugin then appears in the plugin List and Info responses.
func TestCSIPlugins_viaJob(t *testing.T) {
t.Parallel()
c, s, root := makeACLClient(t, nil, nil)
defer s.Stop()
p := c.CSIPlugins()
// Successful empty result
plugs, qm, err := p.List(nil)
require.NoError(t, err)
require.NotEqual(t, 0, qm.LastIndex)
require.Equal(t, 0, len(plugs))
// Authorized QueryOpts. Use the root token to just bypass ACL details
opts := &QueryOptions{
Region: "global",
Namespace: "default",
AuthToken: root.SecretID,
}
wpts := &WriteOptions{
Region: "global",
Namespace: "default",
AuthToken: root.SecretID,
}
// Register a plugin job
j := c.Jobs()
job := testJob()
job.Namespace = stringToPtr("default")
job.TaskGroups[0].Tasks[0].CSIPluginConfig = &TaskCSIPluginConfig{
ID: "foo",
Type: "monolith",
MountDir: "/not-empty",
}
_, _, err = j.Register(job, wpts)
require.NoError(t, err)
// Successful result with the plugin
plugs, qm, err = p.List(opts)
require.NoError(t, err)
require.NotEqual(t, 0, qm.LastIndex)
require.Equal(t, 1, len(plugs))
// Successful info query
plug, qm, err := p.Info("foo", opts)
require.NoError(t, err)
// The registered job is indexed on the plugin under its namespace
require.NotNil(t, plug.Jobs[*job.Namespace][*job.ID])
require.Equal(t, *job.ID, *plug.Jobs[*job.Namespace][*job.ID].ID)
}

View File

@@ -29,10 +29,8 @@ func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Reque
// CSIVolumeSpecificRequest dispatches GET and PUT
func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Tokenize the suffix of the path to get the volume id
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/csi/volume/")
// tokenize the suffix of the path to get the alloc id and find the action
// invoked on the alloc id
tokens := strings.Split(reqSuffix, "/")
if len(tokens) > 2 || len(tokens) < 1 {
return nil, CodedError(404, resourceNotFoundErr)
@@ -66,7 +64,7 @@ func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http
setMeta(resp, &out.QueryMeta)
if out.Volume == nil {
return nil, CodedError(404, "alloc not found")
return nil, CodedError(404, "volume not found")
}
return out.Volume, nil
@@ -116,3 +114,56 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h
return nil, nil
}
// CSIPluginsRequest answers GET /v1/csi/plugins with the list of CSI
// plugins known to the cluster, via the CSIPlugin.List RPC. Any other
// HTTP method is rejected.
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	if req.Method != http.MethodGet {
		return nil, CodedError(405, ErrInvalidMethod)
	}

	listArgs := structs.CSIPluginListRequest{}
	if s.parse(resp, req, &listArgs.Region, &listArgs.QueryOptions) {
		// parse has already written an error response
		return nil, nil
	}

	var listReply structs.CSIPluginListResponse
	if err := s.agent.RPC("CSIPlugin.List", &listArgs, &listReply); err != nil {
		return nil, err
	}

	setMeta(resp, &listReply.QueryMeta)
	return listReply.Plugins, nil
}
// CSIPluginSpecificRequest returns the detail for a single CSI plugin,
// identified by the id embedded in the URL path, via the CSIPlugin.Get
// RPC. Only GET is accepted; a missing plugin yields a 404.
func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	if req.Method != http.MethodGet {
		return nil, CodedError(405, ErrInvalidMethod)
	}

	// The path suffix after the prefix is the plugin id, with at most one
	// trailing path segment allowed.
	suffix := strings.TrimPrefix(req.URL.Path, "/v1/csi/plugin/")
	parts := strings.Split(suffix, "/")
	if len(parts) > 2 || len(parts) < 1 {
		return nil, CodedError(404, resourceNotFoundErr)
	}
	pluginID := parts[0]

	getArgs := structs.CSIPluginGetRequest{ID: pluginID}
	if s.parse(resp, req, &getArgs.Region, &getArgs.QueryOptions) {
		// parse has already written an error response
		return nil, nil
	}

	var getReply structs.CSIPluginGetResponse
	if err := s.agent.RPC("CSIPlugin.Get", &getArgs, &getReply); err != nil {
		return nil, err
	}

	setMeta(resp, &getReply.QueryMeta)
	if getReply.Plugin == nil {
		return nil, CodedError(404, "plugin not found")
	}
	return getReply.Plugin, nil
}

View File

@@ -255,6 +255,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/csi/volumes", s.wrap(s.CSIVolumesRequest))
s.mux.HandleFunc("/v1/csi/volume/", s.wrap(s.CSIVolumeSpecificRequest))
s.mux.HandleFunc("/v1/csi/plugins", s.wrap(s.CSIPluginsRequest))
s.mux.HandleFunc("/v1/csi/plugin/", s.wrap(s.CSIPluginSpecificRequest))
s.mux.HandleFunc("/v1/acl/policies", s.wrap(s.ACLPoliciesRequest))
s.mux.HandleFunc("/v1/acl/policy/", s.wrap(s.ACLPolicySpecificRequest))

View File

@@ -15,6 +15,7 @@ Prefer adding a new message to changing any existing RPC messages.
* RPCs are resolved by matching the method name for bound structs
[net/rpc](https://golang.org/pkg/net/rpc/)
* Check ACLs for security, list endpoints filter by ACL
* Register new RPC struct in `nomad/server.go`
* Wrapper for the HTTP request in `command/agent/foo_endpoint.go`
* Backwards compatibility requires a new endpoint, an upgraded
client or server may be forwarding this request to an old server,

View File

@@ -61,10 +61,16 @@ func (srv *Server) WriteACLObj(args *structs.WriteRequest) (*acl.ACL, error) {
return srv.QueryACLObj(opts)
}
// replyCSIVolumeIndex sets the reply with the last index that modified the table csi_volumes
func (srv *Server) replySetCSIVolumeIndex(state *state.StateStore, reply *structs.QueryMeta) error {
// Use the last index that affected the table
index, err := state.Index("csi_volumes")
const (
csiVolumeTable = "csi_volumes"
csiPluginTable = "csi_plugins"
)
// replySetIndex sets the reply with the last index that modified the table
func (srv *Server) replySetIndex(table string, reply *structs.QueryMeta) error {
s := srv.fsm.State()
index, err := s.Index(table)
if err != nil {
return err
}
@@ -98,8 +104,8 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
var err error
var iter memdb.ResultIterator
if args.Driver != "" {
iter, err = state.CSIVolumesByDriver(ws, args.Driver)
if args.PluginID != "" {
iter, err = state.CSIVolumesByPluginID(ws, args.PluginID)
} else {
iter, err = state.CSIVolumes(ws)
}
@@ -117,7 +123,12 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
if raw == nil {
break
}
vol := raw.(*structs.CSIVolume)
vol, err := state.CSIVolumeDenormalizePlugins(ws, vol)
if err != nil {
return err
}
// Filter on the request namespace to avoid ACL checks by volume
if ns != "" && vol.Namespace != args.RequestNamespace() {
@@ -136,7 +147,7 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
}
}
reply.Volumes = vs
return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta)
return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}
@@ -168,12 +179,15 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol
return err
}
if vol == nil {
return structs.ErrMissingCSIVolumeID
if vol != nil {
vol, err = state.CSIVolumeDenormalize(ws, vol)
}
if err != nil {
return err
}
reply.Volume = vol
return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta)
return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}
@@ -215,7 +229,7 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
return err
}
return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta)
return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
}
// Deregister removes a set of volumes
@@ -248,5 +262,103 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return err
}
return v.srv.replySetCSIVolumeIndex(state, &reply.QueryMeta)
return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
}
// CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct {
srv *Server // server context, used for forwarding, ACLs, and state access
logger log.Logger
}
// List replies with CSIPlugins, filtered by ACL access
func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIPluginListResponse) error {
// Forward the request to the leader if this server isn't it
if done, err := v.srv.forward("CSIPlugin.List", args, args, reply); done {
return err
}
// Resolve the token and require the csi-access capability
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions)
if err != nil {
return err
}
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityCSIAccess) {
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "list"}, metricsStart)
// Run as a blocking query against the state store
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Query all plugins
iter, err := state.CSIPlugins(ws)
if err != nil {
return err
}
// Collect results
var ps []*structs.CSIPluginListStub
for {
raw := iter.Next()
if raw == nil {
break
}
plug := raw.(*structs.CSIPlugin)
// FIXME we should filter the ACL access for the plugin's
// namespace, but plugins don't currently have namespaces
ps = append(ps, plug.Stub())
}
reply.Plugins = ps
// Set the reply index from the csi_plugins table
return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}
// Get fetches detailed information about a specific plugin
func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPluginGetResponse) error {
// Forward the request to the leader if this server isn't it
if done, err := v.srv.forward("CSIPlugin.Get", args, args, reply); done {
return err
}
// Resolve the token and require the csi-access capability
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions)
if err != nil {
return err
}
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityCSIAccess) {
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, metricsStart)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
plug, err := state.CSIPluginByID(ws, args.ID)
if err != nil {
return err
}
// A missing plugin is reported as a nil reply.Plugin, not an error
if plug != nil {
plug, err = state.CSIPluginDenormalize(ws, plug)
}
if err != nil {
return err
}
// FIXME we should re-check the ACL access for the plugin's
// namespace, but plugins don't currently have namespaces
reply.Plugin = plug
return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}

View File

@@ -38,7 +38,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) {
Namespace: ns,
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
Driver: "minnie",
PluginID: "minnie",
}}
err := state.CSIVolumeRegister(0, vols)
require.NoError(t, err)
@@ -84,7 +84,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) {
vols := []*structs.CSIVolume{{
ID: id0,
Namespace: "notTheNamespace",
Driver: "minnie",
PluginID: "minnie",
AccessMode: structs.CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
Topologies: []*structs.CSITopology{{
@@ -124,7 +124,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) {
require.Equal(t, vols[0].ID, resp2.Volume.ID)
// Registration does not update
req1.Volumes[0].Driver = "adam"
req1.Volumes[0].PluginID = "adam"
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Register", req1, resp1)
require.Error(t, err, "exists")
@@ -143,7 +143,8 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) {
// Volume is missing
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req2, resp2)
require.Error(t, err, "missing")
require.NoError(t, err)
require.Nil(t, resp2.Volume)
}
func TestCSIVolumeEndpoint_List(t *testing.T) {
@@ -175,19 +176,19 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
Namespace: ns,
AccessMode: structs.CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
Driver: "minnie",
PluginID: "minnie",
}, {
ID: id1,
Namespace: ns,
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
Driver: "adam",
PluginID: "adam",
}, {
ID: id2,
Namespace: ms,
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
Driver: "paddy",
PluginID: "paddy",
}}
err := state.CSIVolumeRegister(0, vols)
require.NoError(t, err)
@@ -211,9 +212,9 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
}
require.Equal(t, 0, len(ids))
// Query by Driver
// Query by PluginID
req = &structs.CSIVolumeListRequest{
Driver: "adam",
PluginID: "adam",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: ns,
@@ -225,9 +226,9 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
require.Equal(t, 1, len(resp.Volumes))
require.Equal(t, vols[1].ID, resp.Volumes[0].ID)
// Query by Driver, ACL filters all results
// Query by PluginID, ACL filters all results
req = &structs.CSIVolumeListRequest{
Driver: "paddy",
PluginID: "paddy",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: ms,
@@ -238,3 +239,93 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(resp.Volumes))
}
func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
ns := structs.DefaultNamespace
job := mock.Job()
job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{
ID: "foo",
Type: structs.CSIPluginTypeMonolith,
MountDir: "non-empty",
}
state := srv.fsm.State()
state.BootstrapACLTokens(1, 0, mock.ACLManagementToken())
srv.config.ACLEnabled = true
policy := mock.NamespacePolicy(ns, "", []string{
acl.NamespaceCapabilityCSICreateVolume,
acl.NamespaceCapabilitySubmitJob,
})
validToken := mock.CreatePolicyAndToken(t, state, 1001, acl.NamespaceCapabilityCSICreateVolume, policy)
codec := rpcClient(t, srv)
// Create the register request
req1 := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: ns,
AuthToken: validToken.SecretID,
},
}
resp1 := &structs.JobRegisterResponse{}
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req1, resp1)
require.NoError(t, err)
require.NotEqual(t, 0, resp1.Index)
// Get the plugin back out
policy = mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess})
getToken := mock.CreatePolicyAndToken(t, state, 1001, "csi-access", policy)
req2 := &structs.CSIPluginGetRequest{
ID: "foo",
QueryOptions: structs.QueryOptions{
Region: "global",
AuthToken: getToken.SecretID,
},
}
resp2 := &structs.CSIPluginGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2)
require.NoError(t, err)
require.NotEqual(t, 0, resp2.Index)
// List plugins
req3 := &structs.CSIPluginListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
AuthToken: getToken.SecretID,
},
}
resp3 := &structs.CSIPluginListResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.List", req3, resp3)
require.NoError(t, err)
require.Equal(t, 1, len(resp3.Plugins))
// Deregistration works
req4 := &structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: ns,
AuthToken: validToken.SecretID,
},
}
resp4 := &structs.JobDeregisterResponse{}
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req4, resp4)
require.NoError(t, err)
// Plugin is missing
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2)
require.NoError(t, err)
require.Nil(t, resp2.Plugin)
}

View File

@@ -250,6 +250,7 @@ type endpoints struct {
Plan *Plan
Alloc *Alloc
CSIVolume *CSIVolume
CSIPlugin *CSIPlugin
Deployment *Deployment
Region *Region
Search *Search
@@ -1094,6 +1095,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
s.staticEndpoints.Job = NewJobEndpoints(s)
s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")} // Add but don't register
s.staticEndpoints.CSIVolume = &CSIVolume{srv: s, logger: s.logger.Named("csi_volume")}
s.staticEndpoints.CSIPlugin = &CSIPlugin{srv: s, logger: s.logger.Named("csi_plugin")}
s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")}
s.staticEndpoints.Operator = &Operator{srv: s, logger: s.logger.Named("operator")}
s.staticEndpoints.Periodic = &Periodic{srv: s, logger: s.logger.Named("periodic")}
@@ -1123,6 +1125,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
server.Register(s.staticEndpoints.Eval)
server.Register(s.staticEndpoints.Job)
server.Register(s.staticEndpoints.CSIVolume)
server.Register(s.staticEndpoints.CSIPlugin)
server.Register(s.staticEndpoints.Deployment)
server.Register(s.staticEndpoints.Operator)
server.Register(s.staticEndpoints.Periodic)

View File

@@ -48,6 +48,7 @@ func init() {
schedulerConfigTableSchema,
clusterMetaTableSchema,
csiVolumeTableSchema,
csiPluginTableSchema,
}...)
}
@@ -679,13 +680,11 @@ func clusterMetaTableSchema() *memdb.TableSchema {
}
}
// CSIVolumes are identified by id globally, and searchable by plugin_id
func csiVolumeTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "csi_volumes",
Indexes: map[string]*memdb.IndexSchema{
// Primary index is used for volume upsert
// and simple direct lookup. ID is required to be
// unique.
"id": {
Name: "id",
AllowMissing: false,
@@ -694,12 +693,29 @@ func csiVolumeTableSchema() *memdb.TableSchema {
Field: "ID",
},
},
"driver": {
Name: "driver",
"plugin_id": {
Name: "plugin_id",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Driver",
Field: "PluginID",
},
},
},
}
}
// CSIPlugins are identified by id globally
func csiPluginTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "csi_plugins",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
},

View File

@@ -677,6 +677,9 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if err := upsertNodeCSIPlugins(txn, node, index); err != nil {
return fmt.Errorf("csi plugin update failed: %v", err)
}
txn.Commit()
return nil
@@ -704,6 +707,11 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
if err := txn.Delete("nodes", existing); err != nil {
return fmt.Errorf("node delete failed: %s: %v", nodeID, err)
}
node := existing.(*structs.Node)
if err := deleteNodeCSIPlugins(txn, node, index); err != nil {
return fmt.Errorf("csi plugin delete failed: %v", err)
}
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
@@ -931,6 +939,95 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv
}
}
// upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health. It's called
// on upsertNodeEvents, so that event driven health changes are updated
func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
// Nothing to do if the node fingerprints no CSI plugins
if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 {
return nil
}
// loop upserts one fingerprint entry: fetch (or create) the plugin row,
// record this node's info on it, and write it back
loop := func(info *structs.CSIInfo) error {
raw, err := txn.First("csi_plugins", "id", info.PluginID)
if err != nil {
return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err)
}
var plug *structs.CSIPlugin
if raw != nil {
// Copy before mutating the row stored in memdb
plug = raw.(*structs.CSIPlugin).Copy(index)
} else {
plug = structs.NewCSIPlugin(info.PluginID, index)
}
plug.AddPlugin(node.ID, info, index)
err = txn.Insert("csi_plugins", plug)
if err != nil {
return fmt.Errorf("csi_plugins insert error: %v", err)
}
return nil
}
// Apply to both the controller and node plugin fingerprints
for _, info := range node.CSIControllerPlugins {
err := loop(info)
if err != nil {
return err
}
}
for _, info := range node.CSINodePlugins {
err := loop(info)
if err != nil {
return err
}
}
return nil
}
// deleteNodeCSIPlugins cleans up CSIInfo node health status, called in DeleteNode.
// For each plugin the node fingerprinted, the node's entry is removed from the
// plugin row; a plugin left with no jobs and no nodes is deleted outright.
func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
	// Nothing to do if the node fingerprints no CSI plugins
	if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 {
		return nil
	}

	// Collect the set of plugin ids referenced by either fingerprint list
	names := map[string]struct{}{}
	for _, info := range node.CSIControllerPlugins {
		names[info.PluginID] = struct{}{}
	}
	for _, info := range node.CSINodePlugins {
		names[info.PluginID] = struct{}{}
	}

	for id := range names {
		raw, err := txn.First("csi_plugins", "id", id)
		if err != nil {
			return fmt.Errorf("csi_plugins lookup error %s: %v", id, err)
		}
		if raw == nil {
			return fmt.Errorf("csi_plugins missing plugin %s", id)
		}

		// Copy before mutating the row stored in memdb
		plug := raw.(*structs.CSIPlugin).Copy(index)
		plug.DeleteNode(node.ID, index)

		if plug.IsEmpty() {
			// No jobs or nodes reference this plugin anymore: delete the
			// row and skip the insert below. (Previously the empty plugin
			// was re-inserted immediately after being deleted.)
			if err := txn.Delete("csi_plugins", plug); err != nil {
				return fmt.Errorf("csi_plugins delete error: %v", err)
			}
			continue
		}

		if err := txn.Insert("csi_plugins", plug); err != nil {
			return fmt.Errorf("csi_plugins update error %s: %v", id, err)
		}
	}

	return nil
}
// NodeByID is used to lookup a node by ID
func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) {
txn := s.db.Txn(false)
@@ -1068,6 +1165,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b
return fmt.Errorf("unable to upsert job into job_version table: %v", err)
}
if err := s.upsertJobCSIPlugins(index, job, txn); err != nil {
return fmt.Errorf("unable to upsert csi_plugins table: %v", err)
}
// Insert the job
if err := txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
@@ -1162,6 +1263,11 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return err
}
// Delete the csi_plugins
if err := s.deleteJobCSIPlugins(index, job, txn); err != nil {
return err
}
// Delete the job summary
if _, err = txn.DeleteAll("job_summary", "id", namespace, jobID); err != nil {
return fmt.Errorf("deleing job summary failed: %v", err)
@@ -1511,21 +1617,26 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string)
return iter, nil
}
// CSIVolumeRegister adds a volume to the server store, iff it's not new
// CSIVolumeRegister adds a volume to the server store, failing if it already exists
func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error {
txn := s.db.Txn(true)
defer txn.Abort()
for _, v := range volumes {
// Check for volume existence
_, obj, err := txn.FirstWatch("csi_volumes", "id", v.ID)
obj, err := txn.First("csi_volumes", "id", v.ID)
if err != nil {
return fmt.Errorf("volume existence check: %v", err)
return fmt.Errorf("volume existence check error: %v", err)
}
if obj != nil {
return fmt.Errorf("volume exists: %s", v.ID)
}
if v.CreateIndex == 0 {
v.CreateIndex = index
v.ModifyIndex = index
}
err = txn.Insert("csi_volumes", v)
if err != nil {
return fmt.Errorf("volume insert: %v", err)
@@ -1546,19 +1657,22 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVo
}
ws.Add(watchCh)
if obj != nil {
v := obj.(*structs.CSIVolume)
return v, nil
if obj == nil {
return nil, nil
}
return nil, nil
vol := obj.(*structs.CSIVolume)
// Health data is stale, so set this volume unhealthy until it's denormalized
vol.Healthy = false
return vol, nil
}
// CSIVolumes looks up the entire csi_volumes table
func (s *StateStore) CSIVolumesByDriver(ws memdb.WatchSet, driver string) (memdb.ResultIterator, error) {
// CSIVolumesByPluginID looks up csi_volumes by pluginID
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("csi_volumes", "driver", driver)
iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
}
@@ -1593,11 +1707,13 @@ func (s *StateStore) CSIVolumeClaim(index uint64, id string, alloc *structs.Allo
return fmt.Errorf("volume not found: %s", id)
}
volume, ok := row.(*structs.CSIVolume)
orig, ok := row.(*structs.CSIVolume)
if !ok {
return fmt.Errorf("volume row conversion error")
}
volume := orig.Copy(index)
if !volume.Claim(claim, alloc) {
return fmt.Errorf("volume max claim reached")
}
@@ -1634,6 +1750,222 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error {
return nil
}
// upsertJobCSIPlugins is called on UpsertJob and maintains the csi_plugins
// index of jobs that contain CSI plugin tasks. Runs inside the caller's
// write transaction txn.
func (s *StateStore) upsertJobCSIPlugins(index uint64, job *structs.Job, txn *memdb.Txn) error {
	ws := memdb.NewWatchSet()
	plugs, err := s.csiPluginsByJob(ws, job, index)
	if err != nil {
		return fmt.Errorf("finding job csi plugins: %v", err)
	}

	// Append this job to all of them
	for _, plug := range plugs {
		// Copy-on-write: plugins created at this index are already fresh
		if plug.CreateIndex != index {
			plug = plug.Copy(index)
		}

		plug.AddJob(job)
		plug.ModifyIndex = index

		if err := txn.Insert("csi_plugins", plug); err != nil {
			return fmt.Errorf("csi_plugins insert: %v", err)
		}
	}

	return nil
}
// csiPluginsByJob finds or creates CSIPlugins identified by the configuration
// contained in job. Plugins not yet in the state store are created (but not
// inserted) with CreateIndex set to index.
func (s *StateStore) csiPluginsByJob(ws memdb.WatchSet, job *structs.Job, index uint64) (map[string]*structs.CSIPlugin, error) {
	// Note: no local transaction is needed; CSIPluginByID manages its own.
	plugs := map[string]*structs.CSIPlugin{}

	for _, tg := range job.TaskGroups {
		for _, t := range tg.Tasks {
			if t.CSIPluginConfig == nil {
				continue
			}

			// Already collected this plugin from an earlier task
			if _, ok := plugs[t.CSIPluginConfig.ID]; ok {
				continue
			}

			plug, err := s.CSIPluginByID(ws, t.CSIPluginConfig.ID)
			if err != nil {
				return nil, err
			}

			if plug == nil {
				plug = structs.NewCSIPlugin(t.CSIPluginConfig.ID, index)
				plug.Type = t.CSIPluginConfig.Type
			}

			plugs[t.CSIPluginConfig.ID] = plug
		}
	}

	return plugs, nil
}
// deleteJobCSIPlugins is called on DeleteJob and removes the job from the
// csi_plugins index, deleting any plugin that becomes completely unused.
// Runs inside the caller's write transaction txn.
func (s *StateStore) deleteJobCSIPlugins(index uint64, job *structs.Job, txn *memdb.Txn) error {
	ws := memdb.NewWatchSet()
	plugs, err := s.csiPluginsByJob(ws, job, index)
	if err != nil {
		return fmt.Errorf("finding job csi plugins: %v", err)
	}

	// Remove this job from each plugin. If the plugin has no jobs left, remove it
	for _, plug := range plugs {
		// Copy-on-write: plugins created at this index are already fresh
		if plug.CreateIndex != index {
			plug = plug.Copy(index)
		}

		plug.DeleteJob(job)
		if plug.IsEmpty() {
			err = txn.Delete("csi_plugins", plug)
		} else {
			plug.ModifyIndex = index
			err = txn.Insert("csi_plugins", plug)
		}
		if err != nil {
			return fmt.Errorf("csi_plugins update: %v", err)
		}
	}

	return nil
}
// CSIVolumeDenormalize takes a CSIVolume and denormalizes it for the API,
// attaching both plugin health data and the claiming allocations.
// A nil volume passes through as nil.
func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
	if vol == nil {
		return nil, nil
	}

	withPlugins, err := s.CSIVolumeDenormalizePlugins(ws, vol)
	if err != nil {
		return nil, err
	}

	return s.csiVolumeDenormalizeAllocs(ws, withPlugins)
}
// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and
// expected plugin counts copied in from its CSIPlugin. A volume whose plugin
// is not registered is marked unhealthy.
func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
	if vol == nil {
		return nil, nil
	}

	// Lookup CSIPlugin, the health records, and calculate volume health.
	// CSIPluginByID manages its own transaction, so none is opened here.
	plug, err := s.CSIPluginByID(ws, vol.PluginID)
	if err != nil {
		return nil, fmt.Errorf("plugin lookup error: %s %v", vol.PluginID, err)
	}

	// Without a registered plugin the volume cannot be attached anywhere
	if plug == nil {
		vol.ControllersHealthy = 0
		vol.NodesHealthy = 0
		vol.Healthy = false
		return vol, nil
	}

	vol.ControllersHealthy = plug.ControllersHealthy
	vol.NodesHealthy = plug.NodesHealthy
	// This number is incorrect! The expected number of node plugins is actually this +
	// the number of blocked evaluations for the jobs controlling these plugins
	vol.ControllersExpected = len(plug.Controllers)
	vol.NodesExpected = len(plug.Nodes)

	vol.Healthy = vol.ControllersHealthy > 0 && vol.NodesHealthy > 0

	return vol, nil
}
// csiVolumeDenormalizeAllocs returns a CSIVolume with allocation pointers
// filled in from the state store, keyed by the IDs already in each map.
func (s *StateStore) csiVolumeDenormalizeAllocs(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
	// Replace each placeholder entry with the current allocation for its ID
	fill := func(allocs map[string]*structs.Allocation) error {
		for id := range allocs {
			alloc, err := s.AllocByID(ws, id)
			if err != nil {
				return err
			}
			allocs[id] = alloc
		}
		return nil
	}

	for _, m := range []map[string]*structs.Allocation{vol.ReadAllocs, vol.WriteAllocs, vol.PastAllocs} {
		if err := fill(m); err != nil {
			return nil, err
		}
	}

	return vol, nil
}
// CSIPlugins returns the unfiltered list of all plugin health status
//
// NOTE(review): ws is accepted but not registered against the iterator's
// watch channel here — confirm whether blocking queries on this table are
// expected to wake via the iterator instead.
func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.Txn(false)
	defer txn.Abort()

	iter, err := txn.Get("csi_plugins", "id")
	if err != nil {
		return nil, fmt.Errorf("csi_plugins lookup failed: %v", err)
	}

	return iter, nil
}
// CSIPluginByID returns the one named CSIPlugin, or nil if it does not exist.
func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
	txn := s.db.Txn(false)
	defer txn.Abort()

	// Use FirstWatch and register the channel so blocking queries are woken
	// when this plugin changes, matching the convention of CSIVolumeByID.
	watchCh, raw, err := txn.FirstWatch("csi_plugins", "id", id)
	if err != nil {
		return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err)
	}
	ws.Add(watchCh)

	if raw == nil {
		return nil, nil
	}

	plug := raw.(*structs.CSIPlugin)

	return plug, nil
}
// CSIPluginDenormalize returns a CSIPlugin with its job index populated with
// the current job definitions. A nil plugin passes through as nil.
func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPlugin) (*structs.CSIPlugin, error) {
	if plug == nil {
		return nil, nil
	}

	for namespace, jobs := range plug.Jobs {
		for jobID := range jobs {
			job, err := s.JobByID(ws, namespace, jobID)
			if err != nil {
				return nil, err
			}
			plug.Jobs[namespace][jobID] = job
		}
	}

	return plug, nil
}
// UpsertPeriodicLaunch is used to register a launch or update it.
func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
txn := s.db.Txn(true)
@@ -2448,8 +2780,8 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin
return out, nil
}
// AllocsByJob returns all the allocations by job id
func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error) {
// AllocsByJob returns allocations by job id
func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, anyCreateIndex bool) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
// Get the job
@@ -2481,7 +2813,7 @@ func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all
// If the allocation belongs to a job with the same ID but a different
// create index and we are not getting all the allocations whose Jobs
// matches the same Job ID then we skip it
if !all && job != nil && alloc.Job.CreateIndex != job.CreateIndex {
if !anyCreateIndex && job != nil && alloc.Job.CreateIndex != job.CreateIndex {
continue
}
out = append(out, raw.(*structs.Allocation))

View File

@@ -2830,18 +2830,20 @@ func TestStateStore_RestoreJobSummary(t *testing.T) {
func TestStateStore_CSIVolume(t *testing.T) {
state := testStateStore(t)
v0 := structs.CreateCSIVolume("foo")
v0.ID = "DEADBEEF-70AD-4672-9178-802BCA500C87"
id0, id1 := uuid.Generate(), uuid.Generate()
v0 := structs.NewCSIVolume("foo")
v0.ID = id0
v0.Namespace = "default"
v0.Driver = "minnie"
v0.PluginID = "minnie"
v0.Healthy = true
v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v1 := structs.CreateCSIVolume("foo")
v1.ID = "BAADF00D-70AD-4672-9178-802BCA500C87"
v1 := structs.NewCSIVolume("foo")
v1.ID = id1
v1.Namespace = "default"
v1.Driver = "adam"
v1.PluginID = "adam"
v1.Healthy = true
v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
@@ -2869,18 +2871,18 @@ func TestStateStore_CSIVolume(t *testing.T) {
require.Equal(t, 2, len(vs))
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByDriver(ws, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 1, len(vs))
err = state.CSIVolumeDeregister(1, []string{
"BAADF00D-70AD-4672-9178-802BCA500C87",
id1,
})
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByDriver(ws, "adam")
iter, err = state.CSIVolumesByPluginID(ws, "adam")
require.NoError(t, err)
vs = slurp(iter)
require.Equal(t, 0, len(vs))
@@ -2898,26 +2900,147 @@ func TestStateStore_CSIVolume(t *testing.T) {
w := structs.CSIVolumeClaimWrite
u := structs.CSIVolumeClaimRelease
err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, r)
err = state.CSIVolumeClaim(2, id0, a0, r)
require.NoError(t, err)
err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a1, w)
err = state.CSIVolumeClaim(2, id0, a1, w)
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByDriver(ws, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.False(t, vs[0].CanWrite())
err = state.CSIVolumeClaim(2, "DEADBEEF-70AD-4672-9178-802BCA500C87", a0, u)
err = state.CSIVolumeClaim(2, id0, a0, u)
require.NoError(t, err)
ws = memdb.NewWatchSet()
iter, err = state.CSIVolumesByDriver(ws, "minnie")
iter, err = state.CSIVolumesByPluginID(ws, "minnie")
require.NoError(t, err)
vs = slurp(iter)
require.True(t, vs[0].CanReadOnly())
}
// TestStateStore_CSIPluginJobs creates plugin jobs and tests that they create a CSIPlugin
func TestStateStore_CSIPluginJobs(t *testing.T) {
	// Start above the indexes consumed by the test state store fixtures
	index := uint64(999)
	state := testStateStore(t)
	testStateStore_CSIPluginJobs(t, index, state)
}
// testStateStore_CSIPluginJobs registers a controller job and a node job for
// plugin "foo", then asserts the plugin indexes both jobs. Returns the last
// raft index used so chained helpers can continue from it.
func testStateStore_CSIPluginJobs(t *testing.T, index uint64, state *StateStore) (uint64, *StateStore) {
	controllerJob := mock.Job()
	controllerJob.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{
		ID:   "foo",
		Type: structs.CSIPluginTypeController,
	}

	nodeJob := mock.Job()
	nodeJob.Type = structs.JobTypeSystem
	nodeJob.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{
		ID:   "foo",
		Type: structs.CSIPluginTypeNode,
	}

	for _, job := range []*structs.Job{controllerJob, nodeJob} {
		index++
		require.NoError(t, state.UpsertJob(index, job))
	}

	// Get the plugin back out by id
	ws := memdb.NewWatchSet()
	plug, err := state.CSIPluginByID(ws, "foo")
	require.NoError(t, err)
	require.Equal(t, "foo", plug.ID)

	// Both jobs should be indexed under the default namespace
	remaining := map[string]struct{}{controllerJob.ID: {}, nodeJob.ID: {}}
	for jobID := range plug.Jobs[structs.DefaultNamespace] {
		delete(remaining, jobID)
	}
	require.Equal(t, 0, len(remaining))

	return index, state
}
// TestStateStore_CSIPluginNodes uses the state from jobs, and uses node fingerprinting to update health
func TestStateStore_CSIPluginNodes(t *testing.T) {
index := uint64(999)
state := testStateStore(t)
index, state = testStateStore_CSIPluginJobs(t, index, state)
testStateStore_CSIPluginNodes(t, index, state)
}
// testStateStore_CSIPluginNodes upserts three nodes, fingerprints plugin
// "foo" as a controller on one and as a node plugin on the other two, then
// asserts the plugin's health counters. Each upsert consumes a fresh index;
// the last index used is returned for chained helpers.
func testStateStore_CSIPluginNodes(t *testing.T, index uint64, state *StateStore) (uint64, *StateStore) {
	// Create Nodes fingerprinting the plugins
	ns := []*structs.Node{mock.Node(), mock.Node(), mock.Node()}

	for _, n := range ns {
		index++
		err := state.UpsertNode(index, n)
		require.NoError(t, err)
	}

	// Fingerprint a running controller plugin
	n0 := ns[0].Copy()
	n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
		"foo": {
			PluginID:                 "foo",
			Healthy:                  true,
			UpdateTime:               time.Now(),
			RequiresControllerPlugin: true,
			RequiresTopologies:       false,
			ControllerInfo: &structs.CSIControllerInfo{
				SupportsReadOnlyAttach: true,
				SupportsListVolumes:    true,
			},
		},
	}

	index++
	err := state.UpsertNode(index, n0)
	require.NoError(t, err)

	// Fingerprint two running node plugins
	for _, n := range ns[1:] {
		// Copy before mutating so the original fixture nodes stay untouched
		n = n.Copy()
		n.CSINodePlugins = map[string]*structs.CSIInfo{
			"foo": {
				PluginID:                 "foo",
				Healthy:                  true,
				UpdateTime:               time.Now(),
				RequiresControllerPlugin: true,
				RequiresTopologies:       false,
				NodeInfo:                 &structs.CSINodeInfo{},
			},
		}

		index++
		err = state.UpsertNode(index, n)
		require.NoError(t, err)
	}

	ws := memdb.NewWatchSet()
	plug, err := state.CSIPluginByID(ws, "foo")
	require.NoError(t, err)

	// One controller fingerprint and two node fingerprints, all healthy
	require.Equal(t, "foo", plug.ID)
	require.Equal(t, 1, plug.ControllersHealthy)
	require.Equal(t, 2, plug.NodesHealthy)

	return index, state
}
// TestStateStore_CSIPluginBackwards gets the node state first, and the job state second
func TestStateStore_CSIPluginBackwards(t *testing.T) {
index := uint64(999)
state := testStateStore(t)
index, state = testStateStore_CSIPluginNodes(t, index, state)
testStateStore_CSIPluginJobs(t, index, state)
}
func TestStateStore_Indexes(t *testing.T) {
t.Parallel()

View File

@@ -135,9 +135,9 @@ func ValidCSIVolumeWriteAccessMode(accessMode CSIVolumeAccessMode) bool {
}
}
// CSIVolume is the full representation of a CSI Volume
type CSIVolume struct {
ID string
Driver string
Namespace string
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
@@ -150,66 +150,76 @@ type CSIVolume struct {
// Healthy is true if all the denormalized plugin health fields are true, and the
// volume has not been marked for garbage collection
Healthy bool
VolumeGC time.Time
ControllerName string
ControllerHealthy bool
Controller []*Job
NodeHealthy int
NodeExpected int
ResourceExhausted time.Time
Healthy bool
VolumeGC time.Time
PluginID string
ControllersHealthy int
ControllersExpected int
NodesHealthy int
NodesExpected int
ResourceExhausted time.Time
CreatedIndex uint64
ModifiedIndex uint64
CreateIndex uint64
ModifyIndex uint64
}
// CSIVolListStub is partial representation of a CSI Volume for inclusion in lists
type CSIVolListStub struct {
ID string
Driver string
Namespace string
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
AttachmentMode CSIVolumeAttachmentMode
CurrentReaders int
CurrentWriters int
Healthy bool
VolumeGC time.Time
ControllerName string
ControllerHealthy bool
NodeHealthy int
NodeExpected int
CreatedIndex uint64
ModifiedIndex uint64
ID string
Namespace string
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
AttachmentMode CSIVolumeAttachmentMode
CurrentReaders int
CurrentWriters int
Healthy bool
VolumeGC time.Time
PluginID string
ControllersHealthy int
ControllersExpected int
NodesHealthy int
NodesExpected int
CreateIndex uint64
ModifyIndex uint64
}
func CreateCSIVolume(controllerName string) *CSIVolume {
return &CSIVolume{
ControllerName: controllerName,
ReadAllocs: map[string]*Allocation{},
WriteAllocs: map[string]*Allocation{},
PastAllocs: map[string]*Allocation{},
Topologies: []*CSITopology{},
// NewCSIVolume creates the volume struct. No side-effects.
// The argument is stored as the volume's ID (the original parameter was
// misleadingly named pluginID; it was never assigned to PluginID).
func NewCSIVolume(volumeID string) *CSIVolume {
	out := &CSIVolume{
		ID: volumeID,
	}

	out.newStructs()
	return out
}
// newStructs initializes the volume's collection fields. The allocation maps
// are always replaced with fresh empty maps (Copy relies on this), while an
// existing Topologies slice is preserved.
func (v *CSIVolume) newStructs() {
	// Keep caller-provided topologies; only initialize when unset
	if v.Topologies == nil {
		v.Topologies = []*CSITopology{}
	}

	v.ReadAllocs = map[string]*Allocation{}
	v.WriteAllocs = map[string]*Allocation{}
	v.PastAllocs = map[string]*Allocation{}
}
func (v *CSIVolume) Stub() *CSIVolListStub {
stub := CSIVolListStub{
ID: v.ID,
Driver: v.Driver,
Namespace: v.Namespace,
Topologies: v.Topologies,
AccessMode: v.AccessMode,
AttachmentMode: v.AttachmentMode,
CurrentReaders: len(v.ReadAllocs),
CurrentWriters: len(v.WriteAllocs),
Healthy: v.Healthy,
VolumeGC: v.VolumeGC,
ControllerName: v.ControllerName,
ControllerHealthy: v.ControllerHealthy,
NodeHealthy: v.NodeHealthy,
NodeExpected: v.NodeExpected,
CreatedIndex: v.CreatedIndex,
ModifiedIndex: v.ModifiedIndex,
ID: v.ID,
Namespace: v.Namespace,
Topologies: v.Topologies,
AccessMode: v.AccessMode,
AttachmentMode: v.AttachmentMode,
CurrentReaders: len(v.ReadAllocs),
CurrentWriters: len(v.WriteAllocs),
Healthy: v.Healthy,
VolumeGC: v.VolumeGC,
PluginID: v.PluginID,
ControllersHealthy: v.ControllersHealthy,
NodesHealthy: v.NodesHealthy,
NodesExpected: v.NodesExpected,
CreateIndex: v.CreateIndex,
ModifyIndex: v.ModifyIndex,
}
return &stub
@@ -238,6 +248,29 @@ func (v *CSIVolume) CanWrite() bool {
}
}
// Copy returns a copy of the volume, which shares only the Topologies slice.
// ModifyIndex is set to index; allocation maps are rebuilt so the copy can be
// mutated without aliasing the original (copy-on-write convention).
func (v *CSIVolume) Copy(index uint64) *CSIVolume {
	// Avoid shadowing the builtin `copy` and the receiver `v` as the
	// original did; shallow-copy the value then rebuild the maps.
	clone := *v
	out := &clone
	out.newStructs()
	out.ModifyIndex = index

	for id, alloc := range v.ReadAllocs {
		out.ReadAllocs[id] = alloc
	}
	for id, alloc := range v.WriteAllocs {
		out.WriteAllocs[id] = alloc
	}
	for id, alloc := range v.PastAllocs {
		out.PastAllocs[id] = alloc
	}

	return out
}
// Claim updates the allocations and changes the volume state
func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool {
switch claim {
case CSIVolumeClaimRead:
@@ -250,30 +283,39 @@ func (v *CSIVolume) Claim(claim CSIVolumeClaimMode, alloc *Allocation) bool {
return false
}
// ClaimRead marks an allocation as using a volume read-only
func (v *CSIVolume) ClaimRead(alloc *Allocation) bool {
if !v.CanReadOnly() {
return false
}
v.ReadAllocs[alloc.ID] = alloc
// Allocations are copy on write, so we want to keep the id but don't need the
// pointer. We'll get it from the db in denormalize.
v.ReadAllocs[alloc.ID] = nil
delete(v.WriteAllocs, alloc.ID)
delete(v.PastAllocs, alloc.ID)
return true
}
// ClaimWrite marks an allocation as using a volume as a writer
func (v *CSIVolume) ClaimWrite(alloc *Allocation) bool {
if !v.CanWrite() {
return false
}
v.WriteAllocs[alloc.ID] = alloc
// Allocations are copy on write, so we want to keep the id but don't need the
// pointer. We'll get it from the db in denormalize.
v.WriteAllocs[alloc.ID] = nil
delete(v.ReadAllocs, alloc.ID)
delete(v.PastAllocs, alloc.ID)
return true
}
// ClaimRelease is called when the allocation has terminated and already stopped using the volume
func (v *CSIVolume) ClaimRelease(alloc *Allocation) bool {
delete(v.ReadAllocs, alloc.ID)
delete(v.WriteAllocs, alloc.ID)
v.PastAllocs[alloc.ID] = alloc
// Allocations are copy on write, so we want to keep the id but don't need the
// pointer. We'll get it from the db in denormalize.
v.PastAllocs[alloc.ID] = nil
return true
}
@@ -292,11 +334,10 @@ func (v *CSIVolume) Equal(o *CSIVolume) bool {
// Omit the plugin health fields, their values are controlled by plugin jobs
if v.ID == o.ID &&
v.Driver == o.Driver &&
v.Namespace == o.Namespace &&
v.AccessMode == o.AccessMode &&
v.AttachmentMode == o.AttachmentMode &&
v.ControllerName == o.ControllerName {
v.PluginID == o.PluginID {
// Setwise equality of topologies
var ok bool
for _, t := range v.Topologies {
@@ -323,8 +364,8 @@ func (v *CSIVolume) Validate() error {
if v.ID == "" {
errs = append(errs, "missing volume id")
}
if v.Driver == "" {
errs = append(errs, "missing driver")
if v.PluginID == "" {
errs = append(errs, "missing plugin id")
}
if v.Namespace == "" {
errs = append(errs, "missing namespace")
@@ -388,7 +429,7 @@ type CSIVolumeClaimRequest struct {
}
type CSIVolumeListRequest struct {
Driver string
PluginID string
QueryOptions
}
@@ -406,3 +447,189 @@ type CSIVolumeGetResponse struct {
Volume *CSIVolume
QueryMeta
}
// CSIPlugin bundles job and info context for the plugin for clients
type CSIPlugin struct {
	// ID is the plugin identifier from the task's csi_plugin config
	ID string
	Type CSIPluginType

	// Jobs is updated by UpsertJob, and keeps an index of jobs containing node or
	// controller tasks for this plugin. It is addressed by [job.Namespace][job.ID]
	Jobs map[string]map[string]*Job

	// ControllersHealthy counts entries in Controllers whose fingerprint
	// reported Healthy; maintained by AddPlugin/DeleteNode
	ControllersHealthy int
	// Controllers maps node ID to that node's controller fingerprint
	Controllers map[string]*CSIInfo
	// NodesHealthy counts entries in Nodes whose fingerprint reported Healthy
	NodesHealthy int
	// Nodes maps node ID to that node's node-plugin fingerprint
	Nodes map[string]*CSIInfo

	CreateIndex uint64
	ModifyIndex uint64
}
// NewCSIPlugin creates the plugin struct. No side-effects.
// Both CreateIndex and ModifyIndex start at the given raft index.
func NewCSIPlugin(id string, index uint64) *CSIPlugin {
	plug := &CSIPlugin{
		ID:          id,
		CreateIndex: index,
		ModifyIndex: index,
	}
	plug.newStructs()
	return plug
}
// newStructs replaces the plugin's map fields with fresh empty maps. Called
// by NewCSIPlugin and Copy; any previous contents are dropped.
func (p *CSIPlugin) newStructs() {
	p.Jobs = map[string]map[string]*Job{}
	p.Controllers = map[string]*CSIInfo{}
	p.Nodes = map[string]*CSIInfo{}
}
// Copy returns a copy of the plugin with independent Jobs, Controllers, and
// Nodes maps, suitable for mutation under the state store's copy-on-write
// convention. ModifyIndex is set to index.
func (p *CSIPlugin) Copy(index uint64) *CSIPlugin {
	// Avoid shadowing the builtin `copy` as the original did
	clone := *p
	out := &clone
	out.newStructs()
	out.ModifyIndex = index

	for ns, jobs := range p.Jobs {
		out.Jobs[ns] = map[string]*Job{}
		for jobID, job := range jobs {
			out.Jobs[ns][jobID] = job
		}
	}

	for nodeID, info := range p.Controllers {
		out.Controllers[nodeID] = info
	}

	for nodeID, info := range p.Nodes {
		out.Nodes[nodeID] = info
	}

	return out
}
// AddJob adds a job entry to the plugin's job index. Only the key is stored;
// the *Job value is left nil and filled in later by CSIPluginDenormalize.
func (p *CSIPlugin) AddJob(job *Job) {
	nsJobs, ok := p.Jobs[job.Namespace]
	if !ok {
		nsJobs = map[string]*Job{}
		p.Jobs[job.Namespace] = nsJobs
	}
	nsJobs[job.ID] = nil
}
// DeleteJob removes the job from the plugin's job index. Deleting from a
// missing namespace map is a safe no-op.
func (p *CSIPlugin) DeleteJob(job *Job) {
	delete(p.Jobs[job.Namespace], job.ID)
}
// AddPlugin adds a single plugin running on the node. Called from state.NodeUpdate in a
// transaction. Health counters stay consistent by first retiring any previous
// fingerprint from this node, then counting the replacement.
func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo, index uint64) {
	if info.ControllerInfo != nil {
		if old, existed := p.Controllers[nodeID]; existed && old.Healthy {
			p.ControllersHealthy--
		}
		p.Controllers[nodeID] = info
		if info.Healthy {
			p.ControllersHealthy++
		}
	}

	if info.NodeInfo != nil {
		if old, existed := p.Nodes[nodeID]; existed && old.Healthy {
			p.NodesHealthy--
		}
		p.Nodes[nodeID] = info
		if info.Healthy {
			p.NodesHealthy++
		}
	}

	p.ModifyIndex = index
}
// DeleteNode removes all plugins from the node. Called from state.DeleteNode in a
// transaction. Healthy counters are decremented for any healthy fingerprint
// being removed.
func (p *CSIPlugin) DeleteNode(nodeID string, index uint64) {
	if old, existed := p.Controllers[nodeID]; existed && old.Healthy {
		p.ControllersHealthy--
	}
	delete(p.Controllers, nodeID)

	if old, existed := p.Nodes[nodeID]; existed && old.Healthy {
		p.NodesHealthy--
	}
	delete(p.Nodes, nodeID)

	p.ModifyIndex = index
}
// CSIPluginListStub is the partial representation of a CSIPlugin returned by
// list endpoints.
type CSIPluginListStub struct {
	ID   string
	Type CSIPluginType
	// JobIDs is the set of job IDs defining this plugin, keyed by namespace
	JobIDs map[string]map[string]struct{}
	// Expected counts are the number of nodes that fingerprinted the plugin
	// at all; Healthy counts only those that reported Healthy
	ControllersHealthy  int
	ControllersExpected int
	NodesHealthy        int
	NodesExpected       int
	CreateIndex uint64
	ModifyIndex uint64
}
// Stub converts the plugin to its list-stub representation, reducing the job
// index to a set of job IDs.
func (p *CSIPlugin) Stub() *CSIPluginListStub {
	jobIDs := map[string]map[string]struct{}{}
	for ns, jobs := range p.Jobs {
		jobIDs[ns] = map[string]struct{}{}
		for id := range jobs {
			jobIDs[ns][id] = struct{}{}
		}
	}

	return &CSIPluginListStub{
		ID:                  p.ID,
		Type:                p.Type,
		JobIDs:              jobIDs,
		ControllersHealthy:  p.ControllersHealthy,
		ControllersExpected: len(p.Controllers),
		NodesHealthy:        p.NodesHealthy,
		NodesExpected:       len(p.Nodes),
		CreateIndex:         p.CreateIndex,
		ModifyIndex:         p.ModifyIndex,
	}
}
// IsEmpty returns true when the plugin has no fingerprinted controllers or
// nodes and no jobs remaining in its index; used by deleteJobCSIPlugins to
// garbage-collect the plugin row.
func (p *CSIPlugin) IsEmpty() bool {
	if len(p.Controllers) != 0 || len(p.Nodes) != 0 {
		return false
	}

	for _, jobs := range p.Jobs {
		if len(jobs) > 0 {
			return false
		}
	}

	return true
}
// CSIPluginListRequest is the request for the plugin list RPC.
type CSIPluginListRequest struct {
	QueryOptions
}

// CSIPluginListResponse returns plugin stubs from the list RPC.
type CSIPluginListResponse struct {
	Plugins []*CSIPluginListStub
	QueryMeta
}

// CSIPluginGetRequest requests a single plugin by ID.
type CSIPluginGetRequest struct {
	ID string
	QueryOptions
}

// CSIPluginGetResponse returns one plugin from the get RPC.
type CSIPluginGetResponse struct {
	Plugin *CSIPlugin
	QueryMeta
}

View File

@@ -7,7 +7,7 @@ import (
)
func TestCSIVolumeClaim(t *testing.T) {
vol := CreateCSIVolume("")
vol := NewCSIVolume("")
vol.AccessMode = CSIVolumeAccessModeMultiNodeSingleWriter
vol.Healthy = true

View File

@@ -18,7 +18,6 @@ const (
errUnknownNomadVersion = "Unable to determine Nomad version"
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
errMissingAllocID = "Missing allocation ID"
errMissingCSIVolumeID = "Missing Volume ID"
// Prefix based errors that are used to check if the error is of a given
// type. These errors should be created with the associated constructor.
@@ -42,7 +41,6 @@ var (
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
ErrMissingAllocID = errors.New(errMissingAllocID)
ErrMissingCSIVolumeID = errors.New(errMissingCSIVolumeID)
)
// IsErrNoLeader returns whether the error is due to there being no leader.