CSI: volume and plugin allocations in the API (#8590)

* command/agent/csi_endpoint: explicitly convert to API structs, and convert allocs for single object get endpoints
This commit is contained in:
Lang Martin
2020-08-11 12:24:41 -04:00
committed by GitHub
parent 8888ab380b
commit 8a095fca90
4 changed files with 430 additions and 17 deletions

View File

@@ -925,6 +925,7 @@ type TaskEvent struct {
Time int64
DisplayMessage string
Details map[string]string
Message string
// DEPRECATION NOTICE: The following fields are all deprecated. see TaskEvent struct in structs.go for details.
FailsTask bool
RestartReason string
@@ -933,7 +934,6 @@ type TaskEvent struct {
DriverMessage string
ExitCode int
Signal int
Message string
KillReason string
KillTimeout time.Duration
KillError string

View File

@@ -5,11 +5,16 @@ import (
"strconv"
"strings"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
)
func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
switch req.Method {
case http.MethodPut, http.MethodPost:
return s.csiVolumePut(resp, req)
case http.MethodGet:
default:
return nil, CodedError(405, ErrInvalidMethod)
}
@@ -61,7 +66,7 @@ func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *htt
case http.MethodGet:
return s.csiVolumeGet(id, resp, req)
case http.MethodPut:
return s.csiVolumePut(id, resp, req)
return s.csiVolumePut(resp, req)
case http.MethodDelete:
return s.csiVolumeDelete(id, resp, req)
default:
@@ -100,15 +105,20 @@ func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http
return nil, CodedError(404, "volume not found")
}
vol := structsCSIVolumeToApi(out.Volume)
// remove sensitive fields, as our redaction mechanism doesn't
// help serializing here
out.Volume.Secrets = nil
out.Volume.MountOptions = nil
return out.Volume, nil
vol.Secrets = nil
vol.MountOptions = nil
return vol, nil
}
func (s *HTTPServer) csiVolumePut(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
func (s *HTTPServer) csiVolumePut(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case http.MethodPost, http.MethodPut:
default:
return nil, CodedError(405, ErrInvalidMethod)
}
@@ -133,7 +143,7 @@ func (s *HTTPServer) csiVolumePut(id string, resp http.ResponseWriter, req *http
}
func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "DELETE" {
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}
@@ -193,7 +203,7 @@ func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *h
// CSIPluginsRequest lists CSI plugins
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}
@@ -225,7 +235,7 @@ func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Reque
// CSIPluginSpecificRequest list the job with CSIInfo
func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}
@@ -252,5 +262,294 @@ func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *htt
return nil, CodedError(404, "plugin not found")
}
return out.Plugin, nil
return structsCSIPluginToApi(out.Plugin), nil
}
// structsCSIPluginToApi converts CSIPlugin, setting Expected the count of known plugin
// instances
func structsCSIPluginToApi(plug *structs.CSIPlugin) *api.CSIPlugin {
out := &api.CSIPlugin{
ID: plug.ID,
Provider: plug.Provider,
Version: plug.Version,
Allocations: make([]*api.AllocationListStub, len(plug.Allocations)),
ControllerRequired: plug.ControllerRequired,
ControllersHealthy: plug.ControllersHealthy,
ControllersExpected: len(plug.Controllers),
Controllers: make(map[string]*api.CSIInfo),
NodesHealthy: plug.NodesHealthy,
NodesExpected: len(plug.Nodes),
Nodes: make(map[string]*api.CSIInfo),
CreateIndex: plug.CreateIndex,
ModifyIndex: plug.ModifyIndex,
}
for k, v := range plug.Controllers {
out.Controllers[k] = structsCSIInfoToApi(v)
}
for k, v := range plug.Nodes {
out.Nodes[k] = structsCSIInfoToApi(v)
}
for _, a := range plug.Allocations {
out.Allocations = append(out.Allocations, structsAllocListStubToApi(a))
}
return out
}
// structsCSIVolumeToApi converts CSIVolume, creating the allocation array
func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume {
out := &api.CSIVolume{
ID: vol.ID,
Name: vol.Name,
ExternalID: vol.ExternalID,
Namespace: vol.Namespace,
Topologies: structsCSITopolgiesToApi(vol.Topologies),
AccessMode: structsCSIAccessModeToApi(vol.AccessMode),
AttachmentMode: structsCSIAttachmentModeToApi(vol.AttachmentMode),
MountOptions: structsCSIMountOptionsToApi(vol.MountOptions),
Secrets: structsCSISecretsToApi(vol.Secrets),
Parameters: vol.Parameters,
Context: vol.Context,
// Allocations is the collapsed list of both read and write allocs
Allocations: []*api.AllocationListStub{},
Schedulable: vol.Schedulable,
PluginID: vol.PluginID,
Provider: vol.Provider,
ProviderVersion: vol.ProviderVersion,
ControllerRequired: vol.ControllerRequired,
ControllersHealthy: vol.ControllersHealthy,
ControllersExpected: vol.ControllersExpected,
NodesHealthy: vol.NodesHealthy,
NodesExpected: vol.NodesExpected,
ResourceExhausted: vol.ResourceExhausted,
CreateIndex: vol.CreateIndex,
ModifyIndex: vol.ModifyIndex,
}
for _, a := range vol.WriteAllocs {
out.Allocations = append(out.Allocations, structsAllocListStubToApi(a.Stub()))
}
for _, a := range vol.ReadAllocs {
out.Allocations = append(out.Allocations, structsAllocListStubToApi(a.Stub()))
}
return out
}
// structsCSIInfoToApi converts CSIInfo, part of CSIPlugin
func structsCSIInfoToApi(info *structs.CSIInfo) *api.CSIInfo {
out := &api.CSIInfo{
PluginID: info.PluginID,
Healthy: info.Healthy,
HealthDescription: info.HealthDescription,
UpdateTime: info.UpdateTime,
RequiresControllerPlugin: info.RequiresControllerPlugin,
RequiresTopologies: info.RequiresTopologies,
}
if info.ControllerInfo != nil {
out.ControllerInfo = &api.CSIControllerInfo{
SupportsReadOnlyAttach: info.ControllerInfo.SupportsReadOnlyAttach,
SupportsAttachDetach: info.ControllerInfo.SupportsAttachDetach,
SupportsListVolumes: info.ControllerInfo.SupportsListVolumes,
SupportsListVolumesAttachedNodes: info.ControllerInfo.SupportsListVolumesAttachedNodes,
}
}
if info.NodeInfo != nil {
out.NodeInfo = &api.CSINodeInfo{
ID: info.NodeInfo.ID,
MaxVolumes: info.NodeInfo.MaxVolumes,
RequiresNodeStageVolume: info.NodeInfo.RequiresNodeStageVolume,
}
if info.NodeInfo.AccessibleTopology != nil {
out.NodeInfo.AccessibleTopology = &api.CSITopology{}
out.NodeInfo.AccessibleTopology.Segments = info.NodeInfo.AccessibleTopology.Segments
}
}
return out
}
// structsAllocListStubToApi converts AllocListStub, for CSIPlugin
func structsAllocListStubToApi(alloc *structs.AllocListStub) *api.AllocationListStub {
out := &api.AllocationListStub{
ID: alloc.ID,
EvalID: alloc.EvalID,
Name: alloc.Name,
Namespace: alloc.Namespace,
NodeID: alloc.NodeID,
NodeName: alloc.NodeName,
JobID: alloc.JobID,
JobType: alloc.JobType,
JobVersion: alloc.JobVersion,
TaskGroup: alloc.TaskGroup,
DesiredStatus: alloc.DesiredStatus,
DesiredDescription: alloc.DesiredDescription,
ClientStatus: alloc.ClientStatus,
ClientDescription: alloc.ClientDescription,
FollowupEvalID: alloc.FollowupEvalID,
PreemptedAllocations: alloc.PreemptedAllocations,
PreemptedByAllocation: alloc.PreemptedByAllocation,
CreateIndex: alloc.CreateIndex,
ModifyIndex: alloc.ModifyIndex,
CreateTime: alloc.CreateTime,
ModifyTime: alloc.ModifyTime,
}
out.DeploymentStatus = structsAllocDeploymentStatusToApi(alloc.DeploymentStatus)
out.RescheduleTracker = structsRescheduleTrackerToApi(alloc.RescheduleTracker)
for k, v := range alloc.TaskStates {
out.TaskStates[k] = structsTaskStateToApi(v)
}
return out
}
// structsAllocDeploymentStatusToApi converts RescheduleTracker, part of AllocListStub
func structsAllocDeploymentStatusToApi(ads *structs.AllocDeploymentStatus) *api.AllocDeploymentStatus {
out := &api.AllocDeploymentStatus{
Healthy: ads.Healthy,
Timestamp: ads.Timestamp,
Canary: ads.Canary,
ModifyIndex: ads.ModifyIndex,
}
return out
}
// structsRescheduleTrackerToApi converts RescheduleTracker, part of AllocListStub
func structsRescheduleTrackerToApi(rt *structs.RescheduleTracker) *api.RescheduleTracker {
out := &api.RescheduleTracker{}
for _, e := range rt.Events {
out.Events = append(out.Events, &api.RescheduleEvent{
RescheduleTime: e.RescheduleTime,
PrevAllocID: e.PrevAllocID,
PrevNodeID: e.PrevNodeID,
})
}
return out
}
// structsTaskStateToApi converts TaskState, part of AllocListStub
func structsTaskStateToApi(ts *structs.TaskState) *api.TaskState {
out := &api.TaskState{
State: ts.State,
Failed: ts.Failed,
Restarts: ts.Restarts,
LastRestart: ts.LastRestart,
StartedAt: ts.StartedAt,
FinishedAt: ts.FinishedAt,
}
for _, te := range ts.Events {
out.Events = append(out.Events, structsTaskEventToApi(te))
}
return out
}
// structsTaskEventToApi converts TaskEvents, part of AllocListStub
func structsTaskEventToApi(te *structs.TaskEvent) *api.TaskEvent {
out := &api.TaskEvent{
Type: te.Type,
Time: te.Time,
DisplayMessage: te.DisplayMessage,
Details: te.Details,
// DEPRECATION NOTICE: The following fields are all deprecated. see TaskEvent struct in structs.go for details.
FailsTask: te.FailsTask,
RestartReason: te.RestartReason,
SetupError: te.SetupError,
DriverError: te.DriverError,
DriverMessage: te.DriverMessage,
ExitCode: te.ExitCode,
Signal: te.Signal,
Message: te.Message,
KillReason: te.KillReason,
KillTimeout: te.KillTimeout,
KillError: te.KillError,
StartDelay: te.StartDelay,
DownloadError: te.DownloadError,
ValidationError: te.ValidationError,
DiskLimit: te.DiskLimit,
FailedSibling: te.FailedSibling,
VaultError: te.VaultError,
TaskSignalReason: te.TaskSignalReason,
TaskSignal: te.TaskSignal,
GenericSource: te.GenericSource,
}
return out
}
// structsCSITopolgiesToApi converts topologies, part of structsCSIVolumeToApi
func structsCSITopolgiesToApi(tops []*structs.CSITopology) []*api.CSITopology {
out := make([]*api.CSITopology, 0, len(tops))
for _, t := range tops {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
}
return out
}
// structsCSIAccessModeToApi converts access mode, part of structsCSIVolumeToApi
func structsCSIAccessModeToApi(mode structs.CSIVolumeAccessMode) api.CSIVolumeAccessMode {
switch mode {
case structs.CSIVolumeAccessModeSingleNodeReader:
return api.CSIVolumeAccessModeSingleNodeReader
case structs.CSIVolumeAccessModeSingleNodeWriter:
return api.CSIVolumeAccessModeSingleNodeWriter
case structs.CSIVolumeAccessModeMultiNodeReader:
return api.CSIVolumeAccessModeMultiNodeReader
case structs.CSIVolumeAccessModeMultiNodeSingleWriter:
return api.CSIVolumeAccessModeMultiNodeSingleWriter
case structs.CSIVolumeAccessModeMultiNodeMultiWriter:
return api.CSIVolumeAccessModeMultiNodeMultiWriter
default:
}
return api.CSIVolumeAccessModeUnknown
}
// structsCSIAttachmentModeToApiModeToApi converts attachment mode, part of structsCSIVolumeToApi
func structsCSIAttachmentModeToApi(mode structs.CSIVolumeAttachmentMode) api.CSIVolumeAttachmentMode {
switch mode {
case structs.CSIVolumeAttachmentModeBlockDevice:
return api.CSIVolumeAttachmentModeBlockDevice
case structs.CSIVolumeAttachmentModeFilesystem:
return api.CSIVolumeAttachmentModeFilesystem
default:
}
return api.CSIVolumeAttachmentModeUnknown
}
// structsCSIMountOptionsToApi converts mount options, part of structsCSIVolumeToApi
func structsCSIMountOptionsToApi(opts *structs.CSIMountOptions) *api.CSIMountOptions {
if opts == nil {
return nil
}
return &api.CSIMountOptions{
FSType: opts.FSType,
MountFlags: opts.MountFlags,
}
}
func structsCSISecretsToApi(secrets structs.CSISecrets) api.CSISecrets {
out := make(api.CSISecrets, len(secrets))
for k, v := range secrets {
out[k] = v
}
return out
}

