diff --git a/api/tasks.go b/api/tasks.go index 8275813e6..27bbdca2d 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -925,6 +925,7 @@ type TaskEvent struct { Time int64 DisplayMessage string Details map[string]string + Message string // DEPRECATION NOTICE: The following fields are all deprecated. see TaskEvent struct in structs.go for details. FailsTask bool RestartReason string @@ -933,7 +934,6 @@ type TaskEvent struct { DriverMessage string ExitCode int Signal int - Message string KillReason string KillTimeout time.Duration KillError string diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index 95c36f2a9..16734f7b6 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -5,11 +5,16 @@ import ( "strconv" "strings" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" ) func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "GET" { + switch req.Method { + case http.MethodPut, http.MethodPost: + return s.csiVolumePut(resp, req) + case http.MethodGet: + default: return nil, CodedError(405, ErrInvalidMethod) } @@ -61,7 +66,7 @@ func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *htt case http.MethodGet: return s.csiVolumeGet(id, resp, req) case http.MethodPut: - return s.csiVolumePut(id, resp, req) + return s.csiVolumePut(resp, req) case http.MethodDelete: return s.csiVolumeDelete(id, resp, req) default: @@ -100,15 +105,20 @@ func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http return nil, CodedError(404, "volume not found") } + vol := structsCSIVolumeToApi(out.Volume) + // remove sensitive fields, as our redaction mechanism doesn't // help serializing here - out.Volume.Secrets = nil - out.Volume.MountOptions = nil - return out.Volume, nil + vol.Secrets = nil + vol.MountOptions = nil + + return vol, nil } -func (s *HTTPServer) csiVolumePut(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "PUT" { +func (s *HTTPServer) csiVolumePut(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + switch req.Method { + case http.MethodPost, http.MethodPut: + default: return nil, CodedError(405, ErrInvalidMethod) } @@ -133,7 +143,7 @@ func (s *HTTPServer) csiVolumePut(id string, resp http.ResponseWriter, req *http } func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "DELETE" { + if req.Method != http.MethodDelete { return nil, CodedError(405, ErrInvalidMethod) } @@ -193,7 +203,7 @@ func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *h // CSIPluginsRequest lists CSI plugins func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "GET" { + if req.Method != http.MethodGet { return nil, CodedError(405, ErrInvalidMethod) } @@ -225,7 +235,7 @@ func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Reque // CSIPluginSpecificRequest list the job with CSIInfo func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "GET" { + if req.Method != http.MethodGet { return nil, CodedError(405, ErrInvalidMethod) } @@ -252,5 +262,294 @@ func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *htt return nil, CodedError(404, "plugin not found") } - return out.Plugin, nil + return structsCSIPluginToApi(out.Plugin), nil +} + +// structsCSIPluginToApi converts CSIPlugin, setting Expected the count of known plugin +// instances +func structsCSIPluginToApi(plug *structs.CSIPlugin) *api.CSIPlugin { + out := &api.CSIPlugin{ + ID: plug.ID, + Provider: plug.Provider, + Version: plug.Version, + Allocations: make([]*api.AllocationListStub, len(plug.Allocations)), + ControllerRequired: plug.ControllerRequired, + ControllersHealthy: plug.ControllersHealthy, + ControllersExpected: len(plug.Controllers), + Controllers: make(map[string]*api.CSIInfo), + NodesHealthy: plug.NodesHealthy, + NodesExpected: len(plug.Nodes), + Nodes: make(map[string]*api.CSIInfo), + CreateIndex: plug.CreateIndex, + ModifyIndex: plug.ModifyIndex, + } + + for k, v := range plug.Controllers { + out.Controllers[k] = structsCSIInfoToApi(v) + } + + for k, v := range plug.Nodes { + out.Nodes[k] = structsCSIInfoToApi(v) + } + + for _, a := range plug.Allocations { + out.Allocations = append(out.Allocations, structsAllocListStubToApi(a)) + } + + return out +} + +// structsCSIVolumeToApi converts CSIVolume, creating the allocation array +func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume { + out := &api.CSIVolume{ + ID: vol.ID, + Name: vol.Name, + ExternalID: vol.ExternalID, + Namespace: vol.Namespace, + Topologies: structsCSITopolgiesToApi(vol.Topologies), + AccessMode: structsCSIAccessModeToApi(vol.AccessMode), + AttachmentMode: structsCSIAttachmentModeToApi(vol.AttachmentMode), + MountOptions: structsCSIMountOptionsToApi(vol.MountOptions), + Secrets: structsCSISecretsToApi(vol.Secrets), + Parameters: vol.Parameters, + Context: vol.Context, + + // Allocations is the collapsed list of both read and write allocs + Allocations: []*api.AllocationListStub{}, + + Schedulable: vol.Schedulable, + PluginID: vol.PluginID, + Provider: vol.Provider, + ProviderVersion: vol.ProviderVersion, + ControllerRequired: vol.ControllerRequired, + ControllersHealthy: vol.ControllersHealthy, + ControllersExpected: vol.ControllersExpected, + NodesHealthy: vol.NodesHealthy, + NodesExpected: vol.NodesExpected, + ResourceExhausted: vol.ResourceExhausted, + CreateIndex: vol.CreateIndex, + ModifyIndex: vol.ModifyIndex, + } + + for _, a := range vol.WriteAllocs { + out.Allocations = append(out.Allocations, structsAllocListStubToApi(a.Stub())) + } + + for _, a := range vol.ReadAllocs { + out.Allocations = append(out.Allocations, structsAllocListStubToApi(a.Stub())) + } + + return out +} + +// structsCSIInfoToApi converts CSIInfo, part of CSIPlugin +func structsCSIInfoToApi(info *structs.CSIInfo) *api.CSIInfo { + out := &api.CSIInfo{ + PluginID: info.PluginID, + Healthy: info.Healthy, + HealthDescription: info.HealthDescription, + UpdateTime: info.UpdateTime, + RequiresControllerPlugin: info.RequiresControllerPlugin, + RequiresTopologies: info.RequiresTopologies, + } + + if info.ControllerInfo != nil { + out.ControllerInfo = &api.CSIControllerInfo{ + SupportsReadOnlyAttach: info.ControllerInfo.SupportsReadOnlyAttach, + SupportsAttachDetach: info.ControllerInfo.SupportsAttachDetach, + SupportsListVolumes: info.ControllerInfo.SupportsListVolumes, + SupportsListVolumesAttachedNodes: info.ControllerInfo.SupportsListVolumesAttachedNodes, + } + } + + if info.NodeInfo != nil { + out.NodeInfo = &api.CSINodeInfo{ + ID: info.NodeInfo.ID, + MaxVolumes: info.NodeInfo.MaxVolumes, + RequiresNodeStageVolume: info.NodeInfo.RequiresNodeStageVolume, + } + + if info.NodeInfo.AccessibleTopology != nil { + out.NodeInfo.AccessibleTopology = &api.CSITopology{} + out.NodeInfo.AccessibleTopology.Segments = info.NodeInfo.AccessibleTopology.Segments + } + } + + return out +} + +// structsAllocListStubToApi converts AllocListStub, for CSIPlugin +func structsAllocListStubToApi(alloc *structs.AllocListStub) *api.AllocationListStub { + out := &api.AllocationListStub{ + ID: alloc.ID, + EvalID: alloc.EvalID, + Name: alloc.Name, + Namespace: alloc.Namespace, + NodeID: alloc.NodeID, + NodeName: alloc.NodeName, + JobID: alloc.JobID, + JobType: alloc.JobType, + JobVersion: alloc.JobVersion, + TaskGroup: alloc.TaskGroup, + DesiredStatus: alloc.DesiredStatus, + DesiredDescription: alloc.DesiredDescription, + ClientStatus: alloc.ClientStatus, + ClientDescription: alloc.ClientDescription, + FollowupEvalID: alloc.FollowupEvalID, + PreemptedAllocations: alloc.PreemptedAllocations, + PreemptedByAllocation: alloc.PreemptedByAllocation, + CreateIndex: alloc.CreateIndex, + ModifyIndex: alloc.ModifyIndex, + CreateTime: alloc.CreateTime, + ModifyTime: alloc.ModifyTime, + } + + out.DeploymentStatus = structsAllocDeploymentStatusToApi(alloc.DeploymentStatus) + out.RescheduleTracker = structsRescheduleTrackerToApi(alloc.RescheduleTracker) + + for k, v := range alloc.TaskStates { + out.TaskStates[k] = structsTaskStateToApi(v) + } + + return out +} + +// structsAllocDeploymentStatusToApi converts RescheduleTracker, part of AllocListStub +func structsAllocDeploymentStatusToApi(ads *structs.AllocDeploymentStatus) *api.AllocDeploymentStatus { + out := &api.AllocDeploymentStatus{ + Healthy: ads.Healthy, + Timestamp: ads.Timestamp, + Canary: ads.Canary, + ModifyIndex: ads.ModifyIndex, + } + return out +} + +// structsRescheduleTrackerToApi converts RescheduleTracker, part of AllocListStub +func structsRescheduleTrackerToApi(rt *structs.RescheduleTracker) *api.RescheduleTracker { + out := &api.RescheduleTracker{} + + for _, e := range rt.Events { + out.Events = append(out.Events, &api.RescheduleEvent{ + RescheduleTime: e.RescheduleTime, + PrevAllocID: e.PrevAllocID, + PrevNodeID: e.PrevNodeID, + }) + } + + return out +} + +// structsTaskStateToApi converts TaskState, part of AllocListStub +func structsTaskStateToApi(ts *structs.TaskState) *api.TaskState { + out := &api.TaskState{ + State: ts.State, + Failed: ts.Failed, + Restarts: ts.Restarts, + LastRestart: ts.LastRestart, + StartedAt: ts.StartedAt, + FinishedAt: ts.FinishedAt, + } + + for _, te := range ts.Events { + out.Events = append(out.Events, structsTaskEventToApi(te)) + } + + return out +} + +// structsTaskEventToApi converts TaskEvents, part of AllocListStub +func structsTaskEventToApi(te *structs.TaskEvent) *api.TaskEvent { + out := &api.TaskEvent{ + Type: te.Type, + Time: te.Time, + DisplayMessage: te.DisplayMessage, + Details: te.Details, + + // DEPRECATION NOTICE: The following fields are all deprecated. see TaskEvent struct in structs.go for details. + FailsTask: te.FailsTask, + RestartReason: te.RestartReason, + SetupError: te.SetupError, + DriverError: te.DriverError, + DriverMessage: te.DriverMessage, + ExitCode: te.ExitCode, + Signal: te.Signal, + Message: te.Message, + KillReason: te.KillReason, + KillTimeout: te.KillTimeout, + KillError: te.KillError, + StartDelay: te.StartDelay, + DownloadError: te.DownloadError, + ValidationError: te.ValidationError, + DiskLimit: te.DiskLimit, + FailedSibling: te.FailedSibling, + VaultError: te.VaultError, + TaskSignalReason: te.TaskSignalReason, + TaskSignal: te.TaskSignal, + GenericSource: te.GenericSource, + } + + return out +} + +// structsCSITopolgiesToApi converts topologies, part of structsCSIVolumeToApi +func structsCSITopolgiesToApi(tops []*structs.CSITopology) []*api.CSITopology { + out := make([]*api.CSITopology, 0, len(tops)) + for _, t := range tops { + out = append(out, &api.CSITopology{ + Segments: t.Segments, + }) + } + + return out +} + +// structsCSIAccessModeToApi converts access mode, part of structsCSIVolumeToApi +func structsCSIAccessModeToApi(mode structs.CSIVolumeAccessMode) api.CSIVolumeAccessMode { + switch mode { + case structs.CSIVolumeAccessModeSingleNodeReader: + return api.CSIVolumeAccessModeSingleNodeReader + case structs.CSIVolumeAccessModeSingleNodeWriter: + return api.CSIVolumeAccessModeSingleNodeWriter + case structs.CSIVolumeAccessModeMultiNodeReader: + return api.CSIVolumeAccessModeMultiNodeReader + case structs.CSIVolumeAccessModeMultiNodeSingleWriter: + return api.CSIVolumeAccessModeMultiNodeSingleWriter + case structs.CSIVolumeAccessModeMultiNodeMultiWriter: + return api.CSIVolumeAccessModeMultiNodeMultiWriter + default: + } + return api.CSIVolumeAccessModeUnknown +} + +// structsCSIAttachmentModeToApiModeToApi converts attachment mode, part of structsCSIVolumeToApi +func structsCSIAttachmentModeToApi(mode structs.CSIVolumeAttachmentMode) api.CSIVolumeAttachmentMode { + switch mode { + case structs.CSIVolumeAttachmentModeBlockDevice: + return api.CSIVolumeAttachmentModeBlockDevice + case structs.CSIVolumeAttachmentModeFilesystem: + return api.CSIVolumeAttachmentModeFilesystem + default: + } + return api.CSIVolumeAttachmentModeUnknown +} + +// structsCSIMountOptionsToApi converts mount options, part of structsCSIVolumeToApi +func structsCSIMountOptionsToApi(opts *structs.CSIMountOptions) *api.CSIMountOptions { + if opts == nil { + return nil + } + + return &api.CSIMountOptions{ + FSType: opts.FSType, + MountFlags: opts.MountFlags, + } +} + +func structsCSISecretsToApi(secrets structs.CSISecrets) api.CSISecrets { + out := make(api.CSISecrets, len(secrets)) + for k, v := range secrets { + out[k] = v + } + return out } diff --git a/command/agent/csi_endpoint_test.go b/command/agent/csi_endpoint_test.go new file mode 100644 index 000000000..ef5262f89 --- /dev/null +++ b/command/agent/csi_endpoint_test.go @@ -0,0 +1,101 @@ +package agent + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/kr/pretty" + "github.com/stretchr/testify/require" +) + +func TestHTTP_CSIEndpointPlugin(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + server := s.Agent.Server() + cleanup := state.CreateTestCSIPlugin(server.State(), "foo") + defer cleanup() + + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/plugin/csi/foo", body) + require.NoError(t, err) + + resp := httptest.NewRecorder() + obj, err := s.Server.CSIPluginSpecificRequest(resp, req) + require.NoError(t, err) + require.Equal(t, 200, resp.Code) + + out, ok := obj.(*api.CSIPlugin) + require.True(t, ok) + + require.Equal(t, 1, out.ControllersExpected) + require.Equal(t, 1, out.ControllersHealthy) + require.Len(t, out.Controllers, 1) + + require.Equal(t, 2, out.NodesExpected) + require.Equal(t, 2, out.NodesHealthy) + require.Len(t, out.Nodes, 2) + }) +} + +func TestHTTP_CSIEndpointUtils(t *testing.T) { + secrets := structsCSISecretsToApi(structs.CSISecrets{ + "foo": "bar", + }) + + require.Equal(t, "bar", secrets["foo"]) + + tops := structsCSITopolgiesToApi([]*structs.CSITopology{{ + Segments: map[string]string{"foo": "bar"}, + }}) + + require.Equal(t, "bar", tops[0].Segments["foo"]) +} + +func TestHTTP_CSIEndpointVolume(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + server := s.Agent.Server() + cleanup := state.CreateTestCSIPluginNodeOnly(server.State(), "foo") + defer cleanup() + + args := structs.CSIVolumeRegisterRequest{ + Volumes: []*structs.CSIVolume{{ + ID: "bar", + PluginID: "foo", + AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }}, + } + body := encodeReq(args) + req, err := http.NewRequest("PUT", "/v1/volumes", body) + require.NoError(t, err) + + resp := httptest.NewRecorder() + _, err = s.Server.CSIVolumesRequest(resp, req) + require.NoError(t, err, "put error") + require.Equal(t, 200, resp.Code) + + req, err = http.NewRequest("GET", "/v1/volume/csi/bar", nil) + require.NoError(t, err) + + resp = httptest.NewRecorder() + raw, err := s.Server.CSIVolumeSpecificRequest(resp, req) + require.NoError(t, err, "get error") + require.Equal(t, 200, resp.Code) + + out, ok := raw.(*api.CSIVolume) + require.True(t, ok) + + pretty.Log(out) + + require.Equal(t, 1, out.ControllersExpected) + require.Equal(t, 1, out.ControllersHealthy) + require.Equal(t, 2, out.NodesExpected) + require.Equal(t, 2, out.NodesHealthy) + }) +} diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 353c8de1d..de069e409 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -24,14 +24,27 @@ func TestStateStore(t testing.T) *StateStore { return state } -// CreateTestPlugin is a helper that generates the node + fingerprint results necessary to -// create a CSIPlugin by directly inserting into the state store. It's exported for use in -// other test packages +// CreateTestCSIPlugin is a helper that generates the node + fingerprint results necessary +// to create a CSIPlugin by directly inserting into the state store. The plugin requires a +// controller. func CreateTestCSIPlugin(s *StateStore, id string) func() { + return createTestCSIPlugin(s, id, true) +} + +// CreateTestCSIPluginNodeOnly is a helper that generates the node + fingerprint results +// necessary to create a CSIPlugin by directly inserting into the state store. The plugin +// does not require a controller. In tests that exercise volume registration, this prevents +// an error attempting to RPC the node. +func CreateTestCSIPluginNodeOnly(s *StateStore, id string) func() { + return createTestCSIPlugin(s, id, false) +} + +func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func() { // Create some nodes ns := make([]*structs.Node, 3) for i := range ns { n := mock.Node() + n.Attributes["nomad.version"] = "0.11.0" ns[i] = n } @@ -42,7 +55,7 @@ func CreateTestCSIPlugin(s *StateStore, id string) func() { AllocID: uuid.Generate(), Healthy: true, HealthDescription: "healthy", - RequiresControllerPlugin: true, + RequiresControllerPlugin: requiresController, RequiresTopologies: false, ControllerInfo: &structs.CSIControllerInfo{ SupportsReadOnlyAttach: true, @@ -61,7 +74,7 @@ func CreateTestCSIPlugin(s *StateStore, id string) func() { AllocID: uuid.Generate(), Healthy: true, HealthDescription: "healthy", - RequiresControllerPlugin: true, + RequiresControllerPlugin: requiresController, RequiresTopologies: false, NodeInfo: &structs.CSINodeInfo{ ID: n.ID,