View File

@@ -0,0 +1,101 @@
package agent
import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
func TestHTTP_CSIEndpointPlugin(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
server := s.Agent.Server()
cleanup := state.CreateTestCSIPlugin(server.State(), "foo")
defer cleanup()
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/plugin/csi/foo", body)
require.NoError(t, err)
resp := httptest.NewRecorder()
obj, err := s.Server.CSIPluginSpecificRequest(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code)
out, ok := obj.(*api.CSIPlugin)
require.True(t, ok)
require.Equal(t, 1, out.ControllersExpected)
require.Equal(t, 1, out.ControllersHealthy)
require.Len(t, out.Controllers, 1)
require.Equal(t, 2, out.NodesExpected)
require.Equal(t, 2, out.NodesHealthy)
require.Len(t, out.Nodes, 2)
})
}
func TestHTTP_CSIEndpointUtils(t *testing.T) {
secrets := structsCSISecretsToApi(structs.CSISecrets{
"foo": "bar",
})
require.Equal(t, "bar", secrets["foo"])
tops := structsCSITopolgiesToApi([]*structs.CSITopology{{
Segments: map[string]string{"foo": "bar"},
}})
require.Equal(t, "bar", tops[0].Segments["foo"])
}
func TestHTTP_CSIEndpointVolume(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
server := s.Agent.Server()
cleanup := state.CreateTestCSIPluginNodeOnly(server.State(), "foo")
defer cleanup()
args := structs.CSIVolumeRegisterRequest{
Volumes: []*structs.CSIVolume{{
ID: "bar",
PluginID: "foo",
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}
body := encodeReq(args)
req, err := http.NewRequest("PUT", "/v1/volumes", body)
require.NoError(t, err)
resp := httptest.NewRecorder()
_, err = s.Server.CSIVolumesRequest(resp, req)
require.NoError(t, err, "put error")
require.Equal(t, 200, resp.Code)
req, err = http.NewRequest("GET", "/v1/volume/csi/bar", nil)
require.NoError(t, err)
resp = httptest.NewRecorder()
raw, err := s.Server.CSIVolumeSpecificRequest(resp, req)
require.NoError(t, err, "get error")
require.Equal(t, 200, resp.Code)
out, ok := raw.(*api.CSIVolume)
require.True(t, ok)
pretty.Log(out)
require.Equal(t, 1, out.ControllersExpected)
require.Equal(t, 1, out.ControllersHealthy)
require.Equal(t, 2, out.NodesExpected)
require.Equal(t, 2, out.NodesHealthy)
})
}

View File

@@ -24,14 +24,27 @@ func TestStateStore(t testing.T) *StateStore {
return state
}
// CreateTestPlugin is a helper that generates the node + fingerprint results necessary to
// create a CSIPlugin by directly inserting into the state store. It's exported for use in
// other test packages
// CreateTestCSIPlugin is a helper that generates the node + fingerprint results necessary
// to create a CSIPlugin by directly inserting into the state store. The plugin requires a
// controller.
func CreateTestCSIPlugin(s *StateStore, id string) func() {
return createTestCSIPlugin(s, id, true)
}
// CreateTestCSIPluginNodeOnly is a helper that generates the node + fingerprint results
// necessary to create a CSIPlugin by directly inserting into the state store. The plugin
// does not require a controller. In tests that exercise volume registration, this prevents
// an error attempting to RPC the node.
func CreateTestCSIPluginNodeOnly(s *StateStore, id string) func() {
return createTestCSIPlugin(s, id, false)
}
func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func() {
// Create some nodes
ns := make([]*structs.Node, 3)
for i := range ns {
n := mock.Node()
n.Attributes["nomad.version"] = "0.11.0"
ns[i] = n
}
@@ -42,7 +55,7 @@ func CreateTestCSIPlugin(s *StateStore, id string) func() {
AllocID: uuid.Generate(),
Healthy: true,
HealthDescription: "healthy",
RequiresControllerPlugin: true,
RequiresControllerPlugin: requiresController,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
@@ -61,7 +74,7 @@ func CreateTestCSIPlugin(s *StateStore, id string) func() {
AllocID: uuid.Generate(),
Healthy: true,
HealthDescription: "healthy",
RequiresControllerPlugin: true,
RequiresControllerPlugin: requiresController,
RequiresTopologies: false,
NodeInfo: &structs.CSINodeInfo{
ID: n.ID,